🛠 chore: add subscribeStreamEvents to InMemoryStreamEventManager (#12964)

*  feat: add subscribeStreamEvents to InMemoryStreamEventManager and use factory for stream route

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* 🐛 fix: remove duplicate agentExecution types and fix stream route test mock

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Arvin Xu
2026-03-14 13:07:46 +08:00
committed by GitHub
parent 6052b67953
commit a96cac59d7
6 changed files with 264 additions and 4 deletions

View File

@@ -145,6 +145,60 @@ describe('createEnableChecker', () => {
});
});
describe('default behavior - should disable unknown tools', () => {
it('should disable tools not listed in rules by default', () => {
const checker = createEnableChecker({
rules: {
'knowledge-base': true,
'memory': false,
'web-browsing': true,
},
});
// Tools in rules should follow their rule
expect(checker(makeParams('knowledge-base'))).toBe(true);
expect(checker(makeParams('memory'))).toBe(false);
expect(checker(makeParams('web-browsing'))).toBe(true);
// BUG: Tools NOT in rules currently default to true,
// but should default to false to prevent unintended tool activation
// This is the regression test for the "all 7 builtin tools enabled" bug
expect(checker(makeParams('lobe-tools'))).toBe(false);
expect(checker(makeParams('lobe-skills'))).toBe(false);
expect(checker(makeParams('lobe-skill-store'))).toBe(false);
});
});
describe('user-selected tools via rules', () => {
it('should only enable user-selected tools plus explicitly enabled defaults', () => {
// Simulates: user selected only "notebook", system enables knowledge-base and web-browsing
const userPlugins = ['notebook'];
const rules: Record<string, boolean> = {
// System-level rules
'knowledge-base': true,
'memory': false,
'web-browsing': true,
// User-selected plugins
...Object.fromEntries(userPlugins.map((id) => [id, true])),
};
const checker = createEnableChecker({ rules });
// User-selected tool: enabled
expect(checker(makeParams('notebook'))).toBe(true);
// System-enabled tools: follow their rules
expect(checker(makeParams('knowledge-base'))).toBe(true);
expect(checker(makeParams('web-browsing'))).toBe(true);
expect(checker(makeParams('memory'))).toBe(false);
// Default tools NOT in rules: should be disabled
expect(checker(makeParams('lobe-tools'))).toBe(false);
expect(checker(makeParams('lobe-skills'))).toBe(false);
expect(checker(makeParams('lobe-skill-store'))).toBe(false);
});
});
describe('priority order', () => {
it('should apply: explicitActivation > platformFilter > rules > default', () => {
const checker = createEnableChecker({

View File

@@ -11,7 +11,7 @@ const mockStreamEventManager = {
};
vi.mock('@/server/modules/AgentRuntime', () => ({
StreamEventManager: vi.fn(() => mockStreamEventManager),
createStreamEventManager: vi.fn(() => mockStreamEventManager),
}));
describe('/api/agent/stream route', () => {

View File

@@ -3,7 +3,7 @@ import debug from 'debug';
import { type NextRequest } from 'next/server';
import { NextResponse } from 'next/server';
import { StreamEventManager } from '@/server/modules/AgentRuntime';
import { createStreamEventManager } from '@/server/modules/AgentRuntime';
const log = debug('api-route:agent:stream');
const timing = debug('lobe-server:agent-runtime:timing');
@@ -13,8 +13,8 @@ const timing = debug('lobe-server:agent-runtime:timing');
* Provides real-time Agent execution event stream for clients
*/
export async function GET(request: NextRequest) {
// Initialize stream event manager
const streamManager = new StreamEventManager();
// Initialize stream event manager (uses InMemory singleton in local dev, Redis in production)
const streamManager = createStreamEventManager();
const { searchParams } = new URL(request.url);
const operationId = searchParams.get('operationId');

View File

@@ -130,6 +130,42 @@ export class InMemoryStreamEventManager implements IStreamEventManager {
log('InMemoryStreamEventManager disconnected');
}
/**
* Subscribe to stream events (for SSE endpoint)
* Compatible with Redis StreamEventManager.subscribeStreamEvents
*/
async subscribeStreamEvents(
operationId: string,
_lastEventId: string,
onEvents: (events: StreamEvent[]) => void,
signal?: AbortSignal,
): Promise<void> {
return new Promise<void>((resolve) => {
const unsubscribe = this.subscribe(operationId, (events) => {
onEvents(events);
// Check if agent_runtime_end was received — caller will handle closing
const hasEnd = events.some((e) => e.type === 'agent_runtime_end');
if (hasEnd) {
unsubscribe();
resolve();
}
});
// Handle abort signal
if (signal) {
const onAbort = () => {
unsubscribe();
resolve();
};
if (signal.aborted) {
onAbort();
return;
}
signal.addEventListener('abort', onAbort, { once: true });
}
});
}
/**
* Subscribe to stream events (for testing)
*/

View File

@@ -0,0 +1,160 @@
import { describe, expect, it, vi } from 'vitest';
import { InMemoryStreamEventManager } from '../InMemoryStreamEventManager';
import type { StreamEvent } from '../StreamEventManager';
describe('InMemoryStreamEventManager', () => {
let manager: InMemoryStreamEventManager;
beforeEach(() => {
manager = new InMemoryStreamEventManager();
});
afterEach(() => {
manager.clear();
});
describe('publishStreamEvent', () => {
it('should publish and store events', async () => {
const eventId = await manager.publishStreamEvent('op-1', {
data: { msg: 'hello' },
stepIndex: 0,
type: 'agent_runtime_init',
});
expect(eventId).toBeDefined();
const events = manager.getAllEvents('op-1');
expect(events).toHaveLength(1);
expect(events[0].type).toBe('agent_runtime_init');
});
it('should notify subscribers on publish', async () => {
const callback = vi.fn();
manager.subscribe('op-1', callback);
await manager.publishStreamEvent('op-1', {
data: { msg: 'hello' },
stepIndex: 0,
type: 'agent_runtime_init',
});
expect(callback).toHaveBeenCalledTimes(1);
expect(callback).toHaveBeenCalledWith(
expect.arrayContaining([expect.objectContaining({ type: 'agent_runtime_init' })]),
);
});
});
describe('subscribe', () => {
it('should return an unsubscribe function', async () => {
const callback = vi.fn();
const unsubscribe = manager.subscribe('op-1', callback);
await manager.publishStreamEvent('op-1', {
data: {},
stepIndex: 0,
type: 'agent_runtime_init',
});
expect(callback).toHaveBeenCalledTimes(1);
unsubscribe();
await manager.publishStreamEvent('op-1', {
data: {},
stepIndex: 1,
type: 'agent_runtime_end',
});
expect(callback).toHaveBeenCalledTimes(1);
});
});
describe('subscribeStreamEvents', () => {
it('should resolve when agent_runtime_end event is received', async () => {
const receivedEvents: StreamEvent[] = [];
const subscribePromise = manager.subscribeStreamEvents('op-1', '0', (events) => {
receivedEvents.push(...events);
});
// Publish some events
await manager.publishStreamEvent('op-1', {
data: { status: 'running' },
stepIndex: 0,
type: 'agent_runtime_init',
});
await manager.publishStreamEvent('op-1', {
data: { status: 'done' },
stepIndex: 1,
type: 'agent_runtime_end',
});
await subscribePromise;
expect(receivedEvents).toHaveLength(2);
expect(receivedEvents[0].type).toBe('agent_runtime_init');
expect(receivedEvents[1].type).toBe('agent_runtime_end');
});
it('should resolve immediately if signal is already aborted', async () => {
const controller = new AbortController();
controller.abort();
const receivedEvents: StreamEvent[] = [];
await manager.subscribeStreamEvents(
'op-1',
'0',
(events) => {
receivedEvents.push(...events);
},
controller.signal,
);
expect(receivedEvents).toHaveLength(0);
});
it('should resolve when signal is aborted', async () => {
const controller = new AbortController();
const receivedEvents: StreamEvent[] = [];
const subscribePromise = manager.subscribeStreamEvents(
'op-1',
'0',
(events) => {
receivedEvents.push(...events);
},
controller.signal,
);
await manager.publishStreamEvent('op-1', {
data: { status: 'running' },
stepIndex: 0,
type: 'agent_runtime_init',
});
controller.abort();
await subscribePromise;
expect(receivedEvents).toHaveLength(1);
expect(receivedEvents[0].type).toBe('agent_runtime_init');
});
});
describe('clear', () => {
it('should clear all stored events and subscribers', async () => {
await manager.publishStreamEvent('op-1', {
data: {},
stepIndex: 0,
type: 'agent_runtime_init',
});
expect(manager.getAllEvents('op-1')).toHaveLength(1);
manager.clear();
expect(manager.getAllEvents('op-1')).toHaveLength(0);
});
});
});

View File

@@ -144,4 +144,14 @@ export interface IStreamEventManager {
operationId: string,
event: Omit<StreamEvent, 'operationId' | 'timestamp'>,
) => Promise<string>;
/**
* Subscribe to stream events (for SSE endpoint)
*/
subscribeStreamEvents: (
operationId: string,
lastEventId: string,
onEvents: (events: StreamEvent[]) => void,
signal?: AbortSignal,
) => Promise<void>;
}