🛠 chore: support injectable snapshot store in AgentRuntimeService (#12984)

This commit is contained in:
Arvin Xu
2026-03-15 01:00:56 +08:00
committed by GitHub
parent adbf11dc11
commit 6a4d6c6a86
6 changed files with 164 additions and 21 deletions

View File

@@ -9,7 +9,8 @@
},
"main": "./src/index.ts",
"bin": {
"agent-tracing": "./src/cli/index.ts"
"agent-tracing": "./src/cli/index.ts",
"at": "./src/cli/index.ts"
},
"dependencies": {
"commander": "^13.1.0",

View File

@@ -14,6 +14,18 @@ import {
renderSystemRole,
} from '../viewer';
async function fetchSnapshotFromUrl(url: string): Promise<ExecutionSnapshot> {
const res = await fetch(url);
if (!res.ok) {
throw new Error(`Failed to fetch snapshot: ${res.status} ${res.statusText}`);
}
return (await res.json()) as ExecutionSnapshot;
}
function isUrl(input: string): boolean {
return input.startsWith('http://') || input.startsWith('https://');
}
function findStep(snapshot: ExecutionSnapshot, stepIndex: number): StepSnapshot {
const step = snapshot.steps.find((s) => s.stepIndex === stepIndex);
if (!step) {
@@ -50,6 +62,7 @@ function getEnvContent(step: StepSnapshot): string | undefined {
export function registerInspectCommand(program: Command) {
program
.command('inspect', { isDefault: true })
.alias('i')
.description('Inspect trace details')
.argument('[traceId]', 'Trace ID to inspect (defaults to latest)')
.option('-s, --step <n>', 'View specific step (default: 0 for -r/--env)')
@@ -95,8 +108,15 @@ export function registerInspectCommand(program: Command) {
tools?: boolean;
},
) => {
const store = new FileSnapshotStore();
const snapshot = traceId ? await store.get(traceId) : await store.getLatest();
let snapshot: ExecutionSnapshot | null;
if (traceId && isUrl(traceId)) {
snapshot = await fetchSnapshotFromUrl(traceId);
} else {
const store = new FileSnapshotStore();
snapshot = traceId ? await store.get(traceId) : await store.getLatest();
}
if (!snapshot) {
console.error(
traceId

View File

@@ -1,4 +1,5 @@
export interface ExecutionSnapshot {
agentId?: string;
completedAt?: number;
completionReason?:
| 'done'
@@ -13,10 +14,12 @@ export interface ExecutionSnapshot {
provider?: string;
startedAt: number;
steps: StepSnapshot[];
topicId?: string;
totalCost: number;
totalSteps: number;
totalTokens: number;
traceId: string;
userId?: string;
}
export interface StepSnapshot {

View File

@@ -0,0 +1,82 @@
import type { ExecutionSnapshot, ISnapshotStore, SnapshotSummary } from '@lobechat/agent-tracing';
import debug from 'debug';
import { FileS3 } from '@/server/modules/S3';
const log = debug('lobe-server:agent-tracing:s3');
const TRACE_PREFIX = 'agent-traces';
/**
* S3-backed snapshot store for production agent trace persistence.
*
* S3 paths:
* - Final: agent-traces/{agentId}/{topicId}/{operationId}.json
* - Partial: agent-traces/_partial/{operationId}.json (temporary, deleted after finalization)
*
* Partial snapshots are needed because QStash executes each step in a
* separate HTTP request (no shared memory). Step data is accumulated
* via S3 read-modify-write per step, then finalized on completion.
* The overhead (~100ms per step) is negligible vs LLM call time.
*/
export class S3SnapshotStore implements ISnapshotStore {
private readonly s3: FileS3;
constructor() {
this.s3 = new FileS3();
}
private partialKey(operationId: string): string {
return `${TRACE_PREFIX}/_partial/${operationId}.json`;
}
async save(snapshot: ExecutionSnapshot): Promise<void> {
const agentId = snapshot.agentId ?? 'unknown';
const topicId = snapshot.topicId ?? 'unknown';
const key = `${TRACE_PREFIX}/${agentId}/${topicId}/${snapshot.operationId}.json`;
log('Saving snapshot to S3: %s', key);
await this.s3.uploadContent(key, JSON.stringify(snapshot));
}
// === Query methods — not supported, use OTEL backend ===
async get(_traceId: string): Promise<ExecutionSnapshot | null> {
return null;
}
async getLatest(): Promise<ExecutionSnapshot | null> {
return null;
}
async list(_options?: { limit?: number }): Promise<SnapshotSummary[]> {
return [];
}
// === Partial methods — S3 read-modify-write for QStash cross-request accumulation ===
async listPartials(): Promise<string[]> {
return [];
}
async loadPartial(operationId: string): Promise<Partial<ExecutionSnapshot> | null> {
try {
const content = await this.s3.getFileContent(this.partialKey(operationId));
return JSON.parse(content) as Partial<ExecutionSnapshot>;
} catch {
return null;
}
}
async savePartial(operationId: string, partial: Partial<ExecutionSnapshot>): Promise<void> {
await this.s3.uploadContent(this.partialKey(operationId), JSON.stringify(partial));
}
async removePartial(operationId: string): Promise<void> {
try {
await this.s3.deleteFile(this.partialKey(operationId));
} catch {
// ignore — partial may already be cleaned up
}
}
}

View File

@@ -0,0 +1 @@
export { S3SnapshotStore } from './S3SnapshotStore';

View File

@@ -1,5 +1,6 @@
import type { AgentRuntimeContext, AgentState } from '@lobechat/agent-runtime';
import { AgentRuntime, findInMessages, GeneralChatAgent } from '@lobechat/agent-runtime';
import type { ISnapshotStore } from '@lobechat/agent-tracing';
import { dynamicInterventionAudits } from '@lobechat/builtin-tools/dynamicInterventionAudits';
import { AgentRuntimeErrorType, ChatErrorType, type ChatMessageError } from '@lobechat/types';
import debug from 'debug';
@@ -89,6 +90,12 @@ export interface AgentRuntimeServiceOptions {
* Set to null to disable queue scheduling (for synchronous execution tests)
*/
queueService?: QueueService | null;
/**
* Optional snapshot store for persisting agent execution traces.
* When provided, execution snapshots are recorded on every step and finalized on completion.
* In dev mode without this option, falls back to FileSnapshotStore automatically.
*/
snapshotStore?: ISnapshotStore;
/**
* Custom StreamEventManager
* Defaults to Redis-based StreamEventManager
@@ -117,6 +124,7 @@ export class AgentRuntimeService {
private coordinator: AgentRuntimeCoordinator;
private streamManager: IStreamEventManager;
private queueService: QueueService | null;
private snapshotStore: ISnapshotStore | null;
private toolExecutionService: ToolExecutionService;
/**
* Step lifecycle callback registry
@@ -144,6 +152,7 @@ export class AgentRuntimeService {
});
this.queueService =
options?.queueService === null ? null : (options?.queueService ?? new QueueService());
this.snapshotStore = options?.snapshotStore ?? this.createDefaultSnapshotStore();
this.serverDB = db;
this.userId = userId;
this.messageModel = new MessageModel(db, this.userId);
@@ -740,13 +749,10 @@ export class AgentRuntimeService {
}
}
// Dev mode: record step snapshot to disk for agent-tracing CLI
if (process.env.NODE_ENV === 'development') {
// Record step snapshot via injected snapshot store
if (this.snapshotStore) {
try {
const { FileSnapshotStore } = await import('@lobechat/agent-tracing');
const store = new FileSnapshotStore();
const partial = (await store.loadPartial(operationId)) ?? { steps: [] };
const partial = (await this.snapshotStore.loadPartial(operationId)) ?? { steps: [] };
if (!partial.startedAt) {
partial.startedAt = Date.now();
@@ -783,9 +789,9 @@ export class AgentRuntimeService {
totalTokens: stepPresentationData.totalTokens,
});
await store.savePartial(operationId, partial);
} catch {
// agent-tracing not available, skip silently
await this.snapshotStore.savePartial(operationId, partial);
} catch (e) {
log('[%s] snapshot step recording failed: %O', operationId, e);
}
}
@@ -859,15 +865,15 @@ export class AgentRuntimeService {
}
}
// Dev mode: finalize tracing snapshot
if (process.env.NODE_ENV === 'development') {
// Finalize tracing snapshot via injected snapshot store
if (this.snapshotStore) {
try {
const { FileSnapshotStore } = await import('@lobechat/agent-tracing');
const store = new FileSnapshotStore();
const partial = await store.loadPartial(operationId);
const partial = await this.snapshotStore.loadPartial(operationId);
if (partial) {
const metadata = agentState?.metadata as any;
const snapshot = {
agentId: metadata?.agentId,
completedAt: Date.now(),
completionReason: reason,
error: stepResult.newState.error
@@ -886,14 +892,16 @@ export class AgentRuntimeService {
totalCost: stepResult.newState.cost?.total ?? 0,
totalSteps: stepResult.newState.stepCount,
totalTokens: stepResult.newState.usage?.llm?.tokens?.total ?? 0,
topicId: metadata?.topicId,
traceId: operationId,
userId: metadata?.userId,
};
await store.save(snapshot as any);
await store.removePartial(operationId);
await this.snapshotStore.save(snapshot as any);
await this.snapshotStore.removePartial(operationId);
}
} catch {
// agent-tracing not available, skip silently
} catch (e) {
log('[%s] snapshot finalization failed: %O', operationId, e);
}
}
}
@@ -1350,6 +1358,34 @@ export class AgentRuntimeService {
return { agent, runtime };
}
/**
* Create default snapshot store based on environment.
* - ENABLE_AGENT_S3_TRACING=1 → S3SnapshotStore
* - NODE_ENV=development → FileSnapshotStore
* - Otherwise → null (no tracing)
*/
private createDefaultSnapshotStore(): ISnapshotStore | null {
if (process.env.ENABLE_AGENT_S3_TRACING === '1') {
try {
const { S3SnapshotStore } = require('@/server/modules/AgentTracing');
return new S3SnapshotStore();
} catch {
// S3SnapshotStore not available
}
}
if (process.env.NODE_ENV === 'development') {
try {
const { FileSnapshotStore } = require('@lobechat/agent-tracing');
return new FileSnapshotStore();
} catch {
// agent-tracing not available
}
}
return null;
}
/**
* Compute device context from DB messages at step boundary.
* Uses findInMessages visitor to scan tool messages for device activation.