From d90cac990c8f9864d2bcb0f3605a057fad2003cf Mon Sep 17 00:00:00 2001 From: Tyler Yust <64381258+tyler6204@users.noreply.github.com> Date: Fri, 6 Feb 2026 18:03:03 -0800 Subject: [PATCH] fix: cron scheduler reliability, store hardening, and UX improvements (#10776) * refactor: update cron job wake mode and run mode handling - Changed default wake mode from 'next-heartbeat' to 'now' in CronJobEditor and related CLI commands. - Updated cron-tool tests to reflect changes in run mode, introducing 'due' and 'force' options. - Enhanced cron-tool logic to handle new run modes and ensure compatibility with existing job structures. - Added new tests for delivery plan consistency and job execution behavior under various conditions. - Improved normalization functions to handle wake mode and session target casing. This refactor aims to streamline cron job configurations and enhance the overall user experience with clearer defaults and improved functionality. * test: enhance cron job functionality and UI - Added tests to ensure the isolated agent correctly announces the final payload text when delivering messages via Telegram. - Implemented a new function to pick the last deliverable payload from a list of delivery payloads. - Enhanced the cron service to maintain legacy "every" jobs while minute cron jobs recompute schedules. - Updated the cron store migration tests to verify the addition of anchorMs to legacy every schedules. - Improved the UI for displaying cron job details, including job state and delivery information, with new styles and layout adjustments. These changes aim to improve the reliability and user experience of the cron job system. * test: enhance sessions thinking level handling - Added tests to verify that the correct thinking levels are applied during session spawning. - Updated the sessions-spawn-tool to include a new parameter for overriding thinking levels. - Enhanced the UI to support additional thinking levels, including "xhigh" and "full", and improved the handling of current options in dropdowns. These changes aim to improve the flexibility and accuracy of thinking level configurations in session management. * feat: enhance session management and cron job functionality - Introduced passthrough arguments in the test-parallel script to allow for flexible command-line options. - Updated session handling to hide cron run alias session keys from the sessions list, improving clarity. - Enhanced the cron service to accurately record job start times and durations, ensuring better tracking of job execution. - Added tests to verify the correct behavior of the cron service under various conditions, including zero-delay timers. These changes aim to improve the usability and reliability of session and cron job management. * feat: implement job running state checks in cron service - Added functionality to prevent manual job runs if a job is already in progress, enhancing job management. - Updated the `isJobDue` function to include checks for running jobs, ensuring accurate scheduling. - Enhanced the `run` function to return a specific reason when a job is already running. - Introduced a new test case to verify the behavior of forced manual runs during active job execution. These changes aim to improve the reliability and clarity of cron job execution and management. * feat: add session ID and key to CronRunLogEntry model - Introduced `sessionid` and `sessionkey` properties to the `CronRunLogEntry` struct for enhanced tracking of session-related information. - Updated the initializer and Codable conformance to accommodate the new properties, ensuring proper serialization and deserialization. These changes aim to improve the granularity of logging and session management within the cron job system. * fix: improve session display name resolution - Updated the `resolveSessionDisplayName` function to ensure that both label and displayName are trimmed and default to an empty string if not present. - Enhanced the logic to prevent returning the key if it matches the label or displayName, improving clarity in session naming. These changes aim to enhance the accuracy and usability of session display names in the UI. * perf: skip cron store persist when idle timer tick produces no changes recomputeNextRuns now returns a boolean indicating whether any job state was mutated. The idle path in onTimer only persists when the return value is true, eliminating unnecessary file writes every 60s for far-future or idle schedules. * fix: prep for merge - explicit delivery mode migration, docs + changelog (#10776) (thanks @tyler6204) --- CHANGELOG.md | 10 + .../Sources/OpenClaw/CronJobEditor.swift | 4 +- .../OpenClawProtocol/GatewayModels.swift | 8 + .../OpenClawProtocol/GatewayModels.swift | 8 + docs/automation/cron-jobs.md | 13 +- scripts/test-parallel.mjs | 25 ++ ...ons-spawn-applies-thinking-default.test.ts | 8 + src/agents/tools/cron-tool.test.ts | 22 +- src/agents/tools/cron-tool.ts | 7 +- src/agents/tools/sessions-spawn-tool.ts | 20 + src/cli/cron-cli/register.cron-add.ts | 6 +- src/cli/cron-cli/register.cron-simple.ts | 4 +- src/cron/delivery.test.ts | 45 +++ src/cron/delivery.ts | 9 +- ...p-recipient-besteffortdeliver-true.test.ts | 42 ++ ....uses-last-non-empty-agent-text-as.test.ts | 77 +++- src/cron/isolated-agent/delivery-target.ts | 5 +- src/cron/isolated-agent/helpers.ts | 14 + src/cron/isolated-agent/run.ts | 84 ++-- src/cron/isolated-agent/session.ts | 2 + src/cron/normalize.test.ts | 58 +++ src/cron/normalize.ts | 229 ++++++++++- src/cron/run-log.test.ts | 4 + src/cron/run-log.ts | 21 +- src/cron/schedule.ts | 25 +- src/cron/service.delivery-plan.test.ts | 92 +++++ src/cron/service.every-jobs-fire.test.ts | 110 +++++- src/cron/service.issue-regressions.test.ts | 346 ++++++++++++++++ src/cron/service.read-ops-nonblocking.test.ts | 104 +++++ src/cron/service.restart-catchup.test.ts | 165 ++++++++ ...runs-one-shot-main-job-disables-it.test.ts | 68 +++- ...s-main-jobs-empty-systemevent-text.test.ts | 23 +- src/cron/service.store-migration.test.ts | 124 ++++++ src/cron/service.store.migration.test.ts | 45 +++ src/cron/service/jobs.ts | 63 ++- src/cron/service/ops.ts | 41 +- src/cron/service/state.ts | 5 + src/cron/service/store.ts | 224 ++++++++++- src/cron/service/timer.ts | 368 +++++++++++++----- src/cron/store.test.ts | 32 ++ src/cron/store.ts | 22 +- src/gateway/protocol/schema/cron.ts | 12 + src/gateway/server-cron.ts | 2 + src/gateway/server-methods/cron.ts | 2 +- src/gateway/server.cron.e2e.test.ts | 28 +- src/gateway/session-utils.test.ts | 25 ++ src/gateway/session-utils.ts | 9 + ui/src/styles/components.css | 171 ++++++++ ui/src/ui/app-defaults.ts | 2 +- ui/src/ui/app-render.helpers.ts | 10 +- ui/src/ui/app-render.ts | 1 + ui/src/ui/format.test.ts | 4 +- ui/src/ui/format.ts | 2 +- ui/src/ui/types.ts | 2 + ui/src/ui/views/cron.test.ts | 71 +++- ui/src/ui/views/cron.ts | 130 +++++-- ui/src/ui/views/sessions.test.ts | 81 ++++ ui/src/ui/views/sessions.ts | 68 +++- 58 files changed, 2952 insertions(+), 250 deletions(-) create mode 100644 src/cron/delivery.test.ts create mode 100644 src/cron/service.delivery-plan.test.ts create mode 100644 src/cron/service.issue-regressions.test.ts create mode 100644 src/cron/service.read-ops-nonblocking.test.ts create mode 100644 src/cron/service.restart-catchup.test.ts create mode 100644 src/cron/service.store-migration.test.ts create mode 100644 src/cron/store.test.ts create mode 100644 ui/src/ui/views/sessions.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 89de0c5531..312799a31f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ Docs: https://docs.openclaw.ai ### Changes +- Cron: default `wakeMode` is now `"now"` for new jobs (was `"next-heartbeat"`). (#10776) Thanks @tyler6204. +- Cron: `cron run` defaults to force execution; use `--due` to restrict to due-only. (#10776) Thanks @tyler6204. - Models: support Anthropic Opus 4.6 and OpenAI Codex gpt-5.3-codex (forward-compat fallbacks). (#9853, #10720, #9995) Thanks @TinyTb, @calvin-hpnet, @tyler6204. - Providers: add xAI (Grok) support. (#9885) Thanks @grp06. - Web UI: add token usage dashboard. (#10072) Thanks @Takhoffman. @@ -14,8 +16,16 @@ Docs: https://docs.openclaw.ai - CLI: sort commands alphabetically in help output. (#8068) Thanks @deepsoumya617. - Agents: bump pi-mono to 0.52.7; add embedded forward-compat fallback for Opus 4.6 model ids. +### Added + +- Cron: run history deep-links to session chat from the dashboard. (#10776) Thanks @tyler6204. +- Cron: per-run session keys in run log entries and default labels for cron sessions. (#10776) Thanks @tyler6204. +- Cron: legacy payload field compatibility (`deliver`, `channel`, `to`, `bestEffortDeliver`) in schema. (#10776) Thanks @tyler6204. + ### Fixes +- Cron: scheduler reliability (timer drift, restart catch-up, lock contention, stale running markers). (#10776) Thanks @tyler6204. +- Cron: store migration hardening (legacy field migration, parse error handling, explicit delivery mode persistence). (#10776) Thanks @tyler6204. - Telegram: auto-inject DM topic threadId in message tool + subagent announce. (#7235) Thanks @Lukavyi. - Security: require auth for Gateway canvas host and A2UI assets. (#9518) Thanks @coygeek. - Cron: fix scheduling and reminder delivery regressions; harden next-run recompute + timer re-arming + legacy schedule fields. (#9733, #9823, #9948, #9932) Thanks @tyler6204, @pycckuu, @j2h4u, @fujiwara-tofu-shop. diff --git a/apps/macos/Sources/OpenClaw/CronJobEditor.swift b/apps/macos/Sources/OpenClaw/CronJobEditor.swift index a5207ca101..517d32df44 100644 --- a/apps/macos/Sources/OpenClaw/CronJobEditor.swift +++ b/apps/macos/Sources/OpenClaw/CronJobEditor.swift @@ -29,7 +29,7 @@ struct CronJobEditor: View { @State var agentId: String = "" @State var enabled: Bool = true @State var sessionTarget: CronSessionTarget = .main - @State var wakeMode: CronWakeMode = .nextHeartbeat + @State var wakeMode: CronWakeMode = .now @State var deleteAfterRun: Bool = false enum ScheduleKind: String, CaseIterable, Identifiable { case at, every, cron; var id: String { rawValue } } @@ -119,8 +119,8 @@ struct CronJobEditor: View { GridRow { self.gridLabel("Wake mode") Picker("", selection: self.$wakeMode) { - Text("next-heartbeat").tag(CronWakeMode.nextHeartbeat) Text("now").tag(CronWakeMode.now) + Text("next-heartbeat").tag(CronWakeMode.nextHeartbeat) } .labelsHidden() .pickerStyle(.segmented) diff --git a/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift b/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift index dd3cfb50a1..07c9db84e2 100644 --- a/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift +++ b/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift @@ -2025,6 +2025,8 @@ public struct CronRunLogEntry: Codable, Sendable { public let status: AnyCodable? public let error: String? public let summary: String? + public let sessionid: String? + public let sessionkey: String? public let runatms: Int? public let durationms: Int? public let nextrunatms: Int? @@ -2036,6 +2038,8 @@ public struct CronRunLogEntry: Codable, Sendable { status: AnyCodable?, error: String?, summary: String?, + sessionid: String?, + sessionkey: String?, runatms: Int?, durationms: Int?, nextrunatms: Int? @@ -2046,6 +2050,8 @@ public struct CronRunLogEntry: Codable, Sendable { self.status = status self.error = error self.summary = summary + self.sessionid = sessionid + self.sessionkey = sessionkey self.runatms = runatms self.durationms = durationms self.nextrunatms = nextrunatms @@ -2057,6 +2063,8 @@ public struct CronRunLogEntry: Codable, Sendable { case status case error case summary + case sessionid = "sessionId" + case sessionkey = "sessionKey" case runatms = "runAtMs" case durationms = "durationMs" case nextrunatms = "nextRunAtMs" diff --git a/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift b/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift index dd3cfb50a1..07c9db84e2 100644 --- a/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift +++ b/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift @@ -2025,6 +2025,8 @@ public struct CronRunLogEntry: Codable, Sendable { public let status: AnyCodable? public let error: String? public let summary: String? + public let sessionid: String? + public let sessionkey: String? public let runatms: Int? public let durationms: Int? public let nextrunatms: Int? @@ -2036,6 +2038,8 @@ public struct CronRunLogEntry: Codable, Sendable { status: AnyCodable?, error: String?, summary: String?, + sessionid: String?, + sessionkey: String?, runatms: Int?, durationms: Int?, nextrunatms: Int? @@ -2046,6 +2050,8 @@ public struct CronRunLogEntry: Codable, Sendable { self.status = status self.error = error self.summary = summary + self.sessionid = sessionid + self.sessionkey = sessionkey self.runatms = runatms self.durationms = durationms self.nextrunatms = nextrunatms @@ -2057,6 +2063,8 @@ public struct CronRunLogEntry: Codable, Sendable { case status case error case summary + case sessionid = "sessionId" + case sessionkey = "sessionKey" case runatms = "runAtMs" case durationms = "durationMs" case nextrunatms = "nextRunAtMs" diff --git a/docs/automation/cron-jobs.md b/docs/automation/cron-jobs.md index 8eb79881ec..54d6a96470 100644 --- a/docs/automation/cron-jobs.md +++ b/docs/automation/cron-jobs.md @@ -40,7 +40,7 @@ openclaw cron add \ --delete-after-run openclaw cron list -openclaw cron run --force +openclaw cron run openclaw cron runs --id ``` @@ -123,8 +123,8 @@ local timezone is used. Main jobs enqueue a system event and optionally wake the heartbeat runner. They must use `payload.kind = "systemEvent"`. -- `wakeMode: "next-heartbeat"` (default): event waits for the next scheduled heartbeat. -- `wakeMode: "now"`: event triggers an immediate heartbeat run. +- `wakeMode: "now"` (default): event triggers an immediate heartbeat run. +- `wakeMode: "next-heartbeat"`: event waits for the next scheduled heartbeat. This is the best fit when you want the normal heartbeat prompt + main-session context. See [Heartbeat](/gateway/heartbeat). @@ -288,7 +288,7 @@ Notes: - `sessionTarget` must be `"main"` or `"isolated"` and must match `payload.kind`. - Optional fields: `agentId`, `description`, `enabled`, `deleteAfterRun` (defaults to true for `at`), `delivery`. -- `wakeMode` defaults to `"next-heartbeat"` when omitted. +- `wakeMode` defaults to `"now"` when omitted. ### cron.update params @@ -420,10 +420,11 @@ openclaw cron edit --agent ops openclaw cron edit --clear-agent ``` -Manual run (debug): +Manual run (force is the default, use `--due` to only run when due): ```bash -openclaw cron run --force +openclaw cron run +openclaw cron run --due ``` Edit an existing job (patch fields): diff --git a/scripts/test-parallel.mjs b/scripts/test-parallel.mjs index 9ee0a9d876..786f8bbd14 100644 --- a/scripts/test-parallel.mjs +++ b/scripts/test-parallel.mjs @@ -32,6 +32,7 @@ const shardCount = isWindowsCi const windowsCiArgs = isWindowsCi ? ["--no-file-parallelism", "--dangerouslyIgnoreUnhandledErrors"] : []; +const passthroughArgs = process.argv.slice(2); const overrideWorkers = Number.parseInt(process.env.OPENCLAW_TEST_WORKERS ?? "", 10); const resolvedOverride = Number.isFinite(overrideWorkers) && overrideWorkers > 0 ? overrideWorkers : null; @@ -96,6 +97,30 @@ const shutdown = (signal) => { process.on("SIGINT", () => shutdown("SIGINT")); process.on("SIGTERM", () => shutdown("SIGTERM")); +if (passthroughArgs.length > 0) { + const args = maxWorkers + ? ["vitest", "run", "--maxWorkers", String(maxWorkers), ...windowsCiArgs, ...passthroughArgs] + : ["vitest", "run", ...windowsCiArgs, ...passthroughArgs]; + const nodeOptions = process.env.NODE_OPTIONS ?? ""; + const nextNodeOptions = WARNING_SUPPRESSION_FLAGS.reduce( + (acc, flag) => (acc.includes(flag) ? acc : `${acc} ${flag}`.trim()), + nodeOptions, + ); + const code = await new Promise((resolve) => { + const child = spawn(pnpm, args, { + stdio: "inherit", + env: { ...process.env, NODE_OPTIONS: nextNodeOptions }, + shell: process.platform === "win32", + }); + children.add(child); + child.on("exit", (exitCode, signal) => { + children.delete(child); + resolve(exitCode ?? (signal ? 1 : 0)); + }); + }); + process.exit(Number(code) || 0); +} + const parallelCodes = await Promise.all(parallelRuns.map(run)); const failedParallel = parallelCodes.find((code) => code !== 0); if (failedParallel !== undefined) { diff --git a/src/agents/openclaw-tools.subagents.sessions-spawn-applies-thinking-default.test.ts b/src/agents/openclaw-tools.subagents.sessions-spawn-applies-thinking-default.test.ts index 36eb50b553..c9b7175717 100644 --- a/src/agents/openclaw-tools.subagents.sessions-spawn-applies-thinking-default.test.ts +++ b/src/agents/openclaw-tools.subagents.sessions-spawn-applies-thinking-default.test.ts @@ -45,8 +45,12 @@ describe("sessions_spawn thinking defaults", () => { const agentCall = calls .map((call) => call[0] as { method: string; params?: Record }) .findLast((call) => call.method === "agent"); + const thinkingPatch = calls + .map((call) => call[0] as { method: string; params?: Record }) + .findLast((call) => call.method === "sessions.patch" && call.params?.thinkingLevel); expect(agentCall?.params?.thinking).toBe("high"); + expect(thinkingPatch?.params?.thinkingLevel).toBe("high"); }); it("prefers explicit sessions_spawn.thinking over config default", async () => { @@ -60,7 +64,11 @@ describe("sessions_spawn thinking defaults", () => { const agentCall = calls .map((call) => call[0] as { method: string; params?: Record }) .findLast((call) => call.method === "agent"); + const thinkingPatch = calls + .map((call) => call[0] as { method: string; params?: Record }) + .findLast((call) => call.method === "sessions.patch" && call.params?.thinkingLevel); expect(agentCall?.params?.thinking).toBe("low"); + expect(thinkingPatch?.params?.thinkingLevel).toBe("low"); }); }); diff --git a/src/agents/tools/cron-tool.test.ts b/src/agents/tools/cron-tool.test.ts index 77ffb36e6f..cee2e57e0f 100644 --- a/src/agents/tools/cron-tool.test.ts +++ b/src/agents/tools/cron-tool.test.ts @@ -30,8 +30,8 @@ describe("cron tool", () => { ], ["remove", { action: "remove", jobId: "job-1" }, { id: "job-1" }], ["remove", { action: "remove", id: "job-2" }, { id: "job-2" }], - ["run", { action: "run", jobId: "job-1" }, { id: "job-1" }], - ["run", { action: "run", id: "job-2" }, { id: "job-2" }], + ["run", { action: "run", jobId: "job-1" }, { id: "job-1", mode: "force" }], + ["run", { action: "run", id: "job-2" }, { id: "job-2", mode: "force" }], ["runs", { action: "runs", jobId: "job-1" }, { id: "job-1" }], ["runs", { action: "runs", id: "job-2" }, { id: "job-2" }], ])("%s sends id to gateway", async (action, args, expectedParams) => { @@ -58,7 +58,21 @@ describe("cron tool", () => { const call = callGatewayMock.mock.calls[0]?.[0] as { params?: unknown; }; - expect(call?.params).toEqual({ id: "job-primary" }); + expect(call?.params).toEqual({ id: "job-primary", mode: "force" }); + }); + + it("supports due-only run mode", async () => { + const tool = createCronTool(); + await tool.execute("call-due", { + action: "run", + jobId: "job-due", + runMode: "due", + }); + + const call = callGatewayMock.mock.calls[0]?.[0] as { + params?: unknown; + }; + expect(call?.params).toEqual({ id: "job-due", mode: "due" }); }); it("normalizes cron.add job payloads", async () => { @@ -86,7 +100,7 @@ describe("cron tool", () => { deleteAfterRun: true, schedule: { kind: "at", at: new Date(123).toISOString() }, sessionTarget: "main", - wakeMode: "next-heartbeat", + wakeMode: "now", payload: { kind: "systemEvent", text: "hello" }, }); }); diff --git a/src/agents/tools/cron-tool.ts b/src/agents/tools/cron-tool.ts index 4c9633144f..cc5cab54f6 100644 --- a/src/agents/tools/cron-tool.ts +++ b/src/agents/tools/cron-tool.ts @@ -18,6 +18,7 @@ import { resolveInternalSessionKey, resolveMainSessionAlias } from "./sessions-h const CRON_ACTIONS = ["status", "list", "add", "update", "remove", "run", "runs", "wake"] as const; const CRON_WAKE_MODES = ["now", "next-heartbeat"] as const; +const CRON_RUN_MODES = ["due", "force"] as const; const REMINDER_CONTEXT_MESSAGES_MAX = 10; const REMINDER_CONTEXT_PER_MESSAGE_MAX = 220; @@ -37,6 +38,7 @@ const CronToolSchema = Type.Object({ patch: Type.Optional(Type.Object({}, { additionalProperties: true })), text: Type.Optional(Type.String()), mode: optionalStringEnum(CRON_WAKE_MODES), + runMode: optionalStringEnum(CRON_RUN_MODES), contextMessages: Type.Optional( Type.Number({ minimum: 0, maximum: REMINDER_CONTEXT_MESSAGES_MAX }), ), @@ -312,7 +314,6 @@ Use jobId as the canonical identifier; id is accepted for compatibility. Use con } } - // [Fix Issue 3] Infer delivery target from session key for isolated jobs if not provided if ( opts?.agentSessionKey && job && @@ -393,7 +394,9 @@ Use jobId as the canonical identifier; id is accepted for compatibility. Use con if (!id) { throw new Error("jobId required (id accepted for backward compatibility)"); } - return jsonResult(await callGatewayTool("cron.run", gatewayOpts, { id })); + const runMode = + params.runMode === "due" || params.runMode === "force" ? params.runMode : "force"; + return jsonResult(await callGatewayTool("cron.run", gatewayOpts, { id, mode: runMode })); } case "runs": { const id = readStringParam(params, "jobId") ?? readStringParam(params, "id"); diff --git a/src/agents/tools/sessions-spawn-tool.ts b/src/agents/tools/sessions-spawn-tool.ts index d73b8c4a0d..1ed7bcd1c1 100644 --- a/src/agents/tools/sessions-spawn-tool.ts +++ b/src/agents/tools/sessions-spawn-tool.ts @@ -214,6 +214,26 @@ export function createSessionsSpawnTool(opts?: { modelWarning = messageText; } } + if (thinkingOverride !== undefined) { + try { + await callGateway({ + method: "sessions.patch", + params: { + key: childSessionKey, + thinkingLevel: thinkingOverride === "off" ? null : thinkingOverride, + }, + timeoutMs: 10_000, + }); + } catch (err) { + const messageText = + err instanceof Error ? err.message : typeof err === "string" ? err : "error"; + return jsonResult({ + status: "error", + error: messageText, + childSessionKey, + }); + } + } const childSystemPrompt = buildSubagentSystemPrompt({ requesterSessionKey, requesterOrigin, diff --git a/src/cli/cron-cli/register.cron-add.ts b/src/cli/cron-cli/register.cron-add.ts index 81720418d2..001fd5f1bf 100644 --- a/src/cli/cron-cli/register.cron-add.ts +++ b/src/cli/cron-cli/register.cron-add.ts @@ -71,7 +71,7 @@ export function registerCronAddCommand(cron: Command) { .option("--keep-after-run", "Keep one-shot job after it succeeds", false) .option("--agent ", "Agent id for this job") .option("--session ", "Session target (main|isolated)") - .option("--wake ", "Wake mode (now|next-heartbeat)", "next-heartbeat") + .option("--wake ", "Wake mode (now|next-heartbeat)", "now") .option("--at ", "Run once at time (ISO) or +duration (e.g. 20m)") .option("--every ", "Run every duration (e.g. 10m, 1h)") .option("--cron ", "Cron expression (5-field)") @@ -122,8 +122,8 @@ export function registerCronAddCommand(cron: Command) { }; })(); - const wakeModeRaw = typeof opts.wake === "string" ? opts.wake : "next-heartbeat"; - const wakeMode = wakeModeRaw.trim() || "next-heartbeat"; + const wakeModeRaw = typeof opts.wake === "string" ? opts.wake : "now"; + const wakeMode = wakeModeRaw.trim() || "now"; if (wakeMode !== "now" && wakeMode !== "next-heartbeat") { throw new Error("--wake must be now or next-heartbeat"); } diff --git a/src/cli/cron-cli/register.cron-simple.ts b/src/cli/cron-cli/register.cron-simple.ts index 1493c371ac..e5baa11714 100644 --- a/src/cli/cron-cli/register.cron-simple.ts +++ b/src/cli/cron-cli/register.cron-simple.ts @@ -92,12 +92,12 @@ export function registerCronSimpleCommands(cron: Command) { .command("run") .description("Run a cron job now (debug)") .argument("", "Job id") - .option("--force", "Run even if not due", false) + .option("--due", "Run only when due (default behavior in older versions)", false) .action(async (id, opts) => { try { const res = await callGatewayFromCli("cron.run", opts, { id, - mode: opts.force ? "force" : "due", + mode: opts.due ? "due" : "force", }); defaultRuntime.log(JSON.stringify(res, null, 2)); } catch (err) { diff --git a/src/cron/delivery.test.ts b/src/cron/delivery.test.ts new file mode 100644 index 0000000000..fcbe9e99a3 --- /dev/null +++ b/src/cron/delivery.test.ts @@ -0,0 +1,45 @@ +import { describe, expect, it } from "vitest"; +import type { CronJob } from "./types.js"; +import { resolveCronDeliveryPlan } from "./delivery.js"; + +function makeJob(overrides: Partial): CronJob { + const now = Date.now(); + return { + id: "job-1", + name: "test", + enabled: true, + createdAtMs: now, + updatedAtMs: now, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "hello" }, + state: {}, + ...overrides, + }; +} + +describe("resolveCronDeliveryPlan", () => { + it("defaults to announce when delivery object has no mode", () => { + const plan = resolveCronDeliveryPlan( + makeJob({ + delivery: { channel: "telegram", to: "123", mode: undefined as never }, + }), + ); + expect(plan.mode).toBe("announce"); + expect(plan.requested).toBe(true); + expect(plan.channel).toBe("telegram"); + expect(plan.to).toBe("123"); + }); + + it("respects legacy payload deliver=false", () => { + const plan = resolveCronDeliveryPlan( + makeJob({ + delivery: undefined, + payload: { kind: "agentTurn", message: "hello", deliver: false }, + }), + ); + expect(plan.mode).toBe("none"); + expect(plan.requested).toBe(false); + }); +}); diff --git a/src/cron/delivery.ts b/src/cron/delivery.ts index c7cbe87f9b..f0ba2c2b07 100644 --- a/src/cron/delivery.ts +++ b/src/cron/delivery.ts @@ -32,12 +32,13 @@ export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan { const delivery = job.delivery; const hasDelivery = delivery && typeof delivery === "object"; const rawMode = hasDelivery ? (delivery as { mode?: unknown }).mode : undefined; + const normalizedMode = typeof rawMode === "string" ? rawMode.trim().toLowerCase() : rawMode; const mode = - rawMode === "announce" + normalizedMode === "announce" ? "announce" - : rawMode === "none" + : normalizedMode === "none" ? "none" - : rawMode === "deliver" + : normalizedMode === "deliver" ? "announce" : undefined; @@ -51,7 +52,7 @@ export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan { const channel = deliveryChannel ?? payloadChannel ?? "last"; const to = deliveryTo ?? payloadTo; if (hasDelivery) { - const resolvedMode = mode ?? "none"; + const resolvedMode = mode ?? "announce"; return { mode: resolvedMode, channel, diff --git a/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts b/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts index 6aac38f88d..4b5317ef4e 100644 --- a/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts +++ b/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts @@ -134,6 +134,48 @@ describe("runCronIsolatedAgentTurn", () => { }); }); + it("announces only the final payload text", async () => { + await withTempHome(async (home) => { + const storePath = await writeSessionStore(home); + const deps: CliDeps = { + sendMessageWhatsApp: vi.fn(), + sendMessageTelegram: vi.fn(), + sendMessageDiscord: vi.fn(), + sendMessageSignal: vi.fn(), + sendMessageIMessage: vi.fn(), + }; + vi.mocked(runEmbeddedPiAgent).mockResolvedValue({ + payloads: [{ text: "Working on it..." }, { text: "Final weather summary" }], + meta: { + durationMs: 5, + agentMeta: { sessionId: "s", provider: "p", model: "m" }, + }, + }); + + const res = await runCronIsolatedAgentTurn({ + cfg: makeCfg(home, storePath, { + channels: { telegram: { botToken: "t-1" } }, + }), + deps, + job: { + ...makeJob({ kind: "agentTurn", message: "do it" }), + delivery: { mode: "announce", channel: "telegram", to: "123" }, + }, + message: "do it", + sessionKey: "cron:job-1", + lane: "cron", + }); + + expect(res.status).toBe("ok"); + expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1); + expect(deps.sendMessageTelegram).toHaveBeenCalledWith( + "123", + "Final weather summary", + expect.any(Object), + ); + }); + }); + it("skips announce when messaging tool already sent to target", async () => { await withTempHome(async (home) => { const storePath = await writeSessionStore(home); diff --git a/src/cron/isolated-agent.uses-last-non-empty-agent-text-as.test.ts b/src/cron/isolated-agent.uses-last-non-empty-agent-text-as.test.ts index ab547bdf72..3ec1c935b0 100644 --- a/src/cron/isolated-agent.uses-last-non-empty-agent-text-as.test.ts +++ b/src/cron/isolated-agent.uses-last-non-empty-agent-text-as.test.ts @@ -48,7 +48,7 @@ async function writeSessionStore(home: string) { async function readSessionEntry(storePath: string, key: string) { const raw = await fs.readFile(storePath, "utf-8"); - const store = JSON.parse(raw) as Record; + const store = JSON.parse(raw) as Record; return store[key]; } @@ -90,6 +90,38 @@ describe("runCronIsolatedAgentTurn", () => { vi.mocked(loadModelCatalog).mockResolvedValue([]); }); + it("treats blank model overrides as unset", async () => { + await withTempHome(async (home) => { + const storePath = await writeSessionStore(home); + const deps: CliDeps = { + sendMessageWhatsApp: vi.fn(), + sendMessageTelegram: vi.fn(), + sendMessageDiscord: vi.fn(), + sendMessageSignal: vi.fn(), + sendMessageIMessage: vi.fn(), + }; + vi.mocked(runEmbeddedPiAgent).mockResolvedValue({ + payloads: [{ text: "ok" }], + meta: { + durationMs: 5, + agentMeta: { sessionId: "s", provider: "p", model: "m" }, + }, + }); + + const res = await runCronIsolatedAgentTurn({ + cfg: makeCfg(home, storePath), + deps, + job: makeJob({ kind: "agentTurn", message: "do it", model: " " }), + message: "do it", + sessionKey: "cron:job-1", + lane: "cron", + }); + + expect(res.status).toBe("ok"); + expect(vi.mocked(runEmbeddedPiAgent)).toHaveBeenCalledTimes(1); + }); + }); + it("uses last non-empty agent text as summary", async () => { await withTempHome(async (home) => { const storePath = await writeSessionStore(home); @@ -585,6 +617,49 @@ describe("runCronIsolatedAgentTurn", () => { expect(first?.sessionId).toBeDefined(); expect(second?.sessionId).toBeDefined(); expect(second?.sessionId).not.toBe(first?.sessionId); + expect(first?.label).toBe("Cron: job-1"); + expect(second?.label).toBe("Cron: job-1"); + }); + }); + + it("preserves an existing cron session label", async () => { + await withTempHome(async (home) => { + const storePath = await writeSessionStore(home); + const raw = await fs.readFile(storePath, "utf-8"); + const store = JSON.parse(raw) as Record>; + store["agent:main:cron:job-1"] = { + sessionId: "old", + updatedAt: Date.now(), + label: "Nightly digest", + }; + await fs.writeFile(storePath, JSON.stringify(store, null, 2), "utf-8"); + + const deps: CliDeps = { + sendMessageWhatsApp: vi.fn(), + sendMessageTelegram: vi.fn(), + sendMessageDiscord: vi.fn(), + sendMessageSignal: vi.fn(), + sendMessageIMessage: vi.fn(), + }; + vi.mocked(runEmbeddedPiAgent).mockResolvedValue({ + payloads: [{ text: "ok" }], + meta: { + durationMs: 5, + agentMeta: { sessionId: "s", provider: "p", model: "m" }, + }, + }); + + await runCronIsolatedAgentTurn({ + cfg: makeCfg(home, storePath), + deps, + job: makeJob({ kind: "agentTurn", message: "ping", deliver: false }), + message: "ping", + sessionKey: "cron:job-1", + lane: "cron", + }); + const entry = await readSessionEntry(storePath, "agent:main:cron:job-1"); + + expect(entry?.label).toBe("Nightly digest"); }); }); }); diff --git a/src/cron/isolated-agent/delivery-target.ts b/src/cron/isolated-agent/delivery-target.ts index 5be448b2c1..35ccc90473 100644 --- a/src/cron/isolated-agent/delivery-target.ts +++ b/src/cron/isolated-agent/delivery-target.ts @@ -30,6 +30,7 @@ export async function resolveDeliveryTarget( }> { const requestedChannel = typeof jobPayload.channel === "string" ? jobPayload.channel : "last"; const explicitTo = typeof jobPayload.to === "string" ? jobPayload.to : undefined; + const allowMismatchedLastTo = requestedChannel === "last"; const sessionCfg = cfg.session; const mainSessionKey = resolveAgentMainSessionKey({ cfg, agentId }); @@ -41,7 +42,7 @@ export async function resolveDeliveryTarget( entry: main, requestedChannel, explicitTo, - allowMismatchedLastTo: true, + allowMismatchedLastTo, }); let fallbackChannel: Exclude | undefined; @@ -60,7 +61,7 @@ export async function resolveDeliveryTarget( requestedChannel, explicitTo, fallbackChannel, - allowMismatchedLastTo: true, + allowMismatchedLastTo, mode: preliminary.mode, }) : preliminary; diff --git a/src/cron/isolated-agent/helpers.ts b/src/cron/isolated-agent/helpers.ts index ddc72d6456..d4d42b20fe 100644 --- a/src/cron/isolated-agent/helpers.ts +++ b/src/cron/isolated-agent/helpers.ts @@ -8,6 +8,7 @@ type DeliveryPayload = { text?: string; mediaUrl?: string; mediaUrls?: string[]; + channelData?: Record; }; export function pickSummaryFromOutput(text: string | undefined) { @@ -39,6 +40,19 @@ export function pickLastNonEmptyTextFromPayloads(payloads: Array<{ text?: string return undefined; } +export function pickLastDeliverablePayload(payloads: DeliveryPayload[]) { + for (let i = payloads.length - 1; i >= 0; i--) { + const payload = payloads[i]; + const text = (payload?.text ?? "").trim(); + const hasMedia = Boolean(payload?.mediaUrl) || (payload?.mediaUrls?.length ?? 0) > 0; + const hasChannelData = Object.keys(payload?.channelData ?? {}).length > 0; + if (text || hasMedia || hasChannelData) { + return payload; + } + } + return undefined; +} + /** * Check if all payloads are just heartbeat ack responses (HEARTBEAT_OK). * Returns true if delivery should be skipped because there's no real content. diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index 3273cb8f9b..3dd0cc4165 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -56,6 +56,7 @@ import { resolveCronDeliveryPlan } from "../delivery.js"; import { resolveDeliveryTarget } from "./delivery-target.js"; import { isHeartbeatOnlyResponse, + pickLastDeliverablePayload, pickLastNonEmptyTextFromPayloads, pickSummaryFromOutput, pickSummaryFromPayloads, @@ -97,6 +98,8 @@ export type RunCronAgentTurnResult = { /** Last non-empty agent text output (not truncated). */ outputText?: string; error?: string; + sessionId?: string; + sessionKey?: string; }; export async function runCronIsolatedAgentTurn(params: { @@ -187,14 +190,12 @@ export async function runCronIsolatedAgentTurn(params: { } const modelOverrideRaw = params.job.payload.kind === "agentTurn" ? params.job.payload.model : undefined; - if (modelOverrideRaw !== undefined) { - if (typeof modelOverrideRaw !== "string") { - return { status: "error", error: "invalid model: expected string" }; - } + const modelOverride = typeof modelOverrideRaw === "string" ? modelOverrideRaw.trim() : undefined; + if (modelOverride !== undefined && modelOverride.length > 0) { const resolvedOverride = resolveAllowedModelRef({ cfg: cfgWithAgentDefaults, catalog: await loadCatalog(), - raw: modelOverrideRaw, + raw: modelOverride, defaultProvider: resolvedDefault.provider, defaultModel: resolvedDefault.model, }); @@ -211,6 +212,36 @@ export async function runCronIsolatedAgentTurn(params: { agentId, nowMs: now, }); + const runSessionId = cronSession.sessionEntry.sessionId; + const runSessionKey = baseSessionKey.startsWith("cron:") + ? `${agentSessionKey}:run:${runSessionId}` + : agentSessionKey; + const persistSessionEntry = async () => { + cronSession.store[agentSessionKey] = cronSession.sessionEntry; + if (runSessionKey !== agentSessionKey) { + cronSession.store[runSessionKey] = cronSession.sessionEntry; + } + await updateSessionStore(cronSession.storePath, (store) => { + store[agentSessionKey] = cronSession.sessionEntry; + if (runSessionKey !== agentSessionKey) { + store[runSessionKey] = cronSession.sessionEntry; + } + }); + }; + const withRunSession = ( + result: Omit, + ): RunCronAgentTurnResult => ({ + ...result, + sessionId: runSessionId, + sessionKey: runSessionKey, + }); + if (!cronSession.sessionEntry.label?.trim() && baseSessionKey.startsWith("cron:")) { + const labelSuffix = + typeof params.job.name === "string" && params.job.name.trim() + ? params.job.name.trim() + : params.job.id; + cronSession.sessionEntry.label = `Cron: ${labelSuffix}`; + } // Resolve thinking level - job thinking > hooks.gmail.thinking > agent default const hooksGmailThinking = isGmailHook @@ -317,18 +348,12 @@ export async function runCronIsolatedAgentTurn(params: { updatedAt: Date.now(), skillsSnapshot, }; - cronSession.store[agentSessionKey] = cronSession.sessionEntry; - await updateSessionStore(cronSession.storePath, (store) => { - store[agentSessionKey] = cronSession.sessionEntry; - }); + await persistSessionEntry(); } // Persist systemSent before the run, mirroring the inbound auto-reply behavior. cronSession.sessionEntry.systemSent = true; - cronSession.store[agentSessionKey] = cronSession.sessionEntry; - await updateSessionStore(cronSession.storePath, (store) => { - store[agentSessionKey] = cronSession.sessionEntry; - }); + await persistSessionEntry(); let runResult: Awaited>; let fallbackProvider = provider; @@ -396,7 +421,7 @@ export async function runCronIsolatedAgentTurn(params: { fallbackProvider = fallbackResult.provider; fallbackModel = fallbackResult.model; } catch (err) { - return { status: "error", error: String(err) }; + return withRunSession({ status: "error", error: String(err) }); } const payloads = runResult.payloads ?? []; @@ -427,14 +452,19 @@ export async function runCronIsolatedAgentTurn(params: { cronSession.sessionEntry.totalTokens = promptTokens > 0 ? promptTokens : (usage.total ?? input); } - cronSession.store[agentSessionKey] = cronSession.sessionEntry; - await updateSessionStore(cronSession.storePath, (store) => { - store[agentSessionKey] = cronSession.sessionEntry; - }); + await persistSessionEntry(); } const firstText = payloads[0]?.text ?? ""; const summary = pickSummaryFromPayloads(payloads) ?? pickSummaryFromOutput(firstText); const outputText = pickLastNonEmptyTextFromPayloads(payloads); + const synthesizedText = outputText?.trim() || summary?.trim() || undefined; + const deliveryPayload = pickLastDeliverablePayload(payloads); + const deliveryPayloads = + deliveryPayload !== undefined + ? [deliveryPayload] + : synthesizedText + ? [{ text: synthesizedText }] + : []; const deliveryBestEffort = resolveCronDeliveryBestEffort(params.job); // Skip delivery for heartbeat-only responses (HEARTBEAT_OK with no real content). @@ -454,28 +484,28 @@ export async function runCronIsolatedAgentTurn(params: { if (deliveryRequested && !skipHeartbeatDelivery && !skipMessagingToolDelivery) { if (resolvedDelivery.error) { if (!deliveryBestEffort) { - return { + return withRunSession({ status: "error", error: resolvedDelivery.error.message, summary, outputText, - }; + }); } logWarn(`[cron:${params.job.id}] ${resolvedDelivery.error.message}`); - return { status: "ok", summary, outputText }; + return withRunSession({ status: "ok", summary, outputText }); } if (!resolvedDelivery.to) { const message = "cron delivery target is missing"; if (!deliveryBestEffort) { - return { + return withRunSession({ status: "error", error: message, summary, outputText, - }; + }); } logWarn(`[cron:${params.job.id}] ${message}`); - return { status: "ok", summary, outputText }; + return withRunSession({ status: "ok", summary, outputText }); } try { await deliverOutboundPayloads({ @@ -484,16 +514,16 @@ export async function runCronIsolatedAgentTurn(params: { to: resolvedDelivery.to, accountId: resolvedDelivery.accountId, threadId: resolvedDelivery.threadId, - payloads, + payloads: deliveryPayloads, bestEffort: deliveryBestEffort, deps: createOutboundSendDeps(params.deps), }); } catch (err) { if (!deliveryBestEffort) { - return { status: "error", summary, outputText, error: String(err) }; + return withRunSession({ status: "error", summary, outputText, error: String(err) }); } } } - return { status: "ok", summary, outputText }; + return withRunSession({ status: "ok", summary, outputText }); } diff --git a/src/cron/isolated-agent/session.ts b/src/cron/isolated-agent/session.ts index 8428efeb4f..c31a35465c 100644 --- a/src/cron/isolated-agent/session.ts +++ b/src/cron/isolated-agent/session.ts @@ -28,6 +28,8 @@ export function resolveCronSession(params: { lastChannel: entry?.lastChannel, lastTo: entry?.lastTo, lastAccountId: entry?.lastAccountId, + label: entry?.label, + displayName: entry?.displayName, skillsSnapshot: entry?.skillsSnapshot, }; return { storePath, store, sessionEntry, systemSent, isNewSession: true }; diff --git a/src/cron/normalize.test.ts b/src/cron/normalize.test.ts index a876e03175..99c6748364 100644 --- a/src/cron/normalize.test.ts +++ b/src/cron/normalize.test.ts @@ -234,4 +234,62 @@ describe("normalizeCronJobCreate", () => { expect(delivery.mode).toBe("announce"); expect((normalized as { isolation?: unknown }).isolation).toBeUndefined(); }); + + it("infers payload kind/session target and name for message-only jobs", () => { + const normalized = normalizeCronJobCreate({ + schedule: { kind: "every", everyMs: 60_000 }, + payload: { message: "Nightly backup" }, + }) as unknown as Record; + + const payload = normalized.payload as Record; + expect(payload.kind).toBe("agentTurn"); + expect(payload.message).toBe("Nightly backup"); + expect(normalized.sessionTarget).toBe("isolated"); + expect(normalized.wakeMode).toBe("now"); + expect(typeof normalized.name).toBe("string"); + }); + + it("maps top-level model/thinking/timeout into payload for legacy add params", () => { + const normalized = normalizeCronJobCreate({ + name: "legacy root fields", + schedule: { kind: "every", everyMs: 60_000 }, + payload: { kind: "agentTurn", message: "hello" }, + model: " openrouter/deepseek/deepseek-r1 ", + thinking: " high ", + timeoutSeconds: 45, + allowUnsafeExternalContent: true, + }) as unknown as Record; + + const payload = normalized.payload as Record; + expect(payload.model).toBe("openrouter/deepseek/deepseek-r1"); + expect(payload.thinking).toBe("high"); + expect(payload.timeoutSeconds).toBe(45); + expect(payload.allowUnsafeExternalContent).toBe(true); + }); + + it("coerces sessionTarget and wakeMode casing", () => { + const normalized = normalizeCronJobCreate({ + name: "casing", + schedule: { kind: "cron", expr: "* * * * *" }, + sessionTarget: " IsOlAtEd ", + wakeMode: " NOW ", + payload: { kind: "agentTurn", message: "hello" }, + }) as unknown as Record; + + expect(normalized.sessionTarget).toBe("isolated"); + expect(normalized.wakeMode).toBe("now"); + }); + + it("strips invalid delivery mode from partial delivery objects", () => { + const normalized = normalizeCronJobCreate({ + name: "delivery mode", + schedule: { kind: "cron", expr: "* * * * *" }, + payload: { kind: "agentTurn", message: "hello" }, + delivery: { mode: "bogus", to: "123" }, + }) as unknown as Record; + + const delivery = normalized.delivery as Record; + expect(delivery.mode).toBeUndefined(); + expect(delivery.to).toBe("123"); + }); }); diff --git a/src/cron/normalize.ts b/src/cron/normalize.ts index 733be718c1..a41044b363 100644 --- a/src/cron/normalize.ts +++ b/src/cron/normalize.ts @@ -2,6 +2,7 @@ import type { CronJobCreate, CronJobPatch } from "./types.js"; import { sanitizeAgentId } from "../routing/session-key.js"; import { parseAbsoluteTimeMs } from "./parse.js"; import { migrateLegacyCronPayload } from "./payload-migration.js"; +import { inferLegacyName } from "./service/normalize.js"; type UnknownRecord = Record; @@ -19,7 +20,8 @@ function isRecord(value: unknown): value is UnknownRecord { function coerceSchedule(schedule: UnknownRecord) { const next: UnknownRecord = { ...schedule }; - const kind = typeof schedule.kind === "string" ? schedule.kind : undefined; + const rawKind = typeof schedule.kind === "string" ? schedule.kind.trim().toLowerCase() : ""; + const kind = rawKind === "at" || rawKind === "every" || rawKind === "cron" ? rawKind : undefined; const atMsRaw = schedule.atMs; const atRaw = schedule.at; const atString = typeof atRaw === "string" ? atRaw.trim() : ""; @@ -32,7 +34,9 @@ function coerceSchedule(schedule: UnknownRecord) { ? parseAbsoluteTimeMs(atString) : null; - if (!kind) { + if (kind) { + next.kind = kind; + } else { if ( typeof schedule.atMs === "number" || typeof schedule.at === "string" || @@ -47,7 +51,7 @@ function coerceSchedule(schedule: UnknownRecord) { } if (atString) { - next.at = parsedAtMs ? new Date(parsedAtMs).toISOString() : atString; + next.at = parsedAtMs !== null ? new Date(parsedAtMs).toISOString() : atString; } else if (parsedAtMs !== null) { next.at = new Date(parsedAtMs).toISOString(); } @@ -62,6 +66,72 @@ function coercePayload(payload: UnknownRecord) { const next: UnknownRecord = { ...payload }; // Back-compat: older configs used `provider` for delivery channel. migrateLegacyCronPayload(next); + const kindRaw = typeof next.kind === "string" ? next.kind.trim().toLowerCase() : ""; + if (kindRaw === "agentturn") { + next.kind = "agentTurn"; + } else if (kindRaw === "systemevent") { + next.kind = "systemEvent"; + } else if (kindRaw) { + next.kind = kindRaw; + } + if (!next.kind) { + const hasMessage = typeof next.message === "string" && next.message.trim().length > 0; + const hasText = typeof next.text === "string" && next.text.trim().length > 0; + if (hasMessage) { + next.kind = "agentTurn"; + } else if (hasText) { + next.kind = "systemEvent"; + } + } + if (typeof next.message === "string") { + const trimmed = next.message.trim(); + if (trimmed) { + next.message = trimmed; + } + } + if (typeof next.text === "string") { + const trimmed = next.text.trim(); + if (trimmed) { + next.text = trimmed; + } + } + if ("model" in next) { + if (typeof next.model === "string") { + const trimmed = next.model.trim(); + if (trimmed) { + next.model = trimmed; + } else { + delete next.model; + } + } else { + delete next.model; + } + } + if ("thinking" in next) { + if (typeof next.thinking === "string") { + const trimmed = next.thinking.trim(); + if (trimmed) { + next.thinking = trimmed; + } else { + delete next.thinking; + } + } else { + delete next.thinking; + } + } + if ("timeoutSeconds" in next) { + if (typeof next.timeoutSeconds === "number" && Number.isFinite(next.timeoutSeconds)) { + next.timeoutSeconds = Math.max(1, Math.floor(next.timeoutSeconds)); + } else { + delete next.timeoutSeconds; + } + } + if ( + "allowUnsafeExternalContent" in next && + typeof next.allowUnsafeExternalContent !== "boolean" + ) { + delete next.allowUnsafeExternalContent; + } return next; } @@ -69,7 +139,15 @@ function coerceDelivery(delivery: UnknownRecord) { const next: UnknownRecord = { ...delivery }; if (typeof delivery.mode === "string") { const mode = delivery.mode.trim().toLowerCase(); - next.mode = mode === "deliver" ? "announce" : mode; + if (mode === "deliver") { + next.mode = "announce"; + } else if (mode === "announce" || mode === "none") { + next.mode = mode; + } else { + delete next.mode; + } + } else if ("mode" in next) { + delete next.mode; } if (typeof delivery.channel === "string") { const trimmed = delivery.channel.trim().toLowerCase(); @@ -147,6 +225,95 @@ function unwrapJob(raw: UnknownRecord) { return raw; } +function normalizeSessionTarget(raw: unknown) { + if (typeof raw !== "string") { + return undefined; + } + const trimmed = raw.trim().toLowerCase(); + if (trimmed === "main" || trimmed === "isolated") { + return trimmed; + } + return undefined; +} + +function normalizeWakeMode(raw: unknown) { + if (typeof raw !== "string") { + return undefined; + } + const trimmed = raw.trim().toLowerCase(); + if (trimmed === "now" || trimmed === "next-heartbeat") { + return trimmed; + } + return undefined; +} + +function copyTopLevelAgentTurnFields(next: UnknownRecord, payload: UnknownRecord) { + const copyString = (field: "model" | "thinking") => { + if (typeof payload[field] === "string" && payload[field].trim()) { + return; + } + const value = next[field]; + if (typeof value === "string" && value.trim()) { + payload[field] = value.trim(); + } + }; + copyString("model"); + copyString("thinking"); + + if (typeof payload.timeoutSeconds !== "number" && typeof next.timeoutSeconds === "number") { + payload.timeoutSeconds = next.timeoutSeconds; + } + if ( + typeof payload.allowUnsafeExternalContent !== "boolean" && + typeof next.allowUnsafeExternalContent === "boolean" + ) { + payload.allowUnsafeExternalContent = next.allowUnsafeExternalContent; + } +} + +function copyTopLevelLegacyDeliveryFields(next: UnknownRecord, payload: UnknownRecord) { + if (typeof payload.deliver !== "boolean" && typeof next.deliver === "boolean") { + payload.deliver = next.deliver; + } + if ( + typeof payload.channel !== "string" && + typeof next.channel === "string" && + next.channel.trim() + ) { + payload.channel = next.channel.trim(); + } + if (typeof payload.to !== "string" && typeof next.to === "string" && next.to.trim()) { + payload.to = next.to.trim(); + } + if ( + typeof payload.bestEffortDeliver !== "boolean" && + typeof next.bestEffortDeliver === "boolean" + ) { + payload.bestEffortDeliver = next.bestEffortDeliver; + } + if ( + typeof payload.provider !== "string" && + typeof next.provider === "string" && + next.provider.trim() + ) { + payload.provider = next.provider.trim(); + } +} + +function stripLegacyTopLevelFields(next: UnknownRecord) { + delete next.model; + delete next.thinking; + delete next.timeoutSeconds; + delete next.allowUnsafeExternalContent; + delete next.message; + delete next.text; + delete next.deliver; + delete next.channel; + delete next.to; + delete next.bestEffortDeliver; + delete next.provider; +} + export function normalizeCronJobInput( raw: unknown, options: NormalizeOptions = DEFAULT_OPTIONS, @@ -186,10 +353,38 @@ export function normalizeCronJobInput( } } + if ("sessionTarget" in base) { + const normalized = normalizeSessionTarget(base.sessionTarget); + if (normalized) { + next.sessionTarget = normalized; + } else { + delete next.sessionTarget; + } + } + + if ("wakeMode" in base) { + const normalized = normalizeWakeMode(base.wakeMode); + if (normalized) { + next.wakeMode = normalized; + } else { + delete next.wakeMode; + } + } + if (isRecord(base.schedule)) { next.schedule = coerceSchedule(base.schedule); } + if (!("payload" in next) || !isRecord(next.payload)) { + const message = typeof next.message === "string" ? next.message.trim() : ""; + const text = typeof next.text === "string" ? next.text.trim() : ""; + if (message) { + next.payload = { kind: "agentTurn", message }; + } else if (text) { + next.payload = { kind: "systemEvent", text }; + } + } + if (isRecord(base.payload)) { next.payload = coercePayload(base.payload); } @@ -198,17 +393,39 @@ export function normalizeCronJobInput( next.delivery = coerceDelivery(base.delivery); } - if (isRecord(base.isolation)) { + if ("isolation" in next) { delete next.isolation; } + const payload = isRecord(next.payload) ? next.payload : null; + if (payload && payload.kind === "agentTurn") { + copyTopLevelAgentTurnFields(next, payload); + copyTopLevelLegacyDeliveryFields(next, payload); + } + stripLegacyTopLevelFields(next); + if (options.applyDefaults) { if (!next.wakeMode) { - next.wakeMode = "next-heartbeat"; + next.wakeMode = "now"; } if (typeof next.enabled !== "boolean") { next.enabled = true; } + if ( + (typeof next.name !== "string" || !next.name.trim()) && + isRecord(next.schedule) && + isRecord(next.payload) + ) { + next.name = inferLegacyName({ + schedule: next.schedule as { kind?: unknown; everyMs?: unknown; expr?: unknown }, + payload: next.payload as { kind?: unknown; text?: unknown; message?: unknown }, + }); + } else if (typeof next.name === "string") { + const trimmed = next.name.trim(); + if (trimmed) { + next.name = trimmed; + } + } if (!next.sessionTarget && isRecord(next.payload)) { const kind = typeof next.payload.kind === "string" ? next.payload.kind : ""; if (kind === "systemEvent") { diff --git a/src/cron/run-log.test.ts b/src/cron/run-log.test.ts index cef09acfe2..6ac9cca2bb 100644 --- a/src/cron/run-log.test.ts +++ b/src/cron/run-log.test.ts @@ -65,6 +65,8 @@ describe("cron run log", () => { jobId: "a", action: "finished", status: "skipped", + sessionId: "run-123", + sessionKey: "agent:main:cron:a:run:run-123", }); const allA = await readCronRunLogEntries(logPathA, { limit: 10 }); @@ -78,6 +80,8 @@ describe("cron run log", () => { const lastOne = await readCronRunLogEntries(logPathA, { limit: 1 }); expect(lastOne.map((e) => e.ts)).toEqual([3]); + expect(lastOne[0]?.sessionId).toBe("run-123"); + expect(lastOne[0]?.sessionKey).toBe("agent:main:cron:a:run:run-123"); const onlyB = await readCronRunLogEntries(logPathB, { limit: 10, diff --git a/src/cron/run-log.ts b/src/cron/run-log.ts index 744b023e53..25846ce81a 100644 --- a/src/cron/run-log.ts +++ b/src/cron/run-log.ts @@ -8,6 +8,8 @@ export type CronRunLogEntry = { status?: "ok" | "error" | "skipped"; error?: string; summary?: string; + sessionId?: string; + sessionKey?: string; runAtMs?: number; durationMs?: number; nextRunAtMs?: number; @@ -93,7 +95,24 @@ export async function readCronRunLogEntries( if (jobId && obj.jobId !== jobId) { continue; } - parsed.push(obj as CronRunLogEntry); + const entry: CronRunLogEntry = { + ts: obj.ts, + jobId: obj.jobId, + action: "finished", + status: obj.status, + error: obj.error, + summary: obj.summary, + runAtMs: obj.runAtMs, + durationMs: obj.durationMs, + nextRunAtMs: obj.nextRunAtMs, + }; + if (typeof obj.sessionId === "string" && obj.sessionId.trim().length > 0) { + entry.sessionId = obj.sessionId; + } + if (typeof obj.sessionKey === "string" && obj.sessionKey.trim().length > 0) { + entry.sessionKey = obj.sessionKey; + } + parsed.push(entry); } catch { // ignore invalid lines } diff --git a/src/cron/schedule.ts b/src/cron/schedule.ts index 252d29babe..fc13ebfe2b 100644 --- a/src/cron/schedule.ts +++ b/src/cron/schedule.ts @@ -2,6 +2,14 @@ import { Cron } from "croner"; import type { CronSchedule } from "./types.js"; import { parseAbsoluteTimeMs } from "./parse.js"; +function resolveCronTimezone(tz?: string) { + const trimmed = typeof tz === "string" ? tz.trim() : ""; + if (trimmed) { + return trimmed; + } + return Intl.DateTimeFormat().resolvedOptions().timeZone; +} + export function computeNextRunAtMs(schedule: CronSchedule, nowMs: number): number | undefined { if (schedule.kind === "at") { // Handle both canonical `at` (string) and legacy `atMs` (number) fields. @@ -38,9 +46,20 @@ export function computeNextRunAtMs(schedule: CronSchedule, nowMs: number): numbe return undefined; } const cron = new Cron(expr, { - timezone: schedule.tz?.trim() || undefined, + timezone: resolveCronTimezone(schedule.tz), catch: false, }); - const next = cron.nextRun(new Date(nowMs)); - return next ? next.getTime() : undefined; + let cursor = nowMs; + for (let attempt = 0; attempt < 3; attempt++) { + const next = cron.nextRun(new Date(cursor)); + if (!next) { + return undefined; + } + const nextMs = next.getTime(); + if (Number.isFinite(nextMs) && nextMs > nowMs) { + return nextMs; + } + cursor += 1_000; + } + return undefined; } diff --git a/src/cron/service.delivery-plan.test.ts b/src/cron/service.delivery-plan.test.ts new file mode 100644 index 0000000000..707868cba6 --- /dev/null +++ b/src/cron/service.delivery-plan.test.ts @@ -0,0 +1,92 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { describe, expect, it, vi } from "vitest"; +import { CronService } from "./service.js"; + +const noopLogger = { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), +}; + +async function makeStorePath() { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cron-delivery-")); + return { + storePath: path.join(dir, "cron", "jobs.json"), + cleanup: async () => { + await fs.rm(dir, { recursive: true, force: true }); + }, + }; +} + +describe("CronService delivery plan consistency", () => { + it("does not post isolated summary when legacy deliver=false", async () => { + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const cron = new CronService({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + enqueueSystemEvent, + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok", summary: "done" })), + }); + await cron.start(); + const job = await cron.add({ + name: "legacy-off", + schedule: { kind: "every", everyMs: 60_000, anchorMs: Date.now() }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { + kind: "agentTurn", + message: "hello", + deliver: false, + }, + }); + + const result = await cron.run(job.id, "force"); + expect(result).toEqual({ ok: true, ran: true }); + expect(enqueueSystemEvent).not.toHaveBeenCalled(); + + cron.stop(); + await store.cleanup(); + }); + + it("treats delivery object without mode as announce", async () => { + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const cron = new CronService({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + enqueueSystemEvent, + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok", summary: "done" })), + }); + await cron.start(); + const job = await cron.add({ + name: "partial-delivery", + schedule: { kind: "every", everyMs: 60_000, anchorMs: Date.now() }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { + kind: "agentTurn", + message: "hello", + }, + delivery: { channel: "telegram", to: "123" } as unknown as { + mode: "none" | "announce"; + channel?: string; + to?: string; + }, + }); + + const result = await cron.run(job.id, "force"); + expect(result).toEqual({ ok: true, ran: true }); + expect(enqueueSystemEvent).toHaveBeenCalledWith("Cron: done", { agentId: undefined }); + + cron.stop(); + await store.cleanup(); + }); +}); diff --git a/src/cron/service.every-jobs-fire.test.ts b/src/cron/service.every-jobs-fire.test.ts index a6a2bab80f..7ae49ac2d0 100644 --- a/src/cron/service.every-jobs-fire.test.ts +++ b/src/cron/service.every-jobs-fire.test.ts @@ -2,6 +2,7 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { CronJob } from "./types.js"; import { CronService } from "./service.js"; const noopLogger = { @@ -21,6 +22,23 @@ async function makeStorePath() { }; } +async function waitForJob( + cron: CronService, + id: string, + predicate: (job: CronJob | undefined) => boolean, +) { + let latest: CronJob | undefined; + for (let i = 0; i < 30; i++) { + const jobs = await cron.list({ includeDisabled: true }); + latest = jobs.find((job) => job.id === id); + if (predicate(latest)) { + return latest; + } + await vi.runOnlyPendingTimersAsync(); + } + return latest; +} + describe("CronService interval/cron jobs fire on time", () => { beforeEach(() => { vi.useFakeTimers(); @@ -66,9 +84,7 @@ describe("CronService interval/cron jobs fire on time", () => { vi.setSystemTime(new Date(firstDueAt + 5)); await vi.runOnlyPendingTimersAsync(); - // Wait for the async onTimer to complete via the lock queue. - const jobs = await cron.list(); - const updated = jobs.find((j) => j.id === job.id); + const updated = await waitForJob(cron, job.id, (current) => current?.state.lastStatus === "ok"); expect(enqueueSystemEvent).toHaveBeenCalledWith("tick", { agentId: undefined }); expect(updated?.state.lastStatus).toBe("ok"); @@ -112,9 +128,7 @@ describe("CronService interval/cron jobs fire on time", () => { vi.setSystemTime(new Date(firstDueAt + 5)); await vi.runOnlyPendingTimersAsync(); - // Wait for the async onTimer to complete via the lock queue. - const jobs = await cron.list(); - const updated = jobs.find((j) => j.id === job.id); + const updated = await waitForJob(cron, job.id, (current) => current?.state.lastStatus === "ok"); expect(enqueueSystemEvent).toHaveBeenCalledWith("cron-tick", { agentId: undefined }); expect(updated?.state.lastStatus).toBe("ok"); @@ -124,4 +138,88 @@ describe("CronService interval/cron jobs fire on time", () => { cron.stop(); await store.cleanup(); }); + + it("keeps legacy every jobs due while minute cron jobs recompute schedules", async () => { + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const requestHeartbeatNow = vi.fn(); + const nowMs = Date.parse("2025-12-13T00:00:00.000Z"); + + await fs.mkdir(path.dirname(store.storePath), { recursive: true }); + await fs.writeFile( + store.storePath, + JSON.stringify( + { + version: 1, + jobs: [ + { + id: "legacy-every", + name: "legacy every", + enabled: true, + createdAtMs: nowMs, + updatedAtMs: nowMs, + schedule: { kind: "every", everyMs: 120_000 }, + sessionTarget: "main", + wakeMode: "now", + payload: { kind: "systemEvent", text: "sf-tick" }, + state: { nextRunAtMs: nowMs + 120_000 }, + }, + { + id: "minute-cron", + name: "minute cron", + enabled: true, + createdAtMs: nowMs, + updatedAtMs: nowMs, + schedule: { kind: "cron", expr: "* * * * *", tz: "UTC" }, + sessionTarget: "main", + wakeMode: "now", + payload: { kind: "systemEvent", text: "minute-tick" }, + state: { nextRunAtMs: nowMs + 60_000 }, + }, + ], + }, + null, + 2, + ), + "utf-8", + ); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent, + requestHeartbeatNow, + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), + }); + + await cron.start(); + for (let minute = 1; minute <= 6; minute++) { + vi.setSystemTime(new Date(nowMs + minute * 60_000)); + const minuteRun = await cron.run("minute-cron", "force"); + expect(minuteRun).toEqual({ ok: true, ran: true }); + } + + vi.setSystemTime(new Date(nowMs + 6 * 60_000)); + const sfRun = await cron.run("legacy-every", "due"); + expect(sfRun).toEqual({ ok: true, ran: true }); + + const sfRuns = enqueueSystemEvent.mock.calls.filter((args) => args[0] === "sf-tick").length; + const minuteRuns = enqueueSystemEvent.mock.calls.filter( + (args) => args[0] === "minute-tick", + ).length; + expect(minuteRuns).toBeGreaterThan(0); + expect(sfRuns).toBeGreaterThan(0); + + const jobs = await cron.list({ includeDisabled: true }); + const sfJob = jobs.find((job) => job.id === "legacy-every"); + expect(sfJob?.state.lastStatus).toBe("ok"); + expect(sfJob?.schedule.kind).toBe("every"); + if (sfJob?.schedule.kind === "every") { + expect(sfJob.schedule.anchorMs).toBe(nowMs); + } + + cron.stop(); + await store.cleanup(); + }); }); diff --git a/src/cron/service.issue-regressions.test.ts b/src/cron/service.issue-regressions.test.ts new file mode 100644 index 0000000000..c793979c16 --- /dev/null +++ b/src/cron/service.issue-regressions.test.ts @@ -0,0 +1,346 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { setTimeout as delay } from "node:timers/promises"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { CronJob } from "./types.js"; +import { CronService } from "./service.js"; +import { createCronServiceState, type CronEvent } from "./service/state.js"; +import { onTimer } from "./service/timer.js"; + +const noopLogger = { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + trace: vi.fn(), +}; + +async function makeStorePath() { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "cron-issues-")); + const storePath = path.join(dir, "jobs.json"); + return { + storePath, + cleanup: async () => { + await fs.rm(dir, { recursive: true, force: true }); + }, + }; +} + +function createDueIsolatedJob(params: { + id: string; + nowMs: number; + nextRunAtMs: number; + deleteAfterRun?: boolean; +}): CronJob { + return { + id: params.id, + name: params.id, + enabled: true, + deleteAfterRun: params.deleteAfterRun ?? false, + createdAtMs: params.nowMs, + updatedAtMs: params.nowMs, + schedule: { kind: "at", at: new Date(params.nextRunAtMs).toISOString() }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: params.id }, + delivery: { mode: "none" }, + state: { nextRunAtMs: params.nextRunAtMs }, + }; +} + +describe("Cron issue regressions", () => { + beforeEach(() => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-02-06T10:05:00.000Z")); + }); + + afterEach(() => { + vi.useRealTimers(); + vi.clearAllMocks(); + }); + + it("recalculates nextRunAtMs when schedule changes", async () => { + const store = await makeStorePath(); + const cron = new CronService({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn().mockResolvedValue({ status: "ok", summary: "ok" }), + }); + await cron.start(); + + const created = await cron.add({ + name: "hourly", + schedule: { kind: "cron", expr: "0 * * * *", tz: "UTC" }, + sessionTarget: "main", + payload: { kind: "systemEvent", text: "tick" }, + }); + expect(created.state.nextRunAtMs).toBe(Date.parse("2026-02-06T11:00:00.000Z")); + + const updated = await cron.update(created.id, { + schedule: { kind: "cron", expr: "0 */2 * * *", tz: "UTC" }, + }); + + expect(updated.state.nextRunAtMs).toBe(Date.parse("2026-02-06T12:00:00.000Z")); + + cron.stop(); + await store.cleanup(); + }); + + it("runs immediately with force mode even when not due", async () => { + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const cron = new CronService({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + enqueueSystemEvent, + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn().mockResolvedValue({ status: "ok", summary: "ok" }), + }); + await cron.start(); + + const created = await cron.add({ + name: "force-now", + schedule: { kind: "every", everyMs: 60_000, anchorMs: Date.now() }, + sessionTarget: "main", + payload: { kind: "systemEvent", text: "force" }, + }); + + const result = await cron.run(created.id, "force"); + + expect(result).toEqual({ ok: true, ran: true }); + expect(enqueueSystemEvent).toHaveBeenCalledWith("force", { agentId: undefined }); + + cron.stop(); + await store.cleanup(); + }); + + it("schedules isolated jobs with next wake time", async () => { + const store = await makeStorePath(); + const cron = new CronService({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn().mockResolvedValue({ status: "ok", summary: "ok" }), + }); + await cron.start(); + + const job = await cron.add({ + name: "isolated", + schedule: { kind: "every", everyMs: 60_000, anchorMs: Date.now() }, + sessionTarget: "isolated", + payload: { kind: "agentTurn", message: "hi" }, + }); + const status = await cron.status(); + + expect(typeof job.state.nextRunAtMs).toBe("number"); + expect(typeof status.nextWakeAtMs).toBe("number"); + + cron.stop(); + await store.cleanup(); + }); + + it("persists allowUnsafeExternalContent on agentTurn payload patches", async () => { + const store = await makeStorePath(); + const cron = new CronService({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn().mockResolvedValue({ status: "ok", summary: "ok" }), + }); + await cron.start(); + + const created = await cron.add({ + name: "unsafe toggle", + schedule: { kind: "every", everyMs: 60_000, anchorMs: Date.now() }, + sessionTarget: "isolated", + payload: { kind: "agentTurn", message: "hi" }, + }); + + const updated = await cron.update(created.id, { + payload: { kind: "agentTurn", allowUnsafeExternalContent: true }, + }); + + expect(updated.payload.kind).toBe("agentTurn"); + if (updated.payload.kind === "agentTurn") { + expect(updated.payload.allowUnsafeExternalContent).toBe(true); + expect(updated.payload.message).toBe("hi"); + } + + cron.stop(); + await store.cleanup(); + }); + + it("caps timer delay to 60s for far-future schedules", async () => { + const timeoutSpy = vi.spyOn(globalThis, "setTimeout"); + const store = await makeStorePath(); + const cron = new CronService({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn().mockResolvedValue({ status: "ok", summary: "ok" }), + }); + await cron.start(); + + const callsBeforeAdd = timeoutSpy.mock.calls.length; + await cron.add({ + name: "far-future", + schedule: { kind: "at", at: "2035-01-01T00:00:00.000Z" }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "future" }, + }); + + const delaysAfterAdd = timeoutSpy.mock.calls + .slice(callsBeforeAdd) + .map(([, delay]) => delay) + .filter((delay): delay is number => typeof delay === "number"); + expect(delaysAfterAdd.some((delay) => delay === 60_000)).toBe(true); + + cron.stop(); + timeoutSpy.mockRestore(); + await store.cleanup(); + }); + + it("does not hot-loop zero-delay timers while a run is already in progress", async () => { + const timeoutSpy = vi.spyOn(globalThis, "setTimeout"); + const store = await makeStorePath(); + const now = Date.parse("2026-02-06T10:05:00.000Z"); + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + nowMs: () => now, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn().mockResolvedValue({ status: "ok", summary: "ok" }), + }); + state.running = true; + state.store = { + version: 1, + jobs: [createDueIsolatedJob({ id: "due", nowMs: now, nextRunAtMs: now - 1 })], + }; + + await onTimer(state); + + expect(timeoutSpy).not.toHaveBeenCalled(); + expect(state.timer).toBeNull(); + timeoutSpy.mockRestore(); + await store.cleanup(); + }); + + it("skips forced manual runs while a timer-triggered run is in progress", async () => { + vi.useRealTimers(); + const store = await makeStorePath(); + let resolveRun: + | ((value: { status: "ok" | "error" | "skipped"; summary?: string; error?: string }) => void) + | undefined; + const runIsolatedAgentJob = vi.fn( + async () => + await new Promise<{ status: "ok" | "error" | "skipped"; summary?: string; error?: string }>( + (resolve) => { + resolveRun = resolve; + }, + ), + ); + + const cron = new CronService({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob, + }); + await cron.start(); + + const runAt = Date.now() + 30; + const job = await cron.add({ + name: "timer-overlap", + enabled: true, + schedule: { kind: "at", at: new Date(runAt).toISOString() }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "long task" }, + delivery: { mode: "none" }, + }); + + for (let i = 0; i < 25 && runIsolatedAgentJob.mock.calls.length === 0; i++) { + await delay(20); + } + expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1); + + const manualResult = await cron.run(job.id, "force"); + expect(manualResult).toEqual({ ok: true, ran: false, reason: "already-running" }); + expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1); + + resolveRun?.({ status: "ok", summary: "done" }); + for (let i = 0; i < 25; i++) { + const jobs = await cron.list({ includeDisabled: true }); + if (jobs.some((j) => j.id === job.id && j.state.lastStatus === "ok")) { + break; + } + await delay(20); + } + + cron.stop(); + await store.cleanup(); + }); + + it("records per-job start time and duration for batched due jobs", async () => { + const store = await makeStorePath(); + const dueAt = Date.parse("2026-02-06T10:05:01.000Z"); + const first = createDueIsolatedJob({ id: "batch-first", nowMs: dueAt, nextRunAtMs: dueAt }); + const second = createDueIsolatedJob({ id: "batch-second", nowMs: dueAt, nextRunAtMs: dueAt }); + await fs.writeFile( + store.storePath, + JSON.stringify({ version: 1, jobs: [first, second] }, null, 2), + "utf-8", + ); + + let now = dueAt; + const events: CronEvent[] = []; + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + nowMs: () => now, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + onEvent: (evt) => { + events.push(evt); + }, + runIsolatedAgentJob: vi.fn(async (params: { job: { id: string } }) => { + now += params.job.id === first.id ? 50 : 20; + return { status: "ok" as const, summary: "ok" }; + }), + }); + + await onTimer(state); + + const jobs = state.store?.jobs ?? []; + const firstDone = jobs.find((job) => job.id === first.id); + const secondDone = jobs.find((job) => job.id === second.id); + const startedAtEvents = events + .filter((evt) => evt.action === "started") + .map((evt) => evt.runAtMs); + + expect(firstDone?.state.lastRunAtMs).toBe(dueAt); + expect(firstDone?.state.lastDurationMs).toBe(50); + expect(secondDone?.state.lastRunAtMs).toBe(dueAt + 50); + expect(secondDone?.state.lastDurationMs).toBe(20); + expect(startedAtEvents).toEqual([dueAt, dueAt + 50]); + + await store.cleanup(); + }); +}); diff --git a/src/cron/service.read-ops-nonblocking.test.ts b/src/cron/service.read-ops-nonblocking.test.ts new file mode 100644 index 0000000000..d0e73c87dd --- /dev/null +++ b/src/cron/service.read-ops-nonblocking.test.ts @@ -0,0 +1,104 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { setTimeout as delay } from "node:timers/promises"; +import { describe, expect, it, vi } from "vitest"; +import { CronService } from "./service.js"; + +const noopLogger = { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), +}; + +async function makeStorePath() { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cron-")); + return { + storePath: path.join(dir, "cron", "jobs.json"), + cleanup: async () => { + await fs.rm(dir, { recursive: true, force: true }); + }, + }; +} + +describe("CronService read ops while job is running", () => { + it("keeps list and status responsive during a long isolated run", async () => { + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const requestHeartbeatNow = vi.fn(); + + let resolveRun: + | ((value: { status: "ok" | "error" | "skipped"; summary?: string; error?: string }) => void) + | undefined; + + const runIsolatedAgentJob = vi.fn( + async () => + await new Promise<{ status: "ok" | "error" | "skipped"; summary?: string; error?: string }>( + (resolve) => { + resolveRun = resolve; + }, + ), + ); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent, + requestHeartbeatNow, + runIsolatedAgentJob, + }); + + await cron.start(); + + const runAt = Date.now() + 30; + await cron.add({ + name: "slow isolated", + enabled: true, + deleteAfterRun: false, + schedule: { kind: "at", at: new Date(runAt).toISOString() }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "long task" }, + delivery: { mode: "none" }, + }); + + for (let i = 0; i < 25 && runIsolatedAgentJob.mock.calls.length === 0; i++) { + await delay(20); + } + + expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1); + + const listRace = await Promise.race([ + cron.list({ includeDisabled: true }).then(() => "ok"), + delay(200).then(() => "timeout"), + ]); + expect(listRace).toBe("ok"); + + const statusRace = await Promise.race([ + cron.status().then(() => "ok"), + delay(200).then(() => "timeout"), + ]); + expect(statusRace).toBe("ok"); + + const running = await cron.list({ includeDisabled: true }); + expect(running[0]?.state.runningAtMs).toBeTypeOf("number"); + + resolveRun?.({ status: "ok", summary: "done" }); + + for (let i = 0; i < 25; i++) { + const jobs = await cron.list({ includeDisabled: true }); + if (jobs[0]?.state.lastStatus === "ok") { + break; + } + await delay(20); + } + + const finished = await cron.list({ includeDisabled: true }); + expect(finished[0]?.state.lastStatus).toBe("ok"); + + cron.stop(); + await store.cleanup(); + }); +}); diff --git a/src/cron/service.restart-catchup.test.ts b/src/cron/service.restart-catchup.test.ts new file mode 100644 index 0000000000..c8994eed19 --- /dev/null +++ b/src/cron/service.restart-catchup.test.ts @@ -0,0 +1,165 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { CronService } from "./service.js"; + +const noopLogger = { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), +}; + +async function makeStorePath() { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cron-")); + return { + storePath: path.join(dir, "cron", "jobs.json"), + cleanup: async () => { + await fs.rm(dir, { recursive: true, force: true }); + }, + }; +} + +describe("CronService restart catch-up", () => { + beforeEach(() => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2025-12-13T17:00:00.000Z")); + noopLogger.debug.mockClear(); + noopLogger.info.mockClear(); + noopLogger.warn.mockClear(); + noopLogger.error.mockClear(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("executes an overdue recurring job immediately on start", async () => { + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const requestHeartbeatNow = vi.fn(); + + const dueAt = Date.parse("2025-12-13T15:00:00.000Z"); + const lastRunAt = Date.parse("2025-12-12T15:00:00.000Z"); + + await fs.mkdir(path.dirname(store.storePath), { recursive: true }); + await fs.writeFile( + store.storePath, + JSON.stringify( + { + version: 1, + jobs: [ + { + id: "restart-overdue-job", + name: "daily digest", + enabled: true, + createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"), + updatedAtMs: Date.parse("2025-12-12T15:00:00.000Z"), + schedule: { kind: "cron", expr: "0 15 * * *", tz: "UTC" }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "digest now" }, + state: { + nextRunAtMs: dueAt, + lastRunAtMs: lastRunAt, + lastStatus: "ok", + }, + }, + ], + }, + null, + 2, + ), + "utf-8", + ); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent, + requestHeartbeatNow, + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), + }); + + await cron.start(); + + expect(enqueueSystemEvent).toHaveBeenCalledWith("digest now", { agentId: undefined }); + expect(requestHeartbeatNow).toHaveBeenCalled(); + + const jobs = await cron.list({ includeDisabled: true }); + const updated = jobs.find((job) => job.id === "restart-overdue-job"); + expect(updated?.state.lastStatus).toBe("ok"); + expect(updated?.state.lastRunAtMs).toBe(Date.parse("2025-12-13T17:00:00.000Z")); + expect(updated?.state.nextRunAtMs).toBeGreaterThan(Date.parse("2025-12-13T17:00:00.000Z")); + + cron.stop(); + await store.cleanup(); + }); + + it("clears stale running markers and catches up overdue jobs on startup", async () => { + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const requestHeartbeatNow = vi.fn(); + + const dueAt = Date.parse("2025-12-13T16:00:00.000Z"); + const staleRunningAt = Date.parse("2025-12-13T16:30:00.000Z"); + + await fs.mkdir(path.dirname(store.storePath), { recursive: true }); + await fs.writeFile( + store.storePath, + JSON.stringify( + { + version: 1, + jobs: [ + { + id: "restart-stale-running", + name: "daily stale marker", + enabled: true, + createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"), + updatedAtMs: Date.parse("2025-12-13T16:30:00.000Z"), + schedule: { kind: "cron", expr: "0 16 * * *", tz: "UTC" }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "resume stale marker" }, + state: { + nextRunAtMs: dueAt, + runningAtMs: staleRunningAt, + }, + }, + ], + }, + null, + 2, + ), + "utf-8", + ); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent, + requestHeartbeatNow, + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), + }); + + await cron.start(); + + expect(enqueueSystemEvent).toHaveBeenCalledWith("resume stale marker", { agentId: undefined }); + expect(noopLogger.warn).toHaveBeenCalledWith( + expect.objectContaining({ jobId: "restart-stale-running" }), + "cron: clearing stale running marker on startup", + ); + + const jobs = await cron.list({ includeDisabled: true }); + const updated = jobs.find((job) => job.id === "restart-stale-running"); + expect(updated?.state.runningAtMs).toBeUndefined(); + expect(updated?.state.lastStatus).toBe("ok"); + expect(updated?.state.lastRunAtMs).toBe(Date.parse("2025-12-13T17:00:00.000Z")); + + cron.stop(); + await store.cleanup(); + }); +}); diff --git a/src/cron/service.runs-one-shot-main-job-disables-it.test.ts b/src/cron/service.runs-one-shot-main-job-disables-it.test.ts index e26e71cab7..1cc3eca03c 100644 --- a/src/cron/service.runs-one-shot-main-job-disables-it.test.ts +++ b/src/cron/service.runs-one-shot-main-job-disables-it.test.ts @@ -3,6 +3,7 @@ import os from "node:os"; import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { HeartbeatRunResult } from "../infra/heartbeat-wake.js"; +import type { CronJob } from "./types.js"; import { CronService } from "./service.js"; const noopLogger = { @@ -22,6 +23,18 @@ async function makeStorePath() { }; } +async function waitForJobs(cron: CronService, predicate: (jobs: CronJob[]) => boolean) { + let latest: CronJob[] = []; + for (let i = 0; i < 30; i++) { + latest = await cron.list({ includeDisabled: true }); + if (predicate(latest)) { + return latest; + } + await vi.runOnlyPendingTimersAsync(); + } + return latest; +} + describe("CronService", () => { beforeEach(() => { vi.useFakeTimers(); @@ -67,7 +80,9 @@ describe("CronService", () => { vi.setSystemTime(new Date("2025-12-13T00:00:02.000Z")); await vi.runOnlyPendingTimersAsync(); - const jobs = await cron.list({ includeDisabled: true }); + const jobs = await waitForJobs(cron, (items) => + items.some((item) => item.id === job.id && !item.enabled), + ); const updated = jobs.find((j) => j.id === job.id); expect(updated?.enabled).toBe(false); expect(enqueueSystemEvent).toHaveBeenCalledWith("hello", { @@ -108,7 +123,7 @@ describe("CronService", () => { vi.setSystemTime(new Date("2025-12-13T00:00:02.000Z")); await vi.runOnlyPendingTimersAsync(); - const jobs = await cron.list({ includeDisabled: true }); + const jobs = await waitForJobs(cron, (items) => !items.some((item) => item.id === job.id)); expect(jobs.find((j) => j.id === job.id)).toBeUndefined(); expect(enqueueSystemEvent).toHaveBeenCalledWith("hello", { agentId: undefined, @@ -185,6 +200,49 @@ describe("CronService", () => { await store.cleanup(); }); + it("wakeMode now falls back to queued heartbeat when main lane stays busy", async () => { + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const requestHeartbeatNow = vi.fn(); + const runHeartbeatOnce = vi.fn(async () => ({ + status: "skipped" as const, + reason: "requests-in-flight", + })); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent, + requestHeartbeatNow, + runHeartbeatOnce, + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), + }); + + await cron.start(); + const job = await cron.add({ + name: "wakeMode now fallback", + enabled: true, + schedule: { kind: "at", at: new Date(1).toISOString() }, + sessionTarget: "main", + wakeMode: "now", + payload: { kind: "systemEvent", text: "hello" }, + }); + + const runPromise = cron.run(job.id, "force"); + await vi.advanceTimersByTimeAsync(125_000); + await runPromise; + + expect(runHeartbeatOnce).toHaveBeenCalled(); + expect(requestHeartbeatNow).toHaveBeenCalled(); + expect(job.state.lastStatus).toBe("ok"); + expect(job.state.lastError).toBeUndefined(); + + await cron.list({ includeDisabled: true }); + cron.stop(); + await store.cleanup(); + }); + it("runs an isolated job and posts summary to main", async () => { const store = await makeStorePath(); const enqueueSystemEvent = vi.fn(); @@ -218,7 +276,7 @@ describe("CronService", () => { vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z")); await vi.runOnlyPendingTimersAsync(); - await cron.list({ includeDisabled: true }); + await waitForJobs(cron, (items) => items.some((item) => item.state.lastStatus === "ok")); expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1); expect(enqueueSystemEvent).toHaveBeenCalledWith("Cron: done", { agentId: undefined, @@ -366,7 +424,7 @@ describe("CronService", () => { vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z")); await vi.runOnlyPendingTimersAsync(); - await cron.list({ includeDisabled: true }); + await waitForJobs(cron, (items) => items.some((item) => item.state.lastStatus === "error")); expect(enqueueSystemEvent).toHaveBeenCalledWith("Cron (error): last output", { agentId: undefined, @@ -460,7 +518,7 @@ describe("CronService", () => { expect(enqueueSystemEvent).not.toHaveBeenCalled(); expect(requestHeartbeatNow).not.toHaveBeenCalled(); - const jobs = await cron.list({ includeDisabled: true }); + const jobs = await waitForJobs(cron, (items) => items[0]?.state.lastStatus === "skipped"); expect(jobs[0]?.state.lastStatus).toBe("skipped"); expect(jobs[0]?.state.lastError).toMatch(/main job requires/i); diff --git a/src/cron/service.skips-main-jobs-empty-systemevent-text.test.ts b/src/cron/service.skips-main-jobs-empty-systemevent-text.test.ts index d25edfb8a7..4bbc07afc8 100644 --- a/src/cron/service.skips-main-jobs-empty-systemevent-text.test.ts +++ b/src/cron/service.skips-main-jobs-empty-systemevent-text.test.ts @@ -2,6 +2,7 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { CronJob } from "./types.js"; import { CronService } from "./service.js"; const noopLogger = { @@ -21,6 +22,22 @@ async function makeStorePath() { }; } +async function waitForFirstJob( + cron: CronService, + predicate: (job: CronJob | undefined) => boolean, +) { + let latest: CronJob | undefined; + for (let i = 0; i < 30; i++) { + const jobs = await cron.list({ includeDisabled: true }); + latest = jobs[0]; + if (predicate(latest)) { + return latest; + } + await vi.runOnlyPendingTimersAsync(); + } + return latest; +} + describe("CronService", () => { beforeEach(() => { vi.useFakeTimers(); @@ -66,9 +83,9 @@ describe("CronService", () => { expect(enqueueSystemEvent).not.toHaveBeenCalled(); expect(requestHeartbeatNow).not.toHaveBeenCalled(); - const jobs = await cron.list({ includeDisabled: true }); - expect(jobs[0]?.state.lastStatus).toBe("skipped"); - expect(jobs[0]?.state.lastError).toMatch(/non-empty/i); + const job = await waitForFirstJob(cron, (current) => current?.state.lastStatus === "skipped"); + expect(job?.state.lastStatus).toBe("skipped"); + expect(job?.state.lastError).toMatch(/non-empty/i); cron.stop(); await store.cleanup(); diff --git a/src/cron/service.store-migration.test.ts b/src/cron/service.store-migration.test.ts new file mode 100644 index 0000000000..ed3b25e690 --- /dev/null +++ b/src/cron/service.store-migration.test.ts @@ -0,0 +1,124 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { CronService } from "./service.js"; + +const noopLogger = { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), +}; + +async function makeStorePath() { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cron-")); + return { + storePath: path.join(dir, "cron", "jobs.json"), + cleanup: async () => { + await fs.rm(dir, { recursive: true, force: true }); + }, + }; +} + +describe("CronService store migrations", () => { + beforeEach(() => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-02-06T17:00:00.000Z")); + noopLogger.debug.mockClear(); + noopLogger.info.mockClear(); + noopLogger.warn.mockClear(); + noopLogger.error.mockClear(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("migrates legacy top-level agentTurn fields and initializes missing state", async () => { + const store = await makeStorePath(); + await fs.mkdir(path.dirname(store.storePath), { recursive: true }); + await fs.writeFile( + store.storePath, + JSON.stringify( + { + version: 1, + jobs: [ + { + id: "legacy-agentturn-job", + name: "legacy agentturn", + enabled: true, + createdAtMs: Date.parse("2026-02-01T12:00:00.000Z"), + updatedAtMs: Date.parse("2026-02-05T12:00:00.000Z"), + schedule: { kind: "cron", expr: "0 23 * * *", tz: "UTC" }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + model: "openrouter/deepseek/deepseek-r1", + thinking: "high", + timeoutSeconds: 120, + allowUnsafeExternalContent: true, + deliver: true, + channel: "telegram", + to: "12345", + bestEffortDeliver: true, + payload: { kind: "agentTurn", message: "legacy payload fields" }, + }, + ], + }, + null, + 2, + ), + "utf-8", + ); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok", summary: "ok" })), + }); + + await cron.start(); + + const status = await cron.status(); + expect(status.enabled).toBe(true); + + const jobs = await cron.list({ includeDisabled: true }); + const job = jobs.find((entry) => entry.id === "legacy-agentturn-job"); + expect(job).toBeDefined(); + expect(job?.state).toBeDefined(); + expect(job?.sessionTarget).toBe("isolated"); + expect(job?.payload.kind).toBe("agentTurn"); + if (job?.payload.kind === "agentTurn") { + expect(job.payload.model).toBe("openrouter/deepseek/deepseek-r1"); + expect(job.payload.thinking).toBe("high"); + expect(job.payload.timeoutSeconds).toBe(120); + expect(job.payload.allowUnsafeExternalContent).toBe(true); + } + expect(job?.delivery).toEqual({ + mode: "announce", + channel: "telegram", + to: "12345", + bestEffort: true, + }); + + const persisted = JSON.parse(await fs.readFile(store.storePath, "utf-8")) as { + jobs: Array>; + }; + const persistedJob = persisted.jobs.find((entry) => entry.id === "legacy-agentturn-job"); + expect(persistedJob).toBeDefined(); + expect(persistedJob?.state).toEqual(expect.any(Object)); + expect(persistedJob?.model).toBeUndefined(); + expect(persistedJob?.thinking).toBeUndefined(); + expect(persistedJob?.timeoutSeconds).toBeUndefined(); + expect(persistedJob?.deliver).toBeUndefined(); + expect(persistedJob?.channel).toBeUndefined(); + expect(persistedJob?.to).toBeUndefined(); + expect(persistedJob?.bestEffortDeliver).toBeUndefined(); + + cron.stop(); + await store.cleanup(); + }); +}); diff --git a/src/cron/service.store.migration.test.ts b/src/cron/service.store.migration.test.ts index 6e0734b15b..3054a634e5 100644 --- a/src/cron/service.store.migration.test.ts +++ b/src/cron/service.store.migration.test.ts @@ -98,4 +98,49 @@ describe("cron store migration", () => { await store.cleanup(); }); + + it("adds anchorMs to legacy every schedules", async () => { + const store = await makeStorePath(); + const createdAtMs = 1_700_000_000_000; + const legacyJob = { + id: "job-every-legacy", + agentId: undefined, + name: "Legacy every", + description: null, + enabled: true, + deleteAfterRun: false, + createdAtMs, + updatedAtMs: createdAtMs, + schedule: { kind: "every", everyMs: 120_000 }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { + kind: "systemEvent", + text: "tick", + }, + state: {}, + }; + await fs.mkdir(path.dirname(store.storePath), { recursive: true }); + await fs.writeFile(store.storePath, JSON.stringify({ version: 1, jobs: [legacyJob] }, null, 2)); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), + }); + + await cron.start(); + cron.stop(); + + const loaded = await loadCronStore(store.storePath); + const migrated = loaded.jobs[0] as Record; + const schedule = migrated.schedule as Record; + expect(schedule.kind).toBe("every"); + expect(schedule.anchorMs).toBe(createdAtMs); + + await store.cleanup(); + }); }); diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index a01475224a..fbd96d34d9 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -20,6 +20,17 @@ import { const STUCK_RUN_MS = 2 * 60 * 60 * 1000; +function resolveEveryAnchorMs(params: { + schedule: { everyMs: number; anchorMs?: number }; + fallbackAnchorMs: number; +}) { + const raw = params.schedule.anchorMs; + if (typeof raw === "number" && Number.isFinite(raw)) { + return Math.max(0, Math.floor(raw)); + } + return Math.max(0, Math.floor(params.fallbackAnchorMs)); +} + export function assertSupportedJobSpec(job: Pick) { if (job.sessionTarget === "main" && job.payload.kind !== "systemEvent") { throw new Error('main cron jobs require payload.kind="systemEvent"'); @@ -47,6 +58,13 @@ export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | und if (!job.enabled) { return undefined; } + if (job.schedule.kind === "every") { + const anchorMs = resolveEveryAnchorMs({ + schedule: job.schedule, + fallbackAnchorMs: job.createdAtMs, + }); + return computeNextRunAtMs({ ...job.schedule, anchorMs }, nowMs); + } if (job.schedule.kind === "at") { // One-shot jobs stay due until they successfully finish. if (job.state.lastStatus === "ok" && job.state.lastRunAtMs) { @@ -69,18 +87,26 @@ export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | und return computeNextRunAtMs(job.schedule, nowMs); } -export function recomputeNextRuns(state: CronServiceState) { +export function recomputeNextRuns(state: CronServiceState): boolean { if (!state.store) { - return; + return false; } + let changed = false; const now = state.deps.nowMs(); for (const job of state.store.jobs) { if (!job.state) { job.state = {}; + changed = true; } if (!job.enabled) { - job.state.nextRunAtMs = undefined; - job.state.runningAtMs = undefined; + if (job.state.nextRunAtMs !== undefined) { + job.state.nextRunAtMs = undefined; + changed = true; + } + if (job.state.runningAtMs !== undefined) { + job.state.runningAtMs = undefined; + changed = true; + } continue; } const runningAt = job.state.runningAtMs; @@ -90,9 +116,15 @@ export function recomputeNextRuns(state: CronServiceState) { "cron: clearing stuck running marker", ); job.state.runningAtMs = undefined; + changed = true; + } + const newNext = computeJobNextRunAtMs(job, now); + if (job.state.nextRunAtMs !== newNext) { + job.state.nextRunAtMs = newNext; + changed = true; } - job.state.nextRunAtMs = computeJobNextRunAtMs(job, now); } + return changed; } export function nextWakeAtMs(state: CronServiceState) { @@ -110,10 +142,20 @@ export function nextWakeAtMs(state: CronServiceState) { export function createJob(state: CronServiceState, input: CronJobCreate): CronJob { const now = state.deps.nowMs(); const id = crypto.randomUUID(); + const schedule = + input.schedule.kind === "every" + ? { + ...input.schedule, + anchorMs: resolveEveryAnchorMs({ + schedule: input.schedule, + fallbackAnchorMs: now, + }), + } + : input.schedule; const deleteAfterRun = typeof input.deleteAfterRun === "boolean" ? input.deleteAfterRun - : input.schedule.kind === "at" + : schedule.kind === "at" ? true : undefined; const enabled = typeof input.enabled === "boolean" ? input.enabled : true; @@ -126,7 +168,7 @@ export function createJob(state: CronServiceState, input: CronJobCreate): CronJo deleteAfterRun, createdAtMs: now, updatedAtMs: now, - schedule: input.schedule, + schedule, sessionTarget: input.sessionTarget, wakeMode: input.wakeMode, payload: input.payload, @@ -223,6 +265,9 @@ function mergeCronPayload(existing: CronPayload, patch: CronPayloadPatch): CronP if (typeof patch.timeoutSeconds === "number") { next.timeoutSeconds = patch.timeoutSeconds; } + if (typeof patch.allowUnsafeExternalContent === "boolean") { + next.allowUnsafeExternalContent = patch.allowUnsafeExternalContent; + } if (typeof patch.deliver === "boolean") { next.deliver = patch.deliver; } @@ -297,6 +342,7 @@ function buildPayloadFromPatch(patch: CronPayloadPatch): CronPayload { model: patch.model, thinking: patch.thinking, timeoutSeconds: patch.timeoutSeconds, + allowUnsafeExternalContent: patch.allowUnsafeExternalContent, deliver: patch.deliver, channel: patch.channel, to: patch.to, @@ -334,6 +380,9 @@ function mergeCronDelivery( } export function isJobDue(job: CronJob, nowMs: number, opts: { forced: boolean }) { + if (typeof job.state.runningAtMs === "number") { + return false; + } if (opts.forced) { return true; } diff --git a/src/cron/service/ops.ts b/src/cron/service/ops.ts index d145976569..545261e973 100644 --- a/src/cron/service/ops.ts +++ b/src/cron/service/ops.ts @@ -11,7 +11,7 @@ import { } from "./jobs.js"; import { locked } from "./locked.js"; import { ensureLoaded, persist, warnIfDisabled } from "./store.js"; -import { armTimer, emit, executeJob, stopTimer, wake } from "./timer.js"; +import { armTimer, emit, executeJob, runMissedJobs, stopTimer, wake } from "./timer.js"; export async function start(state: CronServiceState) { await locked(state, async () => { @@ -19,7 +19,18 @@ export async function start(state: CronServiceState) { state.deps.log.info({ enabled: false }, "cron: disabled"); return; } - await ensureLoaded(state); + await ensureLoaded(state, { skipRecompute: true }); + const jobs = state.store?.jobs ?? []; + for (const job of jobs) { + if (typeof job.state.runningAtMs === "number") { + state.deps.log.warn( + { jobId: job.id, runningAtMs: job.state.runningAtMs }, + "cron: clearing stale running marker on startup", + ); + job.state.runningAtMs = undefined; + } + } + await runMissedJobs(state); recomputeNextRuns(state); await persist(state); armTimer(state); @@ -40,7 +51,7 @@ export function stop(state: CronServiceState) { export async function status(state: CronServiceState) { return await locked(state, async () => { - await ensureLoaded(state); + await ensureLoaded(state, { skipRecompute: true }); return { enabled: state.deps.cronEnabled, storePath: state.deps.storePath, @@ -52,7 +63,7 @@ export async function status(state: CronServiceState) { export async function list(state: CronServiceState, opts?: { includeDisabled?: boolean }) { return await locked(state, async () => { - await ensureLoaded(state); + await ensureLoaded(state, { skipRecompute: true }); const includeDisabled = opts?.includeDisabled === true; const jobs = (state.store?.jobs ?? []).filter((j) => includeDisabled || j.enabled); return jobs.toSorted((a, b) => (a.state.nextRunAtMs ?? 0) - (b.state.nextRunAtMs ?? 0)); @@ -83,6 +94,22 @@ export async function update(state: CronServiceState, id: string, patch: CronJob const job = findJobOrThrow(state, id); const now = state.deps.nowMs(); applyJobPatch(job, patch); + if (job.schedule.kind === "every") { + const anchor = job.schedule.anchorMs; + if (typeof anchor !== "number" || !Number.isFinite(anchor)) { + const patchSchedule = patch.schedule; + const fallbackAnchorMs = + patchSchedule?.kind === "every" + ? now + : typeof job.createdAtMs === "number" && Number.isFinite(job.createdAtMs) + ? job.createdAtMs + : now; + job.schedule = { + ...job.schedule, + anchorMs: Math.max(0, Math.floor(fallbackAnchorMs)), + }; + } + } job.updatedAtMs = now; if (job.enabled) { job.state.nextRunAtMs = computeJobNextRunAtMs(job, now); @@ -124,14 +151,18 @@ export async function remove(state: CronServiceState, id: string) { export async function run(state: CronServiceState, id: string, mode?: "due" | "force") { return await locked(state, async () => { warnIfDisabled(state, "run"); - await ensureLoaded(state); + await ensureLoaded(state, { skipRecompute: true }); const job = findJobOrThrow(state, id); + if (typeof job.state.runningAtMs === "number") { + return { ok: true, ran: false, reason: "already-running" as const }; + } const now = state.deps.nowMs(); const due = isJobDue(job, now, { forced: mode === "force" }); if (!due) { return { ok: true, ran: false, reason: "not-due" as const }; } await executeJob(state, job, now, { forced: mode === "force" }); + recomputeNextRuns(state); await persist(state); armTimer(state); return { ok: true, ran: true } as const; diff --git a/src/cron/service/state.ts b/src/cron/service/state.ts index 64fd9cc9e0..0847989b3d 100644 --- a/src/cron/service/state.ts +++ b/src/cron/service/state.ts @@ -9,6 +9,8 @@ export type CronEvent = { status?: "ok" | "error" | "skipped"; error?: string; summary?: string; + sessionId?: string; + sessionKey?: string; nextRunAtMs?: number; }; @@ -33,6 +35,8 @@ export type CronServiceDeps = { /** Last non-empty agent text output (not truncated). */ outputText?: string; error?: string; + sessionId?: string; + sessionKey?: string; }>; onEvent?: (evt: CronEvent) => void; }; @@ -78,6 +82,7 @@ export type CronStatusSummary = { export type CronRunResult = | { ok: true; ran: true } | { ok: true; ran: false; reason: "not-due" } + | { ok: true; ran: false; reason: "already-running" } | { ok: false }; export type CronRemoveResult = { ok: true; removed: boolean } | { ok: false; removed: false }; diff --git a/src/cron/service/store.ts b/src/cron/service/store.ts index 51aca41657..3da848f3e3 100644 --- a/src/cron/service/store.ts +++ b/src/cron/service/store.ts @@ -117,6 +117,141 @@ function stripLegacyDeliveryFields(payload: Record) { } } +function normalizePayloadKind(payload: Record) { + const raw = typeof payload.kind === "string" ? payload.kind.trim().toLowerCase() : ""; + if (raw === "agentturn") { + payload.kind = "agentTurn"; + return true; + } + if (raw === "systemevent") { + payload.kind = "systemEvent"; + return true; + } + return false; +} + +function inferPayloadIfMissing(raw: Record) { + const message = typeof raw.message === "string" ? raw.message.trim() : ""; + const text = typeof raw.text === "string" ? raw.text.trim() : ""; + if (message) { + raw.payload = { kind: "agentTurn", message }; + return true; + } + if (text) { + raw.payload = { kind: "systemEvent", text }; + return true; + } + return false; +} + +function copyTopLevelAgentTurnFields( + raw: Record, + payload: Record, +) { + let mutated = false; + + const copyTrimmedString = (field: "model" | "thinking") => { + const existing = payload[field]; + if (typeof existing === "string" && existing.trim()) { + return; + } + const value = raw[field]; + if (typeof value === "string" && value.trim()) { + payload[field] = value.trim(); + mutated = true; + } + }; + copyTrimmedString("model"); + copyTrimmedString("thinking"); + + if ( + typeof payload.timeoutSeconds !== "number" && + typeof raw.timeoutSeconds === "number" && + Number.isFinite(raw.timeoutSeconds) + ) { + payload.timeoutSeconds = Math.max(1, Math.floor(raw.timeoutSeconds)); + mutated = true; + } + + if ( + typeof payload.allowUnsafeExternalContent !== "boolean" && + typeof raw.allowUnsafeExternalContent === "boolean" + ) { + payload.allowUnsafeExternalContent = raw.allowUnsafeExternalContent; + mutated = true; + } + + if (typeof payload.deliver !== "boolean" && typeof raw.deliver === "boolean") { + payload.deliver = raw.deliver; + mutated = true; + } + if ( + typeof payload.channel !== "string" && + typeof raw.channel === "string" && + raw.channel.trim() + ) { + payload.channel = raw.channel.trim(); + mutated = true; + } + if (typeof payload.to !== "string" && typeof raw.to === "string" && raw.to.trim()) { + payload.to = raw.to.trim(); + mutated = true; + } + if ( + typeof payload.bestEffortDeliver !== "boolean" && + typeof raw.bestEffortDeliver === "boolean" + ) { + payload.bestEffortDeliver = raw.bestEffortDeliver; + mutated = true; + } + if ( + typeof payload.provider !== "string" && + typeof raw.provider === "string" && + raw.provider.trim() + ) { + payload.provider = raw.provider.trim(); + mutated = true; + } + + return mutated; +} + +function stripLegacyTopLevelFields(raw: Record) { + if ("model" in raw) { + delete raw.model; + } + if ("thinking" in raw) { + delete raw.thinking; + } + if ("timeoutSeconds" in raw) { + delete raw.timeoutSeconds; + } + if ("allowUnsafeExternalContent" in raw) { + delete raw.allowUnsafeExternalContent; + } + if ("message" in raw) { + delete raw.message; + } + if ("text" in raw) { + delete raw.text; + } + if ("deliver" in raw) { + delete raw.deliver; + } + if ("channel" in raw) { + delete raw.channel; + } + if ("to" in raw) { + delete raw.to; + } + if ("bestEffortDeliver" in raw) { + delete raw.bestEffortDeliver; + } + if ("provider" in raw) { + delete raw.provider; + } +} + async function getFileMtimeMs(path: string): Promise { try { const stats = await fs.promises.stat(path); @@ -148,6 +283,12 @@ export async function ensureLoaded( const jobs = (loaded.jobs ?? []) as unknown as Array>; let mutated = false; for (const raw of jobs) { + const state = raw.state; + if (!state || typeof state !== "object" || Array.isArray(state)) { + raw.state = {}; + mutated = true; + } + const nameRaw = raw.name; if (typeof nameRaw !== "string" || nameRaw.trim().length === 0) { raw.name = inferLegacyName({ @@ -171,8 +312,57 @@ export async function ensureLoaded( } const payload = raw.payload; - if (payload && typeof payload === "object" && !Array.isArray(payload)) { - if (migrateLegacyCronPayload(payload as Record)) { + if ( + (!payload || typeof payload !== "object" || Array.isArray(payload)) && + inferPayloadIfMissing(raw) + ) { + mutated = true; + } + + const payloadRecord = + raw.payload && typeof raw.payload === "object" && !Array.isArray(raw.payload) + ? (raw.payload as Record) + : null; + + if (payloadRecord) { + if (normalizePayloadKind(payloadRecord)) { + mutated = true; + } + if (!payloadRecord.kind) { + if (typeof payloadRecord.message === "string" && payloadRecord.message.trim()) { + payloadRecord.kind = "agentTurn"; + mutated = true; + } else if (typeof payloadRecord.text === "string" && payloadRecord.text.trim()) { + payloadRecord.kind = "systemEvent"; + mutated = true; + } + } + if (payloadRecord.kind === "agentTurn") { + if (copyTopLevelAgentTurnFields(raw, payloadRecord)) { + mutated = true; + } + } + } + + const hadLegacyTopLevelFields = + "model" in raw || + "thinking" in raw || + "timeoutSeconds" in raw || + "allowUnsafeExternalContent" in raw || + "message" in raw || + "text" in raw || + "deliver" in raw || + "channel" in raw || + "to" in raw || + "bestEffortDeliver" in raw || + "provider" in raw; + if (hadLegacyTopLevelFields) { + stripLegacyTopLevelFields(raw); + mutated = true; + } + + if (payloadRecord) { + if (migrateLegacyCronPayload(payloadRecord)) { mutated = true; } } @@ -202,6 +392,27 @@ export async function ensureLoaded( } mutated = true; } + + const everyMsRaw = sched.everyMs; + const everyMs = + typeof everyMsRaw === "number" && Number.isFinite(everyMsRaw) + ? Math.floor(everyMsRaw) + : null; + if ((kind === "every" || sched.kind === "every") && everyMs !== null) { + const anchorRaw = sched.anchorMs; + const normalizedAnchor = + typeof anchorRaw === "number" && Number.isFinite(anchorRaw) + ? Math.max(0, Math.floor(anchorRaw)) + : typeof raw.createdAtMs === "number" && Number.isFinite(raw.createdAtMs) + ? Math.max(0, Math.floor(raw.createdAtMs)) + : typeof raw.updatedAtMs === "number" && Number.isFinite(raw.updatedAtMs) + ? Math.max(0, Math.floor(raw.updatedAtMs)) + : null; + if (normalizedAnchor !== null && anchorRaw !== normalizedAnchor) { + sched.anchorMs = normalizedAnchor; + mutated = true; + } + } } const delivery = raw.delivery; @@ -213,6 +424,11 @@ export async function ensureLoaded( (delivery as { mode?: unknown }).mode = "announce"; mutated = true; } + } else if (modeRaw === undefined || modeRaw === null) { + // Explicitly persist the default so existing jobs don't silently + // change behaviour when the runtime default shifts. + (delivery as { mode?: unknown }).mode = "announce"; + mutated = true; } } @@ -222,10 +438,6 @@ export async function ensureLoaded( mutated = true; } - const payloadRecord = - payload && typeof payload === "object" && !Array.isArray(payload) - ? (payload as Record) - : null; const payloadKind = payloadRecord && typeof payloadRecord.kind === "string" ? payloadRecord.kind : ""; const sessionTarget = diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 8af4f9bc36..8e9bfb2d58 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -1,6 +1,7 @@ import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js"; import type { CronJob } from "../types.js"; import type { CronEvent, CronServiceState } from "./state.js"; +import { resolveCronDeliveryPlan } from "../delivery.js"; import { computeJobNextRunAtMs, nextWakeAtMs, @@ -10,7 +11,7 @@ import { import { locked } from "./locked.js"; import { ensureLoaded, persist } from "./store.js"; -const MAX_TIMEOUT_MS = 2 ** 31 - 1; +const MAX_TIMER_DELAY_MS = 60_000; export function armTimer(state: CronServiceState) { if (state.timer) { @@ -25,12 +26,15 @@ export function armTimer(state: CronServiceState) { return; } const delay = Math.max(nextAt - state.deps.nowMs(), 0); - // Avoid TimeoutOverflowWarning when a job is far in the future. - const clampedDelay = Math.min(delay, MAX_TIMEOUT_MS); - state.timer = setTimeout(() => { - void onTimer(state).catch((err) => { + // Wake at least once a minute to avoid schedule drift and recover quickly + // when the process was paused or wall-clock time jumps. + const clampedDelay = Math.min(delay, MAX_TIMER_DELAY_MS); + state.timer = setTimeout(async () => { + try { + await onTimer(state); + } catch (err) { state.deps.log.error({ err: String(err) }, "cron: timer tick failed"); - }); + } }, clampedDelay); } @@ -40,22 +44,169 @@ export async function onTimer(state: CronServiceState) { } state.running = true; try { - await locked(state, async () => { - // Reload persisted due-times without recomputing so runDueJobs sees - // the original nextRunAtMs values. Recomputing first would advance - // every/cron slots past the current tick when the timer fires late (#9788). + const dueJobs = await locked(state, async () => { await ensureLoaded(state, { forceReload: true, skipRecompute: true }); - await runDueJobs(state); - recomputeNextRuns(state); + const due = findDueJobs(state); + + if (due.length === 0) { + const changed = recomputeNextRuns(state); + if (changed) { + await persist(state); + } + return []; + } + + const now = state.deps.nowMs(); + for (const job of due) { + job.state.runningAtMs = now; + job.state.lastError = undefined; + } await persist(state); + + return due.map((j) => ({ + id: j.id, + job: j, + })); }); + + const results: Array<{ + jobId: string; + status: "ok" | "error" | "skipped"; + error?: string; + summary?: string; + sessionId?: string; + sessionKey?: string; + startedAt: number; + endedAt: number; + }> = []; + + for (const { id, job } of dueJobs) { + const startedAt = state.deps.nowMs(); + job.state.runningAtMs = startedAt; + emit(state, { jobId: job.id, action: "started", runAtMs: startedAt }); + try { + const result = await executeJobCore(state, job); + results.push({ jobId: id, ...result, startedAt, endedAt: state.deps.nowMs() }); + } catch (err) { + results.push({ + jobId: id, + status: "error", + error: String(err), + startedAt, + endedAt: state.deps.nowMs(), + }); + } + } + + if (results.length > 0) { + await locked(state, async () => { + await ensureLoaded(state, { forceReload: true, skipRecompute: true }); + + for (const result of results) { + const job = state.store?.jobs.find((j) => j.id === result.jobId); + if (!job) { + continue; + } + + const startedAt = result.startedAt; + job.state.runningAtMs = undefined; + job.state.lastRunAtMs = startedAt; + job.state.lastStatus = result.status; + job.state.lastDurationMs = Math.max(0, result.endedAt - startedAt); + job.state.lastError = result.error; + + const shouldDelete = + job.schedule.kind === "at" && result.status === "ok" && job.deleteAfterRun === true; + + if (!shouldDelete) { + if (job.schedule.kind === "at" && result.status === "ok") { + job.enabled = false; + job.state.nextRunAtMs = undefined; + } else if (job.enabled) { + job.state.nextRunAtMs = computeJobNextRunAtMs(job, result.endedAt); + } else { + job.state.nextRunAtMs = undefined; + } + } + + emit(state, { + jobId: job.id, + action: "finished", + status: result.status, + error: result.error, + summary: result.summary, + sessionId: result.sessionId, + sessionKey: result.sessionKey, + runAtMs: startedAt, + durationMs: job.state.lastDurationMs, + nextRunAtMs: job.state.nextRunAtMs, + }); + + if (shouldDelete && state.store) { + state.store.jobs = state.store.jobs.filter((j) => j.id !== job.id); + emit(state, { jobId: job.id, action: "removed" }); + } + + job.updatedAtMs = result.endedAt; + } + + recomputeNextRuns(state); + await persist(state); + }); + } } finally { state.running = false; - // Always re-arm so transient errors (e.g. ENOSPC) don't kill the scheduler. armTimer(state); } } +function findDueJobs(state: CronServiceState): CronJob[] { + if (!state.store) { + return []; + } + const now = state.deps.nowMs(); + return state.store.jobs.filter((j) => { + if (!j.enabled) { + return false; + } + if (typeof j.state.runningAtMs === "number") { + return false; + } + const next = j.state.nextRunAtMs; + return typeof next === "number" && now >= next; + }); +} + +export async function runMissedJobs(state: CronServiceState) { + if (!state.store) { + return; + } + const now = state.deps.nowMs(); + const missed = state.store.jobs.filter((j) => { + if (!j.enabled) { + return false; + } + if (typeof j.state.runningAtMs === "number") { + return false; + } + const next = j.state.nextRunAtMs; + if (j.schedule.kind === "at" && j.state.lastStatus === "ok") { + return false; + } + return typeof next === "number" && now >= next; + }); + + if (missed.length > 0) { + state.deps.log.info( + { count: missed.length, jobIds: missed.map((j) => j.id) }, + "cron: running missed jobs after restart", + ); + for (const job of missed) { + await executeJob(state, job, now, { forced: false }); + } + } +} + export async function runDueJobs(state: CronServiceState) { if (!state.store) { return; @@ -76,6 +227,99 @@ export async function runDueJobs(state: CronServiceState) { } } +async function executeJobCore( + state: CronServiceState, + job: CronJob, +): Promise<{ + status: "ok" | "error" | "skipped"; + error?: string; + summary?: string; + sessionId?: string; + sessionKey?: string; +}> { + if (job.sessionTarget === "main") { + const text = resolveJobPayloadTextForMain(job); + if (!text) { + const kind = job.payload.kind; + return { + status: "skipped", + error: + kind === "systemEvent" + ? "main job requires non-empty systemEvent text" + : 'main job requires payload.kind="systemEvent"', + }; + } + state.deps.enqueueSystemEvent(text, { agentId: job.agentId }); + if (job.wakeMode === "now" && state.deps.runHeartbeatOnce) { + const reason = `cron:${job.id}`; + const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); + const maxWaitMs = 2 * 60_000; + const waitStartedAt = state.deps.nowMs(); + + let heartbeatResult: HeartbeatRunResult; + for (;;) { + heartbeatResult = await state.deps.runHeartbeatOnce({ reason }); + if ( + heartbeatResult.status !== "skipped" || + heartbeatResult.reason !== "requests-in-flight" + ) { + break; + } + if (state.deps.nowMs() - waitStartedAt > maxWaitMs) { + state.deps.requestHeartbeatNow({ reason }); + return { status: "ok", summary: text }; + } + await delay(250); + } + + if (heartbeatResult.status === "ran") { + return { status: "ok", summary: text }; + } else if (heartbeatResult.status === "skipped") { + return { status: "skipped", error: heartbeatResult.reason, summary: text }; + } else { + return { status: "error", error: heartbeatResult.reason, summary: text }; + } + } else { + state.deps.requestHeartbeatNow({ reason: `cron:${job.id}` }); + return { status: "ok", summary: text }; + } + } + + if (job.payload.kind !== "agentTurn") { + return { status: "skipped", error: "isolated job requires payload.kind=agentTurn" }; + } + + const res = await state.deps.runIsolatedAgentJob({ + job, + message: job.payload.message, + }); + + // Post a short summary back to the main session. + const summaryText = res.summary?.trim(); + const deliveryPlan = resolveCronDeliveryPlan(job); + if (summaryText && deliveryPlan.requested) { + const prefix = "Cron"; + const label = + res.status === "error" ? `${prefix} (error): ${summaryText}` : `${prefix}: ${summaryText}`; + state.deps.enqueueSystemEvent(label, { agentId: job.agentId }); + if (job.wakeMode === "now") { + state.deps.requestHeartbeatNow({ reason: `cron:${job.id}` }); + } + } + + return { + status: res.status, + error: res.error, + summary: res.summary, + sessionId: res.sessionId, + sessionKey: res.sessionKey, + }; +} + +/** + * Execute a job. This version is used by the `run` command and other + * places that need the full execution with state updates. + */ export async function executeJob( state: CronServiceState, job: CronJob, @@ -89,7 +333,12 @@ export async function executeJob( let deleted = false; - const finish = async (status: "ok" | "error" | "skipped", err?: string, summary?: string) => { + const finish = async ( + status: "ok" | "error" | "skipped", + err?: string, + summary?: string, + session?: { sessionId?: string; sessionKey?: string }, + ) => { const endedAt = state.deps.nowMs(); job.state.runningAtMs = undefined; job.state.lastRunAtMs = startedAt; @@ -102,7 +351,6 @@ export async function executeJob( if (!shouldDelete) { if (job.schedule.kind === "at" && status === "ok") { - // One-shot job completed successfully; disable it. job.enabled = false; job.state.nextRunAtMs = undefined; } else if (job.enabled) { @@ -118,6 +366,8 @@ export async function executeJob( status, error: err, summary, + sessionId: session?.sessionId, + sessionKey: session?.sessionKey, runAtMs: startedAt, durationMs: job.state.lastDurationMs, nextRunAtMs: job.state.nextRunAtMs, @@ -131,96 +381,16 @@ export async function executeJob( }; try { - if (job.sessionTarget === "main") { - const text = resolveJobPayloadTextForMain(job); - if (!text) { - const kind = job.payload.kind; - await finish( - "skipped", - kind === "systemEvent" - ? "main job requires non-empty systemEvent text" - : 'main job requires payload.kind="systemEvent"', - ); - return; - } - state.deps.enqueueSystemEvent(text, { agentId: job.agentId }); - if (job.wakeMode === "now" && state.deps.runHeartbeatOnce) { - const reason = `cron:${job.id}`; - const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); - const maxWaitMs = 2 * 60_000; - const waitStartedAt = state.deps.nowMs(); - - let heartbeatResult: HeartbeatRunResult; - for (;;) { - heartbeatResult = await state.deps.runHeartbeatOnce({ reason }); - if ( - heartbeatResult.status !== "skipped" || - heartbeatResult.reason !== "requests-in-flight" - ) { - break; - } - if (state.deps.nowMs() - waitStartedAt > maxWaitMs) { - heartbeatResult = { - status: "skipped", - reason: "timeout waiting for main lane to become idle", - }; - break; - } - await delay(250); - } - - if (heartbeatResult.status === "ran") { - await finish("ok", undefined, text); - } else if (heartbeatResult.status === "skipped") { - await finish("skipped", heartbeatResult.reason, text); - } else { - await finish("error", heartbeatResult.reason, text); - } - } else { - // wakeMode is "next-heartbeat" or runHeartbeatOnce not available - state.deps.requestHeartbeatNow({ reason: `cron:${job.id}` }); - await finish("ok", undefined, text); - } - return; - } - - if (job.payload.kind !== "agentTurn") { - await finish("skipped", "isolated job requires payload.kind=agentTurn"); - return; - } - - const res = await state.deps.runIsolatedAgentJob({ - job, - message: job.payload.message, + const result = await executeJobCore(state, job); + await finish(result.status, result.error, result.summary, { + sessionId: result.sessionId, + sessionKey: result.sessionKey, }); - - // Post a short summary back to the main session so the user sees - // the cron result without opening the isolated session. - const summaryText = res.summary?.trim(); - const deliveryMode = job.delivery?.mode ?? "announce"; - if (summaryText && deliveryMode !== "none") { - const prefix = "Cron"; - const label = - res.status === "error" ? `${prefix} (error): ${summaryText}` : `${prefix}: ${summaryText}`; - state.deps.enqueueSystemEvent(label, { agentId: job.agentId }); - if (job.wakeMode === "now") { - state.deps.requestHeartbeatNow({ reason: `cron:${job.id}` }); - } - } - - if (res.status === "ok") { - await finish("ok", undefined, res.summary); - } else if (res.status === "skipped") { - await finish("skipped", undefined, res.summary); - } else { - await finish("error", res.error ?? "cron job failed", res.summary); - } } catch (err) { await finish("error", String(err)); } finally { job.updatedAtMs = nowMs; if (!opts.forced && job.enabled && !deleted) { - // Keep nextRunAtMs in sync in case the schedule advanced during a long run. job.state.nextRunAtMs = computeJobNextRunAtMs(job, state.deps.nowMs()); } } diff --git a/src/cron/store.test.ts b/src/cron/store.test.ts new file mode 100644 index 0000000000..ec80160df2 --- /dev/null +++ b/src/cron/store.test.ts @@ -0,0 +1,32 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { describe, expect, it } from "vitest"; +import { loadCronStore } from "./store.js"; + +async function makeStorePath() { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cron-store-")); + return { + dir, + storePath: path.join(dir, "jobs.json"), + cleanup: async () => { + await fs.rm(dir, { recursive: true, force: true }); + }, + }; +} + +describe("cron store", () => { + it("returns empty store when file does not exist", async () => { + const store = await makeStorePath(); + const loaded = await loadCronStore(store.storePath); + expect(loaded).toEqual({ version: 1, jobs: [] }); + await store.cleanup(); + }); + + it("throws when store contains invalid JSON", async () => { + const store = await makeStorePath(); + await fs.writeFile(store.storePath, "{ not json", "utf-8"); + await expect(loadCronStore(store.storePath)).rejects.toThrow(/Failed to parse cron store/i); + await store.cleanup(); + }); +}); diff --git a/src/cron/store.ts b/src/cron/store.ts index 5fb296153a..21bc182452 100644 --- a/src/cron/store.ts +++ b/src/cron/store.ts @@ -22,14 +22,28 @@ export function resolveCronStorePath(storePath?: string) { export async function loadCronStore(storePath: string): Promise { try { const raw = await fs.promises.readFile(storePath, "utf-8"); - const parsed = JSON5.parse(raw); - const jobs = Array.isArray(parsed?.jobs) ? (parsed?.jobs as never[]) : []; + let parsed: unknown; + try { + parsed = JSON5.parse(raw); + } catch (err) { + throw new Error(`Failed to parse cron store at ${storePath}: ${String(err)}`, { + cause: err, + }); + } + const parsedRecord = + parsed && typeof parsed === "object" && !Array.isArray(parsed) + ? (parsed as Record) + : {}; + const jobs = Array.isArray(parsedRecord.jobs) ? (parsedRecord.jobs as never[]) : []; return { version: 1, jobs: jobs.filter(Boolean) as never as CronStoreFile["jobs"], }; - } catch { - return { version: 1, jobs: [] }; + } catch (err) { + if ((err as { code?: unknown })?.code === "ENOENT") { + return { version: 1, jobs: [] }; + } + throw err; } } diff --git a/src/gateway/protocol/schema/cron.ts b/src/gateway/protocol/schema/cron.ts index ce9479d1ad..c8238c50f1 100644 --- a/src/gateway/protocol/schema/cron.ts +++ b/src/gateway/protocol/schema/cron.ts @@ -42,6 +42,11 @@ export const CronPayloadSchema = Type.Union([ model: Type.Optional(Type.String()), thinking: Type.Optional(Type.String()), timeoutSeconds: Type.Optional(Type.Integer({ minimum: 1 })), + allowUnsafeExternalContent: Type.Optional(Type.Boolean()), + deliver: Type.Optional(Type.Boolean()), + channel: Type.Optional(Type.String()), + to: Type.Optional(Type.String()), + bestEffortDeliver: Type.Optional(Type.Boolean()), }, { additionalProperties: false }, ), @@ -62,6 +67,11 @@ export const CronPayloadPatchSchema = Type.Union([ model: Type.Optional(Type.String()), thinking: Type.Optional(Type.String()), timeoutSeconds: Type.Optional(Type.Integer({ minimum: 1 })), + allowUnsafeExternalContent: Type.Optional(Type.Boolean()), + deliver: Type.Optional(Type.Boolean()), + channel: Type.Optional(Type.String()), + to: Type.Optional(Type.String()), + bestEffortDeliver: Type.Optional(Type.Boolean()), }, { additionalProperties: false }, ), @@ -239,6 +249,8 @@ export const CronRunLogEntrySchema = Type.Object( ), error: Type.Optional(Type.String()), summary: Type.Optional(Type.String()), + sessionId: Type.Optional(NonEmptyString), + sessionKey: Type.Optional(NonEmptyString), runAtMs: Type.Optional(Type.Integer({ minimum: 0 })), durationMs: Type.Optional(Type.Integer({ minimum: 0 })), nextRunAtMs: Type.Optional(Type.Integer({ minimum: 0 })), diff --git a/src/gateway/server-cron.ts b/src/gateway/server-cron.ts index 68b0bc095e..12b0fe6b6c 100644 --- a/src/gateway/server-cron.ts +++ b/src/gateway/server-cron.ts @@ -90,6 +90,8 @@ export function buildGatewayCronService(params: { status: evt.status, error: evt.error, summary: evt.summary, + sessionId: evt.sessionId, + sessionKey: evt.sessionKey, runAtMs: evt.runAtMs, durationMs: evt.durationMs, nextRunAtMs: evt.nextRunAtMs, diff --git a/src/gateway/server-methods/cron.ts b/src/gateway/server-methods/cron.ts index 703103860f..023d9d3633 100644 --- a/src/gateway/server-methods/cron.ts +++ b/src/gateway/server-methods/cron.ts @@ -189,7 +189,7 @@ export const cronHandlers: GatewayRequestHandlers = { ); return; } - const result = await context.cron.run(jobId, p.mode); + const result = await context.cron.run(jobId, p.mode ?? "force"); respond(true, result, undefined); }, "cron.runs": async ({ params, respond, context }) => { diff --git a/src/gateway/server.cron.e2e.test.ts b/src/gateway/server.cron.e2e.test.ts index fc37f1702b..8e9d242e4f 100644 --- a/src/gateway/server.cron.e2e.test.ts +++ b/src/gateway/server.cron.e2e.test.ts @@ -117,7 +117,7 @@ describe("gateway server cron", () => { | { schedule?: unknown; sessionTarget?: unknown; wakeMode?: unknown } | undefined; expect(wrappedPayload?.sessionTarget).toBe("main"); - expect(wrappedPayload?.wakeMode).toBe("next-heartbeat"); + expect(wrappedPayload?.wakeMode).toBe("now"); expect((wrappedPayload?.schedule as { kind?: unknown } | undefined)?.kind).toBe("at"); const patchRes = await rpcReq(ws, "cron.add", { @@ -181,6 +181,32 @@ describe("gateway server cron", () => { expect(merged?.delivery?.channel).toBe("telegram"); expect(merged?.delivery?.to).toBe("19098680"); + const legacyDeliveryPatchRes = await rpcReq(ws, "cron.update", { + id: mergeJobId, + patch: { + payload: { + kind: "agentTurn", + deliver: true, + channel: "signal", + to: "+15550001111", + bestEffortDeliver: true, + }, + }, + }); + expect(legacyDeliveryPatchRes.ok).toBe(true); + const legacyDeliveryPatched = legacyDeliveryPatchRes.payload as + | { + payload?: { kind?: unknown; message?: unknown }; + delivery?: { mode?: unknown; channel?: unknown; to?: unknown; bestEffort?: unknown }; + } + | undefined; + expect(legacyDeliveryPatched?.payload?.kind).toBe("agentTurn"); + expect(legacyDeliveryPatched?.payload?.message).toBe("hello"); + expect(legacyDeliveryPatched?.delivery?.mode).toBe("announce"); + expect(legacyDeliveryPatched?.delivery?.channel).toBe("signal"); + expect(legacyDeliveryPatched?.delivery?.to).toBe("+15550001111"); + expect(legacyDeliveryPatched?.delivery?.bestEffort).toBe(true); + const rejectRes = await rpcReq(ws, "cron.add", { name: "patch reject", enabled: true, diff --git a/src/gateway/session-utils.test.ts b/src/gateway/session-utils.test.ts index 76798db437..2fb51153d4 100644 --- a/src/gateway/session-utils.test.ts +++ b/src/gateway/session-utils.test.ts @@ -331,4 +331,29 @@ describe("listSessionsFromStore search", () => { }); expect(result.sessions.length).toBe(1); }); + + test("hides cron run alias session keys from sessions list", () => { + const now = Date.now(); + const store: Record = { + "agent:main:cron:job-1": { + sessionId: "run-abc", + updatedAt: now, + label: "Cron: job-1", + } as SessionEntry, + "agent:main:cron:job-1:run:run-abc": { + sessionId: "run-abc", + updatedAt: now, + label: "Cron: job-1", + } as SessionEntry, + }; + + const result = listSessionsFromStore({ + cfg: baseCfg, + storePath: "/tmp/sessions.json", + store, + opts: {}, + }); + + expect(result.sessions.map((session) => session.key)).toEqual(["agent:main:cron:job-1"]); + }); }); diff --git a/src/gateway/session-utils.ts b/src/gateway/session-utils.ts index 80ea40e408..bbbbc575ec 100644 --- a/src/gateway/session-utils.ts +++ b/src/gateway/session-utils.ts @@ -207,6 +207,12 @@ export function classifySessionKey(key: string, entry?: SessionEntry): GatewaySe return "direct"; } +function isCronRunSessionKey(key: string): boolean { + const parsed = parseAgentSessionKey(key); + const raw = parsed?.rest ?? key; + return /^cron:[^:]+:run:[^:]+$/.test(raw); +} + export function parseGroupKey( key: string, ): { channel?: string; kind?: "group" | "channel"; id?: string } | null { @@ -568,6 +574,9 @@ export function listSessionsFromStore(params: { let sessions = Object.entries(store) .filter(([key]) => { + if (isCronRunSessionKey(key)) { + return false; + } if (!includeGlobal && key === "global") { return false; } diff --git a/ui/src/styles/components.css b/ui/src/styles/components.css index 5ab1858b81..f0438eeec2 100644 --- a/ui/src/styles/components.css +++ b/ui/src/styles/components.css @@ -681,6 +681,138 @@ width: 100%; } +/* Cron jobs: allow long payload/state text and keep action buttons inside the card. */ +.cron-job-payload, +.cron-job-agent, +.cron-job-state { + overflow-wrap: anywhere; + word-break: break-word; +} + +.cron-job .list-title { + font-weight: 600; + font-size: 15px; + letter-spacing: -0.015em; +} + +.cron-job { + grid-template-columns: minmax(0, 1fr) minmax(240px, 300px); + grid-template-areas: + "main meta" + "footer footer"; + row-gap: 10px; +} + +.cron-job .list-main { + grid-area: main; +} + +.cron-job .list-meta { + grid-area: meta; + min-width: 240px; + gap: 8px; +} + +.cron-job-footer { + grid-area: footer; + display: flex; + justify-content: space-between; + align-items: center; + gap: 12px; + border-top: 1px solid var(--border); + padding-top: 10px; +} + +.cron-job-chips { + flex: 1 1 auto; +} + +.cron-job-detail { + display: grid; + gap: 3px; + margin-top: 2px; +} + +.cron-job-detail-label { + color: var(--muted); + font-size: 11px; + font-weight: 600; + letter-spacing: 0.03em; + text-transform: uppercase; +} + +.cron-job-detail-value { + font-size: 13px; + line-height: 1.35; +} + +.cron-job-state { + display: grid; + gap: 4px; +} + +.cron-job-state-row { + display: flex; + justify-content: space-between; + align-items: baseline; + gap: 10px; +} + +.cron-job-state-key { + color: var(--muted); + font-size: 10px; + font-weight: 600; + letter-spacing: 0.05em; + text-transform: uppercase; +} + +.cron-job-state-value { + color: var(--text); + font-size: 12px; + white-space: nowrap; +} + +.cron-job-status-pill { + font-size: 11px; + font-weight: 600; + border: 1px solid var(--border); + border-radius: var(--radius-full); + padding: 2px 8px; + text-transform: lowercase; +} + +.cron-job-status-ok { + color: var(--ok); + border-color: rgba(34, 197, 94, 0.35); + background: var(--ok-subtle); +} + +.cron-job-status-error { + color: var(--danger); + border-color: rgba(239, 68, 68, 0.35); + background: var(--danger-subtle); +} + +.cron-job-status-skipped { + color: var(--warn); + border-color: rgba(245, 158, 11, 0.35); + background: var(--warn-subtle); +} + +.cron-job-status-na { + color: var(--muted); +} + +.cron-job-actions { + flex-wrap: wrap; + justify-content: flex-end; + margin-top: 0; +} + +.cron-job-actions .btn { + flex: 0 0 auto; +} + @container (max-width: 560px) { .list-item { grid-template-columns: 1fr; @@ -690,6 +822,23 @@ min-width: 0; text-align: left; } + + .cron-job-actions { + justify-content: flex-start; + } + + .cron-job { + grid-template-columns: 1fr; + grid-template-areas: + "main" + "meta" + "footer"; + } + + .cron-job-footer { + flex-direction: column; + align-items: stretch; + } } /* =========================================== @@ -737,6 +886,12 @@ background: var(--warn-subtle); } +.chip-danger { + color: var(--danger); + border-color: rgba(239, 68, 68, 0.3); + background: var(--danger-subtle); +} + /* =========================================== Tables =========================================== */ @@ -783,6 +938,22 @@ text-decoration: underline; } +.session-key-cell { + display: grid; + gap: 4px; + min-width: 0; +} + +.session-key-cell .session-link, +.session-key-display-name { + overflow-wrap: anywhere; + word-break: break-word; +} + +.session-key-display-name { + font-size: 11px; +} + /* =========================================== Log Stream =========================================== */ diff --git a/ui/src/ui/app-defaults.ts b/ui/src/ui/app-defaults.ts index 6521d07487..89bdaf11d1 100644 --- a/ui/src/ui/app-defaults.ts +++ b/ui/src/ui/app-defaults.ts @@ -22,7 +22,7 @@ export const DEFAULT_CRON_FORM: CronFormState = { cronExpr: "0 7 * * *", cronTz: "", sessionTarget: "isolated", - wakeMode: "next-heartbeat", + wakeMode: "now", payloadKind: "agentTurn", payloadText: "", deliveryMode: "announce", diff --git a/ui/src/ui/app-render.helpers.ts b/ui/src/ui/app-render.helpers.ts index d2bc9aa906..c122585998 100644 --- a/ui/src/ui/app-render.helpers.ts +++ b/ui/src/ui/app-render.helpers.ts @@ -206,13 +206,13 @@ function resolveMainSessionKey( } function resolveSessionDisplayName(key: string, row?: SessionsListResult["sessions"][number]) { - const label = row?.label?.trim(); - if (label) { + const label = row?.label?.trim() || ""; + const displayName = row?.displayName?.trim() || ""; + if (label && label !== key) { return `${label} (${key})`; } - const displayName = row?.displayName?.trim(); - if (displayName) { - return displayName; + if (displayName && displayName !== key) { + return `${key} (${displayName})`; } return key; } diff --git a/ui/src/ui/app-render.ts b/ui/src/ui/app-render.ts index f5c71c5792..d416bde447 100644 --- a/ui/src/ui/app-render.ts +++ b/ui/src/ui/app-render.ts @@ -581,6 +581,7 @@ export function renderApp(state: AppViewState) { ${ state.tab === "cron" ? renderCron({ + basePath: state.basePath, loading: state.cronLoading, status: state.cronStatus, jobs: state.cronJobs, diff --git a/ui/src/ui/format.test.ts b/ui/src/ui/format.test.ts index 8e1f121ea6..4260f07dac 100644 --- a/ui/src/ui/format.test.ts +++ b/ui/src/ui/format.test.ts @@ -2,8 +2,8 @@ import { describe, expect, it } from "vitest"; import { formatAgo, stripThinkingTags } from "./format.ts"; describe("formatAgo", () => { - it("returns 'just now' for timestamps less than 60s in the future", () => { - expect(formatAgo(Date.now() + 30_000)).toBe("just now"); + it("returns 'in <1m' for timestamps less than 60s in the future", () => { + expect(formatAgo(Date.now() + 30_000)).toBe("in <1m"); }); it("returns 'Xm from now' for future timestamps", () => { diff --git a/ui/src/ui/format.ts b/ui/src/ui/format.ts index 812aaa3fb1..91debb2e41 100644 --- a/ui/src/ui/format.ts +++ b/ui/src/ui/format.ts @@ -16,7 +16,7 @@ export function formatAgo(ms?: number | null): string { const suffix = diff < 0 ? "from now" : "ago"; const sec = Math.round(absDiff / 1000); if (sec < 60) { - return diff < 0 ? "just now" : `${sec}s ago`; + return diff < 0 ? "in <1m" : `${sec}s ago`; } const min = Math.round(sec / 60); if (min < 60) { diff --git a/ui/src/ui/types.ts b/ui/src/ui/types.ts index d1d3f432b5..1c85b87319 100644 --- a/ui/src/ui/types.ts +++ b/ui/src/ui/types.ts @@ -704,6 +704,8 @@ export type CronRunLogEntry = { durationMs?: number; error?: string; summary?: string; + sessionId?: string; + sessionKey?: string; }; export type SkillsStatusConfigCheck = { diff --git a/ui/src/ui/views/cron.test.ts b/ui/src/ui/views/cron.test.ts index 6d1e2c7a4d..ea74093afe 100644 --- a/ui/src/ui/views/cron.test.ts +++ b/ui/src/ui/views/cron.test.ts @@ -20,6 +20,7 @@ function createJob(id: string): CronJob { function createProps(overrides: Partial = {}): CronProps { return { + basePath: "", loading: false, status: null, jobs: [], @@ -70,7 +71,7 @@ describe("cron view", () => { expect(onLoadRuns).toHaveBeenCalledWith("job-1"); }); - it("marks the selected job and keeps Runs button to a single call", () => { + it("marks the selected job and keeps History button to a single call", () => { const container = document.createElement("div"); const onLoadRuns = vi.fn(); const job = createJob("job-1"); @@ -88,13 +89,73 @@ describe("cron view", () => { const selected = container.querySelector(".list-item-selected"); expect(selected).not.toBeNull(); - const runsButton = Array.from(container.querySelectorAll("button")).find( - (btn) => btn.textContent?.trim() === "Runs", + const historyButton = Array.from(container.querySelectorAll("button")).find( + (btn) => btn.textContent?.trim() === "History", ); - expect(runsButton).not.toBeUndefined(); - runsButton?.dispatchEvent(new MouseEvent("click", { bubbles: true })); + expect(historyButton).not.toBeUndefined(); + historyButton?.dispatchEvent(new MouseEvent("click", { bubbles: true })); expect(onLoadRuns).toHaveBeenCalledTimes(1); expect(onLoadRuns).toHaveBeenCalledWith("job-1"); }); + + it("renders run chat links when session keys are present", () => { + const container = document.createElement("div"); + render( + renderCron( + createProps({ + basePath: "/ui", + runsJobId: "job-1", + runs: [ + { + ts: Date.now(), + jobId: "job-1", + status: "ok", + summary: "done", + sessionKey: "agent:main:cron:job-1:run:abc", + }, + ], + }), + ), + container, + ); + + const link = container.querySelector("a.session-link"); + expect(link).not.toBeNull(); + expect(link?.getAttribute("href")).toContain( + "/ui/chat?session=agent%3Amain%3Acron%3Ajob-1%3Arun%3Aabc", + ); + }); + + it("shows selected job name and sorts run history newest first", () => { + const container = document.createElement("div"); + const job = createJob("job-1"); + render( + renderCron( + createProps({ + jobs: [job], + runsJobId: "job-1", + runs: [ + { ts: 1, jobId: "job-1", status: "ok", summary: "older run" }, + { ts: 2, jobId: "job-1", status: "ok", summary: "newer run" }, + ], + }), + ), + container, + ); + + expect(container.textContent).toContain("Latest runs for Daily ping."); + + const cards = Array.from(container.querySelectorAll(".card")); + const runHistoryCard = cards.find( + (card) => card.querySelector(".card-title")?.textContent?.trim() === "Run history", + ); + expect(runHistoryCard).not.toBeUndefined(); + + const summaries = Array.from( + runHistoryCard?.querySelectorAll(".list-item .list-sub") ?? [], + ).map((el) => (el.textContent ?? "").trim()); + expect(summaries[0]).toBe("newer run"); + expect(summaries[1]).toBe("older run"); + }); }); diff --git a/ui/src/ui/views/cron.ts b/ui/src/ui/views/cron.ts index a957cf1a20..7b87826eaa 100644 --- a/ui/src/ui/views/cron.ts +++ b/ui/src/ui/views/cron.ts @@ -1,15 +1,12 @@ import { html, nothing } from "lit"; import type { ChannelUiMetaEntry, CronJob, CronRunLogEntry, CronStatus } from "../types.ts"; import type { CronFormState } from "../ui-types.ts"; -import { formatMs } from "../format.ts"; -import { - formatCronPayload, - formatCronSchedule, - formatCronState, - formatNextRun, -} from "../presenter.ts"; +import { formatAgo, formatMs } from "../format.ts"; +import { pathForTab } from "../navigation.ts"; +import { formatCronSchedule, formatNextRun } from "../presenter.ts"; export type CronProps = { + basePath: string; loading: boolean; status: CronStatus | null; jobs: CronJob[]; @@ -59,6 +56,10 @@ function resolveChannelLabel(props: CronProps, channel: string): string { export function renderCron(props: CronProps) { const channelOptions = buildChannelOptions(props); + const selectedJob = + props.runsJobId == null ? undefined : props.jobs.find((job) => job.id === props.runsJobId); + const selectedRunTitle = selectedJob?.name ?? props.runsJobId ?? "(select a job)"; + const orderedRuns = props.runs.toSorted((a, b) => b.ts - a.ts); return html`
@@ -167,8 +168,8 @@ export function renderCron(props: CronProps) { wakeMode: (e.target as HTMLSelectElement).value as CronFormState["wakeMode"], })} > - +