diff --git a/packages/agent-tracing/package.json b/packages/agent-tracing/package.json index c341ca45f8..94c26be773 100644 --- a/packages/agent-tracing/package.json +++ b/packages/agent-tracing/package.json @@ -9,7 +9,8 @@ }, "main": "./src/index.ts", "bin": { - "agent-tracing": "./src/cli/index.ts" + "agent-tracing": "./src/cli/index.ts", + "at": "./src/cli/index.ts" }, "dependencies": { "commander": "^13.1.0", diff --git a/packages/agent-tracing/src/cli/inspect.ts b/packages/agent-tracing/src/cli/inspect.ts index 1d5c0aa950..be733b2f0c 100644 --- a/packages/agent-tracing/src/cli/inspect.ts +++ b/packages/agent-tracing/src/cli/inspect.ts @@ -14,6 +14,18 @@ import { renderSystemRole, } from '../viewer'; +async function fetchSnapshotFromUrl(url: string): Promise { + const res = await fetch(url); + if (!res.ok) { + throw new Error(`Failed to fetch snapshot: ${res.status} ${res.statusText}`); + } + return (await res.json()) as ExecutionSnapshot; +} + +function isUrl(input: string): boolean { + return input.startsWith('http://') || input.startsWith('https://'); +} + function findStep(snapshot: ExecutionSnapshot, stepIndex: number): StepSnapshot { const step = snapshot.steps.find((s) => s.stepIndex === stepIndex); if (!step) { @@ -50,6 +62,7 @@ function getEnvContent(step: StepSnapshot): string | undefined { export function registerInspectCommand(program: Command) { program .command('inspect', { isDefault: true }) + .alias('i') .description('Inspect trace details') .argument('[traceId]', 'Trace ID to inspect (defaults to latest)') .option('-s, --step ', 'View specific step (default: 0 for -r/--env)') @@ -95,8 +108,15 @@ export function registerInspectCommand(program: Command) { tools?: boolean; }, ) => { - const store = new FileSnapshotStore(); - const snapshot = traceId ? await store.get(traceId) : await store.getLatest(); + let snapshot: ExecutionSnapshot | null; + + if (traceId && isUrl(traceId)) { + snapshot = await fetchSnapshotFromUrl(traceId); + } else { + const store = new FileSnapshotStore(); + snapshot = traceId ? await store.get(traceId) : await store.getLatest(); + } + if (!snapshot) { console.error( traceId diff --git a/packages/agent-tracing/src/types.ts b/packages/agent-tracing/src/types.ts index 2324e176b5..fc8edff5b2 100644 --- a/packages/agent-tracing/src/types.ts +++ b/packages/agent-tracing/src/types.ts @@ -1,4 +1,5 @@ export interface ExecutionSnapshot { + agentId?: string; completedAt?: number; completionReason?: | 'done' @@ -13,10 +14,12 @@ export interface ExecutionSnapshot { provider?: string; startedAt: number; steps: StepSnapshot[]; + topicId?: string; totalCost: number; totalSteps: number; totalTokens: number; traceId: string; + userId?: string; } export interface StepSnapshot { diff --git a/src/server/modules/AgentTracing/S3SnapshotStore.ts b/src/server/modules/AgentTracing/S3SnapshotStore.ts new file mode 100644 index 0000000000..47e512788c --- /dev/null +++ b/src/server/modules/AgentTracing/S3SnapshotStore.ts @@ -0,0 +1,82 @@ +import type { ExecutionSnapshot, ISnapshotStore, SnapshotSummary } from '@lobechat/agent-tracing'; +import debug from 'debug'; + +import { FileS3 } from '@/server/modules/S3'; + +const log = debug('lobe-server:agent-tracing:s3'); + +const TRACE_PREFIX = 'agent-traces'; + +/** + * S3-backed snapshot store for production agent trace persistence. + * + * S3 paths: + * - Final: agent-traces/{agentId}/{topicId}/{operationId}.json + * - Partial: agent-traces/_partial/{operationId}.json (temporary, deleted after finalization) + * + * Partial snapshots are needed because QStash executes each step in a + * separate HTTP request (no shared memory). Step data is accumulated + * via S3 read-modify-write per step, then finalized on completion. + * The overhead (~100ms per step) is negligible vs LLM call time. + */ +export class S3SnapshotStore implements ISnapshotStore { + private readonly s3: FileS3; + + constructor() { + this.s3 = new FileS3(); + } + + private partialKey(operationId: string): string { + return `${TRACE_PREFIX}/_partial/${operationId}.json`; + } + + async save(snapshot: ExecutionSnapshot): Promise { + const agentId = snapshot.agentId ?? 'unknown'; + const topicId = snapshot.topicId ?? 'unknown'; + const key = `${TRACE_PREFIX}/${agentId}/${topicId}/${snapshot.operationId}.json`; + + log('Saving snapshot to S3: %s', key); + await this.s3.uploadContent(key, JSON.stringify(snapshot)); + } + + // === Query methods — not supported, use OTEL backend === + + async get(_traceId: string): Promise { + return null; + } + + async getLatest(): Promise { + return null; + } + + async list(_options?: { limit?: number }): Promise { + return []; + } + + // === Partial methods — S3 read-modify-write for QStash cross-request accumulation === + + async listPartials(): Promise { + return []; + } + + async loadPartial(operationId: string): Promise | null> { + try { + const content = await this.s3.getFileContent(this.partialKey(operationId)); + return JSON.parse(content) as Partial; + } catch { + return null; + } + } + + async savePartial(operationId: string, partial: Partial): Promise { + await this.s3.uploadContent(this.partialKey(operationId), JSON.stringify(partial)); + } + + async removePartial(operationId: string): Promise { + try { + await this.s3.deleteFile(this.partialKey(operationId)); + } catch { + // ignore — partial may already be cleaned up + } + } +} diff --git a/src/server/modules/AgentTracing/index.ts b/src/server/modules/AgentTracing/index.ts new file mode 100644 index 0000000000..f09f797831 --- /dev/null +++ b/src/server/modules/AgentTracing/index.ts @@ -0,0 +1 @@ +export { S3SnapshotStore } from './S3SnapshotStore'; diff --git a/src/server/services/agentRuntime/AgentRuntimeService.ts b/src/server/services/agentRuntime/AgentRuntimeService.ts index 5ce23fee65..65fb250c31 100644 --- a/src/server/services/agentRuntime/AgentRuntimeService.ts +++ b/src/server/services/agentRuntime/AgentRuntimeService.ts @@ -1,5 +1,6 @@ import type { AgentRuntimeContext, AgentState } from '@lobechat/agent-runtime'; import { AgentRuntime, findInMessages, GeneralChatAgent } from '@lobechat/agent-runtime'; +import type { ISnapshotStore } from '@lobechat/agent-tracing'; import { dynamicInterventionAudits } from '@lobechat/builtin-tools/dynamicInterventionAudits'; import { AgentRuntimeErrorType, ChatErrorType, type ChatMessageError } from '@lobechat/types'; import debug from 'debug'; @@ -89,6 +90,12 @@ export interface AgentRuntimeServiceOptions { * Set to null to disable queue scheduling (for synchronous execution tests) */ queueService?: QueueService | null; + /** + * Optional snapshot store for persisting agent execution traces. + * When provided, execution snapshots are recorded on every step and finalized on completion. + * In dev mode without this option, falls back to FileSnapshotStore automatically. + */ + snapshotStore?: ISnapshotStore; /** * Custom StreamEventManager * Defaults to Redis-based StreamEventManager @@ -117,6 +124,7 @@ export class AgentRuntimeService { private coordinator: AgentRuntimeCoordinator; private streamManager: IStreamEventManager; private queueService: QueueService | null; + private snapshotStore: ISnapshotStore | null; private toolExecutionService: ToolExecutionService; /** * Step lifecycle callback registry @@ -144,6 +152,7 @@ export class AgentRuntimeService { }); this.queueService = options?.queueService === null ? null : (options?.queueService ?? new QueueService()); + this.snapshotStore = options?.snapshotStore ?? this.createDefaultSnapshotStore(); this.serverDB = db; this.userId = userId; this.messageModel = new MessageModel(db, this.userId); @@ -740,13 +749,10 @@ export class AgentRuntimeService { } } - // Dev mode: record step snapshot to disk for agent-tracing CLI - if (process.env.NODE_ENV === 'development') { + // Record step snapshot via injected snapshot store + if (this.snapshotStore) { try { - const { FileSnapshotStore } = await import('@lobechat/agent-tracing'); - const store = new FileSnapshotStore(); - - const partial = (await store.loadPartial(operationId)) ?? { steps: [] }; + const partial = (await this.snapshotStore.loadPartial(operationId)) ?? { steps: [] }; if (!partial.startedAt) { partial.startedAt = Date.now(); @@ -783,9 +789,9 @@ export class AgentRuntimeService { totalTokens: stepPresentationData.totalTokens, }); - await store.savePartial(operationId, partial); - } catch { - // agent-tracing not available, skip silently + await this.snapshotStore.savePartial(operationId, partial); + } catch (e) { + log('[%s] snapshot step recording failed: %O', operationId, e); } } @@ -859,15 +865,15 @@ export class AgentRuntimeService { } } - // Dev mode: finalize tracing snapshot - if (process.env.NODE_ENV === 'development') { + // Finalize tracing snapshot via injected snapshot store + if (this.snapshotStore) { try { - const { FileSnapshotStore } = await import('@lobechat/agent-tracing'); - const store = new FileSnapshotStore(); - const partial = await store.loadPartial(operationId); + const partial = await this.snapshotStore.loadPartial(operationId); if (partial) { + const metadata = agentState?.metadata as any; const snapshot = { + agentId: metadata?.agentId, completedAt: Date.now(), completionReason: reason, error: stepResult.newState.error @@ -886,14 +892,16 @@ export class AgentRuntimeService { totalCost: stepResult.newState.cost?.total ?? 0, totalSteps: stepResult.newState.stepCount, totalTokens: stepResult.newState.usage?.llm?.tokens?.total ?? 0, + topicId: metadata?.topicId, traceId: operationId, + userId: metadata?.userId, }; - await store.save(snapshot as any); - await store.removePartial(operationId); + await this.snapshotStore.save(snapshot as any); + await this.snapshotStore.removePartial(operationId); } - } catch { - // agent-tracing not available, skip silently + } catch (e) { + log('[%s] snapshot finalization failed: %O', operationId, e); } } } @@ -1350,6 +1358,34 @@ export class AgentRuntimeService { return { agent, runtime }; } + /** + * Create default snapshot store based on environment. + * - ENABLE_AGENT_S3_TRACING=1 → S3SnapshotStore + * - NODE_ENV=development → FileSnapshotStore + * - Otherwise → null (no tracing) + */ + private createDefaultSnapshotStore(): ISnapshotStore | null { + if (process.env.ENABLE_AGENT_S3_TRACING === '1') { + try { + const { S3SnapshotStore } = require('@/server/modules/AgentTracing'); + return new S3SnapshotStore(); + } catch { + // S3SnapshotStore not available + } + } + + if (process.env.NODE_ENV === 'development') { + try { + const { FileSnapshotStore } = require('@lobechat/agent-tracing'); + return new FileSnapshotStore(); + } catch { + // agent-tracing not available + } + } + + return null; + } + /** * Compute device context from DB messages at step boundary. * Uses findInMessages visitor to scan tool messages for device activation.