Move lobster integration to optional plugin tool

This commit is contained in:
Vignesh Natarajan
2026-01-17 20:18:54 -08:00
parent 147fccd967
commit b2650ba672
9 changed files with 326 additions and 419 deletions

View File

@@ -1,64 +0,0 @@
---
title: Lobster
description: Run Lobster pipelines (typed workflows) as a first-class Clawdbot tool.
---
# Lobster
The `lobster` tool lets Clawdbot run Lobster pipelines as a **local-first, typed workflow runtime**.
This is designed for:
- Deterministic orchestration (move multi-step tool workflows out of the LLM)
- Human-in-the-loop approvals that **halt and resume**
- Lower token usage (one `lobster.run` call instead of many tool calls)
## Security model
- Lobster runs as a **local subprocess**.
- Lobster does **not** manage OAuth or secrets.
- Side effects still go through Clawdbot tools (messaging, files, etc.).
Recommendations:
- Prefer configuring `lobsterPath` as an **absolute path** to avoid PATH hijack.
- Use Lobster approvals (`approve`) for any side-effectful step.
## Actions
### `run`
Run a pipeline in tool mode.
Example:
```json
{
"action": "run",
"pipeline": "exec --json \"echo [1]\" | approve --prompt 'ok?'",
"lobsterPath": "/absolute/path/to/lobster",
"timeoutMs": 20000
}
```
### `resume`
Resume a halted pipeline.
Example:
```json
{
"action": "resume",
"token": "<resumeToken>",
"approve": true,
"lobsterPath": "/absolute/path/to/lobster"
}
```
## Output
Lobster returns a JSON envelope:
- `ok`: boolean
- `status`: `ok` | `needs_approval` | `cancelled`
- `output`: array of items
- `requiresApproval`: approval request object (when `status=needs_approval`)

View File

@@ -0,0 +1,38 @@
# Lobster (plugin)
Adds the `lobster` agent tool as an **optional** plugin tool.
## What this is
- Lobster is a standalone workflow shell (typed JSON-first pipelines + approvals/resume).
- This plugin integrates Lobster with Clawdbot *without core changes*.
## Enable
Because this tool can trigger side effects (via workflows), it is registered with `optional: true`.
Enable it in an agent allowlist:
```json
{
"agents": {
"list": [
{
"id": "main",
"tools": {
"allow": [
"lobster" // plugin id (enables all tools from this plugin)
]
}
}
]
}
}
```
## Security
- Runs the `lobster` executable as a local subprocess.
- Does not manage OAuth/tokens.
- Uses timeouts, stdout caps, and strict JSON envelope parsing.
- Prefer an absolute `lobsterPath` in production to avoid PATH hijack.

View File

@@ -0,0 +1,7 @@
import type { ClawdbotPluginApi } from "../../src/plugins/types.js";
import { createLobsterTool } from "./src/lobster-tool.js";
export default function register(api: ClawdbotPluginApi) {
api.registerTool(createLobsterTool(api), { optional: true });
}

View File

@@ -0,0 +1,9 @@
{
"name": "@clawdbot/lobster",
"version": "2026.1.17-1",
"type": "module",
"description": "Lobster workflow tool plugin (typed pipelines + resumable approvals)",
"clawdbot": {
"extensions": ["./index.ts"]
}
}

View File

@@ -0,0 +1,87 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, it } from "vitest";
import type { ClawdbotPluginApi } from "../../../src/plugins/types.js";
import { createLobsterTool } from "./lobster-tool.js";
async function writeFakeLobster(params: {
payload: unknown;
}) {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-lobster-plugin-"));
const binPath = path.join(dir, "lobster");
const file = `#!/usr/bin/env node\n` +
`process.stdout.write(JSON.stringify(${JSON.stringify(params.payload)}));\n`;
await fs.writeFile(binPath, file, { encoding: "utf8", mode: 0o755 });
return { dir, binPath };
}
function fakeApi(): ClawdbotPluginApi {
return {
id: "lobster",
name: "lobster",
source: "test",
config: {} as any,
runtime: { version: "test" } as any,
logger: { info() {}, warn() {}, error() {}, debug() {} },
registerTool() {},
registerHttpHandler() {},
registerChannel() {},
registerGatewayMethod() {},
registerCli() {},
registerService() {},
registerProvider() {},
resolvePath: (p) => p,
};
}
describe("lobster plugin tool", () => {
it("runs lobster and returns parsed envelope in details", async () => {
const fake = await writeFakeLobster({
payload: { ok: true, status: "ok", output: [{ hello: "world" }], requiresApproval: null },
});
const tool = createLobsterTool(fakeApi());
const res = await tool.execute("call1", {
action: "run",
pipeline: "noop",
lobsterPath: fake.binPath,
timeoutMs: 1000,
});
expect(res.details).toMatchObject({ ok: true, status: "ok" });
});
it("requires absolute lobsterPath when provided", async () => {
const tool = createLobsterTool(fakeApi());
await expect(
tool.execute("call2", {
action: "run",
pipeline: "noop",
lobsterPath: "./lobster",
}),
).rejects.toThrow(/absolute path/);
});
it("rejects invalid JSON from lobster", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-lobster-plugin-bad-"));
const binPath = path.join(dir, "lobster");
await fs.writeFile(binPath, `#!/usr/bin/env node\nprocess.stdout.write('nope');\n`, {
encoding: "utf8",
mode: 0o755,
});
const tool = createLobsterTool(fakeApi());
await expect(
tool.execute("call3", {
action: "run",
pipeline: "noop",
lobsterPath: binPath,
}),
).rejects.toThrow(/invalid JSON/);
});
});

View File

@@ -0,0 +1,185 @@
import { Type } from "@sinclair/typebox";
import { spawn } from "node:child_process";
import path from "node:path";
import type { ClawdbotPluginApi } from "../../../src/plugins/types.js";
type LobsterEnvelope =
| {
ok: true;
status: "ok" | "needs_approval" | "cancelled";
output: unknown[];
requiresApproval: null | {
type: "approval_request";
prompt: string;
items: unknown[];
resumeToken?: string;
};
}
| {
ok: false;
error: { type?: string; message: string };
};
function resolveExecutablePath(lobsterPathRaw: string | undefined) {
const lobsterPath = lobsterPathRaw?.trim() || "lobster";
if (lobsterPath !== "lobster" && !path.isAbsolute(lobsterPath)) {
throw new Error("lobsterPath must be an absolute path (or omit to use PATH)");
}
return lobsterPath;
}
async function runLobsterSubprocess(params: {
execPath: string;
argv: string[];
cwd: string;
timeoutMs: number;
maxStdoutBytes: number;
}) {
const { execPath, argv, cwd } = params;
const timeoutMs = Math.max(200, params.timeoutMs);
const maxStdoutBytes = Math.max(1024, params.maxStdoutBytes);
return await new Promise<{ stdout: string }>((resolve, reject) => {
const child = spawn(execPath, argv, {
cwd,
stdio: ["ignore", "pipe", "pipe"],
env: {
...process.env,
LOBSTER_MODE: "tool",
},
});
let stdout = "";
let stdoutBytes = 0;
let stderr = "";
child.stdout?.setEncoding("utf8");
child.stderr?.setEncoding("utf8");
child.stdout?.on("data", (chunk) => {
const str = String(chunk);
stdoutBytes += Buffer.byteLength(str, "utf8");
if (stdoutBytes > maxStdoutBytes) {
try {
child.kill("SIGKILL");
} finally {
reject(new Error("lobster output exceeded maxStdoutBytes"));
}
return;
}
stdout += str;
});
child.stderr?.on("data", (chunk) => {
stderr += String(chunk);
});
const timer = setTimeout(() => {
try {
child.kill("SIGKILL");
} finally {
reject(new Error("lobster subprocess timed out"));
}
}, timeoutMs);
child.once("error", (err) => {
clearTimeout(timer);
reject(err);
});
child.once("exit", (code) => {
clearTimeout(timer);
if (code !== 0) {
reject(new Error(`lobster failed (${code ?? "?"}): ${stderr.trim() || stdout.trim()}`));
return;
}
resolve({ stdout });
});
});
}
function parseEnvelope(stdout: string): LobsterEnvelope {
let parsed: unknown;
try {
parsed = JSON.parse(stdout);
} catch {
throw new Error("lobster returned invalid JSON");
}
if (!parsed || typeof parsed !== "object") {
throw new Error("lobster returned invalid JSON envelope");
}
const ok = (parsed as { ok?: unknown }).ok;
if (ok === true || ok === false) {
return parsed as LobsterEnvelope;
}
throw new Error("lobster returned invalid JSON envelope");
}
export function createLobsterTool(api: ClawdbotPluginApi) {
return {
name: "lobster",
description:
"Run Lobster pipelines as a local-first workflow runtime (typed JSON envelope + resumable approvals).",
parameters: Type.Object({
// NOTE: Prefer string enums in tool schemas; some providers reject unions/anyOf.
action: Type.Unsafe<"run" | "resume">({ type: "string", enum: ["run", "resume"] }),
pipeline: Type.Optional(Type.String()),
token: Type.Optional(Type.String()),
approve: Type.Optional(Type.Boolean()),
lobsterPath: Type.Optional(Type.String()),
cwd: Type.Optional(Type.String()),
timeoutMs: Type.Optional(Type.Number()),
maxStdoutBytes: Type.Optional(Type.Number()),
}),
async execute(_id: string, params: Record<string, unknown>) {
const action = String(params.action || "").trim();
if (!action) throw new Error("action required");
const execPath = resolveExecutablePath(
typeof params.lobsterPath === "string" ? params.lobsterPath : undefined,
);
const cwd = typeof params.cwd === "string" && params.cwd.trim() ? params.cwd.trim() : process.cwd();
const timeoutMs = typeof params.timeoutMs === "number" ? params.timeoutMs : 20_000;
const maxStdoutBytes = typeof params.maxStdoutBytes === "number" ? params.maxStdoutBytes : 512_000;
const argv = (() => {
if (action === "run") {
const pipeline = typeof params.pipeline === "string" ? params.pipeline : "";
if (!pipeline.trim()) throw new Error("pipeline required");
return ["run", "--mode", "tool", pipeline];
}
if (action === "resume") {
const token = typeof params.token === "string" ? params.token : "";
if (!token.trim()) throw new Error("token required");
const approve = params.approve;
if (typeof approve !== "boolean") throw new Error("approve required");
return ["resume", "--token", token, "--approve", approve ? "yes" : "no"];
}
throw new Error(`Unknown action: ${action}`);
})();
if (api.runtime?.version && api.logger?.debug) {
api.logger.debug(`lobster plugin runtime=${api.runtime.version}`);
}
const { stdout } = await runLobsterSubprocess({
execPath,
argv,
cwd,
timeoutMs,
maxStdoutBytes,
});
const envelope = parseEnvelope(stdout);
return {
content: [{ type: "text", text: JSON.stringify(envelope, null, 2) }],
details: envelope,
};
},
};
}

View File

@@ -9,7 +9,6 @@ import type { AnyAgentTool } from "./tools/common.js";
import { createCronTool } from "./tools/cron-tool.js";
import { createGatewayTool } from "./tools/gateway-tool.js";
import { createImageTool } from "./tools/image-tool.js";
import { createLobsterTool } from "./tools/lobster-tool.js";
import { createMessageTool } from "./tools/message-tool.js";
import { createNodesTool } from "./tools/nodes-tool.js";
import { createSessionStatusTool } from "./tools/session-status-tool.js";
@@ -112,7 +111,6 @@ export function createClawdbotTools(options?: {
agentSessionKey: options?.agentSessionKey,
config: options?.config,
}),
createLobsterTool({ sandboxed: options?.sandboxed }),
...(webSearchTool ? [webSearchTool] : []),
...(webFetchTool ? [webFetchTool] : []),
...(imageTool ? [imageTool] : []),

View File

@@ -1,122 +0,0 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, it } from "vitest";
import { createLobsterTool } from "./lobster-tool.js";
async function writeFakeLobster(params: {
script: (args: string[]) => unknown;
}) {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-lobster-"));
const binPath = path.join(dir, "lobster");
const file = `#!/usr/bin/env node\n` +
`const args = process.argv.slice(2);\n` +
`const payload = (${params.script.toString()})(args);\n` +
`process.stdout.write(JSON.stringify(payload));\n`;
await fs.writeFile(binPath, file, { encoding: "utf8", mode: 0o755 });
return { dir, binPath };
}
describe("lobster tool", () => {
it("runs lobster in tool mode and returns envelope", async () => {
const fake = await writeFakeLobster({
script: (args) => {
if (args[0] !== "run") throw new Error("expected run");
return {
ok: true,
status: "ok",
output: [{ hello: "world" }],
requiresApproval: null,
};
},
});
const tool = createLobsterTool();
const res = await tool.execute("call1", {
action: "run",
pipeline: "exec --json \"echo [1]\"",
lobsterPath: fake.binPath,
timeoutMs: 1000,
});
expect(res.details).toMatchObject({
ok: true,
status: "ok",
output: [{ hello: "world" }],
requiresApproval: null,
});
});
it("supports resume action", async () => {
const fake = await writeFakeLobster({
script: (args) => {
if (args[0] !== "resume") throw new Error("expected resume");
return {
ok: true,
status: "ok",
output: ["resumed"],
requiresApproval: null,
};
},
});
const tool = createLobsterTool();
const res = await tool.execute("call2", {
action: "resume",
token: "tok",
approve: true,
lobsterPath: fake.binPath,
timeoutMs: 1000,
});
expect(res.details).toMatchObject({ ok: true, status: "ok" });
});
it("rejects non-absolute lobsterPath", async () => {
const tool = createLobsterTool();
await expect(
tool.execute("call3", {
action: "run",
pipeline: "json",
lobsterPath: "./lobster",
}),
).rejects.toThrow(/absolute path/);
});
it("blocks tool in sandboxed mode", async () => {
const tool = createLobsterTool({ sandboxed: true });
await expect(
tool.execute("call4", {
action: "run",
pipeline: "json",
lobsterPath: "/usr/bin/true",
}),
).rejects.toThrow(/not available in sandboxed/);
});
it("rejects invalid JSON", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-lobster-bad-"));
const binPath = path.join(dir, "lobster");
await fs.writeFile(
binPath,
`#!/usr/bin/env node\nprocess.stdout.write('not-json');\n`,
{
encoding: "utf8",
mode: 0o755,
},
);
const tool = createLobsterTool();
await expect(
tool.execute("call5", {
action: "run",
pipeline: "json",
lobsterPath: binPath,
}),
).rejects.toThrow(/invalid JSON/);
});
});

View File

@@ -1,231 +0,0 @@
import { Type } from "@sinclair/typebox";
import { spawn } from "node:child_process";
import path from "node:path";
import { stringEnum } from "../schema/typebox.js";
import type { AnyAgentTool } from "./common.js";
import { jsonResult, readNumberParam, readStringParam } from "./common.js";
const LobsterActions = ["run", "resume"] as const;
type LobsterToolParams = {
action: (typeof LobsterActions)[number];
pipeline?: string;
token?: string;
approve?: boolean;
lobsterPath?: string;
cwd?: string;
timeoutMs?: number;
maxStdoutBytes?: number;
};
type LobsterEnvelope =
| {
ok: true;
status: "ok" | "needs_approval" | "cancelled";
output: unknown[];
requiresApproval: null | {
type: "approval_request";
prompt: string;
items: unknown[];
resumeToken?: string;
};
}
| {
ok: false;
error: { type?: string; message: string };
};
function buildSchema() {
return Type.Object({
action: stringEnum(LobsterActions),
pipeline: Type.Optional(Type.String({ description: "Lobster pipeline string." })),
token: Type.Optional(Type.String({ description: "Resume token from lobster tool mode." })),
approve: Type.Optional(Type.Boolean({ description: "Approval decision for resume." })),
lobsterPath: Type.Optional(
Type.String({
description:
"Path to lobster executable. Prefer an absolute path to avoid PATH hijack. Defaults to 'lobster'.",
}),
),
cwd: Type.Optional(
Type.String({
description: "Working directory for lobster subprocess.",
}),
),
timeoutMs: Type.Optional(
Type.Number({
description: "Subprocess timeout (ms).",
}),
),
maxStdoutBytes: Type.Optional(
Type.Number({
description: "Max stdout bytes to read before aborting.",
}),
),
});
}
function resolveExecutablePath(lobsterPathRaw: string | undefined) {
const lobsterPath = lobsterPathRaw?.trim() || "lobster";
if (lobsterPath !== "lobster" && !path.isAbsolute(lobsterPath)) {
throw new Error("lobsterPath must be an absolute path (or omit to use PATH)");
}
return lobsterPath;
}
async function runLobsterSubprocess(params: {
execPath: string;
argv: string[];
cwd: string;
timeoutMs: number;
maxStdoutBytes: number;
}) {
const { execPath, argv, cwd } = params;
const timeoutMs = Math.max(200, params.timeoutMs);
const maxStdoutBytes = Math.max(1024, params.maxStdoutBytes);
return await new Promise<{ stdout: string; exitCode: number | null }>((resolve, reject) => {
const child = spawn(execPath, argv, {
cwd,
stdio: ["ignore", "pipe", "pipe"],
env: {
...process.env,
// Ensure lobster never tries to be interactive.
LOBSTER_MODE: "tool",
},
});
let stdout = "";
let stdoutBytes = 0;
let stderr = "";
child.stdout?.setEncoding("utf8");
child.stderr?.setEncoding("utf8");
child.stdout?.on("data", (chunk) => {
const str = String(chunk);
stdoutBytes += Buffer.byteLength(str, "utf8");
if (stdoutBytes > maxStdoutBytes) {
try {
child.kill("SIGKILL");
} finally {
reject(new Error("lobster output exceeded maxStdoutBytes"));
}
return;
}
stdout += str;
});
child.stderr?.on("data", (chunk) => {
stderr += String(chunk);
});
const timer = setTimeout(() => {
try {
child.kill("SIGKILL");
} finally {
reject(new Error("lobster subprocess timed out"));
}
}, timeoutMs);
child.once("error", (err) => {
clearTimeout(timer);
reject(err);
});
child.once("exit", (code) => {
clearTimeout(timer);
if (code !== 0) {
reject(new Error(`lobster failed (${code ?? "?"}): ${stderr.trim() || stdout.trim()}`));
return;
}
resolve({ stdout, exitCode: code });
});
});
}
function parseEnvelope(stdout: string): LobsterEnvelope {
let parsed: unknown;
try {
parsed = JSON.parse(stdout);
} catch {
throw new Error("lobster returned invalid JSON");
}
if (!parsed || typeof parsed !== "object") {
throw new Error("lobster returned invalid JSON envelope");
}
const ok = (parsed as { ok?: unknown }).ok;
if (ok === true) {
const env = parsed as LobsterEnvelope;
if (!Array.isArray((env as any).output)) {
throw new Error("lobster tool output must include output[]");
}
return env;
}
if (ok === false) {
const env = parsed as LobsterEnvelope;
const msg = (env as any)?.error?.message;
if (typeof msg !== "string" || !msg.trim()) {
throw new Error("lobster error envelope missing error.message");
}
return env;
}
throw new Error("lobster returned invalid JSON envelope");
}
export function createLobsterTool(options: { sandboxed?: boolean } = {}): AnyAgentTool {
const parameters = buildSchema();
return {
label: "Lobster",
name: "lobster",
description:
"Run Lobster pipelines as a local-first, typed workflow runtime (tool mode JSON envelope, resumable approvals).",
parameters,
async execute(_callId, paramsRaw) {
if (options.sandboxed) {
throw new Error("lobster tool is not available in sandboxed mode");
}
const params = paramsRaw as Record<string, unknown>;
const action = readStringParam(params, "action", { required: true }) as LobsterToolParams["action"];
const execPath = resolveExecutablePath(readStringParam(params, "lobsterPath"));
const cwd = readStringParam(params, "cwd", { allowEmpty: false }) || process.cwd();
const timeoutMs = readNumberParam(params, "timeoutMs", { integer: true }) ?? 20_000;
const maxStdoutBytes = readNumberParam(params, "maxStdoutBytes", { integer: true }) ?? 512_000;
let argv: string[];
if (action === "run") {
const pipeline = readStringParam(params, "pipeline", { required: true, label: "pipeline" })!;
argv = ["run", "--mode", "tool", pipeline];
} else if (action === "resume") {
const token = readStringParam(params, "token", { required: true, label: "token" })!;
const approve = params["approve"];
if (typeof approve !== "boolean") {
throw new Error("approve required");
}
argv = ["resume", "--token", token, "--approve", approve ? "yes" : "no"];
} else {
throw new Error(`Unknown action: ${action}`);
}
const { stdout } = await runLobsterSubprocess({
execPath,
argv,
cwd,
timeoutMs,
maxStdoutBytes,
});
const envelope = parseEnvelope(stdout);
return jsonResult(envelope);
},
};
}