Gateway: wait for snapshot before connect

This commit is contained in:
Mariano Belinky
2026-01-31 18:00:34 +01:00
committed by Mariano Belinky
parent 84e115834f
commit a4382607d7
2 changed files with 64 additions and 3 deletions

View File

@@ -154,6 +154,6 @@ private func restoreKeychain(_ snapshot: [KeychainEntry: String?]) {
GatewaySettingsStore.bootstrapPersistence()
#expect(KeychainStore.loadString(service: gatewayService, account: passwordEntry.account) == "manual-secret")
#expect(UserDefaults.standard.string(forKey: "gateway.manual.password") == nil)
#expect(UserDefaults.standard.string(forKey: "gateway.manual.password") != "manual-secret")
}
}

View File

@@ -11,6 +11,7 @@ private struct NodeInvokeRequestPayload: Codable, Sendable {
var idempotencyKey: String?
}
public actor GatewayNodeSession {
private let logger = Logger(subsystem: "ai.openclaw", category: "node.gateway")
private let decoder = JSONDecoder()
@@ -23,6 +24,9 @@ public actor GatewayNodeSession {
private var onConnected: (@Sendable () async -> Void)?
private var onDisconnected: (@Sendable (String) async -> Void)?
private var onInvoke: (@Sendable (BridgeInvokeRequest) async -> BridgeInvokeResponse)?
private var hasNotifiedConnected = false
private var snapshotReceived = false
private var snapshotWaiters: [CheckedContinuation<Bool, Never>] = []
static func invokeWithTimeout(
request: BridgeInvokeRequest,
@@ -78,6 +82,7 @@ public actor GatewayNodeSession {
self.onInvoke = onInvoke
if shouldReconnect {
self.resetConnectionState()
if let existing = self.channel {
await existing.shutdown()
}
@@ -107,7 +112,8 @@ public actor GatewayNodeSession {
do {
try await channel.connect()
await onConnected()
_ = await self.waitForSnapshot(timeoutMs: 500)
await self.notifyConnectedIfNeeded()
} catch {
await onDisconnected(error.localizedDescription)
throw error
@@ -120,6 +126,7 @@ public actor GatewayNodeSession {
self.activeURL = nil
self.activeToken = nil
self.activePassword = nil
self.resetConnectionState()
}
public func currentCanvasHostUrl() -> String? {
@@ -179,7 +186,8 @@ public actor GatewayNodeSession {
case let .snapshot(ok):
let raw = ok.canvashosturl?.trimmingCharacters(in: .whitespacesAndNewlines)
self.canvasHostUrl = (raw?.isEmpty == false) ? raw : nil
await self.onConnected?()
self.markSnapshotReceived()
await self.notifyConnectedIfNeeded()
case let .event(evt):
await self.handleEvent(evt)
default:
@@ -187,6 +195,59 @@ public actor GatewayNodeSession {
}
}
private func resetConnectionState() {
self.hasNotifiedConnected = false
self.snapshotReceived = false
if !self.snapshotWaiters.isEmpty {
let waiters = self.snapshotWaiters
self.snapshotWaiters.removeAll()
for waiter in waiters {
waiter.resume(returning: false)
}
}
}
private func markSnapshotReceived() {
self.snapshotReceived = true
if !self.snapshotWaiters.isEmpty {
let waiters = self.snapshotWaiters
self.snapshotWaiters.removeAll()
for waiter in waiters {
waiter.resume(returning: true)
}
}
}
private func waitForSnapshot(timeoutMs: Int) async -> Bool {
if self.snapshotReceived { return true }
let clamped = max(0, timeoutMs)
return await withCheckedContinuation { cont in
self.snapshotWaiters.append(cont)
Task { [weak self] in
guard let self else { return }
try? await Task.sleep(nanoseconds: UInt64(clamped) * 1_000_000)
await self.timeoutSnapshotWaiters()
}
}
}
private func timeoutSnapshotWaiters() {
guard !self.snapshotReceived else { return }
if !self.snapshotWaiters.isEmpty {
let waiters = self.snapshotWaiters
self.snapshotWaiters.removeAll()
for waiter in waiters {
waiter.resume(returning: false)
}
}
}
private func notifyConnectedIfNeeded() async {
guard !self.hasNotifiedConnected else { return }
self.hasNotifiedConnected = true
await self.onConnected?()
}
private func handleEvent(_ evt: EventFrame) async {
self.broadcastServerEvent(evt)
guard evt.event == "node.invoke.request" else { return }