Revert "Core: update shared gateway models"

This reverts commit 37eaca719a.
This commit is contained in:
Mariano Belinky
2026-02-02 17:35:58 +00:00
parent 4ab814fd50
commit 7113dc21a9
4 changed files with 379 additions and 17 deletions

View File

@@ -416,7 +416,9 @@ public actor GatewayChannelActor {
guard let self else { return }
await self.watchTicks()
}
await self.pushHandler?(.snapshot(ok))
if let pushHandler = self.pushHandler {
Task { await pushHandler(.snapshot(ok)) }
}
}
private func listen() {

View File

@@ -11,10 +11,12 @@ private struct NodeInvokeRequestPayload: Codable, Sendable {
var idempotencyKey: String?
}
public actor GatewayNodeSession {
private let logger = Logger(subsystem: "ai.openclaw", category: "node.gateway")
private let decoder = JSONDecoder()
private let encoder = JSONEncoder()
private static let defaultInvokeTimeoutMs = 30_000
private var channel: GatewayChannelActor?
private var activeURL: URL?
private var activeToken: String?
@@ -23,34 +25,78 @@ public actor GatewayNodeSession {
private var onConnected: (@Sendable () async -> Void)?
private var onDisconnected: (@Sendable (String) async -> Void)?
private var onInvoke: (@Sendable (BridgeInvokeRequest) async -> BridgeInvokeResponse)?
private var hasNotifiedConnected = false
private var snapshotReceived = false
private var snapshotWaiters: [CheckedContinuation<Bool, Never>] = []
static func invokeWithTimeout(
request: BridgeInvokeRequest,
timeoutMs: Int?,
onInvoke: @escaping @Sendable (BridgeInvokeRequest) async -> BridgeInvokeResponse
) async -> BridgeInvokeResponse {
let timeout = max(0, timeoutMs ?? 0)
let timeoutLogger = Logger(subsystem: "ai.openclaw", category: "node.gateway")
let timeout: Int = {
if let timeoutMs { return max(0, timeoutMs) }
return Self.defaultInvokeTimeoutMs
}()
guard timeout > 0 else {
return await onInvoke(request)
}
return await withTaskGroup(of: BridgeInvokeResponse.self) { group in
group.addTask { await onInvoke(request) }
group.addTask {
// Use an explicit latch so timeouts win even if onInvoke blocks (e.g., permission prompts).
final class InvokeLatch: @unchecked Sendable {
private let lock = NSLock()
private var continuation: CheckedContinuation<BridgeInvokeResponse, Never>?
private var resumed = false
func setContinuation(_ continuation: CheckedContinuation<BridgeInvokeResponse, Never>) {
self.lock.lock()
defer { self.lock.unlock() }
self.continuation = continuation
}
func resume(_ response: BridgeInvokeResponse) {
let cont: CheckedContinuation<BridgeInvokeResponse, Never>?
self.lock.lock()
if self.resumed {
self.lock.unlock()
return
}
self.resumed = true
cont = self.continuation
self.continuation = nil
self.lock.unlock()
cont?.resume(returning: response)
}
}
let latch = InvokeLatch()
var onInvokeTask: Task<Void, Never>?
var timeoutTask: Task<Void, Never>?
defer {
onInvokeTask?.cancel()
timeoutTask?.cancel()
}
let response = await withCheckedContinuation { (cont: CheckedContinuation<BridgeInvokeResponse, Never>) in
latch.setContinuation(cont)
onInvokeTask = Task.detached {
let result = await onInvoke(request)
latch.resume(result)
}
timeoutTask = Task.detached {
try? await Task.sleep(nanoseconds: UInt64(timeout) * 1_000_000)
return BridgeInvokeResponse(
timeoutLogger.info("node invoke timeout fired id=\(request.id, privacy: .public)")
latch.resume(BridgeInvokeResponse(
id: request.id,
ok: false,
error: OpenClawNodeError(
code: .unavailable,
message: "node invoke timed out")
)
))
}
let first = await group.next()!
group.cancelAll()
return first
}
timeoutLogger.info("node invoke race resolved id=\(request.id, privacy: .public) ok=\(response.ok, privacy: .public)")
return response
}
private var serverEventSubscribers: [UUID: AsyncStream<EventFrame>.Continuation] = [:]
private var canvasHostUrl: String?
@@ -78,6 +124,7 @@ public actor GatewayNodeSession {
self.onInvoke = onInvoke
if shouldReconnect {
self.resetConnectionState()
if let existing = self.channel {
await existing.shutdown()
}
@@ -107,7 +154,8 @@ public actor GatewayNodeSession {
do {
try await channel.connect()
await onConnected()
_ = await self.waitForSnapshot(timeoutMs: 500)
await self.notifyConnectedIfNeeded()
} catch {
await onDisconnected(error.localizedDescription)
throw error
@@ -120,6 +168,7 @@ public actor GatewayNodeSession {
self.activeURL = nil
self.activeToken = nil
self.activePassword = nil
self.resetConnectionState()
}
public func currentCanvasHostUrl() -> String? {
@@ -179,7 +228,8 @@ public actor GatewayNodeSession {
case let .snapshot(ok):
let raw = ok.canvashosturl?.trimmingCharacters(in: .whitespacesAndNewlines)
self.canvasHostUrl = (raw?.isEmpty == false) ? raw : nil
await self.onConnected?()
self.markSnapshotReceived()
await self.notifyConnectedIfNeeded()
case let .event(evt):
await self.handleEvent(evt)
default:
@@ -187,28 +237,98 @@ public actor GatewayNodeSession {
}
}
private func resetConnectionState() {
self.hasNotifiedConnected = false
self.snapshotReceived = false
if !self.snapshotWaiters.isEmpty {
let waiters = self.snapshotWaiters
self.snapshotWaiters.removeAll()
for waiter in waiters {
waiter.resume(returning: false)
}
}
}
private func markSnapshotReceived() {
self.snapshotReceived = true
if !self.snapshotWaiters.isEmpty {
let waiters = self.snapshotWaiters
self.snapshotWaiters.removeAll()
for waiter in waiters {
waiter.resume(returning: true)
}
}
}
private func waitForSnapshot(timeoutMs: Int) async -> Bool {
if self.snapshotReceived { return true }
let clamped = max(0, timeoutMs)
return await withCheckedContinuation { cont in
self.snapshotWaiters.append(cont)
Task { [weak self] in
guard let self else { return }
try? await Task.sleep(nanoseconds: UInt64(clamped) * 1_000_000)
await self.timeoutSnapshotWaiters()
}
}
}
private func timeoutSnapshotWaiters() {
guard !self.snapshotReceived else { return }
if !self.snapshotWaiters.isEmpty {
let waiters = self.snapshotWaiters
self.snapshotWaiters.removeAll()
for waiter in waiters {
waiter.resume(returning: false)
}
}
}
private func notifyConnectedIfNeeded() async {
guard !self.hasNotifiedConnected else { return }
self.hasNotifiedConnected = true
await self.onConnected?()
}
private func handleEvent(_ evt: EventFrame) async {
self.broadcastServerEvent(evt)
guard evt.event == "node.invoke.request" else { return }
self.logger.info("node invoke request received")
guard let payload = evt.payload else { return }
do {
let data = try self.encoder.encode(payload)
let request = try self.decoder.decode(NodeInvokeRequestPayload.self, from: data)
let request = try self.decodeInvokeRequest(from: payload)
let timeoutLabel = request.timeoutMs.map(String.init) ?? "none"
self.logger.info("node invoke request decoded id=\(request.id, privacy: .public) command=\(request.command, privacy: .public) timeoutMs=\(timeoutLabel, privacy: .public)")
guard let onInvoke else { return }
let req = BridgeInvokeRequest(id: request.id, command: request.command, paramsJSON: request.paramsJSON)
self.logger.info("node invoke executing id=\(request.id, privacy: .public)")
let response = await Self.invokeWithTimeout(
request: req,
timeoutMs: request.timeoutMs,
onInvoke: onInvoke
)
self.logger.info("node invoke completed id=\(request.id, privacy: .public) ok=\(response.ok, privacy: .public)")
await self.sendInvokeResult(request: request, response: response)
} catch {
self.logger.error("node invoke decode failed: \(error.localizedDescription, privacy: .public)")
}
}
private func decodeInvokeRequest(from payload: OpenClawProtocol.AnyCodable) throws -> NodeInvokeRequestPayload {
do {
let data = try self.encoder.encode(payload)
return try self.decoder.decode(NodeInvokeRequestPayload.self, from: data)
} catch {
if let raw = payload.value as? String, let data = raw.data(using: .utf8) {
return try self.decoder.decode(NodeInvokeRequestPayload.self, from: data)
}
throw error
}
}
private func sendInvokeResult(request: NodeInvokeRequestPayload, response: BridgeInvokeResponse) async {
guard let channel = self.channel else { return }
self.logger.info("node invoke result sending id=\(request.id, privacy: .public) ok=\(response.ok, privacy: .public)")
var params: [String: AnyCodable] = [
"id": AnyCodable(request.id),
"nodeId": AnyCodable(request.nodeId),
@@ -226,7 +346,7 @@ public actor GatewayNodeSession {
do {
try await channel.send(method: "node.invoke.result", params: params)
} catch {
self.logger.error("node invoke result failed: \(error.localizedDescription, privacy: .public)")
self.logger.error("node invoke result failed id=\(request.id, privacy: .public) error=\(error.localizedDescription, privacy: .public)")
}
}

View File

@@ -37,6 +37,7 @@ const NODES_TOOL_ACTIONS = [
"screen_record",
"location_get",
"run",
"invoke",
] as const;
const NOTIFY_PRIORITIES = ["passive", "active", "timeSensitive"] as const;
@@ -84,6 +85,9 @@ const NodesToolSchema = Type.Object({
commandTimeoutMs: Type.Optional(Type.Number()),
invokeTimeoutMs: Type.Optional(Type.Number()),
needsScreenRecording: Type.Optional(Type.Boolean()),
// invoke
invokeCommand: Type.Optional(Type.String()),
invokeParamsJson: Type.Optional(Type.String()),
});
export function createNodesTool(options?: {
@@ -99,7 +103,7 @@ export function createNodesTool(options?: {
label: "Nodes",
name: "nodes",
description:
"Discover and control paired nodes (status/describe/pairing/notify/camera/screen/location/run).",
"Discover and control paired nodes (status/describe/pairing/notify/camera/screen/location/run/invoke).",
parameters: NodesToolSchema,
execute: async (_toolCallId, args) => {
const params = args as Record<string, unknown>;
@@ -438,6 +442,31 @@ export function createNodesTool(options?: {
});
return jsonResult(raw?.payload ?? {});
}
case "invoke": {
const node = readStringParam(params, "node", { required: true });
const nodeId = await resolveNodeId(gatewayOpts, node);
const invokeCommand = readStringParam(params, "invokeCommand", { required: true });
const invokeParamsJson =
typeof params.invokeParamsJson === "string" ? params.invokeParamsJson.trim() : "";
let invokeParams: unknown = {};
if (invokeParamsJson) {
try {
invokeParams = JSON.parse(invokeParamsJson);
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
throw new Error(`invokeParamsJson must be valid JSON: ${message}`);
}
}
const invokeTimeoutMs = parseTimeoutMs(params.invokeTimeoutMs);
const raw = await callGatewayTool("node.invoke", gatewayOpts, {
nodeId,
command: invokeCommand,
params: invokeParams,
timeoutMs: invokeTimeoutMs,
idempotencyKey: crypto.randomUUID(),
});
return jsonResult(raw ?? {});
}
default:
throw new Error(`Unknown action: ${action}`);
}

View File

@@ -0,0 +1,211 @@
import type { OpenClawConfig } from "../../config/config.js";
import type { CommandHandler } from "./commands-types.js";
import { callGateway, randomIdempotencyKey } from "../../gateway/call.js";
import { logVerbose } from "../../globals.js";
type NodeSummary = {
nodeId: string;
displayName?: string;
platform?: string;
deviceFamily?: string;
remoteIp?: string;
connected?: boolean;
};
const PTT_COMMANDS: Record<string, string> = {
start: "talk.ptt.start",
stop: "talk.ptt.stop",
once: "talk.ptt.once",
cancel: "talk.ptt.cancel",
};
function normalizeNodeKey(value: string) {
return value
.toLowerCase()
.replace(/[^a-z0-9]+/g, "-")
.replace(/^-+/, "")
.replace(/-+$/, "");
}
function isIOSNode(node: NodeSummary): boolean {
const platform = node.platform?.toLowerCase() ?? "";
const family = node.deviceFamily?.toLowerCase() ?? "";
return (
platform.startsWith("ios") ||
family.includes("iphone") ||
family.includes("ipad") ||
family.includes("ios")
);
}
async function loadNodes(cfg: OpenClawConfig): Promise<NodeSummary[]> {
try {
const res = await callGateway<{ nodes?: NodeSummary[] }>({
method: "node.list",
params: {},
config: cfg,
});
return Array.isArray(res.nodes) ? res.nodes : [];
} catch {
const res = await callGateway<{ pending?: unknown[]; paired?: NodeSummary[] }>({
method: "node.pair.list",
params: {},
config: cfg,
});
return Array.isArray(res.paired) ? res.paired : [];
}
}
function describeNodes(nodes: NodeSummary[]) {
return nodes
.map((node) => node.displayName || node.remoteIp || node.nodeId)
.filter(Boolean)
.join(", ");
}
function resolveNodeId(nodes: NodeSummary[], query?: string): string {
const trimmed = String(query ?? "").trim();
if (trimmed) {
const qNorm = normalizeNodeKey(trimmed);
const matches = nodes.filter((node) => {
if (node.nodeId === trimmed) {
return true;
}
if (typeof node.remoteIp === "string" && node.remoteIp === trimmed) {
return true;
}
const name = typeof node.displayName === "string" ? node.displayName : "";
if (name && normalizeNodeKey(name) === qNorm) {
return true;
}
if (trimmed.length >= 6 && node.nodeId.startsWith(trimmed)) {
return true;
}
return false;
});
if (matches.length === 1) {
return matches[0].nodeId;
}
const known = describeNodes(nodes);
if (matches.length === 0) {
throw new Error(`unknown node: ${trimmed}${known ? ` (known: ${known})` : ""}`);
}
throw new Error(
`ambiguous node: ${trimmed} (matches: ${matches
.map((node) => node.displayName || node.remoteIp || node.nodeId)
.join(", ")})`,
);
}
const iosNodes = nodes.filter(isIOSNode);
const iosConnected = iosNodes.filter((node) => node.connected);
const iosCandidates = iosConnected.length > 0 ? iosConnected : iosNodes;
if (iosCandidates.length === 1) {
return iosCandidates[0].nodeId;
}
if (iosCandidates.length > 1) {
throw new Error(
`multiple iOS nodes found (${describeNodes(iosCandidates)}); specify node=<id>`,
);
}
const connected = nodes.filter((node) => node.connected);
const fallback = connected.length > 0 ? connected : nodes;
if (fallback.length === 1) {
return fallback[0].nodeId;
}
const known = describeNodes(nodes);
throw new Error(`node required${known ? ` (known: ${known})` : ""}`);
}
function parsePTTArgs(commandBody: string) {
const tokens = commandBody.trim().split(/\s+/).slice(1);
let action: string | undefined;
let node: string | undefined;
for (const token of tokens) {
if (!token) {
continue;
}
if (token.toLowerCase().startsWith("node=")) {
node = token.slice("node=".length);
continue;
}
if (!action) {
action = token;
}
}
return { action, node };
}
function buildPTTHelpText() {
return [
"Usage: /ptt <start|stop|once|cancel> [node=<id>]",
"Example: /ptt once node=iphone",
].join("\n");
}
export const handlePTTCommand: CommandHandler = async (params, allowTextCommands) => {
if (!allowTextCommands) {
return null;
}
const { command, cfg } = params;
const normalized = command.commandBodyNormalized.trim();
if (!normalized.startsWith("/ptt")) {
return null;
}
if (!command.isAuthorizedSender) {
logVerbose(`Ignoring /ptt from unauthorized sender: ${command.senderId || "<unknown>"}`);
return { shouldContinue: false, reply: { text: "PTT requires an authorized sender." } };
}
const parsed = parsePTTArgs(normalized);
const actionKey = parsed.action?.trim().toLowerCase() ?? "";
const commandId = PTT_COMMANDS[actionKey];
if (!commandId) {
return { shouldContinue: false, reply: { text: buildPTTHelpText() } };
}
try {
const nodes = await loadNodes(cfg);
const nodeId = resolveNodeId(nodes, parsed.node);
const invokeParams: Record<string, unknown> = {
nodeId,
command: commandId,
params: {},
idempotencyKey: randomIdempotencyKey(),
timeoutMs: 15_000,
};
const res = await callGateway<{
ok?: boolean;
payload?: Record<string, unknown>;
command?: string;
nodeId?: string;
}>({
method: "node.invoke",
params: invokeParams,
config: cfg,
});
const payload =
res.payload && typeof res.payload === "object"
? (res.payload as Record<string, unknown>)
: {};
const lines = [`PTT ${actionKey}${nodeId}`];
if (typeof payload.status === "string") {
lines.push(`status: ${payload.status}`);
}
if (typeof payload.captureId === "string") {
lines.push(`captureId: ${payload.captureId}`);
}
if (typeof payload.transcript === "string" && payload.transcript.trim()) {
lines.push(`transcript: ${payload.transcript}`);
}
return { shouldContinue: false, reply: { text: lines.join("\n") } };
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
return { shouldContinue: false, reply: { text: `PTT failed: ${message}` } };
}
};