From a96cac59d7d300c32ae55b0f9fa5462ba13504a1 Mon Sep 17 00:00:00 2001 From: Arvin Xu Date: Sat, 14 Mar 2026 13:07:46 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=9B=A0=20chore:=20add=20subscribeStreamEv?= =?UTF-8?q?ents=20to=20InMemoryStreamEventManager=20(#12964)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * ✨ feat: add subscribeStreamEvents to InMemoryStreamEventManager and use factory for stream route Co-Authored-By: Claude Opus 4.6 * 🐛 fix: remove duplicate agentExecution types and fix stream route test mock Co-Authored-By: Claude Opus 4.6 --------- Co-authored-by: Claude Opus 4.6 --- .../__tests__/enableCheckerFactory.test.ts | 54 ++++++ .../api/agent/stream/__tests__/route.test.ts | 2 +- src/app/(backend)/api/agent/stream/route.ts | 6 +- .../InMemoryStreamEventManager.ts | 36 ++++ .../InMemoryStreamEventManager.test.ts | 160 ++++++++++++++++++ src/server/modules/AgentRuntime/types.ts | 10 ++ 6 files changed, 264 insertions(+), 4 deletions(-) create mode 100644 src/server/modules/AgentRuntime/__tests__/InMemoryStreamEventManager.test.ts diff --git a/packages/context-engine/src/engine/tools/__tests__/enableCheckerFactory.test.ts b/packages/context-engine/src/engine/tools/__tests__/enableCheckerFactory.test.ts index 53f7c7a85f..ab532c4d6d 100644 --- a/packages/context-engine/src/engine/tools/__tests__/enableCheckerFactory.test.ts +++ b/packages/context-engine/src/engine/tools/__tests__/enableCheckerFactory.test.ts @@ -145,6 +145,60 @@ describe('createEnableChecker', () => { }); }); + describe('default behavior - should disable unknown tools', () => { + it('should disable tools not listed in rules by default', () => { + const checker = createEnableChecker({ + rules: { + 'knowledge-base': true, + 'memory': false, + 'web-browsing': true, + }, + }); + + // Tools in rules should follow their rule + expect(checker(makeParams('knowledge-base'))).toBe(true); + expect(checker(makeParams('memory'))).toBe(false); + expect(checker(makeParams('web-browsing'))).toBe(true); + + // BUG: Tools NOT in rules currently default to true, + // but should default to false to prevent unintended tool activation + // This is the regression test for the "all 7 builtin tools enabled" bug + expect(checker(makeParams('lobe-tools'))).toBe(false); + expect(checker(makeParams('lobe-skills'))).toBe(false); + expect(checker(makeParams('lobe-skill-store'))).toBe(false); + }); + }); + + describe('user-selected tools via rules', () => { + it('should only enable user-selected tools plus explicitly enabled defaults', () => { + // Simulates: user selected only "notebook", system enables knowledge-base and web-browsing + const userPlugins = ['notebook']; + const rules: Record = { + // System-level rules + 'knowledge-base': true, + 'memory': false, + 'web-browsing': true, + // User-selected plugins + ...Object.fromEntries(userPlugins.map((id) => [id, true])), + }; + + const checker = createEnableChecker({ rules }); + + // User-selected tool: enabled + expect(checker(makeParams('notebook'))).toBe(true); + + // System-enabled tools: follow their rules + expect(checker(makeParams('knowledge-base'))).toBe(true); + expect(checker(makeParams('web-browsing'))).toBe(true); + expect(checker(makeParams('memory'))).toBe(false); + + // Default tools NOT in rules: should be disabled + expect(checker(makeParams('lobe-tools'))).toBe(false); + expect(checker(makeParams('lobe-skills'))).toBe(false); + expect(checker(makeParams('lobe-skill-store'))).toBe(false); + }); + }); + describe('priority order', () => { it('should apply: explicitActivation > platformFilter > rules > default', () => { const checker = createEnableChecker({ diff --git a/src/app/(backend)/api/agent/stream/__tests__/route.test.ts b/src/app/(backend)/api/agent/stream/__tests__/route.test.ts index 13639a3168..dfcae0c188 100644 --- a/src/app/(backend)/api/agent/stream/__tests__/route.test.ts +++ b/src/app/(backend)/api/agent/stream/__tests__/route.test.ts @@ -11,7 +11,7 @@ const mockStreamEventManager = { }; vi.mock('@/server/modules/AgentRuntime', () => ({ - StreamEventManager: vi.fn(() => mockStreamEventManager), + createStreamEventManager: vi.fn(() => mockStreamEventManager), })); describe('/api/agent/stream route', () => { diff --git a/src/app/(backend)/api/agent/stream/route.ts b/src/app/(backend)/api/agent/stream/route.ts index 19669c80b2..403d168716 100644 --- a/src/app/(backend)/api/agent/stream/route.ts +++ b/src/app/(backend)/api/agent/stream/route.ts @@ -3,7 +3,7 @@ import debug from 'debug'; import { type NextRequest } from 'next/server'; import { NextResponse } from 'next/server'; -import { StreamEventManager } from '@/server/modules/AgentRuntime'; +import { createStreamEventManager } from '@/server/modules/AgentRuntime'; const log = debug('api-route:agent:stream'); const timing = debug('lobe-server:agent-runtime:timing'); @@ -13,8 +13,8 @@ const timing = debug('lobe-server:agent-runtime:timing'); * Provides real-time Agent execution event stream for clients */ export async function GET(request: NextRequest) { - // Initialize stream event manager - const streamManager = new StreamEventManager(); + // Initialize stream event manager (uses InMemory singleton in local dev, Redis in production) + const streamManager = createStreamEventManager(); const { searchParams } = new URL(request.url); const operationId = searchParams.get('operationId'); diff --git a/src/server/modules/AgentRuntime/InMemoryStreamEventManager.ts b/src/server/modules/AgentRuntime/InMemoryStreamEventManager.ts index ee83b2674d..af84847782 100644 --- a/src/server/modules/AgentRuntime/InMemoryStreamEventManager.ts +++ b/src/server/modules/AgentRuntime/InMemoryStreamEventManager.ts @@ -130,6 +130,42 @@ export class InMemoryStreamEventManager implements IStreamEventManager { log('InMemoryStreamEventManager disconnected'); } + /** + * Subscribe to stream events (for SSE endpoint) + * Compatible with Redis StreamEventManager.subscribeStreamEvents + */ + async subscribeStreamEvents( + operationId: string, + _lastEventId: string, + onEvents: (events: StreamEvent[]) => void, + signal?: AbortSignal, + ): Promise { + return new Promise((resolve) => { + const unsubscribe = this.subscribe(operationId, (events) => { + onEvents(events); + // Check if agent_runtime_end was received — caller will handle closing + const hasEnd = events.some((e) => e.type === 'agent_runtime_end'); + if (hasEnd) { + unsubscribe(); + resolve(); + } + }); + + // Handle abort signal + if (signal) { + const onAbort = () => { + unsubscribe(); + resolve(); + }; + if (signal.aborted) { + onAbort(); + return; + } + signal.addEventListener('abort', onAbort, { once: true }); + } + }); + } + /** * Subscribe to stream events (for testing) */ diff --git a/src/server/modules/AgentRuntime/__tests__/InMemoryStreamEventManager.test.ts b/src/server/modules/AgentRuntime/__tests__/InMemoryStreamEventManager.test.ts new file mode 100644 index 0000000000..ef7a8c316c --- /dev/null +++ b/src/server/modules/AgentRuntime/__tests__/InMemoryStreamEventManager.test.ts @@ -0,0 +1,160 @@ +import { describe, expect, it, vi } from 'vitest'; + +import { InMemoryStreamEventManager } from '../InMemoryStreamEventManager'; +import type { StreamEvent } from '../StreamEventManager'; + +describe('InMemoryStreamEventManager', () => { + let manager: InMemoryStreamEventManager; + + beforeEach(() => { + manager = new InMemoryStreamEventManager(); + }); + + afterEach(() => { + manager.clear(); + }); + + describe('publishStreamEvent', () => { + it('should publish and store events', async () => { + const eventId = await manager.publishStreamEvent('op-1', { + data: { msg: 'hello' }, + stepIndex: 0, + type: 'agent_runtime_init', + }); + + expect(eventId).toBeDefined(); + const events = manager.getAllEvents('op-1'); + expect(events).toHaveLength(1); + expect(events[0].type).toBe('agent_runtime_init'); + }); + + it('should notify subscribers on publish', async () => { + const callback = vi.fn(); + manager.subscribe('op-1', callback); + + await manager.publishStreamEvent('op-1', { + data: { msg: 'hello' }, + stepIndex: 0, + type: 'agent_runtime_init', + }); + + expect(callback).toHaveBeenCalledTimes(1); + expect(callback).toHaveBeenCalledWith( + expect.arrayContaining([expect.objectContaining({ type: 'agent_runtime_init' })]), + ); + }); + }); + + describe('subscribe', () => { + it('should return an unsubscribe function', async () => { + const callback = vi.fn(); + const unsubscribe = manager.subscribe('op-1', callback); + + await manager.publishStreamEvent('op-1', { + data: {}, + stepIndex: 0, + type: 'agent_runtime_init', + }); + expect(callback).toHaveBeenCalledTimes(1); + + unsubscribe(); + + await manager.publishStreamEvent('op-1', { + data: {}, + stepIndex: 1, + type: 'agent_runtime_end', + }); + expect(callback).toHaveBeenCalledTimes(1); + }); + }); + + describe('subscribeStreamEvents', () => { + it('should resolve when agent_runtime_end event is received', async () => { + const receivedEvents: StreamEvent[] = []; + + const subscribePromise = manager.subscribeStreamEvents('op-1', '0', (events) => { + receivedEvents.push(...events); + }); + + // Publish some events + await manager.publishStreamEvent('op-1', { + data: { status: 'running' }, + stepIndex: 0, + type: 'agent_runtime_init', + }); + + await manager.publishStreamEvent('op-1', { + data: { status: 'done' }, + stepIndex: 1, + type: 'agent_runtime_end', + }); + + await subscribePromise; + + expect(receivedEvents).toHaveLength(2); + expect(receivedEvents[0].type).toBe('agent_runtime_init'); + expect(receivedEvents[1].type).toBe('agent_runtime_end'); + }); + + it('should resolve immediately if signal is already aborted', async () => { + const controller = new AbortController(); + controller.abort(); + + const receivedEvents: StreamEvent[] = []; + + await manager.subscribeStreamEvents( + 'op-1', + '0', + (events) => { + receivedEvents.push(...events); + }, + controller.signal, + ); + + expect(receivedEvents).toHaveLength(0); + }); + + it('should resolve when signal is aborted', async () => { + const controller = new AbortController(); + const receivedEvents: StreamEvent[] = []; + + const subscribePromise = manager.subscribeStreamEvents( + 'op-1', + '0', + (events) => { + receivedEvents.push(...events); + }, + controller.signal, + ); + + await manager.publishStreamEvent('op-1', { + data: { status: 'running' }, + stepIndex: 0, + type: 'agent_runtime_init', + }); + + controller.abort(); + + await subscribePromise; + + expect(receivedEvents).toHaveLength(1); + expect(receivedEvents[0].type).toBe('agent_runtime_init'); + }); + }); + + describe('clear', () => { + it('should clear all stored events and subscribers', async () => { + await manager.publishStreamEvent('op-1', { + data: {}, + stepIndex: 0, + type: 'agent_runtime_init', + }); + + expect(manager.getAllEvents('op-1')).toHaveLength(1); + + manager.clear(); + + expect(manager.getAllEvents('op-1')).toHaveLength(0); + }); + }); +}); diff --git a/src/server/modules/AgentRuntime/types.ts b/src/server/modules/AgentRuntime/types.ts index b21b6715b0..ecc0706fe5 100644 --- a/src/server/modules/AgentRuntime/types.ts +++ b/src/server/modules/AgentRuntime/types.ts @@ -144,4 +144,14 @@ export interface IStreamEventManager { operationId: string, event: Omit, ) => Promise; + + /** + * Subscribe to stream events (for SSE endpoint) + */ + subscribeStreamEvents: ( + operationId: string, + lastEventId: string, + onEvents: (events: StreamEvent[]) => void, + signal?: AbortSignal, + ) => Promise; }