From c6de80931e0c2bb9f64838245cacf6008a045d2a Mon Sep 17 00:00:00 2001 From: Arvin Xu Date: Mon, 9 Mar 2026 12:24:13 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20fix:=20fix=20agent=20runtime=20e?= =?UTF-8?q?rror=20handle=20(#12834)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * improve inspect partial ability * fix error * fix runtime error --- .agents/skills/agent-tracing/SKILL.md | 8 +- packages/agent-tracing/src/cli/partial.ts | 54 +----- .../agent-tracing/src/store/file-store.ts | 31 +++- .../agentRuntime/AgentRuntimeService.ts | 64 +++++-- .../__tests__/executeStep.test.ts | 170 ++++++++++++++++++ 5 files changed, 251 insertions(+), 76 deletions(-) diff --git a/.agents/skills/agent-tracing/SKILL.md b/.agents/skills/agent-tracing/SKILL.md index 49867b3fe2..ea0b3ed4b1 100644 --- a/.agents/skills/agent-tracing/SKILL.md +++ b/.agents/skills/agent-tracing/SKILL.md @@ -96,10 +96,10 @@ agent-tracing inspect -s 0 -j # List in-progress partial snapshots agent-tracing partial list -# Inspect a partial snapshot (defaults to latest) -agent-tracing partial inspect -agent-tracing partial inspect -agent-tracing partial inspect -j +# Inspect a partial (use `inspect` directly — all flags work with partial IDs) +agent-tracing inspect +agent-tracing inspect -T +agent-tracing inspect -p # Clean up stale partial snapshots agent-tracing partial clean diff --git a/packages/agent-tracing/src/cli/partial.ts b/packages/agent-tracing/src/cli/partial.ts index a91e1a9fde..449763785d 100644 --- a/packages/agent-tracing/src/cli/partial.ts +++ b/packages/agent-tracing/src/cli/partial.ts @@ -1,11 +1,9 @@ import type { Command } from 'commander'; import { FileSnapshotStore } from '../store/file-store'; -import type { ExecutionSnapshot } from '../types'; -import { renderSnapshot } from '../viewer'; export function registerPartialCommand(program: Command) { - const partial = program.command('partial').description('Inspect in-progress (partial) snapshots'); + const partial = program.command('partial').description('Manage in-progress (partial) snapshots'); partial .command('list') @@ -36,54 +34,10 @@ export function registerPartialCommand(program: Command) { console.log(` ${file}`); } } - }); - partial - .command('inspect') - .alias('view') - .description('Inspect a partial snapshot') - .argument('[id]', 'Partial operation ID or filename (defaults to latest)') - .option('-j, --json', 'Output as JSON') - .action(async (id: string | undefined, opts: { json?: boolean }) => { - const store = new FileSnapshotStore(); - const files = await store.listPartials(); - - if (files.length === 0) { - console.error('No partial snapshots found.'); - process.exit(1); - } - - const data = id ? await store.getPartial(id) : await store.getPartial(files[0]); - - if (!data) { - console.error(id ? `Partial not found: ${id}` : 'No partial snapshots found.'); - process.exit(1); - } - - if (opts.json) { - console.log(JSON.stringify(data, null, 2)); - return; - } - - // Render as a snapshot (fill in defaults for missing fields) - const snapshot: ExecutionSnapshot = { - completedAt: undefined, - completionReason: undefined, - error: undefined, - model: data.model, - operationId: data.operationId ?? '?', - provider: data.provider, - startedAt: data.startedAt ?? Date.now(), - steps: data.steps ?? [], - totalCost: data.totalCost ?? 0, - totalSteps: data.steps?.length ?? 0, - totalTokens: data.totalTokens ?? 0, - traceId: data.traceId ?? '?', - ...data, - }; - - console.log('[PARTIAL - in progress]\n'); - console.log(renderSnapshot(snapshot)); + console.log( + `\nUse ${`"agent-tracing inspect "`.toString()} to inspect a partial with full flags.`, + ); }); partial diff --git a/packages/agent-tracing/src/store/file-store.ts b/packages/agent-tracing/src/store/file-store.ts index 7b498dcb4a..1175720805 100644 --- a/packages/agent-tracing/src/store/file-store.ts +++ b/packages/agent-tracing/src/store/file-store.ts @@ -39,12 +39,19 @@ export class FileSnapshotStore implements ISnapshotStore { async get(traceId: string): Promise { if (traceId === 'latest') return this.getLatest(); + // Search completed snapshots first const files = await this.listFiles(); const match = files.find((f) => f.includes(traceId.slice(0, 12))); - if (!match) return null; + if (match) { + const content = await fs.readFile(path.join(this.dir, match), 'utf8'); + return JSON.parse(content) as ExecutionSnapshot; + } - const content = await fs.readFile(path.join(this.dir, match), 'utf8'); - return JSON.parse(content) as ExecutionSnapshot; + // Fallback to partials + const partial = await this.getPartial(traceId); + if (partial) return partialToSnapshot(partial); + + return null; } async list(options?: { limit?: number }): Promise { @@ -161,6 +168,24 @@ export class FileSnapshotStore implements ISnapshotStore { } } +function partialToSnapshot(partial: Partial): ExecutionSnapshot { + return { + completedAt: undefined, + completionReason: undefined, + error: undefined, + model: partial.model, + operationId: partial.operationId ?? '?', + provider: partial.provider, + startedAt: partial.startedAt ?? Date.now(), + steps: partial.steps ?? [], + totalCost: partial.totalCost ?? 0, + totalSteps: partial.steps?.length ?? 0, + totalTokens: partial.totalTokens ?? 0, + traceId: partial.traceId ?? '?', + ...partial, + } as ExecutionSnapshot; +} + function toSummary(snapshot: ExecutionSnapshot): SnapshotSummary { return { completionReason: snapshot.completionReason, diff --git a/src/server/services/agentRuntime/AgentRuntimeService.ts b/src/server/services/agentRuntime/AgentRuntimeService.ts index 74a78d8bb9..03abbba8e9 100644 --- a/src/server/services/agentRuntime/AgentRuntimeService.ts +++ b/src/server/services/agentRuntime/AgentRuntimeService.ts @@ -907,30 +907,56 @@ export class AgentRuntimeService { } catch (error) { log('Step %d failed for operation %s: %O', stepIndex, operationId, error); - // Publish error event - await this.streamManager.publishStreamEvent(operationId, { - data: { - error: (error as Error).message, - phase: 'step_execution', + // Build error state — try loading current state from coordinator, but if that + // also fails (e.g. Redis ECONNRESET), fall back to a minimal error state so + // that completion callbacks and webhooks can still fire. + let finalStateWithError: any; + try { + await this.streamManager.publishStreamEvent(operationId, { + data: { + error: (error as Error).message, + phase: 'step_execution', + stepIndex, + }, stepIndex, - }, - stepIndex, - type: 'error', - }); + type: 'error', + }); + } catch (publishError) { + log( + '[%s] Failed to publish error event (infra may be down): %O', + operationId, + publishError, + ); + } - // Build and save error state so it's persisted for later retrieval - const errorState = await this.coordinator.loadAgentState(operationId); - const finalStateWithError = { - ...errorState!, - error: formatErrorForState(error), - status: 'error' as const, - }; + try { + const errorState = await this.coordinator.loadAgentState(operationId); + finalStateWithError = { + ...errorState!, + error: formatErrorForState(error), + status: 'error' as const, + }; + } catch (loadError) { + log('[%s] Failed to load error state (infra may be down): %O', operationId, loadError); + // Fallback: construct a minimal error state so callbacks still receive useful info + finalStateWithError = { + error: formatErrorForState(error), + status: 'error' as const, + }; + } - // Save the error state to coordinator so getOperationStatus can retrieve it - await this.coordinator.saveAgentState(operationId, finalStateWithError); + try { + await this.coordinator.saveAgentState(operationId, finalStateWithError); + } catch (saveError) { + log('[%s] Failed to save error state (infra may be down): %O', operationId, saveError); + } // Trigger completion webhook on error (fire-and-forget) - await this.triggerCompletionWebhook(finalStateWithError, operationId, 'error'); + try { + await this.triggerCompletionWebhook(finalStateWithError, operationId, 'error'); + } catch (webhookError) { + log('[%s] Failed to trigger completion webhook: %O', operationId, webhookError); + } // Also call onComplete callback when execution fails if (callbacks?.onComplete) { diff --git a/src/server/services/agentRuntime/__tests__/executeStep.test.ts b/src/server/services/agentRuntime/__tests__/executeStep.test.ts index d7ac6f661d..037fc753c8 100644 --- a/src/server/services/agentRuntime/__tests__/executeStep.test.ts +++ b/src/server/services/agentRuntime/__tests__/executeStep.test.ts @@ -297,3 +297,173 @@ describe('AgentRuntimeService.executeStep - step idempotency (distributed lock)' expect(coordinator.tryClaimStep).toHaveBeenCalledWith('op-args', 42, 35); }); }); + +describe('AgentRuntimeService.executeStep - Redis failure in error handler', () => { + const createService = () => { + const service = new AgentRuntimeService({} as any, 'user-1', { queueService: null }); + return service; + }; + + it('should still call onComplete when Redis fails in catch block (ECONNRESET scenario)', async () => { + const service = createService(); + const coordinator = (service as any).coordinator; + const streamManager = (service as any).streamManager; + + coordinator.tryClaimStep = vi.fn().mockResolvedValue(true); + + // First loadAgentState call succeeds (returns running state to enter step execution) + // Second call in catch block fails (Redis ECONNRESET) + let loadCallCount = 0; + coordinator.loadAgentState = vi.fn().mockImplementation(() => { + loadCallCount++; + if (loadCallCount === 1) { + return Promise.resolve({ + status: 'running', + stepCount: 5, + lastModified: new Date().toISOString(), + metadata: {}, + }); + } + return Promise.reject(new Error('Reached the max retries per request limit (which is 3)')); + }); + + // publishStreamEvent: first call (step_start) succeeds, subsequent calls fail + // Simulates Redis going down mid-execution + let publishCallCount = 0; + streamManager.publishStreamEvent = vi.fn().mockImplementation(() => { + publishCallCount++; + if (publishCallCount === 1) return Promise.resolve(); + return Promise.reject(new Error('Redis ECONNRESET')); + }); + + // saveAgentState fails (Redis is down) + coordinator.saveAgentState = vi.fn().mockRejectedValue(new Error('Redis ECONNRESET')); + + const onComplete = vi.fn(); + service.registerStepCallbacks('op-redis-fail', { onComplete }); + + // executeStep re-throws the original error after running callbacks + await expect( + service.executeStep({ + operationId: 'op-redis-fail', + stepIndex: 6, + context: { phase: 'user_input' } as any, + }), + ).rejects.toThrow(); + + // onComplete MUST be called even when Redis is completely down + expect(onComplete).toHaveBeenCalledWith( + expect.objectContaining({ + operationId: 'op-redis-fail', + reason: 'error', + }), + ); + }); + + it('should still trigger completion webhook when Redis fails in catch block', async () => { + const service = createService(); + const coordinator = (service as any).coordinator; + const streamManager = (service as any).streamManager; + + coordinator.tryClaimStep = vi.fn().mockResolvedValue(true); + + let loadCallCount = 0; + coordinator.loadAgentState = vi.fn().mockImplementation(() => { + loadCallCount++; + if (loadCallCount === 1) { + return Promise.resolve({ + status: 'running', + stepCount: 5, + lastModified: new Date().toISOString(), + metadata: {}, + }); + } + return Promise.reject(new Error('Redis ECONNRESET')); + }); + + // First publishStreamEvent call (step_start) succeeds, subsequent fail + let publishCallCount = 0; + streamManager.publishStreamEvent = vi.fn().mockImplementation(() => { + publishCallCount++; + if (publishCallCount === 1) return Promise.resolve(); + return Promise.reject(new Error('Redis ECONNRESET')); + }); + + coordinator.saveAgentState = vi.fn().mockRejectedValue(new Error('Redis ECONNRESET')); + + // Spy on triggerCompletionWebhook + const triggerSpy = vi + .spyOn(service as any, 'triggerCompletionWebhook') + .mockResolvedValue(undefined); + + // executeStep re-throws the original error after running callbacks + await expect( + service.executeStep({ + operationId: 'op-redis-webhook', + stepIndex: 6, + context: { phase: 'user_input' } as any, + }), + ).rejects.toThrow(); + + // Completion webhook MUST be triggered even when Redis is down + expect(triggerSpy).toHaveBeenCalledWith( + expect.objectContaining({ status: 'error' }), + 'op-redis-webhook', + 'error', + ); + }); + + it('should preserve loaded state metadata when only saveAgentState fails', async () => { + const service = createService(); + const coordinator = (service as any).coordinator; + const streamManager = (service as any).streamManager; + + coordinator.tryClaimStep = vi.fn().mockResolvedValue(true); + + const stateWithWebhook = { + status: 'running', + stepCount: 5, + lastModified: new Date().toISOString(), + metadata: { completionWebhook: 'https://example.com/webhook' }, + }; + + // loadAgentState always succeeds (returns state with webhook metadata) + coordinator.loadAgentState = vi.fn().mockResolvedValue(stateWithWebhook); + + // saveAgentState fails (write-only Redis failure) + coordinator.saveAgentState = vi.fn().mockRejectedValue(new Error('Redis write failed')); + + // publishStreamEvent: first call succeeds, subsequent fail + let publishCallCount = 0; + streamManager.publishStreamEvent = vi.fn().mockImplementation(() => { + publishCallCount++; + if (publishCallCount === 1) return Promise.resolve(); + return Promise.reject(new Error('Redis ECONNRESET')); + }); + + const onComplete = vi.fn(); + service.registerStepCallbacks('op-save-fail', { onComplete }); + + await expect( + service.executeStep({ + operationId: 'op-save-fail', + stepIndex: 6, + context: { phase: 'user_input' } as any, + }), + ).rejects.toThrow(); + + // onComplete must receive the full state with metadata (not a minimal fallback) + expect(onComplete).toHaveBeenCalledWith( + expect.objectContaining({ + finalState: expect.objectContaining({ + metadata: expect.objectContaining({ + completionWebhook: 'https://example.com/webhook', + }), + status: 'error', + }), + operationId: 'op-save-fail', + reason: 'error', + }), + ); + }); +});