mirror of
https://github.com/openclaw/openclaw.git
synced 2026-02-09 05:19:32 +08:00
195 lines
5.0 KiB
TypeScript
195 lines
5.0 KiB
TypeScript
import { randomUUID } from "node:crypto";
|
|
import { resolveFetch } from "../infra/fetch.js";
|
|
|
|
export type SignalRpcOptions = {
|
|
baseUrl: string;
|
|
timeoutMs?: number;
|
|
};
|
|
|
|
export type SignalRpcError = {
|
|
code?: number;
|
|
message?: string;
|
|
data?: unknown;
|
|
};
|
|
|
|
export type SignalRpcResponse<T> = {
|
|
jsonrpc?: string;
|
|
result?: T;
|
|
error?: SignalRpcError;
|
|
id?: string | number | null;
|
|
};
|
|
|
|
export type SignalSseEvent = {
|
|
event?: string;
|
|
data?: string;
|
|
id?: string;
|
|
};
|
|
|
|
const DEFAULT_TIMEOUT_MS = 10_000;
|
|
|
|
function normalizeBaseUrl(url: string): string {
|
|
const trimmed = url.trim();
|
|
if (!trimmed) {
|
|
throw new Error("Signal base URL is required");
|
|
}
|
|
if (/^https?:\/\//i.test(trimmed)) {
|
|
return trimmed.replace(/\/+$/, "");
|
|
}
|
|
return `http://${trimmed}`.replace(/\/+$/, "");
|
|
}
|
|
|
|
async function fetchWithTimeout(url: string, init: RequestInit, timeoutMs: number) {
|
|
const fetchImpl = resolveFetch();
|
|
if (!fetchImpl) {
|
|
throw new Error("fetch is not available");
|
|
}
|
|
const controller = new AbortController();
|
|
const timer = setTimeout(() => controller.abort(), timeoutMs);
|
|
try {
|
|
return await fetchImpl(url, { ...init, signal: controller.signal });
|
|
} finally {
|
|
clearTimeout(timer);
|
|
}
|
|
}
|
|
|
|
export async function signalRpcRequest<T = unknown>(
|
|
method: string,
|
|
params: Record<string, unknown> | undefined,
|
|
opts: SignalRpcOptions,
|
|
): Promise<T> {
|
|
const baseUrl = normalizeBaseUrl(opts.baseUrl);
|
|
const id = randomUUID();
|
|
const body = JSON.stringify({
|
|
jsonrpc: "2.0",
|
|
method,
|
|
params,
|
|
id,
|
|
});
|
|
const res = await fetchWithTimeout(
|
|
`${baseUrl}/api/v1/rpc`,
|
|
{
|
|
method: "POST",
|
|
headers: { "Content-Type": "application/json" },
|
|
body,
|
|
},
|
|
opts.timeoutMs ?? DEFAULT_TIMEOUT_MS,
|
|
);
|
|
if (res.status === 201) {
|
|
return undefined as T;
|
|
}
|
|
const text = await res.text();
|
|
if (!text) {
|
|
throw new Error(`Signal RPC empty response (status ${res.status})`);
|
|
}
|
|
const parsed = JSON.parse(text) as SignalRpcResponse<T>;
|
|
if (parsed.error) {
|
|
const code = parsed.error.code ?? "unknown";
|
|
const msg = parsed.error.message ?? "Signal RPC error";
|
|
throw new Error(`Signal RPC ${code}: ${msg}`);
|
|
}
|
|
return parsed.result as T;
|
|
}
|
|
|
|
export async function signalCheck(
|
|
baseUrl: string,
|
|
timeoutMs = DEFAULT_TIMEOUT_MS,
|
|
): Promise<{ ok: boolean; status?: number | null; error?: string | null }> {
|
|
const normalized = normalizeBaseUrl(baseUrl);
|
|
try {
|
|
const res = await fetchWithTimeout(`${normalized}/api/v1/check`, { method: "GET" }, timeoutMs);
|
|
if (!res.ok) {
|
|
return { ok: false, status: res.status, error: `HTTP ${res.status}` };
|
|
}
|
|
return { ok: true, status: res.status, error: null };
|
|
} catch (err) {
|
|
return {
|
|
ok: false,
|
|
status: null,
|
|
error: err instanceof Error ? err.message : String(err),
|
|
};
|
|
}
|
|
}
|
|
|
|
export async function streamSignalEvents(params: {
|
|
baseUrl: string;
|
|
account?: string;
|
|
abortSignal?: AbortSignal;
|
|
onEvent: (event: SignalSseEvent) => void;
|
|
}): Promise<void> {
|
|
const baseUrl = normalizeBaseUrl(params.baseUrl);
|
|
const url = new URL(`${baseUrl}/api/v1/events`);
|
|
if (params.account) {
|
|
url.searchParams.set("account", params.account);
|
|
}
|
|
|
|
const fetchImpl = resolveFetch();
|
|
if (!fetchImpl) {
|
|
throw new Error("fetch is not available");
|
|
}
|
|
const res = await fetchImpl(url, {
|
|
method: "GET",
|
|
headers: { Accept: "text/event-stream" },
|
|
signal: params.abortSignal,
|
|
});
|
|
if (!res.ok || !res.body) {
|
|
throw new Error(`Signal SSE failed (${res.status} ${res.statusText || "error"})`);
|
|
}
|
|
|
|
const reader = res.body.getReader();
|
|
const decoder = new TextDecoder();
|
|
let buffer = "";
|
|
let currentEvent: SignalSseEvent = {};
|
|
|
|
const flushEvent = () => {
|
|
if (!currentEvent.data && !currentEvent.event && !currentEvent.id) {
|
|
return;
|
|
}
|
|
params.onEvent({
|
|
event: currentEvent.event,
|
|
data: currentEvent.data,
|
|
id: currentEvent.id,
|
|
});
|
|
currentEvent = {};
|
|
};
|
|
|
|
while (true) {
|
|
const { value, done } = await reader.read();
|
|
if (done) {
|
|
break;
|
|
}
|
|
buffer += decoder.decode(value, { stream: true });
|
|
let lineEnd = buffer.indexOf("\n");
|
|
while (lineEnd !== -1) {
|
|
let line = buffer.slice(0, lineEnd);
|
|
buffer = buffer.slice(lineEnd + 1);
|
|
if (line.endsWith("\r")) {
|
|
line = line.slice(0, -1);
|
|
}
|
|
|
|
if (line === "") {
|
|
flushEvent();
|
|
lineEnd = buffer.indexOf("\n");
|
|
continue;
|
|
}
|
|
if (line.startsWith(":")) {
|
|
lineEnd = buffer.indexOf("\n");
|
|
continue;
|
|
}
|
|
const [rawField, ...rest] = line.split(":");
|
|
const field = rawField.trim();
|
|
const rawValue = rest.join(":");
|
|
const value = rawValue.startsWith(" ") ? rawValue.slice(1) : rawValue;
|
|
if (field === "event") {
|
|
currentEvent.event = value;
|
|
} else if (field === "data") {
|
|
currentEvent.data = currentEvent.data ? `${currentEvent.data}\n${value}` : value;
|
|
} else if (field === "id") {
|
|
currentEvent.id = value;
|
|
}
|
|
lineEnd = buffer.indexOf("\n");
|
|
}
|
|
}
|
|
|
|
flushEvent();
|
|
}
|