From 191da1feb52e2cb164c6d1212c0dd133ae3f9198 Mon Sep 17 00:00:00 2001 From: Tyler Yust <64381258+tyler6204@users.noreply.github.com> Date: Sat, 7 Feb 2026 20:02:32 -0800 Subject: [PATCH] fix: context overflow compaction and subagent announce improvements (#11664) (thanks @tyler6204) * initial commit * feat: implement deriveSessionTotalTokens function and update usage tests * Added deriveSessionTotalTokens function to calculate total tokens based on usage and context tokens. * Updated usage tests to include cases for derived session total tokens. * Refactored session usage calculations in multiple files to utilize the new function for improved accuracy. * fix: restore overflow truncation fallback + changelog/test hardening (#11551) (thanks @tyler6204) --- CHANGELOG.md | 1 + scripts/test-parallel.mjs | 4 +- ...ounces-agent-wait-lifecycle-events.test.ts | 10 + ...n-normalizes-allowlisted-agent-ids.test.ts | 55 ++-- ...resolves-main-announce-target-from.test.ts | 10 + .../run.overflow-compaction.test.ts | 110 +++++++ src/agents/pi-embedded-runner/run.ts | 295 ++++++++++++------ src/agents/pi-embedded-runner/run/attempt.ts | 4 + src/agents/pi-embedded-runner/run/types.ts | 3 + src/agents/pi-embedded-runner/types.ts | 1 + ...i-embedded-subscribe.handlers.lifecycle.ts | 1 + ...pi-embedded-subscribe.handlers.messages.ts | 1 + .../pi-embedded-subscribe.handlers.types.ts | 5 + src/agents/pi-embedded-subscribe.ts | 52 +++ src/agents/subagent-announce.format.test.ts | 96 +++++- src/agents/subagent-announce.ts | 78 ++++- .../subagent-registry.persistence.test.ts | 49 +++ src/agents/subagent-registry.ts | 17 +- src/agents/timeout.test.ts | 14 + src/agents/timeout.ts | 16 +- src/agents/usage.test.ts | 30 +- src/agents/usage.ts | 31 ++ src/auto-reply/reply/session-usage.ts | 17 +- src/commands/agent/session-store.ts | 13 +- src/cron/isolated-agent/run.ts | 8 +- src/gateway/call.test.ts | 23 ++ src/gateway/call.ts | 6 +- src/gateway/server-methods/agent-job.ts | 3 +- ui/src/styles/components.css | 44 ++- ui/src/ui/views/chat.test.ts | 62 ++++ ui/src/ui/views/chat.ts | 8 +- 31 files changed, 889 insertions(+), 178 deletions(-) create mode 100644 src/agents/timeout.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 430abee754..4e655f64bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ Docs: https://docs.openclaw.ai ### Fixes - Cron: route text-only isolated agent announces through the shared subagent announce flow; add exponential backoff for repeated errors; preserve future `nextRunAtMs` on restart; include current-boundary schedule matches; prevent stale threadId reuse across targets; and add per-job execution timeout. (#11641) Thanks @tyler6204. +- Subagents: stabilize announce timing, preserve compaction metrics across retries, clamp overflow-prone long timeouts, and cap impossible context usage token totals. (#11551) Thanks @tyler6204. - Agents: recover from context overflow caused by oversized tool results (pre-emptive capping + fallback truncation). (#11579) Thanks @tyler6204. - Gateway/CLI: when `gateway.bind=lan`, use a LAN IP for probe URLs and Control UI links. (#11448) Thanks @AnonO6. - Memory: set Voyage embeddings `input_type` for improved retrieval. (#10818) Thanks @mcinteerj. diff --git a/scripts/test-parallel.mjs b/scripts/test-parallel.mjs index a2063d430b..4a3554e0b0 100644 --- a/scripts/test-parallel.mjs +++ b/scripts/test-parallel.mjs @@ -30,7 +30,9 @@ const shardCount = isWindowsCi : 2 : 1; const windowsCiArgs = isWindowsCi ? ["--dangerouslyIgnoreUnhandledErrors"] : []; -const passthroughArgs = process.argv.slice(2); +const rawPassthroughArgs = process.argv.slice(2); +const passthroughArgs = + rawPassthroughArgs[0] === "--" ? rawPassthroughArgs.slice(1) : rawPassthroughArgs; const overrideWorkers = Number.parseInt(process.env.OPENCLAW_TEST_WORKERS ?? "", 10); const resolvedOverride = Number.isFinite(overrideWorkers) && overrideWorkers > 0 ? overrideWorkers : null; diff --git a/src/agents/openclaw-tools.subagents.sessions-spawn-announces-agent-wait-lifecycle-events.test.ts b/src/agents/openclaw-tools.subagents.sessions-spawn-announces-agent-wait-lifecycle-events.test.ts index c4ee75fe63..0634d488b5 100644 --- a/src/agents/openclaw-tools.subagents.sessions-spawn-announces-agent-wait-lifecycle-events.test.ts +++ b/src/agents/openclaw-tools.subagents.sessions-spawn-announces-agent-wait-lifecycle-events.test.ts @@ -82,6 +82,16 @@ describe("openclaw-tools: subagents", () => { endedAt: 4000, }; } + if (request.method === "chat.history") { + return { + messages: [ + { + role: "assistant", + content: [{ type: "text", text: "done" }], + }, + ], + }; + } if (request.method === "sessions.delete") { const params = request.params as { key?: string } | undefined; deletedKey = params?.key; diff --git a/src/agents/openclaw-tools.subagents.sessions-spawn-normalizes-allowlisted-agent-ids.test.ts b/src/agents/openclaw-tools.subagents.sessions-spawn-normalizes-allowlisted-agent-ids.test.ts index d2f7a05be8..411653e606 100644 --- a/src/agents/openclaw-tools.subagents.sessions-spawn-normalizes-allowlisted-agent-ids.test.ts +++ b/src/agents/openclaw-tools.subagents.sessions-spawn-normalizes-allowlisted-agent-ids.test.ts @@ -23,7 +23,6 @@ vi.mock("../config/config.js", async (importOriginal) => { import { emitAgentEvent } from "../infra/agent-events.js"; import "./test-helpers/fast-core-tools.js"; -import { sleep } from "../utils.js"; import { createOpenClawTools } from "./openclaw-tools.js"; import { resetSubagentRegistryForTests } from "./subagent-registry.js"; @@ -202,19 +201,22 @@ describe("openclaw-tools: subagents", () => { if (!childRunId) { throw new Error("missing child runId"); } - emitAgentEvent({ - runId: childRunId, - stream: "lifecycle", - data: { - phase: "end", - startedAt: 1234, - endedAt: 2345, - }, - }); + vi.useFakeTimers(); + try { + emitAgentEvent({ + runId: childRunId, + stream: "lifecycle", + data: { + phase: "end", + startedAt: 1234, + endedAt: 2345, + }, + }); - await sleep(0); - await sleep(0); - await sleep(0); + await vi.runAllTimersAsync(); + } finally { + vi.useRealTimers(); + } const childWait = waitCalls.find((call) => call.runId === childRunId); expect(childWait?.timeoutMs).toBe(1000); @@ -313,19 +315,22 @@ describe("openclaw-tools: subagents", () => { if (!childRunId) { throw new Error("missing child runId"); } - emitAgentEvent({ - runId: childRunId, - stream: "lifecycle", - data: { - phase: "end", - startedAt: 1000, - endedAt: 2000, - }, - }); + vi.useFakeTimers(); + try { + emitAgentEvent({ + runId: childRunId, + stream: "lifecycle", + data: { + phase: "end", + startedAt: 1000, + endedAt: 2000, + }, + }); - await sleep(0); - await sleep(0); - await sleep(0); + await vi.runAllTimersAsync(); + } finally { + vi.useRealTimers(); + } const agentCalls = calls.filter((call) => call.method === "agent"); expect(agentCalls).toHaveLength(2); diff --git a/src/agents/openclaw-tools.subagents.sessions-spawn-resolves-main-announce-target-from.test.ts b/src/agents/openclaw-tools.subagents.sessions-spawn-resolves-main-announce-target-from.test.ts index 9f77998482..0548d70357 100644 --- a/src/agents/openclaw-tools.subagents.sessions-spawn-resolves-main-announce-target-from.test.ts +++ b/src/agents/openclaw-tools.subagents.sessions-spawn-resolves-main-announce-target-from.test.ts @@ -95,6 +95,16 @@ describe("openclaw-tools: subagents", () => { patchParams = { key: params?.key, label: params?.label }; return { ok: true }; } + if (request.method === "chat.history") { + return { + messages: [ + { + role: "assistant", + content: [{ type: "text", text: "done" }], + }, + ], + }; + } if (request.method === "sessions.delete") { return { ok: true }; } diff --git a/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts b/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts index c913192a6a..df85d888cf 100644 --- a/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts +++ b/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts @@ -88,6 +88,7 @@ vi.mock("../failover-error.js", () => ({ vi.mock("../usage.js", () => ({ normalizeUsage: vi.fn(() => undefined), + hasNonzeroUsage: vi.fn(() => false), })); vi.mock("./lanes.js", () => ({ @@ -108,6 +109,15 @@ vi.mock("./run/payloads.js", () => ({ buildEmbeddedRunPayloads: vi.fn(() => []), })); +vi.mock("./tool-result-truncation.js", () => ({ + truncateOversizedToolResultsInSession: vi.fn(async () => ({ + truncated: false, + truncatedCount: 0, + reason: "no oversized tool results", + })), + sessionLikelyHasOversizedToolResults: vi.fn(() => false), +})); + vi.mock("./utils.js", () => ({ describeUnknownError: vi.fn((err: unknown) => { if (err instanceof Error) { @@ -140,6 +150,7 @@ vi.mock("../pi-embedded-helpers.js", async () => { isBillingAssistantError: vi.fn(() => false), classifyFailoverReason: vi.fn(() => null), formatAssistantErrorText: vi.fn(() => ""), + parseImageSizeError: vi.fn(() => null), pickFallbackThinkingLevel: vi.fn(() => null), isTimeoutErrorMessage: vi.fn(() => false), parseImageDimensionError: vi.fn(() => null), @@ -151,9 +162,17 @@ import { compactEmbeddedPiSessionDirect } from "./compact.js"; import { log } from "./logger.js"; import { runEmbeddedPiAgent } from "./run.js"; import { runEmbeddedAttempt } from "./run/attempt.js"; +import { + sessionLikelyHasOversizedToolResults, + truncateOversizedToolResultsInSession, +} from "./tool-result-truncation.js"; const mockedRunEmbeddedAttempt = vi.mocked(runEmbeddedAttempt); const mockedCompactDirect = vi.mocked(compactEmbeddedPiSessionDirect); +const mockedSessionLikelyHasOversizedToolResults = vi.mocked(sessionLikelyHasOversizedToolResults); +const mockedTruncateOversizedToolResultsInSession = vi.mocked( + truncateOversizedToolResultsInSession, +); function makeAttemptResult( overrides: Partial = {}, @@ -188,6 +207,12 @@ const baseParams = { describe("overflow compaction in run loop", () => { beforeEach(() => { vi.clearAllMocks(); + mockedSessionLikelyHasOversizedToolResults.mockReturnValue(false); + mockedTruncateOversizedToolResultsInSession.mockResolvedValue({ + truncated: false, + truncatedCount: 0, + reason: "no oversized tool results", + }); }); it("retries after successful compaction on context overflow promptError", async () => { @@ -244,6 +269,43 @@ describe("overflow compaction in run loop", () => { expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("auto-compaction failed")); }); + it("falls back to tool-result truncation and retries when oversized results are detected", async () => { + const overflowError = new Error("request_too_large: Request size exceeds model context window"); + + mockedRunEmbeddedAttempt + .mockResolvedValueOnce( + makeAttemptResult({ + promptError: overflowError, + messagesSnapshot: [{ role: "assistant", content: "big tool output" }], + }), + ) + .mockResolvedValueOnce(makeAttemptResult({ promptError: null })); + + mockedCompactDirect.mockResolvedValueOnce({ + ok: false, + compacted: false, + reason: "nothing to compact", + }); + mockedSessionLikelyHasOversizedToolResults.mockReturnValue(true); + mockedTruncateOversizedToolResultsInSession.mockResolvedValueOnce({ + truncated: true, + truncatedCount: 1, + }); + + const result = await runEmbeddedPiAgent(baseParams); + + expect(mockedCompactDirect).toHaveBeenCalledTimes(1); + expect(mockedSessionLikelyHasOversizedToolResults).toHaveBeenCalledWith( + expect.objectContaining({ contextWindowTokens: 200000 }), + ); + expect(mockedTruncateOversizedToolResultsInSession).toHaveBeenCalledWith( + expect.objectContaining({ sessionFile: "/tmp/session.json" }), + ); + expect(mockedRunEmbeddedAttempt).toHaveBeenCalledTimes(2); + expect(log.info).toHaveBeenCalledWith(expect.stringContaining("Truncated 1 tool result(s)")); + expect(result.meta.error).toBeUndefined(); + }); + it("retries compaction up to 3 times before giving up", async () => { const overflowError = new Error("request_too_large: Request size exceeds model context window"); @@ -323,4 +385,52 @@ describe("overflow compaction in run loop", () => { expect(mockedRunEmbeddedAttempt).toHaveBeenCalledTimes(1); expect(result.meta.error?.kind).toBe("compaction_failure"); }); + + it("retries after successful compaction on assistant context overflow errors", async () => { + mockedRunEmbeddedAttempt + .mockResolvedValueOnce( + makeAttemptResult({ + promptError: null, + lastAssistant: { + stopReason: "error", + errorMessage: "request_too_large: Request size exceeds model context window", + } as EmbeddedRunAttemptResult["lastAssistant"], + }), + ) + .mockResolvedValueOnce(makeAttemptResult({ promptError: null })); + + mockedCompactDirect.mockResolvedValueOnce({ + ok: true, + compacted: true, + result: { + summary: "Compacted session", + firstKeptEntryId: "entry-5", + tokensBefore: 150000, + }, + }); + + const result = await runEmbeddedPiAgent(baseParams); + + expect(mockedCompactDirect).toHaveBeenCalledTimes(1); + expect(mockedRunEmbeddedAttempt).toHaveBeenCalledTimes(2); + expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("source=assistantError")); + expect(result.meta.error).toBeUndefined(); + }); + + it("does not treat stale assistant overflow as current-attempt overflow when promptError is non-overflow", async () => { + mockedRunEmbeddedAttempt.mockResolvedValue( + makeAttemptResult({ + promptError: new Error("transport disconnected"), + lastAssistant: { + stopReason: "error", + errorMessage: "request_too_large: Request size exceeds model context window", + } as EmbeddedRunAttemptResult["lastAssistant"], + }), + ); + + await expect(runEmbeddedPiAgent(baseParams)).rejects.toThrow("transport disconnected"); + + expect(mockedCompactDirect).not.toHaveBeenCalled(); + expect(log.warn).not.toHaveBeenCalledWith(expect.stringContaining("source=assistantError")); + }); }); diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index 97ff88cb60..268c263038 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -74,6 +74,66 @@ function scrubAnthropicRefusalMagic(prompt: string): string { ); } +type UsageAccumulator = { + input: number; + output: number; + cacheRead: number; + cacheWrite: number; + total: number; +}; + +const createUsageAccumulator = (): UsageAccumulator => ({ + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + total: 0, +}); + +const hasUsageValues = ( + usage: ReturnType, +): usage is NonNullable> => + !!usage && + [usage.input, usage.output, usage.cacheRead, usage.cacheWrite, usage.total].some( + (value) => typeof value === "number" && Number.isFinite(value) && value > 0, + ); + +const mergeUsageIntoAccumulator = ( + target: UsageAccumulator, + usage: ReturnType, +) => { + if (!hasUsageValues(usage)) { + return; + } + target.input += usage.input ?? 0; + target.output += usage.output ?? 0; + target.cacheRead += usage.cacheRead ?? 0; + target.cacheWrite += usage.cacheWrite ?? 0; + target.total += + usage.total ?? + (usage.input ?? 0) + (usage.output ?? 0) + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0); +}; + +const toNormalizedUsage = (usage: UsageAccumulator) => { + const hasUsage = + usage.input > 0 || + usage.output > 0 || + usage.cacheRead > 0 || + usage.cacheWrite > 0 || + usage.total > 0; + if (!hasUsage) { + return undefined; + } + const derivedTotal = usage.input + usage.output + usage.cacheRead + usage.cacheWrite; + return { + input: usage.input || undefined, + output: usage.output || undefined, + cacheRead: usage.cacheRead || undefined, + cacheWrite: usage.cacheWrite || undefined, + total: usage.total || derivedTotal || undefined, + }; +}; + export async function runEmbeddedPiAgent( params: RunEmbeddedPiAgentParams, ): Promise { @@ -326,6 +386,8 @@ export async function runEmbeddedPiAgent( const MAX_OVERFLOW_COMPACTION_ATTEMPTS = 3; let overflowCompactionAttempts = 0; let toolResultTruncationAttempted = false; + const usageAccumulator = createUsageAccumulator(); + let autoCompactionCount = 0; try { while (true) { attemptedThinking.add(thinkLevel); @@ -392,119 +454,151 @@ export async function runEmbeddedPiAgent( }); const { aborted, promptError, timedOut, sessionIdUsed, lastAssistant } = attempt; + mergeUsageIntoAccumulator( + usageAccumulator, + attempt.attemptUsage ?? normalizeUsage(lastAssistant?.usage as UsageLike), + ); + autoCompactionCount += Math.max(0, attempt.compactionCount ?? 0); + const formattedAssistantErrorText = lastAssistant + ? formatAssistantErrorText(lastAssistant, { + cfg: params.config, + sessionKey: params.sessionKey ?? params.sessionId, + }) + : undefined; + const assistantErrorText = + lastAssistant?.stopReason === "error" + ? lastAssistant.errorMessage?.trim() || formattedAssistantErrorText + : undefined; - if (promptError && !aborted) { - const errorText = describeUnknownError(promptError); - if (isContextOverflowError(errorText)) { - const msgCount = attempt.messagesSnapshot?.length ?? 0; + const contextOverflowError = !aborted + ? (() => { + if (promptError) { + const errorText = describeUnknownError(promptError); + if (isContextOverflowError(errorText)) { + return { text: errorText, source: "promptError" as const }; + } + // Prompt submission failed with a non-overflow error. Do not + // inspect prior assistant errors from history for this attempt. + return null; + } + if (assistantErrorText && isContextOverflowError(assistantErrorText)) { + return { text: assistantErrorText, source: "assistantError" as const }; + } + return null; + })() + : null; + + if (contextOverflowError) { + const errorText = contextOverflowError.text; + const msgCount = attempt.messagesSnapshot?.length ?? 0; + log.warn( + `[context-overflow-diag] sessionKey=${params.sessionKey ?? params.sessionId} ` + + `provider=${provider}/${modelId} source=${contextOverflowError.source} ` + + `messages=${msgCount} sessionFile=${params.sessionFile} ` + + `compactionAttempts=${overflowCompactionAttempts} error=${errorText.slice(0, 200)}`, + ); + const isCompactionFailure = isCompactionFailureError(errorText); + // Attempt auto-compaction on context overflow (not compaction_failure) + if ( + !isCompactionFailure && + overflowCompactionAttempts < MAX_OVERFLOW_COMPACTION_ATTEMPTS + ) { + overflowCompactionAttempts++; log.warn( - `[context-overflow-diag] sessionKey=${params.sessionKey ?? params.sessionId} ` + - `provider=${provider}/${modelId} messages=${msgCount} ` + - `sessionFile=${params.sessionFile} compactionAttempts=${overflowCompactionAttempts} ` + - `error=${errorText.slice(0, 200)}`, + `context overflow detected (attempt ${overflowCompactionAttempts}/${MAX_OVERFLOW_COMPACTION_ATTEMPTS}); attempting auto-compaction for ${provider}/${modelId}`, ); - const isCompactionFailure = isCompactionFailureError(errorText); - // Attempt auto-compaction on context overflow (not compaction_failure) - if ( - !isCompactionFailure && - overflowCompactionAttempts < MAX_OVERFLOW_COMPACTION_ATTEMPTS - ) { - overflowCompactionAttempts++; + const compactResult = await compactEmbeddedPiSessionDirect({ + sessionId: params.sessionId, + sessionKey: params.sessionKey, + messageChannel: params.messageChannel, + messageProvider: params.messageProvider, + agentAccountId: params.agentAccountId, + authProfileId: lastProfileId, + sessionFile: params.sessionFile, + workspaceDir: resolvedWorkspace, + agentDir, + config: params.config, + skillsSnapshot: params.skillsSnapshot, + senderIsOwner: params.senderIsOwner, + provider, + model: modelId, + thinkLevel, + reasoningLevel: params.reasoningLevel, + bashElevated: params.bashElevated, + extraSystemPrompt: params.extraSystemPrompt, + ownerNumbers: params.ownerNumbers, + }); + if (compactResult.compacted) { + autoCompactionCount += 1; + log.info(`auto-compaction succeeded for ${provider}/${modelId}; retrying prompt`); + continue; + } + log.warn( + `auto-compaction failed for ${provider}/${modelId}: ${compactResult.reason ?? "nothing to compact"}`, + ); + } + // Fallback: try truncating oversized tool results in the session. + // This handles the case where a single tool result exceeds the + // context window and compaction cannot reduce it further. + if (!toolResultTruncationAttempted) { + const contextWindowTokens = ctxInfo.tokens; + const hasOversized = attempt.messagesSnapshot + ? sessionLikelyHasOversizedToolResults({ + messages: attempt.messagesSnapshot, + contextWindowTokens, + }) + : false; + + if (hasOversized) { + toolResultTruncationAttempted = true; log.warn( - `context overflow detected (attempt ${overflowCompactionAttempts}/${MAX_OVERFLOW_COMPACTION_ATTEMPTS}); attempting auto-compaction for ${provider}/${modelId}`, + `[context-overflow-recovery] Attempting tool result truncation for ${provider}/${modelId} ` + + `(contextWindow=${contextWindowTokens} tokens)`, ); - const compactResult = await compactEmbeddedPiSessionDirect({ + const truncResult = await truncateOversizedToolResultsInSession({ + sessionFile: params.sessionFile, + contextWindowTokens, sessionId: params.sessionId, sessionKey: params.sessionKey, - messageChannel: params.messageChannel, - messageProvider: params.messageProvider, - agentAccountId: params.agentAccountId, - authProfileId: lastProfileId, - sessionFile: params.sessionFile, - workspaceDir: resolvedWorkspace, - agentDir, - config: params.config, - skillsSnapshot: params.skillsSnapshot, - senderIsOwner: params.senderIsOwner, - provider, - model: modelId, - thinkLevel, - reasoningLevel: params.reasoningLevel, - bashElevated: params.bashElevated, - extraSystemPrompt: params.extraSystemPrompt, - ownerNumbers: params.ownerNumbers, }); - if (compactResult.compacted) { - log.info(`auto-compaction succeeded for ${provider}/${modelId}; retrying prompt`); + if (truncResult.truncated) { + log.info( + `[context-overflow-recovery] Truncated ${truncResult.truncatedCount} tool result(s); retrying prompt`, + ); + // Session is now smaller; allow compaction retries again. + overflowCompactionAttempts = 0; continue; } log.warn( - `auto-compaction failed for ${provider}/${modelId}: ${compactResult.reason ?? "nothing to compact"}`, + `[context-overflow-recovery] Tool result truncation did not help: ${truncResult.reason ?? "unknown"}`, ); } - - // Fallback: try truncating oversized tool results in the session. - // This handles the case where a single tool result (e.g., reading a - // huge file or getting a massive PR diff) exceeds the context window, - // and compaction can't help because there's no older history to compact. - if (!toolResultTruncationAttempted) { - const contextWindowTokens = ctxInfo.tokens; - const hasOversized = attempt.messagesSnapshot - ? sessionLikelyHasOversizedToolResults({ - messages: attempt.messagesSnapshot, - contextWindowTokens, - }) - : false; - - if (hasOversized) { - toolResultTruncationAttempted = true; - log.warn( - `[context-overflow-recovery] Attempting tool result truncation for ${provider}/${modelId} ` + - `(contextWindow=${contextWindowTokens} tokens)`, - ); - const truncResult = await truncateOversizedToolResultsInSession({ - sessionFile: params.sessionFile, - contextWindowTokens, - sessionId: params.sessionId, - sessionKey: params.sessionKey, - }); - if (truncResult.truncated) { - log.info( - `[context-overflow-recovery] Truncated ${truncResult.truncatedCount} tool result(s); retrying prompt`, - ); - // Reset compaction attempts so compaction can be tried again - // after truncation (the session is now smaller) - overflowCompactionAttempts = 0; - continue; - } - log.warn( - `[context-overflow-recovery] Tool result truncation did not help: ${truncResult.reason ?? "unknown"}`, - ); - } - } - - const kind = isCompactionFailure ? "compaction_failure" : "context_overflow"; - return { - payloads: [ - { - text: - "Context overflow: prompt too large for the model. " + - "Try again with less input or a larger-context model.", - isError: true, - }, - ], - meta: { - durationMs: Date.now() - started, - agentMeta: { - sessionId: sessionIdUsed, - provider, - model: model.id, - }, - systemPromptReport: attempt.systemPromptReport, - error: { kind, message: errorText }, - }, - }; } + const kind = isCompactionFailure ? "compaction_failure" : "context_overflow"; + return { + payloads: [ + { + text: + "Context overflow: prompt too large for the model. " + + "Try again with less input or a larger-context model.", + isError: true, + }, + ], + meta: { + durationMs: Date.now() - started, + agentMeta: { + sessionId: sessionIdUsed, + provider, + model: model.id, + }, + systemPromptReport: attempt.systemPromptReport, + error: { kind, message: errorText }, + }, + }; + } + + if (promptError && !aborted) { + const errorText = describeUnknownError(promptError); // Handle role ordering errors with a user-friendly message if (/incorrect role information|roles must alternate/i.test(errorText)) { return { @@ -702,12 +796,13 @@ export async function runEmbeddedPiAgent( } } - const usage = normalizeUsage(lastAssistant?.usage as UsageLike); + const usage = toNormalizedUsage(usageAccumulator); const agentMeta: EmbeddedPiAgentMeta = { sessionId: sessionIdUsed, provider: lastAssistant?.provider ?? provider, model: lastAssistant?.model ?? model.id, usage, + compactionCount: autoCompactionCount > 0 ? autoCompactionCount : undefined, }; const payloads = buildEmbeddedRunPayloads({ diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index cc95a77cf8..f195150a04 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -650,6 +650,8 @@ export async function runEmbeddedAttempt( getMessagingToolSentTargets, didSendViaMessagingTool, getLastToolError, + getUsageTotals, + getCompactionCount, } = subscription; const queueHandle: EmbeddedPiQueueHandle = { @@ -908,6 +910,8 @@ export async function runEmbeddedAttempt( cloudCodeAssistFormatError: Boolean( lastAssistant?.errorMessage && isCloudCodeAssistFormatError(lastAssistant.errorMessage), ), + attemptUsage: getUsageTotals(), + compactionCount: getCompactionCount(), // Client tool call detected (OpenResponses hosted tools) clientToolCall: clientToolCallDetected ?? undefined, }; diff --git a/src/agents/pi-embedded-runner/run/types.ts b/src/agents/pi-embedded-runner/run/types.ts index 181a42c9f9..5cfc8bbca1 100644 --- a/src/agents/pi-embedded-runner/run/types.ts +++ b/src/agents/pi-embedded-runner/run/types.ts @@ -9,6 +9,7 @@ import type { MessagingToolSend } from "../../pi-embedded-messaging.js"; import type { BlockReplyChunking, ToolResultFormat } from "../../pi-embedded-subscribe.js"; import type { AuthStorage, ModelRegistry } from "../../pi-model-discovery.js"; import type { SkillSnapshot } from "../../skills.js"; +import type { NormalizedUsage } from "../../usage.js"; import type { ClientToolDefinition } from "./params.js"; export type EmbeddedRunAttemptParams = { @@ -106,6 +107,8 @@ export type EmbeddedRunAttemptResult = { messagingToolSentTexts: string[]; messagingToolSentTargets: MessagingToolSend[]; cloudCodeAssistFormatError: boolean; + attemptUsage?: NormalizedUsage; + compactionCount?: number; /** Client tool call detected (OpenResponses hosted tools). */ clientToolCall?: { name: string; params: Record }; }; diff --git a/src/agents/pi-embedded-runner/types.ts b/src/agents/pi-embedded-runner/types.ts index 9b6c349162..9217b48319 100644 --- a/src/agents/pi-embedded-runner/types.ts +++ b/src/agents/pi-embedded-runner/types.ts @@ -5,6 +5,7 @@ export type EmbeddedPiAgentMeta = { sessionId: string; provider: string; model: string; + compactionCount?: number; usage?: { input?: number; output?: number; diff --git a/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts b/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts index de8c8bd6ae..943f2dec7c 100644 --- a/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts +++ b/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts @@ -21,6 +21,7 @@ export function handleAgentStart(ctx: EmbeddedPiSubscribeContext) { export function handleAutoCompactionStart(ctx: EmbeddedPiSubscribeContext) { ctx.state.compactionInFlight = true; + ctx.incrementCompactionCount(); ctx.ensureCompactionPromise(); ctx.log.debug(`embedded run compaction start: runId=${ctx.params.runId}`); emitAgentEvent({ diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.ts b/src/agents/pi-embedded-subscribe.handlers.messages.ts index a5bd6bd536..3f1b0e70e4 100644 --- a/src/agents/pi-embedded-subscribe.handlers.messages.ts +++ b/src/agents/pi-embedded-subscribe.handlers.messages.ts @@ -198,6 +198,7 @@ export function handleMessageEnd( } const assistantMessage = msg; + ctx.recordAssistantUsage((assistantMessage as { usage?: unknown }).usage); promoteThinkingTagsToBlocks(assistantMessage); const rawText = extractAssistantText(assistantMessage); diff --git a/src/agents/pi-embedded-subscribe.handlers.types.ts b/src/agents/pi-embedded-subscribe.handlers.types.ts index db2f07b768..89a661e742 100644 --- a/src/agents/pi-embedded-subscribe.handlers.types.ts +++ b/src/agents/pi-embedded-subscribe.handlers.types.ts @@ -8,6 +8,7 @@ import type { BlockReplyChunking, SubscribeEmbeddedPiSessionParams, } from "./pi-embedded-subscribe.types.js"; +import type { NormalizedUsage } from "./usage.js"; export type EmbeddedSubscribeLogger = { debug: (message: string) => void; @@ -100,6 +101,10 @@ export type EmbeddedPiSubscribeContext = { noteCompactionRetry: () => void; resolveCompactionRetry: () => void; maybeResolveCompactionWait: () => void; + recordAssistantUsage: (usage: unknown) => void; + incrementCompactionCount: () => void; + getUsageTotals: () => NormalizedUsage | undefined; + getCompactionCount: () => number; }; export type EmbeddedPiSubscribeEvent = diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 0a4b9c0fa5..3c269a3783 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -16,6 +16,7 @@ import { } from "./pi-embedded-helpers.js"; import { createEmbeddedPiSessionEventHandler } from "./pi-embedded-subscribe.handlers.js"; import { formatReasoningMessage } from "./pi-embedded-utils.js"; +import { hasNonzeroUsage, normalizeUsage, type UsageLike } from "./usage.js"; const THINKING_TAG_SCAN_RE = /<\s*(\/?)\s*(?:think(?:ing)?|thought|antthinking)\s*>/gi; const FINAL_TAG_SCAN_RE = /<\s*(\/?)\s*final\s*>/gi; @@ -69,6 +70,14 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar pendingMessagingTexts: new Map(), pendingMessagingTargets: new Map(), }; + const usageTotals = { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + total: 0, + }; + let compactionCount = 0; const assistantTexts = state.assistantTexts; const toolMetas = state.toolMetas; @@ -222,6 +231,43 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar state.compactionRetryPromise = null; } }; + const recordAssistantUsage = (usageLike: unknown) => { + const usage = normalizeUsage((usageLike ?? undefined) as UsageLike | undefined); + if (!hasNonzeroUsage(usage)) { + return; + } + usageTotals.input += usage.input ?? 0; + usageTotals.output += usage.output ?? 0; + usageTotals.cacheRead += usage.cacheRead ?? 0; + usageTotals.cacheWrite += usage.cacheWrite ?? 0; + const usageTotal = + usage.total ?? + (usage.input ?? 0) + (usage.output ?? 0) + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0); + usageTotals.total += usageTotal; + }; + const getUsageTotals = () => { + const hasUsage = + usageTotals.input > 0 || + usageTotals.output > 0 || + usageTotals.cacheRead > 0 || + usageTotals.cacheWrite > 0 || + usageTotals.total > 0; + if (!hasUsage) { + return undefined; + } + const derivedTotal = + usageTotals.input + usageTotals.output + usageTotals.cacheRead + usageTotals.cacheWrite; + return { + input: usageTotals.input || undefined, + output: usageTotals.output || undefined, + cacheRead: usageTotals.cacheRead || undefined, + cacheWrite: usageTotals.cacheWrite || undefined, + total: usageTotals.total || derivedTotal || undefined, + }; + }; + const incrementCompactionCount = () => { + compactionCount += 1; + }; const blockChunking = params.blockReplyChunking; const blockChunker = blockChunking ? new EmbeddedBlockChunker(blockChunking) : null; @@ -530,6 +576,10 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar noteCompactionRetry, resolveCompactionRetry, maybeResolveCompactionWait, + recordAssistantUsage, + incrementCompactionCount, + getUsageTotals, + getCompactionCount: () => compactionCount, }; const unsubscribe = params.session.subscribe(createEmbeddedPiSessionEventHandler(ctx)); @@ -546,6 +596,8 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar // which is generated AFTER the tool sends the actual answer. didSendViaMessagingTool: () => messagingToolSentTexts.length > 0, getLastToolError: () => (state.lastToolError ? { ...state.lastToolError } : undefined), + getUsageTotals, + getCompactionCount: () => compactionCount, waitForCompactionRetry: () => { if (state.compactionInFlight || state.pendingCompactionRetry > 0) { ensureCompactionPromise(); diff --git a/src/agents/subagent-announce.format.test.ts b/src/agents/subagent-announce.format.test.ts index 2f137dc4d5..b1a0f6dd14 100644 --- a/src/agents/subagent-announce.format.test.ts +++ b/src/agents/subagent-announce.format.test.ts @@ -1,6 +1,8 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; const agentSpy = vi.fn(async () => ({ runId: "run-main", status: "ok" })); +const sessionsDeleteSpy = vi.fn(); +const readLatestAssistantReplyMock = vi.fn(async () => "raw subagent reply"); const embeddedRunMock = { isEmbeddedPiRunActive: vi.fn(() => false), isEmbeddedPiRunStreaming: vi.fn(() => false), @@ -28,6 +30,7 @@ vi.mock("../gateway/call.js", () => ({ return {}; } if (typed.method === "sessions.delete") { + sessionsDeleteSpy(typed); return {}; } return {}; @@ -35,7 +38,7 @@ vi.mock("../gateway/call.js", () => ({ })); vi.mock("./tools/agent-step.js", () => ({ - readLatestAssistantReply: vi.fn(async () => "raw subagent reply"), + readLatestAssistantReply: readLatestAssistantReplyMock, })); vi.mock("../config/sessions.js", () => ({ @@ -60,10 +63,12 @@ vi.mock("../config/config.js", async (importOriginal) => { describe("subagent announce formatting", () => { beforeEach(() => { agentSpy.mockClear(); + sessionsDeleteSpy.mockClear(); embeddedRunMock.isEmbeddedPiRunActive.mockReset().mockReturnValue(false); embeddedRunMock.isEmbeddedPiRunStreaming.mockReset().mockReturnValue(false); embeddedRunMock.queueEmbeddedPiMessage.mockReset().mockReturnValue(false); embeddedRunMock.waitForEmbeddedPiRunEnd.mockReset().mockResolvedValue(true); + readLatestAssistantReplyMock.mockReset().mockResolvedValue("raw subagent reply"); sessionStore = {}; configOverride = { session: { @@ -356,6 +361,95 @@ describe("subagent announce formatting", () => { expect(call?.params?.accountId).toBe("acct-123"); }); + it("retries reading subagent output when early lifecycle completion had no text", async () => { + const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); + embeddedRunMock.isEmbeddedPiRunActive.mockReturnValueOnce(true).mockReturnValue(false); + embeddedRunMock.waitForEmbeddedPiRunEnd.mockResolvedValue(true); + readLatestAssistantReplyMock + .mockResolvedValueOnce(undefined) + .mockResolvedValueOnce("Read #12 complete."); + sessionStore = { + "agent:main:subagent:test": { + sessionId: "child-session-1", + }, + }; + + await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:test", + childRunId: "run-child", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "context-stress-test", + timeoutMs: 1000, + cleanup: "keep", + waitForCompletion: false, + startedAt: 10, + endedAt: 20, + outcome: { status: "ok" }, + }); + + expect(embeddedRunMock.waitForEmbeddedPiRunEnd).toHaveBeenCalledWith("child-session-1", 1000); + const call = agentSpy.mock.calls[0]?.[0] as { params?: { message?: string } }; + expect(call?.params?.message).toContain("Read #12 complete."); + expect(call?.params?.message).not.toContain("(no output)"); + }); + + it("defers announce when child run is still active after wait timeout", async () => { + const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); + embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); + embeddedRunMock.waitForEmbeddedPiRunEnd.mockResolvedValue(false); + sessionStore = { + "agent:main:subagent:test": { + sessionId: "child-session-active", + }, + }; + + const didAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:test", + childRunId: "run-child-active", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "context-stress-test", + timeoutMs: 1000, + cleanup: "keep", + waitForCompletion: false, + startedAt: 10, + endedAt: 20, + outcome: { status: "ok" }, + }); + + expect(didAnnounce).toBe(false); + expect(agentSpy).not.toHaveBeenCalled(); + }); + + it("does not delete child session when announce is deferred for an active run", async () => { + const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); + embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); + embeddedRunMock.waitForEmbeddedPiRunEnd.mockResolvedValue(false); + sessionStore = { + "agent:main:subagent:test": { + sessionId: "child-session-active", + }, + }; + + const didAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:test", + childRunId: "run-child-active-delete", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "context-stress-test", + timeoutMs: 1000, + cleanup: "delete", + waitForCompletion: false, + startedAt: 10, + endedAt: 20, + outcome: { status: "ok" }, + }); + + expect(didAnnounce).toBe(false); + expect(sessionsDeleteSpy).not.toHaveBeenCalled(); + }); + it("normalizes requesterOrigin for direct announce delivery", async () => { const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(false); diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index 24de2c2bc5..ae771dade0 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -17,7 +17,11 @@ import { mergeDeliveryContext, normalizeDeliveryContext, } from "../utils/delivery-context.js"; -import { isEmbeddedPiRunActive, queueEmbeddedPiMessage } from "./pi-embedded.js"; +import { + isEmbeddedPiRunActive, + queueEmbeddedPiMessage, + waitForEmbeddedPiRunEnd, +} from "./pi-embedded.js"; import { type AnnounceQueueItem, enqueueAnnounce } from "./subagent-announce-queue.js"; import { readLatestAssistantReply } from "./tools/agent-step.js"; @@ -288,6 +292,35 @@ async function buildSubagentStatsLine(params: { return `Stats: ${parts.join(" \u2022 ")}`; } +function loadSessionEntryByKey(sessionKey: string) { + const cfg = loadConfig(); + const agentId = resolveAgentIdFromSessionKey(sessionKey); + const storePath = resolveStorePath(cfg.session?.store, { agentId }); + const store = loadSessionStore(storePath); + return store[sessionKey]; +} + +async function readLatestAssistantReplyWithRetry(params: { + sessionKey: string; + initialReply?: string; + maxWaitMs: number; +}): Promise { + let reply = params.initialReply?.trim() ? params.initialReply : undefined; + if (reply) { + return reply; + } + + const deadline = Date.now() + Math.max(0, Math.min(params.maxWaitMs, 15_000)); + while (Date.now() < deadline) { + await new Promise((resolve) => setTimeout(resolve, 300)); + const latest = await readLatestAssistantReply({ sessionKey: params.sessionKey }); + if (latest?.trim()) { + return latest; + } + } + return reply; +} + export function buildSubagentSystemPrompt(params: { requesterSessionKey?: string; requesterOrigin?: DeliveryContext; @@ -365,12 +398,33 @@ export async function runSubagentAnnounceFlow(params: { announceType?: SubagentAnnounceType; }): Promise { let didAnnounce = false; + let shouldDeleteChildSession = params.cleanup === "delete"; try { const requesterOrigin = normalizeDeliveryContext(params.requesterOrigin); + const childSessionId = (() => { + const entry = loadSessionEntryByKey(params.childSessionKey); + return typeof entry?.sessionId === "string" && entry.sessionId.trim() + ? entry.sessionId.trim() + : undefined; + })(); + const settleTimeoutMs = Math.min(Math.max(params.timeoutMs, 1), 120_000); let reply = params.roundOneReply; let outcome: SubagentRunOutcome | undefined = params.outcome; + // Lifecycle "end" can arrive before auto-compaction retries finish. If the + // subagent is still active, wait for the embedded run to fully settle. + if (childSessionId && isEmbeddedPiRunActive(childSessionId)) { + const settled = await waitForEmbeddedPiRunEnd(childSessionId, settleTimeoutMs); + if (!settled && isEmbeddedPiRunActive(childSessionId)) { + // The child run is still active (e.g., compaction retry still in progress). + // Defer announcement so we don't report stale/partial output. + // Keep the child session so output is not lost while the run is still active. + shouldDeleteChildSession = false; + return false; + } + } + if (!reply && params.waitForCompletion !== false) { - const waitMs = Math.min(params.timeoutMs, 60_000); + const waitMs = settleTimeoutMs; const wait = await callGateway<{ status?: string; startedAt?: number; @@ -403,17 +457,27 @@ export async function runSubagentAnnounceFlow(params: { outcome = { status: "timeout" }; } } - reply = await readLatestAssistantReply({ - sessionKey: params.childSessionKey, - }); + reply = await readLatestAssistantReply({ sessionKey: params.childSessionKey }); } if (!reply) { - reply = await readLatestAssistantReply({ + reply = await readLatestAssistantReply({ sessionKey: params.childSessionKey }); + } + + if (!reply?.trim()) { + reply = await readLatestAssistantReplyWithRetry({ sessionKey: params.childSessionKey, + initialReply: reply, + maxWaitMs: params.timeoutMs, }); } + if (!reply?.trim() && childSessionId && isEmbeddedPiRunActive(childSessionId)) { + // Avoid announcing "(no output)" while the child run is still producing output. + shouldDeleteChildSession = false; + return false; + } + if (!outcome) { outcome = { status: "unknown" }; } @@ -508,7 +572,7 @@ export async function runSubagentAnnounceFlow(params: { // Best-effort } } - if (params.cleanup === "delete") { + if (shouldDeleteChildSession) { try { await callGateway({ method: "sessions.delete", diff --git a/src/agents/subagent-registry.persistence.test.ts b/src/agents/subagent-registry.persistence.test.ts index f97312a885..6b7aa4f473 100644 --- a/src/agents/subagent-registry.persistence.test.ts +++ b/src/agents/subagent-registry.persistence.test.ts @@ -230,4 +230,53 @@ describe("subagent registry persistence", () => { }; expect(afterSecond.runs["run-3"].cleanupCompletedAt).toBeDefined(); }); + + it("keeps delete-mode runs retryable when announce is deferred", async () => { + tempStateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-subagent-")); + process.env.OPENCLAW_STATE_DIR = tempStateDir; + + const registryPath = path.join(tempStateDir, "subagents", "runs.json"); + const persisted = { + version: 2, + runs: { + "run-4": { + runId: "run-4", + childSessionKey: "agent:main:subagent:four", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "deferred announce", + cleanup: "delete", + createdAt: 1, + startedAt: 1, + endedAt: 2, + }, + }, + }; + await fs.mkdir(path.dirname(registryPath), { recursive: true }); + await fs.writeFile(registryPath, `${JSON.stringify(persisted)}\n`, "utf8"); + + announceSpy.mockResolvedValueOnce(false); + vi.resetModules(); + const mod1 = await import("./subagent-registry.js"); + mod1.initSubagentRegistry(); + await new Promise((r) => setTimeout(r, 0)); + + expect(announceSpy).toHaveBeenCalledTimes(1); + const afterFirst = JSON.parse(await fs.readFile(registryPath, "utf8")) as { + runs: Record; + }; + expect(afterFirst.runs["run-4"]?.cleanupHandled).toBe(false); + + announceSpy.mockResolvedValueOnce(true); + vi.resetModules(); + const mod2 = await import("./subagent-registry.js"); + mod2.initSubagentRegistry(); + await new Promise((r) => setTimeout(r, 0)); + + expect(announceSpy).toHaveBeenCalledTimes(2); + const afterSecond = JSON.parse(await fs.readFile(registryPath, "utf8")) as { + runs?: Record; + }; + expect(afterSecond.runs?.["run-4"]).toBeUndefined(); + }); }); diff --git a/src/agents/subagent-registry.ts b/src/agents/subagent-registry.ts index d2588982a0..c790490bc8 100644 --- a/src/agents/subagent-registry.ts +++ b/src/agents/subagent-registry.ts @@ -33,6 +33,7 @@ let listenerStarted = false; let listenerStop: (() => void) | null = null; // Use var to avoid TDZ when init runs across circular imports during bootstrap. var restoreAttempted = false; +const SUBAGENT_ANNOUNCE_TIMEOUT_MS = 120_000; function persistSubagentRuns() { try { @@ -68,7 +69,7 @@ function resumeSubagentRun(runId: string) { requesterOrigin, requesterDisplayKey: entry.requesterDisplayKey, task: entry.task, - timeoutMs: 30_000, + timeoutMs: SUBAGENT_ANNOUNCE_TIMEOUT_MS, cleanup: entry.cleanup, waitForCompletion: false, startedAt: entry.startedAt, @@ -229,7 +230,7 @@ function ensureListener() { requesterOrigin, requesterDisplayKey: entry.requesterDisplayKey, task: entry.task, - timeoutMs: 30_000, + timeoutMs: SUBAGENT_ANNOUNCE_TIMEOUT_MS, cleanup: entry.cleanup, waitForCompletion: false, startedAt: entry.startedAt, @@ -247,14 +248,14 @@ function finalizeSubagentCleanup(runId: string, cleanup: "delete" | "keep", didA if (!entry) { return; } - if (cleanup === "delete") { - subagentRuns.delete(runId); + if (!didAnnounce) { + // Allow retry on the next wake if announce was deferred or failed. + entry.cleanupHandled = false; persistSubagentRuns(); return; } - if (!didAnnounce) { - // Allow retry on the next wake if the announce failed. - entry.cleanupHandled = false; + if (cleanup === "delete") { + subagentRuns.delete(runId); persistSubagentRuns(); return; } @@ -373,7 +374,7 @@ async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) { requesterOrigin, requesterDisplayKey: entry.requesterDisplayKey, task: entry.task, - timeoutMs: 30_000, + timeoutMs: SUBAGENT_ANNOUNCE_TIMEOUT_MS, cleanup: entry.cleanup, waitForCompletion: false, startedAt: entry.startedAt, diff --git a/src/agents/timeout.test.ts b/src/agents/timeout.test.ts new file mode 100644 index 0000000000..37a96a9ff0 --- /dev/null +++ b/src/agents/timeout.test.ts @@ -0,0 +1,14 @@ +import { describe, expect, it } from "vitest"; +import { resolveAgentTimeoutMs } from "./timeout.js"; + +describe("resolveAgentTimeoutMs", () => { + it("uses a timer-safe sentinel for no-timeout overrides", () => { + expect(resolveAgentTimeoutMs({ overrideSeconds: 0 })).toBe(2_147_000_000); + expect(resolveAgentTimeoutMs({ overrideMs: 0 })).toBe(2_147_000_000); + }); + + it("clamps very large timeout overrides to timer-safe values", () => { + expect(resolveAgentTimeoutMs({ overrideSeconds: 9_999_999 })).toBe(2_147_000_000); + expect(resolveAgentTimeoutMs({ overrideMs: 9_999_999_999 })).toBe(2_147_000_000); + }); +}); diff --git a/src/agents/timeout.ts b/src/agents/timeout.ts index 6b38b4b04c..56970a1185 100644 --- a/src/agents/timeout.ts +++ b/src/agents/timeout.ts @@ -1,6 +1,7 @@ import type { OpenClawConfig } from "../config/config.js"; const DEFAULT_AGENT_TIMEOUT_SECONDS = 600; +const MAX_SAFE_TIMEOUT_MS = 2_147_000_000; const normalizeNumber = (value: unknown): number | undefined => typeof value === "number" && Number.isFinite(value) ? Math.floor(value) : undefined; @@ -18,10 +19,11 @@ export function resolveAgentTimeoutMs(opts: { minMs?: number; }): number { const minMs = Math.max(normalizeNumber(opts.minMs) ?? 1, 1); - const defaultMs = resolveAgentTimeoutSeconds(opts.cfg) * 1000; - // Use a very large timeout value (30 days) to represent "no timeout" - // when explicitly set to 0. This avoids setTimeout issues with Infinity. - const NO_TIMEOUT_MS = 30 * 24 * 60 * 60 * 1000; + const clampTimeoutMs = (valueMs: number) => + Math.min(Math.max(valueMs, minMs), MAX_SAFE_TIMEOUT_MS); + const defaultMs = clampTimeoutMs(resolveAgentTimeoutSeconds(opts.cfg) * 1000); + // Use the maximum timer-safe timeout to represent "no timeout" when explicitly set to 0. + const NO_TIMEOUT_MS = MAX_SAFE_TIMEOUT_MS; const overrideMs = normalizeNumber(opts.overrideMs); if (overrideMs !== undefined) { if (overrideMs === 0) { @@ -30,7 +32,7 @@ export function resolveAgentTimeoutMs(opts: { if (overrideMs < 0) { return defaultMs; } - return Math.max(overrideMs, minMs); + return clampTimeoutMs(overrideMs); } const overrideSeconds = normalizeNumber(opts.overrideSeconds); if (overrideSeconds !== undefined) { @@ -40,7 +42,7 @@ export function resolveAgentTimeoutMs(opts: { if (overrideSeconds < 0) { return defaultMs; } - return Math.max(overrideSeconds * 1000, minMs); + return clampTimeoutMs(overrideSeconds * 1000); } - return Math.max(defaultMs, minMs); + return defaultMs; } diff --git a/src/agents/usage.test.ts b/src/agents/usage.test.ts index 8250f2488e..8743de718d 100644 --- a/src/agents/usage.test.ts +++ b/src/agents/usage.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it } from "vitest"; -import { hasNonzeroUsage, normalizeUsage } from "./usage.js"; +import { deriveSessionTotalTokens, hasNonzeroUsage, normalizeUsage } from "./usage.js"; describe("normalizeUsage", () => { it("normalizes Anthropic-style snake_case usage", () => { @@ -46,4 +46,32 @@ describe("normalizeUsage", () => { expect(hasNonzeroUsage({ input: 1 })).toBe(true); expect(hasNonzeroUsage({ total: 1 })).toBe(true); }); + + it("caps derived session total tokens to the context window", () => { + expect( + deriveSessionTotalTokens({ + usage: { + input: 27, + cacheRead: 2_400_000, + cacheWrite: 0, + total: 2_402_300, + }, + contextTokens: 200_000, + }), + ).toBe(200_000); + }); + + it("uses prompt tokens when within context window", () => { + expect( + deriveSessionTotalTokens({ + usage: { + input: 1_200, + cacheRead: 300, + cacheWrite: 50, + total: 2_000, + }, + contextTokens: 200_000, + }), + ).toBe(1_550); + }); }); diff --git a/src/agents/usage.ts b/src/agents/usage.ts index ec0610b8d3..7367b99ff3 100644 --- a/src/agents/usage.ts +++ b/src/agents/usage.ts @@ -103,3 +103,34 @@ export function derivePromptTokens(usage?: { const sum = input + cacheRead + cacheWrite; return sum > 0 ? sum : undefined; } + +export function deriveSessionTotalTokens(params: { + usage?: { + input?: number; + total?: number; + cacheRead?: number; + cacheWrite?: number; + }; + contextTokens?: number; +}): number | undefined { + const usage = params.usage; + if (!usage) { + return undefined; + } + const input = usage.input ?? 0; + const promptTokens = derivePromptTokens({ + input: usage.input, + cacheRead: usage.cacheRead, + cacheWrite: usage.cacheWrite, + }); + let total = promptTokens ?? usage.total ?? input; + if (!(total > 0)) { + return undefined; + } + + const contextTokens = params.contextTokens; + if (typeof contextTokens === "number" && Number.isFinite(contextTokens) && contextTokens > 0) { + total = Math.min(total, contextTokens); + } + return total; +} diff --git a/src/auto-reply/reply/session-usage.ts b/src/auto-reply/reply/session-usage.ts index 8ef885d1a1..a562c20054 100644 --- a/src/auto-reply/reply/session-usage.ts +++ b/src/auto-reply/reply/session-usage.ts @@ -1,5 +1,9 @@ import { setCliSessionId } from "../../agents/cli-session.js"; -import { hasNonzeroUsage, type NormalizedUsage } from "../../agents/usage.js"; +import { + deriveSessionTotalTokens, + hasNonzeroUsage, + type NormalizedUsage, +} from "../../agents/usage.js"; import { type SessionSystemPromptReport, type SessionEntry, @@ -32,15 +36,18 @@ export async function persistSessionUsageUpdate(params: { update: async (entry) => { const input = params.usage?.input ?? 0; const output = params.usage?.output ?? 0; - const promptTokens = - input + (params.usage?.cacheRead ?? 0) + (params.usage?.cacheWrite ?? 0); + const resolvedContextTokens = params.contextTokensUsed ?? entry.contextTokens; const patch: Partial = { inputTokens: input, outputTokens: output, - totalTokens: promptTokens > 0 ? promptTokens : (params.usage?.total ?? input), + totalTokens: + deriveSessionTotalTokens({ + usage: params.usage, + contextTokens: resolvedContextTokens, + }) ?? input, modelProvider: params.providerUsed ?? entry.modelProvider, model: params.modelUsed ?? entry.model, - contextTokens: params.contextTokensUsed ?? entry.contextTokens, + contextTokens: resolvedContextTokens, systemPromptReport: params.systemPromptReport ?? entry.systemPromptReport, updatedAt: Date.now(), }; diff --git a/src/commands/agent/session-store.ts b/src/commands/agent/session-store.ts index 2c97d3ddf7..af0c24ae59 100644 --- a/src/commands/agent/session-store.ts +++ b/src/commands/agent/session-store.ts @@ -3,7 +3,7 @@ import { setCliSessionId } from "../../agents/cli-session.js"; import { lookupContextTokens } from "../../agents/context.js"; import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js"; import { isCliProvider } from "../../agents/model-selection.js"; -import { hasNonzeroUsage } from "../../agents/usage.js"; +import { deriveSessionTotalTokens, hasNonzeroUsage } from "../../agents/usage.js"; import { type SessionEntry, updateSessionStore } from "../../config/sessions.js"; type RunResult = Awaited< @@ -37,6 +37,7 @@ export async function updateSessionStoreAfterAgentRun(params: { } = params; const usage = result.meta.agentMeta?.usage; + const compactionsThisRun = Math.max(0, result.meta.agentMeta?.compactionCount ?? 0); const modelUsed = result.meta.agentMeta?.model ?? fallbackModel ?? defaultModel; const providerUsed = result.meta.agentMeta?.provider ?? fallbackProvider ?? defaultProvider; const contextTokens = @@ -64,10 +65,16 @@ export async function updateSessionStoreAfterAgentRun(params: { if (hasNonzeroUsage(usage)) { const input = usage.input ?? 0; const output = usage.output ?? 0; - const promptTokens = input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0); next.inputTokens = input; next.outputTokens = output; - next.totalTokens = promptTokens > 0 ? promptTokens : (usage.total ?? input); + next.totalTokens = + deriveSessionTotalTokens({ + usage, + contextTokens, + }) ?? input; + } + if (compactionsThisRun > 0) { + next.compactionCount = (entry.compactionCount ?? 0) + compactionsThisRun; } sessionStore[sessionKey] = next; await updateSessionStore(storePath, (store) => { diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index 2ae856b37c..0d1993b4fb 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -33,7 +33,7 @@ import { buildWorkspaceSkillSnapshot } from "../../agents/skills.js"; import { getSkillsSnapshotVersion } from "../../agents/skills/refresh.js"; import { runSubagentAnnounceFlow } from "../../agents/subagent-announce.js"; import { resolveAgentTimeoutMs } from "../../agents/timeout.js"; -import { hasNonzeroUsage } from "../../agents/usage.js"; +import { deriveSessionTotalTokens, hasNonzeroUsage } from "../../agents/usage.js"; import { ensureAgentWorkspace } from "../../agents/workspace.js"; import { normalizeThinkLevel, @@ -454,11 +454,13 @@ export async function runCronIsolatedAgentTurn(params: { if (hasNonzeroUsage(usage)) { const input = usage.input ?? 0; const output = usage.output ?? 0; - const promptTokens = input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0); cronSession.sessionEntry.inputTokens = input; cronSession.sessionEntry.outputTokens = output; cronSession.sessionEntry.totalTokens = - promptTokens > 0 ? promptTokens : (usage.total ?? input); + deriveSessionTotalTokens({ + usage, + contextTokens, + }) ?? input; } await persistSessionEntry(); } diff --git a/src/gateway/call.test.ts b/src/gateway/call.test.ts index 0607ce34e1..a4aff647ac 100644 --- a/src/gateway/call.test.ts +++ b/src/gateway/call.test.ts @@ -294,6 +294,29 @@ describe("callGateway error details", () => { expect(err?.message).toContain("Bind: loopback"); }); + it("does not overflow very large timeout values", async () => { + startMode = "silent"; + loadConfig.mockReturnValue({ + gateway: { mode: "local", bind: "loopback" }, + }); + resolveGatewayPort.mockReturnValue(18789); + pickPrimaryTailnetIPv4.mockReturnValue(undefined); + + vi.useFakeTimers(); + let err: Error | null = null; + const promise = callGateway({ method: "health", timeoutMs: 2_592_010_000 }).catch((caught) => { + err = caught as Error; + }); + + await vi.advanceTimersByTimeAsync(1); + expect(err).toBeNull(); + + lastClientOptions?.onClose?.(1006, ""); + await promise; + + expect(err?.message).toContain("gateway closed (1006"); + }); + it("fails fast when remote mode is missing remote url", async () => { loadConfig.mockReturnValue({ gateway: { mode: "remote", bind: "loopback", remote: {} }, diff --git a/src/gateway/call.ts b/src/gateway/call.ts index d3cf747e63..41dc07b53b 100644 --- a/src/gateway/call.ts +++ b/src/gateway/call.ts @@ -156,7 +156,9 @@ export function buildGatewayConnectionDetails( export async function callGateway>( opts: CallGatewayOptions, ): Promise { - const timeoutMs = opts.timeoutMs ?? 10_000; + const timeoutMs = + typeof opts.timeoutMs === "number" && Number.isFinite(opts.timeoutMs) ? opts.timeoutMs : 10_000; + const safeTimerTimeoutMs = Math.max(1, Math.min(Math.floor(timeoutMs), 2_147_483_647)); const config = opts.config ?? loadConfig(); const isRemoteMode = config.gateway?.mode === "remote"; const remote = isRemoteMode ? config.gateway?.remote : undefined; @@ -299,7 +301,7 @@ export async function callGateway>( ignoreClose = true; client.stop(); stop(new Error(formatTimeoutError())); - }, timeoutMs); + }, safeTimerTimeoutMs); client.start(); }); diff --git a/src/gateway/server-methods/agent-job.ts b/src/gateway/server-methods/agent-job.ts index 872d887226..6ac7b5f521 100644 --- a/src/gateway/server-methods/agent-job.ts +++ b/src/gateway/server-methods/agent-job.ts @@ -127,7 +127,8 @@ export async function waitForAgentJob(params: { recordAgentRunSnapshot(snapshot); finish(snapshot); }); - const timer = setTimeout(() => finish(null), Math.max(1, timeoutMs)); + const timerDelayMs = Math.max(1, Math.min(Math.floor(timeoutMs), 2_147_483_647)); + const timer = setTimeout(() => finish(null), timerDelayMs); }); } diff --git a/ui/src/styles/components.css b/ui/src/styles/components.css index f0438eeec2..0b1d56ef77 100644 --- a/ui/src/styles/components.css +++ b/ui/src/styles/components.css @@ -564,27 +564,51 @@ /* Compaction indicator */ .compaction-indicator { + align-self: center; + display: inline-flex; + align-items: center; + gap: 6px; font-size: 13px; - padding: 10px 12px; + line-height: 1.2; + padding: 6px 14px; margin-bottom: 8px; + border-radius: 999px; + border: 1px solid var(--border); + background: var(--panel-strong); + color: var(--text); + white-space: nowrap; + user-select: none; animation: fade-in 0.2s var(--ease-out); } +.compaction-indicator svg { + width: 16px; + height: 16px; + stroke: currentColor; + fill: none; + stroke-width: 1.5px; + stroke-linecap: round; + stroke-linejoin: round; + flex-shrink: 0; +} + .compaction-indicator--active { - animation: compaction-pulse 1.5s ease-in-out infinite; + color: var(--info); + border-color: rgba(59, 130, 246, 0.35); +} + +.compaction-indicator--active svg { + animation: compaction-spin 1s linear infinite; } .compaction-indicator--complete { - animation: fade-in 0.2s var(--ease-out); + color: var(--ok); + border-color: rgba(34, 197, 94, 0.35); } -@keyframes compaction-pulse { - 0%, - 100% { - opacity: 0.7; - } - 50% { - opacity: 1; +@keyframes compaction-spin { + to { + transform: rotate(360deg); } } diff --git a/ui/src/ui/views/chat.test.ts b/ui/src/ui/views/chat.test.ts index 903d588cd8..b690bb1bb1 100644 --- a/ui/src/ui/views/chat.test.ts +++ b/ui/src/ui/views/chat.test.ts @@ -49,6 +49,68 @@ function createProps(overrides: Partial = {}): ChatProps { } describe("chat view", () => { + it("renders compacting indicator as a badge", () => { + const container = document.createElement("div"); + render( + renderChat( + createProps({ + compactionStatus: { + active: true, + startedAt: Date.now(), + completedAt: null, + }, + }), + ), + container, + ); + + const indicator = container.querySelector(".compaction-indicator--active"); + expect(indicator).not.toBeNull(); + expect(indicator?.textContent).toContain("Compacting context..."); + }); + + it("renders completion indicator shortly after compaction", () => { + const container = document.createElement("div"); + const nowSpy = vi.spyOn(Date, "now").mockReturnValue(1_000); + render( + renderChat( + createProps({ + compactionStatus: { + active: false, + startedAt: 900, + completedAt: 900, + }, + }), + ), + container, + ); + + const indicator = container.querySelector(".compaction-indicator--complete"); + expect(indicator).not.toBeNull(); + expect(indicator?.textContent).toContain("Context compacted"); + nowSpy.mockRestore(); + }); + + it("hides stale compaction completion indicator", () => { + const container = document.createElement("div"); + const nowSpy = vi.spyOn(Date, "now").mockReturnValue(10_000); + render( + renderChat( + createProps({ + compactionStatus: { + active: false, + startedAt: 0, + completedAt: 0, + }, + }), + ), + container, + ); + + expect(container.querySelector(".compaction-indicator")).toBeNull(); + nowSpy.mockRestore(); + }); + it("shows a stop button when aborting is available", () => { const container = document.createElement("div"); const onAbort = vi.fn(); diff --git a/ui/src/ui/views/chat.ts b/ui/src/ui/views/chat.ts index 70ceeb9131..ce51fde5ff 100644 --- a/ui/src/ui/views/chat.ts +++ b/ui/src/ui/views/chat.ts @@ -85,7 +85,7 @@ function renderCompactionIndicator(status: CompactionIndicatorStatus | null | un // Show "compacting..." while active if (status.active) { return html` -
+
${icons.loader} Compacting context...
`; @@ -96,7 +96,7 @@ function renderCompactionIndicator(status: CompactionIndicatorStatus | null | un const elapsed = Date.now() - status.completedAt; if (elapsed < COMPACTION_TOAST_DURATION_MS) { return html` -
+
${icons.check} Context compacted
`; @@ -268,8 +268,6 @@ export function renderChat(props: ChatProps) { ${props.error ? html`
${props.error}
` : nothing} - ${renderCompactionIndicator(props.compactionStatus)} - ${ props.focusMode ? html` @@ -353,6 +351,8 @@ export function renderChat(props: ChatProps) { : nothing } + ${renderCompactionIndicator(props.compactionStatus)} + ${ props.showNewMessages ? html`