🐛 fix: correct Anthropic thinking budget

This commit is contained in:
arvinxx
2025-12-31 01:49:42 +08:00
parent 69f4cf3dd9
commit 6e19bd3d4c
4 changed files with 1390 additions and 463 deletions

View File

@@ -0,0 +1,622 @@
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { StreamingHandler } from './StreamingHandler';
import type { StreamingCallbacks, StreamingContext } from './types/streaming';
/** Build a fresh, fully vi-mocked set of streaming callbacks for each test. */
function createMockCallbacks(): StreamingCallbacks {
  return {
    onContentUpdate: vi.fn(),
    onGroundingUpdate: vi.fn(),
    onImagesUpdate: vi.fn(),
    // Returns a stable operation id so completion can be asserted against it
    onReasoningComplete: vi.fn(),
    onReasoningStart: vi.fn(() => 'reasoning-op-id'),
    onReasoningUpdate: vi.fn(),
    onToolCallsUpdate: vi.fn(),
    toggleToolCallingStreaming: vi.fn(),
    // Marks each payload so tests can verify the transform ran
    transformToolCalls: vi.fn((calls) => calls.map((call: any) => ({ ...call, transformed: true }))),
    // Resolves immediately with a fake uploaded-image record
    uploadBase64Image: vi.fn(async () => ({ id: 'img-id', url: 'https://s3/img.png' })),
  };
}

/** Shared, read-only streaming context reused across all tests. */
const mockContext: StreamingContext = {
  agentId: 'agent-1',
  messageId: 'msg-1',
  operationId: 'op-1',
};
describe('StreamingHandler', () => {
  // Plain text chunks: accumulation and speaker-tag sanitization.
  describe('handleChunk - text', () => {
    it('should accumulate text output', () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      handler.handleChunk({ type: 'text', text: 'Hello ' });
      handler.handleChunk({ type: 'text', text: 'World' });
      expect(handler.getOutput()).toBe('Hello World');
      // One content update is emitted per text chunk
      expect(callbacks.onContentUpdate).toHaveBeenCalledTimes(2);
    });
    it('should clean speaker tag from output', () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      handler.handleChunk({ type: 'text', text: '<speaker name="Agent" />\nHello' });
      expect(handler.getOutput()).toBe('Hello');
    });
    it('should clean speaker tag across chunks', () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      // Tag is split over three chunks; cleaning runs on the accumulated output
      handler.handleChunk({ type: 'text', text: '<speaker name="' });
      handler.handleChunk({ type: 'text', text: 'Agent" />\n' });
      handler.handleChunk({ type: 'text', text: 'Hello' });
      expect(handler.getOutput()).toBe('Hello');
    });
    it('should not clean speaker tag if it appears in the middle of content', () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      handler.handleChunk({ type: 'text', text: 'Some content ' });
      handler.handleChunk({ type: 'text', text: '<speaker name="Agent" /> more' });
      // Speaker tag not at the beginning is not cleaned
      expect(handler.getOutput()).toBe('Some content <speaker name="Agent" /> more');
    });
  });
  // Reasoning chunks: timer start, accumulation, and completion on first text chunk.
  describe('handleChunk - reasoning', () => {
    it('should start reasoning timer on first chunk', () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      handler.handleChunk({ type: 'reasoning', text: 'Thinking...' });
      expect(callbacks.onReasoningStart).toHaveBeenCalledTimes(1);
      expect(callbacks.onReasoningUpdate).toHaveBeenCalledWith({ content: 'Thinking...' });
    });
    it('should accumulate reasoning content', () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      handler.handleChunk({ type: 'reasoning', text: 'Step 1. ' });
      handler.handleChunk({ type: 'reasoning', text: 'Step 2.' });
      expect(callbacks.onReasoningUpdate).toHaveBeenLastCalledWith({
        content: 'Step 1. Step 2.',
      });
    });
    it('should not start reasoning multiple times', () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      handler.handleChunk({ type: 'reasoning', text: 'A' });
      handler.handleChunk({ type: 'reasoning', text: 'B' });
      handler.handleChunk({ type: 'reasoning', text: 'C' });
      // onReasoningStart fires only on the first reasoning chunk
      expect(callbacks.onReasoningStart).toHaveBeenCalledTimes(1);
    });
    it('should end reasoning when text chunk arrives', async () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      handler.handleChunk({ type: 'reasoning', text: 'Thinking...' });
      // Real-time delay so the measured duration is strictly positive
      await new Promise((r) => setTimeout(r, 10));
      handler.handleChunk({ type: 'text', text: 'Result' });
      expect(callbacks.onReasoningComplete).toHaveBeenCalledWith('reasoning-op-id');
      expect(handler.getThinkingDuration()).toBeGreaterThan(0);
    });
  });
  // Multimodal reasoning parts: text merging and async image upload.
  describe('handleChunk - reasoning_part', () => {
    it('should handle text reasoning parts', () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      handler.handleChunk({
        type: 'reasoning_part',
        partType: 'text',
        content: 'Thinking...',
      });
      expect(callbacks.onReasoningStart).toHaveBeenCalled();
      expect(callbacks.onReasoningUpdate).toHaveBeenCalledWith({
        content: 'Thinking...',
      });
    });
    it('should merge consecutive text reasoning parts', () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      handler.handleChunk({
        type: 'reasoning_part',
        partType: 'text',
        content: 'Step 1. ',
      });
      handler.handleChunk({
        type: 'reasoning_part',
        partType: 'text',
        content: 'Step 2.',
      });
      // Consecutive text parts collapse into one accumulated content string
      expect(callbacks.onReasoningUpdate).toHaveBeenLastCalledWith({
        content: 'Step 1. Step 2.',
      });
    });
    it('should handle image reasoning parts with upload', async () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      handler.handleChunk({
        type: 'reasoning_part',
        partType: 'image',
        content: 'base64data',
        mimeType: 'image/png',
      });
      // Image parts switch the update payload to multimodal temp display content
      expect(callbacks.onReasoningUpdate).toHaveBeenCalledWith({
        tempDisplayContent: expect.any(Array),
        isMultimodal: true,
      });
      expect(callbacks.uploadBase64Image).toHaveBeenCalled();
    });
  });
  // Multimodal content parts: text merging, speaker-tag cleaning, image upload.
  describe('handleChunk - content_part', () => {
    it('should handle text content parts', () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      handler.handleChunk({
        type: 'content_part',
        partType: 'text',
        content: 'Hello',
      });
      expect(handler.getOutput()).toBe('Hello');
    });
    it('should clean speaker tag from content parts', () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      handler.handleChunk({
        type: 'content_part',
        partType: 'text',
        content: '<speaker name="Agent" />\nHello',
      });
      expect(handler.getOutput()).toBe('Hello');
    });
    it('should handle image content parts with upload', async () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      handler.handleChunk({
        type: 'content_part',
        partType: 'image',
        content: 'base64data',
        mimeType: 'image/png',
      });
      expect(callbacks.uploadBase64Image).toHaveBeenCalled();
      // Finish to wait for uploads
      await handler.handleFinish({ type: 'stop' });
      expect(callbacks.uploadBase64Image).toHaveBeenCalled();
    });
    it('should merge consecutive text content parts', () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      handler.handleChunk({
        type: 'content_part',
        partType: 'text',
        content: 'Hello ',
      });
      handler.handleChunk({
        type: 'content_part',
        partType: 'text',
        content: 'World',
      });
      expect(handler.getOutput()).toBe('Hello World');
    });
  });
  // Tool-call chunks: function-call flag and the 300ms throttled UI update.
  describe('handleChunk - tool_calls', () => {
    // Fake timers are required to drive the throttle's trailing edge deterministically
    beforeEach(() => {
      vi.useFakeTimers();
    });
    afterEach(() => {
      vi.useRealTimers();
    });
    it('should mark as function call', async () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      handler.handleChunk({
        type: 'tool_calls',
        tool_calls: [
          { id: 'call-1', type: 'function', function: { name: 'search', arguments: '{}' } },
        ],
      });
      expect(handler.getIsFunctionCall()).toBe(true);
      expect(callbacks.toggleToolCallingStreaming).toHaveBeenCalled();
    });
    it('should throttle tool calls updates', async () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      handler.handleChunk({
        type: 'tool_calls',
        tool_calls: [
          { id: 'call-1', type: 'function', function: { name: 'search', arguments: '{}' } },
        ],
      });
      handler.handleChunk({
        type: 'tool_calls',
        tool_calls: [
          {
            id: 'call-1',
            type: 'function',
            function: { name: 'search', arguments: '{"q":"test"}' },
          },
        ],
      });
      // Initial call happens immediately due to leading: true
      expect(callbacks.onToolCallsUpdate).toHaveBeenCalledTimes(1);
      // Advance timer to allow trailing call
      vi.advanceTimersByTime(300);
      expect(callbacks.onToolCallsUpdate).toHaveBeenCalledTimes(2);
    });
  });
describe('handleChunk - grounding', () => {
it('should update grounding with citations', () => {
const callbacks = createMockCallbacks();
const handler = new StreamingHandler(mockContext, callbacks);
handler.handleChunk({
type: 'grounding',
grounding: {
citations: [{ title: 'Source 1', url: 'https://example.com' }],
searchQueries: ['test query'],
},
});
expect(callbacks.onGroundingUpdate).toHaveBeenCalledWith({
citations: [{ title: 'Source 1', url: 'https://example.com' }],
searchQueries: ['test query'],
});
});
it('should not update grounding when no citations', () => {
const callbacks = createMockCallbacks();
const handler = new StreamingHandler(mockContext, callbacks);
handler.handleChunk({
type: 'grounding',
grounding: { citations: [] },
});
expect(callbacks.onGroundingUpdate).not.toHaveBeenCalled();
});
it('should not update grounding when grounding is undefined', () => {
const callbacks = createMockCallbacks();
const handler = new StreamingHandler(mockContext, callbacks);
handler.handleChunk({
type: 'grounding',
grounding: undefined,
});
expect(callbacks.onGroundingUpdate).not.toHaveBeenCalled();
});
});
  // Base64 image chunks: immediate preview display plus a background upload task.
  describe('handleChunk - base64_image', () => {
    it('should immediately display images', () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      handler.handleChunk({
        type: 'base64_image',
        image: { id: 'img-1', data: 'data:image/png;base64,abc' },
        images: [{ id: 'img-1', data: 'data:image/png;base64,abc' }],
      });
      // Raw data URL is shown right away, before any upload resolves
      expect(callbacks.onImagesUpdate).toHaveBeenCalledWith([
        { id: 'img-1', url: 'data:image/png;base64,abc', alt: 'img-1' },
      ]);
    });
    it('should start upload task for image', () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      handler.handleChunk({
        type: 'base64_image',
        image: { id: 'img-1', data: 'data:image/png;base64,abc' },
        images: [{ id: 'img-1', data: 'data:image/png;base64,abc' }],
      });
      expect(callbacks.uploadBase64Image).toHaveBeenCalledWith('data:image/png;base64,abc');
    });
  });
  // Stop chunks: in-flight reasoning is finalized.
  describe('handleChunk - stop', () => {
    it('should end reasoning on stop', async () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      handler.handleChunk({ type: 'reasoning', text: 'Thinking...' });
      // Small delay so a reasoning duration is actually accrued
      await new Promise((r) => setTimeout(r, 10));
      handler.handleChunk({ type: 'stop' });
      expect(callbacks.onReasoningComplete).toHaveBeenCalledWith('reasoning-op-id');
    });
  });
  // handleFinish: final result assembly — content, uploads, reasoning, tool calls,
  // grounding, traceId, and reasoning signatures supplied by the backend.
  describe('handleFinish', () => {
    it('should return correct result for text-only content', async () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      handler.handleChunk({ type: 'text', text: 'Hello World' });
      const result = await handler.handleFinish({
        type: 'stop',
        usage: { totalTokens: 100 } as any,
      });
      expect(result.content).toBe('Hello World');
      expect(result.isFunctionCall).toBe(false);
      expect(result.metadata.usage?.totalTokens).toBe(100);
    });
    it('should wait for image uploads', async () => {
      const callbacks = createMockCallbacks();
      // Slow upload (50ms) to prove handleFinish blocks until it resolves
      callbacks.uploadBase64Image = vi.fn(
        (): Promise<{ id?: string; url?: string }> =>
          new Promise((r) => setTimeout(() => r({ id: 'img', url: 'https://s3/img.png' }), 50)),
      );
      const handler = new StreamingHandler(mockContext, callbacks);
      handler.handleChunk({
        type: 'base64_image',
        image: { id: 'img-1', data: 'base64...' },
        images: [{ id: 'img-1', data: 'base64...' }],
      });
      const result = await handler.handleFinish({ type: 'stop' });
      expect(result.metadata.imageList).toHaveLength(1);
      expect(result.metadata.imageList?.[0].url).toBe('https://s3/img.png');
    });
    it('should include reasoning with duration', async () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      handler.handleChunk({ type: 'reasoning', text: 'Thinking...' });
      await new Promise((r) => setTimeout(r, 20));
      handler.handleChunk({ type: 'text', text: 'Done' });
      const result = await handler.handleFinish({ type: 'stop' });
      expect(result.metadata.reasoning?.content).toBe('Thinking...');
      expect(result.metadata.reasoning?.duration).toBeGreaterThan(0);
    });
    it('should include grounding from finish data', async () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      handler.handleChunk({ type: 'text', text: 'Content' });
      const result = await handler.handleFinish({
        type: 'stop',
        grounding: {
          citations: [{ title: 'Source', url: 'https://example.com' }],
          searchQueries: ['query'],
        },
      });
      expect(result.metadata.search).toEqual({
        citations: [{ title: 'Source', url: 'https://example.com' }],
        searchQueries: ['query'],
      });
    });
    it('should process tool calls from finish data', async () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      const result = await handler.handleFinish({
        type: 'stop',
        toolCalls: [
          {
            id: 'call-1',
            type: 'function',
            function: { name: 'search', arguments: '{"q":"test"}' },
          },
        ],
      });
      expect(result.isFunctionCall).toBe(true);
      expect(result.tools).toBeDefined();
      expect(callbacks.transformToolCalls).toHaveBeenCalled();
    });
    it('should handle empty tool call arguments', async () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      const result = await handler.handleFinish({
        type: 'stop',
        toolCalls: [
          {
            id: 'call-1',
            type: 'function',
            function: { name: 'search', arguments: undefined as unknown as string },
          },
        ],
      });
      expect(result.isFunctionCall).toBe(true);
      // Verify arguments were filled with '{}'
      expect(callbacks.transformToolCalls).toHaveBeenCalledWith([
        { id: 'call-1', type: 'function', function: { name: 'search', arguments: '{}' } },
      ]);
    });
    it('should update traceId from finish data', async () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      handler.handleChunk({ type: 'text', text: 'Content' });
      const result = await handler.handleFinish({
        type: 'stop',
        traceId: 'trace-123',
      });
      expect(result.traceId).toBe('trace-123');
      expect(handler.getTraceId()).toBe('trace-123');
    });
    it('should use fallback reasoning from finish data when no streaming reasoning', async () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      handler.handleChunk({ type: 'text', text: 'Content' });
      const result = await handler.handleFinish({
        type: 'stop',
        reasoning: { content: 'Fallback reasoning' },
      });
      expect(result.metadata.reasoning?.content).toBe('Fallback reasoning');
    });
    it('should include reasoning signature from finish data', async () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      handler.handleChunk({ type: 'reasoning', text: 'Thinking...' });
      await new Promise((r) => setTimeout(r, 10));
      handler.handleChunk({ type: 'text', text: 'Done' });
      const result = await handler.handleFinish({
        type: 'stop',
        reasoning: { content: 'Thinking...', signature: 'test-signature-abc123' },
      });
      expect(result.metadata.reasoning?.content).toBe('Thinking...');
      expect(result.metadata.reasoning?.signature).toBe('test-signature-abc123');
    });
    it('should include reasoning signature with multimodal reasoning', async () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      handler.handleChunk({
        type: 'reasoning_part',
        partType: 'text',
        content: 'Thinking with images...',
      });
      handler.handleChunk({
        type: 'reasoning_part',
        partType: 'image',
        content: 'base64data',
        mimeType: 'image/png',
      });
      handler.handleChunk({ type: 'text', text: 'Done' });
      const result = await handler.handleFinish({
        type: 'stop',
        reasoning: { signature: 'multimodal-signature-xyz' },
      });
      expect(result.metadata.reasoning?.isMultimodal).toBe(true);
      expect(result.metadata.reasoning?.signature).toBe('multimodal-signature-xyz');
    });
    it('should use fallback reasoning with signature when no streaming reasoning', async () => {
      const callbacks = createMockCallbacks();
      const handler = new StreamingHandler(mockContext, callbacks);
      handler.handleChunk({ type: 'text', text: 'Content' });
      const result = await handler.handleFinish({
        type: 'stop',
        reasoning: { content: 'Fallback', signature: 'fallback-sig' },
      });
      expect(result.metadata.reasoning?.content).toBe('Fallback');
      expect(result.metadata.reasoning?.signature).toBe('fallback-sig');
    });
  });
describe('getter methods', () => {
it('getOutput should return accumulated output', () => {
const callbacks = createMockCallbacks();
const handler = new StreamingHandler(mockContext, callbacks);
handler.handleChunk({ type: 'text', text: 'Test' });
expect(handler.getOutput()).toBe('Test');
});
it('getIsFunctionCall should return false by default', () => {
const callbacks = createMockCallbacks();
const handler = new StreamingHandler(mockContext, callbacks);
expect(handler.getIsFunctionCall()).toBe(false);
});
it('getTools should return undefined by default', () => {
const callbacks = createMockCallbacks();
const handler = new StreamingHandler(mockContext, callbacks);
expect(handler.getTools()).toBeUndefined();
});
it('getThinkingDuration should return undefined before reasoning ends', () => {
const callbacks = createMockCallbacks();
const handler = new StreamingHandler(mockContext, callbacks);
handler.handleChunk({ type: 'reasoning', text: 'Thinking' });
expect(handler.getThinkingDuration()).toBeUndefined();
});
it('getFinishType should return undefined before finish', () => {
const callbacks = createMockCallbacks();
const handler = new StreamingHandler(mockContext, callbacks);
expect(handler.getFinishType()).toBeUndefined();
});
});
});

View File

@@ -0,0 +1,537 @@
import type {
ChatImageItem,
ChatToolPayload,
MessageContentPart,
MessageToolCall,
} from '@lobechat/types';
import { serializePartsForStorage } from '@lobechat/utils';
import debug from 'debug';
import { throttle } from 'es-toolkit/compat';
import pMap from 'p-map';
import { cleanSpeakerTag } from '../../../utils/cleanSpeakerTag';
import type {
FinishData,
GroundingData,
ReasoningState,
StreamChunk,
StreamingCallbacks,
StreamingContext,
StreamingResult,
} from './types/streaming';
const log = debug('lobe-store:streaming-handler');

/**
 * Streaming message handler
 *
 * Encapsulates all state and logic for streaming message processing, including:
 * - Text content accumulation
 * - Reasoning content processing
 * - Multimodal content processing
 * - Tool calls processing
 * - Image upload management
 *
 * @example
 * ```typescript
 * const handler = new StreamingHandler(context, callbacks);
 *
 * // During streaming
 * handler.handleChunk(chunk);
 *
 * // When streaming completes
 * const result = await handler.handleFinish(finishData);
 * ```
 */
export class StreamingHandler {
  // ========== Text state ==========
  private output = '';

  // ========== Reasoning state ==========
  private thinkingContent = '';
  private thinkingStartAt?: number;
  private thinkingDuration?: number;
  private reasoningOperationId?: string;
  private reasoningParts: MessageContentPart[] = [];

  // ========== Multimodal state ==========
  private contentParts: MessageContentPart[] = [];

  // ========== Tool call state ==========
  private isFunctionCall = false;
  private tools?: ChatToolPayload[];

  // ========== Image upload state ==========
  private uploadTasks = new Map<string, Promise<{ id?: string; url?: string }>>();
  private contentImageUploads = new Map<number, Promise<string>>();
  private reasoningImageUploads = new Map<number, Promise<string>>();

  // ========== Other state ==========
  private msgTraceId?: string;
  private finishType?: string;

  // ========== Throttled updates ==========
  private throttledUpdateToolCalls: ReturnType<typeof throttle>;

  constructor(
    private context: StreamingContext,
    private callbacks: StreamingCallbacks,
  ) {
    // Initialize throttled tool calls update (max once per 300ms)
    this.throttledUpdateToolCalls = throttle(
      (toolCalls: MessageToolCall[]) => {
        const tools = this.callbacks.transformToolCalls(toolCalls);
        this.callbacks.onToolCallsUpdate(tools);
      },
      300,
      { leading: true, trailing: true },
    );
  }

  // ==================== Public API ====================

  /**
   * Handle streaming chunk
   */
  handleChunk(chunk: StreamChunk): void {
    switch (chunk.type) {
      case 'text': {
        this.handleTextChunk(chunk);
        break;
      }
      case 'reasoning': {
        this.handleReasoningChunk(chunk);
        break;
      }
      case 'reasoning_part': {
        this.handleReasoningPartChunk(chunk);
        break;
      }
      case 'content_part': {
        this.handleContentPartChunk(chunk);
        break;
      }
      case 'tool_calls': {
        this.handleToolCallsChunk(chunk);
        break;
      }
      case 'grounding': {
        this.handleGroundingChunk(chunk);
        break;
      }
      case 'base64_image': {
        this.handleBase64ImageChunk(chunk);
        break;
      }
      case 'stop': {
        this.handleStopChunk();
        break;
      }
    }
  }

  /**
   * Handle streaming finish
   *
   * Waits for all pending image uploads before assembling the final result.
   */
  async handleFinish(finishData: FinishData): Promise<StreamingResult> {
    // Update traceId
    if (finishData.traceId) {
      this.msgTraceId = finishData.traceId;
    }
    // Wait for all image uploads to complete
    const finalImages = await this.waitForImageUploads();
    // Wait for multimodal image uploads to complete
    await this.waitForMultimodalUploads();
    // Process final tool calls (also flushes/cancels the pending throttled update)
    this.processFinalToolCalls(finishData.toolCalls);
    // Build final result
    return this.buildFinalResult(finishData, finalImages);
  }

  /**
   * Get current output content
   */
  getOutput(): string {
    return this.output;
  }

  /**
   * Get reasoning duration
   */
  getThinkingDuration(): number | undefined {
    return this.thinkingDuration;
  }

  /**
   * Check if this is a function call
   */
  getIsFunctionCall(): boolean {
    return this.isFunctionCall;
  }

  /**
   * Get tools
   */
  getTools(): ChatToolPayload[] | undefined {
    return this.tools;
  }

  /**
   * Get trace ID
   */
  getTraceId(): string | undefined {
    return this.msgTraceId;
  }

  /**
   * Get finish type
   */
  getFinishType(): string | undefined {
    return this.finishType;
  }

  // ==================== Chunk handling methods ====================

  private handleTextChunk(chunk: { text: string; type: 'text' }): void {
    this.output += chunk.text;
    // Clean speaker tag that may be reproduced by model in group chat
    this.output = cleanSpeakerTag(this.output);
    // End reasoning timing
    this.endReasoningIfNeeded();
    log(
      '[text stream] messageId=%s, output length=%d, operationId=%s',
      this.context.messageId,
      this.output.length,
      this.context.operationId,
    );
    // Notify update
    this.callbacks.onContentUpdate(this.output, this.buildReasoningState());
  }

  private handleReasoningChunk(chunk: { text: string; type: 'reasoning' }): void {
    // Start reasoning timing
    this.startReasoningIfNeeded();
    this.thinkingContent += chunk.text;
    this.callbacks.onReasoningUpdate({ content: this.thinkingContent });
  }

  private handleReasoningPartChunk(chunk: {
    content: string;
    mimeType?: string;
    partType: 'text' | 'image';
    type: 'reasoning_part';
  }): void {
    // Start reasoning timing
    this.startReasoningIfNeeded();
    if (chunk.partType === 'text') {
      this.appendTextToReasoningParts(chunk.content);
      this.thinkingContent += chunk.content;
    } else if (chunk.partType === 'image' && chunk.mimeType) {
      this.appendImageToReasoningParts(chunk.content, chunk.mimeType);
    }
    // Notify update: switch to multimodal payload once any image part exists
    const hasImages = this.reasoningParts.some((p) => p.type === 'image');
    this.callbacks.onReasoningUpdate(
      hasImages
        ? { isMultimodal: true, tempDisplayContent: this.reasoningParts }
        : { content: this.thinkingContent },
    );
  }

  private handleContentPartChunk(chunk: {
    content: string;
    mimeType?: string;
    partType: 'text' | 'image';
    type: 'content_part';
  }): void {
    // End reasoning timing
    this.endReasoningIfNeeded();
    if (chunk.partType === 'text') {
      this.appendTextToContentParts(chunk.content);
      this.output += chunk.content;
      // Clean speaker tag
      this.output = cleanSpeakerTag(this.output);
    } else if (chunk.partType === 'image' && chunk.mimeType) {
      this.appendImageToContentParts(chunk.content, chunk.mimeType);
    }
    // Notify update
    this.notifyContentPartUpdate();
  }

  private handleToolCallsChunk(chunk: {
    isAnimationActives?: boolean[];
    tool_calls: MessageToolCall[];
    type: 'tool_calls';
  }): void {
    this.isFunctionCall = true;
    this.callbacks.toggleToolCallingStreaming(this.context.messageId, chunk.isAnimationActives);
    this.throttledUpdateToolCalls(chunk.tool_calls);
    // End reasoning timing
    this.endReasoningIfNeeded();
  }

  private handleGroundingChunk(chunk: { grounding?: GroundingData; type: 'grounding' }): void {
    // Only notify when there is at least one citation
    if (!chunk.grounding?.citations?.length) return;
    this.callbacks.onGroundingUpdate({
      citations: chunk.grounding.citations,
      searchQueries: chunk.grounding.searchQueries,
    });
  }

  private handleBase64ImageChunk(chunk: {
    image: { data: string; id: string };
    images: { data: string; id: string }[];
    type: 'base64_image';
  }): void {
    // Immediately display images (raw data URLs) while uploads run in background
    this.callbacks.onImagesUpdate(chunk.images.map((i) => ({ alt: i.id, id: i.id, url: i.data })));
    // Async upload
    const task = this.callbacks.uploadBase64Image(chunk.image.data);
    this.uploadTasks.set(chunk.image.id, task);
  }

  private handleStopChunk(): void {
    this.endReasoningIfNeeded();
  }

  // ==================== Helper methods ====================

  /** Start the reasoning timer and notify once, on the first reasoning chunk. */
  private startReasoningIfNeeded(): void {
    if (!this.thinkingStartAt) {
      this.thinkingStartAt = Date.now();
      this.reasoningOperationId = this.callbacks.onReasoningStart();
    }
  }

  /** Finalize reasoning duration (once) and notify completion. */
  private endReasoningIfNeeded(): void {
    // Compare against undefined explicitly: a sub-millisecond reasoning phase yields
    // duration === 0, which is falsy and would otherwise be recomputed (and grow)
    // on every subsequent chunk.
    if (this.thinkingStartAt && this.thinkingDuration === undefined) {
      this.thinkingDuration = Date.now() - this.thinkingStartAt;
      if (this.reasoningOperationId) {
        this.callbacks.onReasoningComplete(this.reasoningOperationId);
        this.reasoningOperationId = undefined;
      }
    }
  }

  /** Append text to reasoning parts, merging into a trailing text part if present. */
  private appendTextToReasoningParts(text: string): void {
    const lastPart = this.reasoningParts.at(-1);
    if (lastPart?.type === 'text') {
      this.reasoningParts = [
        ...this.reasoningParts.slice(0, -1),
        { text: lastPart.text + text, type: 'text' },
      ];
    } else {
      this.reasoningParts = [...this.reasoningParts, { text, type: 'text' }];
    }
  }

  /** Add an image reasoning part (temp data URL) and kick off its upload. */
  private appendImageToReasoningParts(base64Content: string, mimeType: string): void {
    const tempImage = `data:${mimeType};base64,${base64Content}`;
    const partIndex = this.reasoningParts.length;
    this.reasoningParts = [...this.reasoningParts, { image: tempImage, type: 'image' }];
    // Async upload; on success swap the temp data URL for the hosted URL in place
    const uploadTask = this.callbacks
      .uploadBase64Image(tempImage)
      .then((file) => {
        const url = file?.url || tempImage;
        const updatedParts = [...this.reasoningParts];
        updatedParts[partIndex] = { image: url, type: 'image' };
        this.reasoningParts = updatedParts;
        return url;
      })
      .catch((error) => {
        // Best-effort: keep the temp data URL if upload fails
        console.error('[reasoning_part] Image upload failed:', error);
        return tempImage;
      });
    this.reasoningImageUploads.set(partIndex, uploadTask);
  }

  /** Append text to content parts, merging into a trailing text part if present. */
  private appendTextToContentParts(text: string): void {
    const lastPart = this.contentParts.at(-1);
    if (lastPart?.type === 'text') {
      this.contentParts = [
        ...this.contentParts.slice(0, -1),
        { text: lastPart.text + text, type: 'text' },
      ];
    } else {
      this.contentParts = [...this.contentParts, { text, type: 'text' }];
    }
  }

  /** Add an image content part (temp data URL) and kick off its upload. */
  private appendImageToContentParts(base64Content: string, mimeType: string): void {
    const tempImage = `data:${mimeType};base64,${base64Content}`;
    const partIndex = this.contentParts.length;
    this.contentParts = [...this.contentParts, { image: tempImage, type: 'image' }];
    // Async upload; on success swap the temp data URL for the hosted URL in place
    const uploadTask = this.callbacks
      .uploadBase64Image(tempImage)
      .then((file) => {
        const url = file?.url || tempImage;
        const updatedParts = [...this.contentParts];
        updatedParts[partIndex] = { image: url, type: 'image' };
        this.contentParts = updatedParts;
        return url;
      })
      .catch((error) => {
        // Best-effort: keep the temp data URL if upload fails
        console.error('[content_part] Image upload failed:', error);
        return tempImage;
      });
    this.contentImageUploads.set(partIndex, uploadTask);
  }

  /** Notify content update, attaching multimodal reasoning state when present. */
  private notifyContentPartUpdate(): void {
    const hasReasoningImages = this.reasoningParts.some((p) => p.type === 'image');
    this.callbacks.onContentUpdate(
      this.output,
      hasReasoningImages
        ? {
            duration: this.thinkingDuration,
            isMultimodal: true,
            tempDisplayContent: this.reasoningParts,
          }
        : this.thinkingContent
          ? { content: this.thinkingContent, duration: this.thinkingDuration }
          : undefined,
    );
  }

  private buildReasoningState(): ReasoningState | undefined {
    if (!this.thinkingContent) return undefined;
    return { content: this.thinkingContent, duration: this.thinkingDuration };
  }

  /** Await all base64_image uploads; failed/url-less results are dropped. */
  private async waitForImageUploads(): Promise<ChatImageItem[]> {
    if (this.uploadTasks.size === 0) return [];
    try {
      const results = await pMap(Array.from(this.uploadTasks.values()), (task) => task, {
        concurrency: 5,
      });
      return results.filter((i) => !!i.url) as ChatImageItem[];
    } catch (error) {
      console.error('Error waiting for image uploads:', error);
      return [];
    }
  }

  /** Await all content/reasoning part uploads; individual failures are tolerated. */
  private async waitForMultimodalUploads(): Promise<void> {
    await Promise.allSettled([
      ...Array.from(this.contentImageUploads.values()),
      ...Array.from(this.reasoningImageUploads.values()),
    ]);
  }

  /** Transform the final tool calls; fills missing arguments with '{}'. */
  private processFinalToolCalls(toolCalls?: MessageToolCall[]): void {
    if (!toolCalls?.length) {
      // Drop any pending trailing throttled update so it cannot fire after finish
      this.throttledUpdateToolCalls.cancel();
      return;
    }
    this.throttledUpdateToolCalls.flush();
    this.callbacks.toggleToolCallingStreaming(this.context.messageId, undefined);
    const processedToolCalls = toolCalls.map((item) => ({
      ...item,
      function: {
        ...item.function,
        arguments: item.function.arguments || '{}',
      },
    }));
    this.tools = this.callbacks.transformToolCalls(processedToolCalls);
    this.isFunctionCall = true;
  }

  /** Assemble the final StreamingResult from accumulated state and finish data. */
  private buildFinalResult(finishData: FinishData, finalImages: ChatImageItem[]): StreamingResult {
    const hasContentImages = this.contentParts.some((p) => p.type === 'image');
    const hasReasoningImages = this.reasoningParts.some((p) => p.type === 'image');
    // Determine final content: multimodal parts are serialized for storage
    const finalContent = hasContentImages
      ? serializePartsForStorage(this.contentParts)
      : this.output;
    // Determine final reasoning duration; 0 is a valid (sub-millisecond) duration
    const finalDuration =
      this.thinkingDuration !== undefined && !Number.isNaN(this.thinkingDuration)
        ? this.thinkingDuration
        : undefined;
    // Get signature from finishData.reasoning (provided by backend in onFinish)
    const reasoningSignature = finishData.reasoning?.signature;
    let finalReasoning: ReasoningState | undefined;
    if (hasReasoningImages) {
      finalReasoning = {
        content: serializePartsForStorage(this.reasoningParts),
        duration: finalDuration,
        isMultimodal: true,
        signature: reasoningSignature,
      };
    } else if (this.thinkingContent) {
      finalReasoning = {
        content: this.thinkingContent,
        duration: finalDuration,
        signature: reasoningSignature,
      };
    } else if (finishData.reasoning?.content) {
      // No streamed reasoning: fall back to what the backend reported at finish
      finalReasoning = {
        ...finishData.reasoning,
        duration: finalDuration,
      };
    }
    this.finishType = finishData.type;
    log(
      '[handleFinish] messageId=%s, finishType=%s, operationId=%s',
      this.context.messageId,
      finishData.type,
      this.context.operationId,
    );
    return {
      content: finalContent,
      finishType: finishData.type,
      isFunctionCall: this.isFunctionCall,
      metadata: {
        finishType: finishData.type,
        imageList: finalImages.length > 0 ? finalImages : undefined,
        isMultimodal: hasContentImages || undefined,
        performance: finishData.speed,
        reasoning: finalReasoning,
        search: finishData.grounding?.citations ? finishData.grounding : undefined,
        usage: finishData.usage,
      },
      toolCalls: finishData.toolCalls,
      tools: this.tools,
      traceId: this.msgTraceId,
      usage: finishData.usage,
    };
  }
}

View File

@@ -10,10 +10,8 @@ import {
import { PageAgentIdentifier } from '@lobechat/builtin-tool-page-agent';
import { isDesktop } from '@lobechat/const';
import {
type ChatImageItem,
type ChatToolPayload,
type ConversationContext,
type MessageContentPart,
type MessageMapScope,
type MessageToolCall,
type ModelUsage,
@@ -22,11 +20,8 @@ import {
TraceNameMap,
type UIChatMessage,
} from '@lobechat/types';
import { serializePartsForStorage } from '@lobechat/utils';
import debug from 'debug';
import { throttle } from 'es-toolkit/compat';
import { t } from 'i18next';
import pMap from 'p-map';
import { type StateCreator } from 'zustand/vanilla';
import { createAgentToolsEngine } from '@/helpers/toolEngineering';
@@ -41,9 +36,10 @@ import { toolInterventionSelectors } from '@/store/user/selectors';
import { getUserStoreState } from '@/store/user/store';
import { topicSelectors } from '../../../selectors';
import { cleanSpeakerTag } from '../../../utils/cleanSpeakerTag';
import { messageMapKey } from '../../../utils/messageMapKey';
import { selectTodosFromMessages } from '../../message/selectors/dbMessage';
import { StreamingHandler } from './StreamingHandler';
import type { StreamChunk } from './types/streaming';
const log = debug('lobe-store:streaming-executor');
@@ -352,41 +348,84 @@ export const streamingExecutor: StateCreator<
const finalAgentConfig = agentConfig || resolved.agentConfig;
const chatConfig = resolved.chatConfig;
let isFunctionCall = false;
let tools: ChatToolPayload[] | undefined;
let tool_calls: MessageToolCall[] | undefined;
let finalUsage;
let msgTraceId: string | undefined;
let output = '';
let finalUsage: ModelUsage | undefined;
let finalToolCalls: MessageToolCall[] | undefined;
let thinkingContent = '';
let thinkingStartAt: number;
let thinkingDuration: number | undefined;
let reasoningOperationId: string | undefined;
let finishType: string | undefined;
// to upload image
const uploadTasks: Map<string, Promise<{ id?: string; url?: string }>> = new Map();
// Multimodal content parts
let contentParts: MessageContentPart[] = [];
let reasoningParts: MessageContentPart[] = [];
const contentImageUploads: Map<number, Promise<string>> = new Map();
const reasoningImageUploads: Map<number, Promise<string>> = new Map();
// Throttle tool_calls updates to prevent excessive re-renders (max once per 300ms)
const throttledUpdateToolCalls = throttle(
(toolCalls: MessageToolCall[]) => {
internal_dispatchMessage(
{
id: messageId,
type: 'updateMessage',
value: { tools: get().internal_transformToolCalls(toolCalls) },
},
{ operationId },
);
// Create streaming handler with callbacks
const handler = new StreamingHandler(
{ messageId, operationId, agentId, groupId, topicId },
{
onContentUpdate: (content, reasoning) => {
internal_dispatchMessage(
{
id: messageId,
type: 'updateMessage',
value: { content, reasoning },
},
{ operationId },
);
},
onReasoningUpdate: (reasoning) => {
internal_dispatchMessage(
{
id: messageId,
type: 'updateMessage',
value: { reasoning },
},
{ operationId },
);
},
onToolCallsUpdate: (tools) => {
internal_dispatchMessage(
{
id: messageId,
type: 'updateMessage',
value: { tools },
},
{ operationId },
);
},
onGroundingUpdate: (grounding) => {
internal_dispatchMessage(
{
id: messageId,
type: 'updateMessage',
value: { search: grounding },
},
{ operationId },
);
},
onImagesUpdate: (images) => {
internal_dispatchMessage(
{
id: messageId,
type: 'updateMessage',
value: { imageList: images },
},
{ operationId },
);
},
onReasoningStart: () => {
const { operationId: reasoningOpId } = get().startOperation({
type: 'reasoning',
context: { ...fetchContext, messageId },
parentOperationId: operationId,
});
get().associateMessageWithOperation(messageId, reasoningOpId);
return reasoningOpId;
},
onReasoningComplete: (opId) => get().completeOperation(opId),
uploadBase64Image: (data) =>
getFileStoreState()
.uploadBase64FileWithProgress(data)
.then((file) => ({
id: file?.id,
url: file?.url,
alt: file?.filename || file?.id,
})),
transformToolCalls: get().internal_transformToolCalls,
toggleToolCallingStreaming: internal_toggleToolCallingStreaming,
},
300,
{ leading: true, trailing: true },
);
const historySummary = chatConfig.enableCompressHistory
@@ -432,7 +471,6 @@ export const streamingExecutor: StateCreator<
) => {
// if there is traceId, update it
if (traceId) {
msgTraceId = traceId;
messageService.updateMessage(
messageId,
{ traceId, observationId: observationId ?? undefined },
@@ -440,454 +478,65 @@ export const streamingExecutor: StateCreator<
);
}
// 等待所有图片上传完成
let finalImages: ChatImageItem[] = [];
if (uploadTasks.size > 0) {
try {
// 等待所有上传任务完成
const uploadResults = await pMap(Array.from(uploadTasks.values()), (task) => task, {
concurrency: 5,
});
// 使用上传后的 S3 URL 替换原始图像数据
finalImages = uploadResults.filter((i) => !!i.url) as ChatImageItem[];
} catch (error) {
console.error('Error waiting for image uploads:', error);
}
}
// Wait for all multimodal image uploads to complete
// Note: Arrays are already updated in-place when uploads complete
// Use Promise.allSettled to continue even if some uploads fail
await Promise.allSettled([
...Array.from(contentImageUploads.values()),
...Array.from(reasoningImageUploads.values()),
]);
let parsedToolCalls = toolCalls;
if (parsedToolCalls && parsedToolCalls.length > 0) {
// Flush any pending throttled updates before finalizing
throttledUpdateToolCalls.flush();
internal_toggleToolCallingStreaming(messageId, undefined);
tool_calls = toolCalls;
parsedToolCalls = parsedToolCalls.map((item) => ({
...item,
function: {
...item.function,
arguments: !!item.function.arguments ? item.function.arguments : '{}',
},
}));
tools = get().internal_transformToolCalls(parsedToolCalls);
isFunctionCall = true;
}
finalUsage = usage;
finishType = type;
log(
'[internal_fetchAIChatMessage] onFinish: messageId=%s, finishType=%s, operationId=%s',
messageId,
// Handle finish using StreamingHandler
const result = await handler.handleFinish({
traceId,
observationId,
toolCalls,
reasoning,
grounding,
usage,
speed,
type,
operationId,
);
});
// Check if there are any image parts
const hasContentImages = contentParts.some((part) => part.type === 'image');
const hasReasoningImages = reasoningParts.some((part) => part.type === 'image');
// Determine final content
// If has images, serialize contentParts; otherwise use accumulated output text
const finalContent = hasContentImages ? serializePartsForStorage(contentParts) : output;
const finalDuration =
thinkingDuration && !isNaN(thinkingDuration) ? thinkingDuration : undefined;
// Determine final reasoning content
// Priority: reasoningParts (multimodal) > thinkingContent (from reasoning_part text) > reasoning (from old reasoning event)
let finalReasoning: any = undefined;
if (hasReasoningImages) {
// Has images, use multimodal format
finalReasoning = {
content: serializePartsForStorage(reasoningParts),
duration: finalDuration,
isMultimodal: true,
};
} else if (thinkingContent) {
// Has text from reasoning_part but no images
finalReasoning = {
content: thinkingContent,
duration: finalDuration,
};
} else if (reasoning?.content) {
// Fallback to old reasoning event content
finalReasoning = {
...reasoning,
duration: finalDuration,
};
}
// Store for return value
finalUsage = result.usage;
finalToolCalls = result.toolCalls;
// update the content after fetch result
await optimisticUpdateMessageContent(
messageId,
finalContent,
result.content,
{
tools,
reasoning: finalReasoning,
search: !!grounding?.citations ? grounding : undefined,
imageList: finalImages.length > 0 ? finalImages : undefined,
tools: result.tools,
reasoning: result.metadata.reasoning,
search: result.metadata.search,
imageList: result.metadata.imageList,
metadata: {
...usage,
...speed,
performance: speed,
usage,
finishType: type,
...(hasContentImages && { isMultimodal: true }),
...result.metadata.usage,
...result.metadata.performance,
performance: result.metadata.performance,
usage: result.metadata.usage,
finishType: result.metadata.finishType,
...(result.metadata.isMultimodal && { isMultimodal: true }),
},
},
{ operationId },
);
},
onMessageHandle: async (chunk) => {
switch (chunk.type) {
case 'grounding': {
// if there is no citations, then stop
if (
!chunk.grounding ||
!chunk.grounding.citations ||
chunk.grounding.citations.length <= 0
)
return;
internal_dispatchMessage(
{
id: messageId,
type: 'updateMessage',
value: {
search: {
citations: chunk.grounding.citations,
searchQueries: chunk.grounding.searchQueries,
},
},
},
{ operationId },
);
break;
}
case 'base64_image': {
internal_dispatchMessage(
{
id: messageId,
type: 'updateMessage',
value: {
imageList: chunk.images.map((i) => ({ id: i.id, url: i.data, alt: i.id })),
},
},
{ operationId },
);
const image = chunk.image;
const task = getFileStoreState()
.uploadBase64FileWithProgress(image.data)
.then((value) => ({
id: value?.id,
url: value?.url,
alt: value?.filename || value?.id,
}));
uploadTasks.set(image.id, task);
break;
}
case 'text': {
output += chunk.text;
// Clean speaker tag that may be reproduced by model in group chat
// The tag is injected at message start to identify sender, but models may copy it
output = cleanSpeakerTag(output);
// if there is no duration, it means the end of reasoning
if (!thinkingDuration) {
thinkingDuration = Date.now() - thinkingStartAt;
// Complete reasoning operation if it exists
if (reasoningOperationId) {
get().completeOperation(reasoningOperationId);
reasoningOperationId = undefined;
}
}
log(
'[text stream] messageId=%s, output length=%d, operationId=%s',
messageId,
output.length,
operationId,
);
internal_dispatchMessage(
{
id: messageId,
type: 'updateMessage',
value: {
content: output,
reasoning: !!thinkingContent
? { content: thinkingContent, duration: thinkingDuration }
: undefined,
},
},
{ operationId },
);
break;
}
case 'reasoning': {
// if there is no thinkingStartAt, it means the start of reasoning
if (!thinkingStartAt) {
thinkingStartAt = Date.now();
// Create reasoning operation
const { operationId: reasoningOpId } = get().startOperation({
type: 'reasoning',
context: { ...fetchContext, messageId },
parentOperationId: operationId,
});
reasoningOperationId = reasoningOpId;
// Associate message with reasoning operation
get().associateMessageWithOperation(messageId, reasoningOperationId);
}
thinkingContent += chunk.text;
internal_dispatchMessage(
{
id: messageId,
type: 'updateMessage',
value: { reasoning: { content: thinkingContent } },
},
{ operationId },
);
break;
}
case 'reasoning_part': {
// Start reasoning if not started
if (!thinkingStartAt) {
thinkingStartAt = Date.now();
const { operationId: reasoningOpId } = get().startOperation({
type: 'reasoning',
context: { ...fetchContext, messageId },
parentOperationId: operationId,
});
reasoningOperationId = reasoningOpId;
get().associateMessageWithOperation(messageId, reasoningOperationId);
}
const { partType, content: partContent, mimeType } = chunk;
if (partType === 'text') {
const lastPart = reasoningParts.at(-1);
// If last part is also text, merge chunks together
if (lastPart?.type === 'text') {
reasoningParts = [
...reasoningParts.slice(0, -1),
{ type: 'text', text: lastPart.text + partContent },
];
} else {
// Create new text part (first chunk, may contain thoughtSignature)
reasoningParts = [...reasoningParts, { type: 'text', text: partContent }];
}
thinkingContent += partContent;
} else if (partType === 'image') {
// Image part - create new array to avoid mutation
const tempImage = `data:${mimeType};base64,${partContent}`;
const partIndex = reasoningParts.length;
const newPart: MessageContentPart = { type: 'image', image: tempImage };
reasoningParts = [...reasoningParts, newPart];
// Start upload task and update array when done
const uploadTask = getFileStoreState()
.uploadBase64FileWithProgress(tempImage)
.then((file) => {
const url = file?.url || tempImage;
// Replace the part at index by creating a new array
const updatedParts = [...reasoningParts];
updatedParts[partIndex] = { type: 'image', image: url };
reasoningParts = updatedParts;
return url;
})
.catch((error) => {
console.error('[reasoning_part] Image upload failed:', error);
return tempImage;
});
reasoningImageUploads.set(partIndex, uploadTask);
}
// Real-time update with display format
// Check if there are any image parts to determine if it's multimodal
const hasReasoningImages = reasoningParts.some((part) => part.type === 'image');
internal_dispatchMessage(
{
id: messageId,
type: 'updateMessage',
value: {
reasoning: hasReasoningImages
? { tempDisplayContent: reasoningParts, isMultimodal: true }
: { content: thinkingContent },
},
},
{ operationId },
);
break;
}
case 'content_part': {
const { partType, content: partContent, mimeType } = chunk;
// End reasoning when content starts
if (!thinkingDuration && reasoningOperationId) {
thinkingDuration = Date.now() - thinkingStartAt;
get().completeOperation(reasoningOperationId);
reasoningOperationId = undefined;
}
if (partType === 'text') {
const lastPart = contentParts.at(-1);
// If last part is also text, merge chunks together
if (lastPart?.type === 'text') {
contentParts = [
...contentParts.slice(0, -1),
{ type: 'text', text: lastPart.text + partContent },
];
} else {
// Create new text part (first chunk, may contain thoughtSignature)
contentParts = [...contentParts, { type: 'text', text: partContent }];
}
output += partContent;
// Clean speaker tag that may be reproduced by model in group chat
output = cleanSpeakerTag(output);
} else if (partType === 'image') {
// Image part - create new array to avoid mutation
const tempImage = `data:${mimeType};base64,${partContent}`;
const partIndex = contentParts.length;
const newPart: MessageContentPart = {
type: 'image',
image: tempImage,
};
contentParts = [...contentParts, newPart];
// Start upload task and update array when done
const uploadTask = getFileStoreState()
.uploadBase64FileWithProgress(tempImage)
.then((file) => {
const url = file?.url || tempImage;
// Replace the part at index by creating a new array
const updatedParts = [...contentParts];
updatedParts[partIndex] = {
type: 'image',
image: url,
};
contentParts = updatedParts;
return url;
})
.catch((error) => {
console.error('[content_part] Image upload failed:', error);
return tempImage;
});
contentImageUploads.set(partIndex, uploadTask);
}
// Real-time update with display format
// Check if there are any image parts to determine if it's multimodal
const hasContentImages = contentParts.some((part) => part.type === 'image');
const hasReasoningImages = reasoningParts.some((part) => part.type === 'image');
internal_dispatchMessage(
{
id: messageId,
type: 'updateMessage',
value: {
content: output,
reasoning: hasReasoningImages
? {
tempDisplayContent: reasoningParts,
isMultimodal: true,
duration: thinkingDuration,
}
: !!thinkingContent
? { content: thinkingContent, duration: thinkingDuration }
: undefined,
...(hasContentImages && {
metadata: {
isMultimodal: true,
tempDisplayContent: serializePartsForStorage(contentParts),
},
}),
},
},
{ operationId },
);
break;
}
// is this message is just a tool call
case 'tool_calls': {
internal_toggleToolCallingStreaming(messageId, chunk.isAnimationActives);
throttledUpdateToolCalls(chunk.tool_calls);
isFunctionCall = true;
// Complete reasoning operation if it exists
if (!thinkingDuration && reasoningOperationId) {
thinkingDuration = Date.now() - thinkingStartAt;
get().completeOperation(reasoningOperationId);
reasoningOperationId = undefined;
}
break;
}
case 'stop': {
// Complete reasoning operation when receiving stop signal
if (!thinkingDuration && reasoningOperationId) {
thinkingDuration = Date.now() - thinkingStartAt;
get().completeOperation(reasoningOperationId);
reasoningOperationId = undefined;
}
break;
}
}
// Delegate chunk handling to StreamingHandler
handler.handleChunk(chunk as StreamChunk);
},
});
log(
'[internal_fetchAIChatMessage] completed: messageId=%s, finishType=%s, isFunctionCall=%s, operationId=%s',
messageId,
finishType,
isFunctionCall,
handler.getFinishType(),
handler.getIsFunctionCall(),
operationId,
);
return {
isFunctionCall,
traceId: msgTraceId,
content: output,
tools,
isFunctionCall: handler.getIsFunctionCall(),
traceId: handler.getTraceId(),
content: handler.getOutput(),
tools: handler.getTools(),
usage: finalUsage,
tool_calls,
finishType,
tool_calls: finalToolCalls,
finishType: handler.getFinishType(),
};
},

View File

@@ -0,0 +1,119 @@
import type {
ChatImageItem,
ChatToolPayload,
GroundingSearch,
MessageContentPart,
MessageToolCall,
ModelPerformance,
ModelUsage,
} from '@lobechat/types';
/**
 * Streaming context - immutable configuration.
 *
 * Identifies where a streaming response belongs. Captured once when the
 * stream starts and passed unchanged to the StreamingHandler.
 */
export interface StreamingContext {
  /** Id of the agent producing the streamed message */
  agentId: string;
  /** Group id when the message is part of a group conversation — optional otherwise */
  groupId?: string;
  /** Id of the message being streamed into */
  messageId: string;
  /** Id of the operation that owns this streaming run (used to scope dispatches) */
  operationId?: string;
  /** Topic the message belongs to; null/undefined when outside any topic */
  topicId?: string | null;
}
/**
 * Reasoning state
 *
 * Snapshot of the model's reasoning ("thinking") attached to a message,
 * both during streaming and in the final persisted metadata.
 */
export interface ReasoningState {
  /** Reasoning text, or serialized multimodal parts when `isMultimodal` is true */
  content?: string;
  /** Elapsed reasoning time in milliseconds (measured from first reasoning chunk) */
  duration?: number;
  /** True when the reasoning contains image parts in addition to text */
  isMultimodal?: boolean;
  /** Provider-supplied reasoning signature from the finish payload (e.g. Anthropic thinking — see FinishData.reasoning) */
  signature?: string;
  /** In-flight display parts used while multimodal reasoning is still streaming */
  tempDisplayContent?: MessageContentPart[];
}
/**
 * Grounding/search data - extends GroundingSearch for compatibility
 *
 * Alias kept so streaming code can use a local name without re-importing
 * the shared type everywhere; structurally identical to GroundingSearch.
 */
export type GroundingData = GroundingSearch;
/**
 * Streaming callbacks - for notifying external state changes
 *
 * The StreamingHandler stays store-agnostic: every UI/state mutation goes
 * through one of these callbacks, which the executor wires to message
 * dispatches and operation bookkeeping.
 */
export interface StreamingCallbacks {
  /** Content update — called with accumulated text and, when present, the current reasoning state */
  onContentUpdate: (content: string, reasoning?: ReasoningState) => void;
  /** Search grounding update (citations / search queries) */
  onGroundingUpdate: (grounding: GroundingData) => void;
  /** Image list update for the streamed message */
  onImagesUpdate: (images: ChatImageItem[]) => void;
  /** Complete the reasoning operation previously returned by onReasoningStart */
  onReasoningComplete: (operationId: string) => void;
  /** Start a reasoning operation; returns its operation id (undefined if none was created) */
  onReasoningStart: () => string | undefined;
  /** Reasoning state update during streaming */
  onReasoningUpdate: (reasoning: ReasoningState) => void;
  /** Tool calls update with already-transformed payloads */
  onToolCallsUpdate: (tools: ChatToolPayload[]) => void;
  /** Toggle the per-call tool-calling streaming animation; undefined clears it */
  toggleToolCallingStreaming: (messageId: string, isAnimationActives?: boolean[]) => void;
  /** Transform raw tool calls into displayable tool payloads */
  transformToolCalls: (toolCalls: MessageToolCall[]) => ChatToolPayload[];
  /** Upload a base64 image; resolves with the stored file's id/url (fields may be absent on failure) */
  uploadBase64Image: (base64Data: string) => Promise<{ id?: string; url?: string }>;
}
/**
 * Finish callback data
 *
 * Payload delivered by the backend when the stream completes; consumed by
 * StreamingHandler.handleFinish to build the final StreamingResult.
 */
export interface FinishData {
  /** Final grounding/search data, if the response used search */
  grounding?: GroundingData;
  /** Tracing observation id; persisted on the message alongside traceId */
  observationId?: string | null;
  /** Final reasoning content plus the provider signature supplied by the backend */
  reasoning?: { content?: string; signature?: string };
  /** Generation performance metrics */
  speed?: ModelPerformance;
  /** Final, complete tool calls (if the response was a function call) */
  toolCalls?: MessageToolCall[];
  /** Trace id; persisted on the message when present */
  traceId?: string | null;
  /** Finish type reported by the backend (reason the stream ended) */
  type?: string;
  /** Token usage for the generation */
  usage?: ModelUsage;
}
/**
 * Final streaming result
 *
 * Returned by StreamingHandler.handleFinish: the resolved content, tool
 * calls, and the metadata bag the executor writes back onto the message.
 */
export interface StreamingResult {
  /** Final message content (serialized parts when multimodal, plain text otherwise) */
  content: string;
  /** Finish type reported by the backend */
  finishType?: string;
  /** True when the response resolved to tool calls */
  isFunctionCall: boolean;
  /** Metadata persisted with the message */
  metadata: {
    finishType?: string;
    /** Only set when at least one image was produced */
    imageList?: ChatImageItem[];
    /** Only set when content contains image parts */
    isMultimodal?: boolean;
    performance?: ModelPerformance;
    reasoning?: ReasoningState;
    /** Only set when grounding citations exist */
    search?: GroundingData;
    usage?: ModelUsage;
  };
  /** Normalized tool calls from the finish payload */
  toolCalls?: MessageToolCall[];
  /** Transformed tool payloads for display */
  tools?: ChatToolPayload[];
  /** Trace id captured during the stream, if any */
  traceId?: string;
  /** Duplicated at top level for convenient access by the executor */
  usage?: ModelUsage;
}
/**
 * Stream chunk types
 *
 * Discriminated union (on `type`) of every event the streaming pipeline can
 * emit while a response is being generated. Member separators normalized to
 * `;` to match the interfaces above (the original mixed `,` and `;`).
 */
export type StreamChunk =
  /** Plain assistant text delta */
  | { text: string; type: 'text' }
  /** Reasoning ("thinking") text delta */
  | { text: string; type: 'reasoning' }
  /** Multimodal reasoning part: a text chunk, or a base64 image with its mime type */
  | { content: string; mimeType?: string; partType: 'text' | 'image'; type: 'reasoning_part' }
  /** Multimodal content part: a text chunk, or a base64 image with its mime type */
  | { content: string; mimeType?: string; partType: 'text' | 'image'; type: 'content_part' }
  /** Tool call deltas; isAnimationActives drives the per-call streaming animation */
  | {
      isAnimationActives?: boolean[];
      tool_calls: MessageToolCall[];
      type: 'tool_calls';
    }
  /** Search grounding (citations / queries) attached to the response */
  | { grounding?: GroundingData; type: 'grounding' }
  /** Base64 image event: `image` is the new image to upload, `images` the list shown on the message */
  | {
      image: { data: string; id: string };
      images: { data: string; id: string }[];
      type: 'base64_image';
    }
  /** End-of-stream signal */
  | { type: 'stop' };