🐛 fix: fix cron job issue (#11835)

* fix cron job

* fix lint
This commit is contained in:
Arvin Xu
2026-01-26 10:13:10 +08:00
committed by GitHub
parent b593095971
commit 6d50f80966
3 changed files with 584 additions and 1 deletions

View File

@@ -170,7 +170,14 @@ export class AgentRuntime {
// Special handling for batch tool execution
if (instruction.type === 'call_tools_batch') {
result = await this.executeToolsBatch(instruction as any, currentState, runtimeContext);
// Check if custom executor is provided (e.g., server-side with DB access)
const customExecutor = this.executors['call_tools_batch' as keyof typeof this.executors];
if (customExecutor) {
result = await customExecutor(instruction, currentState, runtimeContext);
} else {
// Fallback to built-in executeToolsBatch
result = await this.executeToolsBatch(instruction as any, currentState, runtimeContext);
}
} else {
const executor = this.executors[instruction.type as keyof typeof this.executors];
if (!executor) {

View File

@@ -617,6 +617,168 @@ export const createRuntimeExecutors = (
};
}
},
/**
* Batch tool execution with database sync
* Executes multiple tools concurrently and refreshes messages from database after completion
*/
call_tools_batch: async (instruction, state) => {
const { payload } = instruction as Extract<AgentInstruction, { type: 'call_tools_batch' }>;
const { parentMessageId, toolsCalling } = payload;
const { operationId, stepIndex, streamManager, toolExecutionService } = ctx;
const events: AgentEvent[] = [];
const operationLogId = `${operationId}:${stepIndex}`;
log(`[${operationLogId}][call_tools_batch] Starting batch execution for ${toolsCalling.length} tools`);
// Track all tool message IDs created during execution
const toolMessageIds: string[] = [];
const toolResults: any[] = [];
// Execute all tools concurrently
await Promise.all(
toolsCalling.map(async (chatToolPayload: ChatToolPayload) => {
const toolName = `${chatToolPayload.identifier}/${chatToolPayload.apiName}`;
// Publish tool execution start event
await streamManager.publishStreamEvent(operationId, {
data: { parentMessageId, toolCalling: chatToolPayload },
stepIndex,
type: 'tool_start',
});
try {
log(`[${operationLogId}] Executing tool ${toolName} ...`);
const executionResult = await toolExecutionService.executeTool(chatToolPayload, {
serverDB: ctx.serverDB,
toolManifestMap: state.toolManifestMap,
topicId: ctx.topicId,
userId: ctx.userId,
});
const executionTime = executionResult.executionTime;
const isSuccess = executionResult.success;
log(
`[${operationLogId}] Executed ${toolName} in ${executionTime}ms, success: ${isSuccess}`,
);
// Publish tool execution result event
await streamManager.publishStreamEvent(operationId, {
data: {
executionTime,
isSuccess,
payload: { parentMessageId, toolCalling: chatToolPayload },
phase: 'tool_execution',
result: executionResult,
},
stepIndex,
type: 'tool_end',
});
// Create tool message in database
try {
const toolMessage = await ctx.messageModel.create({
agentId: state.metadata!.agentId!,
content: executionResult.content,
parentId: parentMessageId,
plugin: chatToolPayload as any,
pluginError: executionResult.error,
pluginState: executionResult.state,
role: 'tool',
threadId: state.metadata?.threadId,
tool_call_id: chatToolPayload.id,
topicId: state.metadata?.topicId,
});
toolMessageIds.push(toolMessage.id);
log(`[${operationLogId}] Created tool message ${toolMessage.id} for ${toolName}`);
} catch (error) {
console.error(`[${operationLogId}] Failed to create tool message for ${toolName}:`, error);
}
// Collect tool result
toolResults.push({
data: executionResult,
executionTime,
isSuccess,
toolCall: chatToolPayload,
toolCallId: chatToolPayload.id,
});
events.push({ id: chatToolPayload.id, result: executionResult, type: 'tool_result' });
// Accumulate usage
const toolCost = TOOL_PRICING[toolName] || 0;
UsageCounter.accumulateTool({
cost: state.cost,
executionTime,
success: isSuccess,
toolCost,
toolName,
usage: state.usage,
});
} catch (error) {
console.error(`[${operationLogId}] Tool execution failed for ${toolName}:`, error);
// Publish error event
await streamManager.publishStreamEvent(operationId, {
data: {
error: (error as Error).message,
phase: 'tool_execution',
},
stepIndex,
type: 'error',
});
events.push({ error, type: 'error' });
}
}),
);
log(`[${operationLogId}][call_tools_batch] All tools executed, created ${toolMessageIds.length} tool messages`);
// Refresh messages from database to ensure state is in sync
const newState = structuredClone(state);
// Query latest messages from database
const latestMessages = await ctx.messageModel.query({
topicId: state.metadata?.topicId,
});
// Convert DB messages to LLM format with id
newState.messages = latestMessages.map((msg: any) => ({
content: msg.content,
id: msg.id,
role: msg.role,
tool_call_id: msg.tool_call_id,
tool_calls: msg.tool_calls,
}));
log(`[${operationLogId}][call_tools_batch] Refreshed ${newState.messages.length} messages from database`);
// Get the last tool message ID as parentMessageId for next LLM call
const lastToolMessageId = toolMessageIds.at(-1);
return {
events,
newState,
nextContext: {
payload: {
parentMessageId: lastToolMessageId ?? parentMessageId,
toolCount: toolsCalling.length,
toolResults,
},
phase: 'tools_batch_result',
session: {
eventCount: events.length,
messageCount: newState.messages.length,
sessionId: operationId,
status: 'running',
stepCount: state.stepCount + 1,
},
},
};
},
/**
* Complete runtime execution
*/

View File

@@ -540,6 +540,420 @@ describe('RuntimeExecutors', () => {
});
});
describe('call_tools_batch executor', () => {
const createMockState = (overrides?: Partial<AgentState>): AgentState => ({
cost: createMockCost(),
createdAt: new Date().toISOString(),
lastModified: new Date().toISOString(),
maxSteps: 100,
messages: [],
metadata: {
agentId: 'agent-123',
threadId: 'thread-123',
topicId: 'topic-123',
},
operationId: 'op-123',
status: 'running',
stepCount: 0,
toolManifestMap: {},
usage: createMockUsage(),
...overrides,
});
beforeEach(() => {
// Reset mock to return unique IDs for each call
let callCount = 0;
mockMessageModel.create.mockImplementation(() => {
callCount++;
return Promise.resolve({ id: `tool-msg-${callCount}` });
});
// Mock query to return messages from database
mockMessageModel.query = vi.fn().mockResolvedValue([
{ id: 'msg-1', content: 'Hello', role: 'user' },
{ id: 'msg-2', content: 'Response', role: 'assistant', tool_calls: [] },
{ id: 'tool-msg-1', content: 'Tool result 1', role: 'tool', tool_call_id: 'tool-call-1' },
{ id: 'tool-msg-2', content: 'Tool result 2', role: 'tool', tool_call_id: 'tool-call-2' },
]);
});
it('should execute multiple tools concurrently and create tool messages', async () => {
const executors = createRuntimeExecutors(ctx);
const state = createMockState();
const instruction = {
payload: {
parentMessageId: 'assistant-msg-123',
toolsCalling: [
{
apiName: 'search',
arguments: '{"query": "test1"}',
id: 'tool-call-1',
identifier: 'web-search',
type: 'default' as const,
},
{
apiName: 'crawl',
arguments: '{"url": "https://example.com"}',
id: 'tool-call-2',
identifier: 'web-browsing',
type: 'default' as const,
},
],
},
type: 'call_tools_batch' as const,
};
await executors.call_tools_batch!(instruction, state);
// Should execute both tools
expect(mockToolExecutionService.executeTool).toHaveBeenCalledTimes(2);
// Should create two tool messages
expect(mockMessageModel.create).toHaveBeenCalledTimes(2);
// Verify first tool message
expect(mockMessageModel.create).toHaveBeenCalledWith(
expect.objectContaining({
agentId: 'agent-123',
parentId: 'assistant-msg-123',
role: 'tool',
tool_call_id: 'tool-call-1',
}),
);
// Verify second tool message
expect(mockMessageModel.create).toHaveBeenCalledWith(
expect.objectContaining({
agentId: 'agent-123',
parentId: 'assistant-msg-123',
role: 'tool',
tool_call_id: 'tool-call-2',
}),
);
});
it('should refresh messages from database after batch execution', async () => {
const executors = createRuntimeExecutors(ctx);
const state = createMockState({ messages: [{ content: 'old', role: 'user' }] });
const instruction = {
payload: {
parentMessageId: 'assistant-msg-123',
toolsCalling: [
{
apiName: 'search',
arguments: '{}',
id: 'tool-call-1',
identifier: 'web-search',
type: 'default' as const,
},
],
},
type: 'call_tools_batch' as const,
};
const result = await executors.call_tools_batch!(instruction, state);
// Should query messages from database
expect(mockMessageModel.query).toHaveBeenCalledWith({
topicId: 'topic-123',
});
// Messages should be refreshed from database (4 messages from mock)
expect(result.newState.messages).toHaveLength(4);
});
it('should include id in refreshed messages', async () => {
const executors = createRuntimeExecutors(ctx);
const state = createMockState();
const instruction = {
payload: {
parentMessageId: 'assistant-msg-123',
toolsCalling: [
{
apiName: 'search',
arguments: '{}',
id: 'tool-call-1',
identifier: 'web-search',
type: 'default' as const,
},
],
},
type: 'call_tools_batch' as const,
};
const result = await executors.call_tools_batch!(instruction, state);
// Each message should have an id
result.newState.messages.forEach((msg: any) => {
expect(msg.id).toBeDefined();
expect(typeof msg.id).toBe('string');
});
// Verify specific message ids
expect(result.newState.messages[0].id).toBe('msg-1');
expect(result.newState.messages[2].id).toBe('tool-msg-1');
});
it('should return last tool message ID as parentMessageId in nextContext', async () => {
let callCount = 0;
mockMessageModel.create.mockImplementation(() => {
callCount++;
return Promise.resolve({ id: `created-tool-msg-${callCount}` });
});
const executors = createRuntimeExecutors(ctx);
const state = createMockState();
const instruction = {
payload: {
parentMessageId: 'assistant-msg-123',
toolsCalling: [
{
apiName: 'search',
arguments: '{}',
id: 'tool-call-1',
identifier: 'web-search',
type: 'default' as const,
},
{
apiName: 'crawl',
arguments: '{}',
id: 'tool-call-2',
identifier: 'web-browsing',
type: 'default' as const,
},
],
},
type: 'call_tools_batch' as const,
};
const result = await executors.call_tools_batch!(instruction, state);
// parentMessageId should be the last created tool message ID
const payload = result.nextContext!.payload as { parentMessageId?: string };
expect(payload.parentMessageId).toBe('created-tool-msg-2');
expect(result.nextContext!.phase).toBe('tools_batch_result');
});
it('should fallback to original parentMessageId if no tool messages created', async () => {
// All tool message creations fail
mockMessageModel.create.mockRejectedValue(new Error('Database error'));
const executors = createRuntimeExecutors(ctx);
const state = createMockState();
const instruction = {
payload: {
parentMessageId: 'original-parent-123',
toolsCalling: [
{
apiName: 'search',
arguments: '{}',
id: 'tool-call-1',
identifier: 'web-search',
type: 'default' as const,
},
],
},
type: 'call_tools_batch' as const,
};
const result = await executors.call_tools_batch!(instruction, state);
// Should fallback to original parentMessageId
const payload = result.nextContext!.payload as { parentMessageId?: string };
expect(payload.parentMessageId).toBe('original-parent-123');
});
it('should continue processing other tools if one tool execution fails', async () => {
// First tool fails, second succeeds
mockToolExecutionService.executeTool
.mockRejectedValueOnce(new Error('Tool execution error'))
.mockResolvedValueOnce({
content: 'Tool result 2',
error: null,
executionTime: 100,
state: {},
success: true,
});
const executors = createRuntimeExecutors(ctx);
const state = createMockState();
const instruction = {
payload: {
parentMessageId: 'assistant-msg-123',
toolsCalling: [
{
apiName: 'search',
arguments: '{}',
id: 'tool-call-1',
identifier: 'web-search',
type: 'default' as const,
},
{
apiName: 'crawl',
arguments: '{}',
id: 'tool-call-2',
identifier: 'web-browsing',
type: 'default' as const,
},
],
},
type: 'call_tools_batch' as const,
};
const result = await executors.call_tools_batch!(instruction, state);
// Both tools should be attempted
expect(mockToolExecutionService.executeTool).toHaveBeenCalledTimes(2);
// Only one tool message should be created (for the successful tool)
expect(mockMessageModel.create).toHaveBeenCalledTimes(1);
// Should still return result (not throw)
expect(result.nextContext).toBeDefined();
expect(result.nextContext!.phase).toBe('tools_batch_result');
});
it('should continue if tool message creation fails for one tool', async () => {
// First message creation succeeds, second fails
mockMessageModel.create
.mockResolvedValueOnce({ id: 'tool-msg-1' })
.mockRejectedValueOnce(new Error('Database error'));
const executors = createRuntimeExecutors(ctx);
const state = createMockState();
const instruction = {
payload: {
parentMessageId: 'assistant-msg-123',
toolsCalling: [
{
apiName: 'search',
arguments: '{}',
id: 'tool-call-1',
identifier: 'web-search',
type: 'default' as const,
},
{
apiName: 'crawl',
arguments: '{}',
id: 'tool-call-2',
identifier: 'web-browsing',
type: 'default' as const,
},
],
},
type: 'call_tools_batch' as const,
};
const result = await executors.call_tools_batch!(instruction, state);
// Both tools should be executed
expect(mockToolExecutionService.executeTool).toHaveBeenCalledTimes(2);
// Should still return result
expect(result.nextContext).toBeDefined();
// parentMessageId should be the first successful tool message
const payload = result.nextContext!.payload as { parentMessageId?: string };
expect(payload.parentMessageId).toBe('tool-msg-1');
});
it('should publish tool_start and tool_end events for each tool', async () => {
const executors = createRuntimeExecutors(ctx);
const state = createMockState();
const instruction = {
payload: {
parentMessageId: 'assistant-msg-123',
toolsCalling: [
{
apiName: 'search',
arguments: '{}',
id: 'tool-call-1',
identifier: 'web-search',
type: 'default' as const,
},
{
apiName: 'crawl',
arguments: '{}',
id: 'tool-call-2',
identifier: 'web-browsing',
type: 'default' as const,
},
],
},
type: 'call_tools_batch' as const,
};
await executors.call_tools_batch!(instruction, state);
// Should publish tool_start for each tool
expect(mockStreamManager.publishStreamEvent).toHaveBeenCalledWith(
'op-123',
expect.objectContaining({ type: 'tool_start' }),
);
// Should publish tool_end for each tool
expect(mockStreamManager.publishStreamEvent).toHaveBeenCalledWith(
'op-123',
expect.objectContaining({ type: 'tool_end' }),
);
// At least 4 events (2 tool_start + 2 tool_end)
expect(mockStreamManager.publishStreamEvent.mock.calls.length).toBeGreaterThanOrEqual(4);
});
it('should include toolCount and toolResults in nextContext payload', async () => {
const executors = createRuntimeExecutors(ctx);
const state = createMockState();
const instruction = {
payload: {
parentMessageId: 'assistant-msg-123',
toolsCalling: [
{
apiName: 'search',
arguments: '{}',
id: 'tool-call-1',
identifier: 'web-search',
type: 'default' as const,
},
{
apiName: 'crawl',
arguments: '{}',
id: 'tool-call-2',
identifier: 'web-browsing',
type: 'default' as const,
},
],
},
type: 'call_tools_batch' as const,
};
const result = await executors.call_tools_batch!(instruction, state);
const payload = result.nextContext!.payload as {
toolCount: number;
toolResults: any[];
};
expect(payload.toolCount).toBe(2);
expect(payload.toolResults).toHaveLength(2);
expect(payload.toolResults[0]).toEqual(
expect.objectContaining({
toolCallId: 'tool-call-1',
isSuccess: true,
}),
);
});
});
describe('resolve_aborted_tools executor', () => {
const createMockState = (overrides?: Partial<AgentState>): AgentState => ({
cost: createMockCost(),