diff --git a/src/server/modules/AgentRuntime/AgentRuntimeCoordinator.ts b/src/server/modules/AgentRuntime/AgentRuntimeCoordinator.ts index cf07f6ed18..bdb9366f09 100644 --- a/src/server/modules/AgentRuntime/AgentRuntimeCoordinator.ts +++ b/src/server/modules/AgentRuntime/AgentRuntimeCoordinator.ts @@ -7,6 +7,16 @@ import { type IAgentStateManager, type IStreamEventManager } from './types'; const log = debug('lobe-server:agent-runtime:coordinator'); +const TERMINAL_STATUSES = new Set(['done', 'error', 'interrupted']); + +const hasEnteredTerminalState = ( + previousStatus?: AgentState['status'], + nextStatus?: AgentState['status'], +): nextStatus is 'done' | 'error' | 'interrupted' => { + const wasTerminal = previousStatus ? TERMINAL_STATUSES.has(previousStatus) : false; + return Boolean(nextStatus && TERMINAL_STATUSES.has(nextStatus) && !wasTerminal); +}; + export interface AgentRuntimeCoordinatorOptions { /** * Custom state manager implementation @@ -79,10 +89,15 @@ export class AgentRuntimeCoordinator { // Save state await this.stateManager.saveAgentState(operationId, state); - // If status changes to done, send agent runtime end event - if (state.status === 'done' && previousState?.status !== 'done') { - await this.streamEventManager.publishAgentRuntimeEnd(operationId, state.stepCount, state); - log('[%s] Agent runtime completed', operationId); + // Send a terminal event once the operation first enters a terminal state. + if (hasEnteredTerminalState(previousState?.status, state.status)) { + await this.streamEventManager.publishAgentRuntimeEnd( + operationId, + state.stepCount ?? previousState?.stepCount ?? 0, + state, + state.status, + ); + log('[%s] Agent runtime reached terminal state: %s', operationId, state.status); } } catch (error) { console.error('Failed to save agent state and handle events:', error); @@ -101,15 +116,19 @@ export class AgentRuntimeCoordinator { // Save step result await this.stateManager.saveStepResult(operationId, stepResult); - // If status changes to done, send agent_runtime_end event - // This ensures agent_runtime_end is sent after all step events - if (stepResult.newState.status === 'done' && previousState?.status !== 'done') { + // This ensures agent_runtime_end is sent after all step events. + if (hasEnteredTerminalState(previousState?.status, stepResult.newState.status)) { await this.streamEventManager.publishAgentRuntimeEnd( operationId, - stepResult.newState.stepCount, + stepResult.newState.stepCount ?? stepResult.stepIndex ?? previousState?.stepCount ?? 0, stepResult.newState, + stepResult.newState.status, + ); + log( + '[%s] Agent runtime reached terminal state after step result: %s', + operationId, + stepResult.newState.status, ); - log('[%s] Agent runtime completed', operationId); } } catch (error) { console.error('Failed to save step result and handle events:', error); diff --git a/src/server/modules/AgentRuntime/InMemoryStreamEventManager.ts b/src/server/modules/AgentRuntime/InMemoryStreamEventManager.ts index af84847782..894db9c50e 100644 --- a/src/server/modules/AgentRuntime/InMemoryStreamEventManager.ts +++ b/src/server/modules/AgentRuntime/InMemoryStreamEventManager.ts @@ -5,6 +5,18 @@ import { type IStreamEventManager } from './types'; const log = debug('lobe-server:agent-runtime:in-memory-stream-event-manager'); +const getDefaultReasonDetail = (finalState: any, reason?: string): string => { + if (reason === 'error') { + return finalState?.error?.message || finalState?.error?.type || 'Agent runtime failed'; + } + + if (reason === 'interrupted') { + return finalState?.error?.message || 'Agent runtime interrupted'; + } + + return 'Agent runtime completed successfully'; +}; + type EventCallback = (events: StreamEvent[]) => void; /** @@ -98,7 +110,7 @@ export class InMemoryStreamEventManager implements IStreamEventManager { operationId, phase: 'execution_complete', reason: reason || 'completed', - reasonDetail: reasonDetail || 'Agent runtime completed successfully', + reasonDetail: reasonDetail || getDefaultReasonDetail(finalState, reason), }, stepIndex, type: 'agent_runtime_end', diff --git a/src/server/modules/AgentRuntime/RuntimeExecutors.ts b/src/server/modules/AgentRuntime/RuntimeExecutors.ts index 66a4f26e31..558a35c4bc 100644 --- a/src/server/modules/AgentRuntime/RuntimeExecutors.ts +++ b/src/server/modules/AgentRuntime/RuntimeExecutors.ts @@ -45,6 +45,51 @@ const TOOL_PRICING: Record = { 'lobe-web-browsing/search': 0, }; +const formatErrorEventData = (error: unknown, phase: string) => { + let errorMessage = 'Unknown error'; + let errorType: string | undefined; + + if (error && typeof error === 'object') { + const payload = error as { error?: unknown; errorType?: unknown; message?: unknown }; + + if (typeof payload.errorType === 'string') { + errorType = payload.errorType; + } + + if (typeof payload.message === 'string' && payload.message.length > 0) { + errorMessage = payload.message; + } else if (typeof payload.error === 'string' && payload.error.length > 0) { + errorMessage = payload.error; + } else if ( + payload.error && + typeof payload.error === 'object' && + 'message' in payload.error && + typeof payload.error.message === 'string' + ) { + errorMessage = payload.error.message; + } else if (error instanceof Error && error.message.length > 0) { + errorMessage = error.message; + } else if (errorType) { + errorMessage = errorType; + } + } else if (error instanceof Error && error.message.length > 0) { + errorMessage = error.message; + errorType = error.name; + } else if (typeof error === 'string' && error.length > 0) { + errorMessage = error; + } + + if (!errorType && error instanceof Error && error.name) { + errorType = error.name; + } + + return { + error: errorMessage, + errorType, + phase, + }; +}; + export interface RuntimeExecutorContext { agentConfig?: any; discordContext?: any; @@ -623,10 +668,7 @@ export const createRuntimeExecutors = ( } catch (error) { // Publish error event await streamManager.publishStreamEvent(operationId, { - data: { - error: (error as Error).message, - phase: 'llm_execution', - }, + data: formatErrorEventData(error, 'llm_execution'), stepIndex, type: 'error', }); @@ -1082,10 +1124,7 @@ export const createRuntimeExecutors = ( } catch (error) { // Publish tool execution error event await streamManager.publishStreamEvent(operationId, { - data: { - error: (error as Error).message, - phase: 'tool_execution', - }, + data: formatErrorEventData(error, 'tool_execution'), stepIndex, type: 'error', }); @@ -1226,10 +1265,7 @@ export const createRuntimeExecutors = ( // Publish error event await streamManager.publishStreamEvent(operationId, { - data: { - error: (error as Error).message, - phase: 'tool_execution', - }, + data: formatErrorEventData(error, 'tool_execution'), stepIndex, type: 'error', }); diff --git a/src/server/modules/AgentRuntime/StreamEventManager.ts b/src/server/modules/AgentRuntime/StreamEventManager.ts index 8f68f46ff7..48fed84ab9 100644 --- a/src/server/modules/AgentRuntime/StreamEventManager.ts +++ b/src/server/modules/AgentRuntime/StreamEventManager.ts @@ -7,6 +7,18 @@ import { getAgentRuntimeRedisClient } from './redis'; const log = debug('lobe-server:agent-runtime:stream-event-manager'); const timing = debug('lobe-server:agent-runtime:timing'); +const getDefaultReasonDetail = (finalState: any, reason?: string): string => { + if (reason === 'error') { + return finalState?.error?.message || finalState?.error?.type || 'Agent runtime failed'; + } + + if (reason === 'interrupted') { + return finalState?.error?.message || 'Agent runtime interrupted'; + } + + return 'Agent runtime completed successfully'; +}; + export interface StreamEvent { data: any; id?: string; // Redis Stream event ID @@ -167,7 +179,7 @@ export class StreamEventManager { operationId, phase: 'execution_complete', reason: reason || 'completed', - reasonDetail: reasonDetail || 'Agent runtime completed successfully', + reasonDetail: reasonDetail || getDefaultReasonDetail(finalState, reason), }, stepIndex, type: 'agent_runtime_end', diff --git a/src/server/modules/AgentRuntime/__tests__/AgentRuntimeCoordinator.test.ts b/src/server/modules/AgentRuntime/__tests__/AgentRuntimeCoordinator.test.ts index 0e0275f9b7..adebb1002e 100644 --- a/src/server/modules/AgentRuntime/__tests__/AgentRuntimeCoordinator.test.ts +++ b/src/server/modules/AgentRuntime/__tests__/AgentRuntimeCoordinator.test.ts @@ -99,13 +99,65 @@ describe('AgentRuntimeCoordinator', () => { operationId, newState.stepCount, newState, + 'done', + ); + }); + + it('should publish end event when status changes to error', async () => { + const operationId = 'test-operation-id'; + const previousState = { status: 'running', stepCount: 3 }; + const newState = { error: { message: 'boom' }, status: 'error', stepCount: 5 }; + + mockStateManager.loadAgentState.mockResolvedValue(previousState); + + await coordinator.saveAgentState(operationId, newState as any); + + expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith( + operationId, + newState.stepCount, + newState, + 'error', + ); + }); + + it('should fallback to previous stepCount when terminal state is missing stepCount', async () => { + const operationId = 'test-operation-id'; + const previousState = { status: 'running', stepCount: 3 }; + const newState = { error: { message: 'boom' }, status: 'error' }; + + mockStateManager.loadAgentState.mockResolvedValue(previousState); + + await coordinator.saveAgentState(operationId, newState as any); + + expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith( + operationId, + previousState.stepCount, + newState, + 'error', + ); + }); + + it('should publish end event when status changes to interrupted', async () => { + const operationId = 'test-operation-id'; + const previousState = { status: 'running', stepCount: 3 }; + const newState = { status: 'interrupted', stepCount: 5 }; + + mockStateManager.loadAgentState.mockResolvedValue(previousState); + + await coordinator.saveAgentState(operationId, newState as any); + + expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith( + operationId, + newState.stepCount, + newState, + 'interrupted', ); }); it('should not publish end event when status was already done', async () => { const operationId = 'test-operation-id'; const previousState = { status: 'done', stepCount: 5 }; - const newState = { status: 'done', stepCount: 5 }; + const newState = { status: 'error', stepCount: 5 }; mockStateManager.loadAgentState.mockResolvedValue(previousState); @@ -149,6 +201,67 @@ describe('AgentRuntimeCoordinator', () => { operationId, 5, stepResult.newState, + 'done', + ); + }); + + it('should publish end event when status becomes error', async () => { + const operationId = 'test-operation-id'; + const stepResult = { + executionTime: 1000, + newState: { error: { message: 'boom' }, status: 'error', stepCount: 5 }, + stepIndex: 5, + }; + + mockStateManager.loadAgentState.mockResolvedValue({ status: 'running', stepCount: 4 }); + + await coordinator.saveStepResult(operationId, stepResult as any); + + expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith( + operationId, + 5, + stepResult.newState, + 'error', + ); + }); + + it('should fallback to stepResult.stepIndex when terminal step result state is missing stepCount', async () => { + const operationId = 'test-operation-id'; + const stepResult = { + executionTime: 1000, + newState: { error: { message: 'boom' }, status: 'error' }, + stepIndex: 5, + }; + + mockStateManager.loadAgentState.mockResolvedValue({ status: 'running', stepCount: 4 }); + + await coordinator.saveStepResult(operationId, stepResult as any); + + expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith( + operationId, + stepResult.stepIndex, + stepResult.newState, + 'error', + ); + }); + + it('should publish end event when status becomes interrupted', async () => { + const operationId = 'test-operation-id'; + const stepResult = { + executionTime: 1000, + newState: { status: 'interrupted', stepCount: 5 }, + stepIndex: 5, + }; + + mockStateManager.loadAgentState.mockResolvedValue({ status: 'running', stepCount: 4 }); + + await coordinator.saveStepResult(operationId, stepResult as any); + + expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith( + operationId, + 5, + stepResult.newState, + 'interrupted', ); }); @@ -172,7 +285,7 @@ describe('AgentRuntimeCoordinator', () => { const operationId = 'test-operation-id'; const stepResult = { executionTime: 1000, - newState: { status: 'done', stepCount: 5 }, + newState: { status: 'error', stepCount: 5 }, stepIndex: 5, }; diff --git a/src/server/modules/AgentRuntime/__tests__/RuntimeExecutors.test.ts b/src/server/modules/AgentRuntime/__tests__/RuntimeExecutors.test.ts index 67aa6abf7e..06bcccf4c6 100644 --- a/src/server/modules/AgentRuntime/__tests__/RuntimeExecutors.test.ts +++ b/src/server/modules/AgentRuntime/__tests__/RuntimeExecutors.test.ts @@ -2478,6 +2478,7 @@ describe('RuntimeExecutors', () => { type: 'error', data: expect.objectContaining({ error: '401 Unauthorized', + errorType: 'Error', phase: 'llm_execution', }), }), diff --git a/src/server/modules/AgentRuntime/__tests__/StreamEventManager.test.ts b/src/server/modules/AgentRuntime/__tests__/StreamEventManager.test.ts index da24507050..88c629540c 100644 --- a/src/server/modules/AgentRuntime/__tests__/StreamEventManager.test.ts +++ b/src/server/modules/AgentRuntime/__tests__/StreamEventManager.test.ts @@ -144,5 +144,45 @@ describe('StreamEventManager', () => { expect.any(String), ); }); + + it('should derive error reasonDetail from finalState when omitted', async () => { + const operationId = 'test-operation-id'; + const stepIndex = 3; + const finalState = { + error: { + message: 'Invalid provider API key', + type: 'InvalidProviderAPIKey', + }, + status: 'error', + }; + + mockRedis.xadd.mockResolvedValue('event-id-790'); + + await streamManager.publishAgentRuntimeEnd(operationId, stepIndex, finalState, 'error'); + + expect(mockRedis.xadd).toHaveBeenCalledWith( + expect.any(String), + expect.any(String), + expect.any(String), + expect.any(String), + expect.any(String), + expect.any(String), + expect.any(String), + expect.any(String), + expect.any(String), + expect.any(String), + operationId, + 'data', + JSON.stringify({ + finalState, + operationId, + phase: 'execution_complete', + reason: 'error', + reasonDetail: 'Invalid provider API key', + }), + expect.any(String), + expect.any(String), + ); + }); }); }); diff --git a/src/server/modules/AgentRuntime/__tests__/factory.test.ts b/src/server/modules/AgentRuntime/__tests__/factory.test.ts new file mode 100644 index 0000000000..d650484f72 --- /dev/null +++ b/src/server/modules/AgentRuntime/__tests__/factory.test.ts @@ -0,0 +1,112 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +import { createAgentStateManager, createStreamEventManager, isRedisAvailable } from '../factory'; + +const { + MockAgentStateManager, + MockStreamEventManager, + mockAppEnv, + mockGetAgentRuntimeRedisClient, + mockInMemoryAgentStateManager, + mockInMemoryStreamEventManager, +} = vi.hoisted(() => ({ + MockAgentStateManager: vi.fn(() => ({ kind: 'redis-state-manager' })), + MockStreamEventManager: vi.fn(() => ({ kind: 'redis-stream-event-manager' })), + mockAppEnv: { + enableQueueAgentRuntime: false, + }, + mockGetAgentRuntimeRedisClient: vi.fn(), + mockInMemoryAgentStateManager: { kind: 'in-memory-state-manager' }, + mockInMemoryStreamEventManager: { kind: 'in-memory-stream-event-manager' }, +})); + +vi.mock('@/envs/app', () => ({ + appEnv: mockAppEnv, +})); + +vi.mock('../redis', () => ({ + getAgentRuntimeRedisClient: mockGetAgentRuntimeRedisClient, +})); + +vi.mock('../InMemoryAgentStateManager', () => ({ + inMemoryAgentStateManager: mockInMemoryAgentStateManager, +})); + +vi.mock('../InMemoryStreamEventManager', () => ({ + inMemoryStreamEventManager: mockInMemoryStreamEventManager, +})); + +vi.mock('../AgentStateManager', () => ({ + AgentStateManager: MockAgentStateManager, +})); + +vi.mock('../StreamEventManager', () => ({ + StreamEventManager: MockStreamEventManager, +})); + +describe('AgentRuntime factory', () => { + beforeEach(() => { + vi.clearAllMocks(); + mockAppEnv.enableQueueAgentRuntime = false; + mockGetAgentRuntimeRedisClient.mockReturnValue(null); + }); + + describe('isRedisAvailable', () => { + it('returns true when a Redis client exists', () => { + mockGetAgentRuntimeRedisClient.mockReturnValue({ ping: vi.fn() }); + + expect(isRedisAvailable()).toBe(true); + }); + + it('returns false when Redis is unavailable', () => { + mockGetAgentRuntimeRedisClient.mockReturnValue(null); + + expect(isRedisAvailable()).toBe(false); + }); + }); + + describe('createAgentStateManager', () => { + it('uses in-memory state when queue mode is disabled', () => { + expect(createAgentStateManager()).toBe(mockInMemoryAgentStateManager); + expect(MockAgentStateManager).not.toHaveBeenCalled(); + }); + + it('uses Redis-backed state when queue mode is enabled and Redis is available', () => { + mockAppEnv.enableQueueAgentRuntime = true; + mockGetAgentRuntimeRedisClient.mockReturnValue({ ping: vi.fn() }); + + expect(createAgentStateManager()).toEqual({ kind: 'redis-state-manager' }); + expect(MockAgentStateManager).toHaveBeenCalledTimes(1); + }); + + it('throws when queue mode is enabled without Redis', () => { + mockAppEnv.enableQueueAgentRuntime = true; + + expect(() => createAgentStateManager()).toThrow( + 'Redis is required when AGENT_RUNTIME_MODE=queue. Please configure `REDIS_URL`.', + ); + }); + }); + + describe('createStreamEventManager', () => { + it('prefers Redis-backed streams when Redis is available in local mode', () => { + mockGetAgentRuntimeRedisClient.mockReturnValue({ ping: vi.fn() }); + + expect(createStreamEventManager()).toEqual({ kind: 'redis-stream-event-manager' }); + expect(MockStreamEventManager).toHaveBeenCalledTimes(1); + }); + + it('falls back to in-memory streams when local mode has no Redis', () => { + expect(createStreamEventManager()).toBe(mockInMemoryStreamEventManager); + expect(MockStreamEventManager).not.toHaveBeenCalled(); + }); + + it('throws when queue mode is enabled without Redis', () => { + mockAppEnv.enableQueueAgentRuntime = true; + + expect(() => createStreamEventManager()).toThrow( + 'Redis is required when AGENT_RUNTIME_MODE=queue. Please configure `REDIS_URL`.', + ); + }); + }); +}); diff --git a/src/server/modules/AgentRuntime/factory.ts b/src/server/modules/AgentRuntime/factory.ts index f3f1350596..8eaf218174 100644 --- a/src/server/modules/AgentRuntime/factory.ts +++ b/src/server/modules/AgentRuntime/factory.ts @@ -49,22 +49,22 @@ export const createAgentStateManager = (): IAgentStateManager => { /** * Create StreamEventManager based on configuration * - * - If enableQueueAgentRuntime=false (default): InMemoryStreamEventManager - * - If enableQueueAgentRuntime=true: RedisStreamEventManager (requires Redis) + * - If Redis is available: RedisStreamEventManager + * - If Redis is unavailable and enableQueueAgentRuntime=false (default): InMemoryStreamEventManager + * - If Redis is unavailable and enableQueueAgentRuntime=true: throw */ export const createStreamEventManager = (): IStreamEventManager => { - // When queue mode is disabled, always use InMemory for simplicity + // Prefer Redis whenever it is available so the runtime worker and SSE route + // can communicate through the same stream bus even in local mode. + if (isRedisAvailable()) { + log('Redis available, using StreamEventManager'); + return new StreamEventManager(); + } + if (!isQueueModeEnabled()) { - log('Queue mode disabled, using InMemoryStreamEventManager'); + log('Redis unavailable and queue mode disabled, using InMemoryStreamEventManager'); return inMemoryStreamEventManager; } - // Queue mode enabled, Redis is required - if (!isRedisAvailable()) { - throw new Error( - 'Redis is required when AGENT_RUNTIME_MODE=queue. Please configure `REDIS_URL`.', - ); - } - - return new StreamEventManager(); + throw new Error('Redis is required when AGENT_RUNTIME_MODE=queue. Please configure `REDIS_URL`.'); }; diff --git a/src/server/services/agentRuntime/AgentRuntimeService.test.ts b/src/server/services/agentRuntime/AgentRuntimeService.test.ts index 5d048c2f4c..2ddc546aa0 100644 --- a/src/server/services/agentRuntime/AgentRuntimeService.test.ts +++ b/src/server/services/agentRuntime/AgentRuntimeService.test.ts @@ -419,6 +419,7 @@ describe('AgentRuntimeService', () => { stepIndex: 1, phase: 'step_execution', error: 'Runtime error', + errorType: '500', }, }); }); diff --git a/src/server/services/agentRuntime/AgentRuntimeService.ts b/src/server/services/agentRuntime/AgentRuntimeService.ts index f8d7d67d2c..6099b2e7a3 100644 --- a/src/server/services/agentRuntime/AgentRuntimeService.ts +++ b/src/server/services/agentRuntime/AgentRuntimeService.ts @@ -973,6 +973,7 @@ export class AgentRuntimeService { }; } catch (error) { log('Step %d failed for operation %s: %O', stepIndex, operationId, error); + const formattedError = formatErrorForState(error); // Build error state — try loading current state from coordinator, but if that // also fails (e.g. Redis ECONNRESET), fall back to a minimal error state so @@ -981,7 +982,8 @@ export class AgentRuntimeService { try { await this.streamManager.publishStreamEvent(operationId, { data: { - error: (error as Error).message, + error: formattedError.message, + errorType: String(formattedError.type), phase: 'step_execution', stepIndex, }, @@ -1000,15 +1002,17 @@ export class AgentRuntimeService { const errorState = await this.coordinator.loadAgentState(operationId); finalStateWithError = { ...errorState!, - error: formatErrorForState(error), + error: formattedError, status: 'error' as const, + stepCount: errorState?.stepCount ?? stepIndex, }; } catch (loadError) { log('[%s] Failed to load error state (infra may be down): %O', operationId, loadError); // Fallback: construct a minimal error state so callbacks still receive useful info finalStateWithError = { - error: formatErrorForState(error), + error: formattedError, status: 'error' as const, + stepCount: stepIndex, }; } diff --git a/src/server/services/agentRuntime/__tests__/executeStep.test.ts b/src/server/services/agentRuntime/__tests__/executeStep.test.ts index 037fc753c8..8a8af31b8a 100644 --- a/src/server/services/agentRuntime/__tests__/executeStep.test.ts +++ b/src/server/services/agentRuntime/__tests__/executeStep.test.ts @@ -413,6 +413,100 @@ describe('AgentRuntimeService.executeStep - Redis failure in error handler', () ); }); + it('should include stepCount in fallback error state when state reload fails', async () => { + const service = createService(); + const coordinator = (service as any).coordinator; + const streamManager = (service as any).streamManager; + + coordinator.tryClaimStep = vi.fn().mockResolvedValue(true); + + let loadCallCount = 0; + coordinator.loadAgentState = vi.fn().mockImplementation(() => { + loadCallCount++; + if (loadCallCount === 1) { + return Promise.resolve({ + status: 'running', + stepCount: 5, + lastModified: new Date().toISOString(), + metadata: {}, + }); + } + return Promise.reject(new Error('Redis ECONNRESET')); + }); + + let publishCallCount = 0; + streamManager.publishStreamEvent = vi.fn().mockImplementation(() => { + publishCallCount++; + if (publishCallCount === 1) return Promise.resolve(); + return Promise.reject(new Error('Redis ECONNRESET')); + }); + + coordinator.saveAgentState = vi.fn().mockResolvedValue(undefined); + + await expect( + service.executeStep({ + operationId: 'op-fallback-step-count', + stepIndex: 6, + context: { phase: 'user_input' } as any, + }), + ).rejects.toThrow(); + + expect(coordinator.saveAgentState).toHaveBeenCalledWith( + 'op-fallback-step-count', + expect.objectContaining({ + status: 'error', + stepCount: 6, + }), + ); + }); + + it('should preserve stepCount when loadAgentState returns null in error handler', async () => { + const service = createService(); + const coordinator = (service as any).coordinator; + const streamManager = (service as any).streamManager; + + coordinator.tryClaimStep = vi.fn().mockResolvedValue(true); + + let loadCallCount = 0; + coordinator.loadAgentState = vi.fn().mockImplementation(() => { + loadCallCount++; + if (loadCallCount === 1) { + return Promise.resolve({ + status: 'running', + stepCount: 5, + lastModified: new Date().toISOString(), + metadata: {}, + }); + } + return Promise.resolve(null); + }); + + let publishCallCount = 0; + streamManager.publishStreamEvent = vi.fn().mockImplementation(() => { + publishCallCount++; + if (publishCallCount === 1) return Promise.resolve(); + return Promise.reject(new Error('Redis ECONNRESET')); + }); + + coordinator.saveAgentState = vi.fn().mockResolvedValue(undefined); + + await expect( + service.executeStep({ + operationId: 'op-null-step-count', + stepIndex: 7, + context: { phase: 'user_input' } as any, + }), + ).rejects.toThrow(); + + expect(coordinator.saveAgentState).toHaveBeenCalledWith( + 'op-null-step-count', + expect.objectContaining({ + status: 'error', + stepCount: 7, + }), + ); + }); + it('should preserve loaded state metadata when only saveAgentState fails', async () => { const service = createService(); const coordinator = (service as any).coordinator;