diff --git a/extensions/feishu/src/bot.ts b/extensions/feishu/src/bot.ts index 2a284ac502d..878cdf076ea 100644 --- a/extensions/feishu/src/bot.ts +++ b/extensions/feishu/src/bot.ts @@ -1042,6 +1042,7 @@ export async function handleFeishuMessage(params: { sendFinalReply: () => false, waitForIdle: async () => {}, getQueuedCounts: () => ({ tool: 0, block: 0, final: 0 }), + getFailedCounts: () => ({ tool: 0, block: 0, final: 0 }), markComplete: () => {}, }; diff --git a/src/auto-reply/dispatch.test.ts b/src/auto-reply/dispatch.test.ts index 327a8b30692..4fcad0f603f 100644 --- a/src/auto-reply/dispatch.test.ts +++ b/src/auto-reply/dispatch.test.ts @@ -10,6 +10,7 @@ function createDispatcher(record: string[]): ReplyDispatcher { sendBlockReply: () => true, sendFinalReply: () => true, getQueuedCounts: () => ({ tool: 0, block: 0, final: 0 }), + getFailedCounts: () => ({ tool: 0, block: 0, final: 0 }), markComplete: () => { record.push("markComplete"); }, @@ -71,6 +72,7 @@ describe("withReplyDispatcher", () => { return true; }, getQueuedCounts: () => ({ tool: 0, block: 0, final: 0 }), + getFailedCounts: () => ({ tool: 0, block: 0, final: 0 }), markComplete: () => { order.push("markComplete"); }, diff --git a/src/auto-reply/reply/dispatch-acp-delivery.test.ts b/src/auto-reply/reply/dispatch-acp-delivery.test.ts index c584cd3853e..fd0008f5602 100644 --- a/src/auto-reply/reply/dispatch-acp-delivery.test.ts +++ b/src/auto-reply/reply/dispatch-acp-delivery.test.ts @@ -22,6 +22,7 @@ function createDispatcher(): ReplyDispatcher { sendFinalReply: vi.fn(() => true), waitForIdle: vi.fn(async () => {}), getQueuedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })), + getFailedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })), markComplete: vi.fn(), }; } @@ -57,6 +58,7 @@ describe("createAcpDispatchDeliveryCoordinator", () => { }); await coordinator.deliver("final", { text: "hello" }, { skipTts: true }); + await coordinator.settleVisibleText(); expect(ttsMocks.maybeApplyTtsToPayload).not.toHaveBeenCalled(); expect(dispatcher.sendFinalReply).toHaveBeenCalledWith({ text: "hello" }); @@ -66,13 +68,100 @@ describe("createAcpDispatchDeliveryCoordinator", () => { const coordinator = createCoordinator(); expect(coordinator.hasDeliveredFinalReply()).toBe(false); + expect(coordinator.hasDeliveredVisibleText()).toBe(false); + expect(coordinator.hasFailedVisibleTextDelivery()).toBe(false); await coordinator.deliver("final", { text: "hello" }, { skipTts: true }); + await coordinator.settleVisibleText(); expect(coordinator.hasDeliveredFinalReply()).toBe(true); + expect(coordinator.hasDeliveredVisibleText()).toBe(true); + expect(coordinator.hasFailedVisibleTextDelivery()).toBe(false); expect(coordinator.getRoutedCounts().final).toBe(0); }); + it("tracks visible direct block text for dispatcher-backed delivery", async () => { + const coordinator = createAcpDispatchDeliveryCoordinator({ + cfg: createAcpTestConfig(), + ctx: buildTestCtx({ + Provider: "telegram", + Surface: "telegram", + SessionKey: "agent:codex-acp:session-1", + }), + dispatcher: createDispatcher(), + inboundAudio: false, + shouldRouteToOriginating: false, + }); + + await coordinator.deliver("block", { text: "hello" }, { skipTts: true }); + await coordinator.settleVisibleText(); + + expect(coordinator.hasDeliveredFinalReply()).toBe(false); + expect(coordinator.hasDeliveredVisibleText()).toBe(true); + expect(coordinator.hasFailedVisibleTextDelivery()).toBe(false); + expect(coordinator.getRoutedCounts().block).toBe(0); + }); + + it("prefers provider over surface when detecting direct telegram visibility", async () => { + const coordinator = createAcpDispatchDeliveryCoordinator({ + cfg: createAcpTestConfig(), + ctx: buildTestCtx({ + Provider: "telegram", + Surface: "webchat", + SessionKey: "agent:codex-acp:session-1", + }), + dispatcher: createDispatcher(), + inboundAudio: false, + shouldRouteToOriginating: false, + }); + + await coordinator.deliver("block", { text: "hello" }, { skipTts: true }); + await coordinator.settleVisibleText(); + + expect(coordinator.hasDeliveredVisibleText()).toBe(true); + expect(coordinator.hasFailedVisibleTextDelivery()).toBe(false); + }); + + it("does not treat non-telegram direct block text as visible", async () => { + const coordinator = createCoordinator(); + + await coordinator.deliver("block", { text: "hello" }, { skipTts: true }); + await coordinator.settleVisibleText(); + + expect(coordinator.hasDeliveredFinalReply()).toBe(false); + expect(coordinator.hasDeliveredVisibleText()).toBe(false); + expect(coordinator.hasFailedVisibleTextDelivery()).toBe(false); + expect(coordinator.getRoutedCounts().block).toBe(0); + }); + + it("tracks failed visible telegram block delivery separately", async () => { + const dispatcher: ReplyDispatcher = { + sendToolResult: vi.fn(() => true), + sendBlockReply: vi.fn(() => false), + sendFinalReply: vi.fn(() => true), + waitForIdle: vi.fn(async () => {}), + getQueuedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })), + getFailedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })), + markComplete: vi.fn(), + }; + const coordinator = createAcpDispatchDeliveryCoordinator({ + cfg: createAcpTestConfig(), + ctx: buildTestCtx({ + Provider: "telegram", + Surface: "telegram", + SessionKey: "agent:codex-acp:session-1", + }), + dispatcher, + inboundAudio: false, + shouldRouteToOriginating: false, + }); + + await coordinator.deliver("block", { text: "hello" }, { skipTts: true }); + + expect(coordinator.hasDeliveredVisibleText()).toBe(false); + expect(coordinator.hasFailedVisibleTextDelivery()).toBe(true); + }); + it("starts reply lifecycle only once when called directly and through deliver", async () => { const onReplyStart = vi.fn(async () => {}); const coordinator = createCoordinator(onReplyStart); diff --git a/src/auto-reply/reply/dispatch-acp-delivery.ts b/src/auto-reply/reply/dispatch-acp-delivery.ts index 24310e80f67..fe74a5093ba 100644 --- a/src/auto-reply/reply/dispatch-acp-delivery.ts +++ b/src/auto-reply/reply/dispatch-acp-delivery.ts @@ -23,11 +23,34 @@ type ToolMessageHandle = { messageId: string; }; +function normalizeDeliveryChannel(value: string | undefined): string | undefined { + const normalized = value?.trim().toLowerCase(); + return normalized || undefined; +} + +function shouldTreatDeliveredTextAsVisible(params: { + channel: string | undefined; + kind: ReplyDispatchKind; + text: string | undefined; +}): boolean { + if (!params.text?.trim()) { + return false; + } + if (params.kind === "final") { + return true; + } + return normalizeDeliveryChannel(params.channel) === "telegram"; +} + type AcpDispatchDeliveryState = { startedReplyLifecycle: boolean; accumulatedBlockText: string; blockCount: number; deliveredFinalReply: boolean; + deliveredVisibleText: boolean; + failedVisibleTextDelivery: boolean; + queuedDirectVisibleTextDeliveries: number; + settledDirectVisibleText: boolean; routedCounts: Record; toolMessageByCallId: Map; }; @@ -41,7 +64,10 @@ export type AcpDispatchDeliveryCoordinator = { ) => Promise; getBlockCount: () => number; getAccumulatedBlockText: () => string; + settleVisibleText: () => Promise; hasDeliveredFinalReply: () => boolean; + hasDeliveredVisibleText: () => boolean; + hasFailedVisibleTextDelivery: () => boolean; getRoutedCounts: () => Record; applyRoutedCounts: (counts: Record) => void; }; @@ -63,6 +89,10 @@ export function createAcpDispatchDeliveryCoordinator(params: { accumulatedBlockText: "", blockCount: 0, deliveredFinalReply: false, + deliveredVisibleText: false, + failedVisibleTextDelivery: false, + queuedDirectVisibleTextDeliveries: 0, + settledDirectVisibleText: false, routedCounts: { tool: 0, block: 0, @@ -70,6 +100,24 @@ export function createAcpDispatchDeliveryCoordinator(params: { }, toolMessageByCallId: new Map(), }; + const directChannel = normalizeDeliveryChannel(params.ctx.Provider ?? params.ctx.Surface); + const routedChannel = normalizeDeliveryChannel(params.originatingChannel); + + const settleDirectVisibleText = async () => { + if (state.settledDirectVisibleText || state.queuedDirectVisibleTextDeliveries === 0) { + return; + } + state.settledDirectVisibleText = true; + await params.dispatcher.waitForIdle(); + const failedCounts = params.dispatcher.getFailedCounts(); + const failedVisibleCount = failedCounts.block + failedCounts.final; + if (failedVisibleCount > 0) { + state.failedVisibleTextDelivery = true; + } + if (state.queuedDirectVisibleTextDeliveries > failedVisibleCount) { + state.deliveredVisibleText = true; + } + }; const startReplyLifecycleOnce = async () => { if (state.startedReplyLifecycle) { @@ -156,6 +204,11 @@ export function createAcpDispatchDeliveryCoordinator(params: { } } + const tracksVisibleText = shouldTreatDeliveredTextAsVisible({ + channel: routedChannel, + kind, + text: ttsPayload.text, + }); const result = await routeReply({ payload: ttsPayload, channel: params.originatingChannel, @@ -166,6 +219,9 @@ export function createAcpDispatchDeliveryCoordinator(params: { cfg: params.cfg, }); if (!result.ok) { + if (tracksVisibleText) { + state.failedVisibleTextDelivery = true; + } logVerbose( `dispatch-acp: route-reply (acp/${kind}) failed: ${result.error ?? "unknown error"}`, ); @@ -183,10 +239,18 @@ export function createAcpDispatchDeliveryCoordinator(params: { if (kind === "final") { state.deliveredFinalReply = true; } + if (tracksVisibleText) { + state.deliveredVisibleText = true; + } state.routedCounts[kind] += 1; return true; } + const tracksVisibleText = shouldTreatDeliveredTextAsVisible({ + channel: directChannel, + kind, + text: ttsPayload.text, + }); const delivered = kind === "tool" ? params.dispatcher.sendToolResult(ttsPayload) @@ -196,6 +260,12 @@ export function createAcpDispatchDeliveryCoordinator(params: { if (kind === "final" && delivered) { state.deliveredFinalReply = true; } + if (delivered && tracksVisibleText) { + state.queuedDirectVisibleTextDeliveries += 1; + state.settledDirectVisibleText = false; + } else if (!delivered && tracksVisibleText) { + state.failedVisibleTextDelivery = true; + } return delivered; }; @@ -204,7 +274,10 @@ export function createAcpDispatchDeliveryCoordinator(params: { deliver, getBlockCount: () => state.blockCount, getAccumulatedBlockText: () => state.accumulatedBlockText, + settleVisibleText: settleDirectVisibleText, hasDeliveredFinalReply: () => state.deliveredFinalReply, + hasDeliveredVisibleText: () => state.deliveredVisibleText, + hasFailedVisibleTextDelivery: () => state.failedVisibleTextDelivery, getRoutedCounts: () => ({ ...state.routedCounts }), applyRoutedCounts: (counts) => { counts.tool += state.routedCounts.tool; diff --git a/src/auto-reply/reply/dispatch-acp.test.ts b/src/auto-reply/reply/dispatch-acp.test.ts index 6a18822edff..d24b21e9046 100644 --- a/src/auto-reply/reply/dispatch-acp.test.ts +++ b/src/auto-reply/reply/dispatch-acp.test.ts @@ -52,44 +52,9 @@ const bindingServiceMocks = vi.hoisted(() => ({ listBySession: vi.fn<(sessionKey: string) => SessionBindingRecord[]>(() => []), })); -vi.mock("../../acp/control-plane/manager.js", () => ({ - getAcpSessionManager: () => managerMocks, -})); - -vi.mock("../../acp/policy.js", () => ({ - resolveAcpDispatchPolicyError: (cfg: OpenClawConfig) => - policyMocks.resolveAcpDispatchPolicyError(cfg), - resolveAcpAgentPolicyError: (cfg: OpenClawConfig, agent: string) => - policyMocks.resolveAcpAgentPolicyError(cfg, agent), -})); - -vi.mock("./route-reply.js", () => ({ - routeReply: (params: unknown) => routeMocks.routeReply(params), -})); - -vi.mock("../../infra/outbound/message-action-runner.js", () => ({ - runMessageAction: (params: unknown) => messageActionMocks.runMessageAction(params), -})); - -vi.mock("../../tts/tts.js", () => ({ - maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params), - resolveTtsConfig: (cfg: OpenClawConfig) => ttsMocks.resolveTtsConfig(cfg), -})); - -vi.mock("../../acp/runtime/session-meta.js", () => ({ - readAcpSessionEntry: (params: { sessionKey: string; cfg?: OpenClawConfig }) => - sessionMetaMocks.readAcpSessionEntry(params), -})); - -vi.mock("../../infra/outbound/session-binding-service.js", () => ({ - getSessionBindingService: () => ({ - listBySession: (sessionKey: string) => bindingServiceMocks.listBySession(sessionKey), - }), -})); - -const { tryDispatchAcpReply } = await import("./dispatch-acp.js"); const sessionKey = "agent:codex-acp:session-1"; type MockTtsReply = Awaited>; +let tryDispatchAcpReply: typeof import("./dispatch-acp.js").tryDispatchAcpReply; function createDispatcher(): { dispatcher: ReplyDispatcher; @@ -102,6 +67,7 @@ function createDispatcher(): { sendFinalReply: vi.fn(() => true), waitForIdle: vi.fn(async () => {}), getQueuedCounts: vi.fn(() => counts), + getFailedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })), markComplete: vi.fn(), }; return { dispatcher, counts }; @@ -245,7 +211,38 @@ function expectSecondRoutedPayload(payload: Partial) { } describe("tryDispatchAcpReply", () => { - beforeEach(() => { + beforeEach(async () => { + vi.resetModules(); + vi.doMock("../../acp/control-plane/manager.js", () => ({ + getAcpSessionManager: () => managerMocks, + })); + vi.doMock("../../acp/policy.js", () => ({ + resolveAcpDispatchPolicyError: (cfg: OpenClawConfig) => + policyMocks.resolveAcpDispatchPolicyError(cfg), + resolveAcpAgentPolicyError: (cfg: OpenClawConfig, agent: string) => + policyMocks.resolveAcpAgentPolicyError(cfg, agent), + })); + vi.doMock("./route-reply.js", () => ({ + routeReply: (params: unknown) => routeMocks.routeReply(params), + })); + vi.doMock("../../infra/outbound/message-action-runner.js", () => ({ + runMessageAction: (params: unknown) => messageActionMocks.runMessageAction(params), + })); + vi.doMock("../../tts/tts.js", () => ({ + maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params), + resolveTtsConfig: (cfg: OpenClawConfig) => ttsMocks.resolveTtsConfig(cfg), + })); + vi.doMock("../../acp/runtime/session-meta.js", () => ({ + readAcpSessionEntry: (params: { sessionKey: string; cfg?: OpenClawConfig }) => + sessionMetaMocks.readAcpSessionEntry(params), + })); + vi.doMock("../../infra/outbound/session-binding-service.js", () => ({ + getSessionBindingService: () => ({ + listBySession: (targetSessionKey: string) => + bindingServiceMocks.listBySession(targetSessionKey), + }), + })); + ({ tryDispatchAcpReply } = await import("./dispatch-acp.js")); managerMocks.resolveSession.mockReset(); managerMocks.runTurn.mockReset(); managerMocks.getObservabilitySnapshot.mockReset(); @@ -462,21 +459,148 @@ describe("tryDispatchAcpReply", () => { expect(managerMocks.runTurn).not.toHaveBeenCalled(); expect(dispatcher.sendFinalReply).toHaveBeenCalledWith( expect.objectContaining({ - text: expect.stringContaining("ACP_DISPATCH_DISABLED"), + isError: true, + text: expect.stringContaining("ACP dispatch is disabled by policy."), }), ); }); - it("delivers final fallback text even when routed block text already existed", async () => { + it("does not deliver final fallback text when routed block text was already visible", async () => { setReadyAcpResolution(); ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" }); queueTtsReplies({ text: "CODEX_OK" }, {} as ReturnType); const { result } = await runRoutedAcpTextTurn("CODEX_OK"); expect(result?.counts.block).toBe(1); - expect(result?.counts.final).toBe(1); - expect(routeMocks.routeReply).toHaveBeenCalledTimes(2); - expectSecondRoutedPayload({ text: "CODEX_OK" }); + expect(result?.counts.final).toBe(0); + expect(routeMocks.routeReply).toHaveBeenCalledTimes(1); + }); + + it("does not deliver final fallback text when direct block text was already visible", async () => { + setReadyAcpResolution(); + ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" }); + queueTtsReplies({ text: "CODEX_OK" }, {} as ReturnType); + mockVisibleTextTurn("CODEX_OK"); + + const { dispatcher, counts } = createDispatcher(); + const result = await runDispatch({ + bodyForAgent: "reply", + dispatcher, + ctxOverrides: { + Provider: "telegram", + Surface: "telegram", + }, + }); + + expect(result?.counts.block).toBe(0); + expect(result?.counts.final).toBe(0); + expect(counts.block).toBe(0); + expect(counts.final).toBe(0); + expect(dispatcher.sendBlockReply).toHaveBeenCalledWith( + expect.objectContaining({ text: "CODEX_OK" }), + ); + expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); + }); + + it("treats visible telegram ACP block delivery as a successful final response", async () => { + setReadyAcpResolution(); + ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" }); + queueTtsReplies({ text: "CODEX_OK" }, {} as ReturnType); + mockVisibleTextTurn("CODEX_OK"); + + const { dispatcher } = createDispatcher(); + const result = await runDispatch({ + bodyForAgent: "reply", + dispatcher, + ctxOverrides: { + Provider: "telegram", + Surface: "telegram", + }, + }); + + expect(result?.queuedFinal).toBe(true); + expect(dispatcher.sendBlockReply).toHaveBeenCalledWith( + expect.objectContaining({ text: "CODEX_OK" }), + ); + expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); + }); + + it("preserves final fallback when direct block text is filtered by non-telegram channels", async () => { + setReadyAcpResolution(); + ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" }); + queueTtsReplies({ text: "CODEX_OK" }, {} as ReturnType); + mockVisibleTextTurn("CODEX_OK"); + + const { dispatcher, counts } = createDispatcher(); + const result = await runDispatch({ + bodyForAgent: "reply", + dispatcher, + }); + + expect(result?.counts.block).toBe(0); + expect(result?.counts.final).toBe(0); + expect(counts.block).toBe(0); + expect(counts.final).toBe(0); + expect(dispatcher.sendBlockReply).toHaveBeenCalledWith( + expect.objectContaining({ text: "CODEX_OK" }), + ); + expect(dispatcher.sendFinalReply).toHaveBeenCalledWith( + expect.objectContaining({ text: "CODEX_OK" }), + ); + }); + + it("falls back to final text when a later telegram ACP block delivery fails", async () => { + setReadyAcpResolution(); + ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" }); + queueTtsReplies( + { text: "First chunk. " }, + { text: "Second chunk." }, + {} as ReturnType, + ); + const cfg = createAcpTestConfig({ + acp: { + enabled: true, + stream: { + deliveryMode: "live", + coalesceIdleMs: 0, + maxChunkChars: 64, + }, + }, + }); + managerMocks.runTurn.mockImplementation( + async ({ onEvent }: { onEvent: (event: unknown) => Promise }) => { + await onEvent({ type: "text_delta", text: "First chunk. ", tag: "agent_message_chunk" }); + await onEvent({ type: "text_delta", text: "Second chunk.", tag: "agent_message_chunk" }); + await onEvent({ type: "done" }); + }, + ); + + const { dispatcher } = createDispatcher(); + (dispatcher.sendBlockReply as ReturnType) + .mockReturnValueOnce(true) + .mockReturnValueOnce(false); + const result = await runDispatch({ + bodyForAgent: "reply", + cfg, + dispatcher, + ctxOverrides: { + Provider: "telegram", + Surface: "telegram", + }, + }); + + expect(dispatcher.sendBlockReply).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ text: "First chunk. " }), + ); + expect(dispatcher.sendBlockReply).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ text: "Second chunk." }), + ); + expect(dispatcher.sendFinalReply).toHaveBeenCalledWith( + expect.objectContaining({ text: "First chunk. \nSecond chunk." }), + ); + expect(result?.queuedFinal).toBe(true); }); it("does not add text fallback when final TTS already delivered audio", async () => { diff --git a/src/auto-reply/reply/dispatch-acp.ts b/src/auto-reply/reply/dispatch-acp.ts index 5f0c168e305..2662e465224 100644 --- a/src/auto-reply/reply/dispatch-acp.ts +++ b/src/auto-reply/reply/dispatch-acp.ts @@ -196,7 +196,9 @@ async function finalizeAcpTurnOutput(params: { ttsChannel?: string; shouldEmitResolvedIdentityNotice: boolean; }): Promise { - let queuedFinal = false; + await params.delivery.settleVisibleText(); + let queuedFinal = + params.delivery.hasDeliveredVisibleText() && !params.delivery.hasFailedVisibleTextDelivery(); const ttsMode = resolveTtsConfig(params.cfg).mode ?? "final"; const accumulatedBlockText = params.delivery.getAccumulatedBlockText(); const hasAccumulatedBlockText = accumulatedBlockText.trim().length > 0; @@ -233,7 +235,8 @@ async function finalizeAcpTurnOutput(params: { ttsMode !== "all" && hasAccumulatedBlockText && !finalMediaDelivered && - !params.delivery.hasDeliveredFinalReply(); + !params.delivery.hasDeliveredFinalReply() && + (!params.delivery.hasDeliveredVisibleText() || params.delivery.hasFailedVisibleTextDelivery()); if (shouldDeliverTextFallback) { const delivered = await params.delivery.deliver( "final", diff --git a/src/auto-reply/reply/dispatch-from-config.test.ts b/src/auto-reply/reply/dispatch-from-config.test.ts index 025d0f3ff03..db7a5c156fa 100644 --- a/src/auto-reply/reply/dispatch-from-config.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.test.ts @@ -238,6 +238,7 @@ function createDispatcher(): ReplyDispatcher { sendFinalReply: vi.fn(() => true), waitForIdle: vi.fn(async () => {}), getQueuedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })), + getFailedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })), markComplete: vi.fn(), }; } @@ -1034,7 +1035,9 @@ describe("dispatchReplyFromConfig", () => { const streamedText = blockCalls.map((call) => (call[0] as ReplyPayload).text ?? "").join(""); expect(streamedText).toContain("hello"); expect(streamedText).toContain("world"); - expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); + expect(dispatcher.sendFinalReply).toHaveBeenCalledWith( + expect.objectContaining({ text: "hello world" }), + ); }); it("aborts ACP dispatch promptly when the caller abort signal fires", async () => { @@ -1183,12 +1186,12 @@ describe("dispatchReplyFromConfig", () => { await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver: vi.fn() }); const finalCalls = (dispatcher.sendFinalReply as ReturnType).mock.calls; - expect(finalCalls.length).toBe(1); - const finalPayload = finalCalls[0]?.[0] as ReplyPayload | undefined; - expect(finalPayload?.text).toContain("Session ids resolved"); - expect(finalPayload?.text).toContain("agent session id: inner-123"); - expect(finalPayload?.text).toContain("acpx session id: acpx-123"); - expect(finalPayload?.text).toContain("codex resume inner-123"); + expect(finalCalls.length).toBe(2); + const noticePayload = finalCalls[1]?.[0] as ReplyPayload | undefined; + expect(noticePayload?.text).toContain("Session ids resolved"); + expect(noticePayload?.text).toContain("agent session id: inner-123"); + expect(noticePayload?.text).toContain("acpx session id: acpx-123"); + expect(noticePayload?.text).toContain("codex resume inner-123"); }); it("posts resolved-session-id notice when ACP session is bound even without MessageThreadId", async () => { @@ -1266,11 +1269,11 @@ describe("dispatchReplyFromConfig", () => { await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver: vi.fn() }); const finalCalls = (dispatcher.sendFinalReply as ReturnType).mock.calls; - expect(finalCalls.length).toBe(1); - const finalPayload = finalCalls[0]?.[0] as ReplyPayload | undefined; - expect(finalPayload?.text).toContain("Session ids resolved"); - expect(finalPayload?.text).toContain("agent session id: inner-123"); - expect(finalPayload?.text).toContain("acpx session id: acpx-123"); + expect(finalCalls.length).toBe(2); + const noticePayload = finalCalls[1]?.[0] as ReplyPayload | undefined; + expect(noticePayload?.text).toContain("Session ids resolved"); + expect(noticePayload?.text).toContain("agent session id: inner-123"); + expect(noticePayload?.text).toContain("acpx session id: acpx-123"); }); it("honors send-policy deny before ACP runtime dispatch", async () => { @@ -1665,7 +1668,9 @@ describe("dispatchReplyFromConfig", () => { .map((call) => ((call[0] as ReplyPayload).text ?? "").trim()) .filter(Boolean); expect(blockTexts).toEqual(["What do you want to work on?"]); - expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); + expect(dispatcher.sendFinalReply).toHaveBeenCalledWith( + expect.objectContaining({ text: "What do you want to work on?" }), + ); }); it("generates final-mode TTS audio after ACP block streaming completes", async () => { diff --git a/src/auto-reply/reply/reply-dispatcher.ts b/src/auto-reply/reply/reply-dispatcher.ts index d212245ef59..483bdcc21dd 100644 --- a/src/auto-reply/reply/reply-dispatcher.ts +++ b/src/auto-reply/reply/reply-dispatcher.ts @@ -80,6 +80,7 @@ export type ReplyDispatcher = { sendFinalReply: (payload: ReplyPayload) => boolean; waitForIdle: () => Promise; getQueuedCounts: () => Record; + getFailedCounts: () => Record; markComplete: () => void; }; @@ -125,6 +126,11 @@ export function createReplyDispatcher(options: ReplyDispatcherOptions): ReplyDis block: 0, final: 0, }; + const failedCounts: Record = { + tool: 0, + block: 0, + final: 0, + }; // Register this dispatcher globally for gateway restart coordination. const { unregister } = registerDispatcher({ @@ -167,6 +173,7 @@ export function createReplyDispatcher(options: ReplyDispatcherOptions): ReplyDis await options.deliver(normalized, { kind }); }) .catch((err) => { + failedCounts[kind] += 1; options.onError?.(err, { kind }); }) .finally(() => { @@ -213,6 +220,7 @@ export function createReplyDispatcher(options: ReplyDispatcherOptions): ReplyDis sendFinalReply: (payload) => enqueue("final", payload), waitForIdle: () => sendChain, getQueuedCounts: () => ({ ...queuedCounts }), + getFailedCounts: () => ({ ...failedCounts }), markComplete, }; }