From d3ea4a48940f102b34b9bf08ef9aac1a9eb17a5e Mon Sep 17 00:00:00 2001 From: Arvin Xu Date: Fri, 20 Mar 2026 12:05:25 +0800 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20refactor:=20refactor=20age?= =?UTF-8?q?nt-runtime=20hooks=20mode=20(#13145)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * ✨ feat: add Agent Runtime Hooks — external lifecycle hook system Hooks are registered once and automatically adapt to runtime mode: - Local: handler functions called directly (in-process) - Production: webhook configs persisted to Redis, delivered via HTTP/QStash - HookDispatcher: register, dispatch, serialize hooks per operationId - AgentHook type: id, type (beforeStep/afterStep/onComplete/onError), handler function, optional webhook config - Integrated into AgentRuntimeService.createOperation + executeStep - Hooks persisted in AgentState.metadata._hooks for cross-request survival - Dispatched at both normal completion and error paths - Non-fatal: hook errors never affect main execution flow LOBE-6208 Co-Authored-By: Claude Opus 4.6 (1M context) * ✅ test: add HookDispatcher unit tests (19 tests) Tests cover: - register/unregister/hasHooks - Local mode dispatch: matching types, multiple handlers, error isolation - Production mode dispatch: webhook delivery, body merging, mode isolation - Serialization: getSerializedHooks filters webhook-only hooks - All hook types: beforeStep, afterStep, onComplete, onError Co-Authored-By: Claude Opus 4.6 (1M context) * ✨ feat: migrate SubAgent to hooks + add afterStep dispatch + finalState - AgentHookEvent: added finalState field (local-mode only, stripped from webhooks) - AgentRuntimeService: dispatch afterStep hooks alongside legacy callbacks - AiAgentService: createThreadHooks() replaces createThreadMetadataCallbacks() for SubAgent Thread execution — same behavior, using hooks API - HookDispatcher: strip finalState from webhook payloads (too large) LOBE-6208 Co-Authored-By: Claude Opus 4.6 (1M context) * 🐛 fix: add Vercel bypass header to QStash hook webhooks Preserves x-vercel-protection-bypass header when delivering hook webhooks via QStash, matching existing behavior in AgentRuntimeService.deliverWebhook and libs/qstash. Co-Authored-By: Claude Opus 4.6 (1M context) * ✨ feat: migrate Eval Run to hooks + add finalState to AgentHookEvent Eval Run now uses hooks API instead of raw completionWebhook: - executeTrajectory: hook with local handler + webhook fallback - executeThreadTrajectory: hook with local handler + webhook fallback - Local mode now works for eval runs (previously production-only) Also: - AgentHookEvent: added finalState field (local-only, stripped from webhooks) for consumers that need deep state access LOBE-6208 Co-Authored-By: Claude Opus 4.6 (1M context) * 🐛 fix: dispatch beforeStep hooks + fix completion event payload fields P1: Add hookDispatcher.dispatch('beforeStep') alongside legacy onBeforeStep callback. All 4 hook types now dispatch correctly: beforeStep, afterStep, onComplete, onError. P2: Fix completion event payload to use actual AgentState fields (state.cost.total, state.usage.llm.*, state.messages) instead of non-existent state.session.* properties. Matches the field access pattern in triggerCompletionWebhook. Co-Authored-By: Claude Opus 4.6 (1M context) * 🐛 fix: update eval test assertions for hooks migration + fix status type - Test: update executeTrajectory assertion to expect hooks array instead of completionWebhook object - Fix: add fallback for event.status (string | undefined) when passing to recordTrajectoryCompletion/recordThreadCompletion (status: string) Co-Authored-By: Claude Opus 4.6 (1M context) * 🐛 fix: update SubAgent test assertions for hooks migration Update execGroupSubAgentTask tests to expect hooks array instead of stepCallbacks object, matching the SubAgent → hooks migration. Co-Authored-By: Claude Opus 4.6 (1M context) --------- Co-authored-by: Claude Opus 4.6 (1M context) --- .../__tests__/trajectoryMethods.test.ts | 14 +- src/server/services/agentEvalRun/index.ts | 84 ++++- .../agentRuntime/AgentRuntimeService.ts | 137 +++++++- .../agentRuntime/hooks/HookDispatcher.ts | 206 ++++++++++++ .../hooks/__tests__/HookDispatcher.test.ts | 301 ++++++++++++++++++ .../services/agentRuntime/hooks/index.ts | 8 + .../services/agentRuntime/hooks/types.ts | 106 ++++++ src/server/services/agentRuntime/types.ts | 7 + .../__tests__/execGroupSubAgentTask.test.ts | 23 +- src/server/services/aiAgent/index.ts | 139 +++++++- 10 files changed, 985 insertions(+), 40 deletions(-) create mode 100644 src/server/services/agentRuntime/hooks/HookDispatcher.ts create mode 100644 src/server/services/agentRuntime/hooks/__tests__/HookDispatcher.test.ts create mode 100644 src/server/services/agentRuntime/hooks/index.ts create mode 100644 src/server/services/agentRuntime/hooks/types.ts diff --git a/src/server/services/agentEvalRun/__tests__/trajectoryMethods.test.ts b/src/server/services/agentEvalRun/__tests__/trajectoryMethods.test.ts index 36e4d9af90..aa7ba085a3 100644 --- a/src/server/services/agentEvalRun/__tests__/trajectoryMethods.test.ts +++ b/src/server/services/agentEvalRun/__tests__/trajectoryMethods.test.ts @@ -191,10 +191,16 @@ describe('AgentEvalRunService', () => { expect(mockExecAgent).toHaveBeenCalledWith( expect.objectContaining({ autoStart: true, - completionWebhook: { - body: { runId: run.id, testCaseId: testCase.id, userId }, - url: 'https://test.example.com/api/workflows/agent-eval-run/on-trajectory-complete', - }, + hooks: expect.arrayContaining([ + expect.objectContaining({ + id: 'eval-trajectory-complete', + type: 'onComplete', + webhook: { + body: { runId: run.id, testCaseId: testCase.id, userId }, + url: '/api/workflows/agent-eval-run/on-trajectory-complete', + }, + }), + ]), prompt: 'Hello world', userInterventionConfig: { approvalMode: 'headless' }, }), diff --git a/src/server/services/agentEvalRun/index.ts b/src/server/services/agentEvalRun/index.ts index ef5942381a..9bcc9fc09d 100644 --- a/src/server/services/agentEvalRun/index.ts +++ b/src/server/services/agentEvalRun/index.ts @@ -21,7 +21,6 @@ import { import { MessageModel } from '@/database/models/message'; import { ThreadModel } from '@/database/models/thread'; import { TopicModel } from '@/database/models/topic'; -import { appEnv } from '@/envs/app'; import { AgentService } from '@/server/services/agent'; import { AgentRuntimeService } from '@/server/services/agentRuntime/AgentRuntimeService'; import { AiAgentService } from '@/server/services/aiAgent'; @@ -264,20 +263,45 @@ export class AgentEvalRunService { await this.runTopicModel.updateByRunAndTopic(runId, topicId, { status: 'running' }); const aiAgentService = new AiAgentService(this.db, this.userId); - const webhookUrl = new URL( - '/api/workflows/agent-eval-run/on-trajectory-complete', - appEnv.APP_URL, - ).toString(); + const webhookUrl = '/api/workflows/agent-eval-run/on-trajectory-complete'; + const userId = this.userId; + const db = this.db; try { const execResult = await aiAgentService.execAgent({ agentId: run.targetAgentId ?? undefined, appContext: { topicId }, autoStart: true, - completionWebhook: { - body: { runId, testCaseId, userId: this.userId }, - url: webhookUrl, - }, + hooks: [ + { + handler: async (event) => { + // Local mode: directly record completion + const service = new AgentEvalRunService(db, userId); + await service.recordTrajectoryCompletion({ + runId, + status: event.status || event.reason || 'done', + telemetry: { + completionReason: event.reason, + cost: event.cost, + duration: event.duration, + errorDetail: event.errorDetail, + errorMessage: event.errorMessage, + llmCalls: event.llmCalls, + steps: event.steps, + toolCalls: event.toolCalls, + totalTokens: event.totalTokens, + }, + testCaseId, + }); + }, + id: 'eval-trajectory-complete', + type: 'onComplete' as const, + webhook: { + body: { runId, testCaseId, userId }, + url: webhookUrl, + }, + }, + ], ...(envPrompt && { evalContext: { envPrompt } }), maxSteps: run.config?.maxSteps, prompt: params.testCase.content.input || '', @@ -381,20 +405,46 @@ export class AgentEvalRunService { const { envPrompt, run, runId, testCaseId, threadId, topicId } = params; const aiAgentService = new AiAgentService(this.db, this.userId); - const webhookUrl = new URL( - '/api/workflows/agent-eval-run/on-thread-complete', - appEnv.APP_URL, - ).toString(); + const webhookUrl = '/api/workflows/agent-eval-run/on-thread-complete'; + const userId = this.userId; + const db = this.db; try { const execResult = await aiAgentService.execAgent({ agentId: run.targetAgentId ?? undefined, appContext: { threadId, topicId }, autoStart: true, - completionWebhook: { - body: { runId, testCaseId, threadId, topicId, userId: this.userId }, - url: webhookUrl, - }, + hooks: [ + { + handler: async (event) => { + // Local mode: directly record thread completion + const service = new AgentEvalRunService(db, userId); + await service.recordThreadCompletion({ + runId, + status: event.status || event.reason || 'done', + telemetry: { + completionReason: event.reason, + cost: event.cost, + duration: event.duration, + errorMessage: event.errorMessage, + llmCalls: event.llmCalls, + steps: event.steps, + toolCalls: event.toolCalls, + totalTokens: event.totalTokens, + }, + testCaseId, + threadId, + topicId, + }); + }, + id: 'eval-thread-complete', + type: 'onComplete' as const, + webhook: { + body: { runId, testCaseId, threadId, topicId, userId }, + url: webhookUrl, + }, + }, + ], ...(envPrompt && { evalContext: { envPrompt } }), maxSteps: run.config?.maxSteps, prompt: params.testCase.content.input || '', diff --git a/src/server/services/agentRuntime/AgentRuntimeService.ts b/src/server/services/agentRuntime/AgentRuntimeService.ts index 6099b2e7a3..525aa61c38 100644 --- a/src/server/services/agentRuntime/AgentRuntimeService.ts +++ b/src/server/services/agentRuntime/AgentRuntimeService.ts @@ -21,6 +21,7 @@ import { LocalQueueServiceImpl } from '@/server/services/queue/impls'; import { ToolExecutionService } from '@/server/services/toolExecution'; import { BuiltinToolsExecutor } from '@/server/services/toolExecution/builtin'; +import { hookDispatcher } from './hooks'; import { type AgentExecutionParams, type AgentExecutionResult, @@ -267,6 +268,7 @@ export class AgentRuntimeService { appContext, toolSet, stepCallbacks, + hooks, userInterventionConfig, completionWebhook, stepWebhook, @@ -355,6 +357,26 @@ export class AgentRuntimeService { this.registerStepCallbacks(operationId, stepCallbacks); } + // Register external hooks + if (hooks && hooks.length > 0) { + hookDispatcher.register(operationId, hooks); + + // Persist webhook configs to state metadata for production mode + const serializedHooks = hookDispatcher.getSerializedHooks(operationId); + if (serializedHooks && serializedHooks.length > 0) { + const currentState = await this.coordinator.loadAgentState(operationId); + if (currentState) { + await this.coordinator.saveAgentState(operationId, { + ...currentState, + metadata: { + ...currentState.metadata, + _hooks: serializedHooks, + }, + }); + } + } + } + let messageId: string | undefined; let autoStarted = false; @@ -481,7 +503,7 @@ export class AgentRuntimeService { }; } - // Call onBeforeStep callback + // Call onBeforeStep callback (legacy) if (callbacks?.onBeforeStep) { try { await callbacks.onBeforeStep({ @@ -495,6 +517,26 @@ export class AgentRuntimeService { } } + // Dispatch beforeStep hooks + try { + const beforeStepMetadata = agentState?.metadata || {}; + await hookDispatcher.dispatch( + operationId, + 'beforeStep', + { + agentId: beforeStepMetadata?.agentId || '', + finalState: agentState, + operationId, + stepIndex, + steps: agentState?.stepCount || 0, + userId: beforeStepMetadata?.userId || this.userId, + }, + beforeStepMetadata._hooks, + ); + } catch (hookError) { + log('[%s] beforeStep hook dispatch error: %O', operationId, hookError); + } + // Create Agent and Runtime instances // Use agentState.metadata which contains the full app context (topicId, agentId, etc.) // operationMetadata only contains basic fields (agentConfig, modelRuntimeConfig, userId) @@ -735,7 +777,7 @@ export class AgentRuntimeService { totalTokens: totalTokensNum, }; - // Call onAfterStep callback with presentation data + // Call onAfterStep callback with presentation data (legacy) if (callbacks?.onAfterStep) { try { await callbacks.onAfterStep({ @@ -751,6 +793,28 @@ export class AgentRuntimeService { } } + // Dispatch afterStep hooks + try { + const metadata = stepResult.newState?.metadata || {}; + await hookDispatcher.dispatch( + operationId, + 'afterStep', + { + agentId: metadata?.agentId || '', + finalState: stepResult.newState, + operationId, + shouldContinue, + status: stepResult.newState?.status, + stepIndex, + steps: stepResult.newState?.stepCount || 0, + userId: metadata?.userId || this.userId, + }, + metadata._hooks, + ); + } catch (hookError) { + log('[%s] afterStep hook dispatch error: %O', operationId, hookError); + } + // Record step snapshot via injected snapshot store if (this.snapshotStore) { try { @@ -909,7 +973,10 @@ export class AgentRuntimeService { // Trigger completion webhook (fire-and-forget) await this.triggerCompletionWebhook(stepResult.newState, operationId, reason); - // Call onComplete callback + // Dispatch onComplete hooks + await this.dispatchCompletionHooks(operationId, stepResult.newState, reason); + + // Call onComplete callback (legacy) if (callbacks?.onComplete) { try { await callbacks.onComplete({ @@ -1029,7 +1096,10 @@ export class AgentRuntimeService { log('[%s] Failed to trigger completion webhook: %O', operationId, webhookError); } - // Also call onComplete callback when execution fails + // Dispatch onComplete + onError hooks + await this.dispatchCompletionHooks(operationId, finalStateWithError, 'error'); + + // Also call onComplete callback when execution fails (legacy) if (callbacks?.onComplete) { try { await callbacks.onComplete({ @@ -1543,6 +1613,65 @@ export class AgentRuntimeService { } } + /** + * Dispatch onComplete (and onError) hooks via HookDispatcher. + * Fire-and-forget: errors are logged but never thrown. + */ + private async dispatchCompletionHooks( + operationId: string, + state: any, + reason: string, + ): Promise { + try { + const metadata = state?.metadata || {}; + + // Extract last assistant content (same as triggerCompletionWebhook) + const lastAssistantContent = state?.messages + ?.slice() + .reverse() + .find( + (m: { content?: string; role: string }) => m.role === 'assistant' && m.content, + )?.content; + + const duration = state?.createdAt + ? Date.now() - new Date(state.createdAt).getTime() + : undefined; + + const event = { + agentId: metadata?.agentId || '', + cost: state?.cost?.total, + duration, + errorDetail: state?.error, + errorMessage: this.extractErrorMessage?.(state?.error) || String(state?.error || ''), + // Full state available in local mode only (not serialized to webhooks) + finalState: state, + lastAssistantContent, + llmCalls: state?.usage?.llm?.apiCalls, + operationId, + reason, + status: state?.status || reason, + steps: state?.stepCount || 0, + toolCalls: state?.usage?.tools?.totalCalls, + topicId: metadata?.topicId, + totalTokens: state?.usage?.llm?.tokens?.total, + userId: metadata?.userId || this.userId, + }; + + // Dispatch onComplete hooks + await hookDispatcher.dispatch(operationId, 'onComplete', event, metadata._hooks); + + // Also dispatch onError hooks if reason is error + if (reason === 'error') { + await hookDispatcher.dispatch(operationId, 'onError', event, metadata._hooks); + } + + // Cleanup hooks after completion + hookDispatcher.unregister(operationId); + } catch (error) { + log('[%s] Hook dispatch error (non-fatal): %O', operationId, error); + } + } + /** * Trigger completion webhook if configured in state metadata. * Fire-and-forget: errors are logged but never thrown. diff --git a/src/server/services/agentRuntime/hooks/HookDispatcher.ts b/src/server/services/agentRuntime/hooks/HookDispatcher.ts new file mode 100644 index 0000000000..ce78721dcb --- /dev/null +++ b/src/server/services/agentRuntime/hooks/HookDispatcher.ts @@ -0,0 +1,206 @@ +import debug from 'debug'; +import urlJoin from 'url-join'; + +import { isQueueAgentRuntimeEnabled } from '@/server/services/queue/impls'; + +import type { + AgentHook, + AgentHookEvent, + AgentHookType, + AgentHookWebhook, + SerializedHook, +} from './types'; + +const log = debug('lobe-server:hook-dispatcher'); + +/** + * Delivers a webhook via HTTP POST (fetch or QStash) + */ +async function deliverWebhook( + webhook: AgentHookWebhook, + payload: Record, +): Promise { + const { url, delivery = 'fetch' } = webhook; + + // Resolve URL: relative paths joined with INTERNAL_APP_URL or APP_URL + const resolvedUrl = url.startsWith('http') + ? url + : urlJoin(process.env.INTERNAL_APP_URL || process.env.APP_URL || '', url); + + if (delivery === 'qstash') { + try { + const { Client } = await import('@upstash/qstash'); + const qstashToken = process.env.QSTASH_TOKEN; + if (!qstashToken) { + log('QStash token not available, falling back to fetch delivery'); + await fetchDeliver(resolvedUrl, payload); + return; + } + const client = new Client({ token: qstashToken }); + await client.publishJSON({ + body: payload, + headers: { + ...(process.env.VERCEL_AUTOMATION_BYPASS_SECRET && { + 'x-vercel-protection-bypass': process.env.VERCEL_AUTOMATION_BYPASS_SECRET, + }), + }, + url: resolvedUrl, + }); + log('Webhook delivered via QStash: %s', url); + } catch (error) { + log('QStash delivery failed, falling back to fetch: %O', error); + await fetchDeliver(resolvedUrl, payload); + } + } else { + await fetchDeliver(resolvedUrl, payload); + } +} + +async function fetchDeliver(url: string, payload: Record): Promise { + try { + const res = await fetch(url, { + body: JSON.stringify(payload), + headers: { 'Content-Type': 'application/json' }, + method: 'POST', + }); + log('Webhook delivered via fetch: %s (status: %d)', url, res.status); + } catch (error) { + log('Webhook fetch delivery failed: %s %O', url, error); + // Hook errors should not affect main flow + } +} + +/** + * HookDispatcher — central hub for registering and dispatching agent lifecycle hooks + * + * Local mode: hooks are stored in memory, handler functions called directly + * Production mode: webhook configs persisted in AgentState.metadata._hooks, + * delivered via HTTP POST or QStash + */ +export class HookDispatcher { + /** + * In-memory hook store (local mode) + * Maps operationId → AgentHook[] + */ + private hooks: Map = new Map(); + + /** + * Dispatch hooks for a given event type + * + * In local mode: calls handler functions from memory + * In production mode: delivers webhooks from serialized config + */ + async dispatch( + operationId: string, + type: AgentHookType, + event: AgentHookEvent, + serializedHooks?: SerializedHook[], + ): Promise { + const isQueueMode = isQueueAgentRuntimeEnabled(); + + if (!isQueueMode) { + // Local mode: call handler functions directly + const hooks = this.hooks.get(operationId)?.filter((h) => h.type === type) || []; + + for (const hook of hooks) { + try { + log('[%s][%s] Dispatching local hook: %s', operationId, type, hook.id); + await hook.handler(event); + } catch (error) { + log('[%s][%s] Hook error (non-fatal): %s %O', operationId, type, hook.id, error); + // Hook errors should NOT affect main execution flow + } + } + } else { + // Production mode: deliver via webhooks + const webhookHooks = + serializedHooks?.filter((h) => h.type === type && h.webhook) || + this.getSerializedHooks(operationId)?.filter((h) => h.type === type) || + []; + + for (const hook of webhookHooks) { + try { + log( + '[%s][%s] Delivering webhook hook: %s → %s', + operationId, + type, + hook.id, + hook.webhook.url, + ); + // Strip finalState from webhook payload (too large, local-only) + const { finalState: _, ...webhookEvent } = event; + await deliverWebhook(hook.webhook, { + ...webhookEvent, + hookId: hook.id, + hookType: type, + ...hook.webhook.body, + }); + } catch (error) { + log( + '[%s][%s] Webhook delivery error (non-fatal): %s %O', + operationId, + type, + hook.id, + error, + ); + } + } + } + } + + /** + * Get serialized hooks for an operation (for production mode persistence) + */ + getSerializedHooks(operationId: string): SerializedHook[] | undefined { + const hooks = this.hooks.get(operationId); + if (!hooks) return undefined; + + return hooks + .filter((h) => h.webhook) + .map((h) => ({ + id: h.id, + type: h.type, + webhook: h.webhook!, + })); + } + + /** + * Check if any hooks are registered for an operation + */ + hasHooks(operationId: string): boolean { + return (this.hooks.get(operationId)?.length ?? 0) > 0; + } + + /** + * Register hooks for an operation + * + * In local mode: stores hooks in memory (including handler functions) + * In production mode: caller should persist getSerializedHooks() to state.metadata._hooks + */ + register(operationId: string, hooks: AgentHook[]): void { + if (hooks.length === 0) return; + + const existing = this.hooks.get(operationId) || []; + this.hooks.set(operationId, [...existing, ...hooks]); + + log( + '[%s] Registered %d hooks: %s', + operationId, + hooks.length, + hooks.map((h) => `${h.type}:${h.id}`).join(', '), + ); + } + + /** + * Unregister all hooks for an operation (cleanup) + */ + unregister(operationId: string): void { + this.hooks.delete(operationId); + log('[%s] Unregistered all hooks', operationId); + } +} + +/** + * Singleton instance — shared across the application + */ +export const hookDispatcher = new HookDispatcher(); diff --git a/src/server/services/agentRuntime/hooks/__tests__/HookDispatcher.test.ts b/src/server/services/agentRuntime/hooks/__tests__/HookDispatcher.test.ts new file mode 100644 index 0000000000..c853e55add --- /dev/null +++ b/src/server/services/agentRuntime/hooks/__tests__/HookDispatcher.test.ts @@ -0,0 +1,301 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +import { HookDispatcher } from '../HookDispatcher'; +import type { AgentHook, AgentHookEvent } from '../types'; + +// Mock isQueueAgentRuntimeEnabled to control local vs production mode +vi.mock('@/server/services/queue/impls', () => ({ + isQueueAgentRuntimeEnabled: vi.fn(() => false), // Default: local mode +})); + +const { isQueueAgentRuntimeEnabled } = await import('@/server/services/queue/impls'); + +describe('HookDispatcher', () => { + let dispatcher: HookDispatcher; + const operationId = 'op_test_123'; + + const makeEvent = (overrides?: Partial): AgentHookEvent => ({ + agentId: 'agt_test', + operationId, + reason: 'done', + status: 'done', + userId: 'user_test', + ...overrides, + }); + + beforeEach(() => { + dispatcher = new HookDispatcher(); + vi.mocked(isQueueAgentRuntimeEnabled).mockReturnValue(false); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + describe('register', () => { + it('should register hooks for an operation', () => { + const hook: AgentHook = { + handler: vi.fn(), + id: 'test-hook', + type: 'onComplete', + }; + + dispatcher.register(operationId, [hook]); + expect(dispatcher.hasHooks(operationId)).toBe(true); + }); + + it('should append hooks to existing registrations', () => { + const hook1: AgentHook = { handler: vi.fn(), id: 'hook-1', type: 'onComplete' }; + const hook2: AgentHook = { handler: vi.fn(), id: 'hook-2', type: 'onError' }; + + dispatcher.register(operationId, [hook1]); + dispatcher.register(operationId, [hook2]); + + expect(dispatcher.hasHooks(operationId)).toBe(true); + }); + + it('should not register empty hooks array', () => { + dispatcher.register(operationId, []); + expect(dispatcher.hasHooks(operationId)).toBe(false); + }); + }); + + describe('dispatch (local mode)', () => { + it('should call handler for matching hook type', async () => { + const handler = vi.fn(); + dispatcher.register(operationId, [{ handler, id: 'test', type: 'onComplete' }]); + + await dispatcher.dispatch(operationId, 'onComplete', makeEvent()); + + expect(handler).toHaveBeenCalledTimes(1); + expect(handler).toHaveBeenCalledWith( + expect.objectContaining({ + operationId, + reason: 'done', + }), + ); + }); + + it('should not call handler for non-matching hook type', async () => { + const handler = vi.fn(); + dispatcher.register(operationId, [{ handler, id: 'test', type: 'onComplete' }]); + + await dispatcher.dispatch(operationId, 'onError', makeEvent()); + + expect(handler).not.toHaveBeenCalled(); + }); + + it('should call multiple handlers of same type', async () => { + const handler1 = vi.fn(); + const handler2 = vi.fn(); + + dispatcher.register(operationId, [ + { handler: handler1, id: 'hook-1', type: 'onComplete' }, + { handler: handler2, id: 'hook-2', type: 'onComplete' }, + ]); + + await dispatcher.dispatch(operationId, 'onComplete', makeEvent()); + + expect(handler1).toHaveBeenCalledTimes(1); + expect(handler2).toHaveBeenCalledTimes(1); + }); + + it('should not throw if handler throws (non-fatal)', async () => { + const handler = vi.fn().mockRejectedValue(new Error('hook failed')); + + dispatcher.register(operationId, [{ handler, id: 'failing-hook', type: 'onComplete' }]); + + // Should not throw + await expect( + dispatcher.dispatch(operationId, 'onComplete', makeEvent()), + ).resolves.toBeUndefined(); + }); + + it('should call remaining hooks even if one fails', async () => { + const failingHandler = vi.fn().mockRejectedValue(new Error('fail')); + const successHandler = vi.fn(); + + dispatcher.register(operationId, [ + { handler: failingHandler, id: 'failing', type: 'onComplete' }, + { handler: successHandler, id: 'success', type: 'onComplete' }, + ]); + + await dispatcher.dispatch(operationId, 'onComplete', makeEvent()); + + expect(failingHandler).toHaveBeenCalled(); + expect(successHandler).toHaveBeenCalled(); + }); + + it('should handle no registered hooks gracefully', async () => { + await expect( + dispatcher.dispatch('unknown_op', 'onComplete', makeEvent()), + ).resolves.toBeUndefined(); + }); + }); + + describe('dispatch (production mode)', () => { + beforeEach(() => { + vi.mocked(isQueueAgentRuntimeEnabled).mockReturnValue(true); + // Mock global fetch + global.fetch = vi.fn().mockResolvedValue({ status: 200 }); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('should deliver webhook for hooks with webhook config', async () => { + dispatcher.register(operationId, [ + { + handler: vi.fn(), // handler not called in production mode + id: 'webhook-hook', + type: 'onComplete', + webhook: { url: 'https://example.com/hook' }, + }, + ]); + + const serialized = dispatcher.getSerializedHooks(operationId); + await dispatcher.dispatch(operationId, 'onComplete', makeEvent(), serialized); + + expect(global.fetch).toHaveBeenCalledWith( + 'https://example.com/hook', + expect.objectContaining({ + method: 'POST', + body: expect.any(String), + }), + ); + }); + + it('should merge webhook.body into payload', async () => { + dispatcher.register(operationId, [ + { + handler: vi.fn(), + id: 'custom-body-hook', + type: 'onComplete', + webhook: { + body: { taskId: 'task_123', customField: 'value' }, + url: 'https://example.com/hook', + }, + }, + ]); + + const serialized = dispatcher.getSerializedHooks(operationId); + await dispatcher.dispatch(operationId, 'onComplete', makeEvent(), serialized); + + const call = vi.mocked(global.fetch).mock.calls[0]; + const body = JSON.parse(call[1]?.body as string); + expect(body.taskId).toBe('task_123'); + expect(body.customField).toBe('value'); + expect(body.hookId).toBe('custom-body-hook'); + }); + + it('should not call local handler in production mode', async () => { + const handler = vi.fn(); + dispatcher.register(operationId, [ + { + handler, + id: 'prod-hook', + type: 'onComplete', + webhook: { url: 'https://example.com/hook' }, + }, + ]); + + const serialized = dispatcher.getSerializedHooks(operationId); + await dispatcher.dispatch(operationId, 'onComplete', makeEvent(), serialized); + + expect(handler).not.toHaveBeenCalled(); + }); + + it('should skip hooks without webhook config in production mode', async () => { + dispatcher.register(operationId, [ + { handler: vi.fn(), id: 'local-only', type: 'onComplete' }, + ]); + + const serialized = dispatcher.getSerializedHooks(operationId); + await dispatcher.dispatch(operationId, 'onComplete', makeEvent(), serialized); + + expect(global.fetch).not.toHaveBeenCalled(); + }); + }); + + describe('getSerializedHooks', () => { + it('should return only hooks with webhook config', () => { + dispatcher.register(operationId, [ + { handler: vi.fn(), id: 'local-only', type: 'onComplete' }, + { + handler: vi.fn(), + id: 'with-webhook', + type: 'onComplete', + webhook: { url: '/api/hook' }, + }, + ]); + + const serialized = dispatcher.getSerializedHooks(operationId); + + expect(serialized).toHaveLength(1); + expect(serialized![0].id).toBe('with-webhook'); + expect(serialized![0].webhook.url).toBe('/api/hook'); + }); + + it('should return undefined for unknown operation', () => { + expect(dispatcher.getSerializedHooks('unknown')).toBeUndefined(); + }); + }); + + describe('unregister', () => { + it('should remove all hooks for an operation', () => { + dispatcher.register(operationId, [{ handler: vi.fn(), id: 'hook', type: 'onComplete' }]); + + expect(dispatcher.hasHooks(operationId)).toBe(true); + dispatcher.unregister(operationId); + expect(dispatcher.hasHooks(operationId)).toBe(false); + }); + }); + + describe('hook types', () => { + it('should dispatch beforeStep hooks', async () => { + const handler = vi.fn(); + dispatcher.register(operationId, [{ handler, id: 'before', type: 'beforeStep' }]); + + await dispatcher.dispatch(operationId, 'beforeStep', makeEvent({ stepIndex: 0 })); + expect(handler).toHaveBeenCalled(); + }); + + it('should dispatch afterStep hooks', async () => { + const handler = vi.fn(); + dispatcher.register(operationId, [{ handler, id: 'after', type: 'afterStep' }]); + + await dispatcher.dispatch( + operationId, + 'afterStep', + makeEvent({ stepIndex: 1, shouldContinue: true }), + ); + expect(handler).toHaveBeenCalledWith( + expect.objectContaining({ + shouldContinue: true, + stepIndex: 1, + }), + ); + }); + + it('should dispatch onError hooks', async () => { + const handler = vi.fn(); + dispatcher.register(operationId, [{ handler, id: 'error', type: 'onError' }]); + + await dispatcher.dispatch( + operationId, + 'onError', + makeEvent({ + errorMessage: 'Something went wrong', + reason: 'error', + }), + ); + expect(handler).toHaveBeenCalledWith( + expect.objectContaining({ + errorMessage: 'Something went wrong', + reason: 'error', + }), + ); + }); + }); +}); diff --git a/src/server/services/agentRuntime/hooks/index.ts b/src/server/services/agentRuntime/hooks/index.ts new file mode 100644 index 0000000000..a1ab473ed9 --- /dev/null +++ b/src/server/services/agentRuntime/hooks/index.ts @@ -0,0 +1,8 @@ +export { HookDispatcher, hookDispatcher } from './HookDispatcher'; +export type { + AgentHook, + AgentHookEvent, + AgentHookType, + AgentHookWebhook, + SerializedHook, +} from './types'; diff --git a/src/server/services/agentRuntime/hooks/types.ts b/src/server/services/agentRuntime/hooks/types.ts new file mode 100644 index 0000000000..3b4b7432e2 --- /dev/null +++ b/src/server/services/agentRuntime/hooks/types.ts @@ -0,0 +1,106 @@ +/** + * Agent Runtime Hooks — external lifecycle hook system + * + * Hooks are registered once and automatically adapt to the runtime mode: + * - Local mode: handler function is called directly (in-process) + * - Production (QStash) mode: webhook is delivered via HTTP POST + */ + +// ── Hook Types ────────────────────────────────────────── + +/** + * Lifecycle hook points in agent execution + */ +export type AgentHookType = + | 'afterStep' // After each step completes + | 'beforeStep' // Before each step executes + | 'onComplete' // Operation reaches terminal state (done/error/interrupted) + | 'onError'; // Error during execution + +/** + * Hook definition — consumers register these with execAgent + */ +export interface AgentHook { + /** Handler function for local mode (called in-process) */ + handler: (event: AgentHookEvent) => Promise; + + /** Unique hook identifier (for logging, debugging, idempotency) */ + id: string; + + /** Hook lifecycle point */ + type: AgentHookType; + + /** Webhook config for production mode (if omitted, hook only works in local mode) */ + webhook?: AgentHookWebhook; +} + +/** + * Webhook delivery configuration for production mode + */ +export interface AgentHookWebhook { + /** Custom data merged into webhook payload */ + body?: Record; + + /** Delivery method: 'fetch' (plain HTTP) or 'qstash' (guaranteed delivery). Default: 'qstash' */ + delivery?: 'fetch' | 'qstash'; + + /** Webhook endpoint URL (relative or absolute) */ + url: string; +} + +// ── Hook Events ────────────────────────────────────────── + +/** + * Unified event payload passed to hook handlers and webhook payloads + */ +export interface AgentHookEvent { + // Identification + agentId: string; + // Statistics + cost?: number; + duration?: number; + // Content + errorDetail?: string; + + errorMessage?: string; + + /** + * Full AgentState — only available in local mode. + * Not serialized to webhook payloads. + * Use for consumers that need deep state access (e.g., SubAgent Thread updates). + */ + finalState?: any; + lastAssistantContent?: string; + + llmCalls?: number; + // Caller-provided metadata (from webhook.body) + metadata?: Record; + operationId: string; + // Execution result + reason?: string; // 'done' | 'error' | 'interrupted' | 'max_steps' | 'cost_limit' + // Step-specific (for beforeStep/afterStep) + shouldContinue?: boolean; + status?: string; // 'done' | 'error' | 'interrupted' | 'waiting_for_human' + + stepIndex?: number; + steps?: number; + stepType?: string; // 'call_llm' | 'call_tool' + + toolCalls?: number; + topicId?: string; + totalTokens?: number; + + userId: string; +} + +// ── Serialized Hook (for Redis persistence) ────────────── + +/** + * Serialized hook config stored in AgentState.metadata._hooks + * Only contains webhook info (handler functions can't be serialized) + */ +export interface SerializedHook { + id: string; + type: AgentHookType; + webhook: AgentHookWebhook; +} diff --git a/src/server/services/agentRuntime/types.ts b/src/server/services/agentRuntime/types.ts index ee5e8b2119..da9a4eec34 100644 --- a/src/server/services/agentRuntime/types.ts +++ b/src/server/services/agentRuntime/types.ts @@ -4,6 +4,8 @@ import { type UserInterventionConfig } from '@lobechat/types'; import { type ServerUserMemoryConfig } from '@/server/modules/Mecha/ContextEngineering/types'; +import { type AgentHook } from './hooks/types'; + // ==================== Operation Tool Set ==================== export interface OperationToolSet { @@ -152,6 +154,11 @@ export interface OperationCreationParams { /** Discord context for injecting channel/guild info into agent system message */ discordContext?: any; evalContext?: any; + /** + * External lifecycle hooks + * Registered once, auto-adapt to local (in-memory) or production (webhook) mode + */ + hooks?: AgentHook[]; initialContext: AgentRuntimeContext; initialMessages?: any[]; maxSteps?: number; diff --git a/src/server/services/aiAgent/__tests__/execGroupSubAgentTask.test.ts b/src/server/services/aiAgent/__tests__/execGroupSubAgentTask.test.ts index 187312b5fc..1942a0a836 100644 --- a/src/server/services/aiAgent/__tests__/execGroupSubAgentTask.test.ts +++ b/src/server/services/aiAgent/__tests__/execGroupSubAgentTask.test.ts @@ -207,11 +207,11 @@ describe('AiAgentService.execSubAgentTask', () => { topicId: 'topic-1', }, autoStart: true, + hooks: expect.arrayContaining([ + expect.objectContaining({ id: 'thread-metadata-update', type: 'afterStep' }), + expect.objectContaining({ id: 'thread-completion', type: 'onComplete' }), + ]), prompt: 'Test instruction', - stepCallbacks: expect.objectContaining({ - onAfterStep: expect.any(Function), - onComplete: expect.any(Function), - }), userInterventionConfig: { approvalMode: 'headless', }, @@ -447,20 +447,21 @@ describe('AiAgentService.execSubAgentTask', () => { topicId: 'topic-1', }); - // Verify that stepCallbacks was passed with onComplete + // Verify that hooks were passed with onComplete expect(execAgentSpy).toHaveBeenCalledWith( expect.objectContaining({ - stepCallbacks: expect.objectContaining({ - onComplete: expect.any(Function), - }), + hooks: expect.arrayContaining([ + expect.objectContaining({ id: 'thread-completion', type: 'onComplete' }), + ]), }), ); - // Get the onComplete callback + // Get the onComplete hook handler const callArgs = execAgentSpy.mock.calls[0][0]; - const onComplete = callArgs.stepCallbacks?.onComplete; + const onCompleteHook = callArgs.hooks?.find((h: any) => h.id === 'thread-completion'); - expect(onComplete).toBeDefined(); + expect(onCompleteHook).toBeDefined(); + expect(onCompleteHook!.handler).toBeInstanceOf(Function); }); }); }); diff --git a/src/server/services/aiAgent/index.ts b/src/server/services/aiAgent/index.ts index eb23dc045b..55e1e4797f 100644 --- a/src/server/services/aiAgent/index.ts +++ b/src/server/services/aiAgent/index.ts @@ -43,6 +43,7 @@ import { type ServerUserMemoryConfig } from '@/server/modules/Mecha/ContextEngin import { AgentService } from '@/server/services/agent'; import type { AgentRuntimeServiceOptions } from '@/server/services/agentRuntime'; import { AgentRuntimeService } from '@/server/services/agentRuntime'; +import { type AgentHook } from '@/server/services/agentRuntime/hooks/types'; import { type StepLifecycleCallbacks } from '@/server/services/agentRuntime/types'; import { FileService } from '@/server/services/file'; import { KlavisService } from '@/server/services/klavis'; @@ -103,6 +104,8 @@ interface InternalExecAgentParams extends ExecAgentParams { size?: number; url: string; }>; + /** External lifecycle hooks (auto-adapt to local/production mode) */ + hooks?: AgentHook[]; /** Maximum steps for the agent operation */ maxSteps?: number; /** Step lifecycle callbacks for operation tracking (server-side only) */ @@ -204,6 +207,7 @@ export class AiAgentService { discordContext, existingMessageIds = [], files, + hooks, instructions, stepCallbacks, stream, @@ -793,6 +797,7 @@ export class AiAgentService { initialMessages: allMessages, maxSteps, modelRuntimeConfig: { model, provider }, + hooks, operationId, stepCallbacks, stepWebhook, @@ -979,18 +984,18 @@ export class AiAgentService { status: ThreadStatus.Processing, }); - // 3. Create step lifecycle callbacks for updating Thread metadata and task message - const stepCallbacks = this.createThreadMetadataCallbacks(thread.id, startedAt, parentMessageId); + // 3. Create hooks for updating Thread metadata and task message + const threadHooks = this.createThreadHooks(thread.id, startedAt, parentMessageId); - // 4. Delegate to execAgent with threadId in appContext and callbacks + // 4. Delegate to execAgent with threadId in appContext and hooks // The instruction will be created as user message in the Thread // Use headless mode to skip human approval in async task execution const result = await this.execAgent({ agentId, appContext: { groupId, threadId: thread.id, topicId }, autoStart: true, + hooks: threadHooks, prompt: instruction, - stepCallbacks, userInterventionConfig: { approvalMode: 'headless' }, }); @@ -1155,6 +1160,132 @@ export class AiAgentService { }; } + /** + * Create hooks for tracking Thread metadata updates during SubAgent execution. + * Replaces the legacy createThreadMetadataCallbacks with the hooks system. + */ + private createThreadHooks( + threadId: string, + startedAt: string, + sourceMessageId: string, + ): AgentHook[] { + let accumulatedToolCalls = 0; + + return [ + { + handler: async (event) => { + const state = event.finalState; + if (!state) return; + + // Count tool calls from step result + const stepToolCalls = state.session?.toolCalls || 0; + if (stepToolCalls > accumulatedToolCalls) { + accumulatedToolCalls = stepToolCalls; + } + + try { + await this.threadModel.update(threadId, { + metadata: { + operationId: event.operationId, + startedAt, + totalMessages: state.messages?.length ?? 0, + totalTokens: this.calculateTotalTokens(state.usage), + totalToolCalls: accumulatedToolCalls, + }, + }); + } catch (error) { + log('Thread hook afterStep: failed to update metadata: %O', error); + } + }, + id: 'thread-metadata-update', + type: 'afterStep' as const, + }, + { + handler: async (event) => { + const finalState = event.finalState; + if (!finalState) return; + + const completedAt = new Date().toISOString(); + const duration = Date.now() - new Date(startedAt).getTime(); + + // Map completion reason to ThreadStatus + let status: ThreadStatus; + switch (event.reason) { + case 'done': { + status = ThreadStatus.Completed; + break; + } + case 'error': { + status = ThreadStatus.Failed; + break; + } + case 'interrupted': { + status = ThreadStatus.Cancel; + break; + } + case 'waiting_for_human': { + status = ThreadStatus.InReview; + break; + } + default: { + status = ThreadStatus.Completed; + } + } + + if (event.reason === 'error' && finalState.error) { + console.error( + 'Thread hook onComplete: task failed for thread %s:', + threadId, + finalState.error, + ); + } + + try { + // Update task message with summary + const lastAssistantMessage = finalState.messages + ?.slice() + .reverse() + .find((m: { role: string }) => m.role === 'assistant'); + + if (lastAssistantMessage?.content) { + await this.messageModel.update(sourceMessageId, { + content: lastAssistantMessage.content, + }); + } + + const formattedError = formatErrorForMetadata(finalState.error); + + await this.threadModel.update(threadId, { + metadata: { + completedAt, + duration, + error: formattedError, + operationId: finalState.operationId, + startedAt, + totalCost: finalState.cost?.total, + totalMessages: finalState.messages?.length ?? 0, + totalTokens: this.calculateTotalTokens(finalState.usage), + totalToolCalls: accumulatedToolCalls, + }, + status, + }); + + log( + 'Thread hook onComplete: thread %s status=%s reason=%s', + threadId, + status, + event.reason, + ); + } catch (error) { + console.error('Thread hook onComplete: failed to update: %O', error); + } + }, + id: 'thread-completion', + type: 'onComplete' as const, + }, + ]; + } + /** * Calculate total tokens from AgentState usage object * AgentState.usage is of type Usage from @lobechat/agent-runtime