♻️ refactor: refactor agent-runtime hooks mode (#13145)

*  feat: add Agent Runtime Hooks — external lifecycle hook system

Hooks are registered once and automatically adapt to runtime mode:
- Local: handler functions called directly (in-process)
- Production: webhook configs persisted to Redis, delivered via HTTP/QStash

- HookDispatcher: register, dispatch, serialize hooks per operationId
- AgentHook type: id, type (beforeStep/afterStep/onComplete/onError),
  handler function, optional webhook config
- Integrated into AgentRuntimeService.createOperation + executeStep
- Hooks persisted in AgentState.metadata._hooks for cross-request survival
- Dispatched at both normal completion and error paths
- Non-fatal: hook errors never affect main execution flow

LOBE-6208

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

*  test: add HookDispatcher unit tests (19 tests)

Tests cover:
- register/unregister/hasHooks
- Local mode dispatch: matching types, multiple handlers, error isolation
- Production mode dispatch: webhook delivery, body merging, mode isolation
- Serialization: getSerializedHooks filters webhook-only hooks
- All hook types: beforeStep, afterStep, onComplete, onError

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

*  feat: migrate SubAgent to hooks + add afterStep dispatch + finalState

- AgentHookEvent: added finalState field (local-mode only, stripped from webhooks)
- AgentRuntimeService: dispatch afterStep hooks alongside legacy callbacks
- AiAgentService: createThreadHooks() replaces createThreadMetadataCallbacks()
  for SubAgent Thread execution — same behavior, using hooks API
- HookDispatcher: strip finalState from webhook payloads (too large)

LOBE-6208

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* 🐛 fix: add Vercel bypass header to QStash hook webhooks

Preserves x-vercel-protection-bypass header when delivering hook
webhooks via QStash, matching existing behavior in
AgentRuntimeService.deliverWebhook and libs/qstash.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

*  feat: migrate Eval Run to hooks + add finalState to AgentHookEvent

Eval Run now uses hooks API instead of raw completionWebhook:
- executeTrajectory: hook with local handler + webhook fallback
- executeThreadTrajectory: hook with local handler + webhook fallback
- Local mode now works for eval runs (previously production-only)

Also:
- AgentHookEvent: added finalState field (local-only, stripped from webhooks)
  for consumers that need deep state access

LOBE-6208

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* 🐛 fix: dispatch beforeStep hooks + fix completion event payload fields

P1: Add hookDispatcher.dispatch('beforeStep') alongside legacy
onBeforeStep callback. All 4 hook types now dispatch correctly:
beforeStep, afterStep, onComplete, onError.

P2: Fix completion event payload to use actual AgentState fields
(state.cost.total, state.usage.llm.*, state.messages) instead of
non-existent state.session.* properties. Matches the field access
pattern in triggerCompletionWebhook.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* 🐛 fix: update eval test assertions for hooks migration + fix status type

- Test: update executeTrajectory assertion to expect hooks array
  instead of completionWebhook object
- Fix: add fallback for event.status (string | undefined) when passing
  to recordTrajectoryCompletion/recordThreadCompletion (status: string)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* 🐛 fix: update SubAgent test assertions for hooks migration

Update execGroupSubAgentTask tests to expect hooks array instead of
stepCallbacks object, matching the SubAgent → hooks migration.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Arvin Xu
2026-03-20 12:05:25 +08:00
committed by GitHub
parent 6ce9d9a814
commit d3ea4a4894
10 changed files with 985 additions and 40 deletions

View File

@@ -191,10 +191,16 @@ describe('AgentEvalRunService', () => {
expect(mockExecAgent).toHaveBeenCalledWith( expect(mockExecAgent).toHaveBeenCalledWith(
expect.objectContaining({ expect.objectContaining({
autoStart: true, autoStart: true,
completionWebhook: { hooks: expect.arrayContaining([
expect.objectContaining({
id: 'eval-trajectory-complete',
type: 'onComplete',
webhook: {
body: { runId: run.id, testCaseId: testCase.id, userId }, body: { runId: run.id, testCaseId: testCase.id, userId },
url: 'https://test.example.com/api/workflows/agent-eval-run/on-trajectory-complete', url: '/api/workflows/agent-eval-run/on-trajectory-complete',
}, },
}),
]),
prompt: 'Hello world', prompt: 'Hello world',
userInterventionConfig: { approvalMode: 'headless' }, userInterventionConfig: { approvalMode: 'headless' },
}), }),

View File

@@ -21,7 +21,6 @@ import {
import { MessageModel } from '@/database/models/message'; import { MessageModel } from '@/database/models/message';
import { ThreadModel } from '@/database/models/thread'; import { ThreadModel } from '@/database/models/thread';
import { TopicModel } from '@/database/models/topic'; import { TopicModel } from '@/database/models/topic';
import { appEnv } from '@/envs/app';
import { AgentService } from '@/server/services/agent'; import { AgentService } from '@/server/services/agent';
import { AgentRuntimeService } from '@/server/services/agentRuntime/AgentRuntimeService'; import { AgentRuntimeService } from '@/server/services/agentRuntime/AgentRuntimeService';
import { AiAgentService } from '@/server/services/aiAgent'; import { AiAgentService } from '@/server/services/aiAgent';
@@ -264,20 +263,45 @@ export class AgentEvalRunService {
await this.runTopicModel.updateByRunAndTopic(runId, topicId, { status: 'running' }); await this.runTopicModel.updateByRunAndTopic(runId, topicId, { status: 'running' });
const aiAgentService = new AiAgentService(this.db, this.userId); const aiAgentService = new AiAgentService(this.db, this.userId);
const webhookUrl = new URL( const webhookUrl = '/api/workflows/agent-eval-run/on-trajectory-complete';
'/api/workflows/agent-eval-run/on-trajectory-complete', const userId = this.userId;
appEnv.APP_URL, const db = this.db;
).toString();
try { try {
const execResult = await aiAgentService.execAgent({ const execResult = await aiAgentService.execAgent({
agentId: run.targetAgentId ?? undefined, agentId: run.targetAgentId ?? undefined,
appContext: { topicId }, appContext: { topicId },
autoStart: true, autoStart: true,
completionWebhook: { hooks: [
body: { runId, testCaseId, userId: this.userId }, {
handler: async (event) => {
// Local mode: directly record completion
const service = new AgentEvalRunService(db, userId);
await service.recordTrajectoryCompletion({
runId,
status: event.status || event.reason || 'done',
telemetry: {
completionReason: event.reason,
cost: event.cost,
duration: event.duration,
errorDetail: event.errorDetail,
errorMessage: event.errorMessage,
llmCalls: event.llmCalls,
steps: event.steps,
toolCalls: event.toolCalls,
totalTokens: event.totalTokens,
},
testCaseId,
});
},
id: 'eval-trajectory-complete',
type: 'onComplete' as const,
webhook: {
body: { runId, testCaseId, userId },
url: webhookUrl, url: webhookUrl,
}, },
},
],
...(envPrompt && { evalContext: { envPrompt } }), ...(envPrompt && { evalContext: { envPrompt } }),
maxSteps: run.config?.maxSteps, maxSteps: run.config?.maxSteps,
prompt: params.testCase.content.input || '', prompt: params.testCase.content.input || '',
@@ -381,20 +405,46 @@ export class AgentEvalRunService {
const { envPrompt, run, runId, testCaseId, threadId, topicId } = params; const { envPrompt, run, runId, testCaseId, threadId, topicId } = params;
const aiAgentService = new AiAgentService(this.db, this.userId); const aiAgentService = new AiAgentService(this.db, this.userId);
const webhookUrl = new URL( const webhookUrl = '/api/workflows/agent-eval-run/on-thread-complete';
'/api/workflows/agent-eval-run/on-thread-complete', const userId = this.userId;
appEnv.APP_URL, const db = this.db;
).toString();
try { try {
const execResult = await aiAgentService.execAgent({ const execResult = await aiAgentService.execAgent({
agentId: run.targetAgentId ?? undefined, agentId: run.targetAgentId ?? undefined,
appContext: { threadId, topicId }, appContext: { threadId, topicId },
autoStart: true, autoStart: true,
completionWebhook: { hooks: [
body: { runId, testCaseId, threadId, topicId, userId: this.userId }, {
handler: async (event) => {
// Local mode: directly record thread completion
const service = new AgentEvalRunService(db, userId);
await service.recordThreadCompletion({
runId,
status: event.status || event.reason || 'done',
telemetry: {
completionReason: event.reason,
cost: event.cost,
duration: event.duration,
errorMessage: event.errorMessage,
llmCalls: event.llmCalls,
steps: event.steps,
toolCalls: event.toolCalls,
totalTokens: event.totalTokens,
},
testCaseId,
threadId,
topicId,
});
},
id: 'eval-thread-complete',
type: 'onComplete' as const,
webhook: {
body: { runId, testCaseId, threadId, topicId, userId },
url: webhookUrl, url: webhookUrl,
}, },
},
],
...(envPrompt && { evalContext: { envPrompt } }), ...(envPrompt && { evalContext: { envPrompt } }),
maxSteps: run.config?.maxSteps, maxSteps: run.config?.maxSteps,
prompt: params.testCase.content.input || '', prompt: params.testCase.content.input || '',

View File

@@ -21,6 +21,7 @@ import { LocalQueueServiceImpl } from '@/server/services/queue/impls';
import { ToolExecutionService } from '@/server/services/toolExecution'; import { ToolExecutionService } from '@/server/services/toolExecution';
import { BuiltinToolsExecutor } from '@/server/services/toolExecution/builtin'; import { BuiltinToolsExecutor } from '@/server/services/toolExecution/builtin';
import { hookDispatcher } from './hooks';
import { import {
type AgentExecutionParams, type AgentExecutionParams,
type AgentExecutionResult, type AgentExecutionResult,
@@ -267,6 +268,7 @@ export class AgentRuntimeService {
appContext, appContext,
toolSet, toolSet,
stepCallbacks, stepCallbacks,
hooks,
userInterventionConfig, userInterventionConfig,
completionWebhook, completionWebhook,
stepWebhook, stepWebhook,
@@ -355,6 +357,26 @@ export class AgentRuntimeService {
this.registerStepCallbacks(operationId, stepCallbacks); this.registerStepCallbacks(operationId, stepCallbacks);
} }
// Register external hooks
if (hooks && hooks.length > 0) {
hookDispatcher.register(operationId, hooks);
// Persist webhook configs to state metadata for production mode
const serializedHooks = hookDispatcher.getSerializedHooks(operationId);
if (serializedHooks && serializedHooks.length > 0) {
const currentState = await this.coordinator.loadAgentState(operationId);
if (currentState) {
await this.coordinator.saveAgentState(operationId, {
...currentState,
metadata: {
...currentState.metadata,
_hooks: serializedHooks,
},
});
}
}
}
let messageId: string | undefined; let messageId: string | undefined;
let autoStarted = false; let autoStarted = false;
@@ -481,7 +503,7 @@ export class AgentRuntimeService {
}; };
} }
// Call onBeforeStep callback // Call onBeforeStep callback (legacy)
if (callbacks?.onBeforeStep) { if (callbacks?.onBeforeStep) {
try { try {
await callbacks.onBeforeStep({ await callbacks.onBeforeStep({
@@ -495,6 +517,26 @@ export class AgentRuntimeService {
} }
} }
// Dispatch beforeStep hooks
try {
const beforeStepMetadata = agentState?.metadata || {};
await hookDispatcher.dispatch(
operationId,
'beforeStep',
{
agentId: beforeStepMetadata?.agentId || '',
finalState: agentState,
operationId,
stepIndex,
steps: agentState?.stepCount || 0,
userId: beforeStepMetadata?.userId || this.userId,
},
beforeStepMetadata._hooks,
);
} catch (hookError) {
log('[%s] beforeStep hook dispatch error: %O', operationId, hookError);
}
// Create Agent and Runtime instances // Create Agent and Runtime instances
// Use agentState.metadata which contains the full app context (topicId, agentId, etc.) // Use agentState.metadata which contains the full app context (topicId, agentId, etc.)
// operationMetadata only contains basic fields (agentConfig, modelRuntimeConfig, userId) // operationMetadata only contains basic fields (agentConfig, modelRuntimeConfig, userId)
@@ -735,7 +777,7 @@ export class AgentRuntimeService {
totalTokens: totalTokensNum, totalTokens: totalTokensNum,
}; };
// Call onAfterStep callback with presentation data // Call onAfterStep callback with presentation data (legacy)
if (callbacks?.onAfterStep) { if (callbacks?.onAfterStep) {
try { try {
await callbacks.onAfterStep({ await callbacks.onAfterStep({
@@ -751,6 +793,28 @@ export class AgentRuntimeService {
} }
} }
// Dispatch afterStep hooks
try {
const metadata = stepResult.newState?.metadata || {};
await hookDispatcher.dispatch(
operationId,
'afterStep',
{
agentId: metadata?.agentId || '',
finalState: stepResult.newState,
operationId,
shouldContinue,
status: stepResult.newState?.status,
stepIndex,
steps: stepResult.newState?.stepCount || 0,
userId: metadata?.userId || this.userId,
},
metadata._hooks,
);
} catch (hookError) {
log('[%s] afterStep hook dispatch error: %O', operationId, hookError);
}
// Record step snapshot via injected snapshot store // Record step snapshot via injected snapshot store
if (this.snapshotStore) { if (this.snapshotStore) {
try { try {
@@ -909,7 +973,10 @@ export class AgentRuntimeService {
// Trigger completion webhook (fire-and-forget) // Trigger completion webhook (fire-and-forget)
await this.triggerCompletionWebhook(stepResult.newState, operationId, reason); await this.triggerCompletionWebhook(stepResult.newState, operationId, reason);
// Call onComplete callback // Dispatch onComplete hooks
await this.dispatchCompletionHooks(operationId, stepResult.newState, reason);
// Call onComplete callback (legacy)
if (callbacks?.onComplete) { if (callbacks?.onComplete) {
try { try {
await callbacks.onComplete({ await callbacks.onComplete({
@@ -1029,7 +1096,10 @@ export class AgentRuntimeService {
log('[%s] Failed to trigger completion webhook: %O', operationId, webhookError); log('[%s] Failed to trigger completion webhook: %O', operationId, webhookError);
} }
// Also call onComplete callback when execution fails // Dispatch onComplete + onError hooks
await this.dispatchCompletionHooks(operationId, finalStateWithError, 'error');
// Also call onComplete callback when execution fails (legacy)
if (callbacks?.onComplete) { if (callbacks?.onComplete) {
try { try {
await callbacks.onComplete({ await callbacks.onComplete({
@@ -1543,6 +1613,65 @@ export class AgentRuntimeService {
} }
} }
/**
* Dispatch onComplete (and onError) hooks via HookDispatcher.
* Fire-and-forget: errors are logged but never thrown.
*/
private async dispatchCompletionHooks(
operationId: string,
state: any,
reason: string,
): Promise<void> {
try {
const metadata = state?.metadata || {};
// Extract last assistant content (same as triggerCompletionWebhook)
const lastAssistantContent = state?.messages
?.slice()
.reverse()
.find(
(m: { content?: string; role: string }) => m.role === 'assistant' && m.content,
)?.content;
const duration = state?.createdAt
? Date.now() - new Date(state.createdAt).getTime()
: undefined;
const event = {
agentId: metadata?.agentId || '',
cost: state?.cost?.total,
duration,
errorDetail: state?.error,
errorMessage: this.extractErrorMessage?.(state?.error) || String(state?.error || ''),
// Full state available in local mode only (not serialized to webhooks)
finalState: state,
lastAssistantContent,
llmCalls: state?.usage?.llm?.apiCalls,
operationId,
reason,
status: state?.status || reason,
steps: state?.stepCount || 0,
toolCalls: state?.usage?.tools?.totalCalls,
topicId: metadata?.topicId,
totalTokens: state?.usage?.llm?.tokens?.total,
userId: metadata?.userId || this.userId,
};
// Dispatch onComplete hooks
await hookDispatcher.dispatch(operationId, 'onComplete', event, metadata._hooks);
// Also dispatch onError hooks if reason is error
if (reason === 'error') {
await hookDispatcher.dispatch(operationId, 'onError', event, metadata._hooks);
}
// Cleanup hooks after completion
hookDispatcher.unregister(operationId);
} catch (error) {
log('[%s] Hook dispatch error (non-fatal): %O', operationId, error);
}
}
/** /**
* Trigger completion webhook if configured in state metadata. * Trigger completion webhook if configured in state metadata.
* Fire-and-forget: errors are logged but never thrown. * Fire-and-forget: errors are logged but never thrown.

View File

@@ -0,0 +1,206 @@
import debug from 'debug';
import urlJoin from 'url-join';
import { isQueueAgentRuntimeEnabled } from '@/server/services/queue/impls';
import type {
AgentHook,
AgentHookEvent,
AgentHookType,
AgentHookWebhook,
SerializedHook,
} from './types';
const log = debug('lobe-server:hook-dispatcher');
/**
* Delivers a webhook via HTTP POST (fetch or QStash)
*/
async function deliverWebhook(
webhook: AgentHookWebhook,
payload: Record<string, unknown>,
): Promise<void> {
const { url, delivery = 'fetch' } = webhook;
// Resolve URL: relative paths joined with INTERNAL_APP_URL or APP_URL
const resolvedUrl = url.startsWith('http')
? url
: urlJoin(process.env.INTERNAL_APP_URL || process.env.APP_URL || '', url);
if (delivery === 'qstash') {
try {
const { Client } = await import('@upstash/qstash');
const qstashToken = process.env.QSTASH_TOKEN;
if (!qstashToken) {
log('QStash token not available, falling back to fetch delivery');
await fetchDeliver(resolvedUrl, payload);
return;
}
const client = new Client({ token: qstashToken });
await client.publishJSON({
body: payload,
headers: {
...(process.env.VERCEL_AUTOMATION_BYPASS_SECRET && {
'x-vercel-protection-bypass': process.env.VERCEL_AUTOMATION_BYPASS_SECRET,
}),
},
url: resolvedUrl,
});
log('Webhook delivered via QStash: %s', url);
} catch (error) {
log('QStash delivery failed, falling back to fetch: %O', error);
await fetchDeliver(resolvedUrl, payload);
}
} else {
await fetchDeliver(resolvedUrl, payload);
}
}
async function fetchDeliver(url: string, payload: Record<string, unknown>): Promise<void> {
try {
const res = await fetch(url, {
body: JSON.stringify(payload),
headers: { 'Content-Type': 'application/json' },
method: 'POST',
});
log('Webhook delivered via fetch: %s (status: %d)', url, res.status);
} catch (error) {
log('Webhook fetch delivery failed: %s %O', url, error);
// Hook errors should not affect main flow
}
}
/**
* HookDispatcher — central hub for registering and dispatching agent lifecycle hooks
*
* Local mode: hooks are stored in memory, handler functions called directly
* Production mode: webhook configs persisted in AgentState.metadata._hooks,
* delivered via HTTP POST or QStash
*/
export class HookDispatcher {
/**
* In-memory hook store (local mode)
* Maps operationId → AgentHook[]
*/
private hooks: Map<string, AgentHook[]> = new Map();
/**
* Dispatch hooks for a given event type
*
* In local mode: calls handler functions from memory
* In production mode: delivers webhooks from serialized config
*/
async dispatch(
operationId: string,
type: AgentHookType,
event: AgentHookEvent,
serializedHooks?: SerializedHook[],
): Promise<void> {
const isQueueMode = isQueueAgentRuntimeEnabled();
if (!isQueueMode) {
// Local mode: call handler functions directly
const hooks = this.hooks.get(operationId)?.filter((h) => h.type === type) || [];
for (const hook of hooks) {
try {
log('[%s][%s] Dispatching local hook: %s', operationId, type, hook.id);
await hook.handler(event);
} catch (error) {
log('[%s][%s] Hook error (non-fatal): %s %O', operationId, type, hook.id, error);
// Hook errors should NOT affect main execution flow
}
}
} else {
// Production mode: deliver via webhooks
const webhookHooks =
serializedHooks?.filter((h) => h.type === type && h.webhook) ||
this.getSerializedHooks(operationId)?.filter((h) => h.type === type) ||
[];
for (const hook of webhookHooks) {
try {
log(
'[%s][%s] Delivering webhook hook: %s → %s',
operationId,
type,
hook.id,
hook.webhook.url,
);
// Strip finalState from webhook payload (too large, local-only)
const { finalState: _, ...webhookEvent } = event;
await deliverWebhook(hook.webhook, {
...webhookEvent,
hookId: hook.id,
hookType: type,
...hook.webhook.body,
});
} catch (error) {
log(
'[%s][%s] Webhook delivery error (non-fatal): %s %O',
operationId,
type,
hook.id,
error,
);
}
}
}
}
/**
* Get serialized hooks for an operation (for production mode persistence)
*/
getSerializedHooks(operationId: string): SerializedHook[] | undefined {
const hooks = this.hooks.get(operationId);
if (!hooks) return undefined;
return hooks
.filter((h) => h.webhook)
.map((h) => ({
id: h.id,
type: h.type,
webhook: h.webhook!,
}));
}
/**
* Check if any hooks are registered for an operation
*/
hasHooks(operationId: string): boolean {
return (this.hooks.get(operationId)?.length ?? 0) > 0;
}
/**
* Register hooks for an operation
*
* In local mode: stores hooks in memory (including handler functions)
* In production mode: caller should persist getSerializedHooks() to state.metadata._hooks
*/
register(operationId: string, hooks: AgentHook[]): void {
if (hooks.length === 0) return;
const existing = this.hooks.get(operationId) || [];
this.hooks.set(operationId, [...existing, ...hooks]);
log(
'[%s] Registered %d hooks: %s',
operationId,
hooks.length,
hooks.map((h) => `${h.type}:${h.id}`).join(', '),
);
}
/**
* Unregister all hooks for an operation (cleanup)
*/
unregister(operationId: string): void {
this.hooks.delete(operationId);
log('[%s] Unregistered all hooks', operationId);
}
}
/**
* Singleton instance — shared across the application
*/
export const hookDispatcher = new HookDispatcher();

View File

@@ -0,0 +1,301 @@
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { HookDispatcher } from '../HookDispatcher';
import type { AgentHook, AgentHookEvent } from '../types';
// Mock isQueueAgentRuntimeEnabled to control local vs production mode
vi.mock('@/server/services/queue/impls', () => ({
isQueueAgentRuntimeEnabled: vi.fn(() => false), // Default: local mode
}));
const { isQueueAgentRuntimeEnabled } = await import('@/server/services/queue/impls');
describe('HookDispatcher', () => {
let dispatcher: HookDispatcher;
const operationId = 'op_test_123';
const makeEvent = (overrides?: Partial<AgentHookEvent>): AgentHookEvent => ({
agentId: 'agt_test',
operationId,
reason: 'done',
status: 'done',
userId: 'user_test',
...overrides,
});
beforeEach(() => {
dispatcher = new HookDispatcher();
vi.mocked(isQueueAgentRuntimeEnabled).mockReturnValue(false);
});
afterEach(() => {
vi.restoreAllMocks();
});
describe('register', () => {
it('should register hooks for an operation', () => {
const hook: AgentHook = {
handler: vi.fn(),
id: 'test-hook',
type: 'onComplete',
};
dispatcher.register(operationId, [hook]);
expect(dispatcher.hasHooks(operationId)).toBe(true);
});
it('should append hooks to existing registrations', () => {
const hook1: AgentHook = { handler: vi.fn(), id: 'hook-1', type: 'onComplete' };
const hook2: AgentHook = { handler: vi.fn(), id: 'hook-2', type: 'onError' };
dispatcher.register(operationId, [hook1]);
dispatcher.register(operationId, [hook2]);
expect(dispatcher.hasHooks(operationId)).toBe(true);
});
it('should not register empty hooks array', () => {
dispatcher.register(operationId, []);
expect(dispatcher.hasHooks(operationId)).toBe(false);
});
});
describe('dispatch (local mode)', () => {
it('should call handler for matching hook type', async () => {
const handler = vi.fn();
dispatcher.register(operationId, [{ handler, id: 'test', type: 'onComplete' }]);
await dispatcher.dispatch(operationId, 'onComplete', makeEvent());
expect(handler).toHaveBeenCalledTimes(1);
expect(handler).toHaveBeenCalledWith(
expect.objectContaining({
operationId,
reason: 'done',
}),
);
});
it('should not call handler for non-matching hook type', async () => {
const handler = vi.fn();
dispatcher.register(operationId, [{ handler, id: 'test', type: 'onComplete' }]);
await dispatcher.dispatch(operationId, 'onError', makeEvent());
expect(handler).not.toHaveBeenCalled();
});
it('should call multiple handlers of same type', async () => {
const handler1 = vi.fn();
const handler2 = vi.fn();
dispatcher.register(operationId, [
{ handler: handler1, id: 'hook-1', type: 'onComplete' },
{ handler: handler2, id: 'hook-2', type: 'onComplete' },
]);
await dispatcher.dispatch(operationId, 'onComplete', makeEvent());
expect(handler1).toHaveBeenCalledTimes(1);
expect(handler2).toHaveBeenCalledTimes(1);
});
it('should not throw if handler throws (non-fatal)', async () => {
const handler = vi.fn().mockRejectedValue(new Error('hook failed'));
dispatcher.register(operationId, [{ handler, id: 'failing-hook', type: 'onComplete' }]);
// Should not throw
await expect(
dispatcher.dispatch(operationId, 'onComplete', makeEvent()),
).resolves.toBeUndefined();
});
it('should call remaining hooks even if one fails', async () => {
const failingHandler = vi.fn().mockRejectedValue(new Error('fail'));
const successHandler = vi.fn();
dispatcher.register(operationId, [
{ handler: failingHandler, id: 'failing', type: 'onComplete' },
{ handler: successHandler, id: 'success', type: 'onComplete' },
]);
await dispatcher.dispatch(operationId, 'onComplete', makeEvent());
expect(failingHandler).toHaveBeenCalled();
expect(successHandler).toHaveBeenCalled();
});
it('should handle no registered hooks gracefully', async () => {
await expect(
dispatcher.dispatch('unknown_op', 'onComplete', makeEvent()),
).resolves.toBeUndefined();
});
});
describe('dispatch (production mode)', () => {
beforeEach(() => {
vi.mocked(isQueueAgentRuntimeEnabled).mockReturnValue(true);
// Mock global fetch
global.fetch = vi.fn().mockResolvedValue({ status: 200 });
});
afterEach(() => {
vi.restoreAllMocks();
});
it('should deliver webhook for hooks with webhook config', async () => {
dispatcher.register(operationId, [
{
handler: vi.fn(), // handler not called in production mode
id: 'webhook-hook',
type: 'onComplete',
webhook: { url: 'https://example.com/hook' },
},
]);
const serialized = dispatcher.getSerializedHooks(operationId);
await dispatcher.dispatch(operationId, 'onComplete', makeEvent(), serialized);
expect(global.fetch).toHaveBeenCalledWith(
'https://example.com/hook',
expect.objectContaining({
method: 'POST',
body: expect.any(String),
}),
);
});
it('should merge webhook.body into payload', async () => {
dispatcher.register(operationId, [
{
handler: vi.fn(),
id: 'custom-body-hook',
type: 'onComplete',
webhook: {
body: { taskId: 'task_123', customField: 'value' },
url: 'https://example.com/hook',
},
},
]);
const serialized = dispatcher.getSerializedHooks(operationId);
await dispatcher.dispatch(operationId, 'onComplete', makeEvent(), serialized);
const call = vi.mocked(global.fetch).mock.calls[0];
const body = JSON.parse(call[1]?.body as string);
expect(body.taskId).toBe('task_123');
expect(body.customField).toBe('value');
expect(body.hookId).toBe('custom-body-hook');
});
it('should not call local handler in production mode', async () => {
const handler = vi.fn();
dispatcher.register(operationId, [
{
handler,
id: 'prod-hook',
type: 'onComplete',
webhook: { url: 'https://example.com/hook' },
},
]);
const serialized = dispatcher.getSerializedHooks(operationId);
await dispatcher.dispatch(operationId, 'onComplete', makeEvent(), serialized);
expect(handler).not.toHaveBeenCalled();
});
it('should skip hooks without webhook config in production mode', async () => {
dispatcher.register(operationId, [
{ handler: vi.fn(), id: 'local-only', type: 'onComplete' },
]);
const serialized = dispatcher.getSerializedHooks(operationId);
await dispatcher.dispatch(operationId, 'onComplete', makeEvent(), serialized);
expect(global.fetch).not.toHaveBeenCalled();
});
});
describe('getSerializedHooks', () => {
it('should return only hooks with webhook config', () => {
dispatcher.register(operationId, [
{ handler: vi.fn(), id: 'local-only', type: 'onComplete' },
{
handler: vi.fn(),
id: 'with-webhook',
type: 'onComplete',
webhook: { url: '/api/hook' },
},
]);
const serialized = dispatcher.getSerializedHooks(operationId);
expect(serialized).toHaveLength(1);
expect(serialized![0].id).toBe('with-webhook');
expect(serialized![0].webhook.url).toBe('/api/hook');
});
it('should return undefined for unknown operation', () => {
expect(dispatcher.getSerializedHooks('unknown')).toBeUndefined();
});
});
describe('unregister', () => {
it('should remove all hooks for an operation', () => {
dispatcher.register(operationId, [{ handler: vi.fn(), id: 'hook', type: 'onComplete' }]);
expect(dispatcher.hasHooks(operationId)).toBe(true);
dispatcher.unregister(operationId);
expect(dispatcher.hasHooks(operationId)).toBe(false);
});
});
describe('hook types', () => {
it('should dispatch beforeStep hooks', async () => {
const handler = vi.fn();
dispatcher.register(operationId, [{ handler, id: 'before', type: 'beforeStep' }]);
await dispatcher.dispatch(operationId, 'beforeStep', makeEvent({ stepIndex: 0 }));
expect(handler).toHaveBeenCalled();
});
it('should dispatch afterStep hooks', async () => {
const handler = vi.fn();
dispatcher.register(operationId, [{ handler, id: 'after', type: 'afterStep' }]);
await dispatcher.dispatch(
operationId,
'afterStep',
makeEvent({ stepIndex: 1, shouldContinue: true }),
);
expect(handler).toHaveBeenCalledWith(
expect.objectContaining({
shouldContinue: true,
stepIndex: 1,
}),
);
});
it('should dispatch onError hooks', async () => {
const handler = vi.fn();
dispatcher.register(operationId, [{ handler, id: 'error', type: 'onError' }]);
await dispatcher.dispatch(
operationId,
'onError',
makeEvent({
errorMessage: 'Something went wrong',
reason: 'error',
}),
);
expect(handler).toHaveBeenCalledWith(
expect.objectContaining({
errorMessage: 'Something went wrong',
reason: 'error',
}),
);
});
});
});

View File

@@ -0,0 +1,8 @@
export { HookDispatcher, hookDispatcher } from './HookDispatcher';
export type {
AgentHook,
AgentHookEvent,
AgentHookType,
AgentHookWebhook,
SerializedHook,
} from './types';

View File

@@ -0,0 +1,106 @@
/**
* Agent Runtime Hooks — external lifecycle hook system
*
* Hooks are registered once and automatically adapt to the runtime mode:
* - Local mode: handler function is called directly (in-process)
* - Production (QStash) mode: webhook is delivered via HTTP POST
*/
// ── Hook Types ──────────────────────────────────────────
/**
* Lifecycle hook points in agent execution
*/
export type AgentHookType =
| 'afterStep' // After each step completes
| 'beforeStep' // Before each step executes
| 'onComplete' // Operation reaches terminal state (done/error/interrupted)
| 'onError'; // Error during execution
/**
* Hook definition — consumers register these with execAgent
*/
export interface AgentHook {
/** Handler function for local mode (called in-process) */
handler: (event: AgentHookEvent) => Promise<void>;
/** Unique hook identifier (for logging, debugging, idempotency) */
id: string;
/** Hook lifecycle point */
type: AgentHookType;
/** Webhook config for production mode (if omitted, hook only works in local mode) */
webhook?: AgentHookWebhook;
}
/**
* Webhook delivery configuration for production mode
*/
export interface AgentHookWebhook {
/** Custom data merged into webhook payload */
body?: Record<string, unknown>;
/** Delivery method: 'fetch' (plain HTTP) or 'qstash' (guaranteed delivery). Default: 'qstash' */
delivery?: 'fetch' | 'qstash';
/** Webhook endpoint URL (relative or absolute) */
url: string;
}
// ── Hook Events ──────────────────────────────────────────
/**
* Unified event payload passed to hook handlers and webhook payloads
*/
export interface AgentHookEvent {
// Identification
agentId: string;
// Statistics
cost?: number;
duration?: number;
// Content
errorDetail?: string;
errorMessage?: string;
/**
* Full AgentState — only available in local mode.
* Not serialized to webhook payloads.
* Use for consumers that need deep state access (e.g., SubAgent Thread updates).
*/
finalState?: any;
lastAssistantContent?: string;
llmCalls?: number;
// Caller-provided metadata (from webhook.body)
metadata?: Record<string, unknown>;
operationId: string;
// Execution result
reason?: string; // 'done' | 'error' | 'interrupted' | 'max_steps' | 'cost_limit'
// Step-specific (for beforeStep/afterStep)
shouldContinue?: boolean;
status?: string; // 'done' | 'error' | 'interrupted' | 'waiting_for_human'
stepIndex?: number;
steps?: number;
stepType?: string; // 'call_llm' | 'call_tool'
toolCalls?: number;
topicId?: string;
totalTokens?: number;
userId: string;
}
// ── Serialized Hook (for Redis persistence) ──────────────
/**
* Serialized hook config stored in AgentState.metadata._hooks
* Only contains webhook info (handler functions can't be serialized)
*/
export interface SerializedHook {
id: string;
type: AgentHookType;
webhook: AgentHookWebhook;
}

View File

@@ -4,6 +4,8 @@ import { type UserInterventionConfig } from '@lobechat/types';
import { type ServerUserMemoryConfig } from '@/server/modules/Mecha/ContextEngineering/types'; import { type ServerUserMemoryConfig } from '@/server/modules/Mecha/ContextEngineering/types';
import { type AgentHook } from './hooks/types';
// ==================== Operation Tool Set ==================== // ==================== Operation Tool Set ====================
export interface OperationToolSet { export interface OperationToolSet {
@@ -152,6 +154,11 @@ export interface OperationCreationParams {
/** Discord context for injecting channel/guild info into agent system message */ /** Discord context for injecting channel/guild info into agent system message */
discordContext?: any; discordContext?: any;
evalContext?: any; evalContext?: any;
/**
* External lifecycle hooks
* Registered once, auto-adapt to local (in-memory) or production (webhook) mode
*/
hooks?: AgentHook[];
initialContext: AgentRuntimeContext; initialContext: AgentRuntimeContext;
initialMessages?: any[]; initialMessages?: any[];
maxSteps?: number; maxSteps?: number;

View File

@@ -207,11 +207,11 @@ describe('AiAgentService.execSubAgentTask', () => {
topicId: 'topic-1', topicId: 'topic-1',
}, },
autoStart: true, autoStart: true,
hooks: expect.arrayContaining([
expect.objectContaining({ id: 'thread-metadata-update', type: 'afterStep' }),
expect.objectContaining({ id: 'thread-completion', type: 'onComplete' }),
]),
prompt: 'Test instruction', prompt: 'Test instruction',
stepCallbacks: expect.objectContaining({
onAfterStep: expect.any(Function),
onComplete: expect.any(Function),
}),
userInterventionConfig: { userInterventionConfig: {
approvalMode: 'headless', approvalMode: 'headless',
}, },
@@ -447,20 +447,21 @@ describe('AiAgentService.execSubAgentTask', () => {
topicId: 'topic-1', topicId: 'topic-1',
}); });
// Verify that stepCallbacks was passed with onComplete // Verify that hooks were passed with onComplete
expect(execAgentSpy).toHaveBeenCalledWith( expect(execAgentSpy).toHaveBeenCalledWith(
expect.objectContaining({ expect.objectContaining({
stepCallbacks: expect.objectContaining({ hooks: expect.arrayContaining([
onComplete: expect.any(Function), expect.objectContaining({ id: 'thread-completion', type: 'onComplete' }),
}), ]),
}), }),
); );
// Get the onComplete callback // Get the onComplete hook handler
const callArgs = execAgentSpy.mock.calls[0][0]; const callArgs = execAgentSpy.mock.calls[0][0];
const onComplete = callArgs.stepCallbacks?.onComplete; const onCompleteHook = callArgs.hooks?.find((h: any) => h.id === 'thread-completion');
expect(onComplete).toBeDefined(); expect(onCompleteHook).toBeDefined();
expect(onCompleteHook!.handler).toBeInstanceOf(Function);
}); });
}); });
}); });

View File

@@ -43,6 +43,7 @@ import { type ServerUserMemoryConfig } from '@/server/modules/Mecha/ContextEngin
import { AgentService } from '@/server/services/agent'; import { AgentService } from '@/server/services/agent';
import type { AgentRuntimeServiceOptions } from '@/server/services/agentRuntime'; import type { AgentRuntimeServiceOptions } from '@/server/services/agentRuntime';
import { AgentRuntimeService } from '@/server/services/agentRuntime'; import { AgentRuntimeService } from '@/server/services/agentRuntime';
import { type AgentHook } from '@/server/services/agentRuntime/hooks/types';
import { type StepLifecycleCallbacks } from '@/server/services/agentRuntime/types'; import { type StepLifecycleCallbacks } from '@/server/services/agentRuntime/types';
import { FileService } from '@/server/services/file'; import { FileService } from '@/server/services/file';
import { KlavisService } from '@/server/services/klavis'; import { KlavisService } from '@/server/services/klavis';
@@ -103,6 +104,8 @@ interface InternalExecAgentParams extends ExecAgentParams {
size?: number; size?: number;
url: string; url: string;
}>; }>;
/** External lifecycle hooks (auto-adapt to local/production mode) */
hooks?: AgentHook[];
/** Maximum steps for the agent operation */ /** Maximum steps for the agent operation */
maxSteps?: number; maxSteps?: number;
/** Step lifecycle callbacks for operation tracking (server-side only) */ /** Step lifecycle callbacks for operation tracking (server-side only) */
@@ -204,6 +207,7 @@ export class AiAgentService {
discordContext, discordContext,
existingMessageIds = [], existingMessageIds = [],
files, files,
hooks,
instructions, instructions,
stepCallbacks, stepCallbacks,
stream, stream,
@@ -793,6 +797,7 @@ export class AiAgentService {
initialMessages: allMessages, initialMessages: allMessages,
maxSteps, maxSteps,
modelRuntimeConfig: { model, provider }, modelRuntimeConfig: { model, provider },
hooks,
operationId, operationId,
stepCallbacks, stepCallbacks,
stepWebhook, stepWebhook,
@@ -979,18 +984,18 @@ export class AiAgentService {
status: ThreadStatus.Processing, status: ThreadStatus.Processing,
}); });
// 3. Create step lifecycle callbacks for updating Thread metadata and task message // 3. Create hooks for updating Thread metadata and task message
const stepCallbacks = this.createThreadMetadataCallbacks(thread.id, startedAt, parentMessageId); const threadHooks = this.createThreadHooks(thread.id, startedAt, parentMessageId);
// 4. Delegate to execAgent with threadId in appContext and callbacks // 4. Delegate to execAgent with threadId in appContext and hooks
// The instruction will be created as user message in the Thread // The instruction will be created as user message in the Thread
// Use headless mode to skip human approval in async task execution // Use headless mode to skip human approval in async task execution
const result = await this.execAgent({ const result = await this.execAgent({
agentId, agentId,
appContext: { groupId, threadId: thread.id, topicId }, appContext: { groupId, threadId: thread.id, topicId },
autoStart: true, autoStart: true,
hooks: threadHooks,
prompt: instruction, prompt: instruction,
stepCallbacks,
userInterventionConfig: { approvalMode: 'headless' }, userInterventionConfig: { approvalMode: 'headless' },
}); });
@@ -1155,6 +1160,132 @@ export class AiAgentService {
}; };
} }
/**
* Create hooks for tracking Thread metadata updates during SubAgent execution.
* Replaces the legacy createThreadMetadataCallbacks with the hooks system.
*/
private createThreadHooks(
threadId: string,
startedAt: string,
sourceMessageId: string,
): AgentHook[] {
let accumulatedToolCalls = 0;
return [
{
handler: async (event) => {
const state = event.finalState;
if (!state) return;
// Count tool calls from step result
const stepToolCalls = state.session?.toolCalls || 0;
if (stepToolCalls > accumulatedToolCalls) {
accumulatedToolCalls = stepToolCalls;
}
try {
await this.threadModel.update(threadId, {
metadata: {
operationId: event.operationId,
startedAt,
totalMessages: state.messages?.length ?? 0,
totalTokens: this.calculateTotalTokens(state.usage),
totalToolCalls: accumulatedToolCalls,
},
});
} catch (error) {
log('Thread hook afterStep: failed to update metadata: %O', error);
}
},
id: 'thread-metadata-update',
type: 'afterStep' as const,
},
{
handler: async (event) => {
const finalState = event.finalState;
if (!finalState) return;
const completedAt = new Date().toISOString();
const duration = Date.now() - new Date(startedAt).getTime();
// Map completion reason to ThreadStatus
let status: ThreadStatus;
switch (event.reason) {
case 'done': {
status = ThreadStatus.Completed;
break;
}
case 'error': {
status = ThreadStatus.Failed;
break;
}
case 'interrupted': {
status = ThreadStatus.Cancel;
break;
}
case 'waiting_for_human': {
status = ThreadStatus.InReview;
break;
}
default: {
status = ThreadStatus.Completed;
}
}
if (event.reason === 'error' && finalState.error) {
console.error(
'Thread hook onComplete: task failed for thread %s:',
threadId,
finalState.error,
);
}
try {
// Update task message with summary
const lastAssistantMessage = finalState.messages
?.slice()
.reverse()
.find((m: { role: string }) => m.role === 'assistant');
if (lastAssistantMessage?.content) {
await this.messageModel.update(sourceMessageId, {
content: lastAssistantMessage.content,
});
}
const formattedError = formatErrorForMetadata(finalState.error);
await this.threadModel.update(threadId, {
metadata: {
completedAt,
duration,
error: formattedError,
operationId: finalState.operationId,
startedAt,
totalCost: finalState.cost?.total,
totalMessages: finalState.messages?.length ?? 0,
totalTokens: this.calculateTotalTokens(finalState.usage),
totalToolCalls: accumulatedToolCalls,
},
status,
});
log(
'Thread hook onComplete: thread %s status=%s reason=%s',
threadId,
status,
event.reason,
);
} catch (error) {
console.error('Thread hook onComplete: failed to update: %O', error);
}
},
id: 'thread-completion',
type: 'onComplete' as const,
},
];
}
/** /**
* Calculate total tokens from AgentState usage object * Calculate total tokens from AgentState usage object
* AgentState.usage is of type Usage from @lobechat/agent-runtime * AgentState.usage is of type Usage from @lobechat/agent-runtime