mirror of
https://github.com/openclaw/openclaw.git
synced 2026-02-09 05:19:32 +08:00
Memory: queue forced QMD sync and handle sqlite busy reads
This commit is contained in:
committed by
Vignesh
parent
ce715c4c56
commit
0d60ef6fef
@@ -253,6 +253,61 @@ describe("QmdMemoryManager", () => {
|
||||
await manager.close();
|
||||
});
|
||||
|
||||
it("queues a forced sync behind an in-flight update", async () => {
|
||||
cfg = {
|
||||
...cfg,
|
||||
memory: {
|
||||
backend: "qmd",
|
||||
qmd: {
|
||||
includeDefaultMemory: false,
|
||||
update: {
|
||||
interval: "0s",
|
||||
debounceMs: 0,
|
||||
onBoot: false,
|
||||
updateTimeoutMs: 1_000,
|
||||
},
|
||||
paths: [{ path: workspaceDir, pattern: "**/*.md", name: "workspace" }],
|
||||
},
|
||||
},
|
||||
} as OpenClawConfig;
|
||||
|
||||
let updateCalls = 0;
|
||||
let releaseFirstUpdate: (() => void) | null = null;
|
||||
spawnMock.mockImplementation((_cmd: string, args: string[]) => {
|
||||
if (args[0] === "update") {
|
||||
updateCalls += 1;
|
||||
if (updateCalls === 1) {
|
||||
const first = createMockChild({ autoClose: false });
|
||||
releaseFirstUpdate = () => first.closeWith(0);
|
||||
return first;
|
||||
}
|
||||
return createMockChild();
|
||||
}
|
||||
return createMockChild();
|
||||
});
|
||||
|
||||
const resolved = resolveMemoryBackendConfig({ cfg, agentId });
|
||||
const manager = await QmdMemoryManager.create({ cfg, agentId, resolved });
|
||||
expect(manager).toBeTruthy();
|
||||
if (!manager) {
|
||||
throw new Error("manager missing");
|
||||
}
|
||||
|
||||
const inFlight = manager.sync({ reason: "interval" });
|
||||
const forced = manager.sync({ reason: "manual", force: true });
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 20));
|
||||
expect(updateCalls).toBe(1);
|
||||
if (!releaseFirstUpdate) {
|
||||
throw new Error("first update release missing");
|
||||
}
|
||||
releaseFirstUpdate();
|
||||
|
||||
await Promise.all([inFlight, forced]);
|
||||
expect(updateCalls).toBe(2);
|
||||
await manager.close();
|
||||
});
|
||||
|
||||
it("logs and continues when qmd embed times out", async () => {
|
||||
cfg = {
|
||||
...cfg,
|
||||
@@ -342,4 +397,27 @@ describe("QmdMemoryManager", () => {
|
||||
|
||||
await manager.close();
|
||||
});
|
||||
|
||||
it("skips doc lookup when sqlite index is busy", async () => {
|
||||
const resolved = resolveMemoryBackendConfig({ cfg, agentId });
|
||||
const manager = await QmdMemoryManager.create({ cfg, agentId, resolved });
|
||||
expect(manager).toBeTruthy();
|
||||
if (!manager) {
|
||||
throw new Error("manager missing");
|
||||
}
|
||||
const inner = manager as unknown as {
|
||||
db: { prepare: () => { get: () => never }; close: () => void } | null;
|
||||
resolveDocLocation: (docid?: string) => Promise<unknown>;
|
||||
};
|
||||
inner.db = {
|
||||
prepare: () => ({
|
||||
get: () => {
|
||||
throw new Error("SQLITE_BUSY: database is locked");
|
||||
},
|
||||
}),
|
||||
close: () => {},
|
||||
};
|
||||
await expect(inner.resolveDocLocation("abc123")).resolves.toBeNull();
|
||||
await manager.close();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -84,6 +84,7 @@ export class QmdMemoryManager implements MemorySearchManager {
|
||||
private readonly sessionExporter: SessionExporterConfig | null;
|
||||
private updateTimer: NodeJS.Timeout | null = null;
|
||||
private pendingUpdate: Promise<void> | null = null;
|
||||
private queuedForcedUpdate: Promise<void> | null = null;
|
||||
private closed = false;
|
||||
private db: SqliteDatabase | null = null;
|
||||
private lastUpdateAt: number | null = null;
|
||||
@@ -393,7 +394,26 @@ export class QmdMemoryManager implements MemorySearchManager {
|
||||
}
|
||||
|
||||
private async runUpdate(reason: string, force?: boolean): Promise<void> {
|
||||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
if (this.pendingUpdate) {
|
||||
if (force) {
|
||||
if (!this.queuedForcedUpdate) {
|
||||
this.queuedForcedUpdate = this.pendingUpdate
|
||||
.catch(() => undefined)
|
||||
.then(async () => {
|
||||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
await this.runUpdate(`${reason}:queued`, true);
|
||||
})
|
||||
.finally(() => {
|
||||
this.queuedForcedUpdate = null;
|
||||
});
|
||||
}
|
||||
return this.queuedForcedUpdate;
|
||||
}
|
||||
return this.pendingUpdate;
|
||||
}
|
||||
if (this.shouldSkipUpdate(force)) {
|
||||
@@ -474,6 +494,8 @@ export class QmdMemoryManager implements MemorySearchManager {
|
||||
}
|
||||
const { DatabaseSync } = requireNodeSqlite();
|
||||
this.db = new DatabaseSync(this.indexPath, { readOnly: true });
|
||||
// Keep QMD recall responsive when the updater holds a write lock.
|
||||
this.db.exec("PRAGMA busy_timeout = 1");
|
||||
return this.db;
|
||||
}
|
||||
|
||||
@@ -547,9 +569,18 @@ export class QmdMemoryManager implements MemorySearchManager {
|
||||
return cached;
|
||||
}
|
||||
const db = this.ensureDb();
|
||||
const row = db
|
||||
.prepare("SELECT collection, path FROM documents WHERE hash LIKE ? AND active = 1 LIMIT 1")
|
||||
.get(`${normalized}%`) as { collection: string; path: string } | undefined;
|
||||
let row: { collection: string; path: string } | undefined;
|
||||
try {
|
||||
row = db
|
||||
.prepare("SELECT collection, path FROM documents WHERE hash LIKE ? AND active = 1 LIMIT 1")
|
||||
.get(`${normalized}%`) as { collection: string; path: string } | undefined;
|
||||
} catch (err) {
|
||||
if (this.isSqliteBusyError(err)) {
|
||||
log.debug(`qmd index is busy while resolving doc path: ${String(err)}`);
|
||||
return null;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
if (!row) {
|
||||
return null;
|
||||
}
|
||||
@@ -825,6 +856,12 @@ export class QmdMemoryManager implements MemorySearchManager {
|
||||
return Date.now() - this.lastUpdateAt < debounceMs;
|
||||
}
|
||||
|
||||
private isSqliteBusyError(err: unknown): boolean {
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
const normalized = message.toLowerCase();
|
||||
return normalized.includes("sqlite_busy") || normalized.includes("database is locked");
|
||||
}
|
||||
|
||||
private async waitForPendingUpdateBeforeSearch(): Promise<void> {
|
||||
const pending = this.pendingUpdate;
|
||||
if (!pending) {
|
||||
|
||||
Reference in New Issue
Block a user