mirror of
https://github.com/openclaw/openclaw.git
synced 2026-02-09 05:19:32 +08:00
fix: comprehensive BlueBubbles and channel cleanup (#11093)
* feat(bluebubbles): auto-strip markdown from outbound messages (#7402) * fix(security): add timeout to webhook body reading (#6762) Adds 30-second timeout to readBody() in voice-call, bluebubbles, and nostr webhook handlers. Prevents Slow-Loris DoS (CWE-400, CVSS 7.5). Merged with existing maxBytes protection in voice-call. * fix(security): unify Error objects and lint fixes in webhook timeouts (#6762) * fix: prevent plugins from auto-enabling without user consent (#3961) Changes default plugin enabled state from true to false in enablePluginEntry(). Preserves existing enabled:true values. Fixes #3932. * fix: apply hierarchical mediaMaxMb config to all channels (#8749) Generalizes resolveAttachmentMaxBytes() to use account → channel → global config resolution for all channels, not just BlueBubbles. Fixes #7847. * fix(bluebubbles): sanitize attachment filenames against header injection (#10333) Strip ", \r, \n, and \\ from filenames after path.basename() to prevent multipart Content-Disposition header injection (CWE-93, CVSS 5.4). Also adds sanitization to setGroupIconBlueBubbles which had zero filename sanitization. * fix(lint): exclude extensions/ from Oxlint preflight check (#9313) Extensions use PluginRuntime|null patterns that trigger no-redundant-type-constituents because PluginRuntime resolves to any. Excluding extensions/ from Oxlint unblocks user upgrades. Re-applies the approach from closed PR #10087. * fix(bluebubbles): add tempGuid to createNewChatWithMessage payload (#7745) Non-Private-API mode (AppleScript) requires tempGuid in send payloads. The main sendMessageBlueBubbles already had it, but createNewChatWithMessage was missing it, causing 400 errors for new chat creation without Private API. * fix: send stop-typing signal when run ends with NO_REPLY (#8785) Adds onCleanup callback to the typing controller that fires when the controller is cleaned up while typing was active (e.g., after NO_REPLY). Channels using createTypingCallbacks automatically get stop-typing on cleanup. This prevents the typing indicator from lingering in group chats when the agent decides not to reply. * fix(telegram): deduplicate skill commands in multi-agent setup (#5717) Two fixes: 1. Skip duplicate workspace dirs when listing skill commands across agents. Multiple agents sharing the same workspace would produce duplicate commands with _2, _3 suffixes. 2. Clear stale commands via deleteMyCommands before registering new ones. Commands from deleted skills now get cleaned up on restart. * fix: add size limits to unbounded in-memory caches (#4948) Adds max-size caps with oldest-entry eviction to prevent OOM in long-running deployments: - BlueBubbles serverInfoCache: 64 entries (already has TTL) - Google Chat authCache: 32 entries - Matrix directRoomCache: 1024 entries - Discord presenceCache: 5000 entries per account * fix: address review concerns (#11093) - Chain deleteMyCommands → setMyCommands to prevent race condition (#5717) - Rename enablePluginEntry to registerPluginEntry (now sets enabled: false) - Add Slow-Loris timeout test for readJsonBody (#6023)
This commit is contained in:
@@ -24,6 +24,7 @@
|
||||
"assets/",
|
||||
"dist/",
|
||||
"docs/_layouts/",
|
||||
"extensions/",
|
||||
"node_modules/",
|
||||
"patches/",
|
||||
"pnpm-lock.yaml/",
|
||||
|
||||
@@ -26,7 +26,9 @@ const AUDIO_MIME_CAF = new Set(["audio/x-caf", "audio/caf"]);
|
||||
function sanitizeFilename(input: string | undefined, fallback: string): string {
|
||||
const trimmed = input?.trim() ?? "";
|
||||
const base = trimmed ? path.basename(trimmed) : "";
|
||||
return base || fallback;
|
||||
const name = base || fallback;
|
||||
// Strip characters that could enable multipart header injection (CWE-93)
|
||||
return name.replace(/[\r\n"\\]/g, "_");
|
||||
}
|
||||
|
||||
function ensureExtension(filename: string, extension: string, fallbackBase: string): string {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import type { OpenClawConfig } from "openclaw/plugin-sdk";
|
||||
import crypto from "node:crypto";
|
||||
import path from "node:path";
|
||||
import { resolveBlueBubblesAccount } from "./accounts.js";
|
||||
import { blueBubblesFetchWithTimeout, buildBlueBubblesApiUrl } from "./types.js";
|
||||
|
||||
@@ -336,10 +337,13 @@ export async function setGroupIconBlueBubbles(
|
||||
const parts: Uint8Array[] = [];
|
||||
const encoder = new TextEncoder();
|
||||
|
||||
// Sanitize filename to prevent multipart header injection (CWE-93)
|
||||
const safeFilename = path.basename(filename).replace(/[\r\n"\\]/g, "_") || "icon.png";
|
||||
|
||||
// Add file field named "icon" as per API spec
|
||||
parts.push(encoder.encode(`--${boundary}\r\n`));
|
||||
parts.push(
|
||||
encoder.encode(`Content-Disposition: form-data; name="icon"; filename="${filename}"\r\n`),
|
||||
encoder.encode(`Content-Disposition: form-data; name="icon"; filename="${safeFilename}"\r\n`),
|
||||
);
|
||||
parts.push(
|
||||
encoder.encode(`Content-Type: ${opts.contentType ?? "application/octet-stream"}\r\n\r\n`),
|
||||
|
||||
@@ -393,6 +393,48 @@ describe("BlueBubbles webhook monitor", () => {
|
||||
expect(res.statusCode).toBe(400);
|
||||
});
|
||||
|
||||
it("returns 400 when request body times out (Slow-Loris protection)", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const account = createMockAccount();
|
||||
const config: OpenClawConfig = {};
|
||||
const core = createMockRuntime();
|
||||
setBlueBubblesRuntime(core);
|
||||
|
||||
unregister = registerBlueBubblesWebhookTarget({
|
||||
account,
|
||||
config,
|
||||
runtime: { log: vi.fn(), error: vi.fn() },
|
||||
core,
|
||||
path: "/bluebubbles-webhook",
|
||||
});
|
||||
|
||||
// Create a request that never sends data or ends (simulates slow-loris)
|
||||
const req = new EventEmitter() as IncomingMessage;
|
||||
req.method = "POST";
|
||||
req.url = "/bluebubbles-webhook";
|
||||
req.headers = {};
|
||||
(req as unknown as { socket: { remoteAddress: string } }).socket = {
|
||||
remoteAddress: "127.0.0.1",
|
||||
};
|
||||
req.destroy = vi.fn();
|
||||
|
||||
const res = createMockResponse();
|
||||
|
||||
const handledPromise = handleBlueBubblesWebhookRequest(req, res);
|
||||
|
||||
// Advance past the 30s timeout
|
||||
await vi.advanceTimersByTimeAsync(31_000);
|
||||
|
||||
const handled = await handledPromise;
|
||||
expect(handled).toBe(true);
|
||||
expect(res.statusCode).toBe(400);
|
||||
expect(req.destroy).toHaveBeenCalled();
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("authenticates via password query parameter", async () => {
|
||||
const account = createMockAccount({ password: "secret-token" });
|
||||
const config: OpenClawConfig = {};
|
||||
|
||||
@@ -508,14 +508,29 @@ export function registerBlueBubblesWebhookTarget(target: WebhookTarget): () => v
|
||||
};
|
||||
}
|
||||
|
||||
async function readJsonBody(req: IncomingMessage, maxBytes: number) {
|
||||
async function readJsonBody(req: IncomingMessage, maxBytes: number, timeoutMs = 30_000) {
|
||||
const chunks: Buffer[] = [];
|
||||
let total = 0;
|
||||
return await new Promise<{ ok: boolean; value?: unknown; error?: string }>((resolve) => {
|
||||
let done = false;
|
||||
const finish = (result: { ok: boolean; value?: unknown; error?: string }) => {
|
||||
if (done) {
|
||||
return;
|
||||
}
|
||||
done = true;
|
||||
clearTimeout(timer);
|
||||
resolve(result);
|
||||
};
|
||||
|
||||
const timer = setTimeout(() => {
|
||||
finish({ ok: false, error: "request body timeout" });
|
||||
req.destroy();
|
||||
}, timeoutMs);
|
||||
|
||||
req.on("data", (chunk: Buffer) => {
|
||||
total += chunk.length;
|
||||
if (total > maxBytes) {
|
||||
resolve({ ok: false, error: "payload too large" });
|
||||
finish({ ok: false, error: "payload too large" });
|
||||
req.destroy();
|
||||
return;
|
||||
}
|
||||
@@ -525,27 +540,30 @@ async function readJsonBody(req: IncomingMessage, maxBytes: number) {
|
||||
try {
|
||||
const raw = Buffer.concat(chunks).toString("utf8");
|
||||
if (!raw.trim()) {
|
||||
resolve({ ok: false, error: "empty payload" });
|
||||
finish({ ok: false, error: "empty payload" });
|
||||
return;
|
||||
}
|
||||
try {
|
||||
resolve({ ok: true, value: JSON.parse(raw) as unknown });
|
||||
finish({ ok: true, value: JSON.parse(raw) as unknown });
|
||||
return;
|
||||
} catch {
|
||||
const params = new URLSearchParams(raw);
|
||||
const payload = params.get("payload") ?? params.get("data") ?? params.get("message");
|
||||
if (payload) {
|
||||
resolve({ ok: true, value: JSON.parse(payload) as unknown });
|
||||
finish({ ok: true, value: JSON.parse(payload) as unknown });
|
||||
return;
|
||||
}
|
||||
throw new Error("invalid json");
|
||||
}
|
||||
} catch (err) {
|
||||
resolve({ ok: false, error: err instanceof Error ? err.message : String(err) });
|
||||
finish({ ok: false, error: err instanceof Error ? err.message : String(err) });
|
||||
}
|
||||
});
|
||||
req.on("error", (err) => {
|
||||
resolve({ ok: false, error: err instanceof Error ? err.message : String(err) });
|
||||
finish({ ok: false, error: err instanceof Error ? err.message : String(err) });
|
||||
});
|
||||
req.on("close", () => {
|
||||
finish({ ok: false, error: "connection closed" });
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -16,7 +16,9 @@ export type BlueBubblesServerInfo = {
|
||||
computer_id?: string;
|
||||
};
|
||||
|
||||
/** Cache server info by account ID to avoid repeated API calls */
|
||||
/** Cache server info by account ID to avoid repeated API calls.
|
||||
* Size-capped to prevent unbounded growth (#4948). */
|
||||
const MAX_SERVER_INFO_CACHE_SIZE = 64;
|
||||
const serverInfoCache = new Map<string, { info: BlueBubblesServerInfo; expires: number }>();
|
||||
const CACHE_TTL_MS = 10 * 60 * 1000; // 10 minutes
|
||||
|
||||
@@ -56,6 +58,13 @@ export async function fetchBlueBubblesServerInfo(params: {
|
||||
const data = payload?.data as BlueBubblesServerInfo | undefined;
|
||||
if (data) {
|
||||
serverInfoCache.set(cacheKey, { info: data, expires: Date.now() + CACHE_TTL_MS });
|
||||
// Evict oldest entries if cache exceeds max size
|
||||
if (serverInfoCache.size > MAX_SERVER_INFO_CACHE_SIZE) {
|
||||
const oldest = serverInfoCache.keys().next().value;
|
||||
if (oldest !== undefined) {
|
||||
serverInfoCache.delete(oldest);
|
||||
}
|
||||
}
|
||||
}
|
||||
return data ?? null;
|
||||
} catch {
|
||||
|
||||
@@ -370,6 +370,16 @@ describe("send", () => {
|
||||
).rejects.toThrow("requires text");
|
||||
});
|
||||
|
||||
it("throws when text becomes empty after markdown stripping", async () => {
|
||||
// Edge case: input like "***" or "---" passes initial check but becomes empty after stripMarkdown
|
||||
await expect(
|
||||
sendMessageBlueBubbles("+15551234567", "***", {
|
||||
serverUrl: "http://localhost:1234",
|
||||
password: "test",
|
||||
}),
|
||||
).rejects.toThrow("empty after markdown removal");
|
||||
});
|
||||
|
||||
it("throws when serverUrl is missing", async () => {
|
||||
await expect(sendMessageBlueBubbles("+15551234567", "Hello", {})).rejects.toThrow(
|
||||
"serverUrl is required",
|
||||
@@ -438,6 +448,77 @@ describe("send", () => {
|
||||
expect(body.method).toBeUndefined();
|
||||
});
|
||||
|
||||
it("strips markdown formatting from outbound messages", async () => {
|
||||
mockFetch
|
||||
.mockResolvedValueOnce({
|
||||
ok: true,
|
||||
json: () =>
|
||||
Promise.resolve({
|
||||
data: [
|
||||
{
|
||||
guid: "iMessage;-;+15551234567",
|
||||
participants: [{ address: "+15551234567" }],
|
||||
},
|
||||
],
|
||||
}),
|
||||
})
|
||||
.mockResolvedValueOnce({
|
||||
ok: true,
|
||||
text: () =>
|
||||
Promise.resolve(
|
||||
JSON.stringify({
|
||||
data: { guid: "msg-uuid-stripped" },
|
||||
}),
|
||||
),
|
||||
});
|
||||
|
||||
const result = await sendMessageBlueBubbles(
|
||||
"+15551234567",
|
||||
"**Bold** and *italic* with `code`\n## Header",
|
||||
{
|
||||
serverUrl: "http://localhost:1234",
|
||||
password: "test",
|
||||
},
|
||||
);
|
||||
|
||||
expect(result.messageId).toBe("msg-uuid-stripped");
|
||||
|
||||
const sendCall = mockFetch.mock.calls[1];
|
||||
const body = JSON.parse(sendCall[1].body);
|
||||
// Markdown should be stripped: no asterisks, backticks, or hashes
|
||||
expect(body.message).toBe("Bold and italic with code\nHeader");
|
||||
});
|
||||
|
||||
it("strips markdown when creating a new chat", async () => {
|
||||
mockFetch
|
||||
.mockResolvedValueOnce({
|
||||
ok: true,
|
||||
json: () => Promise.resolve({ data: [] }),
|
||||
})
|
||||
.mockResolvedValueOnce({
|
||||
ok: true,
|
||||
text: () =>
|
||||
Promise.resolve(
|
||||
JSON.stringify({
|
||||
data: { guid: "new-msg-stripped" },
|
||||
}),
|
||||
),
|
||||
});
|
||||
|
||||
const result = await sendMessageBlueBubbles("+15550009999", "**Welcome** to the _chat_!", {
|
||||
serverUrl: "http://localhost:1234",
|
||||
password: "test",
|
||||
});
|
||||
|
||||
expect(result.messageId).toBe("new-msg-stripped");
|
||||
|
||||
const createCall = mockFetch.mock.calls[1];
|
||||
expect(createCall[0]).toContain("/api/v1/chat/new");
|
||||
const body = JSON.parse(createCall[1].body);
|
||||
// Markdown should be stripped
|
||||
expect(body.message).toBe("Welcome to the chat!");
|
||||
});
|
||||
|
||||
it("creates a new chat when handle target is missing", async () => {
|
||||
mockFetch
|
||||
.mockResolvedValueOnce({
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import type { OpenClawConfig } from "openclaw/plugin-sdk";
|
||||
import crypto from "node:crypto";
|
||||
import { stripMarkdown } from "openclaw/plugin-sdk";
|
||||
import { resolveBlueBubblesAccount } from "./accounts.js";
|
||||
import {
|
||||
extractHandleFromChatGuid,
|
||||
@@ -332,6 +333,7 @@ async function createNewChatWithMessage(params: {
|
||||
const payload = {
|
||||
addresses: [params.address],
|
||||
message: params.message,
|
||||
tempGuid: `temp-${crypto.randomUUID()}`,
|
||||
};
|
||||
const res = await blueBubblesFetchWithTimeout(
|
||||
url,
|
||||
@@ -377,6 +379,11 @@ export async function sendMessageBlueBubbles(
|
||||
if (!trimmedText.trim()) {
|
||||
throw new Error("BlueBubbles send requires text");
|
||||
}
|
||||
// Strip markdown early and validate - ensures messages like "***" or "---" don't become empty
|
||||
const strippedText = stripMarkdown(trimmedText);
|
||||
if (!strippedText.trim()) {
|
||||
throw new Error("BlueBubbles send requires text (message was empty after markdown removal)");
|
||||
}
|
||||
|
||||
const account = resolveBlueBubblesAccount({
|
||||
cfg: opts.cfg ?? {},
|
||||
@@ -406,7 +413,7 @@ export async function sendMessageBlueBubbles(
|
||||
baseUrl,
|
||||
password,
|
||||
address: target.address,
|
||||
message: trimmedText,
|
||||
message: strippedText,
|
||||
timeoutMs: opts.timeoutMs,
|
||||
});
|
||||
}
|
||||
@@ -419,7 +426,7 @@ export async function sendMessageBlueBubbles(
|
||||
const payload: Record<string, unknown> = {
|
||||
chatGuid,
|
||||
tempGuid: crypto.randomUUID(),
|
||||
message: trimmedText,
|
||||
message: strippedText,
|
||||
};
|
||||
if (needsPrivateApi) {
|
||||
payload.method = "private-api";
|
||||
|
||||
@@ -8,6 +8,8 @@ const ADDON_ISSUER_PATTERN = /^service-\d+@gcp-sa-gsuiteaddons\.iam\.gserviceacc
|
||||
const CHAT_CERTS_URL =
|
||||
"https://www.googleapis.com/service_accounts/v1/metadata/x509/chat@system.gserviceaccount.com";
|
||||
|
||||
// Size-capped to prevent unbounded growth in long-running deployments (#4948)
|
||||
const MAX_AUTH_CACHE_SIZE = 32;
|
||||
const authCache = new Map<string, { key: string; auth: GoogleAuth }>();
|
||||
const verifyClient = new OAuth2Client();
|
||||
|
||||
@@ -30,20 +32,32 @@ function getAuthInstance(account: ResolvedGoogleChatAccount): GoogleAuth {
|
||||
return cached.auth;
|
||||
}
|
||||
|
||||
const evictOldest = () => {
|
||||
if (authCache.size > MAX_AUTH_CACHE_SIZE) {
|
||||
const oldest = authCache.keys().next().value;
|
||||
if (oldest !== undefined) {
|
||||
authCache.delete(oldest);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if (account.credentialsFile) {
|
||||
const auth = new GoogleAuth({ keyFile: account.credentialsFile, scopes: [CHAT_SCOPE] });
|
||||
authCache.set(account.accountId, { key, auth });
|
||||
evictOldest();
|
||||
return auth;
|
||||
}
|
||||
|
||||
if (account.credentials) {
|
||||
const auth = new GoogleAuth({ credentials: account.credentials, scopes: [CHAT_SCOPE] });
|
||||
authCache.set(account.accountId, { key, auth });
|
||||
evictOldest();
|
||||
return auth;
|
||||
}
|
||||
|
||||
const auth = new GoogleAuth({ scopes: [CHAT_SCOPE] });
|
||||
authCache.set(account.accountId, { key, auth });
|
||||
evictOldest();
|
||||
return auth;
|
||||
}
|
||||
|
||||
|
||||
@@ -17,7 +17,18 @@ export function normalizeThreadId(raw?: string | number | null): string | null {
|
||||
return trimmed ? trimmed : null;
|
||||
}
|
||||
|
||||
// Size-capped to prevent unbounded growth (#4948)
|
||||
const MAX_DIRECT_ROOM_CACHE_SIZE = 1024;
|
||||
const directRoomCache = new Map<string, string>();
|
||||
function setDirectRoomCached(key: string, value: string): void {
|
||||
directRoomCache.set(key, value);
|
||||
if (directRoomCache.size > MAX_DIRECT_ROOM_CACHE_SIZE) {
|
||||
const oldest = directRoomCache.keys().next().value;
|
||||
if (oldest !== undefined) {
|
||||
directRoomCache.delete(oldest);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function persistDirectRoom(
|
||||
client: MatrixClient,
|
||||
@@ -62,7 +73,7 @@ async function resolveDirectRoomId(client: MatrixClient, userId: string): Promis
|
||||
const directContent = await client.getAccountData(EventType.Direct);
|
||||
const list = Array.isArray(directContent?.[trimmed]) ? directContent[trimmed] : [];
|
||||
if (list.length > 0) {
|
||||
directRoomCache.set(trimmed, list[0]);
|
||||
setDirectRoomCached(trimmed, list[0]);
|
||||
return list[0];
|
||||
}
|
||||
} catch {
|
||||
@@ -86,7 +97,7 @@ async function resolveDirectRoomId(client: MatrixClient, userId: string): Promis
|
||||
}
|
||||
// Prefer classic 1:1 rooms, but allow larger rooms if requested.
|
||||
if (members.length === 2) {
|
||||
directRoomCache.set(trimmed, roomId);
|
||||
setDirectRoomCached(trimmed, roomId);
|
||||
await persistDirectRoom(client, trimmed, roomId);
|
||||
return roomId;
|
||||
}
|
||||
@@ -99,7 +110,7 @@ async function resolveDirectRoomId(client: MatrixClient, userId: string): Promis
|
||||
}
|
||||
|
||||
if (fallbackRoom) {
|
||||
directRoomCache.set(trimmed, fallbackRoom);
|
||||
setDirectRoomCached(trimmed, fallbackRoom);
|
||||
await persistDirectRoom(client, trimmed, fallbackRoom);
|
||||
return fallbackRoom;
|
||||
}
|
||||
|
||||
@@ -229,31 +229,58 @@ function sendJson(res: ServerResponse, status: number, body: unknown): void {
|
||||
res.end(JSON.stringify(body));
|
||||
}
|
||||
|
||||
async function readJsonBody(req: IncomingMessage, maxBytes = 64 * 1024): Promise<unknown> {
|
||||
async function readJsonBody(
|
||||
req: IncomingMessage,
|
||||
maxBytes = 64 * 1024,
|
||||
timeoutMs = 30_000,
|
||||
): Promise<unknown> {
|
||||
return new Promise((resolve, reject) => {
|
||||
let done = false;
|
||||
const finish = (fn: () => void) => {
|
||||
if (done) {
|
||||
return;
|
||||
}
|
||||
done = true;
|
||||
clearTimeout(timer);
|
||||
fn();
|
||||
};
|
||||
|
||||
const timer = setTimeout(() => {
|
||||
finish(() => {
|
||||
const err = new Error("Request body timeout");
|
||||
req.destroy(err);
|
||||
reject(err);
|
||||
});
|
||||
}, timeoutMs);
|
||||
|
||||
const chunks: Buffer[] = [];
|
||||
let totalBytes = 0;
|
||||
|
||||
req.on("data", (chunk: Buffer) => {
|
||||
totalBytes += chunk.length;
|
||||
if (totalBytes > maxBytes) {
|
||||
reject(new Error("Request body too large"));
|
||||
req.destroy();
|
||||
finish(() => {
|
||||
reject(new Error("Request body too large"));
|
||||
req.destroy();
|
||||
});
|
||||
return;
|
||||
}
|
||||
chunks.push(chunk);
|
||||
});
|
||||
|
||||
req.on("end", () => {
|
||||
try {
|
||||
const body = Buffer.concat(chunks).toString("utf-8");
|
||||
resolve(body ? JSON.parse(body) : {});
|
||||
} catch {
|
||||
reject(new Error("Invalid JSON"));
|
||||
}
|
||||
finish(() => {
|
||||
try {
|
||||
const body = Buffer.concat(chunks).toString("utf-8");
|
||||
resolve(body ? JSON.parse(body) : {});
|
||||
} catch {
|
||||
reject(new Error("Invalid JSON"));
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
req.on("error", reject);
|
||||
req.on("error", (err) => finish(() => reject(err)));
|
||||
req.on("close", () => finish(() => reject(new Error("Connection closed"))));
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -296,23 +296,48 @@ export class VoiceCallWebhookServer {
|
||||
}
|
||||
|
||||
/**
|
||||
* Read request body as string.
|
||||
* Read request body as string with timeout protection.
|
||||
*/
|
||||
private readBody(req: http.IncomingMessage, maxBytes: number): Promise<string> {
|
||||
private readBody(
|
||||
req: http.IncomingMessage,
|
||||
maxBytes: number,
|
||||
timeoutMs = 30_000,
|
||||
): Promise<string> {
|
||||
return new Promise((resolve, reject) => {
|
||||
let done = false;
|
||||
const finish = (fn: () => void) => {
|
||||
if (done) {
|
||||
return;
|
||||
}
|
||||
done = true;
|
||||
clearTimeout(timer);
|
||||
fn();
|
||||
};
|
||||
|
||||
const timer = setTimeout(() => {
|
||||
finish(() => {
|
||||
const err = new Error("Request body timeout");
|
||||
req.destroy(err);
|
||||
reject(err);
|
||||
});
|
||||
}, timeoutMs);
|
||||
|
||||
const chunks: Buffer[] = [];
|
||||
let totalBytes = 0;
|
||||
req.on("data", (chunk: Buffer) => {
|
||||
totalBytes += chunk.length;
|
||||
if (totalBytes > maxBytes) {
|
||||
req.destroy();
|
||||
reject(new Error("PayloadTooLarge"));
|
||||
finish(() => {
|
||||
req.destroy();
|
||||
reject(new Error("PayloadTooLarge"));
|
||||
});
|
||||
return;
|
||||
}
|
||||
chunks.push(chunk);
|
||||
});
|
||||
req.on("end", () => resolve(Buffer.concat(chunks).toString("utf-8")));
|
||||
req.on("error", reject);
|
||||
req.on("end", () => finish(() => resolve(Buffer.concat(chunks).toString("utf-8"))));
|
||||
req.on("error", (err) => finish(() => reject(err)));
|
||||
req.on("close", () => finish(() => reject(new Error("Connection closed"))));
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -107,6 +107,7 @@ export async function getReplyFromConfig(
|
||||
typeof configuredTypingSeconds === "number" ? configuredTypingSeconds : 6;
|
||||
const typing = createTypingController({
|
||||
onReplyStart: opts?.onReplyStart,
|
||||
onCleanup: opts?.onTypingCleanup,
|
||||
typingIntervalSeconds,
|
||||
silentToken: SILENT_REPLY_TOKEN,
|
||||
log: defaultRuntime.log,
|
||||
|
||||
@@ -58,11 +58,13 @@ export type ReplyDispatcherOptions = {
|
||||
export type ReplyDispatcherWithTypingOptions = Omit<ReplyDispatcherOptions, "onIdle"> & {
|
||||
onReplyStart?: () => Promise<void> | void;
|
||||
onIdle?: () => void;
|
||||
/** Called when the typing controller is cleaned up (e.g., on NO_REPLY). */
|
||||
onCleanup?: () => void;
|
||||
};
|
||||
|
||||
type ReplyDispatcherWithTypingResult = {
|
||||
dispatcher: ReplyDispatcher;
|
||||
replyOptions: Pick<GetReplyOptions, "onReplyStart" | "onTypingController">;
|
||||
replyOptions: Pick<GetReplyOptions, "onReplyStart" | "onTypingController" | "onTypingCleanup">;
|
||||
markDispatchIdle: () => void;
|
||||
};
|
||||
|
||||
@@ -164,7 +166,7 @@ export function createReplyDispatcher(options: ReplyDispatcherOptions): ReplyDis
|
||||
export function createReplyDispatcherWithTyping(
|
||||
options: ReplyDispatcherWithTypingOptions,
|
||||
): ReplyDispatcherWithTypingResult {
|
||||
const { onReplyStart, onIdle, ...dispatcherOptions } = options;
|
||||
const { onReplyStart, onIdle, onCleanup, ...dispatcherOptions } = options;
|
||||
let typingController: TypingController | undefined;
|
||||
const dispatcher = createReplyDispatcher({
|
||||
...dispatcherOptions,
|
||||
@@ -178,6 +180,7 @@ export function createReplyDispatcherWithTyping(
|
||||
dispatcher,
|
||||
replyOptions: {
|
||||
onReplyStart,
|
||||
onTypingCleanup: onCleanup,
|
||||
onTypingController: (typing) => {
|
||||
typingController = typing;
|
||||
},
|
||||
|
||||
@@ -13,6 +13,7 @@ export type TypingController = {
|
||||
|
||||
export function createTypingController(params: {
|
||||
onReplyStart?: () => Promise<void> | void;
|
||||
onCleanup?: () => void;
|
||||
typingIntervalSeconds?: number;
|
||||
typingTtlMs?: number;
|
||||
silentToken?: string;
|
||||
@@ -20,6 +21,7 @@ export function createTypingController(params: {
|
||||
}): TypingController {
|
||||
const {
|
||||
onReplyStart,
|
||||
onCleanup,
|
||||
typingIntervalSeconds = 6,
|
||||
typingTtlMs = 2 * 60_000,
|
||||
silentToken = SILENT_REPLY_TOKEN,
|
||||
@@ -63,6 +65,11 @@ export function createTypingController(params: {
|
||||
clearInterval(typingTimer);
|
||||
typingTimer = undefined;
|
||||
}
|
||||
// Notify the channel to stop its typing indicator (e.g., on NO_REPLY).
|
||||
// This fires only once (sealed prevents re-entry).
|
||||
if (active) {
|
||||
onCleanup?.();
|
||||
}
|
||||
resetCycle();
|
||||
sealed = true;
|
||||
};
|
||||
|
||||
@@ -42,11 +42,20 @@ export function listSkillCommandsForAgents(params: {
|
||||
const used = resolveReservedCommandNames();
|
||||
const entries: SkillCommandSpec[] = [];
|
||||
const agentIds = params.agentIds ?? listAgentIds(params.cfg);
|
||||
// Track visited workspace dirs to avoid registering duplicate commands
|
||||
// when multiple agents share the same workspace directory (#5717).
|
||||
const visitedDirs = new Set<string>();
|
||||
for (const agentId of agentIds) {
|
||||
const workspaceDir = resolveAgentWorkspaceDir(params.cfg, agentId);
|
||||
if (!fs.existsSync(workspaceDir)) {
|
||||
continue;
|
||||
}
|
||||
// Resolve to canonical path to handle symlinks and relative paths
|
||||
const canonicalDir = fs.realpathSync(workspaceDir);
|
||||
if (visitedDirs.has(canonicalDir)) {
|
||||
continue;
|
||||
}
|
||||
visitedDirs.add(canonicalDir);
|
||||
const commands = buildWorkspaceSkillCommandSpecs(workspaceDir, {
|
||||
config: params.cfg,
|
||||
eligibility: { remote: getRemoteSkillEligibility() },
|
||||
|
||||
@@ -23,6 +23,8 @@ export type GetReplyOptions = {
|
||||
/** Notifies when an agent run actually starts (useful for webchat command handling). */
|
||||
onAgentRunStart?: (runId: string) => void;
|
||||
onReplyStart?: () => Promise<void> | void;
|
||||
/** Called when the typing controller cleans up (e.g., run ended with NO_REPLY). */
|
||||
onTypingCleanup?: () => void;
|
||||
onTypingController?: (typing: TypingController) => void;
|
||||
isHeartbeat?: boolean;
|
||||
onPartialReply?: (payload: ReplyPayload) => Promise<void> | void;
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
export type TypingCallbacks = {
|
||||
onReplyStart: () => Promise<void>;
|
||||
onIdle?: () => void;
|
||||
/** Called when the typing controller is cleaned up (e.g., on NO_REPLY). */
|
||||
onCleanup?: () => void;
|
||||
};
|
||||
|
||||
export function createTypingCallbacks(params: {
|
||||
@@ -18,11 +20,11 @@ export function createTypingCallbacks(params: {
|
||||
}
|
||||
};
|
||||
|
||||
const onIdle = stop
|
||||
const fireStop = stop
|
||||
? () => {
|
||||
void stop().catch((err) => (params.onStopError ?? params.onStartError)(err));
|
||||
}
|
||||
: undefined;
|
||||
|
||||
return { onReplyStart, onIdle };
|
||||
return { onReplyStart, onIdle: fireStop, onCleanup: fireStop };
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@ import { describe, expect, it } from "vitest";
|
||||
import { applyPluginAutoEnable } from "./plugin-auto-enable.js";
|
||||
|
||||
describe("applyPluginAutoEnable", () => {
|
||||
it("enables configured channel plugins and updates allowlist", () => {
|
||||
it("configures channel plugins with disabled state and updates allowlist", () => {
|
||||
const result = applyPluginAutoEnable({
|
||||
config: {
|
||||
channels: { slack: { botToken: "x" } },
|
||||
@@ -11,7 +11,7 @@ describe("applyPluginAutoEnable", () => {
|
||||
env: {},
|
||||
});
|
||||
|
||||
expect(result.config.plugins?.entries?.slack?.enabled).toBe(true);
|
||||
expect(result.config.plugins?.entries?.slack?.enabled).toBe(false);
|
||||
expect(result.config.plugins?.allow).toEqual(["telegram", "slack"]);
|
||||
expect(result.changes.join("\n")).toContain("Slack configured, not enabled yet.");
|
||||
});
|
||||
@@ -29,7 +29,7 @@ describe("applyPluginAutoEnable", () => {
|
||||
expect(result.changes).toEqual([]);
|
||||
});
|
||||
|
||||
it("enables provider auth plugins when profiles exist", () => {
|
||||
it("configures provider auth plugins as disabled when profiles exist", () => {
|
||||
const result = applyPluginAutoEnable({
|
||||
config: {
|
||||
auth: {
|
||||
@@ -44,7 +44,7 @@ describe("applyPluginAutoEnable", () => {
|
||||
env: {},
|
||||
});
|
||||
|
||||
expect(result.config.plugins?.entries?.["google-antigravity-auth"]?.enabled).toBe(true);
|
||||
expect(result.config.plugins?.entries?.["google-antigravity-auth"]?.enabled).toBe(false);
|
||||
});
|
||||
|
||||
it("skips when plugins are globally disabled", () => {
|
||||
@@ -61,7 +61,7 @@ describe("applyPluginAutoEnable", () => {
|
||||
});
|
||||
|
||||
describe("preferOver channel prioritization", () => {
|
||||
it("prefers bluebubbles: skips imessage auto-enable when both are configured", () => {
|
||||
it("prefers bluebubbles: skips imessage auto-configure when both are configured", () => {
|
||||
const result = applyPluginAutoEnable({
|
||||
config: {
|
||||
channels: {
|
||||
@@ -72,7 +72,7 @@ describe("applyPluginAutoEnable", () => {
|
||||
env: {},
|
||||
});
|
||||
|
||||
expect(result.config.plugins?.entries?.bluebubbles?.enabled).toBe(true);
|
||||
expect(result.config.plugins?.entries?.bluebubbles?.enabled).toBe(false);
|
||||
expect(result.config.plugins?.entries?.imessage?.enabled).toBeUndefined();
|
||||
expect(result.changes.join("\n")).toContain("bluebubbles configured, not enabled yet.");
|
||||
expect(result.changes.join("\n")).not.toContain("iMessage configured, not enabled yet.");
|
||||
@@ -90,11 +90,11 @@ describe("applyPluginAutoEnable", () => {
|
||||
env: {},
|
||||
});
|
||||
|
||||
expect(result.config.plugins?.entries?.bluebubbles?.enabled).toBe(true);
|
||||
expect(result.config.plugins?.entries?.bluebubbles?.enabled).toBe(false);
|
||||
expect(result.config.plugins?.entries?.imessage?.enabled).toBe(true);
|
||||
});
|
||||
|
||||
it("allows imessage auto-enable when bluebubbles is explicitly disabled", () => {
|
||||
it("allows imessage auto-configure when bluebubbles is explicitly disabled", () => {
|
||||
const result = applyPluginAutoEnable({
|
||||
config: {
|
||||
channels: {
|
||||
@@ -107,11 +107,11 @@ describe("applyPluginAutoEnable", () => {
|
||||
});
|
||||
|
||||
expect(result.config.plugins?.entries?.bluebubbles?.enabled).toBe(false);
|
||||
expect(result.config.plugins?.entries?.imessage?.enabled).toBe(true);
|
||||
expect(result.config.plugins?.entries?.imessage?.enabled).toBe(false);
|
||||
expect(result.changes.join("\n")).toContain("iMessage configured, not enabled yet.");
|
||||
});
|
||||
|
||||
it("allows imessage auto-enable when bluebubbles is in deny list", () => {
|
||||
it("allows imessage auto-configure when bluebubbles is in deny list", () => {
|
||||
const result = applyPluginAutoEnable({
|
||||
config: {
|
||||
channels: {
|
||||
@@ -124,10 +124,10 @@ describe("applyPluginAutoEnable", () => {
|
||||
});
|
||||
|
||||
expect(result.config.plugins?.entries?.bluebubbles?.enabled).toBeUndefined();
|
||||
expect(result.config.plugins?.entries?.imessage?.enabled).toBe(true);
|
||||
expect(result.config.plugins?.entries?.imessage?.enabled).toBe(false);
|
||||
});
|
||||
|
||||
it("enables imessage normally when only imessage is configured", () => {
|
||||
it("configures imessage as disabled when only imessage is configured", () => {
|
||||
const result = applyPluginAutoEnable({
|
||||
config: {
|
||||
channels: { imessage: { cliPath: "/usr/local/bin/imsg" } },
|
||||
@@ -135,7 +135,7 @@ describe("applyPluginAutoEnable", () => {
|
||||
env: {},
|
||||
});
|
||||
|
||||
expect(result.config.plugins?.entries?.imessage?.enabled).toBe(true);
|
||||
expect(result.config.plugins?.entries?.imessage?.enabled).toBe(false);
|
||||
expect(result.changes.join("\n")).toContain("iMessage configured, not enabled yet.");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -386,12 +386,12 @@ function ensureAllowlisted(cfg: OpenClawConfig, pluginId: string): OpenClawConfi
|
||||
};
|
||||
}
|
||||
|
||||
function enablePluginEntry(cfg: OpenClawConfig, pluginId: string): OpenClawConfig {
|
||||
function registerPluginEntry(cfg: OpenClawConfig, pluginId: string): OpenClawConfig {
|
||||
const entries = {
|
||||
...cfg.plugins?.entries,
|
||||
[pluginId]: {
|
||||
...(cfg.plugins?.entries?.[pluginId] as Record<string, unknown> | undefined),
|
||||
enabled: true,
|
||||
enabled: false,
|
||||
},
|
||||
};
|
||||
return {
|
||||
@@ -399,7 +399,6 @@ function enablePluginEntry(cfg: OpenClawConfig, pluginId: string): OpenClawConfi
|
||||
plugins: {
|
||||
...cfg.plugins,
|
||||
entries,
|
||||
...(cfg.plugins?.enabled === false ? { enabled: true } : {}),
|
||||
},
|
||||
};
|
||||
}
|
||||
@@ -447,7 +446,7 @@ export function applyPluginAutoEnable(params: {
|
||||
if (alreadyEnabled && !allowMissing) {
|
||||
continue;
|
||||
}
|
||||
next = enablePluginEntry(next, entry.pluginId);
|
||||
next = registerPluginEntry(next, entry.pluginId);
|
||||
next = ensureAllowlisted(next, entry.pluginId);
|
||||
changes.push(formatAutoEnableChange(entry));
|
||||
}
|
||||
|
||||
@@ -3,7 +3,9 @@ import type { GatewayPresenceUpdate } from "discord-api-types/v10";
|
||||
/**
|
||||
* In-memory cache of Discord user presence data.
|
||||
* Populated by PRESENCE_UPDATE gateway events when the GuildPresences intent is enabled.
|
||||
* Per-account maps are capped to prevent unbounded growth (#4948).
|
||||
*/
|
||||
const MAX_PRESENCE_PER_ACCOUNT = 5000;
|
||||
const presenceCache = new Map<string, Map<string, GatewayPresenceUpdate>>();
|
||||
|
||||
function resolveAccountKey(accountId?: string): string {
|
||||
@@ -23,6 +25,13 @@ export function setPresence(
|
||||
presenceCache.set(accountKey, accountCache);
|
||||
}
|
||||
accountCache.set(userId, data);
|
||||
// Evict oldest entries if cache exceeds limit
|
||||
if (accountCache.size > MAX_PRESENCE_PER_ACCOUNT) {
|
||||
const oldest = accountCache.keys().next().value;
|
||||
if (oldest !== undefined) {
|
||||
accountCache.delete(oldest);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Get cached presence for a user. Returns undefined if not cached. */
|
||||
|
||||
@@ -279,12 +279,8 @@ function resolveAttachmentMaxBytes(params: {
|
||||
channel: ChannelId;
|
||||
accountId?: string | null;
|
||||
}): number | undefined {
|
||||
const fallback = params.cfg.agents?.defaults?.mediaMaxMb;
|
||||
if (params.channel !== "bluebubbles") {
|
||||
return typeof fallback === "number" ? fallback * 1024 * 1024 : undefined;
|
||||
}
|
||||
const accountId = typeof params.accountId === "string" ? params.accountId.trim() : "";
|
||||
const channelCfg = params.cfg.channels?.bluebubbles;
|
||||
const channelCfg = params.cfg.channels?.[params.channel];
|
||||
const channelObj =
|
||||
channelCfg && typeof channelCfg === "object"
|
||||
? (channelCfg as Record<string, unknown>)
|
||||
@@ -300,6 +296,7 @@ function resolveAttachmentMaxBytes(params: {
|
||||
accountCfg && typeof accountCfg === "object"
|
||||
? (accountCfg as Record<string, unknown>).mediaMaxMb
|
||||
: undefined;
|
||||
// Priority: account-specific > channel-level > global default
|
||||
const limitMb =
|
||||
(typeof accountMediaMax === "number" ? accountMediaMax : undefined) ??
|
||||
channelMediaMax ??
|
||||
|
||||
@@ -367,13 +367,32 @@ export const registerTelegramNativeCommands = ({
|
||||
...customCommands,
|
||||
];
|
||||
|
||||
if (allCommands.length > 0) {
|
||||
// Clear stale commands before registering new ones to prevent
|
||||
// leftover commands from deleted skills persisting across restarts (#5717).
|
||||
// Chain delete → set so a late-resolving delete cannot wipe newly registered commands.
|
||||
const registerCommands = () => {
|
||||
if (allCommands.length > 0) {
|
||||
withTelegramApiErrorLogging({
|
||||
operation: "setMyCommands",
|
||||
runtime,
|
||||
fn: () => bot.api.setMyCommands(allCommands),
|
||||
}).catch(() => {});
|
||||
}
|
||||
};
|
||||
if (typeof bot.api.deleteMyCommands === "function") {
|
||||
withTelegramApiErrorLogging({
|
||||
operation: "setMyCommands",
|
||||
operation: "deleteMyCommands",
|
||||
runtime,
|
||||
fn: () => bot.api.setMyCommands(allCommands),
|
||||
}).catch(() => {});
|
||||
fn: () => bot.api.deleteMyCommands(),
|
||||
})
|
||||
.catch(() => {})
|
||||
.then(registerCommands)
|
||||
.catch(() => {});
|
||||
} else {
|
||||
registerCommands();
|
||||
}
|
||||
|
||||
if (allCommands.length > 0) {
|
||||
if (typeof (bot as unknown as { command?: unknown }).command !== "function") {
|
||||
logVerbose("telegram: bot.command unavailable; skipping native handlers");
|
||||
} else {
|
||||
|
||||
Reference in New Issue
Block a user