fix: prevent duplicate cron runs across hot reloads

This commit is contained in:
Peter Steinberger
2026-01-20 10:35:54 +00:00
parent da7da30b22
commit 47cf28f6b6
4 changed files with 113 additions and 4 deletions

View File

@@ -10,6 +10,7 @@ Docs: https://docs.clawd.bot
- Config: centralize default agent concurrency limits.
### Fixes
- Cron: serialize scheduler operations per store path to prevent duplicate runs across hot reloads. (#1216) — thanks @carlulsoe.
- Web search: infer Perplexity base URL from API key source (direct vs OpenRouter).
- Agents: treat OAuth refresh failures as auth errors to trigger model fallback. (#1261) — thanks @zknicker.
- TUI: keep thinking blocks ordered before content during streaming and isolate per-run assembly. (#1202) — thanks @aaronveklabs.

View File

@@ -0,0 +1,89 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { CronService } from "./service.js";
const noopLogger = {
debug: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
};
async function makeStorePath() {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-cron-"));
return {
storePath: path.join(dir, "cron", "jobs.json"),
cleanup: async () => {
await fs.rm(dir, { recursive: true, force: true });
},
};
}
describe("CronService", () => {
beforeEach(() => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2025-12-13T00:00:00.000Z"));
noopLogger.debug.mockClear();
noopLogger.info.mockClear();
noopLogger.warn.mockClear();
noopLogger.error.mockClear();
});
afterEach(() => {
vi.useRealTimers();
});
it("avoids duplicate runs when two services share a store", async () => {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
const runIsolatedAgentJob = vi.fn(async () => ({ status: "ok" }));
const cronA = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob,
});
await cronA.start();
const atMs = Date.parse("2025-12-13T00:00:01.000Z");
await cronA.add({
name: "shared store job",
enabled: true,
schedule: { kind: "at", atMs },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "hello" },
});
const cronB = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob,
});
await cronB.start();
vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z"));
await vi.runOnlyPendingTimersAsync();
await cronA.status();
await cronB.status();
expect(enqueueSystemEvent).toHaveBeenCalledTimes(1);
expect(requestHeartbeatNow).toHaveBeenCalledTimes(1);
cronA.stop();
cronB.stop();
await store.cleanup();
});
});

View File

@@ -1,11 +1,22 @@
import type { CronServiceState } from "./state.js";
export async function locked<T>(state: CronServiceState, fn: () => Promise<T>): Promise<T> {
const next = state.op.then(fn, fn);
// Keep the chain alive even when the operation fails.
state.op = next.then(
const storeLocks = new Map<string, Promise<void>>();
const resolveChain = (promise: Promise<unknown>) =>
promise.then(
() => undefined,
() => undefined,
);
export async function locked<T>(state: CronServiceState, fn: () => Promise<T>): Promise<T> {
const storePath = state.deps.storePath;
const storeOp = storeLocks.get(storePath) ?? Promise.resolve();
const next = Promise.all([resolveChain(state.op), resolveChain(storeOp)]).then(fn);
// Keep the chain alive even when the operation fails.
const keepAlive = resolveChain(next);
state.op = keepAlive;
storeLocks.set(storePath, keepAlive);
return (await next) as T;
}

View File

@@ -4,8 +4,15 @@ import type { CronJob } from "../types.js";
import { inferLegacyName, normalizeOptionalText } from "./normalize.js";
import type { CronServiceState } from "./state.js";
const storeCache = new Map<string, { version: 1; jobs: CronJob[] }>();
export async function ensureLoaded(state: CronServiceState) {
if (state.store) return;
const cached = storeCache.get(state.deps.storePath);
if (cached) {
state.store = cached;
return;
}
const loaded = await loadCronStore(state.deps.storePath);
const jobs = (loaded.jobs ?? []) as unknown as Array<Record<string, unknown>>;
let mutated = false;
@@ -35,6 +42,7 @@ export async function ensureLoaded(state: CronServiceState) {
}
}
state.store = { version: 1, jobs: jobs as unknown as CronJob[] };
storeCache.set(state.deps.storePath, state.store);
if (mutated) await persist(state);
}