mirror of
https://github.com/lobehub/lobehub.git
synced 2026-03-26 13:19:34 +07:00
♻️ refactor: use incremental diff for snapshot messages to prevent OOM (#13136)
* ♻️ refactor: use incremental diff for snapshot messages to prevent OOM Replace full messages/messagesAfter duplication per step with baseline + delta approach: - Step 0 and compression resets store full messagesBaseline - Other steps store only messagesDelta (new messages added) - Strip llm_stream events from snapshot (not useful for post-analysis) - Strip messages from done.finalState (reconstructible from delta chain) - Strip duplicate toolResults from context.payload - Reduce context_engine_result event size by removing messages and toolsConfig - Add reconstructMessages() utility for rebuilding full state from delta chain - AiAgentService constructor now accepts runtimeOptions for DI Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * ♻️ refactor: add incremental toolset delta for snapshot - Store operationToolSet as toolsetBaseline in step 0 only (immutable) - Track activatedStepTools changes via per-step activatedStepToolsDelta - Strip operationToolSet/toolManifestMap/tools/toolSourceMap from done.finalState - Add reconstructToolsetBaseline() and reconstructActivatedStepTools() utilities Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * 🐛 fix: correct snapshot delta recording and restore context-engine output - P1: messagesDelta now always stores only appended messages (afterMessages.slice), fixing duplication when isBaseline was true (step 0 / compression reset) - P2: Restore context_engine_result.output (processedMessages) — needed by inspect CLI for --env, --system-role, and -m commands - Add P1 regression test for message deduplication Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -237,12 +237,13 @@ export function registerInspectCommand(program: Command) {
|
||||
const step = findStep(snapshot, stepIndex);
|
||||
|
||||
if (msgIndex !== undefined) {
|
||||
console.log(renderMessageDetail(step, msgIndex, msgSource));
|
||||
console.log(renderMessageDetail(step, msgIndex, msgSource, snapshot.steps));
|
||||
return;
|
||||
}
|
||||
|
||||
console.log(
|
||||
renderStepDetail(step, {
|
||||
allSteps: snapshot.steps,
|
||||
context: opts.context,
|
||||
events: opts.events,
|
||||
messages: opts.messages,
|
||||
|
||||
@@ -2,6 +2,13 @@ export { appendStepToPartial, finalizeSnapshot } from './recorder';
|
||||
export { FileSnapshotStore } from './store/file-store';
|
||||
export type { ISnapshotStore } from './store/types';
|
||||
export type { ExecutionSnapshot, SnapshotSummary, StepSnapshot } from './types';
|
||||
export {
|
||||
expandSnapshot,
|
||||
isIncrementalFormat,
|
||||
reconstructActivatedStepTools,
|
||||
reconstructMessages,
|
||||
reconstructToolsetBaseline,
|
||||
} from './utils/reconstruct';
|
||||
export {
|
||||
renderMessageDetail,
|
||||
renderSnapshot,
|
||||
|
||||
@@ -23,6 +23,11 @@ export interface ExecutionSnapshot {
|
||||
}
|
||||
|
||||
export interface StepSnapshot {
|
||||
/**
|
||||
* Tools newly activated during this step via tool discovery.
|
||||
* Append-only delta — accumulate across steps to reconstruct full `activatedStepTools`.
|
||||
*/
|
||||
activatedStepToolsDelta?: any[];
|
||||
completedAt: number;
|
||||
// LLM data
|
||||
content?: string;
|
||||
@@ -32,12 +37,42 @@ export interface StepSnapshot {
|
||||
stepContext?: unknown;
|
||||
};
|
||||
events?: Array<{ type: string; [key: string]: unknown }>;
|
||||
|
||||
executionTimeMs: number;
|
||||
|
||||
inputTokens?: number;
|
||||
// Detailed data (for inspect --step N)
|
||||
|
||||
/**
|
||||
* Whether this step triggered context compression.
|
||||
* When true, `messagesBaseline` contains the compressed messages as a new baseline.
|
||||
*/
|
||||
isCompressionReset?: boolean;
|
||||
/**
|
||||
* @deprecated Use `messagesBaseline` + `messagesDelta` for incremental format.
|
||||
* Kept for backward compatibility with old snapshots.
|
||||
*/
|
||||
messages?: any[];
|
||||
|
||||
/**
|
||||
* @deprecated Use `messagesBaseline` + `messagesDelta` for incremental format.
|
||||
* Kept for backward compatibility with old snapshots.
|
||||
*/
|
||||
messagesAfter?: any[];
|
||||
|
||||
/**
|
||||
* Full messages baseline snapshot. Only present when:
|
||||
* 1. `stepIndex === 0` (initial baseline)
|
||||
* 2. Context compression occurred (`isCompressionReset === true`)
|
||||
*/
|
||||
messagesBaseline?: any[];
|
||||
|
||||
/**
|
||||
* Incremental messages added by this step relative to the previous step's state.
|
||||
* For `call_llm`: typically `[assistant message]`
|
||||
* For `call_tool`: typically `[tool_result message(s)]`
|
||||
*/
|
||||
messagesDelta?: any[];
|
||||
|
||||
outputTokens?: number;
|
||||
|
||||
reasoning?: string;
|
||||
@@ -52,6 +87,11 @@ export interface StepSnapshot {
|
||||
identifier: string;
|
||||
arguments?: string;
|
||||
}>;
|
||||
/**
|
||||
* Operation-level tool set baseline. Only present at `stepIndex === 0`.
|
||||
* Immutable after operation creation — stored once to avoid per-step duplication.
|
||||
*/
|
||||
toolsetBaseline?: any;
|
||||
toolsResult?: Array<{
|
||||
apiName: string;
|
||||
identifier: string;
|
||||
@@ -59,6 +99,7 @@ export interface StepSnapshot {
|
||||
output?: string;
|
||||
}>;
|
||||
totalCost: number;
|
||||
|
||||
// Cumulative
|
||||
totalTokens: number;
|
||||
}
|
||||
|
||||
319
packages/agent-tracing/src/utils/reconstruct.test.ts
Normal file
319
packages/agent-tracing/src/utils/reconstruct.test.ts
Normal file
@@ -0,0 +1,319 @@
|
||||
import { describe, expect, it } from 'vitest';
|
||||
|
||||
import type { ExecutionSnapshot, StepSnapshot } from '../types';
|
||||
import {
|
||||
expandSnapshot,
|
||||
isIncrementalFormat,
|
||||
reconstructActivatedStepTools,
|
||||
reconstructMessages,
|
||||
reconstructToolsetBaseline,
|
||||
} from './reconstruct';
|
||||
|
||||
function makeStep(overrides: Partial<StepSnapshot>): StepSnapshot {
|
||||
return {
|
||||
completedAt: 0,
|
||||
executionTimeMs: 0,
|
||||
startedAt: 0,
|
||||
stepIndex: 0,
|
||||
stepType: 'call_llm',
|
||||
totalCost: 0,
|
||||
totalTokens: 0,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
describe('reconstructMessages', () => {
|
||||
it('should reconstruct messages for step 0 (baseline)', () => {
|
||||
const steps: StepSnapshot[] = [
|
||||
makeStep({
|
||||
messagesBaseline: [{ content: 'hello', role: 'user' }],
|
||||
messagesDelta: [{ content: 'hi', role: 'assistant' }],
|
||||
stepIndex: 0,
|
||||
}),
|
||||
];
|
||||
|
||||
const result = reconstructMessages(steps, 0);
|
||||
|
||||
expect(result.messages).toEqual([{ content: 'hello', role: 'user' }]);
|
||||
expect(result.messagesAfter).toEqual([
|
||||
{ content: 'hello', role: 'user' },
|
||||
{ content: 'hi', role: 'assistant' },
|
||||
]);
|
||||
});
|
||||
|
||||
it('should accumulate deltas across multiple steps', () => {
|
||||
const steps: StepSnapshot[] = [
|
||||
makeStep({
|
||||
messagesBaseline: [{ content: 'hello', role: 'user' }],
|
||||
messagesDelta: [{ content: 'hi', role: 'assistant' }],
|
||||
stepIndex: 0,
|
||||
}),
|
||||
makeStep({
|
||||
messagesDelta: [{ content: 'search result', role: 'tool' }],
|
||||
stepIndex: 1,
|
||||
stepType: 'call_tool',
|
||||
}),
|
||||
makeStep({
|
||||
messagesDelta: [{ content: 'based on results...', role: 'assistant' }],
|
||||
stepIndex: 2,
|
||||
}),
|
||||
];
|
||||
|
||||
const step1 = reconstructMessages(steps, 1);
|
||||
expect(step1.messages).toEqual([
|
||||
{ content: 'hello', role: 'user' },
|
||||
{ content: 'hi', role: 'assistant' },
|
||||
]);
|
||||
expect(step1.messagesAfter).toEqual([
|
||||
{ content: 'hello', role: 'user' },
|
||||
{ content: 'hi', role: 'assistant' },
|
||||
{ content: 'search result', role: 'tool' },
|
||||
]);
|
||||
|
||||
const step2 = reconstructMessages(steps, 2);
|
||||
expect(step2.messages).toEqual([
|
||||
{ content: 'hello', role: 'user' },
|
||||
{ content: 'hi', role: 'assistant' },
|
||||
{ content: 'search result', role: 'tool' },
|
||||
]);
|
||||
expect(step2.messagesAfter).toEqual([
|
||||
{ content: 'hello', role: 'user' },
|
||||
{ content: 'hi', role: 'assistant' },
|
||||
{ content: 'search result', role: 'tool' },
|
||||
{ content: 'based on results...', role: 'assistant' },
|
||||
]);
|
||||
});
|
||||
|
||||
it('should handle compression reset with new baseline', () => {
|
||||
const steps: StepSnapshot[] = [
|
||||
makeStep({
|
||||
messagesBaseline: [{ content: 'hello', role: 'user' }],
|
||||
messagesDelta: [{ content: 'response 1', role: 'assistant' }],
|
||||
stepIndex: 0,
|
||||
}),
|
||||
makeStep({
|
||||
messagesDelta: [{ content: 'tool output', role: 'tool' }],
|
||||
stepIndex: 1,
|
||||
stepType: 'call_tool',
|
||||
}),
|
||||
// Compression resets baseline
|
||||
makeStep({
|
||||
isCompressionReset: true,
|
||||
messagesBaseline: [{ content: 'compressed summary', role: 'system' }],
|
||||
messagesDelta: [{ content: 'post-compression response', role: 'assistant' }],
|
||||
stepIndex: 2,
|
||||
}),
|
||||
];
|
||||
|
||||
const result = reconstructMessages(steps, 2);
|
||||
// After compression, baseline replaces everything
|
||||
expect(result.messages).toEqual([{ content: 'compressed summary', role: 'system' }]);
|
||||
expect(result.messagesAfter).toEqual([
|
||||
{ content: 'compressed summary', role: 'system' },
|
||||
{ content: 'post-compression response', role: 'assistant' },
|
||||
]);
|
||||
});
|
||||
|
||||
it('should handle steps after compression baseline', () => {
|
||||
const steps: StepSnapshot[] = [
|
||||
makeStep({
|
||||
messagesBaseline: [{ content: 'hello', role: 'user' }],
|
||||
messagesDelta: [{ content: 'r1', role: 'assistant' }],
|
||||
stepIndex: 0,
|
||||
}),
|
||||
makeStep({
|
||||
isCompressionReset: true,
|
||||
messagesBaseline: [{ content: 'summary', role: 'system' }],
|
||||
messagesDelta: [{ content: 'r2', role: 'assistant' }],
|
||||
stepIndex: 1,
|
||||
}),
|
||||
makeStep({
|
||||
messagesDelta: [{ content: 'tool', role: 'tool' }],
|
||||
stepIndex: 2,
|
||||
stepType: 'call_tool',
|
||||
}),
|
||||
];
|
||||
|
||||
const result = reconstructMessages(steps, 2);
|
||||
expect(result.messages).toEqual([
|
||||
{ content: 'summary', role: 'system' },
|
||||
{ content: 'r2', role: 'assistant' },
|
||||
]);
|
||||
expect(result.messagesAfter).toEqual([
|
||||
{ content: 'summary', role: 'system' },
|
||||
{ content: 'r2', role: 'assistant' },
|
||||
{ content: 'tool', role: 'tool' },
|
||||
]);
|
||||
});
|
||||
|
||||
it('should not duplicate baseline messages in delta (P1 regression)', () => {
|
||||
// Simulates what AgentRuntimeService records:
|
||||
// prevMessages = [user], afterMessages = [user, assistant]
|
||||
// delta should be [assistant] only, NOT [user, assistant]
|
||||
const steps: StepSnapshot[] = [
|
||||
makeStep({
|
||||
messagesBaseline: [{ content: 'hello', role: 'user' }],
|
||||
messagesDelta: [{ content: 'hi', role: 'assistant' }], // correct: only new msg
|
||||
stepIndex: 0,
|
||||
}),
|
||||
makeStep({
|
||||
messagesDelta: [{ content: 'tool result', role: 'tool' }],
|
||||
stepIndex: 1,
|
||||
stepType: 'call_tool',
|
||||
}),
|
||||
makeStep({
|
||||
messagesDelta: [{ content: 'final answer', role: 'assistant' }],
|
||||
stepIndex: 2,
|
||||
}),
|
||||
];
|
||||
|
||||
// Verify the chain reconstructs correctly without duplication
|
||||
const step2 = reconstructMessages(steps, 2);
|
||||
expect(step2.messagesAfter).toEqual([
|
||||
{ content: 'hello', role: 'user' },
|
||||
{ content: 'hi', role: 'assistant' },
|
||||
{ content: 'tool result', role: 'tool' },
|
||||
{ content: 'final answer', role: 'assistant' },
|
||||
]);
|
||||
// Each message appears exactly once
|
||||
expect(step2.messagesAfter).toHaveLength(4);
|
||||
});
|
||||
|
||||
it('should handle empty delta', () => {
|
||||
const steps: StepSnapshot[] = [
|
||||
makeStep({
|
||||
messagesBaseline: [{ content: 'hello', role: 'user' }],
|
||||
messagesDelta: [],
|
||||
stepIndex: 0,
|
||||
}),
|
||||
];
|
||||
|
||||
const result = reconstructMessages(steps, 0);
|
||||
expect(result.messages).toEqual([{ content: 'hello', role: 'user' }]);
|
||||
expect(result.messagesAfter).toEqual([{ content: 'hello', role: 'user' }]);
|
||||
});
|
||||
});
|
||||
|
||||
describe('isIncrementalFormat', () => {
|
||||
it('should return true for incremental snapshots', () => {
|
||||
const snapshot = {
|
||||
steps: [makeStep({ messagesDelta: [], stepIndex: 0 })],
|
||||
} as ExecutionSnapshot;
|
||||
|
||||
expect(isIncrementalFormat(snapshot)).toBe(true);
|
||||
});
|
||||
|
||||
it('should return false for legacy snapshots', () => {
|
||||
const snapshot = {
|
||||
steps: [makeStep({ messages: [], stepIndex: 0 })],
|
||||
} as ExecutionSnapshot;
|
||||
|
||||
expect(isIncrementalFormat(snapshot)).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe('expandSnapshot', () => {
|
||||
it('should expand incremental snapshot to legacy format', () => {
|
||||
const snapshot = {
|
||||
steps: [
|
||||
makeStep({
|
||||
messagesBaseline: [{ content: 'hello', role: 'user' }],
|
||||
messagesDelta: [{ content: 'hi', role: 'assistant' }],
|
||||
stepIndex: 0,
|
||||
}),
|
||||
makeStep({
|
||||
messagesDelta: [{ content: 'tool result', role: 'tool' }],
|
||||
stepIndex: 1,
|
||||
stepType: 'call_tool',
|
||||
}),
|
||||
],
|
||||
} as ExecutionSnapshot;
|
||||
|
||||
const expanded = expandSnapshot(snapshot);
|
||||
|
||||
expect(expanded.steps[0].messages).toEqual([{ content: 'hello', role: 'user' }]);
|
||||
expect(expanded.steps[0].messagesAfter).toEqual([
|
||||
{ content: 'hello', role: 'user' },
|
||||
{ content: 'hi', role: 'assistant' },
|
||||
]);
|
||||
expect(expanded.steps[1].messages).toEqual([
|
||||
{ content: 'hello', role: 'user' },
|
||||
{ content: 'hi', role: 'assistant' },
|
||||
]);
|
||||
expect(expanded.steps[1].messagesAfter).toEqual([
|
||||
{ content: 'hello', role: 'user' },
|
||||
{ content: 'hi', role: 'assistant' },
|
||||
{ content: 'tool result', role: 'tool' },
|
||||
]);
|
||||
});
|
||||
|
||||
it('should return legacy snapshot as-is', () => {
|
||||
const snapshot = {
|
||||
steps: [
|
||||
makeStep({
|
||||
messages: [{ content: 'hello', role: 'user' }],
|
||||
messagesAfter: [
|
||||
{ content: 'hello', role: 'user' },
|
||||
{ content: 'hi', role: 'assistant' },
|
||||
],
|
||||
stepIndex: 0,
|
||||
}),
|
||||
],
|
||||
} as ExecutionSnapshot;
|
||||
|
||||
const result = expandSnapshot(snapshot);
|
||||
expect(result).toBe(snapshot); // Same reference — no expansion needed
|
||||
});
|
||||
});
|
||||
|
||||
describe('reconstructToolsetBaseline', () => {
|
||||
it('should return toolset from step 0', () => {
|
||||
const toolset = { enabledToolIds: ['search'], manifestMap: { search: {} } };
|
||||
const steps: StepSnapshot[] = [
|
||||
makeStep({ stepIndex: 0, toolsetBaseline: toolset }),
|
||||
makeStep({ stepIndex: 1, stepType: 'call_tool' }),
|
||||
];
|
||||
|
||||
expect(reconstructToolsetBaseline(steps)).toEqual(toolset);
|
||||
});
|
||||
|
||||
it('should return undefined when no toolset baseline exists', () => {
|
||||
const steps: StepSnapshot[] = [makeStep({ stepIndex: 0 })];
|
||||
|
||||
expect(reconstructToolsetBaseline(steps)).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
describe('reconstructActivatedStepTools', () => {
|
||||
it('should accumulate deltas across steps', () => {
|
||||
const steps: StepSnapshot[] = [
|
||||
makeStep({ stepIndex: 0 }),
|
||||
makeStep({
|
||||
activatedStepToolsDelta: [{ activatedAtStep: 1, id: 'local-system', source: 'device' }],
|
||||
stepIndex: 1,
|
||||
stepType: 'call_tool',
|
||||
}),
|
||||
makeStep({ stepIndex: 2 }),
|
||||
makeStep({
|
||||
activatedStepToolsDelta: [{ activatedAtStep: 3, id: 'mcp-tool', source: 'discovery' }],
|
||||
stepIndex: 3,
|
||||
stepType: 'call_tool',
|
||||
}),
|
||||
];
|
||||
|
||||
expect(reconstructActivatedStepTools(steps, 1)).toEqual([
|
||||
{ activatedAtStep: 1, id: 'local-system', source: 'device' },
|
||||
]);
|
||||
|
||||
expect(reconstructActivatedStepTools(steps, 3)).toEqual([
|
||||
{ activatedAtStep: 1, id: 'local-system', source: 'device' },
|
||||
{ activatedAtStep: 3, id: 'mcp-tool', source: 'discovery' },
|
||||
]);
|
||||
});
|
||||
|
||||
it('should return empty array when no tools activated', () => {
|
||||
const steps: StepSnapshot[] = [makeStep({ stepIndex: 0 }), makeStep({ stepIndex: 1 })];
|
||||
|
||||
expect(reconstructActivatedStepTools(steps, 1)).toEqual([]);
|
||||
});
|
||||
});
|
||||
86
packages/agent-tracing/src/utils/reconstruct.ts
Normal file
86
packages/agent-tracing/src/utils/reconstruct.ts
Normal file
@@ -0,0 +1,86 @@
|
||||
import type { ExecutionSnapshot, StepSnapshot } from '../types';
|
||||
|
||||
/**
|
||||
* Check whether a snapshot uses the incremental (delta) format.
|
||||
*/
|
||||
export function isIncrementalFormat(snapshot: ExecutionSnapshot): boolean {
|
||||
return snapshot.steps.some((s) => s.messagesDelta !== undefined);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconstruct full `messages` (before step) and `messagesAfter` (after step)
|
||||
* from incremental baseline + delta chain.
|
||||
*/
|
||||
export function reconstructMessages(
|
||||
steps: StepSnapshot[],
|
||||
targetStepIndex: number,
|
||||
): { messages: any[]; messagesAfter: any[] } {
|
||||
let current: any[] = [];
|
||||
|
||||
for (const step of steps) {
|
||||
if (step.stepIndex > targetStepIndex) break;
|
||||
|
||||
// Reset to baseline when present (step 0 or compression)
|
||||
if (step.messagesBaseline) {
|
||||
current = [...step.messagesBaseline];
|
||||
}
|
||||
|
||||
const beforeStep = [...current];
|
||||
|
||||
// Apply delta
|
||||
if (step.messagesDelta) {
|
||||
current = [...current, ...step.messagesDelta];
|
||||
}
|
||||
|
||||
if (step.stepIndex === targetStepIndex) {
|
||||
return { messages: beforeStep, messagesAfter: current };
|
||||
}
|
||||
}
|
||||
|
||||
return { messages: current, messagesAfter: current };
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconstruct the operation-level toolset baseline from snapshot steps.
|
||||
* Returns the `operationToolSet` stored at step 0.
|
||||
*/
|
||||
export function reconstructToolsetBaseline(steps: StepSnapshot[]): any | undefined {
|
||||
return steps.find((s) => s.toolsetBaseline)?.toolsetBaseline;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconstruct cumulative `activatedStepTools` up to a given step
|
||||
* from per-step deltas.
|
||||
*/
|
||||
export function reconstructActivatedStepTools(
|
||||
steps: StepSnapshot[],
|
||||
targetStepIndex: number,
|
||||
): any[] {
|
||||
const accumulated: any[] = [];
|
||||
|
||||
for (const step of steps) {
|
||||
if (step.stepIndex > targetStepIndex) break;
|
||||
|
||||
if (step.activatedStepToolsDelta) {
|
||||
accumulated.push(...step.activatedStepToolsDelta);
|
||||
}
|
||||
}
|
||||
|
||||
return accumulated;
|
||||
}
|
||||
|
||||
/**
|
||||
* Expand an incremental snapshot into legacy full-messages format.
|
||||
* Useful for backward-compatible tooling.
|
||||
*/
|
||||
export function expandSnapshot(snapshot: ExecutionSnapshot): ExecutionSnapshot {
|
||||
if (!isIncrementalFormat(snapshot)) return snapshot;
|
||||
|
||||
return {
|
||||
...snapshot,
|
||||
steps: snapshot.steps.map((step) => {
|
||||
const { messages, messagesAfter } = reconstructMessages(snapshot.steps, step.stepIndex);
|
||||
return { ...step, messages, messagesAfter };
|
||||
}),
|
||||
};
|
||||
}
|
||||
@@ -1,6 +1,25 @@
|
||||
import { encode } from 'gpt-tokenizer';
|
||||
|
||||
import type { ExecutionSnapshot, SnapshotSummary, StepSnapshot } from '../types';
|
||||
import { reconstructMessages } from '../utils/reconstruct';
|
||||
|
||||
/**
|
||||
* Resolve messages for a step, supporting both legacy (full) and incremental (delta) formats.
|
||||
*/
|
||||
function resolveStepMessages(
|
||||
step: StepSnapshot,
|
||||
allSteps?: StepSnapshot[],
|
||||
): { messages: any[] | undefined; messagesAfter: any[] | undefined } {
|
||||
// Legacy format: messages stored directly on step
|
||||
if (step.messages) {
|
||||
return { messages: step.messages, messagesAfter: step.messagesAfter };
|
||||
}
|
||||
// Incremental format: reconstruct from baseline + deltas
|
||||
if (step.messagesDelta !== undefined && allSteps) {
|
||||
return reconstructMessages(allSteps, step.stepIndex);
|
||||
}
|
||||
return { messages: undefined, messagesAfter: undefined };
|
||||
}
|
||||
|
||||
// ANSI color helpers
|
||||
const dim = (s: string) => `\x1B[2m${s}\x1B[22m`;
|
||||
@@ -328,16 +347,19 @@ export function renderMessageDetail(
|
||||
step: StepSnapshot,
|
||||
msgIndex: number,
|
||||
source: 'input' | 'output' = 'output',
|
||||
allSteps?: StepSnapshot[],
|
||||
): string {
|
||||
const ceEvent = step.events?.find((e) => e.type === 'context_engine_result') as any;
|
||||
let messages: any[] | undefined;
|
||||
let label: string;
|
||||
|
||||
const { messages: resolvedMessages } = resolveStepMessages(step, allSteps);
|
||||
|
||||
if (source === 'input') {
|
||||
messages = ceEvent?.input?.messages ?? step.messages;
|
||||
messages = ceEvent?.input?.messages ?? resolvedMessages;
|
||||
label = ceEvent ? 'Context Engine Input' : 'Messages (before step)';
|
||||
} else {
|
||||
messages = ceEvent?.output ?? step.messages;
|
||||
messages = ceEvent?.output ?? resolvedMessages;
|
||||
label = ceEvent ? 'Final LLM Payload' : 'Messages (before step)';
|
||||
}
|
||||
|
||||
@@ -760,7 +782,13 @@ export function renderDiff(
|
||||
|
||||
export function renderStepDetail(
|
||||
step: StepSnapshot,
|
||||
options?: { context?: boolean; events?: boolean; messages?: boolean; tools?: boolean },
|
||||
options?: {
|
||||
allSteps?: StepSnapshot[];
|
||||
context?: boolean;
|
||||
events?: boolean;
|
||||
messages?: boolean;
|
||||
tools?: boolean;
|
||||
},
|
||||
): string {
|
||||
const lines: string[] = [
|
||||
bold(`Step ${step.stepIndex}`) + ` [${step.stepType}] ${formatMs(step.executionTimeMs)}`,
|
||||
@@ -866,12 +894,15 @@ export function renderStepDetail(
|
||||
lines.push(dim('─'.repeat(60)));
|
||||
renderMessageList(lines, outputMsgs, 300);
|
||||
}
|
||||
} else if (step.messages) {
|
||||
} else {
|
||||
// Fallback: show raw DB messages if no context engine event
|
||||
lines.push('');
|
||||
lines.push(bold(`Messages (before step): ${step.messages.length} messages`));
|
||||
lines.push(dim('─'.repeat(60)));
|
||||
renderMessageList(lines, step.messages, 200);
|
||||
const { messages: resolvedMsgs } = resolveStepMessages(step, options?.allSteps);
|
||||
if (resolvedMsgs && resolvedMsgs.length > 0) {
|
||||
lines.push('');
|
||||
lines.push(bold(`Messages (before step): ${resolvedMsgs.length} messages`));
|
||||
lines.push(dim('─'.repeat(60)));
|
||||
renderMessageList(lines, resolvedMsgs, 200);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -271,9 +271,21 @@ export const createRuntimeExecutors = (
|
||||
|
||||
processedMessages = await serverMessagesEngine(contextEngineInput);
|
||||
|
||||
// Emit context engine event for tracing (captures input params and final LLM messages)
|
||||
// Emit context engine event for tracing
|
||||
// Omit large/redundant fields to reduce snapshot size:
|
||||
// - input.messages: reconstructible from step's messagesBaseline + messagesDelta
|
||||
// - input.toolsConfig: static per operation, ~47KB of manifests repeated every call_llm step
|
||||
// Keep output (processedMessages) — needed by inspect CLI for --env, --system-role, -m
|
||||
const {
|
||||
messages: _inputMsgs,
|
||||
toolsConfig: _toolsConfig,
|
||||
...contextEngineInputLite
|
||||
} = contextEngineInput;
|
||||
events.push({
|
||||
input: contextEngineInput,
|
||||
input: {
|
||||
...contextEngineInputLite,
|
||||
toolCount: _toolsConfig?.tools?.length ?? 0,
|
||||
},
|
||||
output: processedMessages,
|
||||
type: 'context_engine_result',
|
||||
} as any);
|
||||
|
||||
@@ -182,7 +182,7 @@ export class AgentRuntimeService {
|
||||
if (impl instanceof LocalQueueServiceImpl) {
|
||||
log('Setting up local execution callback');
|
||||
impl.setExecutionCallback(async (operationId, stepIndex, context) => {
|
||||
log('[%s][%d] Local callback executing...', operationId, stepIndex);
|
||||
log('[%s][%d] Local step executing...', operationId, stepIndex);
|
||||
await this.executeStep({
|
||||
context,
|
||||
operationId,
|
||||
@@ -767,24 +767,81 @@ export class AgentRuntimeService {
|
||||
}
|
||||
|
||||
if (!partial.steps) partial.steps = [];
|
||||
|
||||
// Incremental diff: only store message delta + baseline at reset points
|
||||
const prevMessages = agentState?.messages ?? [];
|
||||
const afterMessages = stepResult.newState.messages;
|
||||
const isCompression = stepResult.events?.some(
|
||||
(e: any) => e.type === 'compression_complete',
|
||||
);
|
||||
const isBaseline = stepIndex === 0 || isCompression;
|
||||
// Always store only the newly added messages, even for baseline steps
|
||||
const messagesDelta = afterMessages.slice(prevMessages.length);
|
||||
|
||||
// Strip heavy/redundant data from events before persisting to snapshot
|
||||
const snapshotEvents = (stepResult.events as any[])
|
||||
?.filter((e) => e.type !== 'llm_stream')
|
||||
.map((e) => {
|
||||
if (e.type === 'done' && e.finalState) {
|
||||
// Remove reconstructible fields from finalState:
|
||||
// - messages: from messagesBaseline + messagesDelta chain
|
||||
// - operationToolSet: from toolsetBaseline (step 0)
|
||||
// - toolManifestMap/tools/toolSourceMap: backward-compat copies of operationToolSet
|
||||
const {
|
||||
messages: _msgs,
|
||||
operationToolSet: _ots,
|
||||
toolManifestMap: _tmm,
|
||||
toolSourceMap: _tsm,
|
||||
tools: _tools,
|
||||
// activatedStepTools is kept since it's the cumulative record
|
||||
...restState
|
||||
} = e.finalState;
|
||||
return { ...e, finalState: restState };
|
||||
}
|
||||
return e;
|
||||
});
|
||||
|
||||
// Strip toolResults from payload (already in step.toolsResult)
|
||||
let snapshotPayload: unknown = currentContext?.payload;
|
||||
if (
|
||||
snapshotPayload &&
|
||||
typeof snapshotPayload === 'object' &&
|
||||
'toolResults' in snapshotPayload
|
||||
) {
|
||||
const { toolResults: _tr, ...restPayload } = snapshotPayload as Record<string, unknown>;
|
||||
snapshotPayload = restPayload;
|
||||
}
|
||||
|
||||
// Compute activatedStepTools delta (newly discovered tools in this step)
|
||||
const prevActivated = agentState?.activatedStepTools ?? [];
|
||||
const afterActivated = stepResult.newState.activatedStepTools ?? [];
|
||||
const activatedStepToolsDelta =
|
||||
afterActivated.length > prevActivated.length
|
||||
? afterActivated.slice(prevActivated.length)
|
||||
: undefined;
|
||||
|
||||
partial.steps.push({
|
||||
activatedStepToolsDelta,
|
||||
completedAt: Date.now(),
|
||||
content: stepPresentationData.content,
|
||||
context: {
|
||||
payload: currentContext?.payload,
|
||||
payload: snapshotPayload,
|
||||
phase: currentContext?.phase ?? 'unknown',
|
||||
stepContext: currentContext?.stepContext,
|
||||
},
|
||||
events: stepResult.events as any,
|
||||
events: snapshotEvents,
|
||||
executionTimeMs: stepPresentationData.executionTimeMs,
|
||||
inputTokens: stepPresentationData.stepInputTokens,
|
||||
messages: agentState?.messages,
|
||||
messagesAfter: stepResult.newState.messages,
|
||||
isCompressionReset: isCompression || undefined,
|
||||
messagesBaseline: isBaseline ? prevMessages : undefined,
|
||||
messagesDelta,
|
||||
outputTokens: stepPresentationData.stepOutputTokens,
|
||||
reasoning: stepPresentationData.reasoning,
|
||||
startedAt: startAt,
|
||||
stepIndex,
|
||||
stepType: stepPresentationData.stepType,
|
||||
// Store operation-level toolset once at step 0
|
||||
toolsetBaseline: stepIndex === 0 ? agentState?.operationToolSet : undefined,
|
||||
toolsCalling: stepPresentationData.toolsCalling,
|
||||
toolsResult: stepPresentationData.toolsResult,
|
||||
totalCost: stepPresentationData.totalCost,
|
||||
|
||||
@@ -41,6 +41,7 @@ import {
|
||||
} from '@/server/modules/Mecha';
|
||||
import { type ServerUserMemoryConfig } from '@/server/modules/Mecha/ContextEngineering/types';
|
||||
import { AgentService } from '@/server/services/agent';
|
||||
import type { AgentRuntimeServiceOptions } from '@/server/services/agentRuntime';
|
||||
import { AgentRuntimeService } from '@/server/services/agentRuntime';
|
||||
import { type StepLifecycleCallbacks } from '@/server/services/agentRuntime/types';
|
||||
import { FileService } from '@/server/services/file';
|
||||
@@ -161,7 +162,11 @@ export class AiAgentService {
|
||||
private readonly marketService: MarketService;
|
||||
private readonly klavisService: KlavisService;
|
||||
|
||||
constructor(db: LobeChatDatabase, userId: string) {
|
||||
constructor(
|
||||
db: LobeChatDatabase,
|
||||
userId: string,
|
||||
options?: { runtimeOptions?: AgentRuntimeServiceOptions },
|
||||
) {
|
||||
this.userId = userId;
|
||||
this.db = db;
|
||||
this.agentModel = new AgentModel(db, userId);
|
||||
@@ -170,7 +175,7 @@ export class AiAgentService {
|
||||
this.pluginModel = new PluginModel(db, userId);
|
||||
this.threadModel = new ThreadModel(db, userId);
|
||||
this.topicModel = new TopicModel(db, userId);
|
||||
this.agentRuntimeService = new AgentRuntimeService(db, userId);
|
||||
this.agentRuntimeService = new AgentRuntimeService(db, userId, options?.runtimeOptions);
|
||||
this.marketService = new MarketService({ userInfo: { userId } });
|
||||
this.klavisService = new KlavisService({ db, userId });
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user