🐛 fix: agent stream error in local dev (#13054)

* 🐛 fix: close local agent streams on terminal errors

* ♻️ refactor: revert redundant cli stream error handling

* 🧪 test: remove redundant cli stream error test

* wip: prune tests

* 🐛 fix: guard terminal agent runtime end step index
This commit is contained in:
Rylan Cai
2026-03-20 11:39:54 +08:00
committed by GitHub
parent f51da14f07
commit 6ce9d9a814
12 changed files with 484 additions and 40 deletions

View File

@@ -7,6 +7,16 @@ import { type IAgentStateManager, type IStreamEventManager } from './types';
const log = debug('lobe-server:agent-runtime:coordinator');
const TERMINAL_STATUSES = new Set<AgentState['status']>(['done', 'error', 'interrupted']);
/**
 * Reports whether a status transition is the first entry into a terminal
 * status, i.e. a member of `TERMINAL_STATUSES`.
 *
 * @param previousStatus - Status before the update, if one was known.
 * @param nextStatus - Status after the update, if any.
 * @returns `true` only when `nextStatus` is terminal and `previousStatus` was not.
 */
const hasEnteredTerminalState = (
  previousStatus?: AgentState['status'],
  nextStatus?: AgentState['status'],
): nextStatus is 'done' | 'error' | 'interrupted' => {
  // A missing or non-terminal next status is never a terminal transition.
  if (!nextStatus || !TERMINAL_STATUSES.has(nextStatus)) return false;

  // Already terminal before this update: nothing new was entered.
  if (previousStatus && TERMINAL_STATUSES.has(previousStatus)) return false;

  return true;
};
export interface AgentRuntimeCoordinatorOptions {
/**
* Custom state manager implementation
@@ -79,10 +89,15 @@ export class AgentRuntimeCoordinator {
// Save state
await this.stateManager.saveAgentState(operationId, state);
// If status changes to done, send agent runtime end event
if (state.status === 'done' && previousState?.status !== 'done') {
await this.streamEventManager.publishAgentRuntimeEnd(operationId, state.stepCount, state);
log('[%s] Agent runtime completed', operationId);
// Send a terminal event once the operation first enters a terminal state.
if (hasEnteredTerminalState(previousState?.status, state.status)) {
await this.streamEventManager.publishAgentRuntimeEnd(
operationId,
state.stepCount ?? previousState?.stepCount ?? 0,
state,
state.status,
);
log('[%s] Agent runtime reached terminal state: %s', operationId, state.status);
}
} catch (error) {
console.error('Failed to save agent state and handle events:', error);
@@ -101,15 +116,19 @@ export class AgentRuntimeCoordinator {
// Save step result
await this.stateManager.saveStepResult(operationId, stepResult);
// If status changes to done, send agent_runtime_end event
// This ensures agent_runtime_end is sent after all step events
if (stepResult.newState.status === 'done' && previousState?.status !== 'done') {
// This ensures agent_runtime_end is sent after all step events.
if (hasEnteredTerminalState(previousState?.status, stepResult.newState.status)) {
await this.streamEventManager.publishAgentRuntimeEnd(
operationId,
stepResult.newState.stepCount,
stepResult.newState.stepCount ?? stepResult.stepIndex ?? previousState?.stepCount ?? 0,
stepResult.newState,
stepResult.newState.status,
);
log(
'[%s] Agent runtime reached terminal state after step result: %s',
operationId,
stepResult.newState.status,
);
log('[%s] Agent runtime completed', operationId);
}
} catch (error) {
console.error('Failed to save step result and handle events:', error);

View File

@@ -5,6 +5,18 @@ import { type IStreamEventManager } from './types';
const log = debug('lobe-server:agent-runtime:in-memory-stream-event-manager');
/**
 * Builds the fallback `reasonDetail` text for an `agent_runtime_end` event
 * when the caller did not supply one.
 *
 * @param finalState - Final agent state; only its optional `error` field is read.
 * @param reason - Terminal reason (`'error'`, `'interrupted'`, or anything else).
 * @returns The state's error message (or error type for `'error'`) when present,
 *          otherwise a fixed sentence for the given reason.
 */
const getDefaultReasonDetail = (finalState: any, reason?: string): string => {
  switch (reason) {
    case 'error': {
      const stateError = finalState?.error;
      return stateError?.message || stateError?.type || 'Agent runtime failed';
    }
    case 'interrupted': {
      return finalState?.error?.message || 'Agent runtime interrupted';
    }
    default: {
      // Any other reason (including undefined) is reported as a normal completion.
      return 'Agent runtime completed successfully';
    }
  }
};
type EventCallback = (events: StreamEvent[]) => void;
/**
@@ -98,7 +110,7 @@ export class InMemoryStreamEventManager implements IStreamEventManager {
operationId,
phase: 'execution_complete',
reason: reason || 'completed',
reasonDetail: reasonDetail || 'Agent runtime completed successfully',
reasonDetail: reasonDetail || getDefaultReasonDetail(finalState, reason),
},
stepIndex,
type: 'agent_runtime_end',

View File

@@ -45,6 +45,51 @@ const TOOL_PRICING: Record<string, number> = {
'lobe-web-browsing/search': 0,
};
/**
 * Normalizes an arbitrary thrown value into the payload of an `error` stream event.
 *
 * Message resolution order for object-like errors:
 *   1. `error.message` (non-empty string)
 *   2. `error.error` (non-empty string)
 *   3. `error.error.message` (non-empty string)
 *   4. `error.errorType` (non-empty string)
 *   5. `'Unknown error'`
 * A non-empty string error is used as the message directly.
 *
 * The type is taken from a string `errorType` property when present, otherwise
 * from `Error#name` for `Error` instances.
 *
 * @param error - The caught value; may be an Error, a plain object, a string, or anything else.
 * @param phase - Execution phase label copied verbatim into the payload.
 * @returns `{ error, errorType, phase }` where `error` is always a string.
 */
const formatErrorEventData = (error: unknown, phase: string) => {
  // Every message candidate must be a non-empty string; an empty one falls
  // through to the next candidate instead of producing a blank error.
  const asNonEmptyString = (value: unknown): string | undefined =>
    typeof value === 'string' && value.length > 0 ? value : undefined;

  let errorMessage = 'Unknown error';
  let errorType: string | undefined;

  if (error && typeof error === 'object') {
    // Error instances are objects too, so they are handled here.
    const payload = error as { error?: unknown; errorType?: unknown; message?: unknown };

    if (typeof payload.errorType === 'string') {
      errorType = payload.errorType;
    }

    const nested = payload.error;
    const nestedMessage =
      nested && typeof nested === 'object'
        ? asNonEmptyString((nested as { message?: unknown }).message)
        : undefined;

    errorMessage =
      asNonEmptyString(payload.message) ??
      asNonEmptyString(nested) ??
      nestedMessage ??
      asNonEmptyString(errorType) ??
      errorMessage;
  } else if (typeof error === 'string' && error.length > 0) {
    errorMessage = error;
  }

  // Fall back to the Error's name when no explicit (non-empty) errorType was given.
  if (!errorType && error instanceof Error && error.name) {
    errorType = error.name;
  }

  return {
    error: errorMessage,
    errorType,
    phase,
  };
};
export interface RuntimeExecutorContext {
agentConfig?: any;
discordContext?: any;
@@ -623,10 +668,7 @@ export const createRuntimeExecutors = (
} catch (error) {
// Publish error event
await streamManager.publishStreamEvent(operationId, {
data: {
error: (error as Error).message,
phase: 'llm_execution',
},
data: formatErrorEventData(error, 'llm_execution'),
stepIndex,
type: 'error',
});
@@ -1082,10 +1124,7 @@ export const createRuntimeExecutors = (
} catch (error) {
// Publish tool execution error event
await streamManager.publishStreamEvent(operationId, {
data: {
error: (error as Error).message,
phase: 'tool_execution',
},
data: formatErrorEventData(error, 'tool_execution'),
stepIndex,
type: 'error',
});
@@ -1226,10 +1265,7 @@ export const createRuntimeExecutors = (
// Publish error event
await streamManager.publishStreamEvent(operationId, {
data: {
error: (error as Error).message,
phase: 'tool_execution',
},
data: formatErrorEventData(error, 'tool_execution'),
stepIndex,
type: 'error',
});

View File

@@ -7,6 +7,18 @@ import { getAgentRuntimeRedisClient } from './redis';
const log = debug('lobe-server:agent-runtime:stream-event-manager');
const timing = debug('lobe-server:agent-runtime:timing');
/**
 * Picks a default `reasonDetail` for the `agent_runtime_end` event when none
 * was passed explicitly.
 *
 * @param finalState - Final agent state; only `finalState.error` is inspected.
 * @param reason - Terminal reason; `'error'` and `'interrupted'` get dedicated texts.
 * @returns A message taken from the state's error when available, otherwise a
 *          fixed description for the reason.
 */
const getDefaultReasonDetail = (finalState: any, reason?: string): string => {
  const failed = reason === 'error';
  const interrupted = reason === 'interrupted';

  // Everything that is neither an error nor an interruption counts as success.
  if (!failed && !interrupted) return 'Agent runtime completed successfully';

  // Both remaining reasons prefer the error message carried by the state.
  const stateMessage = finalState?.error?.message;
  if (stateMessage) return stateMessage;

  if (failed) return finalState?.error?.type || 'Agent runtime failed';

  return 'Agent runtime interrupted';
};
export interface StreamEvent {
data: any;
id?: string; // Redis Stream event ID
@@ -167,7 +179,7 @@ export class StreamEventManager {
operationId,
phase: 'execution_complete',
reason: reason || 'completed',
reasonDetail: reasonDetail || 'Agent runtime completed successfully',
reasonDetail: reasonDetail || getDefaultReasonDetail(finalState, reason),
},
stepIndex,
type: 'agent_runtime_end',

View File

@@ -99,13 +99,65 @@ describe('AgentRuntimeCoordinator', () => {
operationId,
newState.stepCount,
newState,
'done',
);
});
// running -> error: the end event is published with the new state's stepCount and reason 'error'.
it('should publish end event when status changes to error', async () => {
const operationId = 'test-operation-id';
const previousState = { status: 'running', stepCount: 3 };
const newState = { error: { message: 'boom' }, status: 'error', stepCount: 5 };
mockStateManager.loadAgentState.mockResolvedValue(previousState);
await coordinator.saveAgentState(operationId, newState as any);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
operationId,
newState.stepCount,
newState,
'error',
);
});
// The new state carries no stepCount, so the previously stored stepCount is expected instead.
it('should fallback to previous stepCount when terminal state is missing stepCount', async () => {
const operationId = 'test-operation-id';
const previousState = { status: 'running', stepCount: 3 };
const newState = { error: { message: 'boom' }, status: 'error' };
mockStateManager.loadAgentState.mockResolvedValue(previousState);
await coordinator.saveAgentState(operationId, newState as any);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
operationId,
previousState.stepCount,
newState,
'error',
);
});
// running -> interrupted: the end event is published with reason 'interrupted'.
it('should publish end event when status changes to interrupted', async () => {
const operationId = 'test-operation-id';
const previousState = { status: 'running', stepCount: 3 };
const newState = { status: 'interrupted', stepCount: 5 };
mockStateManager.loadAgentState.mockResolvedValue(previousState);
await coordinator.saveAgentState(operationId, newState as any);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
operationId,
newState.stepCount,
newState,
'interrupted',
);
});
it('should not publish end event when status was already done', async () => {
const operationId = 'test-operation-id';
const previousState = { status: 'done', stepCount: 5 };
const newState = { status: 'done', stepCount: 5 };
const newState = { status: 'error', stepCount: 5 };
mockStateManager.loadAgentState.mockResolvedValue(previousState);
@@ -149,6 +201,67 @@ describe('AgentRuntimeCoordinator', () => {
operationId,
5,
stepResult.newState,
'done',
);
});
// Step result moves a running operation to 'error': end event uses newState.stepCount and reason 'error'.
it('should publish end event when status becomes error', async () => {
const operationId = 'test-operation-id';
const stepResult = {
executionTime: 1000,
newState: { error: { message: 'boom' }, status: 'error', stepCount: 5 },
stepIndex: 5,
};
mockStateManager.loadAgentState.mockResolvedValue({ status: 'running', stepCount: 4 });
await coordinator.saveStepResult(operationId, stepResult as any);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
operationId,
5,
stepResult.newState,
'error',
);
});
// newState has no stepCount here, so stepResult.stepIndex (5) is expected rather than the stored stepCount (4).
it('should fallback to stepResult.stepIndex when terminal step result state is missing stepCount', async () => {
const operationId = 'test-operation-id';
const stepResult = {
executionTime: 1000,
newState: { error: { message: 'boom' }, status: 'error' },
stepIndex: 5,
};
mockStateManager.loadAgentState.mockResolvedValue({ status: 'running', stepCount: 4 });
await coordinator.saveStepResult(operationId, stepResult as any);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
operationId,
stepResult.stepIndex,
stepResult.newState,
'error',
);
});
// Step result moves a running operation to 'interrupted': end event uses reason 'interrupted'.
it('should publish end event when status becomes interrupted', async () => {
const operationId = 'test-operation-id';
const stepResult = {
executionTime: 1000,
newState: { status: 'interrupted', stepCount: 5 },
stepIndex: 5,
};
mockStateManager.loadAgentState.mockResolvedValue({ status: 'running', stepCount: 4 });
await coordinator.saveStepResult(operationId, stepResult as any);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
operationId,
5,
stepResult.newState,
'interrupted',
);
});
@@ -172,7 +285,7 @@ describe('AgentRuntimeCoordinator', () => {
const operationId = 'test-operation-id';
const stepResult = {
executionTime: 1000,
newState: { status: 'done', stepCount: 5 },
newState: { status: 'error', stepCount: 5 },
stepIndex: 5,
};

View File

@@ -2478,6 +2478,7 @@ describe('RuntimeExecutors', () => {
type: 'error',
data: expect.objectContaining({
error: '401 Unauthorized',
errorType: 'Error',
phase: 'llm_execution',
}),
}),

View File

@@ -144,5 +144,45 @@ describe('StreamEventManager', () => {
expect.any(String),
);
});
// No reasonDetail argument is passed; the serialized payload is expected to carry
// finalState.error.message as reasonDetail.
it('should derive error reasonDetail from finalState when omitted', async () => {
const operationId = 'test-operation-id';
const stepIndex = 3;
const finalState = {
error: {
message: 'Invalid provider API key',
type: 'InvalidProviderAPIKey',
},
status: 'error',
};
mockRedis.xadd.mockResolvedValue('event-id-790');
await streamManager.publishAgentRuntimeEnd(operationId, stepIndex, finalState, 'error');
// Only operationId, the 'data' field name and its JSON value are pinned; every
// other xadd argument is matched loosely as "any string".
expect(mockRedis.xadd).toHaveBeenCalledWith(
expect.any(String),
expect.any(String),
expect.any(String),
expect.any(String),
expect.any(String),
expect.any(String),
expect.any(String),
expect.any(String),
expect.any(String),
expect.any(String),
operationId,
'data',
JSON.stringify({
finalState,
operationId,
phase: 'execution_complete',
reason: 'error',
reasonDetail: 'Invalid provider API key',
}),
expect.any(String),
expect.any(String),
);
});
});
});

View File

@@ -0,0 +1,112 @@
import { beforeEach, describe, expect, it, vi } from 'vitest';
import { createAgentStateManager, createStreamEventManager, isRedisAvailable } from '../factory';
// Test doubles shared by the vi.mock factories below and by the test cases.
// They are created through vi.hoisted so they exist when the (hoisted) vi.mock
// factories reference them.
const {
MockAgentStateManager,
MockStreamEventManager,
mockAppEnv,
mockGetAgentRuntimeRedisClient,
mockInMemoryAgentStateManager,
mockInMemoryStreamEventManager,
} = vi.hoisted(() => ({
// Constructor stand-ins; each returns an object tagged with a `kind` the tests compare against.
MockAgentStateManager: vi.fn(() => ({ kind: 'redis-state-manager' })),
MockStreamEventManager: vi.fn(() => ({ kind: 'redis-stream-event-manager' })),
// Mutable env object: individual tests flip enableQueueAgentRuntime.
mockAppEnv: {
enableQueueAgentRuntime: false,
},
// Return value is set per test to simulate Redis being present or absent.
mockGetAgentRuntimeRedisClient: vi.fn(),
// Singleton stand-ins, compared by identity (toBe) in the tests.
mockInMemoryAgentStateManager: { kind: 'in-memory-state-manager' },
mockInMemoryStreamEventManager: { kind: 'in-memory-stream-event-manager' },
}));
// Replace the modules imported by '../factory' with the doubles defined above.
vi.mock('@/envs/app', () => ({
appEnv: mockAppEnv,
}));
vi.mock('../redis', () => ({
getAgentRuntimeRedisClient: mockGetAgentRuntimeRedisClient,
}));
vi.mock('../InMemoryAgentStateManager', () => ({
inMemoryAgentStateManager: mockInMemoryAgentStateManager,
}));
vi.mock('../InMemoryStreamEventManager', () => ({
inMemoryStreamEventManager: mockInMemoryStreamEventManager,
}));
vi.mock('../AgentStateManager', () => ({
AgentStateManager: MockAgentStateManager,
}));
vi.mock('../StreamEventManager', () => ({
StreamEventManager: MockStreamEventManager,
}));
// Covers isRedisAvailable, createAgentStateManager and createStreamEventManager
// across the combinations of queue mode on/off and Redis client present/absent.
describe('AgentRuntime factory', () => {
// Baseline for every test: queue mode disabled and no Redis client.
beforeEach(() => {
vi.clearAllMocks();
mockAppEnv.enableQueueAgentRuntime = false;
mockGetAgentRuntimeRedisClient.mockReturnValue(null);
});
describe('isRedisAvailable', () => {
it('returns true when a Redis client exists', () => {
mockGetAgentRuntimeRedisClient.mockReturnValue({ ping: vi.fn() });
expect(isRedisAvailable()).toBe(true);
});
it('returns false when Redis is unavailable', () => {
mockGetAgentRuntimeRedisClient.mockReturnValue(null);
expect(isRedisAvailable()).toBe(false);
});
});
describe('createAgentStateManager', () => {
it('uses in-memory state when queue mode is disabled', () => {
expect(createAgentStateManager()).toBe(mockInMemoryAgentStateManager);
expect(MockAgentStateManager).not.toHaveBeenCalled();
});
it('uses Redis-backed state when queue mode is enabled and Redis is available', () => {
mockAppEnv.enableQueueAgentRuntime = true;
mockGetAgentRuntimeRedisClient.mockReturnValue({ ping: vi.fn() });
expect(createAgentStateManager()).toEqual({ kind: 'redis-state-manager' });
expect(MockAgentStateManager).toHaveBeenCalledTimes(1);
});
it('throws when queue mode is enabled without Redis', () => {
mockAppEnv.enableQueueAgentRuntime = true;
expect(() => createAgentStateManager()).toThrow(
'Redis is required when AGENT_RUNTIME_MODE=queue. Please configure `REDIS_URL`.',
);
});
});
describe('createStreamEventManager', () => {
// Queue mode stays disabled here (beforeEach default); only Redis presence changes.
it('prefers Redis-backed streams when Redis is available in local mode', () => {
mockGetAgentRuntimeRedisClient.mockReturnValue({ ping: vi.fn() });
expect(createStreamEventManager()).toEqual({ kind: 'redis-stream-event-manager' });
expect(MockStreamEventManager).toHaveBeenCalledTimes(1);
});
it('falls back to in-memory streams when local mode has no Redis', () => {
expect(createStreamEventManager()).toBe(mockInMemoryStreamEventManager);
expect(MockStreamEventManager).not.toHaveBeenCalled();
});
it('throws when queue mode is enabled without Redis', () => {
mockAppEnv.enableQueueAgentRuntime = true;
expect(() => createStreamEventManager()).toThrow(
'Redis is required when AGENT_RUNTIME_MODE=queue. Please configure `REDIS_URL`.',
);
});
});
});

View File

@@ -49,22 +49,22 @@ export const createAgentStateManager = (): IAgentStateManager => {
/**
 * Create StreamEventManager based on configuration
 *
 * - If Redis is available: RedisStreamEventManager
 * - If Redis is unavailable and enableQueueAgentRuntime=false (default): InMemoryStreamEventManager
 * - If Redis is unavailable and enableQueueAgentRuntime=true: throw
 *
 * @returns A new Redis-backed StreamEventManager, or the shared in-memory instance.
 * @throws Error when queue mode is enabled but Redis is not available.
 */
export const createStreamEventManager = (): IStreamEventManager => {
  // Prefer Redis whenever it is available so the runtime worker and SSE route
  // can communicate through the same stream bus even in local mode.
  if (isRedisAvailable()) {
    log('Redis available, using StreamEventManager');
    return new StreamEventManager();
  }

  if (!isQueueModeEnabled()) {
    log('Redis unavailable and queue mode disabled, using InMemoryStreamEventManager');
    return inMemoryStreamEventManager;
  }

  // Queue mode enabled, Redis is required
  throw new Error('Redis is required when AGENT_RUNTIME_MODE=queue. Please configure `REDIS_URL`.');
};

View File

@@ -419,6 +419,7 @@ describe('AgentRuntimeService', () => {
stepIndex: 1,
phase: 'step_execution',
error: 'Runtime error',
errorType: '500',
},
});
});

View File

@@ -973,6 +973,7 @@ export class AgentRuntimeService {
};
} catch (error) {
log('Step %d failed for operation %s: %O', stepIndex, operationId, error);
const formattedError = formatErrorForState(error);
// Build error state — try loading current state from coordinator, but if that
// also fails (e.g. Redis ECONNRESET), fall back to a minimal error state so
@@ -981,7 +982,8 @@ export class AgentRuntimeService {
try {
await this.streamManager.publishStreamEvent(operationId, {
data: {
error: (error as Error).message,
error: formattedError.message,
errorType: String(formattedError.type),
phase: 'step_execution',
stepIndex,
},
@@ -1000,15 +1002,17 @@ export class AgentRuntimeService {
const errorState = await this.coordinator.loadAgentState(operationId);
finalStateWithError = {
...errorState!,
error: formatErrorForState(error),
error: formattedError,
status: 'error' as const,
stepCount: errorState?.stepCount ?? stepIndex,
};
} catch (loadError) {
log('[%s] Failed to load error state (infra may be down): %O', operationId, loadError);
// Fallback: construct a minimal error state so callbacks still receive useful info
finalStateWithError = {
error: formatErrorForState(error),
error: formattedError,
status: 'error' as const,
stepCount: stepIndex,
};
}

View File

@@ -413,6 +413,100 @@ describe('AgentRuntimeService.executeStep - Redis failure in error handler', ()
);
});
// The state reload in the error path rejects, so the saved error state is
// expected to carry the stepIndex passed to executeStep (6) as stepCount.
it('should include stepCount in fallback error state when state reload fails', async () => {
const service = createService();
const coordinator = (service as any).coordinator;
const streamManager = (service as any).streamManager;
coordinator.tryClaimStep = vi.fn().mockResolvedValue(true);
// First load succeeds with a running state; every later load rejects.
let loadCallCount = 0;
coordinator.loadAgentState = vi.fn().mockImplementation(() => {
loadCallCount++;
if (loadCallCount === 1) {
return Promise.resolve({
status: 'running',
stepCount: 5,
lastModified: new Date().toISOString(),
metadata: {},
});
}
return Promise.reject(new Error('Redis ECONNRESET'));
});
// First publish succeeds; every later publish rejects.
let publishCallCount = 0;
streamManager.publishStreamEvent = vi.fn().mockImplementation(() => {
publishCallCount++;
if (publishCallCount === 1) return Promise.resolve();
return Promise.reject(new Error('Redis ECONNRESET'));
});
coordinator.saveAgentState = vi.fn().mockResolvedValue(undefined);
await expect(
service.executeStep({
operationId: 'op-fallback-step-count',
stepIndex: 6,
context: { phase: 'user_input' } as any,
}),
).rejects.toThrow();
expect(coordinator.saveAgentState).toHaveBeenCalledWith(
'op-fallback-step-count',
expect.objectContaining({
status: 'error',
stepCount: 6,
}),
);
});
// The state reload in the error path resolves to null (no rejection), and the
// saved error state is still expected to carry the stepIndex (7) as stepCount.
it('should preserve stepCount when loadAgentState returns null in error handler', async () => {
const service = createService();
const coordinator = (service as any).coordinator;
const streamManager = (service as any).streamManager;
coordinator.tryClaimStep = vi.fn().mockResolvedValue(true);
// First load succeeds with a running state; every later load resolves to null.
let loadCallCount = 0;
coordinator.loadAgentState = vi.fn().mockImplementation(() => {
loadCallCount++;
if (loadCallCount === 1) {
return Promise.resolve({
status: 'running',
stepCount: 5,
lastModified: new Date().toISOString(),
metadata: {},
});
}
return Promise.resolve(null);
});
// First publish succeeds; every later publish rejects.
let publishCallCount = 0;
streamManager.publishStreamEvent = vi.fn().mockImplementation(() => {
publishCallCount++;
if (publishCallCount === 1) return Promise.resolve();
return Promise.reject(new Error('Redis ECONNRESET'));
});
coordinator.saveAgentState = vi.fn().mockResolvedValue(undefined);
await expect(
service.executeStep({
operationId: 'op-null-step-count',
stepIndex: 7,
context: { phase: 'user_input' } as any,
}),
).rejects.toThrow();
expect(coordinator.saveAgentState).toHaveBeenCalledWith(
'op-null-step-count',
expect.objectContaining({
status: 'error',
stepCount: 7,
}),
);
});
it('should preserve loaded state metadata when only saveAgentState fails', async () => {
const service = createService();
const coordinator = (service as any).coordinator;