From 7113dc21a909970098674e3b04d2c74617e9a307 Mon Sep 17 00:00:00 2001 From: Mariano Belinky Date: Mon, 2 Feb 2026 17:35:58 +0000 Subject: [PATCH] Revert "Core: update shared gateway models" This reverts commit 37eaca719a68aa1ad9dcbfe83c8a20e88bb6951a. --- .../Sources/OpenClawKit/GatewayChannel.swift | 4 +- .../OpenClawKit/GatewayNodeSession.swift | 150 +++++++++++-- src/agents/tools/nodes-tool.ts | 31 ++- src/auto-reply/reply/commands-ptt.ts | 211 ++++++++++++++++++ 4 files changed, 379 insertions(+), 17 deletions(-) create mode 100644 src/auto-reply/reply/commands-ptt.ts diff --git a/apps/shared/OpenClawKit/Sources/OpenClawKit/GatewayChannel.swift b/apps/shared/OpenClawKit/Sources/OpenClawKit/GatewayChannel.swift index aebfcd72c1..0b2e70471c 100644 --- a/apps/shared/OpenClawKit/Sources/OpenClawKit/GatewayChannel.swift +++ b/apps/shared/OpenClawKit/Sources/OpenClawKit/GatewayChannel.swift @@ -416,7 +416,9 @@ public actor GatewayChannelActor { guard let self else { return } await self.watchTicks() } - await self.pushHandler?(.snapshot(ok)) + if let pushHandler = self.pushHandler { + Task { await pushHandler(.snapshot(ok)) } + } } private func listen() { diff --git a/apps/shared/OpenClawKit/Sources/OpenClawKit/GatewayNodeSession.swift b/apps/shared/OpenClawKit/Sources/OpenClawKit/GatewayNodeSession.swift index 39190f7b88..dbc7dba3d6 100644 --- a/apps/shared/OpenClawKit/Sources/OpenClawKit/GatewayNodeSession.swift +++ b/apps/shared/OpenClawKit/Sources/OpenClawKit/GatewayNodeSession.swift @@ -11,10 +11,12 @@ private struct NodeInvokeRequestPayload: Codable, Sendable { var idempotencyKey: String? } + public actor GatewayNodeSession { private let logger = Logger(subsystem: "ai.openclaw", category: "node.gateway") private let decoder = JSONDecoder() private let encoder = JSONEncoder() + private static let defaultInvokeTimeoutMs = 30_000 private var channel: GatewayChannelActor? private var activeURL: URL? private var activeToken: String? @@ -23,34 +25,78 @@ public actor GatewayNodeSession { private var onConnected: (@Sendable () async -> Void)? private var onDisconnected: (@Sendable (String) async -> Void)? private var onInvoke: (@Sendable (BridgeInvokeRequest) async -> BridgeInvokeResponse)? + private var hasNotifiedConnected = false + private var snapshotReceived = false + private var snapshotWaiters: [CheckedContinuation] = [] static func invokeWithTimeout( request: BridgeInvokeRequest, timeoutMs: Int?, onInvoke: @escaping @Sendable (BridgeInvokeRequest) async -> BridgeInvokeResponse ) async -> BridgeInvokeResponse { - let timeout = max(0, timeoutMs ?? 0) + let timeoutLogger = Logger(subsystem: "ai.openclaw", category: "node.gateway") + let timeout: Int = { + if let timeoutMs { return max(0, timeoutMs) } + return Self.defaultInvokeTimeoutMs + }() guard timeout > 0 else { return await onInvoke(request) } - return await withTaskGroup(of: BridgeInvokeResponse.self) { group in - group.addTask { await onInvoke(request) } - group.addTask { + // Use an explicit latch so timeouts win even if onInvoke blocks (e.g., permission prompts). + final class InvokeLatch: @unchecked Sendable { + private let lock = NSLock() + private var continuation: CheckedContinuation? + private var resumed = false + + func setContinuation(_ continuation: CheckedContinuation) { + self.lock.lock() + defer { self.lock.unlock() } + self.continuation = continuation + } + + func resume(_ response: BridgeInvokeResponse) { + let cont: CheckedContinuation? + self.lock.lock() + if self.resumed { + self.lock.unlock() + return + } + self.resumed = true + cont = self.continuation + self.continuation = nil + self.lock.unlock() + cont?.resume(returning: response) + } + } + + let latch = InvokeLatch() + var onInvokeTask: Task? + var timeoutTask: Task? + defer { + onInvokeTask?.cancel() + timeoutTask?.cancel() + } + let response = await withCheckedContinuation { (cont: CheckedContinuation) in + latch.setContinuation(cont) + onInvokeTask = Task.detached { + let result = await onInvoke(request) + latch.resume(result) + } + timeoutTask = Task.detached { try? await Task.sleep(nanoseconds: UInt64(timeout) * 1_000_000) - return BridgeInvokeResponse( + timeoutLogger.info("node invoke timeout fired id=\(request.id, privacy: .public)") + latch.resume(BridgeInvokeResponse( id: request.id, ok: false, error: OpenClawNodeError( code: .unavailable, message: "node invoke timed out") - ) + )) } - - let first = await group.next()! - group.cancelAll() - return first } + timeoutLogger.info("node invoke race resolved id=\(request.id, privacy: .public) ok=\(response.ok, privacy: .public)") + return response } private var serverEventSubscribers: [UUID: AsyncStream.Continuation] = [:] private var canvasHostUrl: String? @@ -78,6 +124,7 @@ public actor GatewayNodeSession { self.onInvoke = onInvoke if shouldReconnect { + self.resetConnectionState() if let existing = self.channel { await existing.shutdown() } @@ -107,7 +154,8 @@ public actor GatewayNodeSession { do { try await channel.connect() - await onConnected() + _ = await self.waitForSnapshot(timeoutMs: 500) + await self.notifyConnectedIfNeeded() } catch { await onDisconnected(error.localizedDescription) throw error @@ -120,6 +168,7 @@ public actor GatewayNodeSession { self.activeURL = nil self.activeToken = nil self.activePassword = nil + self.resetConnectionState() } public func currentCanvasHostUrl() -> String? { @@ -179,7 +228,8 @@ public actor GatewayNodeSession { case let .snapshot(ok): let raw = ok.canvashosturl?.trimmingCharacters(in: .whitespacesAndNewlines) self.canvasHostUrl = (raw?.isEmpty == false) ? raw : nil - await self.onConnected?() + self.markSnapshotReceived() + await self.notifyConnectedIfNeeded() case let .event(evt): await self.handleEvent(evt) default: @@ -187,28 +237,98 @@ public actor GatewayNodeSession { } } + private func resetConnectionState() { + self.hasNotifiedConnected = false + self.snapshotReceived = false + if !self.snapshotWaiters.isEmpty { + let waiters = self.snapshotWaiters + self.snapshotWaiters.removeAll() + for waiter in waiters { + waiter.resume(returning: false) + } + } + } + + private func markSnapshotReceived() { + self.snapshotReceived = true + if !self.snapshotWaiters.isEmpty { + let waiters = self.snapshotWaiters + self.snapshotWaiters.removeAll() + for waiter in waiters { + waiter.resume(returning: true) + } + } + } + + private func waitForSnapshot(timeoutMs: Int) async -> Bool { + if self.snapshotReceived { return true } + let clamped = max(0, timeoutMs) + return await withCheckedContinuation { cont in + self.snapshotWaiters.append(cont) + Task { [weak self] in + guard let self else { return } + try? await Task.sleep(nanoseconds: UInt64(clamped) * 1_000_000) + await self.timeoutSnapshotWaiters() + } + } + } + + private func timeoutSnapshotWaiters() { + guard !self.snapshotReceived else { return } + if !self.snapshotWaiters.isEmpty { + let waiters = self.snapshotWaiters + self.snapshotWaiters.removeAll() + for waiter in waiters { + waiter.resume(returning: false) + } + } + } + + private func notifyConnectedIfNeeded() async { + guard !self.hasNotifiedConnected else { return } + self.hasNotifiedConnected = true + await self.onConnected?() + } + private func handleEvent(_ evt: EventFrame) async { self.broadcastServerEvent(evt) guard evt.event == "node.invoke.request" else { return } + self.logger.info("node invoke request received") guard let payload = evt.payload else { return } do { - let data = try self.encoder.encode(payload) - let request = try self.decoder.decode(NodeInvokeRequestPayload.self, from: data) + let request = try self.decodeInvokeRequest(from: payload) + let timeoutLabel = request.timeoutMs.map(String.init) ?? "none" + self.logger.info("node invoke request decoded id=\(request.id, privacy: .public) command=\(request.command, privacy: .public) timeoutMs=\(timeoutLabel, privacy: .public)") guard let onInvoke else { return } let req = BridgeInvokeRequest(id: request.id, command: request.command, paramsJSON: request.paramsJSON) + self.logger.info("node invoke executing id=\(request.id, privacy: .public)") let response = await Self.invokeWithTimeout( request: req, timeoutMs: request.timeoutMs, onInvoke: onInvoke ) + self.logger.info("node invoke completed id=\(request.id, privacy: .public) ok=\(response.ok, privacy: .public)") await self.sendInvokeResult(request: request, response: response) } catch { self.logger.error("node invoke decode failed: \(error.localizedDescription, privacy: .public)") } } + private func decodeInvokeRequest(from payload: OpenClawProtocol.AnyCodable) throws -> NodeInvokeRequestPayload { + do { + let data = try self.encoder.encode(payload) + return try self.decoder.decode(NodeInvokeRequestPayload.self, from: data) + } catch { + if let raw = payload.value as? String, let data = raw.data(using: .utf8) { + return try self.decoder.decode(NodeInvokeRequestPayload.self, from: data) + } + throw error + } + } + private func sendInvokeResult(request: NodeInvokeRequestPayload, response: BridgeInvokeResponse) async { guard let channel = self.channel else { return } + self.logger.info("node invoke result sending id=\(request.id, privacy: .public) ok=\(response.ok, privacy: .public)") var params: [String: AnyCodable] = [ "id": AnyCodable(request.id), "nodeId": AnyCodable(request.nodeId), @@ -226,7 +346,7 @@ public actor GatewayNodeSession { do { try await channel.send(method: "node.invoke.result", params: params) } catch { - self.logger.error("node invoke result failed: \(error.localizedDescription, privacy: .public)") + self.logger.error("node invoke result failed id=\(request.id, privacy: .public) error=\(error.localizedDescription, privacy: .public)") } } diff --git a/src/agents/tools/nodes-tool.ts b/src/agents/tools/nodes-tool.ts index 4fce2a7f38..1528726b8d 100644 --- a/src/agents/tools/nodes-tool.ts +++ b/src/agents/tools/nodes-tool.ts @@ -37,6 +37,7 @@ const NODES_TOOL_ACTIONS = [ "screen_record", "location_get", "run", + "invoke", ] as const; const NOTIFY_PRIORITIES = ["passive", "active", "timeSensitive"] as const; @@ -84,6 +85,9 @@ const NodesToolSchema = Type.Object({ commandTimeoutMs: Type.Optional(Type.Number()), invokeTimeoutMs: Type.Optional(Type.Number()), needsScreenRecording: Type.Optional(Type.Boolean()), + // invoke + invokeCommand: Type.Optional(Type.String()), + invokeParamsJson: Type.Optional(Type.String()), }); export function createNodesTool(options?: { @@ -99,7 +103,7 @@ export function createNodesTool(options?: { label: "Nodes", name: "nodes", description: - "Discover and control paired nodes (status/describe/pairing/notify/camera/screen/location/run).", + "Discover and control paired nodes (status/describe/pairing/notify/camera/screen/location/run/invoke).", parameters: NodesToolSchema, execute: async (_toolCallId, args) => { const params = args as Record; @@ -438,6 +442,31 @@ export function createNodesTool(options?: { }); return jsonResult(raw?.payload ?? {}); } + case "invoke": { + const node = readStringParam(params, "node", { required: true }); + const nodeId = await resolveNodeId(gatewayOpts, node); + const invokeCommand = readStringParam(params, "invokeCommand", { required: true }); + const invokeParamsJson = + typeof params.invokeParamsJson === "string" ? params.invokeParamsJson.trim() : ""; + let invokeParams: unknown = {}; + if (invokeParamsJson) { + try { + invokeParams = JSON.parse(invokeParamsJson); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + throw new Error(`invokeParamsJson must be valid JSON: ${message}`); + } + } + const invokeTimeoutMs = parseTimeoutMs(params.invokeTimeoutMs); + const raw = await callGatewayTool("node.invoke", gatewayOpts, { + nodeId, + command: invokeCommand, + params: invokeParams, + timeoutMs: invokeTimeoutMs, + idempotencyKey: crypto.randomUUID(), + }); + return jsonResult(raw ?? {}); + } default: throw new Error(`Unknown action: ${action}`); } diff --git a/src/auto-reply/reply/commands-ptt.ts b/src/auto-reply/reply/commands-ptt.ts new file mode 100644 index 0000000000..037c3043fb --- /dev/null +++ b/src/auto-reply/reply/commands-ptt.ts @@ -0,0 +1,211 @@ +import type { OpenClawConfig } from "../../config/config.js"; +import type { CommandHandler } from "./commands-types.js"; +import { callGateway, randomIdempotencyKey } from "../../gateway/call.js"; +import { logVerbose } from "../../globals.js"; + +type NodeSummary = { + nodeId: string; + displayName?: string; + platform?: string; + deviceFamily?: string; + remoteIp?: string; + connected?: boolean; +}; + +const PTT_COMMANDS: Record = { + start: "talk.ptt.start", + stop: "talk.ptt.stop", + once: "talk.ptt.once", + cancel: "talk.ptt.cancel", +}; + +function normalizeNodeKey(value: string) { + return value + .toLowerCase() + .replace(/[^a-z0-9]+/g, "-") + .replace(/^-+/, "") + .replace(/-+$/, ""); +} + +function isIOSNode(node: NodeSummary): boolean { + const platform = node.platform?.toLowerCase() ?? ""; + const family = node.deviceFamily?.toLowerCase() ?? ""; + return ( + platform.startsWith("ios") || + family.includes("iphone") || + family.includes("ipad") || + family.includes("ios") + ); +} + +async function loadNodes(cfg: OpenClawConfig): Promise { + try { + const res = await callGateway<{ nodes?: NodeSummary[] }>({ + method: "node.list", + params: {}, + config: cfg, + }); + return Array.isArray(res.nodes) ? res.nodes : []; + } catch { + const res = await callGateway<{ pending?: unknown[]; paired?: NodeSummary[] }>({ + method: "node.pair.list", + params: {}, + config: cfg, + }); + return Array.isArray(res.paired) ? res.paired : []; + } +} + +function describeNodes(nodes: NodeSummary[]) { + return nodes + .map((node) => node.displayName || node.remoteIp || node.nodeId) + .filter(Boolean) + .join(", "); +} + +function resolveNodeId(nodes: NodeSummary[], query?: string): string { + const trimmed = String(query ?? "").trim(); + if (trimmed) { + const qNorm = normalizeNodeKey(trimmed); + const matches = nodes.filter((node) => { + if (node.nodeId === trimmed) { + return true; + } + if (typeof node.remoteIp === "string" && node.remoteIp === trimmed) { + return true; + } + const name = typeof node.displayName === "string" ? node.displayName : ""; + if (name && normalizeNodeKey(name) === qNorm) { + return true; + } + if (trimmed.length >= 6 && node.nodeId.startsWith(trimmed)) { + return true; + } + return false; + }); + + if (matches.length === 1) { + return matches[0].nodeId; + } + const known = describeNodes(nodes); + if (matches.length === 0) { + throw new Error(`unknown node: ${trimmed}${known ? ` (known: ${known})` : ""}`); + } + throw new Error( + `ambiguous node: ${trimmed} (matches: ${matches + .map((node) => node.displayName || node.remoteIp || node.nodeId) + .join(", ")})`, + ); + } + + const iosNodes = nodes.filter(isIOSNode); + const iosConnected = iosNodes.filter((node) => node.connected); + const iosCandidates = iosConnected.length > 0 ? iosConnected : iosNodes; + if (iosCandidates.length === 1) { + return iosCandidates[0].nodeId; + } + if (iosCandidates.length > 1) { + throw new Error( + `multiple iOS nodes found (${describeNodes(iosCandidates)}); specify node=`, + ); + } + + const connected = nodes.filter((node) => node.connected); + const fallback = connected.length > 0 ? connected : nodes; + if (fallback.length === 1) { + return fallback[0].nodeId; + } + + const known = describeNodes(nodes); + throw new Error(`node required${known ? ` (known: ${known})` : ""}`); +} + +function parsePTTArgs(commandBody: string) { + const tokens = commandBody.trim().split(/\s+/).slice(1); + let action: string | undefined; + let node: string | undefined; + for (const token of tokens) { + if (!token) { + continue; + } + if (token.toLowerCase().startsWith("node=")) { + node = token.slice("node=".length); + continue; + } + if (!action) { + action = token; + } + } + return { action, node }; +} + +function buildPTTHelpText() { + return [ + "Usage: /ptt [node=]", + "Example: /ptt once node=iphone", + ].join("\n"); +} + +export const handlePTTCommand: CommandHandler = async (params, allowTextCommands) => { + if (!allowTextCommands) { + return null; + } + const { command, cfg } = params; + const normalized = command.commandBodyNormalized.trim(); + if (!normalized.startsWith("/ptt")) { + return null; + } + if (!command.isAuthorizedSender) { + logVerbose(`Ignoring /ptt from unauthorized sender: ${command.senderId || ""}`); + return { shouldContinue: false, reply: { text: "PTT requires an authorized sender." } }; + } + + const parsed = parsePTTArgs(normalized); + const actionKey = parsed.action?.trim().toLowerCase() ?? ""; + const commandId = PTT_COMMANDS[actionKey]; + if (!commandId) { + return { shouldContinue: false, reply: { text: buildPTTHelpText() } }; + } + + try { + const nodes = await loadNodes(cfg); + const nodeId = resolveNodeId(nodes, parsed.node); + const invokeParams: Record = { + nodeId, + command: commandId, + params: {}, + idempotencyKey: randomIdempotencyKey(), + timeoutMs: 15_000, + }; + const res = await callGateway<{ + ok?: boolean; + payload?: Record; + command?: string; + nodeId?: string; + }>({ + method: "node.invoke", + params: invokeParams, + config: cfg, + }); + const payload = + res.payload && typeof res.payload === "object" + ? (res.payload as Record) + : {}; + + const lines = [`PTT ${actionKey} → ${nodeId}`]; + if (typeof payload.status === "string") { + lines.push(`status: ${payload.status}`); + } + if (typeof payload.captureId === "string") { + lines.push(`captureId: ${payload.captureId}`); + } + if (typeof payload.transcript === "string" && payload.transcript.trim()) { + lines.push(`transcript: ${payload.transcript}`); + } + + return { shouldContinue: false, reply: { text: lines.join("\n") } }; + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + return { shouldContinue: false, reply: { text: `PTT failed: ${message}` } }; + } +};