refactor: share matrix and telegram dedupe helpers

This commit is contained in:
Peter Steinberger
2026-03-26 15:07:51 +00:00
parent 4b1c37a152
commit 5f9f08394a
16 changed files with 325 additions and 697 deletions

View File

@@ -1,41 +1,4 @@
async function readChunkWithIdleTimeout(
reader: ReadableStreamDefaultReader<Uint8Array>,
chunkTimeoutMs: number,
): Promise<Awaited<ReturnType<typeof reader.read>>> {
let timeoutId: ReturnType<typeof setTimeout> | undefined;
let timedOut = false;
return await new Promise((resolve, reject) => {
const clear = () => {
if (timeoutId !== undefined) {
clearTimeout(timeoutId);
timeoutId = undefined;
}
};
timeoutId = setTimeout(() => {
timedOut = true;
clear();
void reader.cancel().catch(() => undefined);
reject(new Error(`Matrix media download stalled: no data received for ${chunkTimeoutMs}ms`));
}, chunkTimeoutMs);
void reader.read().then(
(result) => {
clear();
if (!timedOut) {
resolve(result);
}
},
(err) => {
clear();
if (!timedOut) {
reject(err);
}
},
);
});
}
import { readResponseWithLimit as readSharedResponseWithLimit } from "openclaw/plugin-sdk/media-runtime";
export async function readResponseWithLimit(
res: Response,
@@ -43,53 +6,14 @@ export async function readResponseWithLimit(
opts?: {
onOverflow?: (params: { size: number; maxBytes: number; res: Response }) => Error;
chunkTimeoutMs?: number;
onIdleTimeout?: (params: { chunkTimeoutMs: number }) => Error;
},
): Promise<Buffer> {
const onOverflow =
opts?.onOverflow ??
((params: { size: number; maxBytes: number }) =>
new Error(`Content too large: ${params.size} bytes (limit: ${params.maxBytes} bytes)`));
const chunkTimeoutMs = opts?.chunkTimeoutMs;
const body = res.body;
if (!body || typeof body.getReader !== "function") {
const fallback = Buffer.from(await res.arrayBuffer());
if (fallback.length > maxBytes) {
throw onOverflow({ size: fallback.length, maxBytes, res });
}
return fallback;
}
const reader = body.getReader();
const chunks: Uint8Array[] = [];
let total = 0;
try {
while (true) {
const { done, value } = chunkTimeoutMs
? await readChunkWithIdleTimeout(reader, chunkTimeoutMs)
: await reader.read();
if (done) {
break;
}
if (value?.length) {
total += value.length;
if (total > maxBytes) {
try {
await reader.cancel();
} catch {}
throw onOverflow({ size: total, maxBytes, res });
}
chunks.push(value);
}
}
} finally {
try {
reader.releaseLock();
} catch {}
}
return Buffer.concat(
chunks.map((chunk) => Buffer.from(chunk)),
total,
);
return await readSharedResponseWithLimit(res, maxBytes, {
...opts,
onIdleTimeout:
opts?.onIdleTimeout ??
(({ chunkTimeoutMs }) =>
new Error(`Matrix media download stalled: no data received for ${chunkTimeoutMs}ms`)),
});
}

View File

@@ -66,4 +66,44 @@ describe("performMatrixRequest", () => {
}),
).rejects.toThrow("Matrix media exceeds configured size limit");
});
it("uses the matrix-specific idle-timeout error for stalled raw downloads", async () => {
vi.useFakeTimers();
try {
const stream = new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(new Uint8Array([1, 2, 3]));
},
});
vi.stubGlobal(
"fetch",
vi.fn(
async () =>
new Response(stream, {
status: 200,
}),
),
);
const requestPromise = performMatrixRequest({
homeserver: "http://127.0.0.1:8008",
accessToken: "token",
method: "GET",
endpoint: "/_matrix/media/v3/download/example/id",
timeoutMs: 5000,
raw: true,
maxBytes: 1024,
readIdleTimeoutMs: 50,
ssrfPolicy: { allowPrivateNetwork: true },
});
const rejection = expect(requestPromise).rejects.toThrow(
"Matrix media download stalled: no data received for 50ms",
);
await vi.advanceTimersByTimeAsync(60);
await rejection;
} finally {
vi.useRealTimers();
}
}, 5_000);
});

View File

@@ -1,48 +1,5 @@
type PassiveChannelStatusSnapshot = {
configured?: boolean;
running?: boolean;
lastStartAt?: number | null;
lastStopAt?: number | null;
lastError?: string | null;
probe?: unknown;
lastProbeAt?: number | null;
};
type TrafficStatusSnapshot = {
lastInboundAt?: number | null;
lastOutboundAt?: number | null;
};
export function buildPassiveChannelStatusSummary<TExtra extends object>(
snapshot: PassiveChannelStatusSnapshot,
extra?: TExtra,
) {
return {
configured: snapshot.configured ?? false,
...(extra ?? ({} as TExtra)),
running: snapshot.running ?? false,
lastStartAt: snapshot.lastStartAt ?? null,
lastStopAt: snapshot.lastStopAt ?? null,
lastError: snapshot.lastError ?? null,
};
}
export function buildPassiveProbedChannelStatusSummary<TExtra extends object>(
snapshot: PassiveChannelStatusSnapshot,
extra?: TExtra,
) {
return {
...buildPassiveChannelStatusSummary(snapshot, extra),
probe: snapshot.probe,
lastProbeAt: snapshot.lastProbeAt ?? null,
};
}
export function buildTrafficStatusSummary<TSnapshot extends TrafficStatusSnapshot>(
snapshot?: TSnapshot | null,
) {
return {
lastInboundAt: snapshot?.lastInboundAt ?? null,
lastOutboundAt: snapshot?.lastOutboundAt ?? null,
};
}
export {
buildPassiveChannelStatusSummary,
buildPassiveProbedChannelStatusSummary,
buildTrafficStatusSummary,
} from "openclaw/plugin-sdk/extension-shared";

View File

@@ -1,25 +1 @@
import type { z } from "zod";
type RequireOpenAllowFromFn = (params: {
policy?: string;
allowFrom?: Array<string | number>;
ctx: z.RefinementCtx;
path: Array<string | number>;
message: string;
}) => void;
export function requireChannelOpenAllowFrom(params: {
channel: string;
policy?: string;
allowFrom?: Array<string | number>;
ctx: z.RefinementCtx;
requireOpenAllowFrom: RequireOpenAllowFromFn;
}) {
params.requireOpenAllowFrom({
policy: params.policy,
allowFrom: params.allowFrom,
ctx: params.ctx,
path: ["allowFrom"],
message: `channels.${params.channel}.dmPolicy="open" requires channels.${params.channel}.allowFrom to include "*"`,
});
}
export { requireChannelOpenAllowFrom } from "openclaw/plugin-sdk/extension-shared";

View File

@@ -1,9 +1 @@
export function createDeferred<T>() {
let resolve!: (value: T | PromiseLike<T>) => void;
let reject!: (reason?: unknown) => void;
const promise = new Promise<T>((res, rej) => {
resolve = res;
reject = rej;
});
return { promise, resolve, reject };
}
export { createDeferred } from "openclaw/plugin-sdk/extension-shared";

View File

@@ -1,18 +1 @@
import { runPassiveAccountLifecycle } from "openclaw/plugin-sdk/channel-lifecycle";
type StoppableMonitor = {
stop: () => void;
};
export async function runStoppablePassiveMonitor<TMonitor extends StoppableMonitor>(params: {
abortSignal: AbortSignal;
start: () => Promise<TMonitor>;
}): Promise<void> {
await runPassiveAccountLifecycle({
abortSignal: params.abortSignal,
start: params.start,
stop: async (monitor) => {
monitor.stop();
},
});
}
export { runStoppablePassiveMonitor } from "openclaw/plugin-sdk/extension-shared";

View File

@@ -1,66 +1 @@
import { expect, it } from "vitest";
type ResolveTargetMode = "explicit" | "implicit" | "heartbeat";
type ResolveTargetResult = {
ok: boolean;
to?: string;
error?: unknown;
};
type ResolveTargetFn = (params: {
to?: string;
mode: ResolveTargetMode;
allowFrom: string[];
}) => ResolveTargetResult;
export function installCommonResolveTargetErrorCases(params: {
resolveTarget: ResolveTargetFn;
implicitAllowFrom: string[];
}) {
const { resolveTarget, implicitAllowFrom } = params;
it("should error on normalization failure with allowlist (implicit mode)", () => {
const result = resolveTarget({
to: "invalid-target",
mode: "implicit",
allowFrom: implicitAllowFrom,
});
expect(result.ok).toBe(false);
expect(result.error).toBeDefined();
});
it("should error when no target provided with allowlist", () => {
const result = resolveTarget({
to: undefined,
mode: "implicit",
allowFrom: implicitAllowFrom,
});
expect(result.ok).toBe(false);
expect(result.error).toBeDefined();
});
it("should error when no target and no allowlist", () => {
const result = resolveTarget({
to: undefined,
mode: "explicit",
allowFrom: [],
});
expect(result.ok).toBe(false);
expect(result.error).toBeDefined();
});
it("should handle whitespace-only target", () => {
const result = resolveTarget({
to: " ",
mode: "explicit",
allowFrom: [],
});
expect(result.ok).toBe(false);
expect(result.error).toBeDefined();
});
}
export { installCommonResolveTargetErrorCases } from "openclaw/plugin-sdk/testing";

View File

@@ -1,14 +1 @@
import { createLoggerBackedRuntime } from "openclaw/plugin-sdk/runtime";
export function resolveLoggerBackedRuntime<TRuntime>(
runtime: TRuntime | undefined,
logger: Parameters<typeof createLoggerBackedRuntime>[0]["logger"],
): TRuntime {
return (
runtime ??
(createLoggerBackedRuntime({
logger,
exitError: () => new Error("Runtime exit not available"),
}) as TRuntime)
);
}
export { resolveLoggerBackedRuntime } from "openclaw/plugin-sdk/extension-shared";

View File

@@ -1,18 +1,4 @@
export function readStatusIssueFields<TField extends string>(
value: unknown,
fields: readonly TField[],
): Record<TField, unknown> | null {
if (!value || typeof value !== "object") {
return null;
}
const record = value as Record<string, unknown>;
const result = {} as Record<TField, unknown>;
for (const field of fields) {
result[field] = record[field];
}
return result;
}
export function coerceStatusIssueAccountId(value: unknown): string | undefined {
return typeof value === "string" ? value : typeof value === "number" ? String(value) : undefined;
}
export {
coerceStatusIssueAccountId,
readStatusIssueFields,
} from "openclaw/plugin-sdk/extension-shared";

View File

@@ -1,13 +1 @@
import fs from "node:fs/promises";
import path from "node:path";
export async function createWindowsCmdShimFixture(params: {
shimPath: string;
scriptPath: string;
shimLine: string;
}): Promise<void> {
await fs.mkdir(path.dirname(params.scriptPath), { recursive: true });
await fs.mkdir(path.dirname(params.shimPath), { recursive: true });
await fs.writeFile(params.scriptPath, "module.exports = {};\n", "utf8");
await fs.writeFile(params.shimPath, `@echo off\r\n${params.shimLine}\r\n`, "utf8");
}
export { createWindowsCmdShimFixture } from "openclaw/plugin-sdk/testing";

View File

@@ -0,0 +1,221 @@
import type { OpenClawConfig } from "openclaw/plugin-sdk/testing";
import { beforeEach, describe, expect, it, vi } from "vitest";
export const readConfigFileSnapshotForWrite = vi.fn();
export const writeConfigFile = vi.fn();
export const loadCronStore = vi.fn();
export const resolveCronStorePath = vi.fn();
export const saveCronStore = vi.fn();
vi.mock("openclaw/plugin-sdk/config-runtime", async (importOriginal) => {
const actual = await importOriginal<typeof import("openclaw/plugin-sdk/config-runtime")>();
return {
...actual,
readConfigFileSnapshotForWrite,
writeConfigFile,
loadCronStore,
resolveCronStorePath,
saveCronStore,
};
});
export function installMaybePersistResolvedTelegramTargetTests(params?: {
includeGatewayScopeCases?: boolean;
}) {
describe("maybePersistResolvedTelegramTarget", () => {
let maybePersistResolvedTelegramTarget: typeof import("./target-writeback.js").maybePersistResolvedTelegramTarget;
beforeEach(async () => {
vi.resetModules();
({ maybePersistResolvedTelegramTarget } = await import("./target-writeback.js"));
readConfigFileSnapshotForWrite.mockReset();
writeConfigFile.mockReset();
loadCronStore.mockReset();
resolveCronStorePath.mockReset();
saveCronStore.mockReset();
resolveCronStorePath.mockReturnValue("/tmp/cron/jobs.json");
});
it("skips writeback when target is already numeric", async () => {
await maybePersistResolvedTelegramTarget({
cfg: {} as OpenClawConfig,
rawTarget: "-100123",
resolvedChatId: "-100123",
});
expect(readConfigFileSnapshotForWrite).not.toHaveBeenCalled();
expect(loadCronStore).not.toHaveBeenCalled();
});
if (params?.includeGatewayScopeCases) {
it("skips config and cron writeback for gateway callers missing operator.admin", async () => {
await maybePersistResolvedTelegramTarget({
cfg: {
cron: { store: "/tmp/cron/jobs.json" },
} as OpenClawConfig,
rawTarget: "t.me/mychannel",
resolvedChatId: "-100123",
gatewayClientScopes: ["operator.write"],
});
expect(readConfigFileSnapshotForWrite).not.toHaveBeenCalled();
expect(writeConfigFile).not.toHaveBeenCalled();
expect(loadCronStore).not.toHaveBeenCalled();
expect(saveCronStore).not.toHaveBeenCalled();
});
it("skips config and cron writeback for gateway callers with an empty scope set", async () => {
await maybePersistResolvedTelegramTarget({
cfg: {
cron: { store: "/tmp/cron/jobs.json" },
} as OpenClawConfig,
rawTarget: "t.me/mychannel",
resolvedChatId: "-100123",
gatewayClientScopes: [],
});
expect(readConfigFileSnapshotForWrite).not.toHaveBeenCalled();
expect(writeConfigFile).not.toHaveBeenCalled();
expect(loadCronStore).not.toHaveBeenCalled();
expect(saveCronStore).not.toHaveBeenCalled();
});
}
it("writes back matching config and cron targets", async () => {
readConfigFileSnapshotForWrite.mockResolvedValue({
snapshot: {
config: {
channels: {
telegram: {
defaultTo: "t.me/mychannel",
accounts: {
alerts: {
defaultTo: "@mychannel",
},
},
},
},
},
},
writeOptions: { expectedConfigPath: "/tmp/openclaw.json" },
});
loadCronStore.mockResolvedValue({
version: 1,
jobs: [
{ id: "a", delivery: { channel: "telegram", to: "https://t.me/mychannel" } },
{ id: "b", delivery: { channel: "slack", to: "C123" } },
],
});
await maybePersistResolvedTelegramTarget({
cfg: {
cron: { store: "/tmp/cron/jobs.json" },
} as OpenClawConfig,
rawTarget: "t.me/mychannel",
resolvedChatId: "-100123",
});
expect(writeConfigFile).toHaveBeenCalledTimes(1);
expect(writeConfigFile).toHaveBeenCalledWith(
expect.objectContaining({
channels: {
telegram: {
defaultTo: "-100123",
accounts: {
alerts: {
defaultTo: "-100123",
},
},
},
},
}),
expect.objectContaining({ expectedConfigPath: "/tmp/openclaw.json" }),
);
expect(saveCronStore).toHaveBeenCalledTimes(1);
expect(saveCronStore).toHaveBeenCalledWith(
"/tmp/cron/jobs.json",
expect.objectContaining({
jobs: [
{ id: "a", delivery: { channel: "telegram", to: "-100123" } },
{ id: "b", delivery: { channel: "slack", to: "C123" } },
],
}),
);
});
it("preserves topic suffix style in writeback target", async () => {
readConfigFileSnapshotForWrite.mockResolvedValue({
snapshot: {
config: {
channels: {
telegram: {
defaultTo: "t.me/mychannel:topic:9",
},
},
},
},
writeOptions: {},
});
loadCronStore.mockResolvedValue({ version: 1, jobs: [] });
await maybePersistResolvedTelegramTarget({
cfg: {} as OpenClawConfig,
rawTarget: "t.me/mychannel:topic:9",
resolvedChatId: "-100123",
});
expect(writeConfigFile).toHaveBeenCalledWith(
expect.objectContaining({
channels: {
telegram: {
defaultTo: "-100123:topic:9",
},
},
}),
expect.any(Object),
);
});
it("matches username targets case-insensitively", async () => {
readConfigFileSnapshotForWrite.mockResolvedValue({
snapshot: {
config: {
channels: {
telegram: {
defaultTo: "https://t.me/mychannel",
},
},
},
},
writeOptions: {},
});
loadCronStore.mockResolvedValue({
version: 1,
jobs: [{ id: "a", delivery: { channel: "telegram", to: "https://t.me/mychannel" } }],
});
await maybePersistResolvedTelegramTarget({
cfg: {} as OpenClawConfig,
rawTarget: "@MyChannel",
resolvedChatId: "-100123",
});
expect(writeConfigFile).toHaveBeenCalledWith(
expect.objectContaining({
channels: {
telegram: {
defaultTo: "-100123",
},
},
}),
expect.any(Object),
);
expect(saveCronStore).toHaveBeenCalledWith(
"/tmp/cron/jobs.json",
expect.objectContaining({
jobs: [{ id: "a", delivery: { channel: "telegram", to: "-100123" } }],
}),
);
});
});
}

View File

@@ -1,216 +1,3 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../../../src/config/config.js";
import { installMaybePersistResolvedTelegramTargetTests } from "./target-writeback.test-shared.js";
const readConfigFileSnapshotForWrite = vi.fn();
const writeConfigFile = vi.fn();
const loadCronStore = vi.fn();
const resolveCronStorePath = vi.fn();
const saveCronStore = vi.fn();
vi.mock("openclaw/plugin-sdk/config-runtime", async (importOriginal) => {
const actual = await importOriginal<typeof import("openclaw/plugin-sdk/config-runtime")>();
return {
...actual,
readConfigFileSnapshotForWrite,
writeConfigFile,
loadCronStore,
resolveCronStorePath,
saveCronStore,
};
});
describe("maybePersistResolvedTelegramTarget", () => {
let maybePersistResolvedTelegramTarget: typeof import("./target-writeback.js").maybePersistResolvedTelegramTarget;
beforeEach(async () => {
vi.resetModules();
({ maybePersistResolvedTelegramTarget } = await import("./target-writeback.js"));
readConfigFileSnapshotForWrite.mockReset();
writeConfigFile.mockReset();
loadCronStore.mockReset();
resolveCronStorePath.mockReset();
saveCronStore.mockReset();
resolveCronStorePath.mockReturnValue("/tmp/cron/jobs.json");
});
it("skips writeback when target is already numeric", async () => {
await maybePersistResolvedTelegramTarget({
cfg: {} as OpenClawConfig,
rawTarget: "-100123",
resolvedChatId: "-100123",
});
expect(readConfigFileSnapshotForWrite).not.toHaveBeenCalled();
expect(loadCronStore).not.toHaveBeenCalled();
});
it("skips config and cron writeback for gateway callers missing operator.admin", async () => {
await maybePersistResolvedTelegramTarget({
cfg: {
cron: { store: "/tmp/cron/jobs.json" },
} as OpenClawConfig,
rawTarget: "t.me/mychannel",
resolvedChatId: "-100123",
gatewayClientScopes: ["operator.write"],
});
expect(readConfigFileSnapshotForWrite).not.toHaveBeenCalled();
expect(writeConfigFile).not.toHaveBeenCalled();
expect(loadCronStore).not.toHaveBeenCalled();
expect(saveCronStore).not.toHaveBeenCalled();
});
it("skips config and cron writeback for gateway callers with an empty scope set", async () => {
await maybePersistResolvedTelegramTarget({
cfg: {
cron: { store: "/tmp/cron/jobs.json" },
} as OpenClawConfig,
rawTarget: "t.me/mychannel",
resolvedChatId: "-100123",
gatewayClientScopes: [],
});
expect(readConfigFileSnapshotForWrite).not.toHaveBeenCalled();
expect(writeConfigFile).not.toHaveBeenCalled();
expect(loadCronStore).not.toHaveBeenCalled();
expect(saveCronStore).not.toHaveBeenCalled();
});
it("writes back matching config and cron targets for gateway callers with operator.admin", async () => {
readConfigFileSnapshotForWrite.mockResolvedValue({
snapshot: {
config: {
channels: {
telegram: {
defaultTo: "t.me/mychannel",
accounts: {
alerts: {
defaultTo: "@mychannel",
},
},
},
},
},
},
writeOptions: { expectedConfigPath: "/tmp/openclaw.json" },
});
loadCronStore.mockResolvedValue({
version: 1,
jobs: [
{ id: "a", delivery: { channel: "telegram", to: "https://t.me/mychannel" } },
{ id: "b", delivery: { channel: "slack", to: "C123" } },
],
});
await maybePersistResolvedTelegramTarget({
cfg: {
cron: { store: "/tmp/cron/jobs.json" },
} as OpenClawConfig,
rawTarget: "t.me/mychannel",
resolvedChatId: "-100123",
gatewayClientScopes: ["operator.admin"],
});
expect(writeConfigFile).toHaveBeenCalledTimes(1);
expect(writeConfigFile).toHaveBeenCalledWith(
expect.objectContaining({
channels: {
telegram: {
defaultTo: "-100123",
accounts: {
alerts: {
defaultTo: "-100123",
},
},
},
},
}),
expect.objectContaining({ expectedConfigPath: "/tmp/openclaw.json" }),
);
expect(saveCronStore).toHaveBeenCalledTimes(1);
expect(saveCronStore).toHaveBeenCalledWith(
"/tmp/cron/jobs.json",
expect.objectContaining({
jobs: [
{ id: "a", delivery: { channel: "telegram", to: "-100123" } },
{ id: "b", delivery: { channel: "slack", to: "C123" } },
],
}),
);
});
it("preserves topic suffix style in writeback target", async () => {
readConfigFileSnapshotForWrite.mockResolvedValue({
snapshot: {
config: {
channels: {
telegram: {
defaultTo: "t.me/mychannel:topic:9",
},
},
},
},
writeOptions: {},
});
loadCronStore.mockResolvedValue({ version: 1, jobs: [] });
await maybePersistResolvedTelegramTarget({
cfg: {} as OpenClawConfig,
rawTarget: "t.me/mychannel:topic:9",
resolvedChatId: "-100123",
});
expect(writeConfigFile).toHaveBeenCalledWith(
expect.objectContaining({
channels: {
telegram: {
defaultTo: "-100123:topic:9",
},
},
}),
expect.any(Object),
);
});
it("matches username targets case-insensitively", async () => {
readConfigFileSnapshotForWrite.mockResolvedValue({
snapshot: {
config: {
channels: {
telegram: {
defaultTo: "https://t.me/mychannel",
},
},
},
},
writeOptions: {},
});
loadCronStore.mockResolvedValue({
version: 1,
jobs: [{ id: "a", delivery: { channel: "telegram", to: "https://t.me/mychannel" } }],
});
await maybePersistResolvedTelegramTarget({
cfg: {} as OpenClawConfig,
rawTarget: "@MyChannel",
resolvedChatId: "-100123",
});
expect(writeConfigFile).toHaveBeenCalledWith(
expect.objectContaining({
channels: {
telegram: {
defaultTo: "-100123",
},
},
}),
expect.any(Object),
);
expect(saveCronStore).toHaveBeenCalledWith(
"/tmp/cron/jobs.json",
expect.objectContaining({
jobs: [{ id: "a", delivery: { channel: "telegram", to: "-100123" } }],
}),
);
});
});
installMaybePersistResolvedTelegramTargetTests({ includeGatewayScopeCases: true });

View File

@@ -1,11 +1,11 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../../../src/config/config.js";
import { describe, expect, it } from "vitest";
import { isNumericTelegramUserId, normalizeTelegramAllowFromEntry } from "./allow-from.js";
import {
resolveTelegramGroupRequireMention,
resolveTelegramGroupToolPolicy,
} from "./group-policy.js";
import { looksLikeTelegramTargetId, normalizeTelegramMessagingTarget } from "./normalize.js";
import { installMaybePersistResolvedTelegramTargetTests } from "./target-writeback.test-shared.js";
import {
isNumericTelegramChatId,
normalizeTelegramChatId,
@@ -14,24 +14,6 @@ import {
stripTelegramInternalPrefixes,
} from "./targets.js";
const readConfigFileSnapshotForWrite = vi.fn();
const writeConfigFile = vi.fn();
const loadCronStore = vi.fn();
const resolveCronStorePath = vi.fn();
const saveCronStore = vi.fn();
vi.mock("openclaw/plugin-sdk/config-runtime", async (importOriginal) => {
const actual = await importOriginal<typeof import("openclaw/plugin-sdk/config-runtime")>();
return {
...actual,
readConfigFileSnapshotForWrite,
writeConfigFile,
loadCronStore,
resolveCronStorePath,
saveCronStore,
};
});
describe("stripTelegramInternalPrefixes", () => {
it("strips telegram prefix", () => {
expect(stripTelegramInternalPrefixes("telegram:123")).toBe("123");
@@ -235,165 +217,4 @@ describe("telegram target normalization", () => {
});
});
describe("maybePersistResolvedTelegramTarget", () => {
let maybePersistResolvedTelegramTarget: typeof import("./target-writeback.js").maybePersistResolvedTelegramTarget;
beforeEach(async () => {
vi.resetModules();
({ maybePersistResolvedTelegramTarget } = await import("./target-writeback.js"));
readConfigFileSnapshotForWrite.mockReset();
writeConfigFile.mockReset();
loadCronStore.mockReset();
resolveCronStorePath.mockReset();
saveCronStore.mockReset();
resolveCronStorePath.mockReturnValue("/tmp/cron/jobs.json");
});
it("skips writeback when target is already numeric", async () => {
await maybePersistResolvedTelegramTarget({
cfg: {} as OpenClawConfig,
rawTarget: "-100123",
resolvedChatId: "-100123",
});
expect(readConfigFileSnapshotForWrite).not.toHaveBeenCalled();
expect(loadCronStore).not.toHaveBeenCalled();
});
it("writes back matching config and cron targets", async () => {
readConfigFileSnapshotForWrite.mockResolvedValue({
snapshot: {
config: {
channels: {
telegram: {
defaultTo: "t.me/mychannel",
accounts: {
alerts: {
defaultTo: "@mychannel",
},
},
},
},
},
},
writeOptions: { expectedConfigPath: "/tmp/openclaw.json" },
});
loadCronStore.mockResolvedValue({
version: 1,
jobs: [
{ id: "a", delivery: { channel: "telegram", to: "https://t.me/mychannel" } },
{ id: "b", delivery: { channel: "slack", to: "C123" } },
],
});
await maybePersistResolvedTelegramTarget({
cfg: {
cron: { store: "/tmp/cron/jobs.json" },
} as OpenClawConfig,
rawTarget: "t.me/mychannel",
resolvedChatId: "-100123",
});
expect(writeConfigFile).toHaveBeenCalledTimes(1);
expect(writeConfigFile).toHaveBeenCalledWith(
expect.objectContaining({
channels: {
telegram: {
defaultTo: "-100123",
accounts: {
alerts: {
defaultTo: "-100123",
},
},
},
},
}),
expect.objectContaining({ expectedConfigPath: "/tmp/openclaw.json" }),
);
expect(saveCronStore).toHaveBeenCalledTimes(1);
expect(saveCronStore).toHaveBeenCalledWith(
"/tmp/cron/jobs.json",
expect.objectContaining({
jobs: [
{ id: "a", delivery: { channel: "telegram", to: "-100123" } },
{ id: "b", delivery: { channel: "slack", to: "C123" } },
],
}),
);
});
it("preserves topic suffix style in writeback target", async () => {
readConfigFileSnapshotForWrite.mockResolvedValue({
snapshot: {
config: {
channels: {
telegram: {
defaultTo: "t.me/mychannel:topic:9",
},
},
},
},
writeOptions: {},
});
loadCronStore.mockResolvedValue({ version: 1, jobs: [] });
await maybePersistResolvedTelegramTarget({
cfg: {} as OpenClawConfig,
rawTarget: "t.me/mychannel:topic:9",
resolvedChatId: "-100123",
});
expect(writeConfigFile).toHaveBeenCalledWith(
expect.objectContaining({
channels: {
telegram: {
defaultTo: "-100123:topic:9",
},
},
}),
expect.any(Object),
);
});
it("matches username targets case-insensitively", async () => {
readConfigFileSnapshotForWrite.mockResolvedValue({
snapshot: {
config: {
channels: {
telegram: {
defaultTo: "https://t.me/mychannel",
},
},
},
},
writeOptions: {},
});
loadCronStore.mockResolvedValue({
version: 1,
jobs: [{ id: "a", delivery: { channel: "telegram", to: "https://t.me/mychannel" } }],
});
await maybePersistResolvedTelegramTarget({
cfg: {} as OpenClawConfig,
rawTarget: "@MyChannel",
resolvedChatId: "-100123",
});
expect(writeConfigFile).toHaveBeenCalledWith(
expect.objectContaining({
channels: {
telegram: {
defaultTo: "-100123",
},
},
}),
expect.any(Object),
);
expect(saveCronStore).toHaveBeenCalledWith(
"/tmp/cron/jobs.json",
expect.objectContaining({
jobs: [{ id: "a", delivery: { channel: "telegram", to: "-100123" } }],
}),
);
});
});
installMaybePersistResolvedTelegramTargetTests();

View File

@@ -67,6 +67,23 @@ describe("readResponseWithLimit", () => {
}
}, 5_000);
it("uses a custom idle-timeout error when provided", async () => {
vi.useFakeTimers();
try {
const body = makeStallingStream([new Uint8Array([1, 2])]);
const res = new Response(body);
const readPromise = readResponseWithLimit(res, 1024, {
chunkTimeoutMs: 50,
onIdleTimeout: ({ chunkTimeoutMs }) => new Error(`custom idle ${chunkTimeoutMs}`),
});
const rejection = expect(readPromise).rejects.toThrow("custom idle 50");
await vi.advanceTimersByTimeAsync(60);
await rejection;
} finally {
vi.useRealTimers();
}
}, 5_000);
it("does not time out while chunks keep arriving", async () => {
vi.useFakeTimers();
try {

View File

@@ -1,6 +1,7 @@
async function readChunkWithIdleTimeout(
reader: ReadableStreamDefaultReader<Uint8Array>,
chunkTimeoutMs: number,
onIdleTimeout?: (params: { chunkTimeoutMs: number }) => Error,
): Promise<Awaited<ReturnType<typeof reader.read>>> {
let timeoutId: ReturnType<typeof setTimeout> | undefined;
let timedOut = false;
@@ -17,7 +18,10 @@ async function readChunkWithIdleTimeout(
timedOut = true;
clear();
void reader.cancel().catch(() => undefined);
reject(new Error(`Media download stalled: no data received for ${chunkTimeoutMs}ms`));
reject(
onIdleTimeout?.({ chunkTimeoutMs }) ??
new Error(`Media download stalled: no data received for ${chunkTimeoutMs}ms`),
);
}, chunkTimeoutMs);
void reader.read().then(
@@ -48,6 +52,7 @@ async function readResponsePrefix(
maxBytes: number,
opts?: {
chunkTimeoutMs?: number;
onIdleTimeout?: (params: { chunkTimeoutMs: number }) => Error;
},
): Promise<ReadResponsePrefixResult> {
const chunkTimeoutMs = opts?.chunkTimeoutMs;
@@ -72,7 +77,7 @@ async function readResponsePrefix(
try {
while (true) {
const { done, value } = chunkTimeoutMs
? await readChunkWithIdleTimeout(reader, chunkTimeoutMs)
? await readChunkWithIdleTimeout(reader, chunkTimeoutMs, opts?.onIdleTimeout)
: await reader.read();
if (done) {
size = total;
@@ -121,13 +126,17 @@ export async function readResponseWithLimit(
opts?: {
onOverflow?: (params: { size: number; maxBytes: number; res: Response }) => Error;
chunkTimeoutMs?: number;
onIdleTimeout?: (params: { chunkTimeoutMs: number }) => Error;
},
): Promise<Buffer> {
const onOverflow =
opts?.onOverflow ??
((params: { size: number; maxBytes: number }) =>
new Error(`Content too large: ${params.size} bytes (limit: ${params.maxBytes} bytes)`));
const prefix = await readResponsePrefix(res, maxBytes, { chunkTimeoutMs: opts?.chunkTimeoutMs });
const prefix = await readResponsePrefix(res, maxBytes, {
chunkTimeoutMs: opts?.chunkTimeoutMs,
onIdleTimeout: opts?.onIdleTimeout,
});
if (prefix.truncated) {
throw onOverflow({ size: prefix.size, maxBytes, res });
}
@@ -140,11 +149,15 @@ export async function readResponseTextSnippet(
maxBytes?: number;
maxChars?: number;
chunkTimeoutMs?: number;
onIdleTimeout?: (params: { chunkTimeoutMs: number }) => Error;
},
): Promise<string | undefined> {
const maxBytes = opts?.maxBytes ?? 8 * 1024;
const maxChars = opts?.maxChars ?? 200;
const prefix = await readResponsePrefix(res, maxBytes, { chunkTimeoutMs: opts?.chunkTimeoutMs });
const prefix = await readResponsePrefix(res, maxBytes, {
chunkTimeoutMs: opts?.chunkTimeoutMs,
onIdleTimeout: opts?.onIdleTimeout,
});
if (prefix.buffer.length === 0) {
return undefined;
}

View File

@@ -14,6 +14,7 @@ export * from "../media/mime.js";
export * from "../media/outbound-attachment.js";
export * from "../media/png-encode.ts";
export * from "../media/qr-image.ts";
export * from "../media/read-response-with-limit.js";
export * from "../media/store.js";
export * from "../media/temp-files.js";
export { resolveChannelMediaMaxBytes } from "../channels/plugins/media-limits.js";