From 04f963d1da8d45b0e5d00900d9e3ed72821d205f Mon Sep 17 00:00:00 2001 From: Arvin Xu Date: Fri, 20 Mar 2026 01:01:35 +0800 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20refactor:=20use=20incremen?= =?UTF-8?q?tal=20diff=20for=20snapshot=20messages=20to=20prevent=20OOM=20(?= =?UTF-8?q?#13136)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * ♻️ refactor: use incremental diff for snapshot messages to prevent OOM Replace full messages/messagesAfter duplication per step with baseline + delta approach: - Step 0 and compression resets store full messagesBaseline - Other steps store only messagesDelta (new messages added) - Strip llm_stream events from snapshot (not useful for post-analysis) - Strip messages from done.finalState (reconstructible from delta chain) - Strip duplicate toolResults from context.payload - Reduce context_engine_result event size by removing messages and toolsConfig - Add reconstructMessages() utility for rebuilding full state from delta chain - AiAgentService constructor now accepts runtimeOptions for DI Co-Authored-By: Claude Opus 4.6 (1M context) * ♻️ refactor: add incremental toolset delta for snapshot - Store operationToolSet as toolsetBaseline in step 0 only (immutable) - Track activatedStepTools changes via per-step activatedStepToolsDelta - Strip operationToolSet/toolManifestMap/tools/toolSourceMap from done.finalState - Add reconstructToolsetBaseline() and reconstructActivatedStepTools() utilities Co-Authored-By: Claude Opus 4.6 (1M context) * 🐛 fix: correct snapshot delta recording and restore context-engine output - P1: messagesDelta now always stores only appended messages (afterMessages.slice), fixing duplication when isBaseline was true (step 0 / compression reset) - P2: Restore context_engine_result.output (processedMessages) — needed by inspect CLI for --env, --system-role, and -m commands - Add P1 regression test for message deduplication Co-Authored-By: Claude Opus 4.6 (1M context) --------- Co-authored-by: Claude Opus 4.6 (1M context) --- packages/agent-tracing/src/cli/inspect.ts | 3 +- packages/agent-tracing/src/index.ts | 7 + packages/agent-tracing/src/types.ts | 43 ++- .../src/utils/reconstruct.test.ts | 319 ++++++++++++++++++ .../agent-tracing/src/utils/reconstruct.ts | 86 +++++ packages/agent-tracing/src/viewer/index.ts | 47 ++- .../modules/AgentRuntime/RuntimeExecutors.ts | 16 +- .../agentRuntime/AgentRuntimeService.ts | 67 +++- src/server/services/aiAgent/index.ts | 9 +- 9 files changed, 578 insertions(+), 19 deletions(-) create mode 100644 packages/agent-tracing/src/utils/reconstruct.test.ts create mode 100644 packages/agent-tracing/src/utils/reconstruct.ts diff --git a/packages/agent-tracing/src/cli/inspect.ts b/packages/agent-tracing/src/cli/inspect.ts index be733b2f0c..afab7e126e 100644 --- a/packages/agent-tracing/src/cli/inspect.ts +++ b/packages/agent-tracing/src/cli/inspect.ts @@ -237,12 +237,13 @@ export function registerInspectCommand(program: Command) { const step = findStep(snapshot, stepIndex); if (msgIndex !== undefined) { - console.log(renderMessageDetail(step, msgIndex, msgSource)); + console.log(renderMessageDetail(step, msgIndex, msgSource, snapshot.steps)); return; } console.log( renderStepDetail(step, { + allSteps: snapshot.steps, context: opts.context, events: opts.events, messages: opts.messages, diff --git a/packages/agent-tracing/src/index.ts b/packages/agent-tracing/src/index.ts index 7d57482aad..5ae4677ca9 100644 --- a/packages/agent-tracing/src/index.ts +++ b/packages/agent-tracing/src/index.ts @@ -2,6 +2,13 @@ export { appendStepToPartial, finalizeSnapshot } from './recorder'; export { FileSnapshotStore } from './store/file-store'; export type { ISnapshotStore } from './store/types'; export type { ExecutionSnapshot, SnapshotSummary, StepSnapshot } from './types'; +export { + expandSnapshot, + isIncrementalFormat, + reconstructActivatedStepTools, + reconstructMessages, + reconstructToolsetBaseline, +} from './utils/reconstruct'; export { renderMessageDetail, renderSnapshot, diff --git a/packages/agent-tracing/src/types.ts b/packages/agent-tracing/src/types.ts index fc8edff5b2..709790d4c7 100644 --- a/packages/agent-tracing/src/types.ts +++ b/packages/agent-tracing/src/types.ts @@ -23,6 +23,11 @@ export interface ExecutionSnapshot { } export interface StepSnapshot { + /** + * Tools newly activated during this step via tool discovery. + * Append-only delta — accumulate across steps to reconstruct full `activatedStepTools`. + */ + activatedStepToolsDelta?: any[]; completedAt: number; // LLM data content?: string; @@ -32,12 +37,42 @@ export interface StepSnapshot { stepContext?: unknown; }; events?: Array<{ type: string; [key: string]: unknown }>; + executionTimeMs: number; inputTokens?: number; - // Detailed data (for inspect --step N) + + /** + * Whether this step triggered context compression. + * When true, `messagesBaseline` contains the compressed messages as a new baseline. + */ + isCompressionReset?: boolean; + /** + * @deprecated Use `messagesBaseline` + `messagesDelta` for incremental format. + * Kept for backward compatibility with old snapshots. + */ messages?: any[]; + + /** + * @deprecated Use `messagesBaseline` + `messagesDelta` for incremental format. + * Kept for backward compatibility with old snapshots. + */ messagesAfter?: any[]; + + /** + * Full messages baseline snapshot. Only present when: + * 1. `stepIndex === 0` (initial baseline) + * 2. Context compression occurred (`isCompressionReset === true`) + */ + messagesBaseline?: any[]; + + /** + * Incremental messages added by this step relative to the previous step's state. + * For `call_llm`: typically `[assistant message]` + * For `call_tool`: typically `[tool_result message(s)]` + */ + messagesDelta?: any[]; + outputTokens?: number; reasoning?: string; @@ -52,6 +87,11 @@ export interface StepSnapshot { identifier: string; arguments?: string; }>; + /** + * Operation-level tool set baseline. Only present at `stepIndex === 0`. + * Immutable after operation creation — stored once to avoid per-step duplication. + */ + toolsetBaseline?: any; toolsResult?: Array<{ apiName: string; identifier: string; @@ -59,6 +99,7 @@ export interface StepSnapshot { output?: string; }>; totalCost: number; + // Cumulative totalTokens: number; } diff --git a/packages/agent-tracing/src/utils/reconstruct.test.ts b/packages/agent-tracing/src/utils/reconstruct.test.ts new file mode 100644 index 0000000000..42e1e128e3 --- /dev/null +++ b/packages/agent-tracing/src/utils/reconstruct.test.ts @@ -0,0 +1,319 @@ +import { describe, expect, it } from 'vitest'; + +import type { ExecutionSnapshot, StepSnapshot } from '../types'; +import { + expandSnapshot, + isIncrementalFormat, + reconstructActivatedStepTools, + reconstructMessages, + reconstructToolsetBaseline, +} from './reconstruct'; + +function makeStep(overrides: Partial): StepSnapshot { + return { + completedAt: 0, + executionTimeMs: 0, + startedAt: 0, + stepIndex: 0, + stepType: 'call_llm', + totalCost: 0, + totalTokens: 0, + ...overrides, + }; +} + +describe('reconstructMessages', () => { + it('should reconstruct messages for step 0 (baseline)', () => { + const steps: StepSnapshot[] = [ + makeStep({ + messagesBaseline: [{ content: 'hello', role: 'user' }], + messagesDelta: [{ content: 'hi', role: 'assistant' }], + stepIndex: 0, + }), + ]; + + const result = reconstructMessages(steps, 0); + + expect(result.messages).toEqual([{ content: 'hello', role: 'user' }]); + expect(result.messagesAfter).toEqual([ + { content: 'hello', role: 'user' }, + { content: 'hi', role: 'assistant' }, + ]); + }); + + it('should accumulate deltas across multiple steps', () => { + const steps: StepSnapshot[] = [ + makeStep({ + messagesBaseline: [{ content: 'hello', role: 'user' }], + messagesDelta: [{ content: 'hi', role: 'assistant' }], + stepIndex: 0, + }), + makeStep({ + messagesDelta: [{ content: 'search result', role: 'tool' }], + stepIndex: 1, + stepType: 'call_tool', + }), + makeStep({ + messagesDelta: [{ content: 'based on results...', role: 'assistant' }], + stepIndex: 2, + }), + ]; + + const step1 = reconstructMessages(steps, 1); + expect(step1.messages).toEqual([ + { content: 'hello', role: 'user' }, + { content: 'hi', role: 'assistant' }, + ]); + expect(step1.messagesAfter).toEqual([ + { content: 'hello', role: 'user' }, + { content: 'hi', role: 'assistant' }, + { content: 'search result', role: 'tool' }, + ]); + + const step2 = reconstructMessages(steps, 2); + expect(step2.messages).toEqual([ + { content: 'hello', role: 'user' }, + { content: 'hi', role: 'assistant' }, + { content: 'search result', role: 'tool' }, + ]); + expect(step2.messagesAfter).toEqual([ + { content: 'hello', role: 'user' }, + { content: 'hi', role: 'assistant' }, + { content: 'search result', role: 'tool' }, + { content: 'based on results...', role: 'assistant' }, + ]); + }); + + it('should handle compression reset with new baseline', () => { + const steps: StepSnapshot[] = [ + makeStep({ + messagesBaseline: [{ content: 'hello', role: 'user' }], + messagesDelta: [{ content: 'response 1', role: 'assistant' }], + stepIndex: 0, + }), + makeStep({ + messagesDelta: [{ content: 'tool output', role: 'tool' }], + stepIndex: 1, + stepType: 'call_tool', + }), + // Compression resets baseline + makeStep({ + isCompressionReset: true, + messagesBaseline: [{ content: 'compressed summary', role: 'system' }], + messagesDelta: [{ content: 'post-compression response', role: 'assistant' }], + stepIndex: 2, + }), + ]; + + const result = reconstructMessages(steps, 2); + // After compression, baseline replaces everything + expect(result.messages).toEqual([{ content: 'compressed summary', role: 'system' }]); + expect(result.messagesAfter).toEqual([ + { content: 'compressed summary', role: 'system' }, + { content: 'post-compression response', role: 'assistant' }, + ]); + }); + + it('should handle steps after compression baseline', () => { + const steps: StepSnapshot[] = [ + makeStep({ + messagesBaseline: [{ content: 'hello', role: 'user' }], + messagesDelta: [{ content: 'r1', role: 'assistant' }], + stepIndex: 0, + }), + makeStep({ + isCompressionReset: true, + messagesBaseline: [{ content: 'summary', role: 'system' }], + messagesDelta: [{ content: 'r2', role: 'assistant' }], + stepIndex: 1, + }), + makeStep({ + messagesDelta: [{ content: 'tool', role: 'tool' }], + stepIndex: 2, + stepType: 'call_tool', + }), + ]; + + const result = reconstructMessages(steps, 2); + expect(result.messages).toEqual([ + { content: 'summary', role: 'system' }, + { content: 'r2', role: 'assistant' }, + ]); + expect(result.messagesAfter).toEqual([ + { content: 'summary', role: 'system' }, + { content: 'r2', role: 'assistant' }, + { content: 'tool', role: 'tool' }, + ]); + }); + + it('should not duplicate baseline messages in delta (P1 regression)', () => { + // Simulates what AgentRuntimeService records: + // prevMessages = [user], afterMessages = [user, assistant] + // delta should be [assistant] only, NOT [user, assistant] + const steps: StepSnapshot[] = [ + makeStep({ + messagesBaseline: [{ content: 'hello', role: 'user' }], + messagesDelta: [{ content: 'hi', role: 'assistant' }], // correct: only new msg + stepIndex: 0, + }), + makeStep({ + messagesDelta: [{ content: 'tool result', role: 'tool' }], + stepIndex: 1, + stepType: 'call_tool', + }), + makeStep({ + messagesDelta: [{ content: 'final answer', role: 'assistant' }], + stepIndex: 2, + }), + ]; + + // Verify the chain reconstructs correctly without duplication + const step2 = reconstructMessages(steps, 2); + expect(step2.messagesAfter).toEqual([ + { content: 'hello', role: 'user' }, + { content: 'hi', role: 'assistant' }, + { content: 'tool result', role: 'tool' }, + { content: 'final answer', role: 'assistant' }, + ]); + // Each message appears exactly once + expect(step2.messagesAfter).toHaveLength(4); + }); + + it('should handle empty delta', () => { + const steps: StepSnapshot[] = [ + makeStep({ + messagesBaseline: [{ content: 'hello', role: 'user' }], + messagesDelta: [], + stepIndex: 0, + }), + ]; + + const result = reconstructMessages(steps, 0); + expect(result.messages).toEqual([{ content: 'hello', role: 'user' }]); + expect(result.messagesAfter).toEqual([{ content: 'hello', role: 'user' }]); + }); +}); + +describe('isIncrementalFormat', () => { + it('should return true for incremental snapshots', () => { + const snapshot = { + steps: [makeStep({ messagesDelta: [], stepIndex: 0 })], + } as ExecutionSnapshot; + + expect(isIncrementalFormat(snapshot)).toBe(true); + }); + + it('should return false for legacy snapshots', () => { + const snapshot = { + steps: [makeStep({ messages: [], stepIndex: 0 })], + } as ExecutionSnapshot; + + expect(isIncrementalFormat(snapshot)).toBe(false); + }); +}); + +describe('expandSnapshot', () => { + it('should expand incremental snapshot to legacy format', () => { + const snapshot = { + steps: [ + makeStep({ + messagesBaseline: [{ content: 'hello', role: 'user' }], + messagesDelta: [{ content: 'hi', role: 'assistant' }], + stepIndex: 0, + }), + makeStep({ + messagesDelta: [{ content: 'tool result', role: 'tool' }], + stepIndex: 1, + stepType: 'call_tool', + }), + ], + } as ExecutionSnapshot; + + const expanded = expandSnapshot(snapshot); + + expect(expanded.steps[0].messages).toEqual([{ content: 'hello', role: 'user' }]); + expect(expanded.steps[0].messagesAfter).toEqual([ + { content: 'hello', role: 'user' }, + { content: 'hi', role: 'assistant' }, + ]); + expect(expanded.steps[1].messages).toEqual([ + { content: 'hello', role: 'user' }, + { content: 'hi', role: 'assistant' }, + ]); + expect(expanded.steps[1].messagesAfter).toEqual([ + { content: 'hello', role: 'user' }, + { content: 'hi', role: 'assistant' }, + { content: 'tool result', role: 'tool' }, + ]); + }); + + it('should return legacy snapshot as-is', () => { + const snapshot = { + steps: [ + makeStep({ + messages: [{ content: 'hello', role: 'user' }], + messagesAfter: [ + { content: 'hello', role: 'user' }, + { content: 'hi', role: 'assistant' }, + ], + stepIndex: 0, + }), + ], + } as ExecutionSnapshot; + + const result = expandSnapshot(snapshot); + expect(result).toBe(snapshot); // Same reference — no expansion needed + }); +}); + +describe('reconstructToolsetBaseline', () => { + it('should return toolset from step 0', () => { + const toolset = { enabledToolIds: ['search'], manifestMap: { search: {} } }; + const steps: StepSnapshot[] = [ + makeStep({ stepIndex: 0, toolsetBaseline: toolset }), + makeStep({ stepIndex: 1, stepType: 'call_tool' }), + ]; + + expect(reconstructToolsetBaseline(steps)).toEqual(toolset); + }); + + it('should return undefined when no toolset baseline exists', () => { + const steps: StepSnapshot[] = [makeStep({ stepIndex: 0 })]; + + expect(reconstructToolsetBaseline(steps)).toBeUndefined(); + }); +}); + +describe('reconstructActivatedStepTools', () => { + it('should accumulate deltas across steps', () => { + const steps: StepSnapshot[] = [ + makeStep({ stepIndex: 0 }), + makeStep({ + activatedStepToolsDelta: [{ activatedAtStep: 1, id: 'local-system', source: 'device' }], + stepIndex: 1, + stepType: 'call_tool', + }), + makeStep({ stepIndex: 2 }), + makeStep({ + activatedStepToolsDelta: [{ activatedAtStep: 3, id: 'mcp-tool', source: 'discovery' }], + stepIndex: 3, + stepType: 'call_tool', + }), + ]; + + expect(reconstructActivatedStepTools(steps, 1)).toEqual([ + { activatedAtStep: 1, id: 'local-system', source: 'device' }, + ]); + + expect(reconstructActivatedStepTools(steps, 3)).toEqual([ + { activatedAtStep: 1, id: 'local-system', source: 'device' }, + { activatedAtStep: 3, id: 'mcp-tool', source: 'discovery' }, + ]); + }); + + it('should return empty array when no tools activated', () => { + const steps: StepSnapshot[] = [makeStep({ stepIndex: 0 }), makeStep({ stepIndex: 1 })]; + + expect(reconstructActivatedStepTools(steps, 1)).toEqual([]); + }); +}); diff --git a/packages/agent-tracing/src/utils/reconstruct.ts b/packages/agent-tracing/src/utils/reconstruct.ts new file mode 100644 index 0000000000..e9569d6d82 --- /dev/null +++ b/packages/agent-tracing/src/utils/reconstruct.ts @@ -0,0 +1,86 @@ +import type { ExecutionSnapshot, StepSnapshot } from '../types'; + +/** + * Check whether a snapshot uses the incremental (delta) format. + */ +export function isIncrementalFormat(snapshot: ExecutionSnapshot): boolean { + return snapshot.steps.some((s) => s.messagesDelta !== undefined); +} + +/** + * Reconstruct full `messages` (before step) and `messagesAfter` (after step) + * from incremental baseline + delta chain. + */ +export function reconstructMessages( + steps: StepSnapshot[], + targetStepIndex: number, +): { messages: any[]; messagesAfter: any[] } { + let current: any[] = []; + + for (const step of steps) { + if (step.stepIndex > targetStepIndex) break; + + // Reset to baseline when present (step 0 or compression) + if (step.messagesBaseline) { + current = [...step.messagesBaseline]; + } + + const beforeStep = [...current]; + + // Apply delta + if (step.messagesDelta) { + current = [...current, ...step.messagesDelta]; + } + + if (step.stepIndex === targetStepIndex) { + return { messages: beforeStep, messagesAfter: current }; + } + } + + return { messages: current, messagesAfter: current }; +} + +/** + * Reconstruct the operation-level toolset baseline from snapshot steps. + * Returns the `operationToolSet` stored at step 0. + */ +export function reconstructToolsetBaseline(steps: StepSnapshot[]): any | undefined { + return steps.find((s) => s.toolsetBaseline)?.toolsetBaseline; +} + +/** + * Reconstruct cumulative `activatedStepTools` up to a given step + * from per-step deltas. + */ +export function reconstructActivatedStepTools( + steps: StepSnapshot[], + targetStepIndex: number, +): any[] { + const accumulated: any[] = []; + + for (const step of steps) { + if (step.stepIndex > targetStepIndex) break; + + if (step.activatedStepToolsDelta) { + accumulated.push(...step.activatedStepToolsDelta); + } + } + + return accumulated; +} + +/** + * Expand an incremental snapshot into legacy full-messages format. + * Useful for backward-compatible tooling. + */ +export function expandSnapshot(snapshot: ExecutionSnapshot): ExecutionSnapshot { + if (!isIncrementalFormat(snapshot)) return snapshot; + + return { + ...snapshot, + steps: snapshot.steps.map((step) => { + const { messages, messagesAfter } = reconstructMessages(snapshot.steps, step.stepIndex); + return { ...step, messages, messagesAfter }; + }), + }; +} diff --git a/packages/agent-tracing/src/viewer/index.ts b/packages/agent-tracing/src/viewer/index.ts index e55dfff53e..bf68d31761 100644 --- a/packages/agent-tracing/src/viewer/index.ts +++ b/packages/agent-tracing/src/viewer/index.ts @@ -1,6 +1,25 @@ import { encode } from 'gpt-tokenizer'; import type { ExecutionSnapshot, SnapshotSummary, StepSnapshot } from '../types'; +import { reconstructMessages } from '../utils/reconstruct'; + +/** + * Resolve messages for a step, supporting both legacy (full) and incremental (delta) formats. + */ +function resolveStepMessages( + step: StepSnapshot, + allSteps?: StepSnapshot[], +): { messages: any[] | undefined; messagesAfter: any[] | undefined } { + // Legacy format: messages stored directly on step + if (step.messages) { + return { messages: step.messages, messagesAfter: step.messagesAfter }; + } + // Incremental format: reconstruct from baseline + deltas + if (step.messagesDelta !== undefined && allSteps) { + return reconstructMessages(allSteps, step.stepIndex); + } + return { messages: undefined, messagesAfter: undefined }; +} // ANSI color helpers const dim = (s: string) => `\x1B[2m${s}\x1B[22m`; @@ -328,16 +347,19 @@ export function renderMessageDetail( step: StepSnapshot, msgIndex: number, source: 'input' | 'output' = 'output', + allSteps?: StepSnapshot[], ): string { const ceEvent = step.events?.find((e) => e.type === 'context_engine_result') as any; let messages: any[] | undefined; let label: string; + const { messages: resolvedMessages } = resolveStepMessages(step, allSteps); + if (source === 'input') { - messages = ceEvent?.input?.messages ?? step.messages; + messages = ceEvent?.input?.messages ?? resolvedMessages; label = ceEvent ? 'Context Engine Input' : 'Messages (before step)'; } else { - messages = ceEvent?.output ?? step.messages; + messages = ceEvent?.output ?? resolvedMessages; label = ceEvent ? 'Final LLM Payload' : 'Messages (before step)'; } @@ -760,7 +782,13 @@ export function renderDiff( export function renderStepDetail( step: StepSnapshot, - options?: { context?: boolean; events?: boolean; messages?: boolean; tools?: boolean }, + options?: { + allSteps?: StepSnapshot[]; + context?: boolean; + events?: boolean; + messages?: boolean; + tools?: boolean; + }, ): string { const lines: string[] = [ bold(`Step ${step.stepIndex}`) + ` [${step.stepType}] ${formatMs(step.executionTimeMs)}`, @@ -866,12 +894,15 @@ export function renderStepDetail( lines.push(dim('─'.repeat(60))); renderMessageList(lines, outputMsgs, 300); } - } else if (step.messages) { + } else { // Fallback: show raw DB messages if no context engine event - lines.push(''); - lines.push(bold(`Messages (before step): ${step.messages.length} messages`)); - lines.push(dim('─'.repeat(60))); - renderMessageList(lines, step.messages, 200); + const { messages: resolvedMsgs } = resolveStepMessages(step, options?.allSteps); + if (resolvedMsgs && resolvedMsgs.length > 0) { + lines.push(''); + lines.push(bold(`Messages (before step): ${resolvedMsgs.length} messages`)); + lines.push(dim('─'.repeat(60))); + renderMessageList(lines, resolvedMsgs, 200); + } } } diff --git a/src/server/modules/AgentRuntime/RuntimeExecutors.ts b/src/server/modules/AgentRuntime/RuntimeExecutors.ts index b8b12f0905..66a4f26e31 100644 --- a/src/server/modules/AgentRuntime/RuntimeExecutors.ts +++ b/src/server/modules/AgentRuntime/RuntimeExecutors.ts @@ -271,9 +271,21 @@ export const createRuntimeExecutors = ( processedMessages = await serverMessagesEngine(contextEngineInput); - // Emit context engine event for tracing (captures input params and final LLM messages) + // Emit context engine event for tracing + // Omit large/redundant fields to reduce snapshot size: + // - input.messages: reconstructible from step's messagesBaseline + messagesDelta + // - input.toolsConfig: static per operation, ~47KB of manifests repeated every call_llm step + // Keep output (processedMessages) — needed by inspect CLI for --env, --system-role, -m + const { + messages: _inputMsgs, + toolsConfig: _toolsConfig, + ...contextEngineInputLite + } = contextEngineInput; events.push({ - input: contextEngineInput, + input: { + ...contextEngineInputLite, + toolCount: _toolsConfig?.tools?.length ?? 0, + }, output: processedMessages, type: 'context_engine_result', } as any); diff --git a/src/server/services/agentRuntime/AgentRuntimeService.ts b/src/server/services/agentRuntime/AgentRuntimeService.ts index f17cc23f89..f8d7d67d2c 100644 --- a/src/server/services/agentRuntime/AgentRuntimeService.ts +++ b/src/server/services/agentRuntime/AgentRuntimeService.ts @@ -182,7 +182,7 @@ export class AgentRuntimeService { if (impl instanceof LocalQueueServiceImpl) { log('Setting up local execution callback'); impl.setExecutionCallback(async (operationId, stepIndex, context) => { - log('[%s][%d] Local callback executing...', operationId, stepIndex); + log('[%s][%d] Local step executing...', operationId, stepIndex); await this.executeStep({ context, operationId, @@ -767,24 +767,81 @@ export class AgentRuntimeService { } if (!partial.steps) partial.steps = []; + + // Incremental diff: only store message delta + baseline at reset points + const prevMessages = agentState?.messages ?? []; + const afterMessages = stepResult.newState.messages; + const isCompression = stepResult.events?.some( + (e: any) => e.type === 'compression_complete', + ); + const isBaseline = stepIndex === 0 || isCompression; + // Always store only the newly added messages, even for baseline steps + const messagesDelta = afterMessages.slice(prevMessages.length); + + // Strip heavy/redundant data from events before persisting to snapshot + const snapshotEvents = (stepResult.events as any[]) + ?.filter((e) => e.type !== 'llm_stream') + .map((e) => { + if (e.type === 'done' && e.finalState) { + // Remove reconstructible fields from finalState: + // - messages: from messagesBaseline + messagesDelta chain + // - operationToolSet: from toolsetBaseline (step 0) + // - toolManifestMap/tools/toolSourceMap: backward-compat copies of operationToolSet + const { + messages: _msgs, + operationToolSet: _ots, + toolManifestMap: _tmm, + toolSourceMap: _tsm, + tools: _tools, + // activatedStepTools is kept since it's the cumulative record + ...restState + } = e.finalState; + return { ...e, finalState: restState }; + } + return e; + }); + + // Strip toolResults from payload (already in step.toolsResult) + let snapshotPayload: unknown = currentContext?.payload; + if ( + snapshotPayload && + typeof snapshotPayload === 'object' && + 'toolResults' in snapshotPayload + ) { + const { toolResults: _tr, ...restPayload } = snapshotPayload as Record; + snapshotPayload = restPayload; + } + + // Compute activatedStepTools delta (newly discovered tools in this step) + const prevActivated = agentState?.activatedStepTools ?? []; + const afterActivated = stepResult.newState.activatedStepTools ?? []; + const activatedStepToolsDelta = + afterActivated.length > prevActivated.length + ? afterActivated.slice(prevActivated.length) + : undefined; + partial.steps.push({ + activatedStepToolsDelta, completedAt: Date.now(), content: stepPresentationData.content, context: { - payload: currentContext?.payload, + payload: snapshotPayload, phase: currentContext?.phase ?? 'unknown', stepContext: currentContext?.stepContext, }, - events: stepResult.events as any, + events: snapshotEvents, executionTimeMs: stepPresentationData.executionTimeMs, inputTokens: stepPresentationData.stepInputTokens, - messages: agentState?.messages, - messagesAfter: stepResult.newState.messages, + isCompressionReset: isCompression || undefined, + messagesBaseline: isBaseline ? prevMessages : undefined, + messagesDelta, outputTokens: stepPresentationData.stepOutputTokens, reasoning: stepPresentationData.reasoning, startedAt: startAt, stepIndex, stepType: stepPresentationData.stepType, + // Store operation-level toolset once at step 0 + toolsetBaseline: stepIndex === 0 ? agentState?.operationToolSet : undefined, toolsCalling: stepPresentationData.toolsCalling, toolsResult: stepPresentationData.toolsResult, totalCost: stepPresentationData.totalCost, diff --git a/src/server/services/aiAgent/index.ts b/src/server/services/aiAgent/index.ts index 8d24e476fa..eb23dc045b 100644 --- a/src/server/services/aiAgent/index.ts +++ b/src/server/services/aiAgent/index.ts @@ -41,6 +41,7 @@ import { } from '@/server/modules/Mecha'; import { type ServerUserMemoryConfig } from '@/server/modules/Mecha/ContextEngineering/types'; import { AgentService } from '@/server/services/agent'; +import type { AgentRuntimeServiceOptions } from '@/server/services/agentRuntime'; import { AgentRuntimeService } from '@/server/services/agentRuntime'; import { type StepLifecycleCallbacks } from '@/server/services/agentRuntime/types'; import { FileService } from '@/server/services/file'; @@ -161,7 +162,11 @@ export class AiAgentService { private readonly marketService: MarketService; private readonly klavisService: KlavisService; - constructor(db: LobeChatDatabase, userId: string) { + constructor( + db: LobeChatDatabase, + userId: string, + options?: { runtimeOptions?: AgentRuntimeServiceOptions }, + ) { this.userId = userId; this.db = db; this.agentModel = new AgentModel(db, userId); @@ -170,7 +175,7 @@ export class AiAgentService { this.pluginModel = new PluginModel(db, userId); this.threadModel = new ThreadModel(db, userId); this.topicModel = new TopicModel(db, userId); - this.agentRuntimeService = new AgentRuntimeService(db, userId); + this.agentRuntimeService = new AgentRuntimeService(db, userId, options?.runtimeOptions); this.marketService = new MarketService({ userInfo: { userId } }); this.klavisService = new KlavisService({ db, userId }); }