🐛 fix(model-runtime): fix Qwen parallel tool calls arguments incorrectly merged (#11649)

* 🐛 fix(model-runtime): fix Qwen parallel tool calls arguments incorrectly merged

When using Qwen model with parallel tool calls, arguments from different
tool calls were incorrectly merged into the first tool call.

Root cause: `streamContext.tool` only stored ONE tool's info, but parallel
calls have multiple tools. When subsequent chunks lacked `id` field, they
all used the first tool's id as fallback.

Fix: Use `streamContext.tools` (a map by index) instead of `streamContext.tool`
to correctly track multiple parallel tool calls.

Fixes LOBE-3903

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* ♻️ refactor(test): use array-based chunks in parallel tool calls test

Refactor test to define all chunks as an array and use forEach to enqueue,
improving code clarity and maintainability.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

*  test: add SSE protocol output verification for parallel tool calls

Verify streaming chunks output format with standard SSE protocol assertions,
ensuring each tool call chunk has correct id based on its index.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
Arvin Xu
2026-01-20 20:12:52 +08:00
committed by GitHub
parent 5074fbef2c
commit ddbe661642
2 changed files with 339 additions and 10 deletions

View File

@@ -621,6 +621,326 @@ describe('QwenAIStream', () => {
});
});
// Test case for parallel tool calls bug (LOBE-3903)
// This test reproduces the issue where Qwen model returns 3 parallel tool calls
// for querying time in Beijing, Shanghai, and Nanjing simultaneously.
// The bug causes arguments from different tool calls to be incorrectly merged.
describe('parallel tool calls streaming bug', () => {
it('should handle 3 parallel tool calls with incremental arguments (Qwen qwen3-max behavior)', async () => {
// This test simulates the exact stream pattern from the bug report:
// User asks: "查一下北京、上海、南京的时间同时调用3次mcp"
// Model returns 3 parallel tool calls with index 0, 1, 2
// Subsequent chunks contain arguments without id field, only index
const streamId = 'chatcmpl-23f324a2-059f-9ab4-b7b3-f47bcba5ebf7';
// Define all chunks as an array for clarity and maintainability
const chunks = [
// Chunk 0: First tool call starts (index=0)
{
id: streamId,
object: 'chat.completion.chunk',
created: 1768906556,
model: 'qwen3-max',
choices: [
{
index: 0,
delta: {
role: 'assistant',
tool_calls: [
{
id: 'call_c7d8b4984a4d4f54a4956bca',
type: 'function',
function: { name: 'time____get_time____mcp', arguments: '' },
index: 0,
},
],
},
finish_reason: null,
},
],
},
// Chunk 1: First tool call continues with empty arguments
{
id: streamId,
object: 'chat.completion.chunk',
created: 1768906556,
model: 'qwen3-max',
choices: [
{
index: 0,
delta: {
tool_calls: [
{
id: 'call_c7d8b4984a4d4f54a4956bca',
type: 'function',
function: { arguments: '' },
index: 0,
},
],
},
finish_reason: null,
},
],
},
// Chunk 2: First tool call arguments part 1 (北京)
{
id: streamId,
object: 'chat.completion.chunk',
created: 1768906556,
model: 'qwen3-max',
choices: [
{
index: 0,
delta: {
tool_calls: [
{ type: 'function', function: { arguments: '{"location": "北京' }, index: 0 },
],
},
finish_reason: null,
},
],
},
// Chunk 3: First tool call arguments part 2
{
id: streamId,
object: 'chat.completion.chunk',
created: 1768906556,
model: 'qwen3-max',
choices: [
{
index: 0,
delta: { tool_calls: [{ type: 'function', function: { arguments: '"}' }, index: 0 }] },
finish_reason: null,
},
],
},
// Chunk 4: Empty arguments for first tool call
{
id: streamId,
object: 'chat.completion.chunk',
created: 1768906556,
model: 'qwen3-max',
choices: [
{
index: 0,
delta: { tool_calls: [{ type: 'function', function: { arguments: '' }, index: 0 }] },
finish_reason: null,
},
],
},
// Chunk 5: Second tool call starts (index=1) - 上海
{
id: streamId,
object: 'chat.completion.chunk',
created: 1768906556,
model: 'qwen3-max',
choices: [
{
index: 0,
delta: {
tool_calls: [
{
id: 'call_f564785a14534d9a8c5ee641',
type: 'function',
function: { name: 'time____get_time____mcp', arguments: '' },
index: 1,
},
],
},
finish_reason: null,
},
],
},
// Chunk 6: Second tool call arguments (上海)
{
id: streamId,
object: 'chat.completion.chunk',
created: 1768906556,
model: 'qwen3-max',
choices: [
{
index: 0,
delta: {
tool_calls: [
{ type: 'function', function: { arguments: '{"location": "上海' }, index: 1 },
],
},
finish_reason: null,
},
],
},
// Chunk 7: Second tool call arguments part 2
{
id: streamId,
object: 'chat.completion.chunk',
created: 1768906556,
model: 'qwen3-max',
choices: [
{
index: 0,
delta: { tool_calls: [{ type: 'function', function: { arguments: '"}' }, index: 1 }] },
finish_reason: null,
},
],
},
// Chunk 8: Third tool call starts (index=2) - 南京
{
id: streamId,
object: 'chat.completion.chunk',
created: 1768906556,
model: 'qwen3-max',
choices: [
{
index: 0,
delta: {
tool_calls: [
{
id: 'call_19693813aebd434aab821f06',
type: 'function',
function: { name: 'time____get_time____mcp', arguments: '{"location": "' },
index: 2,
},
],
},
finish_reason: null,
},
],
},
// Chunk 9: Third tool call arguments (南京)
{
id: streamId,
object: 'chat.completion.chunk',
created: 1768906556,
model: 'qwen3-max',
choices: [
{
index: 0,
delta: {
tool_calls: [{ type: 'function', function: { arguments: '南京"' }, index: 2 }],
},
finish_reason: null,
},
],
},
// Chunk 10: Third tool call arguments final
{
id: streamId,
object: 'chat.completion.chunk',
created: 1768906556,
model: 'qwen3-max',
choices: [
{
index: 0,
delta: { tool_calls: [{ type: 'function', function: { arguments: '}' }, index: 2 }] },
finish_reason: null,
},
],
},
// Chunk 11: Finish
{
id: streamId,
object: 'chat.completion.chunk',
created: 1768906556,
model: 'qwen3-max',
choices: [{ index: 0, delta: {}, finish_reason: 'tool_calls' }],
},
];
const mockOpenAIStream = new ReadableStream({
start(controller) {
chunks.forEach((chunk) => controller.enqueue(chunk));
controller.close();
},
});
let aggregatedToolCalls: any[] = [];
const protocolStream = QwenAIStream(mockOpenAIStream, {
callbacks: {
onToolsCalling: ({ toolsCalling }) => {
aggregatedToolCalls = toolsCalling;
},
},
});
const decoder = new TextDecoder();
const outputChunks: string[] = [];
// @ts-ignore
for await (const chunk of protocolStream) {
outputChunks.push(decoder.decode(chunk, { stream: true }));
}
// Verify streaming chunks output format (SSE protocol)
// Each tool call chunk should have correct id based on its index
expect(outputChunks).toEqual([
// Chunk 0: First tool call starts (index=0)
`id: ${streamId}\n`,
'event: tool_calls\n',
'data: [{"function":{"arguments":"","name":"time____get_time____mcp"},"id":"call_c7d8b4984a4d4f54a4956bca","index":0,"type":"function"}]\n\n',
// Chunk 1: First tool call continues
`id: ${streamId}\n`,
'event: tool_calls\n',
'data: [{"function":{"arguments":"","name":null},"id":"call_c7d8b4984a4d4f54a4956bca","index":0,"type":"function"}]\n\n',
// Chunk 2: First tool call arguments part 1
`id: ${streamId}\n`,
'event: tool_calls\n',
'data: [{"function":{"arguments":"{\\"location\\": \\"北京","name":null},"id":"call_c7d8b4984a4d4f54a4956bca","index":0,"type":"function"}]\n\n',
// Chunk 3: First tool call arguments part 2
`id: ${streamId}\n`,
'event: tool_calls\n',
'data: [{"function":{"arguments":"\\"}","name":null},"id":"call_c7d8b4984a4d4f54a4956bca","index":0,"type":"function"}]\n\n',
// Chunk 4: Empty arguments
`id: ${streamId}\n`,
'event: tool_calls\n',
'data: [{"function":{"arguments":"","name":null},"id":"call_c7d8b4984a4d4f54a4956bca","index":0,"type":"function"}]\n\n',
// Chunk 5: Second tool call starts (index=1) - should have its own id
`id: ${streamId}\n`,
'event: tool_calls\n',
'data: [{"function":{"arguments":"","name":"time____get_time____mcp"},"id":"call_f564785a14534d9a8c5ee641","index":1,"type":"function"}]\n\n',
// Chunk 6: Second tool call arguments - should use index=1's stored id
`id: ${streamId}\n`,
'event: tool_calls\n',
'data: [{"function":{"arguments":"{\\"location\\": \\"上海","name":null},"id":"call_f564785a14534d9a8c5ee641","index":1,"type":"function"}]\n\n',
// Chunk 7: Second tool call arguments part 2
`id: ${streamId}\n`,
'event: tool_calls\n',
'data: [{"function":{"arguments":"\\"}","name":null},"id":"call_f564785a14534d9a8c5ee641","index":1,"type":"function"}]\n\n',
// Chunk 8: Third tool call starts (index=2)
`id: ${streamId}\n`,
'event: tool_calls\n',
'data: [{"function":{"arguments":"{\\"location\\": \\"","name":"time____get_time____mcp"},"id":"call_19693813aebd434aab821f06","index":2,"type":"function"}]\n\n',
// Chunk 9: Third tool call arguments - should use index=2's stored id
`id: ${streamId}\n`,
'event: tool_calls\n',
'data: [{"function":{"arguments":"南京\\"","name":null},"id":"call_19693813aebd434aab821f06","index":2,"type":"function"}]\n\n',
// Chunk 10: Third tool call arguments final
`id: ${streamId}\n`,
'event: tool_calls\n',
'data: [{"function":{"arguments":"}","name":null},"id":"call_19693813aebd434aab821f06","index":2,"type":"function"}]\n\n',
// Chunk 11: Finish
`id: ${streamId}\n`,
'event: stop\n',
'data: "tool_calls"\n\n',
]);
// Verify aggregated tool calls have correct arguments (not merged incorrectly)
expect(aggregatedToolCalls).toHaveLength(3);
expect(aggregatedToolCalls[0]).toMatchObject({
id: 'call_c7d8b4984a4d4f54a4956bca',
function: { name: 'time____get_time____mcp', arguments: '{"location": "北京"}' },
});
expect(aggregatedToolCalls[1]).toMatchObject({
id: 'call_f564785a14534d9a8c5ee641',
function: { name: 'time____get_time____mcp', arguments: '{"location": "上海"}' },
});
expect(aggregatedToolCalls[2]).toMatchObject({
id: 'call_19693813aebd434aab821f06',
function: { name: 'time____get_time____mcp', arguments: '{"location": "南京"}' },
});
});
});
describe('transformQwenStream', () => {
it('should handle usage chunk', () => {
const mockChunk: OpenAI.ChatCompletionChunk = {

View File

@@ -70,17 +70,26 @@ export const transformQwenStream = (
if (item.delta?.tool_calls) {
return {
data: item.delta.tool_calls.map((value, index): StreamToolCallChunkData => {
// Store first tool call's info in streamContext for subsequent chunks
// (similar pattern to OpenAI stream handling)
if (streamContext && !streamContext.tool && value.id && value.function?.name) {
streamContext.tool = {
data: item.delta.tool_calls.map((value, arrayIndex): StreamToolCallChunkData => {
// Get the actual tool index from the chunk, fallback to array position
const toolIndex = typeof value.index !== 'undefined' ? value.index : arrayIndex;
// Store tool call info in streamContext.tools map by index for parallel tool calls
// This allows us to correctly track multiple tools being called in parallel
if (streamContext && value.id && value.function?.name) {
if (!streamContext.tools) {
streamContext.tools = {};
}
streamContext.tools[toolIndex] = {
id: value.id,
index: typeof value.index !== 'undefined' ? value.index : index,
index: toolIndex,
name: value.function.name,
};
}
// Get stored tool info for this index (for incremental chunks without id)
const storedTool = streamContext?.tools?.[toolIndex];
return {
// Qwen models may send tool_calls in two separate chunks:
// 1. First chunk: {id, name} without arguments
@@ -91,10 +100,10 @@ export const transformQwenStream = (
arguments: value.function?.arguments ?? '',
name: value.function?.name ?? null,
},
// For incremental chunks without id, use the stored tool id from streamContext
id:
value.id || streamContext?.tool?.id || generateToolCallId(index, value.function?.name),
index: typeof value.index !== 'undefined' ? value.index : index,
// For incremental chunks without id, use the stored tool id from streamContext.tools
// based on the tool index to correctly associate arguments with the right tool call
id: value.id || storedTool?.id || generateToolCallId(toolIndex, value.function?.name),
index: toolIndex,
type: value.type || 'function',
};
}),