mirror of
https://github.com/lobehub/lobehub.git
synced 2026-03-27 13:29:15 +07:00
@@ -170,7 +170,14 @@ export class AgentRuntime {
|
||||
|
||||
// Special handling for batch tool execution
|
||||
if (instruction.type === 'call_tools_batch') {
|
||||
result = await this.executeToolsBatch(instruction as any, currentState, runtimeContext);
|
||||
// Check if custom executor is provided (e.g., server-side with DB access)
|
||||
const customExecutor = this.executors['call_tools_batch' as keyof typeof this.executors];
|
||||
if (customExecutor) {
|
||||
result = await customExecutor(instruction, currentState, runtimeContext);
|
||||
} else {
|
||||
// Fallback to built-in executeToolsBatch
|
||||
result = await this.executeToolsBatch(instruction as any, currentState, runtimeContext);
|
||||
}
|
||||
} else {
|
||||
const executor = this.executors[instruction.type as keyof typeof this.executors];
|
||||
if (!executor) {
|
||||
|
||||
@@ -617,6 +617,168 @@ export const createRuntimeExecutors = (
|
||||
};
|
||||
}
|
||||
},
|
||||
|
||||
/**
|
||||
* Batch tool execution with database sync
|
||||
* Executes multiple tools concurrently and refreshes messages from database after completion
|
||||
*/
|
||||
call_tools_batch: async (instruction, state) => {
|
||||
const { payload } = instruction as Extract<AgentInstruction, { type: 'call_tools_batch' }>;
|
||||
const { parentMessageId, toolsCalling } = payload;
|
||||
const { operationId, stepIndex, streamManager, toolExecutionService } = ctx;
|
||||
const events: AgentEvent[] = [];
|
||||
|
||||
const operationLogId = `${operationId}:${stepIndex}`;
|
||||
log(`[${operationLogId}][call_tools_batch] Starting batch execution for ${toolsCalling.length} tools`);
|
||||
|
||||
// Track all tool message IDs created during execution
|
||||
const toolMessageIds: string[] = [];
|
||||
const toolResults: any[] = [];
|
||||
|
||||
// Execute all tools concurrently
|
||||
await Promise.all(
|
||||
toolsCalling.map(async (chatToolPayload: ChatToolPayload) => {
|
||||
const toolName = `${chatToolPayload.identifier}/${chatToolPayload.apiName}`;
|
||||
|
||||
// Publish tool execution start event
|
||||
await streamManager.publishStreamEvent(operationId, {
|
||||
data: { parentMessageId, toolCalling: chatToolPayload },
|
||||
stepIndex,
|
||||
type: 'tool_start',
|
||||
});
|
||||
|
||||
try {
|
||||
log(`[${operationLogId}] Executing tool ${toolName} ...`);
|
||||
const executionResult = await toolExecutionService.executeTool(chatToolPayload, {
|
||||
serverDB: ctx.serverDB,
|
||||
toolManifestMap: state.toolManifestMap,
|
||||
topicId: ctx.topicId,
|
||||
userId: ctx.userId,
|
||||
});
|
||||
|
||||
const executionTime = executionResult.executionTime;
|
||||
const isSuccess = executionResult.success;
|
||||
log(
|
||||
`[${operationLogId}] Executed ${toolName} in ${executionTime}ms, success: ${isSuccess}`,
|
||||
);
|
||||
|
||||
// Publish tool execution result event
|
||||
await streamManager.publishStreamEvent(operationId, {
|
||||
data: {
|
||||
executionTime,
|
||||
isSuccess,
|
||||
payload: { parentMessageId, toolCalling: chatToolPayload },
|
||||
phase: 'tool_execution',
|
||||
result: executionResult,
|
||||
},
|
||||
stepIndex,
|
||||
type: 'tool_end',
|
||||
});
|
||||
|
||||
// Create tool message in database
|
||||
try {
|
||||
const toolMessage = await ctx.messageModel.create({
|
||||
agentId: state.metadata!.agentId!,
|
||||
content: executionResult.content,
|
||||
parentId: parentMessageId,
|
||||
plugin: chatToolPayload as any,
|
||||
pluginError: executionResult.error,
|
||||
pluginState: executionResult.state,
|
||||
role: 'tool',
|
||||
threadId: state.metadata?.threadId,
|
||||
tool_call_id: chatToolPayload.id,
|
||||
topicId: state.metadata?.topicId,
|
||||
});
|
||||
toolMessageIds.push(toolMessage.id);
|
||||
log(`[${operationLogId}] Created tool message ${toolMessage.id} for ${toolName}`);
|
||||
} catch (error) {
|
||||
console.error(`[${operationLogId}] Failed to create tool message for ${toolName}:`, error);
|
||||
}
|
||||
|
||||
// Collect tool result
|
||||
toolResults.push({
|
||||
data: executionResult,
|
||||
executionTime,
|
||||
isSuccess,
|
||||
toolCall: chatToolPayload,
|
||||
toolCallId: chatToolPayload.id,
|
||||
});
|
||||
|
||||
events.push({ id: chatToolPayload.id, result: executionResult, type: 'tool_result' });
|
||||
|
||||
// Accumulate usage
|
||||
const toolCost = TOOL_PRICING[toolName] || 0;
|
||||
UsageCounter.accumulateTool({
|
||||
cost: state.cost,
|
||||
executionTime,
|
||||
success: isSuccess,
|
||||
toolCost,
|
||||
toolName,
|
||||
usage: state.usage,
|
||||
});
|
||||
} catch (error) {
|
||||
console.error(`[${operationLogId}] Tool execution failed for ${toolName}:`, error);
|
||||
|
||||
// Publish error event
|
||||
await streamManager.publishStreamEvent(operationId, {
|
||||
data: {
|
||||
error: (error as Error).message,
|
||||
phase: 'tool_execution',
|
||||
},
|
||||
stepIndex,
|
||||
type: 'error',
|
||||
});
|
||||
|
||||
events.push({ error, type: 'error' });
|
||||
}
|
||||
}),
|
||||
);
|
||||
|
||||
log(`[${operationLogId}][call_tools_batch] All tools executed, created ${toolMessageIds.length} tool messages`);
|
||||
|
||||
// Refresh messages from database to ensure state is in sync
|
||||
const newState = structuredClone(state);
|
||||
|
||||
// Query latest messages from database
|
||||
const latestMessages = await ctx.messageModel.query({
|
||||
topicId: state.metadata?.topicId,
|
||||
});
|
||||
|
||||
// Convert DB messages to LLM format with id
|
||||
newState.messages = latestMessages.map((msg: any) => ({
|
||||
content: msg.content,
|
||||
id: msg.id,
|
||||
role: msg.role,
|
||||
tool_call_id: msg.tool_call_id,
|
||||
tool_calls: msg.tool_calls,
|
||||
}));
|
||||
|
||||
log(`[${operationLogId}][call_tools_batch] Refreshed ${newState.messages.length} messages from database`);
|
||||
|
||||
// Get the last tool message ID as parentMessageId for next LLM call
|
||||
const lastToolMessageId = toolMessageIds.at(-1);
|
||||
|
||||
return {
|
||||
events,
|
||||
newState,
|
||||
nextContext: {
|
||||
payload: {
|
||||
parentMessageId: lastToolMessageId ?? parentMessageId,
|
||||
toolCount: toolsCalling.length,
|
||||
toolResults,
|
||||
},
|
||||
phase: 'tools_batch_result',
|
||||
session: {
|
||||
eventCount: events.length,
|
||||
messageCount: newState.messages.length,
|
||||
sessionId: operationId,
|
||||
status: 'running',
|
||||
stepCount: state.stepCount + 1,
|
||||
},
|
||||
},
|
||||
};
|
||||
},
|
||||
|
||||
/**
|
||||
* Complete runtime execution
|
||||
*/
|
||||
|
||||
@@ -540,6 +540,420 @@ describe('RuntimeExecutors', () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe('call_tools_batch executor', () => {
|
||||
const createMockState = (overrides?: Partial<AgentState>): AgentState => ({
|
||||
cost: createMockCost(),
|
||||
createdAt: new Date().toISOString(),
|
||||
lastModified: new Date().toISOString(),
|
||||
maxSteps: 100,
|
||||
messages: [],
|
||||
metadata: {
|
||||
agentId: 'agent-123',
|
||||
threadId: 'thread-123',
|
||||
topicId: 'topic-123',
|
||||
},
|
||||
operationId: 'op-123',
|
||||
status: 'running',
|
||||
stepCount: 0,
|
||||
toolManifestMap: {},
|
||||
usage: createMockUsage(),
|
||||
...overrides,
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
// Reset mock to return unique IDs for each call
|
||||
let callCount = 0;
|
||||
mockMessageModel.create.mockImplementation(() => {
|
||||
callCount++;
|
||||
return Promise.resolve({ id: `tool-msg-${callCount}` });
|
||||
});
|
||||
|
||||
// Mock query to return messages from database
|
||||
mockMessageModel.query = vi.fn().mockResolvedValue([
|
||||
{ id: 'msg-1', content: 'Hello', role: 'user' },
|
||||
{ id: 'msg-2', content: 'Response', role: 'assistant', tool_calls: [] },
|
||||
{ id: 'tool-msg-1', content: 'Tool result 1', role: 'tool', tool_call_id: 'tool-call-1' },
|
||||
{ id: 'tool-msg-2', content: 'Tool result 2', role: 'tool', tool_call_id: 'tool-call-2' },
|
||||
]);
|
||||
});
|
||||
|
||||
it('should execute multiple tools concurrently and create tool messages', async () => {
|
||||
const executors = createRuntimeExecutors(ctx);
|
||||
const state = createMockState();
|
||||
|
||||
const instruction = {
|
||||
payload: {
|
||||
parentMessageId: 'assistant-msg-123',
|
||||
toolsCalling: [
|
||||
{
|
||||
apiName: 'search',
|
||||
arguments: '{"query": "test1"}',
|
||||
id: 'tool-call-1',
|
||||
identifier: 'web-search',
|
||||
type: 'default' as const,
|
||||
},
|
||||
{
|
||||
apiName: 'crawl',
|
||||
arguments: '{"url": "https://example.com"}',
|
||||
id: 'tool-call-2',
|
||||
identifier: 'web-browsing',
|
||||
type: 'default' as const,
|
||||
},
|
||||
],
|
||||
},
|
||||
type: 'call_tools_batch' as const,
|
||||
};
|
||||
|
||||
await executors.call_tools_batch!(instruction, state);
|
||||
|
||||
// Should execute both tools
|
||||
expect(mockToolExecutionService.executeTool).toHaveBeenCalledTimes(2);
|
||||
|
||||
// Should create two tool messages
|
||||
expect(mockMessageModel.create).toHaveBeenCalledTimes(2);
|
||||
|
||||
// Verify first tool message
|
||||
expect(mockMessageModel.create).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
agentId: 'agent-123',
|
||||
parentId: 'assistant-msg-123',
|
||||
role: 'tool',
|
||||
tool_call_id: 'tool-call-1',
|
||||
}),
|
||||
);
|
||||
|
||||
// Verify second tool message
|
||||
expect(mockMessageModel.create).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
agentId: 'agent-123',
|
||||
parentId: 'assistant-msg-123',
|
||||
role: 'tool',
|
||||
tool_call_id: 'tool-call-2',
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('should refresh messages from database after batch execution', async () => {
|
||||
const executors = createRuntimeExecutors(ctx);
|
||||
const state = createMockState({ messages: [{ content: 'old', role: 'user' }] });
|
||||
|
||||
const instruction = {
|
||||
payload: {
|
||||
parentMessageId: 'assistant-msg-123',
|
||||
toolsCalling: [
|
||||
{
|
||||
apiName: 'search',
|
||||
arguments: '{}',
|
||||
id: 'tool-call-1',
|
||||
identifier: 'web-search',
|
||||
type: 'default' as const,
|
||||
},
|
||||
],
|
||||
},
|
||||
type: 'call_tools_batch' as const,
|
||||
};
|
||||
|
||||
const result = await executors.call_tools_batch!(instruction, state);
|
||||
|
||||
// Should query messages from database
|
||||
expect(mockMessageModel.query).toHaveBeenCalledWith({
|
||||
topicId: 'topic-123',
|
||||
});
|
||||
|
||||
// Messages should be refreshed from database (4 messages from mock)
|
||||
expect(result.newState.messages).toHaveLength(4);
|
||||
});
|
||||
|
||||
it('should include id in refreshed messages', async () => {
|
||||
const executors = createRuntimeExecutors(ctx);
|
||||
const state = createMockState();
|
||||
|
||||
const instruction = {
|
||||
payload: {
|
||||
parentMessageId: 'assistant-msg-123',
|
||||
toolsCalling: [
|
||||
{
|
||||
apiName: 'search',
|
||||
arguments: '{}',
|
||||
id: 'tool-call-1',
|
||||
identifier: 'web-search',
|
||||
type: 'default' as const,
|
||||
},
|
||||
],
|
||||
},
|
||||
type: 'call_tools_batch' as const,
|
||||
};
|
||||
|
||||
const result = await executors.call_tools_batch!(instruction, state);
|
||||
|
||||
// Each message should have an id
|
||||
result.newState.messages.forEach((msg: any) => {
|
||||
expect(msg.id).toBeDefined();
|
||||
expect(typeof msg.id).toBe('string');
|
||||
});
|
||||
|
||||
// Verify specific message ids
|
||||
expect(result.newState.messages[0].id).toBe('msg-1');
|
||||
expect(result.newState.messages[2].id).toBe('tool-msg-1');
|
||||
});
|
||||
|
||||
it('should return last tool message ID as parentMessageId in nextContext', async () => {
|
||||
let callCount = 0;
|
||||
mockMessageModel.create.mockImplementation(() => {
|
||||
callCount++;
|
||||
return Promise.resolve({ id: `created-tool-msg-${callCount}` });
|
||||
});
|
||||
|
||||
const executors = createRuntimeExecutors(ctx);
|
||||
const state = createMockState();
|
||||
|
||||
const instruction = {
|
||||
payload: {
|
||||
parentMessageId: 'assistant-msg-123',
|
||||
toolsCalling: [
|
||||
{
|
||||
apiName: 'search',
|
||||
arguments: '{}',
|
||||
id: 'tool-call-1',
|
||||
identifier: 'web-search',
|
||||
type: 'default' as const,
|
||||
},
|
||||
{
|
||||
apiName: 'crawl',
|
||||
arguments: '{}',
|
||||
id: 'tool-call-2',
|
||||
identifier: 'web-browsing',
|
||||
type: 'default' as const,
|
||||
},
|
||||
],
|
||||
},
|
||||
type: 'call_tools_batch' as const,
|
||||
};
|
||||
|
||||
const result = await executors.call_tools_batch!(instruction, state);
|
||||
|
||||
// parentMessageId should be the last created tool message ID
|
||||
const payload = result.nextContext!.payload as { parentMessageId?: string };
|
||||
expect(payload.parentMessageId).toBe('created-tool-msg-2');
|
||||
expect(result.nextContext!.phase).toBe('tools_batch_result');
|
||||
});
|
||||
|
||||
it('should fallback to original parentMessageId if no tool messages created', async () => {
|
||||
// All tool message creations fail
|
||||
mockMessageModel.create.mockRejectedValue(new Error('Database error'));
|
||||
|
||||
const executors = createRuntimeExecutors(ctx);
|
||||
const state = createMockState();
|
||||
|
||||
const instruction = {
|
||||
payload: {
|
||||
parentMessageId: 'original-parent-123',
|
||||
toolsCalling: [
|
||||
{
|
||||
apiName: 'search',
|
||||
arguments: '{}',
|
||||
id: 'tool-call-1',
|
||||
identifier: 'web-search',
|
||||
type: 'default' as const,
|
||||
},
|
||||
],
|
||||
},
|
||||
type: 'call_tools_batch' as const,
|
||||
};
|
||||
|
||||
const result = await executors.call_tools_batch!(instruction, state);
|
||||
|
||||
// Should fallback to original parentMessageId
|
||||
const payload = result.nextContext!.payload as { parentMessageId?: string };
|
||||
expect(payload.parentMessageId).toBe('original-parent-123');
|
||||
});
|
||||
|
||||
it('should continue processing other tools if one tool execution fails', async () => {
|
||||
// First tool fails, second succeeds
|
||||
mockToolExecutionService.executeTool
|
||||
.mockRejectedValueOnce(new Error('Tool execution error'))
|
||||
.mockResolvedValueOnce({
|
||||
content: 'Tool result 2',
|
||||
error: null,
|
||||
executionTime: 100,
|
||||
state: {},
|
||||
success: true,
|
||||
});
|
||||
|
||||
const executors = createRuntimeExecutors(ctx);
|
||||
const state = createMockState();
|
||||
|
||||
const instruction = {
|
||||
payload: {
|
||||
parentMessageId: 'assistant-msg-123',
|
||||
toolsCalling: [
|
||||
{
|
||||
apiName: 'search',
|
||||
arguments: '{}',
|
||||
id: 'tool-call-1',
|
||||
identifier: 'web-search',
|
||||
type: 'default' as const,
|
||||
},
|
||||
{
|
||||
apiName: 'crawl',
|
||||
arguments: '{}',
|
||||
id: 'tool-call-2',
|
||||
identifier: 'web-browsing',
|
||||
type: 'default' as const,
|
||||
},
|
||||
],
|
||||
},
|
||||
type: 'call_tools_batch' as const,
|
||||
};
|
||||
|
||||
const result = await executors.call_tools_batch!(instruction, state);
|
||||
|
||||
// Both tools should be attempted
|
||||
expect(mockToolExecutionService.executeTool).toHaveBeenCalledTimes(2);
|
||||
|
||||
// Only one tool message should be created (for the successful tool)
|
||||
expect(mockMessageModel.create).toHaveBeenCalledTimes(1);
|
||||
|
||||
// Should still return result (not throw)
|
||||
expect(result.nextContext).toBeDefined();
|
||||
expect(result.nextContext!.phase).toBe('tools_batch_result');
|
||||
});
|
||||
|
||||
it('should continue if tool message creation fails for one tool', async () => {
|
||||
// First message creation succeeds, second fails
|
||||
mockMessageModel.create
|
||||
.mockResolvedValueOnce({ id: 'tool-msg-1' })
|
||||
.mockRejectedValueOnce(new Error('Database error'));
|
||||
|
||||
const executors = createRuntimeExecutors(ctx);
|
||||
const state = createMockState();
|
||||
|
||||
const instruction = {
|
||||
payload: {
|
||||
parentMessageId: 'assistant-msg-123',
|
||||
toolsCalling: [
|
||||
{
|
||||
apiName: 'search',
|
||||
arguments: '{}',
|
||||
id: 'tool-call-1',
|
||||
identifier: 'web-search',
|
||||
type: 'default' as const,
|
||||
},
|
||||
{
|
||||
apiName: 'crawl',
|
||||
arguments: '{}',
|
||||
id: 'tool-call-2',
|
||||
identifier: 'web-browsing',
|
||||
type: 'default' as const,
|
||||
},
|
||||
],
|
||||
},
|
||||
type: 'call_tools_batch' as const,
|
||||
};
|
||||
|
||||
const result = await executors.call_tools_batch!(instruction, state);
|
||||
|
||||
// Both tools should be executed
|
||||
expect(mockToolExecutionService.executeTool).toHaveBeenCalledTimes(2);
|
||||
|
||||
// Should still return result
|
||||
expect(result.nextContext).toBeDefined();
|
||||
|
||||
// parentMessageId should be the first successful tool message
|
||||
const payload = result.nextContext!.payload as { parentMessageId?: string };
|
||||
expect(payload.parentMessageId).toBe('tool-msg-1');
|
||||
});
|
||||
|
||||
it('should publish tool_start and tool_end events for each tool', async () => {
|
||||
const executors = createRuntimeExecutors(ctx);
|
||||
const state = createMockState();
|
||||
|
||||
const instruction = {
|
||||
payload: {
|
||||
parentMessageId: 'assistant-msg-123',
|
||||
toolsCalling: [
|
||||
{
|
||||
apiName: 'search',
|
||||
arguments: '{}',
|
||||
id: 'tool-call-1',
|
||||
identifier: 'web-search',
|
||||
type: 'default' as const,
|
||||
},
|
||||
{
|
||||
apiName: 'crawl',
|
||||
arguments: '{}',
|
||||
id: 'tool-call-2',
|
||||
identifier: 'web-browsing',
|
||||
type: 'default' as const,
|
||||
},
|
||||
],
|
||||
},
|
||||
type: 'call_tools_batch' as const,
|
||||
};
|
||||
|
||||
await executors.call_tools_batch!(instruction, state);
|
||||
|
||||
// Should publish tool_start for each tool
|
||||
expect(mockStreamManager.publishStreamEvent).toHaveBeenCalledWith(
|
||||
'op-123',
|
||||
expect.objectContaining({ type: 'tool_start' }),
|
||||
);
|
||||
|
||||
// Should publish tool_end for each tool
|
||||
expect(mockStreamManager.publishStreamEvent).toHaveBeenCalledWith(
|
||||
'op-123',
|
||||
expect.objectContaining({ type: 'tool_end' }),
|
||||
);
|
||||
|
||||
// At least 4 events (2 tool_start + 2 tool_end)
|
||||
expect(mockStreamManager.publishStreamEvent.mock.calls.length).toBeGreaterThanOrEqual(4);
|
||||
});
|
||||
|
||||
it('should include toolCount and toolResults in nextContext payload', async () => {
|
||||
const executors = createRuntimeExecutors(ctx);
|
||||
const state = createMockState();
|
||||
|
||||
const instruction = {
|
||||
payload: {
|
||||
parentMessageId: 'assistant-msg-123',
|
||||
toolsCalling: [
|
||||
{
|
||||
apiName: 'search',
|
||||
arguments: '{}',
|
||||
id: 'tool-call-1',
|
||||
identifier: 'web-search',
|
||||
type: 'default' as const,
|
||||
},
|
||||
{
|
||||
apiName: 'crawl',
|
||||
arguments: '{}',
|
||||
id: 'tool-call-2',
|
||||
identifier: 'web-browsing',
|
||||
type: 'default' as const,
|
||||
},
|
||||
],
|
||||
},
|
||||
type: 'call_tools_batch' as const,
|
||||
};
|
||||
|
||||
const result = await executors.call_tools_batch!(instruction, state);
|
||||
|
||||
const payload = result.nextContext!.payload as {
|
||||
toolCount: number;
|
||||
toolResults: any[];
|
||||
};
|
||||
|
||||
expect(payload.toolCount).toBe(2);
|
||||
expect(payload.toolResults).toHaveLength(2);
|
||||
expect(payload.toolResults[0]).toEqual(
|
||||
expect.objectContaining({
|
||||
toolCallId: 'tool-call-1',
|
||||
isSuccess: true,
|
||||
}),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('resolve_aborted_tools executor', () => {
|
||||
const createMockState = (overrides?: Partial<AgentState>): AgentState => ({
|
||||
cost: createMockCost(),
|
||||
|
||||
Reference in New Issue
Block a user