From ddbe661642f21c4e3f2e4b3e2d5dfd241abe93f9 Mon Sep 17 00:00:00 2001 From: Arvin Xu Date: Tue, 20 Jan 2026 20:12:52 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20fix(model-runtime):=20fix=20Qwen?= =?UTF-8?q?=20parallel=20tool=20calls=20arguments=20incorrectly=20merged?= =?UTF-8?q?=20(#11649)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 🐛 fix(model-runtime): fix Qwen parallel tool calls arguments incorrectly merged When using Qwen model with parallel tool calls, arguments from different tool calls were incorrectly merged into the first tool call. Root cause: `streamContext.tool` only stored ONE tool's info, but parallel calls have multiple tools. When subsequent chunks lacked `id` field, they all used the first tool's id as fallback. Fix: Use `streamContext.tools` (a map by index) instead of `streamContext.tool` to correctly track multiple parallel tool calls. Fixes LOBE-3903 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude * ♻️ refactor(test): use array-based chunks in parallel tool calls test Refactor test to define all chunks as an array and use forEach to enqueue, improving code clarity and maintainability. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude * ✅ test: add SSE protocol output verification for parallel tool calls Verify streaming chunks output format with standard SSE protocol assertions, ensuring each tool call chunk has correct id based on its index. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --------- Co-authored-by: Claude --- .../src/core/streams/qwen.test.ts | 320 ++++++++++++++++++ .../model-runtime/src/core/streams/qwen.ts | 29 +- 2 files changed, 339 insertions(+), 10 deletions(-) diff --git a/packages/model-runtime/src/core/streams/qwen.test.ts b/packages/model-runtime/src/core/streams/qwen.test.ts index 923282cf66..4da2aa735e 100644 --- a/packages/model-runtime/src/core/streams/qwen.test.ts +++ b/packages/model-runtime/src/core/streams/qwen.test.ts @@ -621,6 +621,326 @@ describe('QwenAIStream', () => { }); }); +// Test case for parallel tool calls bug (LOBE-3903) +// This test reproduces the issue where Qwen model returns 3 parallel tool calls +// for querying time in Beijing, Shanghai, and Nanjing simultaneously. +// The bug causes arguments from different tool calls to be incorrectly merged. +describe('parallel tool calls streaming bug', () => { + it('should handle 3 parallel tool calls with incremental arguments (Qwen qwen3-max behavior)', async () => { + // This test simulates the exact stream pattern from the bug report: + // User asks: "查一下北京、上海、南京的时间,同时调用3次mcp" + // Model returns 3 parallel tool calls with index 0, 1, 2 + // Subsequent chunks contain arguments without id field, only index + const streamId = 'chatcmpl-23f324a2-059f-9ab4-b7b3-f47bcba5ebf7'; + + // Define all chunks as an array for clarity and maintainability + const chunks = [ + // Chunk 0: First tool call starts (index=0) + { + id: streamId, + object: 'chat.completion.chunk', + created: 1768906556, + model: 'qwen3-max', + choices: [ + { + index: 0, + delta: { + role: 'assistant', + tool_calls: [ + { + id: 'call_c7d8b4984a4d4f54a4956bca', + type: 'function', + function: { name: 'time____get_time____mcp', arguments: '' }, + index: 0, + }, + ], + }, + finish_reason: null, + }, + ], + }, + // Chunk 1: First tool call continues with empty arguments + { + id: streamId, + object: 'chat.completion.chunk', + created: 1768906556, + model: 'qwen3-max', + choices: [ + { + index: 0, + delta: { + tool_calls: [ + { + id: 'call_c7d8b4984a4d4f54a4956bca', + type: 'function', + function: { arguments: '' }, + index: 0, + }, + ], + }, + finish_reason: null, + }, + ], + }, + // Chunk 2: First tool call arguments part 1 (北京) + { + id: streamId, + object: 'chat.completion.chunk', + created: 1768906556, + model: 'qwen3-max', + choices: [ + { + index: 0, + delta: { + tool_calls: [ + { type: 'function', function: { arguments: '{"location": "北京' }, index: 0 }, + ], + }, + finish_reason: null, + }, + ], + }, + // Chunk 3: First tool call arguments part 2 + { + id: streamId, + object: 'chat.completion.chunk', + created: 1768906556, + model: 'qwen3-max', + choices: [ + { + index: 0, + delta: { tool_calls: [{ type: 'function', function: { arguments: '"}' }, index: 0 }] }, + finish_reason: null, + }, + ], + }, + // Chunk 4: Empty arguments for first tool call + { + id: streamId, + object: 'chat.completion.chunk', + created: 1768906556, + model: 'qwen3-max', + choices: [ + { + index: 0, + delta: { tool_calls: [{ type: 'function', function: { arguments: '' }, index: 0 }] }, + finish_reason: null, + }, + ], + }, + // Chunk 5: Second tool call starts (index=1) - 上海 + { + id: streamId, + object: 'chat.completion.chunk', + created: 1768906556, + model: 'qwen3-max', + choices: [ + { + index: 0, + delta: { + tool_calls: [ + { + id: 'call_f564785a14534d9a8c5ee641', + type: 'function', + function: { name: 'time____get_time____mcp', arguments: '' }, + index: 1, + }, + ], + }, + finish_reason: null, + }, + ], + }, + // Chunk 6: Second tool call arguments (上海) + { + id: streamId, + object: 'chat.completion.chunk', + created: 1768906556, + model: 'qwen3-max', + choices: [ + { + index: 0, + delta: { + tool_calls: [ + { type: 'function', function: { arguments: '{"location": "上海' }, index: 1 }, + ], + }, + finish_reason: null, + }, + ], + }, + // Chunk 7: Second tool call arguments part 2 + { + id: streamId, + object: 'chat.completion.chunk', + created: 1768906556, + model: 'qwen3-max', + choices: [ + { + index: 0, + delta: { tool_calls: [{ type: 'function', function: { arguments: '"}' }, index: 1 }] }, + finish_reason: null, + }, + ], + }, + // Chunk 8: Third tool call starts (index=2) - 南京 + { + id: streamId, + object: 'chat.completion.chunk', + created: 1768906556, + model: 'qwen3-max', + choices: [ + { + index: 0, + delta: { + tool_calls: [ + { + id: 'call_19693813aebd434aab821f06', + type: 'function', + function: { name: 'time____get_time____mcp', arguments: '{"location": "' }, + index: 2, + }, + ], + }, + finish_reason: null, + }, + ], + }, + // Chunk 9: Third tool call arguments (南京) + { + id: streamId, + object: 'chat.completion.chunk', + created: 1768906556, + model: 'qwen3-max', + choices: [ + { + index: 0, + delta: { + tool_calls: [{ type: 'function', function: { arguments: '南京"' }, index: 2 }], + }, + finish_reason: null, + }, + ], + }, + // Chunk 10: Third tool call arguments final + { + id: streamId, + object: 'chat.completion.chunk', + created: 1768906556, + model: 'qwen3-max', + choices: [ + { + index: 0, + delta: { tool_calls: [{ type: 'function', function: { arguments: '}' }, index: 2 }] }, + finish_reason: null, + }, + ], + }, + // Chunk 11: Finish + { + id: streamId, + object: 'chat.completion.chunk', + created: 1768906556, + model: 'qwen3-max', + choices: [{ index: 0, delta: {}, finish_reason: 'tool_calls' }], + }, + ]; + + const mockOpenAIStream = new ReadableStream({ + start(controller) { + chunks.forEach((chunk) => controller.enqueue(chunk)); + controller.close(); + }, + }); + + let aggregatedToolCalls: any[] = []; + + const protocolStream = QwenAIStream(mockOpenAIStream, { + callbacks: { + onToolsCalling: ({ toolsCalling }) => { + aggregatedToolCalls = toolsCalling; + }, + }, + }); + + const decoder = new TextDecoder(); + const outputChunks: string[] = []; + + // @ts-ignore + for await (const chunk of protocolStream) { + outputChunks.push(decoder.decode(chunk, { stream: true })); + } + + // Verify streaming chunks output format (SSE protocol) + // Each tool call chunk should have correct id based on its index + expect(outputChunks).toEqual([ + // Chunk 0: First tool call starts (index=0) + `id: ${streamId}\n`, + 'event: tool_calls\n', + 'data: [{"function":{"arguments":"","name":"time____get_time____mcp"},"id":"call_c7d8b4984a4d4f54a4956bca","index":0,"type":"function"}]\n\n', + // Chunk 1: First tool call continues + `id: ${streamId}\n`, + 'event: tool_calls\n', + 'data: [{"function":{"arguments":"","name":null},"id":"call_c7d8b4984a4d4f54a4956bca","index":0,"type":"function"}]\n\n', + // Chunk 2: First tool call arguments part 1 + `id: ${streamId}\n`, + 'event: tool_calls\n', + 'data: [{"function":{"arguments":"{\\"location\\": \\"北京","name":null},"id":"call_c7d8b4984a4d4f54a4956bca","index":0,"type":"function"}]\n\n', + // Chunk 3: First tool call arguments part 2 + `id: ${streamId}\n`, + 'event: tool_calls\n', + 'data: [{"function":{"arguments":"\\"}","name":null},"id":"call_c7d8b4984a4d4f54a4956bca","index":0,"type":"function"}]\n\n', + // Chunk 4: Empty arguments + `id: ${streamId}\n`, + 'event: tool_calls\n', + 'data: [{"function":{"arguments":"","name":null},"id":"call_c7d8b4984a4d4f54a4956bca","index":0,"type":"function"}]\n\n', + // Chunk 5: Second tool call starts (index=1) - should have its own id + `id: ${streamId}\n`, + 'event: tool_calls\n', + 'data: [{"function":{"arguments":"","name":"time____get_time____mcp"},"id":"call_f564785a14534d9a8c5ee641","index":1,"type":"function"}]\n\n', + // Chunk 6: Second tool call arguments - should use index=1's stored id + `id: ${streamId}\n`, + 'event: tool_calls\n', + 'data: [{"function":{"arguments":"{\\"location\\": \\"上海","name":null},"id":"call_f564785a14534d9a8c5ee641","index":1,"type":"function"}]\n\n', + // Chunk 7: Second tool call arguments part 2 + `id: ${streamId}\n`, + 'event: tool_calls\n', + 'data: [{"function":{"arguments":"\\"}","name":null},"id":"call_f564785a14534d9a8c5ee641","index":1,"type":"function"}]\n\n', + // Chunk 8: Third tool call starts (index=2) + `id: ${streamId}\n`, + 'event: tool_calls\n', + 'data: [{"function":{"arguments":"{\\"location\\": \\"","name":"time____get_time____mcp"},"id":"call_19693813aebd434aab821f06","index":2,"type":"function"}]\n\n', + // Chunk 9: Third tool call arguments - should use index=2's stored id + `id: ${streamId}\n`, + 'event: tool_calls\n', + 'data: [{"function":{"arguments":"南京\\"","name":null},"id":"call_19693813aebd434aab821f06","index":2,"type":"function"}]\n\n', + // Chunk 10: Third tool call arguments final + `id: ${streamId}\n`, + 'event: tool_calls\n', + 'data: [{"function":{"arguments":"}","name":null},"id":"call_19693813aebd434aab821f06","index":2,"type":"function"}]\n\n', + // Chunk 11: Finish + `id: ${streamId}\n`, + 'event: stop\n', + 'data: "tool_calls"\n\n', + ]); + + // Verify aggregated tool calls have correct arguments (not merged incorrectly) + expect(aggregatedToolCalls).toHaveLength(3); + expect(aggregatedToolCalls[0]).toMatchObject({ + id: 'call_c7d8b4984a4d4f54a4956bca', + function: { name: 'time____get_time____mcp', arguments: '{"location": "北京"}' }, + }); + expect(aggregatedToolCalls[1]).toMatchObject({ + id: 'call_f564785a14534d9a8c5ee641', + function: { name: 'time____get_time____mcp', arguments: '{"location": "上海"}' }, + }); + expect(aggregatedToolCalls[2]).toMatchObject({ + id: 'call_19693813aebd434aab821f06', + function: { name: 'time____get_time____mcp', arguments: '{"location": "南京"}' }, + }); + }); +}); + describe('transformQwenStream', () => { it('should handle usage chunk', () => { const mockChunk: OpenAI.ChatCompletionChunk = { diff --git a/packages/model-runtime/src/core/streams/qwen.ts b/packages/model-runtime/src/core/streams/qwen.ts index 733e5f4543..4e9d5c2ece 100644 --- a/packages/model-runtime/src/core/streams/qwen.ts +++ b/packages/model-runtime/src/core/streams/qwen.ts @@ -70,17 +70,26 @@ export const transformQwenStream = ( if (item.delta?.tool_calls) { return { - data: item.delta.tool_calls.map((value, index): StreamToolCallChunkData => { - // Store first tool call's info in streamContext for subsequent chunks - // (similar pattern to OpenAI stream handling) - if (streamContext && !streamContext.tool && value.id && value.function?.name) { - streamContext.tool = { + data: item.delta.tool_calls.map((value, arrayIndex): StreamToolCallChunkData => { + // Get the actual tool index from the chunk, fallback to array position + const toolIndex = typeof value.index !== 'undefined' ? value.index : arrayIndex; + + // Store tool call info in streamContext.tools map by index for parallel tool calls + // This allows us to correctly track multiple tools being called in parallel + if (streamContext && value.id && value.function?.name) { + if (!streamContext.tools) { + streamContext.tools = {}; + } + streamContext.tools[toolIndex] = { id: value.id, - index: typeof value.index !== 'undefined' ? value.index : index, + index: toolIndex, name: value.function.name, }; } + // Get stored tool info for this index (for incremental chunks without id) + const storedTool = streamContext?.tools?.[toolIndex]; + return { // Qwen models may send tool_calls in two separate chunks: // 1. First chunk: {id, name} without arguments @@ -91,10 +100,10 @@ export const transformQwenStream = ( arguments: value.function?.arguments ?? '', name: value.function?.name ?? null, }, - // For incremental chunks without id, use the stored tool id from streamContext - id: - value.id || streamContext?.tool?.id || generateToolCallId(index, value.function?.name), - index: typeof value.index !== 'undefined' ? value.index : index, + // For incremental chunks without id, use the stored tool id from streamContext.tools + // based on the tool index to correctly associate arguments with the right tool call + id: value.id || storedTool?.id || generateToolCallId(toolIndex, value.function?.name), + index: toolIndex, type: value.type || 'function', }; }),