diff --git a/packages/model-runtime/src/core/streams/qwen.test.ts b/packages/model-runtime/src/core/streams/qwen.test.ts index 923282cf66..4da2aa735e 100644 --- a/packages/model-runtime/src/core/streams/qwen.test.ts +++ b/packages/model-runtime/src/core/streams/qwen.test.ts @@ -621,6 +621,326 @@ describe('QwenAIStream', () => { }); }); +// Test case for parallel tool calls bug (LOBE-3903) +// This test reproduces the issue where Qwen model returns 3 parallel tool calls +// for querying time in Beijing, Shanghai, and Nanjing simultaneously. +// The bug causes arguments from different tool calls to be incorrectly merged. +describe('parallel tool calls streaming bug', () => { + it('should handle 3 parallel tool calls with incremental arguments (Qwen qwen3-max behavior)', async () => { + // This test simulates the exact stream pattern from the bug report: + // User asks: "查一下北京、上海、南京的时间,同时调用3次mcp" + // Model returns 3 parallel tool calls with index 0, 1, 2 + // Subsequent chunks contain arguments without id field, only index + const streamId = 'chatcmpl-23f324a2-059f-9ab4-b7b3-f47bcba5ebf7'; + + // Define all chunks as an array for clarity and maintainability + const chunks = [ + // Chunk 0: First tool call starts (index=0) + { + id: streamId, + object: 'chat.completion.chunk', + created: 1768906556, + model: 'qwen3-max', + choices: [ + { + index: 0, + delta: { + role: 'assistant', + tool_calls: [ + { + id: 'call_c7d8b4984a4d4f54a4956bca', + type: 'function', + function: { name: 'time____get_time____mcp', arguments: '' }, + index: 0, + }, + ], + }, + finish_reason: null, + }, + ], + }, + // Chunk 1: First tool call continues with empty arguments + { + id: streamId, + object: 'chat.completion.chunk', + created: 1768906556, + model: 'qwen3-max', + choices: [ + { + index: 0, + delta: { + tool_calls: [ + { + id: 'call_c7d8b4984a4d4f54a4956bca', + type: 'function', + function: { arguments: '' }, + index: 0, + }, + ], + }, + finish_reason: null, + }, + ], + }, + // Chunk 2: First tool call arguments part 1 (北京) + { + id: streamId, + object: 'chat.completion.chunk', + created: 1768906556, + model: 'qwen3-max', + choices: [ + { + index: 0, + delta: { + tool_calls: [ + { type: 'function', function: { arguments: '{"location": "北京' }, index: 0 }, + ], + }, + finish_reason: null, + }, + ], + }, + // Chunk 3: First tool call arguments part 2 + { + id: streamId, + object: 'chat.completion.chunk', + created: 1768906556, + model: 'qwen3-max', + choices: [ + { + index: 0, + delta: { tool_calls: [{ type: 'function', function: { arguments: '"}' }, index: 0 }] }, + finish_reason: null, + }, + ], + }, + // Chunk 4: Empty arguments for first tool call + { + id: streamId, + object: 'chat.completion.chunk', + created: 1768906556, + model: 'qwen3-max', + choices: [ + { + index: 0, + delta: { tool_calls: [{ type: 'function', function: { arguments: '' }, index: 0 }] }, + finish_reason: null, + }, + ], + }, + // Chunk 5: Second tool call starts (index=1) - 上海 + { + id: streamId, + object: 'chat.completion.chunk', + created: 1768906556, + model: 'qwen3-max', + choices: [ + { + index: 0, + delta: { + tool_calls: [ + { + id: 'call_f564785a14534d9a8c5ee641', + type: 'function', + function: { name: 'time____get_time____mcp', arguments: '' }, + index: 1, + }, + ], + }, + finish_reason: null, + }, + ], + }, + // Chunk 6: Second tool call arguments (上海) + { + id: streamId, + object: 'chat.completion.chunk', + created: 1768906556, + model: 'qwen3-max', + choices: [ + { + index: 0, + delta: { + tool_calls: [ + { type: 'function', function: { arguments: '{"location": "上海' }, index: 1 }, + ], + }, + finish_reason: null, + }, + ], + }, + // Chunk 7: Second tool call arguments part 2 + { + id: streamId, + object: 'chat.completion.chunk', + created: 1768906556, + model: 'qwen3-max', + choices: [ + { + index: 0, + delta: { tool_calls: [{ type: 'function', function: { arguments: '"}' }, index: 1 }] }, + finish_reason: null, + }, + ], + }, + // Chunk 8: Third tool call starts (index=2) - 南京 + { + id: streamId, + object: 'chat.completion.chunk', + created: 1768906556, + model: 'qwen3-max', + choices: [ + { + index: 0, + delta: { + tool_calls: [ + { + id: 'call_19693813aebd434aab821f06', + type: 'function', + function: { name: 'time____get_time____mcp', arguments: '{"location": "' }, + index: 2, + }, + ], + }, + finish_reason: null, + }, + ], + }, + // Chunk 9: Third tool call arguments (南京) + { + id: streamId, + object: 'chat.completion.chunk', + created: 1768906556, + model: 'qwen3-max', + choices: [ + { + index: 0, + delta: { + tool_calls: [{ type: 'function', function: { arguments: '南京"' }, index: 2 }], + }, + finish_reason: null, + }, + ], + }, + // Chunk 10: Third tool call arguments final + { + id: streamId, + object: 'chat.completion.chunk', + created: 1768906556, + model: 'qwen3-max', + choices: [ + { + index: 0, + delta: { tool_calls: [{ type: 'function', function: { arguments: '}' }, index: 2 }] }, + finish_reason: null, + }, + ], + }, + // Chunk 11: Finish + { + id: streamId, + object: 'chat.completion.chunk', + created: 1768906556, + model: 'qwen3-max', + choices: [{ index: 0, delta: {}, finish_reason: 'tool_calls' }], + }, + ]; + + const mockOpenAIStream = new ReadableStream({ + start(controller) { + chunks.forEach((chunk) => controller.enqueue(chunk)); + controller.close(); + }, + }); + + let aggregatedToolCalls: any[] = []; + + const protocolStream = QwenAIStream(mockOpenAIStream, { + callbacks: { + onToolsCalling: ({ toolsCalling }) => { + aggregatedToolCalls = toolsCalling; + }, + }, + }); + + const decoder = new TextDecoder(); + const outputChunks: string[] = []; + + // @ts-ignore + for await (const chunk of protocolStream) { + outputChunks.push(decoder.decode(chunk, { stream: true })); + } + + // Verify streaming chunks output format (SSE protocol) + // Each tool call chunk should have correct id based on its index + expect(outputChunks).toEqual([ + // Chunk 0: First tool call starts (index=0) + `id: ${streamId}\n`, + 'event: tool_calls\n', + 'data: [{"function":{"arguments":"","name":"time____get_time____mcp"},"id":"call_c7d8b4984a4d4f54a4956bca","index":0,"type":"function"}]\n\n', + // Chunk 1: First tool call continues + `id: ${streamId}\n`, + 'event: tool_calls\n', + 'data: [{"function":{"arguments":"","name":null},"id":"call_c7d8b4984a4d4f54a4956bca","index":0,"type":"function"}]\n\n', + // Chunk 2: First tool call arguments part 1 + `id: ${streamId}\n`, + 'event: tool_calls\n', + 'data: [{"function":{"arguments":"{\\"location\\": \\"北京","name":null},"id":"call_c7d8b4984a4d4f54a4956bca","index":0,"type":"function"}]\n\n', + // Chunk 3: First tool call arguments part 2 + `id: ${streamId}\n`, + 'event: tool_calls\n', + 'data: [{"function":{"arguments":"\\"}","name":null},"id":"call_c7d8b4984a4d4f54a4956bca","index":0,"type":"function"}]\n\n', + // Chunk 4: Empty arguments + `id: ${streamId}\n`, + 'event: tool_calls\n', + 'data: [{"function":{"arguments":"","name":null},"id":"call_c7d8b4984a4d4f54a4956bca","index":0,"type":"function"}]\n\n', + // Chunk 5: Second tool call starts (index=1) - should have its own id + `id: ${streamId}\n`, + 'event: tool_calls\n', + 'data: [{"function":{"arguments":"","name":"time____get_time____mcp"},"id":"call_f564785a14534d9a8c5ee641","index":1,"type":"function"}]\n\n', + // Chunk 6: Second tool call arguments - should use index=1's stored id + `id: ${streamId}\n`, + 'event: tool_calls\n', + 'data: [{"function":{"arguments":"{\\"location\\": \\"上海","name":null},"id":"call_f564785a14534d9a8c5ee641","index":1,"type":"function"}]\n\n', + // Chunk 7: Second tool call arguments part 2 + `id: ${streamId}\n`, + 'event: tool_calls\n', + 'data: [{"function":{"arguments":"\\"}","name":null},"id":"call_f564785a14534d9a8c5ee641","index":1,"type":"function"}]\n\n', + // Chunk 8: Third tool call starts (index=2) + `id: ${streamId}\n`, + 'event: tool_calls\n', + 'data: [{"function":{"arguments":"{\\"location\\": \\"","name":"time____get_time____mcp"},"id":"call_19693813aebd434aab821f06","index":2,"type":"function"}]\n\n', + // Chunk 9: Third tool call arguments - should use index=2's stored id + `id: ${streamId}\n`, + 'event: tool_calls\n', + 'data: [{"function":{"arguments":"南京\\"","name":null},"id":"call_19693813aebd434aab821f06","index":2,"type":"function"}]\n\n', + // Chunk 10: Third tool call arguments final + `id: ${streamId}\n`, + 'event: tool_calls\n', + 'data: [{"function":{"arguments":"}","name":null},"id":"call_19693813aebd434aab821f06","index":2,"type":"function"}]\n\n', + // Chunk 11: Finish + `id: ${streamId}\n`, + 'event: stop\n', + 'data: "tool_calls"\n\n', + ]); + + // Verify aggregated tool calls have correct arguments (not merged incorrectly) + expect(aggregatedToolCalls).toHaveLength(3); + expect(aggregatedToolCalls[0]).toMatchObject({ + id: 'call_c7d8b4984a4d4f54a4956bca', + function: { name: 'time____get_time____mcp', arguments: '{"location": "北京"}' }, + }); + expect(aggregatedToolCalls[1]).toMatchObject({ + id: 'call_f564785a14534d9a8c5ee641', + function: { name: 'time____get_time____mcp', arguments: '{"location": "上海"}' }, + }); + expect(aggregatedToolCalls[2]).toMatchObject({ + id: 'call_19693813aebd434aab821f06', + function: { name: 'time____get_time____mcp', arguments: '{"location": "南京"}' }, + }); + }); +}); + describe('transformQwenStream', () => { it('should handle usage chunk', () => { const mockChunk: OpenAI.ChatCompletionChunk = { diff --git a/packages/model-runtime/src/core/streams/qwen.ts b/packages/model-runtime/src/core/streams/qwen.ts index 733e5f4543..4e9d5c2ece 100644 --- a/packages/model-runtime/src/core/streams/qwen.ts +++ b/packages/model-runtime/src/core/streams/qwen.ts @@ -70,17 +70,26 @@ export const transformQwenStream = ( if (item.delta?.tool_calls) { return { - data: item.delta.tool_calls.map((value, index): StreamToolCallChunkData => { - // Store first tool call's info in streamContext for subsequent chunks - // (similar pattern to OpenAI stream handling) - if (streamContext && !streamContext.tool && value.id && value.function?.name) { - streamContext.tool = { + data: item.delta.tool_calls.map((value, arrayIndex): StreamToolCallChunkData => { + // Get the actual tool index from the chunk, fallback to array position + const toolIndex = typeof value.index !== 'undefined' ? value.index : arrayIndex; + + // Store tool call info in streamContext.tools map by index for parallel tool calls + // This allows us to correctly track multiple tools being called in parallel + if (streamContext && value.id && value.function?.name) { + if (!streamContext.tools) { + streamContext.tools = {}; + } + streamContext.tools[toolIndex] = { id: value.id, - index: typeof value.index !== 'undefined' ? value.index : index, + index: toolIndex, name: value.function.name, }; } + // Get stored tool info for this index (for incremental chunks without id) + const storedTool = streamContext?.tools?.[toolIndex]; + return { // Qwen models may send tool_calls in two separate chunks: // 1. First chunk: {id, name} without arguments @@ -91,10 +100,10 @@ export const transformQwenStream = ( arguments: value.function?.arguments ?? '', name: value.function?.name ?? null, }, - // For incremental chunks without id, use the stored tool id from streamContext - id: - value.id || streamContext?.tool?.id || generateToolCallId(index, value.function?.name), - index: typeof value.index !== 'undefined' ? value.index : index, + // For incremental chunks without id, use the stored tool id from streamContext.tools + // based on the tool index to correctly associate arguments with the right tool call + id: value.id || storedTool?.id || generateToolCallId(toolIndex, value.function?.name), + index: toolIndex, type: value.type || 'function', }; }),