mirror of
https://github.com/openclaw/openclaw.git
synced 2026-02-08 21:09:23 +08:00
Matrix: fix event bridge dedupe and invite detection
This commit is contained in:
@@ -9,7 +9,7 @@ import {
|
||||
|
||||
export async function createMatrixClient(params: {
|
||||
homeserver: string;
|
||||
userId: string;
|
||||
userId?: string;
|
||||
accessToken: string;
|
||||
deviceId?: string;
|
||||
encryption?: boolean;
|
||||
@@ -19,10 +19,12 @@ export async function createMatrixClient(params: {
|
||||
}): Promise<MatrixClient> {
|
||||
ensureMatrixSdkLoggingConfigured();
|
||||
const env = process.env;
|
||||
const userId = params.userId?.trim() || "unknown";
|
||||
const matrixClientUserId = params.userId?.trim() || undefined;
|
||||
|
||||
const storagePaths = resolveMatrixStoragePaths({
|
||||
homeserver: params.homeserver,
|
||||
userId: params.userId,
|
||||
userId,
|
||||
accessToken: params.accessToken,
|
||||
accountId: params.accountId,
|
||||
env,
|
||||
@@ -33,12 +35,12 @@ export async function createMatrixClient(params: {
|
||||
writeStorageMeta({
|
||||
storagePaths,
|
||||
homeserver: params.homeserver,
|
||||
userId: params.userId,
|
||||
userId,
|
||||
accountId: params.accountId,
|
||||
});
|
||||
|
||||
return new MatrixClient(params.homeserver, params.accessToken, undefined, undefined, {
|
||||
userId: params.userId,
|
||||
userId: matrixClientUserId,
|
||||
deviceId: params.deviceId,
|
||||
encryption: params.encryption,
|
||||
localTimeoutMs: params.localTimeoutMs,
|
||||
|
||||
@@ -5,18 +5,28 @@ import path from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
import { getMatrixRuntime } from "../runtime.js";
|
||||
|
||||
const MATRIX_SDK_PACKAGE = "matrix-js-sdk";
|
||||
const REQUIRED_MATRIX_PACKAGES = ["matrix-js-sdk", "@matrix-org/matrix-sdk-crypto-nodejs"];
|
||||
|
||||
export function isMatrixSdkAvailable(): boolean {
|
||||
function resolveMissingMatrixPackages(): string[] {
|
||||
try {
|
||||
const req = createRequire(import.meta.url);
|
||||
req.resolve(MATRIX_SDK_PACKAGE);
|
||||
return true;
|
||||
return REQUIRED_MATRIX_PACKAGES.filter((pkg) => {
|
||||
try {
|
||||
req.resolve(pkg);
|
||||
return false;
|
||||
} catch {
|
||||
return true;
|
||||
}
|
||||
});
|
||||
} catch {
|
||||
return false;
|
||||
return [...REQUIRED_MATRIX_PACKAGES];
|
||||
}
|
||||
}
|
||||
|
||||
export function isMatrixSdkAvailable(): boolean {
|
||||
return resolveMissingMatrixPackages().length === 0;
|
||||
}
|
||||
|
||||
function resolvePluginRoot(): string {
|
||||
const currentDir = path.dirname(fileURLToPath(import.meta.url));
|
||||
return path.resolve(currentDir, "..", "..");
|
||||
@@ -31,9 +41,13 @@ export async function ensureMatrixSdkInstalled(params: {
|
||||
}
|
||||
const confirm = params.confirm;
|
||||
if (confirm) {
|
||||
const ok = await confirm("Matrix requires matrix-js-sdk. Install now?");
|
||||
const ok = await confirm(
|
||||
"Matrix requires matrix-js-sdk and @matrix-org/matrix-sdk-crypto-nodejs. Install now?",
|
||||
);
|
||||
if (!ok) {
|
||||
throw new Error("Matrix requires matrix-js-sdk (install dependencies first).");
|
||||
throw new Error(
|
||||
"Matrix requires matrix-js-sdk and @matrix-org/matrix-sdk-crypto-nodejs (install dependencies first).",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -53,6 +67,11 @@ export async function ensureMatrixSdkInstalled(params: {
|
||||
);
|
||||
}
|
||||
if (!isMatrixSdkAvailable()) {
|
||||
throw new Error("Matrix dependency install completed but matrix-js-sdk is still missing.");
|
||||
const missing = resolveMissingMatrixPackages();
|
||||
throw new Error(
|
||||
missing.length > 0
|
||||
? `Matrix dependency install completed but required packages are still missing: ${missing.join(", ")}`
|
||||
: "Matrix dependency install completed but Matrix dependencies are still missing.",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
53
extensions/matrix/src/matrix/probe.test.ts
Normal file
53
extensions/matrix/src/matrix/probe.test.ts
Normal file
@@ -0,0 +1,53 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
const createMatrixClientMock = vi.fn();
|
||||
const isBunRuntimeMock = vi.fn(() => false);
|
||||
|
||||
vi.mock("./client.js", () => ({
|
||||
createMatrixClient: (...args: unknown[]) => createMatrixClientMock(...args),
|
||||
isBunRuntime: () => isBunRuntimeMock(),
|
||||
}));
|
||||
|
||||
import { probeMatrix } from "./probe.js";
|
||||
|
||||
describe("probeMatrix", () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
isBunRuntimeMock.mockReturnValue(false);
|
||||
createMatrixClientMock.mockResolvedValue({
|
||||
getUserId: vi.fn(async () => "@bot:example.org"),
|
||||
});
|
||||
});
|
||||
|
||||
it("passes undefined userId when not provided", async () => {
|
||||
const result = await probeMatrix({
|
||||
homeserver: "https://matrix.example.org",
|
||||
accessToken: "tok",
|
||||
timeoutMs: 1234,
|
||||
});
|
||||
|
||||
expect(result.ok).toBe(true);
|
||||
expect(createMatrixClientMock).toHaveBeenCalledWith({
|
||||
homeserver: "https://matrix.example.org",
|
||||
userId: undefined,
|
||||
accessToken: "tok",
|
||||
localTimeoutMs: 1234,
|
||||
});
|
||||
});
|
||||
|
||||
it("trims provided userId before client creation", async () => {
|
||||
await probeMatrix({
|
||||
homeserver: "https://matrix.example.org",
|
||||
accessToken: "tok",
|
||||
userId: " @bot:example.org ",
|
||||
timeoutMs: 500,
|
||||
});
|
||||
|
||||
expect(createMatrixClientMock).toHaveBeenCalledWith({
|
||||
homeserver: "https://matrix.example.org",
|
||||
userId: "@bot:example.org",
|
||||
accessToken: "tok",
|
||||
localTimeoutMs: 500,
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -43,9 +43,10 @@ export async function probeMatrix(params: {
|
||||
};
|
||||
}
|
||||
try {
|
||||
const inputUserId = params.userId?.trim() || undefined;
|
||||
const client = await createMatrixClient({
|
||||
homeserver: params.homeserver,
|
||||
userId: params.userId ?? "",
|
||||
userId: inputUserId,
|
||||
accessToken: params.accessToken,
|
||||
localTimeoutMs: params.timeoutMs,
|
||||
});
|
||||
|
||||
@@ -262,6 +262,8 @@ describe("MatrixClient event bridge", () => {
|
||||
expect(messageEvents).toHaveLength(0);
|
||||
|
||||
encrypted.emit("decrypted", decrypted);
|
||||
// Simulate a second normal event emission from the SDK after decryption.
|
||||
matrixJsClient.emit("event", decrypted);
|
||||
expect(messageEvents).toEqual([
|
||||
{
|
||||
roomId: "!room:example.org",
|
||||
@@ -310,4 +312,31 @@ describe("MatrixClient event bridge", () => {
|
||||
expect(failed).toEqual(["decrypt failed"]);
|
||||
expect(delivered).toHaveLength(0);
|
||||
});
|
||||
|
||||
it("emits room.invite when a membership invite targets the current user", async () => {
|
||||
const client = new MatrixClient("https://matrix.example.org", "token");
|
||||
const invites: string[] = [];
|
||||
|
||||
client.on("room.invite", (roomId) => {
|
||||
invites.push(roomId);
|
||||
});
|
||||
|
||||
await client.start();
|
||||
|
||||
const inviteMembership = new FakeMatrixEvent({
|
||||
roomId: "!room:example.org",
|
||||
eventId: "$invite",
|
||||
sender: "@alice:example.org",
|
||||
type: "m.room.member",
|
||||
ts: Date.now(),
|
||||
stateKey: "@bot:example.org",
|
||||
content: {
|
||||
membership: "invite",
|
||||
},
|
||||
});
|
||||
|
||||
matrixJsClient.emit("event", inviteMembership);
|
||||
|
||||
expect(invites).toEqual(["!room:example.org"]);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -197,6 +197,7 @@ export class MatrixClient {
|
||||
private selfUserId: string | null;
|
||||
private readonly dmRoomIds = new Set<string>();
|
||||
private cryptoInitialized = false;
|
||||
private readonly decryptedMessageDedupe = new Map<string, number>();
|
||||
|
||||
readonly dms = {
|
||||
update: async (): Promise<void> => {
|
||||
@@ -482,7 +483,9 @@ export class MatrixClient {
|
||||
if (isEncryptedEvent) {
|
||||
this.emitter.emit("room.encrypted_event", roomId, raw);
|
||||
} else {
|
||||
this.emitter.emit("room.message", roomId, raw);
|
||||
if (!this.isDuplicateDecryptedMessage(roomId, raw.event_id)) {
|
||||
this.emitter.emit("room.message", roomId, raw);
|
||||
}
|
||||
}
|
||||
|
||||
const stateKey = raw.state_key ?? "";
|
||||
@@ -521,12 +524,52 @@ export class MatrixClient {
|
||||
return;
|
||||
}
|
||||
this.emitter.emit("room.decrypted_event", decryptedRoomId, decryptedRaw);
|
||||
this.rememberDecryptedMessage(decryptedRoomId, decryptedRaw.event_id);
|
||||
this.emitter.emit("room.message", decryptedRoomId, decryptedRaw);
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private rememberDecryptedMessage(roomId: string, eventId: string): void {
|
||||
if (!eventId) {
|
||||
return;
|
||||
}
|
||||
const now = Date.now();
|
||||
this.pruneDecryptedMessageDedupe(now);
|
||||
this.decryptedMessageDedupe.set(`${roomId}|${eventId}`, now);
|
||||
}
|
||||
|
||||
private isDuplicateDecryptedMessage(roomId: string, eventId: string): boolean {
|
||||
if (!eventId) {
|
||||
return false;
|
||||
}
|
||||
const key = `${roomId}|${eventId}`;
|
||||
const createdAt = this.decryptedMessageDedupe.get(key);
|
||||
if (createdAt === undefined) {
|
||||
return false;
|
||||
}
|
||||
this.decryptedMessageDedupe.delete(key);
|
||||
return true;
|
||||
}
|
||||
|
||||
private pruneDecryptedMessageDedupe(now: number): void {
|
||||
const ttlMs = 30_000;
|
||||
for (const [key, createdAt] of this.decryptedMessageDedupe) {
|
||||
if (now - createdAt > ttlMs) {
|
||||
this.decryptedMessageDedupe.delete(key);
|
||||
}
|
||||
}
|
||||
const maxEntries = 2048;
|
||||
while (this.decryptedMessageDedupe.size > maxEntries) {
|
||||
const oldest = this.decryptedMessageDedupe.keys().next().value;
|
||||
if (oldest === undefined) {
|
||||
break;
|
||||
}
|
||||
this.decryptedMessageDedupe.delete(oldest);
|
||||
}
|
||||
}
|
||||
|
||||
private createCryptoFacade(): MatrixCryptoFacade {
|
||||
return {
|
||||
prepare: async (_joinedRooms: string[]) => {
|
||||
@@ -738,13 +781,31 @@ function matrixEventToRaw(event: MatrixEvent): MatrixRawEvent {
|
||||
content: ((event.getContent?.() ?? {}) as Record<string, unknown>) || {},
|
||||
unsigned,
|
||||
};
|
||||
const stateKey = event.getStateKey?.();
|
||||
if (typeof stateKey === "string" && stateKey.length > 0) {
|
||||
const stateKey = resolveMatrixStateKey(event);
|
||||
if (typeof stateKey === "string") {
|
||||
raw.state_key = stateKey;
|
||||
}
|
||||
return raw;
|
||||
}
|
||||
|
||||
function resolveMatrixStateKey(event: MatrixEvent): string | undefined {
|
||||
const direct = event.getStateKey?.();
|
||||
if (typeof direct === "string") {
|
||||
return direct;
|
||||
}
|
||||
const wireContent = (
|
||||
event as { getWireContent?: () => { state_key?: unknown } }
|
||||
).getWireContent?.();
|
||||
if (wireContent && typeof wireContent.state_key === "string") {
|
||||
return wireContent.state_key;
|
||||
}
|
||||
const rawEvent = (event as { event?: { state_key?: unknown } }).event;
|
||||
if (rawEvent && typeof rawEvent.state_key === "string") {
|
||||
return rawEvent.state_key;
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
function normalizeEndpoint(endpoint: string): string {
|
||||
if (!endpoint) {
|
||||
return "/";
|
||||
|
||||
Reference in New Issue
Block a user