diff --git a/extensions/matrix/src/matrix/sdk/read-response-with-limit.ts b/extensions/matrix/src/matrix/sdk/read-response-with-limit.ts index 2077f56e5c3..97abda6e1e8 100644 --- a/extensions/matrix/src/matrix/sdk/read-response-with-limit.ts +++ b/extensions/matrix/src/matrix/sdk/read-response-with-limit.ts @@ -1,41 +1,4 @@ -async function readChunkWithIdleTimeout( - reader: ReadableStreamDefaultReader, - chunkTimeoutMs: number, -): Promise>> { - let timeoutId: ReturnType | undefined; - let timedOut = false; - - return await new Promise((resolve, reject) => { - const clear = () => { - if (timeoutId !== undefined) { - clearTimeout(timeoutId); - timeoutId = undefined; - } - }; - - timeoutId = setTimeout(() => { - timedOut = true; - clear(); - void reader.cancel().catch(() => undefined); - reject(new Error(`Matrix media download stalled: no data received for ${chunkTimeoutMs}ms`)); - }, chunkTimeoutMs); - - void reader.read().then( - (result) => { - clear(); - if (!timedOut) { - resolve(result); - } - }, - (err) => { - clear(); - if (!timedOut) { - reject(err); - } - }, - ); - }); -} +import { readResponseWithLimit as readSharedResponseWithLimit } from "openclaw/plugin-sdk/media-runtime"; export async function readResponseWithLimit( res: Response, @@ -43,53 +6,14 @@ export async function readResponseWithLimit( opts?: { onOverflow?: (params: { size: number; maxBytes: number; res: Response }) => Error; chunkTimeoutMs?: number; + onIdleTimeout?: (params: { chunkTimeoutMs: number }) => Error; }, ): Promise { - const onOverflow = - opts?.onOverflow ?? - ((params: { size: number; maxBytes: number }) => - new Error(`Content too large: ${params.size} bytes (limit: ${params.maxBytes} bytes)`)); - const chunkTimeoutMs = opts?.chunkTimeoutMs; - - const body = res.body; - if (!body || typeof body.getReader !== "function") { - const fallback = Buffer.from(await res.arrayBuffer()); - if (fallback.length > maxBytes) { - throw onOverflow({ size: fallback.length, maxBytes, res }); - } - return fallback; - } - - const reader = body.getReader(); - const chunks: Uint8Array[] = []; - let total = 0; - try { - while (true) { - const { done, value } = chunkTimeoutMs - ? await readChunkWithIdleTimeout(reader, chunkTimeoutMs) - : await reader.read(); - if (done) { - break; - } - if (value?.length) { - total += value.length; - if (total > maxBytes) { - try { - await reader.cancel(); - } catch {} - throw onOverflow({ size: total, maxBytes, res }); - } - chunks.push(value); - } - } - } finally { - try { - reader.releaseLock(); - } catch {} - } - - return Buffer.concat( - chunks.map((chunk) => Buffer.from(chunk)), - total, - ); + return await readSharedResponseWithLimit(res, maxBytes, { + ...opts, + onIdleTimeout: + opts?.onIdleTimeout ?? + (({ chunkTimeoutMs }) => + new Error(`Matrix media download stalled: no data received for ${chunkTimeoutMs}ms`)), + }); } diff --git a/extensions/matrix/src/matrix/sdk/transport.test.ts b/extensions/matrix/src/matrix/sdk/transport.test.ts index 03aaf36b811..a349a53624e 100644 --- a/extensions/matrix/src/matrix/sdk/transport.test.ts +++ b/extensions/matrix/src/matrix/sdk/transport.test.ts @@ -66,4 +66,44 @@ describe("performMatrixRequest", () => { }), ).rejects.toThrow("Matrix media exceeds configured size limit"); }); + + it("uses the matrix-specific idle-timeout error for stalled raw downloads", async () => { + vi.useFakeTimers(); + try { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new Uint8Array([1, 2, 3])); + }, + }); + vi.stubGlobal( + "fetch", + vi.fn( + async () => + new Response(stream, { + status: 200, + }), + ), + ); + + const requestPromise = performMatrixRequest({ + homeserver: "http://127.0.0.1:8008", + accessToken: "token", + method: "GET", + endpoint: "/_matrix/media/v3/download/example/id", + timeoutMs: 5000, + raw: true, + maxBytes: 1024, + readIdleTimeoutMs: 50, + ssrfPolicy: { allowPrivateNetwork: true }, + }); + + const rejection = expect(requestPromise).rejects.toThrow( + "Matrix media download stalled: no data received for 50ms", + ); + await vi.advanceTimersByTimeAsync(60); + await rejection; + } finally { + vi.useRealTimers(); + } + }, 5_000); }); diff --git a/extensions/shared/channel-status-summary.ts b/extensions/shared/channel-status-summary.ts index 5ebdb067596..1cddcb6fd37 100644 --- a/extensions/shared/channel-status-summary.ts +++ b/extensions/shared/channel-status-summary.ts @@ -1,48 +1,5 @@ -type PassiveChannelStatusSnapshot = { - configured?: boolean; - running?: boolean; - lastStartAt?: number | null; - lastStopAt?: number | null; - lastError?: string | null; - probe?: unknown; - lastProbeAt?: number | null; -}; - -type TrafficStatusSnapshot = { - lastInboundAt?: number | null; - lastOutboundAt?: number | null; -}; - -export function buildPassiveChannelStatusSummary( - snapshot: PassiveChannelStatusSnapshot, - extra?: TExtra, -) { - return { - configured: snapshot.configured ?? false, - ...(extra ?? ({} as TExtra)), - running: snapshot.running ?? false, - lastStartAt: snapshot.lastStartAt ?? null, - lastStopAt: snapshot.lastStopAt ?? null, - lastError: snapshot.lastError ?? null, - }; -} - -export function buildPassiveProbedChannelStatusSummary( - snapshot: PassiveChannelStatusSnapshot, - extra?: TExtra, -) { - return { - ...buildPassiveChannelStatusSummary(snapshot, extra), - probe: snapshot.probe, - lastProbeAt: snapshot.lastProbeAt ?? null, - }; -} - -export function buildTrafficStatusSummary( - snapshot?: TSnapshot | null, -) { - return { - lastInboundAt: snapshot?.lastInboundAt ?? null, - lastOutboundAt: snapshot?.lastOutboundAt ?? null, - }; -} +export { + buildPassiveChannelStatusSummary, + buildPassiveProbedChannelStatusSummary, + buildTrafficStatusSummary, +} from "openclaw/plugin-sdk/extension-shared"; diff --git a/extensions/shared/config-schema-helpers.ts b/extensions/shared/config-schema-helpers.ts index 495793b54b6..fb077a0b1f5 100644 --- a/extensions/shared/config-schema-helpers.ts +++ b/extensions/shared/config-schema-helpers.ts @@ -1,25 +1 @@ -import type { z } from "zod"; - -type RequireOpenAllowFromFn = (params: { - policy?: string; - allowFrom?: Array; - ctx: z.RefinementCtx; - path: Array; - message: string; -}) => void; - -export function requireChannelOpenAllowFrom(params: { - channel: string; - policy?: string; - allowFrom?: Array; - ctx: z.RefinementCtx; - requireOpenAllowFrom: RequireOpenAllowFromFn; -}) { - params.requireOpenAllowFrom({ - policy: params.policy, - allowFrom: params.allowFrom, - ctx: params.ctx, - path: ["allowFrom"], - message: `channels.${params.channel}.dmPolicy="open" requires channels.${params.channel}.allowFrom to include "*"`, - }); -} +export { requireChannelOpenAllowFrom } from "openclaw/plugin-sdk/extension-shared"; diff --git a/extensions/shared/deferred.ts b/extensions/shared/deferred.ts index 1a874100916..3cd299d98ee 100644 --- a/extensions/shared/deferred.ts +++ b/extensions/shared/deferred.ts @@ -1,9 +1 @@ -export function createDeferred() { - let resolve!: (value: T | PromiseLike) => void; - let reject!: (reason?: unknown) => void; - const promise = new Promise((res, rej) => { - resolve = res; - reject = rej; - }); - return { promise, resolve, reject }; -} +export { createDeferred } from "openclaw/plugin-sdk/extension-shared"; diff --git a/extensions/shared/passive-monitor.ts b/extensions/shared/passive-monitor.ts index f9cd2ed58ab..01487d02812 100644 --- a/extensions/shared/passive-monitor.ts +++ b/extensions/shared/passive-monitor.ts @@ -1,18 +1 @@ -import { runPassiveAccountLifecycle } from "openclaw/plugin-sdk/channel-lifecycle"; - -type StoppableMonitor = { - stop: () => void; -}; - -export async function runStoppablePassiveMonitor(params: { - abortSignal: AbortSignal; - start: () => Promise; -}): Promise { - await runPassiveAccountLifecycle({ - abortSignal: params.abortSignal, - start: params.start, - stop: async (monitor) => { - monitor.stop(); - }, - }); -} +export { runStoppablePassiveMonitor } from "openclaw/plugin-sdk/extension-shared"; diff --git a/extensions/shared/resolve-target-test-helpers.ts b/extensions/shared/resolve-target-test-helpers.ts index 282c5e82e57..29a0bc68a88 100644 --- a/extensions/shared/resolve-target-test-helpers.ts +++ b/extensions/shared/resolve-target-test-helpers.ts @@ -1,66 +1 @@ -import { expect, it } from "vitest"; - -type ResolveTargetMode = "explicit" | "implicit" | "heartbeat"; - -type ResolveTargetResult = { - ok: boolean; - to?: string; - error?: unknown; -}; - -type ResolveTargetFn = (params: { - to?: string; - mode: ResolveTargetMode; - allowFrom: string[]; -}) => ResolveTargetResult; - -export function installCommonResolveTargetErrorCases(params: { - resolveTarget: ResolveTargetFn; - implicitAllowFrom: string[]; -}) { - const { resolveTarget, implicitAllowFrom } = params; - - it("should error on normalization failure with allowlist (implicit mode)", () => { - const result = resolveTarget({ - to: "invalid-target", - mode: "implicit", - allowFrom: implicitAllowFrom, - }); - - expect(result.ok).toBe(false); - expect(result.error).toBeDefined(); - }); - - it("should error when no target provided with allowlist", () => { - const result = resolveTarget({ - to: undefined, - mode: "implicit", - allowFrom: implicitAllowFrom, - }); - - expect(result.ok).toBe(false); - expect(result.error).toBeDefined(); - }); - - it("should error when no target and no allowlist", () => { - const result = resolveTarget({ - to: undefined, - mode: "explicit", - allowFrom: [], - }); - - expect(result.ok).toBe(false); - expect(result.error).toBeDefined(); - }); - - it("should handle whitespace-only target", () => { - const result = resolveTarget({ - to: " ", - mode: "explicit", - allowFrom: [], - }); - - expect(result.ok).toBe(false); - expect(result.error).toBeDefined(); - }); -} +export { installCommonResolveTargetErrorCases } from "openclaw/plugin-sdk/testing"; diff --git a/extensions/shared/runtime.ts b/extensions/shared/runtime.ts index a534fc57d4b..e66828d0289 100644 --- a/extensions/shared/runtime.ts +++ b/extensions/shared/runtime.ts @@ -1,14 +1 @@ -import { createLoggerBackedRuntime } from "openclaw/plugin-sdk/runtime"; - -export function resolveLoggerBackedRuntime( - runtime: TRuntime | undefined, - logger: Parameters[0]["logger"], -): TRuntime { - return ( - runtime ?? - (createLoggerBackedRuntime({ - logger, - exitError: () => new Error("Runtime exit not available"), - }) as TRuntime) - ); -} +export { resolveLoggerBackedRuntime } from "openclaw/plugin-sdk/extension-shared"; diff --git a/extensions/shared/status-issues.ts b/extensions/shared/status-issues.ts index 1eb39e2b686..4b2b6d27df1 100644 --- a/extensions/shared/status-issues.ts +++ b/extensions/shared/status-issues.ts @@ -1,18 +1,4 @@ -export function readStatusIssueFields( - value: unknown, - fields: readonly TField[], -): Record | null { - if (!value || typeof value !== "object") { - return null; - } - const record = value as Record; - const result = {} as Record; - for (const field of fields) { - result[field] = record[field]; - } - return result; -} - -export function coerceStatusIssueAccountId(value: unknown): string | undefined { - return typeof value === "string" ? value : typeof value === "number" ? String(value) : undefined; -} +export { + coerceStatusIssueAccountId, + readStatusIssueFields, +} from "openclaw/plugin-sdk/extension-shared"; diff --git a/extensions/shared/windows-cmd-shim-test-fixtures.ts b/extensions/shared/windows-cmd-shim-test-fixtures.ts index ce73d0f8398..ccaa146100e 100644 --- a/extensions/shared/windows-cmd-shim-test-fixtures.ts +++ b/extensions/shared/windows-cmd-shim-test-fixtures.ts @@ -1,13 +1 @@ -import fs from "node:fs/promises"; -import path from "node:path"; - -export async function createWindowsCmdShimFixture(params: { - shimPath: string; - scriptPath: string; - shimLine: string; -}): Promise { - await fs.mkdir(path.dirname(params.scriptPath), { recursive: true }); - await fs.mkdir(path.dirname(params.shimPath), { recursive: true }); - await fs.writeFile(params.scriptPath, "module.exports = {};\n", "utf8"); - await fs.writeFile(params.shimPath, `@echo off\r\n${params.shimLine}\r\n`, "utf8"); -} +export { createWindowsCmdShimFixture } from "openclaw/plugin-sdk/testing"; diff --git a/extensions/telegram/src/target-writeback.test-shared.ts b/extensions/telegram/src/target-writeback.test-shared.ts new file mode 100644 index 00000000000..ceb91dadacb --- /dev/null +++ b/extensions/telegram/src/target-writeback.test-shared.ts @@ -0,0 +1,221 @@ +import type { OpenClawConfig } from "openclaw/plugin-sdk/testing"; +import { beforeEach, describe, expect, it, vi } from "vitest"; + +export const readConfigFileSnapshotForWrite = vi.fn(); +export const writeConfigFile = vi.fn(); +export const loadCronStore = vi.fn(); +export const resolveCronStorePath = vi.fn(); +export const saveCronStore = vi.fn(); + +vi.mock("openclaw/plugin-sdk/config-runtime", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + readConfigFileSnapshotForWrite, + writeConfigFile, + loadCronStore, + resolveCronStorePath, + saveCronStore, + }; +}); + +export function installMaybePersistResolvedTelegramTargetTests(params?: { + includeGatewayScopeCases?: boolean; +}) { + describe("maybePersistResolvedTelegramTarget", () => { + let maybePersistResolvedTelegramTarget: typeof import("./target-writeback.js").maybePersistResolvedTelegramTarget; + + beforeEach(async () => { + vi.resetModules(); + ({ maybePersistResolvedTelegramTarget } = await import("./target-writeback.js")); + readConfigFileSnapshotForWrite.mockReset(); + writeConfigFile.mockReset(); + loadCronStore.mockReset(); + resolveCronStorePath.mockReset(); + saveCronStore.mockReset(); + resolveCronStorePath.mockReturnValue("/tmp/cron/jobs.json"); + }); + + it("skips writeback when target is already numeric", async () => { + await maybePersistResolvedTelegramTarget({ + cfg: {} as OpenClawConfig, + rawTarget: "-100123", + resolvedChatId: "-100123", + }); + + expect(readConfigFileSnapshotForWrite).not.toHaveBeenCalled(); + expect(loadCronStore).not.toHaveBeenCalled(); + }); + + if (params?.includeGatewayScopeCases) { + it("skips config and cron writeback for gateway callers missing operator.admin", async () => { + await maybePersistResolvedTelegramTarget({ + cfg: { + cron: { store: "/tmp/cron/jobs.json" }, + } as OpenClawConfig, + rawTarget: "t.me/mychannel", + resolvedChatId: "-100123", + gatewayClientScopes: ["operator.write"], + }); + + expect(readConfigFileSnapshotForWrite).not.toHaveBeenCalled(); + expect(writeConfigFile).not.toHaveBeenCalled(); + expect(loadCronStore).not.toHaveBeenCalled(); + expect(saveCronStore).not.toHaveBeenCalled(); + }); + + it("skips config and cron writeback for gateway callers with an empty scope set", async () => { + await maybePersistResolvedTelegramTarget({ + cfg: { + cron: { store: "/tmp/cron/jobs.json" }, + } as OpenClawConfig, + rawTarget: "t.me/mychannel", + resolvedChatId: "-100123", + gatewayClientScopes: [], + }); + + expect(readConfigFileSnapshotForWrite).not.toHaveBeenCalled(); + expect(writeConfigFile).not.toHaveBeenCalled(); + expect(loadCronStore).not.toHaveBeenCalled(); + expect(saveCronStore).not.toHaveBeenCalled(); + }); + } + + it("writes back matching config and cron targets", async () => { + readConfigFileSnapshotForWrite.mockResolvedValue({ + snapshot: { + config: { + channels: { + telegram: { + defaultTo: "t.me/mychannel", + accounts: { + alerts: { + defaultTo: "@mychannel", + }, + }, + }, + }, + }, + }, + writeOptions: { expectedConfigPath: "/tmp/openclaw.json" }, + }); + loadCronStore.mockResolvedValue({ + version: 1, + jobs: [ + { id: "a", delivery: { channel: "telegram", to: "https://t.me/mychannel" } }, + { id: "b", delivery: { channel: "slack", to: "C123" } }, + ], + }); + + await maybePersistResolvedTelegramTarget({ + cfg: { + cron: { store: "/tmp/cron/jobs.json" }, + } as OpenClawConfig, + rawTarget: "t.me/mychannel", + resolvedChatId: "-100123", + }); + + expect(writeConfigFile).toHaveBeenCalledTimes(1); + expect(writeConfigFile).toHaveBeenCalledWith( + expect.objectContaining({ + channels: { + telegram: { + defaultTo: "-100123", + accounts: { + alerts: { + defaultTo: "-100123", + }, + }, + }, + }, + }), + expect.objectContaining({ expectedConfigPath: "/tmp/openclaw.json" }), + ); + expect(saveCronStore).toHaveBeenCalledTimes(1); + expect(saveCronStore).toHaveBeenCalledWith( + "/tmp/cron/jobs.json", + expect.objectContaining({ + jobs: [ + { id: "a", delivery: { channel: "telegram", to: "-100123" } }, + { id: "b", delivery: { channel: "slack", to: "C123" } }, + ], + }), + ); + }); + + it("preserves topic suffix style in writeback target", async () => { + readConfigFileSnapshotForWrite.mockResolvedValue({ + snapshot: { + config: { + channels: { + telegram: { + defaultTo: "t.me/mychannel:topic:9", + }, + }, + }, + }, + writeOptions: {}, + }); + loadCronStore.mockResolvedValue({ version: 1, jobs: [] }); + + await maybePersistResolvedTelegramTarget({ + cfg: {} as OpenClawConfig, + rawTarget: "t.me/mychannel:topic:9", + resolvedChatId: "-100123", + }); + + expect(writeConfigFile).toHaveBeenCalledWith( + expect.objectContaining({ + channels: { + telegram: { + defaultTo: "-100123:topic:9", + }, + }, + }), + expect.any(Object), + ); + }); + + it("matches username targets case-insensitively", async () => { + readConfigFileSnapshotForWrite.mockResolvedValue({ + snapshot: { + config: { + channels: { + telegram: { + defaultTo: "https://t.me/mychannel", + }, + }, + }, + }, + writeOptions: {}, + }); + loadCronStore.mockResolvedValue({ + version: 1, + jobs: [{ id: "a", delivery: { channel: "telegram", to: "https://t.me/mychannel" } }], + }); + + await maybePersistResolvedTelegramTarget({ + cfg: {} as OpenClawConfig, + rawTarget: "@MyChannel", + resolvedChatId: "-100123", + }); + + expect(writeConfigFile).toHaveBeenCalledWith( + expect.objectContaining({ + channels: { + telegram: { + defaultTo: "-100123", + }, + }, + }), + expect.any(Object), + ); + expect(saveCronStore).toHaveBeenCalledWith( + "/tmp/cron/jobs.json", + expect.objectContaining({ + jobs: [{ id: "a", delivery: { channel: "telegram", to: "-100123" } }], + }), + ); + }); + }); +} diff --git a/extensions/telegram/src/target-writeback.test.ts b/extensions/telegram/src/target-writeback.test.ts index 6eaf765030f..a6d5e634196 100644 --- a/extensions/telegram/src/target-writeback.test.ts +++ b/extensions/telegram/src/target-writeback.test.ts @@ -1,216 +1,3 @@ -import { beforeEach, describe, expect, it, vi } from "vitest"; -import type { OpenClawConfig } from "../../../src/config/config.js"; +import { installMaybePersistResolvedTelegramTargetTests } from "./target-writeback.test-shared.js"; -const readConfigFileSnapshotForWrite = vi.fn(); -const writeConfigFile = vi.fn(); -const loadCronStore = vi.fn(); -const resolveCronStorePath = vi.fn(); -const saveCronStore = vi.fn(); - -vi.mock("openclaw/plugin-sdk/config-runtime", async (importOriginal) => { - const actual = await importOriginal(); - return { - ...actual, - readConfigFileSnapshotForWrite, - writeConfigFile, - loadCronStore, - resolveCronStorePath, - saveCronStore, - }; -}); - -describe("maybePersistResolvedTelegramTarget", () => { - let maybePersistResolvedTelegramTarget: typeof import("./target-writeback.js").maybePersistResolvedTelegramTarget; - - beforeEach(async () => { - vi.resetModules(); - ({ maybePersistResolvedTelegramTarget } = await import("./target-writeback.js")); - readConfigFileSnapshotForWrite.mockReset(); - writeConfigFile.mockReset(); - loadCronStore.mockReset(); - resolveCronStorePath.mockReset(); - saveCronStore.mockReset(); - resolveCronStorePath.mockReturnValue("/tmp/cron/jobs.json"); - }); - - it("skips writeback when target is already numeric", async () => { - await maybePersistResolvedTelegramTarget({ - cfg: {} as OpenClawConfig, - rawTarget: "-100123", - resolvedChatId: "-100123", - }); - - expect(readConfigFileSnapshotForWrite).not.toHaveBeenCalled(); - expect(loadCronStore).not.toHaveBeenCalled(); - }); - - it("skips config and cron writeback for gateway callers missing operator.admin", async () => { - await maybePersistResolvedTelegramTarget({ - cfg: { - cron: { store: "/tmp/cron/jobs.json" }, - } as OpenClawConfig, - rawTarget: "t.me/mychannel", - resolvedChatId: "-100123", - gatewayClientScopes: ["operator.write"], - }); - - expect(readConfigFileSnapshotForWrite).not.toHaveBeenCalled(); - expect(writeConfigFile).not.toHaveBeenCalled(); - expect(loadCronStore).not.toHaveBeenCalled(); - expect(saveCronStore).not.toHaveBeenCalled(); - }); - - it("skips config and cron writeback for gateway callers with an empty scope set", async () => { - await maybePersistResolvedTelegramTarget({ - cfg: { - cron: { store: "/tmp/cron/jobs.json" }, - } as OpenClawConfig, - rawTarget: "t.me/mychannel", - resolvedChatId: "-100123", - gatewayClientScopes: [], - }); - - expect(readConfigFileSnapshotForWrite).not.toHaveBeenCalled(); - expect(writeConfigFile).not.toHaveBeenCalled(); - expect(loadCronStore).not.toHaveBeenCalled(); - expect(saveCronStore).not.toHaveBeenCalled(); - }); - - it("writes back matching config and cron targets for gateway callers with operator.admin", async () => { - readConfigFileSnapshotForWrite.mockResolvedValue({ - snapshot: { - config: { - channels: { - telegram: { - defaultTo: "t.me/mychannel", - accounts: { - alerts: { - defaultTo: "@mychannel", - }, - }, - }, - }, - }, - }, - writeOptions: { expectedConfigPath: "/tmp/openclaw.json" }, - }); - loadCronStore.mockResolvedValue({ - version: 1, - jobs: [ - { id: "a", delivery: { channel: "telegram", to: "https://t.me/mychannel" } }, - { id: "b", delivery: { channel: "slack", to: "C123" } }, - ], - }); - - await maybePersistResolvedTelegramTarget({ - cfg: { - cron: { store: "/tmp/cron/jobs.json" }, - } as OpenClawConfig, - rawTarget: "t.me/mychannel", - resolvedChatId: "-100123", - gatewayClientScopes: ["operator.admin"], - }); - - expect(writeConfigFile).toHaveBeenCalledTimes(1); - expect(writeConfigFile).toHaveBeenCalledWith( - expect.objectContaining({ - channels: { - telegram: { - defaultTo: "-100123", - accounts: { - alerts: { - defaultTo: "-100123", - }, - }, - }, - }, - }), - expect.objectContaining({ expectedConfigPath: "/tmp/openclaw.json" }), - ); - expect(saveCronStore).toHaveBeenCalledTimes(1); - expect(saveCronStore).toHaveBeenCalledWith( - "/tmp/cron/jobs.json", - expect.objectContaining({ - jobs: [ - { id: "a", delivery: { channel: "telegram", to: "-100123" } }, - { id: "b", delivery: { channel: "slack", to: "C123" } }, - ], - }), - ); - }); - - it("preserves topic suffix style in writeback target", async () => { - readConfigFileSnapshotForWrite.mockResolvedValue({ - snapshot: { - config: { - channels: { - telegram: { - defaultTo: "t.me/mychannel:topic:9", - }, - }, - }, - }, - writeOptions: {}, - }); - loadCronStore.mockResolvedValue({ version: 1, jobs: [] }); - - await maybePersistResolvedTelegramTarget({ - cfg: {} as OpenClawConfig, - rawTarget: "t.me/mychannel:topic:9", - resolvedChatId: "-100123", - }); - - expect(writeConfigFile).toHaveBeenCalledWith( - expect.objectContaining({ - channels: { - telegram: { - defaultTo: "-100123:topic:9", - }, - }, - }), - expect.any(Object), - ); - }); - - it("matches username targets case-insensitively", async () => { - readConfigFileSnapshotForWrite.mockResolvedValue({ - snapshot: { - config: { - channels: { - telegram: { - defaultTo: "https://t.me/mychannel", - }, - }, - }, - }, - writeOptions: {}, - }); - loadCronStore.mockResolvedValue({ - version: 1, - jobs: [{ id: "a", delivery: { channel: "telegram", to: "https://t.me/mychannel" } }], - }); - - await maybePersistResolvedTelegramTarget({ - cfg: {} as OpenClawConfig, - rawTarget: "@MyChannel", - resolvedChatId: "-100123", - }); - - expect(writeConfigFile).toHaveBeenCalledWith( - expect.objectContaining({ - channels: { - telegram: { - defaultTo: "-100123", - }, - }, - }), - expect.any(Object), - ); - expect(saveCronStore).toHaveBeenCalledWith( - "/tmp/cron/jobs.json", - expect.objectContaining({ - jobs: [{ id: "a", delivery: { channel: "telegram", to: "-100123" } }], - }), - ); - }); -}); +installMaybePersistResolvedTelegramTargetTests({ includeGatewayScopeCases: true }); diff --git a/extensions/telegram/src/targets.test.ts b/extensions/telegram/src/targets.test.ts index 0761066cf91..8888d23e91d 100644 --- a/extensions/telegram/src/targets.test.ts +++ b/extensions/telegram/src/targets.test.ts @@ -1,11 +1,11 @@ -import { beforeEach, describe, expect, it, vi } from "vitest"; -import type { OpenClawConfig } from "../../../src/config/config.js"; +import { describe, expect, it } from "vitest"; import { isNumericTelegramUserId, normalizeTelegramAllowFromEntry } from "./allow-from.js"; import { resolveTelegramGroupRequireMention, resolveTelegramGroupToolPolicy, } from "./group-policy.js"; import { looksLikeTelegramTargetId, normalizeTelegramMessagingTarget } from "./normalize.js"; +import { installMaybePersistResolvedTelegramTargetTests } from "./target-writeback.test-shared.js"; import { isNumericTelegramChatId, normalizeTelegramChatId, @@ -14,24 +14,6 @@ import { stripTelegramInternalPrefixes, } from "./targets.js"; -const readConfigFileSnapshotForWrite = vi.fn(); -const writeConfigFile = vi.fn(); -const loadCronStore = vi.fn(); -const resolveCronStorePath = vi.fn(); -const saveCronStore = vi.fn(); - -vi.mock("openclaw/plugin-sdk/config-runtime", async (importOriginal) => { - const actual = await importOriginal(); - return { - ...actual, - readConfigFileSnapshotForWrite, - writeConfigFile, - loadCronStore, - resolveCronStorePath, - saveCronStore, - }; -}); - describe("stripTelegramInternalPrefixes", () => { it("strips telegram prefix", () => { expect(stripTelegramInternalPrefixes("telegram:123")).toBe("123"); @@ -235,165 +217,4 @@ describe("telegram target normalization", () => { }); }); -describe("maybePersistResolvedTelegramTarget", () => { - let maybePersistResolvedTelegramTarget: typeof import("./target-writeback.js").maybePersistResolvedTelegramTarget; - - beforeEach(async () => { - vi.resetModules(); - ({ maybePersistResolvedTelegramTarget } = await import("./target-writeback.js")); - readConfigFileSnapshotForWrite.mockReset(); - writeConfigFile.mockReset(); - loadCronStore.mockReset(); - resolveCronStorePath.mockReset(); - saveCronStore.mockReset(); - resolveCronStorePath.mockReturnValue("/tmp/cron/jobs.json"); - }); - - it("skips writeback when target is already numeric", async () => { - await maybePersistResolvedTelegramTarget({ - cfg: {} as OpenClawConfig, - rawTarget: "-100123", - resolvedChatId: "-100123", - }); - - expect(readConfigFileSnapshotForWrite).not.toHaveBeenCalled(); - expect(loadCronStore).not.toHaveBeenCalled(); - }); - - it("writes back matching config and cron targets", async () => { - readConfigFileSnapshotForWrite.mockResolvedValue({ - snapshot: { - config: { - channels: { - telegram: { - defaultTo: "t.me/mychannel", - accounts: { - alerts: { - defaultTo: "@mychannel", - }, - }, - }, - }, - }, - }, - writeOptions: { expectedConfigPath: "/tmp/openclaw.json" }, - }); - loadCronStore.mockResolvedValue({ - version: 1, - jobs: [ - { id: "a", delivery: { channel: "telegram", to: "https://t.me/mychannel" } }, - { id: "b", delivery: { channel: "slack", to: "C123" } }, - ], - }); - - await maybePersistResolvedTelegramTarget({ - cfg: { - cron: { store: "/tmp/cron/jobs.json" }, - } as OpenClawConfig, - rawTarget: "t.me/mychannel", - resolvedChatId: "-100123", - }); - - expect(writeConfigFile).toHaveBeenCalledTimes(1); - expect(writeConfigFile).toHaveBeenCalledWith( - expect.objectContaining({ - channels: { - telegram: { - defaultTo: "-100123", - accounts: { - alerts: { - defaultTo: "-100123", - }, - }, - }, - }, - }), - expect.objectContaining({ expectedConfigPath: "/tmp/openclaw.json" }), - ); - expect(saveCronStore).toHaveBeenCalledTimes(1); - expect(saveCronStore).toHaveBeenCalledWith( - "/tmp/cron/jobs.json", - expect.objectContaining({ - jobs: [ - { id: "a", delivery: { channel: "telegram", to: "-100123" } }, - { id: "b", delivery: { channel: "slack", to: "C123" } }, - ], - }), - ); - }); - - it("preserves topic suffix style in writeback target", async () => { - readConfigFileSnapshotForWrite.mockResolvedValue({ - snapshot: { - config: { - channels: { - telegram: { - defaultTo: "t.me/mychannel:topic:9", - }, - }, - }, - }, - writeOptions: {}, - }); - loadCronStore.mockResolvedValue({ version: 1, jobs: [] }); - - await maybePersistResolvedTelegramTarget({ - cfg: {} as OpenClawConfig, - rawTarget: "t.me/mychannel:topic:9", - resolvedChatId: "-100123", - }); - - expect(writeConfigFile).toHaveBeenCalledWith( - expect.objectContaining({ - channels: { - telegram: { - defaultTo: "-100123:topic:9", - }, - }, - }), - expect.any(Object), - ); - }); - - it("matches username targets case-insensitively", async () => { - readConfigFileSnapshotForWrite.mockResolvedValue({ - snapshot: { - config: { - channels: { - telegram: { - defaultTo: "https://t.me/mychannel", - }, - }, - }, - }, - writeOptions: {}, - }); - loadCronStore.mockResolvedValue({ - version: 1, - jobs: [{ id: "a", delivery: { channel: "telegram", to: "https://t.me/mychannel" } }], - }); - - await maybePersistResolvedTelegramTarget({ - cfg: {} as OpenClawConfig, - rawTarget: "@MyChannel", - resolvedChatId: "-100123", - }); - - expect(writeConfigFile).toHaveBeenCalledWith( - expect.objectContaining({ - channels: { - telegram: { - defaultTo: "-100123", - }, - }, - }), - expect.any(Object), - ); - expect(saveCronStore).toHaveBeenCalledWith( - "/tmp/cron/jobs.json", - expect.objectContaining({ - jobs: [{ id: "a", delivery: { channel: "telegram", to: "-100123" } }], - }), - ); - }); -}); +installMaybePersistResolvedTelegramTargetTests(); diff --git a/src/media/read-response-with-limit.test.ts b/src/media/read-response-with-limit.test.ts index bab822420c3..a654957c974 100644 --- a/src/media/read-response-with-limit.test.ts +++ b/src/media/read-response-with-limit.test.ts @@ -67,6 +67,23 @@ describe("readResponseWithLimit", () => { } }, 5_000); + it("uses a custom idle-timeout error when provided", async () => { + vi.useFakeTimers(); + try { + const body = makeStallingStream([new Uint8Array([1, 2])]); + const res = new Response(body); + const readPromise = readResponseWithLimit(res, 1024, { + chunkTimeoutMs: 50, + onIdleTimeout: ({ chunkTimeoutMs }) => new Error(`custom idle ${chunkTimeoutMs}`), + }); + const rejection = expect(readPromise).rejects.toThrow("custom idle 50"); + await vi.advanceTimersByTimeAsync(60); + await rejection; + } finally { + vi.useRealTimers(); + } + }, 5_000); + it("does not time out while chunks keep arriving", async () => { vi.useFakeTimers(); try { diff --git a/src/media/read-response-with-limit.ts b/src/media/read-response-with-limit.ts index 8edbdabb73a..d559a9caa11 100644 --- a/src/media/read-response-with-limit.ts +++ b/src/media/read-response-with-limit.ts @@ -1,6 +1,7 @@ async function readChunkWithIdleTimeout( reader: ReadableStreamDefaultReader, chunkTimeoutMs: number, + onIdleTimeout?: (params: { chunkTimeoutMs: number }) => Error, ): Promise>> { let timeoutId: ReturnType | undefined; let timedOut = false; @@ -17,7 +18,10 @@ async function readChunkWithIdleTimeout( timedOut = true; clear(); void reader.cancel().catch(() => undefined); - reject(new Error(`Media download stalled: no data received for ${chunkTimeoutMs}ms`)); + reject( + onIdleTimeout?.({ chunkTimeoutMs }) ?? + new Error(`Media download stalled: no data received for ${chunkTimeoutMs}ms`), + ); }, chunkTimeoutMs); void reader.read().then( @@ -48,6 +52,7 @@ async function readResponsePrefix( maxBytes: number, opts?: { chunkTimeoutMs?: number; + onIdleTimeout?: (params: { chunkTimeoutMs: number }) => Error; }, ): Promise { const chunkTimeoutMs = opts?.chunkTimeoutMs; @@ -72,7 +77,7 @@ async function readResponsePrefix( try { while (true) { const { done, value } = chunkTimeoutMs - ? await readChunkWithIdleTimeout(reader, chunkTimeoutMs) + ? await readChunkWithIdleTimeout(reader, chunkTimeoutMs, opts?.onIdleTimeout) : await reader.read(); if (done) { size = total; @@ -121,13 +126,17 @@ export async function readResponseWithLimit( opts?: { onOverflow?: (params: { size: number; maxBytes: number; res: Response }) => Error; chunkTimeoutMs?: number; + onIdleTimeout?: (params: { chunkTimeoutMs: number }) => Error; }, ): Promise { const onOverflow = opts?.onOverflow ?? ((params: { size: number; maxBytes: number }) => new Error(`Content too large: ${params.size} bytes (limit: ${params.maxBytes} bytes)`)); - const prefix = await readResponsePrefix(res, maxBytes, { chunkTimeoutMs: opts?.chunkTimeoutMs }); + const prefix = await readResponsePrefix(res, maxBytes, { + chunkTimeoutMs: opts?.chunkTimeoutMs, + onIdleTimeout: opts?.onIdleTimeout, + }); if (prefix.truncated) { throw onOverflow({ size: prefix.size, maxBytes, res }); } @@ -140,11 +149,15 @@ export async function readResponseTextSnippet( maxBytes?: number; maxChars?: number; chunkTimeoutMs?: number; + onIdleTimeout?: (params: { chunkTimeoutMs: number }) => Error; }, ): Promise { const maxBytes = opts?.maxBytes ?? 8 * 1024; const maxChars = opts?.maxChars ?? 200; - const prefix = await readResponsePrefix(res, maxBytes, { chunkTimeoutMs: opts?.chunkTimeoutMs }); + const prefix = await readResponsePrefix(res, maxBytes, { + chunkTimeoutMs: opts?.chunkTimeoutMs, + onIdleTimeout: opts?.onIdleTimeout, + }); if (prefix.buffer.length === 0) { return undefined; } diff --git a/src/plugin-sdk/media-runtime.ts b/src/plugin-sdk/media-runtime.ts index 07d954a4681..6b7c9dce408 100644 --- a/src/plugin-sdk/media-runtime.ts +++ b/src/plugin-sdk/media-runtime.ts @@ -14,6 +14,7 @@ export * from "../media/mime.js"; export * from "../media/outbound-attachment.js"; export * from "../media/png-encode.ts"; export * from "../media/qr-image.ts"; +export * from "../media/read-response-with-limit.js"; export * from "../media/store.js"; export * from "../media/temp-files.js"; export { resolveChannelMediaMaxBytes } from "../channels/plugins/media-limits.js";