🐛 fix: fix agent runtime error handle (#12834)

* improve inspect partial ability

* fix error

* fix runtime error
This commit is contained in:
Arvin Xu
2026-03-09 12:24:13 +08:00
committed by GitHub
parent 6e26135978
commit c6de80931e
5 changed files with 251 additions and 76 deletions

View File

@@ -96,10 +96,10 @@ agent-tracing inspect <traceId> -s 0 -j
# List in-progress partial snapshots
agent-tracing partial list
# Inspect a partial snapshot (defaults to latest)
agent-tracing partial inspect
agent-tracing partial inspect <operationId>
agent-tracing partial inspect -j
# Inspect a partial (use `inspect` directly — all flags work with partial IDs)
agent-tracing inspect <partialOperationId>
agent-tracing inspect <partialOperationId> -T
agent-tracing inspect <partialOperationId> -p
# Clean up stale partial snapshots
agent-tracing partial clean

View File

@@ -1,11 +1,9 @@
import type { Command } from 'commander';
import { FileSnapshotStore } from '../store/file-store';
import type { ExecutionSnapshot } from '../types';
import { renderSnapshot } from '../viewer';
export function registerPartialCommand(program: Command) {
const partial = program.command('partial').description('Inspect in-progress (partial) snapshots');
const partial = program.command('partial').description('Manage in-progress (partial) snapshots');
partial
.command('list')
@@ -36,54 +34,10 @@ export function registerPartialCommand(program: Command) {
console.log(` ${file}`);
}
}
});
partial
.command('inspect')
.alias('view')
.description('Inspect a partial snapshot')
.argument('[id]', 'Partial operation ID or filename (defaults to latest)')
.option('-j, --json', 'Output as JSON')
.action(async (id: string | undefined, opts: { json?: boolean }) => {
const store = new FileSnapshotStore();
const files = await store.listPartials();
if (files.length === 0) {
console.error('No partial snapshots found.');
process.exit(1);
}
const data = id ? await store.getPartial(id) : await store.getPartial(files[0]);
if (!data) {
console.error(id ? `Partial not found: ${id}` : 'No partial snapshots found.');
process.exit(1);
}
if (opts.json) {
console.log(JSON.stringify(data, null, 2));
return;
}
// Render as a snapshot (fill in defaults for missing fields)
const snapshot: ExecutionSnapshot = {
completedAt: undefined,
completionReason: undefined,
error: undefined,
model: data.model,
operationId: data.operationId ?? '?',
provider: data.provider,
startedAt: data.startedAt ?? Date.now(),
steps: data.steps ?? [],
totalCost: data.totalCost ?? 0,
totalSteps: data.steps?.length ?? 0,
totalTokens: data.totalTokens ?? 0,
traceId: data.traceId ?? '?',
...data,
};
console.log('[PARTIAL - in progress]\n');
console.log(renderSnapshot(snapshot));
console.log(
`\nUse ${`"agent-tracing inspect <id>"`.toString()} to inspect a partial with full flags.`,
);
});
partial

View File

@@ -39,12 +39,19 @@ export class FileSnapshotStore implements ISnapshotStore {
async get(traceId: string): Promise<ExecutionSnapshot | null> {
if (traceId === 'latest') return this.getLatest();
// Search completed snapshots first
const files = await this.listFiles();
const match = files.find((f) => f.includes(traceId.slice(0, 12)));
if (!match) return null;
if (match) {
const content = await fs.readFile(path.join(this.dir, match), 'utf8');
return JSON.parse(content) as ExecutionSnapshot;
}
const content = await fs.readFile(path.join(this.dir, match), 'utf8');
return JSON.parse(content) as ExecutionSnapshot;
// Fallback to partials
const partial = await this.getPartial(traceId);
if (partial) return partialToSnapshot(partial);
return null;
}
async list(options?: { limit?: number }): Promise<SnapshotSummary[]> {
@@ -161,6 +168,24 @@ export class FileSnapshotStore implements ISnapshotStore {
}
}
function partialToSnapshot(partial: Partial<ExecutionSnapshot>): ExecutionSnapshot {
return {
completedAt: undefined,
completionReason: undefined,
error: undefined,
model: partial.model,
operationId: partial.operationId ?? '?',
provider: partial.provider,
startedAt: partial.startedAt ?? Date.now(),
steps: partial.steps ?? [],
totalCost: partial.totalCost ?? 0,
totalSteps: partial.steps?.length ?? 0,
totalTokens: partial.totalTokens ?? 0,
traceId: partial.traceId ?? '?',
...partial,
} as ExecutionSnapshot;
}
function toSummary(snapshot: ExecutionSnapshot): SnapshotSummary {
return {
completionReason: snapshot.completionReason,

View File

@@ -907,30 +907,56 @@ export class AgentRuntimeService {
} catch (error) {
log('Step %d failed for operation %s: %O', stepIndex, operationId, error);
// Publish error event
await this.streamManager.publishStreamEvent(operationId, {
data: {
error: (error as Error).message,
phase: 'step_execution',
// Build error state — try loading current state from coordinator, but if that
// also fails (e.g. Redis ECONNRESET), fall back to a minimal error state so
// that completion callbacks and webhooks can still fire.
let finalStateWithError: any;
try {
await this.streamManager.publishStreamEvent(operationId, {
data: {
error: (error as Error).message,
phase: 'step_execution',
stepIndex,
},
stepIndex,
},
stepIndex,
type: 'error',
});
type: 'error',
});
} catch (publishError) {
log(
'[%s] Failed to publish error event (infra may be down): %O',
operationId,
publishError,
);
}
// Build and save error state so it's persisted for later retrieval
const errorState = await this.coordinator.loadAgentState(operationId);
const finalStateWithError = {
...errorState!,
error: formatErrorForState(error),
status: 'error' as const,
};
try {
const errorState = await this.coordinator.loadAgentState(operationId);
finalStateWithError = {
...errorState!,
error: formatErrorForState(error),
status: 'error' as const,
};
} catch (loadError) {
log('[%s] Failed to load error state (infra may be down): %O', operationId, loadError);
// Fallback: construct a minimal error state so callbacks still receive useful info
finalStateWithError = {
error: formatErrorForState(error),
status: 'error' as const,
};
}
// Save the error state to coordinator so getOperationStatus can retrieve it
await this.coordinator.saveAgentState(operationId, finalStateWithError);
try {
await this.coordinator.saveAgentState(operationId, finalStateWithError);
} catch (saveError) {
log('[%s] Failed to save error state (infra may be down): %O', operationId, saveError);
}
// Trigger completion webhook on error (fire-and-forget)
await this.triggerCompletionWebhook(finalStateWithError, operationId, 'error');
try {
await this.triggerCompletionWebhook(finalStateWithError, operationId, 'error');
} catch (webhookError) {
log('[%s] Failed to trigger completion webhook: %O', operationId, webhookError);
}
// Also call onComplete callback when execution fails
if (callbacks?.onComplete) {

View File

@@ -297,3 +297,173 @@ describe('AgentRuntimeService.executeStep - step idempotency (distributed lock)'
expect(coordinator.tryClaimStep).toHaveBeenCalledWith('op-args', 42, 35);
});
});
describe('AgentRuntimeService.executeStep - Redis failure in error handler', () => {
const createService = () => {
const service = new AgentRuntimeService({} as any, 'user-1', { queueService: null });
return service;
};
it('should still call onComplete when Redis fails in catch block (ECONNRESET scenario)', async () => {
const service = createService();
const coordinator = (service as any).coordinator;
const streamManager = (service as any).streamManager;
coordinator.tryClaimStep = vi.fn().mockResolvedValue(true);
// First loadAgentState call succeeds (returns running state to enter step execution)
// Second call in catch block fails (Redis ECONNRESET)
let loadCallCount = 0;
coordinator.loadAgentState = vi.fn().mockImplementation(() => {
loadCallCount++;
if (loadCallCount === 1) {
return Promise.resolve({
status: 'running',
stepCount: 5,
lastModified: new Date().toISOString(),
metadata: {},
});
}
return Promise.reject(new Error('Reached the max retries per request limit (which is 3)'));
});
// publishStreamEvent: first call (step_start) succeeds, subsequent calls fail
// Simulates Redis going down mid-execution
let publishCallCount = 0;
streamManager.publishStreamEvent = vi.fn().mockImplementation(() => {
publishCallCount++;
if (publishCallCount === 1) return Promise.resolve();
return Promise.reject(new Error('Redis ECONNRESET'));
});
// saveAgentState fails (Redis is down)
coordinator.saveAgentState = vi.fn().mockRejectedValue(new Error('Redis ECONNRESET'));
const onComplete = vi.fn();
service.registerStepCallbacks('op-redis-fail', { onComplete });
// executeStep re-throws the original error after running callbacks
await expect(
service.executeStep({
operationId: 'op-redis-fail',
stepIndex: 6,
context: { phase: 'user_input' } as any,
}),
).rejects.toThrow();
// onComplete MUST be called even when Redis is completely down
expect(onComplete).toHaveBeenCalledWith(
expect.objectContaining({
operationId: 'op-redis-fail',
reason: 'error',
}),
);
});
it('should still trigger completion webhook when Redis fails in catch block', async () => {
const service = createService();
const coordinator = (service as any).coordinator;
const streamManager = (service as any).streamManager;
coordinator.tryClaimStep = vi.fn().mockResolvedValue(true);
let loadCallCount = 0;
coordinator.loadAgentState = vi.fn().mockImplementation(() => {
loadCallCount++;
if (loadCallCount === 1) {
return Promise.resolve({
status: 'running',
stepCount: 5,
lastModified: new Date().toISOString(),
metadata: {},
});
}
return Promise.reject(new Error('Redis ECONNRESET'));
});
// First publishStreamEvent call (step_start) succeeds, subsequent fail
let publishCallCount = 0;
streamManager.publishStreamEvent = vi.fn().mockImplementation(() => {
publishCallCount++;
if (publishCallCount === 1) return Promise.resolve();
return Promise.reject(new Error('Redis ECONNRESET'));
});
coordinator.saveAgentState = vi.fn().mockRejectedValue(new Error('Redis ECONNRESET'));
// Spy on triggerCompletionWebhook
const triggerSpy = vi
.spyOn(service as any, 'triggerCompletionWebhook')
.mockResolvedValue(undefined);
// executeStep re-throws the original error after running callbacks
await expect(
service.executeStep({
operationId: 'op-redis-webhook',
stepIndex: 6,
context: { phase: 'user_input' } as any,
}),
).rejects.toThrow();
// Completion webhook MUST be triggered even when Redis is down
expect(triggerSpy).toHaveBeenCalledWith(
expect.objectContaining({ status: 'error' }),
'op-redis-webhook',
'error',
);
});
it('should preserve loaded state metadata when only saveAgentState fails', async () => {
const service = createService();
const coordinator = (service as any).coordinator;
const streamManager = (service as any).streamManager;
coordinator.tryClaimStep = vi.fn().mockResolvedValue(true);
const stateWithWebhook = {
status: 'running',
stepCount: 5,
lastModified: new Date().toISOString(),
metadata: { completionWebhook: 'https://example.com/webhook' },
};
// loadAgentState always succeeds (returns state with webhook metadata)
coordinator.loadAgentState = vi.fn().mockResolvedValue(stateWithWebhook);
// saveAgentState fails (write-only Redis failure)
coordinator.saveAgentState = vi.fn().mockRejectedValue(new Error('Redis write failed'));
// publishStreamEvent: first call succeeds, subsequent fail
let publishCallCount = 0;
streamManager.publishStreamEvent = vi.fn().mockImplementation(() => {
publishCallCount++;
if (publishCallCount === 1) return Promise.resolve();
return Promise.reject(new Error('Redis ECONNRESET'));
});
const onComplete = vi.fn();
service.registerStepCallbacks('op-save-fail', { onComplete });
await expect(
service.executeStep({
operationId: 'op-save-fail',
stepIndex: 6,
context: { phase: 'user_input' } as any,
}),
).rejects.toThrow();
// onComplete must receive the full state with metadata (not a minimal fallback)
expect(onComplete).toHaveBeenCalledWith(
expect.objectContaining({
finalState: expect.objectContaining({
metadata: expect.objectContaining({
completionWebhook: 'https://example.com/webhook',
}),
status: 'error',
}),
operationId: 'op-save-fail',
reason: 'error',
}),
);
});
});