fix: avoid duplicate ACP Telegram finals (#55173)

* fix: avoid duplicate final ACP text on telegram

* fix: keep ACP final fallback for non-telegram blocks

* fix: count telegram ACP block replies as success

* fix: recover ACP final fallback after block failures

* fix: settle telegram ACP block delivery before fallback

* test: isolate ACP dispatch mocks under shared workers

* fix: prefer telegram provider for ACP visibility
This commit is contained in:
Ayaan Zaidi
2026-03-26 20:37:21 +05:30
committed by GitHub
parent 2ed11a375a
commit 4b1c37a152
8 changed files with 362 additions and 57 deletions

View File

@@ -1042,6 +1042,7 @@ export async function handleFeishuMessage(params: {
sendFinalReply: () => false, sendFinalReply: () => false,
waitForIdle: async () => {}, waitForIdle: async () => {},
getQueuedCounts: () => ({ tool: 0, block: 0, final: 0 }), getQueuedCounts: () => ({ tool: 0, block: 0, final: 0 }),
getFailedCounts: () => ({ tool: 0, block: 0, final: 0 }),
markComplete: () => {}, markComplete: () => {},
}; };

View File

@@ -10,6 +10,7 @@ function createDispatcher(record: string[]): ReplyDispatcher {
sendBlockReply: () => true, sendBlockReply: () => true,
sendFinalReply: () => true, sendFinalReply: () => true,
getQueuedCounts: () => ({ tool: 0, block: 0, final: 0 }), getQueuedCounts: () => ({ tool: 0, block: 0, final: 0 }),
getFailedCounts: () => ({ tool: 0, block: 0, final: 0 }),
markComplete: () => { markComplete: () => {
record.push("markComplete"); record.push("markComplete");
}, },
@@ -71,6 +72,7 @@ describe("withReplyDispatcher", () => {
return true; return true;
}, },
getQueuedCounts: () => ({ tool: 0, block: 0, final: 0 }), getQueuedCounts: () => ({ tool: 0, block: 0, final: 0 }),
getFailedCounts: () => ({ tool: 0, block: 0, final: 0 }),
markComplete: () => { markComplete: () => {
order.push("markComplete"); order.push("markComplete");
}, },

View File

@@ -22,6 +22,7 @@ function createDispatcher(): ReplyDispatcher {
sendFinalReply: vi.fn(() => true), sendFinalReply: vi.fn(() => true),
waitForIdle: vi.fn(async () => {}), waitForIdle: vi.fn(async () => {}),
getQueuedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })), getQueuedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })),
getFailedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })),
markComplete: vi.fn(), markComplete: vi.fn(),
}; };
} }
@@ -57,6 +58,7 @@ describe("createAcpDispatchDeliveryCoordinator", () => {
}); });
await coordinator.deliver("final", { text: "hello" }, { skipTts: true }); await coordinator.deliver("final", { text: "hello" }, { skipTts: true });
await coordinator.settleVisibleText();
expect(ttsMocks.maybeApplyTtsToPayload).not.toHaveBeenCalled(); expect(ttsMocks.maybeApplyTtsToPayload).not.toHaveBeenCalled();
expect(dispatcher.sendFinalReply).toHaveBeenCalledWith({ text: "hello" }); expect(dispatcher.sendFinalReply).toHaveBeenCalledWith({ text: "hello" });
@@ -66,13 +68,100 @@ describe("createAcpDispatchDeliveryCoordinator", () => {
const coordinator = createCoordinator(); const coordinator = createCoordinator();
expect(coordinator.hasDeliveredFinalReply()).toBe(false); expect(coordinator.hasDeliveredFinalReply()).toBe(false);
expect(coordinator.hasDeliveredVisibleText()).toBe(false);
expect(coordinator.hasFailedVisibleTextDelivery()).toBe(false);
await coordinator.deliver("final", { text: "hello" }, { skipTts: true }); await coordinator.deliver("final", { text: "hello" }, { skipTts: true });
await coordinator.settleVisibleText();
expect(coordinator.hasDeliveredFinalReply()).toBe(true); expect(coordinator.hasDeliveredFinalReply()).toBe(true);
expect(coordinator.hasDeliveredVisibleText()).toBe(true);
expect(coordinator.hasFailedVisibleTextDelivery()).toBe(false);
expect(coordinator.getRoutedCounts().final).toBe(0); expect(coordinator.getRoutedCounts().final).toBe(0);
}); });
it("tracks visible direct block text for dispatcher-backed delivery", async () => {
const coordinator = createAcpDispatchDeliveryCoordinator({
cfg: createAcpTestConfig(),
ctx: buildTestCtx({
Provider: "telegram",
Surface: "telegram",
SessionKey: "agent:codex-acp:session-1",
}),
dispatcher: createDispatcher(),
inboundAudio: false,
shouldRouteToOriginating: false,
});
await coordinator.deliver("block", { text: "hello" }, { skipTts: true });
await coordinator.settleVisibleText();
expect(coordinator.hasDeliveredFinalReply()).toBe(false);
expect(coordinator.hasDeliveredVisibleText()).toBe(true);
expect(coordinator.hasFailedVisibleTextDelivery()).toBe(false);
expect(coordinator.getRoutedCounts().block).toBe(0);
});
it("prefers provider over surface when detecting direct telegram visibility", async () => {
const coordinator = createAcpDispatchDeliveryCoordinator({
cfg: createAcpTestConfig(),
ctx: buildTestCtx({
Provider: "telegram",
Surface: "webchat",
SessionKey: "agent:codex-acp:session-1",
}),
dispatcher: createDispatcher(),
inboundAudio: false,
shouldRouteToOriginating: false,
});
await coordinator.deliver("block", { text: "hello" }, { skipTts: true });
await coordinator.settleVisibleText();
expect(coordinator.hasDeliveredVisibleText()).toBe(true);
expect(coordinator.hasFailedVisibleTextDelivery()).toBe(false);
});
it("does not treat non-telegram direct block text as visible", async () => {
const coordinator = createCoordinator();
await coordinator.deliver("block", { text: "hello" }, { skipTts: true });
await coordinator.settleVisibleText();
expect(coordinator.hasDeliveredFinalReply()).toBe(false);
expect(coordinator.hasDeliveredVisibleText()).toBe(false);
expect(coordinator.hasFailedVisibleTextDelivery()).toBe(false);
expect(coordinator.getRoutedCounts().block).toBe(0);
});
it("tracks failed visible telegram block delivery separately", async () => {
const dispatcher: ReplyDispatcher = {
sendToolResult: vi.fn(() => true),
sendBlockReply: vi.fn(() => false),
sendFinalReply: vi.fn(() => true),
waitForIdle: vi.fn(async () => {}),
getQueuedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })),
getFailedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })),
markComplete: vi.fn(),
};
const coordinator = createAcpDispatchDeliveryCoordinator({
cfg: createAcpTestConfig(),
ctx: buildTestCtx({
Provider: "telegram",
Surface: "telegram",
SessionKey: "agent:codex-acp:session-1",
}),
dispatcher,
inboundAudio: false,
shouldRouteToOriginating: false,
});
await coordinator.deliver("block", { text: "hello" }, { skipTts: true });
expect(coordinator.hasDeliveredVisibleText()).toBe(false);
expect(coordinator.hasFailedVisibleTextDelivery()).toBe(true);
});
it("starts reply lifecycle only once when called directly and through deliver", async () => { it("starts reply lifecycle only once when called directly and through deliver", async () => {
const onReplyStart = vi.fn(async () => {}); const onReplyStart = vi.fn(async () => {});
const coordinator = createCoordinator(onReplyStart); const coordinator = createCoordinator(onReplyStart);

View File

@@ -23,11 +23,34 @@ type ToolMessageHandle = {
messageId: string; messageId: string;
}; };
function normalizeDeliveryChannel(value: string | undefined): string | undefined {
const normalized = value?.trim().toLowerCase();
return normalized || undefined;
}
function shouldTreatDeliveredTextAsVisible(params: {
channel: string | undefined;
kind: ReplyDispatchKind;
text: string | undefined;
}): boolean {
if (!params.text?.trim()) {
return false;
}
if (params.kind === "final") {
return true;
}
return normalizeDeliveryChannel(params.channel) === "telegram";
}
type AcpDispatchDeliveryState = { type AcpDispatchDeliveryState = {
startedReplyLifecycle: boolean; startedReplyLifecycle: boolean;
accumulatedBlockText: string; accumulatedBlockText: string;
blockCount: number; blockCount: number;
deliveredFinalReply: boolean; deliveredFinalReply: boolean;
deliveredVisibleText: boolean;
failedVisibleTextDelivery: boolean;
queuedDirectVisibleTextDeliveries: number;
settledDirectVisibleText: boolean;
routedCounts: Record<ReplyDispatchKind, number>; routedCounts: Record<ReplyDispatchKind, number>;
toolMessageByCallId: Map<string, ToolMessageHandle>; toolMessageByCallId: Map<string, ToolMessageHandle>;
}; };
@@ -41,7 +64,10 @@ export type AcpDispatchDeliveryCoordinator = {
) => Promise<boolean>; ) => Promise<boolean>;
getBlockCount: () => number; getBlockCount: () => number;
getAccumulatedBlockText: () => string; getAccumulatedBlockText: () => string;
settleVisibleText: () => Promise<void>;
hasDeliveredFinalReply: () => boolean; hasDeliveredFinalReply: () => boolean;
hasDeliveredVisibleText: () => boolean;
hasFailedVisibleTextDelivery: () => boolean;
getRoutedCounts: () => Record<ReplyDispatchKind, number>; getRoutedCounts: () => Record<ReplyDispatchKind, number>;
applyRoutedCounts: (counts: Record<ReplyDispatchKind, number>) => void; applyRoutedCounts: (counts: Record<ReplyDispatchKind, number>) => void;
}; };
@@ -63,6 +89,10 @@ export function createAcpDispatchDeliveryCoordinator(params: {
accumulatedBlockText: "", accumulatedBlockText: "",
blockCount: 0, blockCount: 0,
deliveredFinalReply: false, deliveredFinalReply: false,
deliveredVisibleText: false,
failedVisibleTextDelivery: false,
queuedDirectVisibleTextDeliveries: 0,
settledDirectVisibleText: false,
routedCounts: { routedCounts: {
tool: 0, tool: 0,
block: 0, block: 0,
@@ -70,6 +100,24 @@ export function createAcpDispatchDeliveryCoordinator(params: {
}, },
toolMessageByCallId: new Map(), toolMessageByCallId: new Map(),
}; };
const directChannel = normalizeDeliveryChannel(params.ctx.Provider ?? params.ctx.Surface);
const routedChannel = normalizeDeliveryChannel(params.originatingChannel);
const settleDirectVisibleText = async () => {
if (state.settledDirectVisibleText || state.queuedDirectVisibleTextDeliveries === 0) {
return;
}
state.settledDirectVisibleText = true;
await params.dispatcher.waitForIdle();
const failedCounts = params.dispatcher.getFailedCounts();
const failedVisibleCount = failedCounts.block + failedCounts.final;
if (failedVisibleCount > 0) {
state.failedVisibleTextDelivery = true;
}
if (state.queuedDirectVisibleTextDeliveries > failedVisibleCount) {
state.deliveredVisibleText = true;
}
};
const startReplyLifecycleOnce = async () => { const startReplyLifecycleOnce = async () => {
if (state.startedReplyLifecycle) { if (state.startedReplyLifecycle) {
@@ -156,6 +204,11 @@ export function createAcpDispatchDeliveryCoordinator(params: {
} }
} }
const tracksVisibleText = shouldTreatDeliveredTextAsVisible({
channel: routedChannel,
kind,
text: ttsPayload.text,
});
const result = await routeReply({ const result = await routeReply({
payload: ttsPayload, payload: ttsPayload,
channel: params.originatingChannel, channel: params.originatingChannel,
@@ -166,6 +219,9 @@ export function createAcpDispatchDeliveryCoordinator(params: {
cfg: params.cfg, cfg: params.cfg,
}); });
if (!result.ok) { if (!result.ok) {
if (tracksVisibleText) {
state.failedVisibleTextDelivery = true;
}
logVerbose( logVerbose(
`dispatch-acp: route-reply (acp/${kind}) failed: ${result.error ?? "unknown error"}`, `dispatch-acp: route-reply (acp/${kind}) failed: ${result.error ?? "unknown error"}`,
); );
@@ -183,10 +239,18 @@ export function createAcpDispatchDeliveryCoordinator(params: {
if (kind === "final") { if (kind === "final") {
state.deliveredFinalReply = true; state.deliveredFinalReply = true;
} }
if (tracksVisibleText) {
state.deliveredVisibleText = true;
}
state.routedCounts[kind] += 1; state.routedCounts[kind] += 1;
return true; return true;
} }
const tracksVisibleText = shouldTreatDeliveredTextAsVisible({
channel: directChannel,
kind,
text: ttsPayload.text,
});
const delivered = const delivered =
kind === "tool" kind === "tool"
? params.dispatcher.sendToolResult(ttsPayload) ? params.dispatcher.sendToolResult(ttsPayload)
@@ -196,6 +260,12 @@ export function createAcpDispatchDeliveryCoordinator(params: {
if (kind === "final" && delivered) { if (kind === "final" && delivered) {
state.deliveredFinalReply = true; state.deliveredFinalReply = true;
} }
if (delivered && tracksVisibleText) {
state.queuedDirectVisibleTextDeliveries += 1;
state.settledDirectVisibleText = false;
} else if (!delivered && tracksVisibleText) {
state.failedVisibleTextDelivery = true;
}
return delivered; return delivered;
}; };
@@ -204,7 +274,10 @@ export function createAcpDispatchDeliveryCoordinator(params: {
deliver, deliver,
getBlockCount: () => state.blockCount, getBlockCount: () => state.blockCount,
getAccumulatedBlockText: () => state.accumulatedBlockText, getAccumulatedBlockText: () => state.accumulatedBlockText,
settleVisibleText: settleDirectVisibleText,
hasDeliveredFinalReply: () => state.deliveredFinalReply, hasDeliveredFinalReply: () => state.deliveredFinalReply,
hasDeliveredVisibleText: () => state.deliveredVisibleText,
hasFailedVisibleTextDelivery: () => state.failedVisibleTextDelivery,
getRoutedCounts: () => ({ ...state.routedCounts }), getRoutedCounts: () => ({ ...state.routedCounts }),
applyRoutedCounts: (counts) => { applyRoutedCounts: (counts) => {
counts.tool += state.routedCounts.tool; counts.tool += state.routedCounts.tool;

View File

@@ -52,44 +52,9 @@ const bindingServiceMocks = vi.hoisted(() => ({
listBySession: vi.fn<(sessionKey: string) => SessionBindingRecord[]>(() => []), listBySession: vi.fn<(sessionKey: string) => SessionBindingRecord[]>(() => []),
})); }));
vi.mock("../../acp/control-plane/manager.js", () => ({
getAcpSessionManager: () => managerMocks,
}));
vi.mock("../../acp/policy.js", () => ({
resolveAcpDispatchPolicyError: (cfg: OpenClawConfig) =>
policyMocks.resolveAcpDispatchPolicyError(cfg),
resolveAcpAgentPolicyError: (cfg: OpenClawConfig, agent: string) =>
policyMocks.resolveAcpAgentPolicyError(cfg, agent),
}));
vi.mock("./route-reply.js", () => ({
routeReply: (params: unknown) => routeMocks.routeReply(params),
}));
vi.mock("../../infra/outbound/message-action-runner.js", () => ({
runMessageAction: (params: unknown) => messageActionMocks.runMessageAction(params),
}));
vi.mock("../../tts/tts.js", () => ({
maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params),
resolveTtsConfig: (cfg: OpenClawConfig) => ttsMocks.resolveTtsConfig(cfg),
}));
vi.mock("../../acp/runtime/session-meta.js", () => ({
readAcpSessionEntry: (params: { sessionKey: string; cfg?: OpenClawConfig }) =>
sessionMetaMocks.readAcpSessionEntry(params),
}));
vi.mock("../../infra/outbound/session-binding-service.js", () => ({
getSessionBindingService: () => ({
listBySession: (sessionKey: string) => bindingServiceMocks.listBySession(sessionKey),
}),
}));
const { tryDispatchAcpReply } = await import("./dispatch-acp.js");
const sessionKey = "agent:codex-acp:session-1"; const sessionKey = "agent:codex-acp:session-1";
type MockTtsReply = Awaited<ReturnType<typeof ttsMocks.maybeApplyTtsToPayload>>; type MockTtsReply = Awaited<ReturnType<typeof ttsMocks.maybeApplyTtsToPayload>>;
let tryDispatchAcpReply: typeof import("./dispatch-acp.js").tryDispatchAcpReply;
function createDispatcher(): { function createDispatcher(): {
dispatcher: ReplyDispatcher; dispatcher: ReplyDispatcher;
@@ -102,6 +67,7 @@ function createDispatcher(): {
sendFinalReply: vi.fn(() => true), sendFinalReply: vi.fn(() => true),
waitForIdle: vi.fn(async () => {}), waitForIdle: vi.fn(async () => {}),
getQueuedCounts: vi.fn(() => counts), getQueuedCounts: vi.fn(() => counts),
getFailedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })),
markComplete: vi.fn(), markComplete: vi.fn(),
}; };
return { dispatcher, counts }; return { dispatcher, counts };
@@ -245,7 +211,38 @@ function expectSecondRoutedPayload(payload: Partial<MockTtsReply>) {
} }
describe("tryDispatchAcpReply", () => { describe("tryDispatchAcpReply", () => {
beforeEach(() => { beforeEach(async () => {
vi.resetModules();
vi.doMock("../../acp/control-plane/manager.js", () => ({
getAcpSessionManager: () => managerMocks,
}));
vi.doMock("../../acp/policy.js", () => ({
resolveAcpDispatchPolicyError: (cfg: OpenClawConfig) =>
policyMocks.resolveAcpDispatchPolicyError(cfg),
resolveAcpAgentPolicyError: (cfg: OpenClawConfig, agent: string) =>
policyMocks.resolveAcpAgentPolicyError(cfg, agent),
}));
vi.doMock("./route-reply.js", () => ({
routeReply: (params: unknown) => routeMocks.routeReply(params),
}));
vi.doMock("../../infra/outbound/message-action-runner.js", () => ({
runMessageAction: (params: unknown) => messageActionMocks.runMessageAction(params),
}));
vi.doMock("../../tts/tts.js", () => ({
maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params),
resolveTtsConfig: (cfg: OpenClawConfig) => ttsMocks.resolveTtsConfig(cfg),
}));
vi.doMock("../../acp/runtime/session-meta.js", () => ({
readAcpSessionEntry: (params: { sessionKey: string; cfg?: OpenClawConfig }) =>
sessionMetaMocks.readAcpSessionEntry(params),
}));
vi.doMock("../../infra/outbound/session-binding-service.js", () => ({
getSessionBindingService: () => ({
listBySession: (targetSessionKey: string) =>
bindingServiceMocks.listBySession(targetSessionKey),
}),
}));
({ tryDispatchAcpReply } = await import("./dispatch-acp.js"));
managerMocks.resolveSession.mockReset(); managerMocks.resolveSession.mockReset();
managerMocks.runTurn.mockReset(); managerMocks.runTurn.mockReset();
managerMocks.getObservabilitySnapshot.mockReset(); managerMocks.getObservabilitySnapshot.mockReset();
@@ -462,21 +459,148 @@ describe("tryDispatchAcpReply", () => {
expect(managerMocks.runTurn).not.toHaveBeenCalled(); expect(managerMocks.runTurn).not.toHaveBeenCalled();
expect(dispatcher.sendFinalReply).toHaveBeenCalledWith( expect(dispatcher.sendFinalReply).toHaveBeenCalledWith(
expect.objectContaining({ expect.objectContaining({
text: expect.stringContaining("ACP_DISPATCH_DISABLED"), isError: true,
text: expect.stringContaining("ACP dispatch is disabled by policy."),
}), }),
); );
}); });
it("delivers final fallback text even when routed block text already existed", async () => { it("does not deliver final fallback text when routed block text was already visible", async () => {
setReadyAcpResolution(); setReadyAcpResolution();
ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" }); ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" });
queueTtsReplies({ text: "CODEX_OK" }, {} as ReturnType<typeof ttsMocks.maybeApplyTtsToPayload>); queueTtsReplies({ text: "CODEX_OK" }, {} as ReturnType<typeof ttsMocks.maybeApplyTtsToPayload>);
const { result } = await runRoutedAcpTextTurn("CODEX_OK"); const { result } = await runRoutedAcpTextTurn("CODEX_OK");
expect(result?.counts.block).toBe(1); expect(result?.counts.block).toBe(1);
expect(result?.counts.final).toBe(1); expect(result?.counts.final).toBe(0);
expect(routeMocks.routeReply).toHaveBeenCalledTimes(2); expect(routeMocks.routeReply).toHaveBeenCalledTimes(1);
expectSecondRoutedPayload({ text: "CODEX_OK" }); });
it("does not deliver final fallback text when direct block text was already visible", async () => {
setReadyAcpResolution();
ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" });
queueTtsReplies({ text: "CODEX_OK" }, {} as ReturnType<typeof ttsMocks.maybeApplyTtsToPayload>);
mockVisibleTextTurn("CODEX_OK");
const { dispatcher, counts } = createDispatcher();
const result = await runDispatch({
bodyForAgent: "reply",
dispatcher,
ctxOverrides: {
Provider: "telegram",
Surface: "telegram",
},
});
expect(result?.counts.block).toBe(0);
expect(result?.counts.final).toBe(0);
expect(counts.block).toBe(0);
expect(counts.final).toBe(0);
expect(dispatcher.sendBlockReply).toHaveBeenCalledWith(
expect.objectContaining({ text: "CODEX_OK" }),
);
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
});
it("treats visible telegram ACP block delivery as a successful final response", async () => {
setReadyAcpResolution();
ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" });
queueTtsReplies({ text: "CODEX_OK" }, {} as ReturnType<typeof ttsMocks.maybeApplyTtsToPayload>);
mockVisibleTextTurn("CODEX_OK");
const { dispatcher } = createDispatcher();
const result = await runDispatch({
bodyForAgent: "reply",
dispatcher,
ctxOverrides: {
Provider: "telegram",
Surface: "telegram",
},
});
expect(result?.queuedFinal).toBe(true);
expect(dispatcher.sendBlockReply).toHaveBeenCalledWith(
expect.objectContaining({ text: "CODEX_OK" }),
);
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
});
it("preserves final fallback when direct block text is filtered by non-telegram channels", async () => {
setReadyAcpResolution();
ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" });
queueTtsReplies({ text: "CODEX_OK" }, {} as ReturnType<typeof ttsMocks.maybeApplyTtsToPayload>);
mockVisibleTextTurn("CODEX_OK");
const { dispatcher, counts } = createDispatcher();
const result = await runDispatch({
bodyForAgent: "reply",
dispatcher,
});
expect(result?.counts.block).toBe(0);
expect(result?.counts.final).toBe(0);
expect(counts.block).toBe(0);
expect(counts.final).toBe(0);
expect(dispatcher.sendBlockReply).toHaveBeenCalledWith(
expect.objectContaining({ text: "CODEX_OK" }),
);
expect(dispatcher.sendFinalReply).toHaveBeenCalledWith(
expect.objectContaining({ text: "CODEX_OK" }),
);
});
it("falls back to final text when a later telegram ACP block delivery fails", async () => {
setReadyAcpResolution();
ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" });
queueTtsReplies(
{ text: "First chunk. " },
{ text: "Second chunk." },
{} as ReturnType<typeof ttsMocks.maybeApplyTtsToPayload>,
);
const cfg = createAcpTestConfig({
acp: {
enabled: true,
stream: {
deliveryMode: "live",
coalesceIdleMs: 0,
maxChunkChars: 64,
},
},
});
managerMocks.runTurn.mockImplementation(
async ({ onEvent }: { onEvent: (event: unknown) => Promise<void> }) => {
await onEvent({ type: "text_delta", text: "First chunk. ", tag: "agent_message_chunk" });
await onEvent({ type: "text_delta", text: "Second chunk.", tag: "agent_message_chunk" });
await onEvent({ type: "done" });
},
);
const { dispatcher } = createDispatcher();
(dispatcher.sendBlockReply as ReturnType<typeof vi.fn>)
.mockReturnValueOnce(true)
.mockReturnValueOnce(false);
const result = await runDispatch({
bodyForAgent: "reply",
cfg,
dispatcher,
ctxOverrides: {
Provider: "telegram",
Surface: "telegram",
},
});
expect(dispatcher.sendBlockReply).toHaveBeenNthCalledWith(
1,
expect.objectContaining({ text: "First chunk. " }),
);
expect(dispatcher.sendBlockReply).toHaveBeenNthCalledWith(
2,
expect.objectContaining({ text: "Second chunk." }),
);
expect(dispatcher.sendFinalReply).toHaveBeenCalledWith(
expect.objectContaining({ text: "First chunk. \nSecond chunk." }),
);
expect(result?.queuedFinal).toBe(true);
}); });
it("does not add text fallback when final TTS already delivered audio", async () => { it("does not add text fallback when final TTS already delivered audio", async () => {

View File

@@ -196,7 +196,9 @@ async function finalizeAcpTurnOutput(params: {
ttsChannel?: string; ttsChannel?: string;
shouldEmitResolvedIdentityNotice: boolean; shouldEmitResolvedIdentityNotice: boolean;
}): Promise<boolean> { }): Promise<boolean> {
let queuedFinal = false; await params.delivery.settleVisibleText();
let queuedFinal =
params.delivery.hasDeliveredVisibleText() && !params.delivery.hasFailedVisibleTextDelivery();
const ttsMode = resolveTtsConfig(params.cfg).mode ?? "final"; const ttsMode = resolveTtsConfig(params.cfg).mode ?? "final";
const accumulatedBlockText = params.delivery.getAccumulatedBlockText(); const accumulatedBlockText = params.delivery.getAccumulatedBlockText();
const hasAccumulatedBlockText = accumulatedBlockText.trim().length > 0; const hasAccumulatedBlockText = accumulatedBlockText.trim().length > 0;
@@ -233,7 +235,8 @@ async function finalizeAcpTurnOutput(params: {
ttsMode !== "all" && ttsMode !== "all" &&
hasAccumulatedBlockText && hasAccumulatedBlockText &&
!finalMediaDelivered && !finalMediaDelivered &&
!params.delivery.hasDeliveredFinalReply(); !params.delivery.hasDeliveredFinalReply() &&
(!params.delivery.hasDeliveredVisibleText() || params.delivery.hasFailedVisibleTextDelivery());
if (shouldDeliverTextFallback) { if (shouldDeliverTextFallback) {
const delivered = await params.delivery.deliver( const delivered = await params.delivery.deliver(
"final", "final",

View File

@@ -238,6 +238,7 @@ function createDispatcher(): ReplyDispatcher {
sendFinalReply: vi.fn(() => true), sendFinalReply: vi.fn(() => true),
waitForIdle: vi.fn(async () => {}), waitForIdle: vi.fn(async () => {}),
getQueuedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })), getQueuedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })),
getFailedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })),
markComplete: vi.fn(), markComplete: vi.fn(),
}; };
} }
@@ -1034,7 +1035,9 @@ describe("dispatchReplyFromConfig", () => {
const streamedText = blockCalls.map((call) => (call[0] as ReplyPayload).text ?? "").join(""); const streamedText = blockCalls.map((call) => (call[0] as ReplyPayload).text ?? "").join("");
expect(streamedText).toContain("hello"); expect(streamedText).toContain("hello");
expect(streamedText).toContain("world"); expect(streamedText).toContain("world");
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); expect(dispatcher.sendFinalReply).toHaveBeenCalledWith(
expect.objectContaining({ text: "hello world" }),
);
}); });
it("aborts ACP dispatch promptly when the caller abort signal fires", async () => { it("aborts ACP dispatch promptly when the caller abort signal fires", async () => {
@@ -1183,12 +1186,12 @@ describe("dispatchReplyFromConfig", () => {
await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver: vi.fn() }); await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver: vi.fn() });
const finalCalls = (dispatcher.sendFinalReply as ReturnType<typeof vi.fn>).mock.calls; const finalCalls = (dispatcher.sendFinalReply as ReturnType<typeof vi.fn>).mock.calls;
expect(finalCalls.length).toBe(1); expect(finalCalls.length).toBe(2);
const finalPayload = finalCalls[0]?.[0] as ReplyPayload | undefined; const noticePayload = finalCalls[1]?.[0] as ReplyPayload | undefined;
expect(finalPayload?.text).toContain("Session ids resolved"); expect(noticePayload?.text).toContain("Session ids resolved");
expect(finalPayload?.text).toContain("agent session id: inner-123"); expect(noticePayload?.text).toContain("agent session id: inner-123");
expect(finalPayload?.text).toContain("acpx session id: acpx-123"); expect(noticePayload?.text).toContain("acpx session id: acpx-123");
expect(finalPayload?.text).toContain("codex resume inner-123"); expect(noticePayload?.text).toContain("codex resume inner-123");
}); });
it("posts resolved-session-id notice when ACP session is bound even without MessageThreadId", async () => { it("posts resolved-session-id notice when ACP session is bound even without MessageThreadId", async () => {
@@ -1266,11 +1269,11 @@ describe("dispatchReplyFromConfig", () => {
await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver: vi.fn() }); await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver: vi.fn() });
const finalCalls = (dispatcher.sendFinalReply as ReturnType<typeof vi.fn>).mock.calls; const finalCalls = (dispatcher.sendFinalReply as ReturnType<typeof vi.fn>).mock.calls;
expect(finalCalls.length).toBe(1); expect(finalCalls.length).toBe(2);
const finalPayload = finalCalls[0]?.[0] as ReplyPayload | undefined; const noticePayload = finalCalls[1]?.[0] as ReplyPayload | undefined;
expect(finalPayload?.text).toContain("Session ids resolved"); expect(noticePayload?.text).toContain("Session ids resolved");
expect(finalPayload?.text).toContain("agent session id: inner-123"); expect(noticePayload?.text).toContain("agent session id: inner-123");
expect(finalPayload?.text).toContain("acpx session id: acpx-123"); expect(noticePayload?.text).toContain("acpx session id: acpx-123");
}); });
it("honors send-policy deny before ACP runtime dispatch", async () => { it("honors send-policy deny before ACP runtime dispatch", async () => {
@@ -1665,7 +1668,9 @@ describe("dispatchReplyFromConfig", () => {
.map((call) => ((call[0] as ReplyPayload).text ?? "").trim()) .map((call) => ((call[0] as ReplyPayload).text ?? "").trim())
.filter(Boolean); .filter(Boolean);
expect(blockTexts).toEqual(["What do you want to work on?"]); expect(blockTexts).toEqual(["What do you want to work on?"]);
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); expect(dispatcher.sendFinalReply).toHaveBeenCalledWith(
expect.objectContaining({ text: "What do you want to work on?" }),
);
}); });
it("generates final-mode TTS audio after ACP block streaming completes", async () => { it("generates final-mode TTS audio after ACP block streaming completes", async () => {

View File

@@ -80,6 +80,7 @@ export type ReplyDispatcher = {
sendFinalReply: (payload: ReplyPayload) => boolean; sendFinalReply: (payload: ReplyPayload) => boolean;
waitForIdle: () => Promise<void>; waitForIdle: () => Promise<void>;
getQueuedCounts: () => Record<ReplyDispatchKind, number>; getQueuedCounts: () => Record<ReplyDispatchKind, number>;
getFailedCounts: () => Record<ReplyDispatchKind, number>;
markComplete: () => void; markComplete: () => void;
}; };
@@ -125,6 +126,11 @@ export function createReplyDispatcher(options: ReplyDispatcherOptions): ReplyDis
block: 0, block: 0,
final: 0, final: 0,
}; };
const failedCounts: Record<ReplyDispatchKind, number> = {
tool: 0,
block: 0,
final: 0,
};
// Register this dispatcher globally for gateway restart coordination. // Register this dispatcher globally for gateway restart coordination.
const { unregister } = registerDispatcher({ const { unregister } = registerDispatcher({
@@ -167,6 +173,7 @@ export function createReplyDispatcher(options: ReplyDispatcherOptions): ReplyDis
await options.deliver(normalized, { kind }); await options.deliver(normalized, { kind });
}) })
.catch((err) => { .catch((err) => {
failedCounts[kind] += 1;
options.onError?.(err, { kind }); options.onError?.(err, { kind });
}) })
.finally(() => { .finally(() => {
@@ -213,6 +220,7 @@ export function createReplyDispatcher(options: ReplyDispatcherOptions): ReplyDis
sendFinalReply: (payload) => enqueue("final", payload), sendFinalReply: (payload) => enqueue("final", payload),
waitForIdle: () => sendChain, waitForIdle: () => sendChain,
getQueuedCounts: () => ({ ...queuedCounts }), getQueuedCounts: () => ({ ...queuedCounts }),
getFailedCounts: () => ({ ...failedCounts }),
markComplete, markComplete,
}; };
} }