mirror of
https://github.com/lobehub/lobehub.git
synced 2026-03-26 13:19:34 +07:00
✨ feat(cli): add agent run and status commands (#12839)
* ✨ feat(cli): add agent run and status commands Implement `lh agent run` for executing agents with SSE streaming and `lh agent status` for checking operation status. Includes `--replay` option for offline replay from saved JSON fixtures. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * 🐛 fix(cli): preserve SSE frame state across read boundaries and enable verbose logging - Move eventType/eventData outside the read loop so partial SSE frames split across chunks are not silently dropped - Call setVerbose(true) when --verbose is passed so logger helpers actually print detailed tool arguments and results Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -14,6 +14,10 @@ const { mockTrpcClient } = vi.hoisted(() => ({
|
||||
removeAgent: { mutate: vi.fn() },
|
||||
updateAgentConfig: { mutate: vi.fn() },
|
||||
},
|
||||
aiAgent: {
|
||||
execAgent: { mutate: vi.fn() },
|
||||
getOperationStatus: { query: vi.fn() },
|
||||
},
|
||||
},
|
||||
}));
|
||||
|
||||
@@ -21,9 +25,19 @@ const { getTrpcClient: mockGetTrpcClient } = vi.hoisted(() => ({
|
||||
getTrpcClient: vi.fn(),
|
||||
}));
|
||||
|
||||
const { mockStreamAgentEvents } = vi.hoisted(() => ({
|
||||
mockStreamAgentEvents: vi.fn(),
|
||||
}));
|
||||
|
||||
const { mockGetAuthInfo } = vi.hoisted(() => ({
|
||||
mockGetAuthInfo: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock('../api/client', () => ({ getTrpcClient: mockGetTrpcClient }));
|
||||
vi.mock('../api/http', () => ({ getAuthInfo: mockGetAuthInfo }));
|
||||
vi.mock('../utils/agentStream', () => ({ streamAgentEvents: mockStreamAgentEvents }));
|
||||
vi.mock('../utils/logger', () => ({
|
||||
log: { debug: vi.fn(), error: vi.fn(), info: vi.fn(), warn: vi.fn() },
|
||||
log: { debug: vi.fn(), error: vi.fn(), heartbeat: vi.fn(), info: vi.fn(), warn: vi.fn() },
|
||||
setVerbose: vi.fn(),
|
||||
}));
|
||||
|
||||
@@ -35,11 +49,22 @@ describe('agent command', () => {
|
||||
exitSpy = vi.spyOn(process, 'exit').mockImplementation((() => {}) as any);
|
||||
consoleSpy = vi.spyOn(console, 'log').mockImplementation(() => {});
|
||||
mockGetTrpcClient.mockResolvedValue(mockTrpcClient);
|
||||
mockGetAuthInfo.mockResolvedValue({
|
||||
accessToken: 'test-token',
|
||||
headers: { 'Content-Type': 'application/json', 'Oidc-Auth': 'test-token' },
|
||||
serverUrl: 'https://example.com',
|
||||
});
|
||||
mockStreamAgentEvents.mockResolvedValue(undefined);
|
||||
for (const method of Object.values(mockTrpcClient.agent)) {
|
||||
for (const fn of Object.values(method)) {
|
||||
(fn as ReturnType<typeof vi.fn>).mockReset();
|
||||
}
|
||||
}
|
||||
for (const method of Object.values(mockTrpcClient.aiAgent)) {
|
||||
for (const fn of Object.values(method)) {
|
||||
(fn as ReturnType<typeof vi.fn>).mockReset();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
@@ -186,4 +211,187 @@ describe('agent command', () => {
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('run', () => {
|
||||
it('should exec agent and connect to SSE stream', async () => {
|
||||
mockTrpcClient.aiAgent.execAgent.mutate.mockResolvedValue({
|
||||
operationId: 'op-123',
|
||||
success: true,
|
||||
topicId: 'topic-1',
|
||||
});
|
||||
|
||||
const program = createProgram();
|
||||
await program.parseAsync([
|
||||
'node',
|
||||
'test',
|
||||
'agent',
|
||||
'run',
|
||||
'--agent-id',
|
||||
'a1',
|
||||
'--prompt',
|
||||
'Hello',
|
||||
]);
|
||||
|
||||
expect(mockTrpcClient.aiAgent.execAgent.mutate).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ agentId: 'a1', prompt: 'Hello' }),
|
||||
);
|
||||
expect(mockStreamAgentEvents).toHaveBeenCalledWith(
|
||||
'https://example.com/api/agent/stream?operationId=op-123',
|
||||
expect.objectContaining({ 'Oidc-Auth': 'test-token' }),
|
||||
expect.objectContaining({ json: undefined, verbose: undefined }),
|
||||
);
|
||||
});
|
||||
|
||||
it('should support --slug option', async () => {
|
||||
mockTrpcClient.aiAgent.execAgent.mutate.mockResolvedValue({
|
||||
operationId: 'op-456',
|
||||
success: true,
|
||||
});
|
||||
|
||||
const program = createProgram();
|
||||
await program.parseAsync([
|
||||
'node',
|
||||
'test',
|
||||
'agent',
|
||||
'run',
|
||||
'--slug',
|
||||
'my-agent',
|
||||
'--prompt',
|
||||
'Do something',
|
||||
]);
|
||||
|
||||
expect(mockTrpcClient.aiAgent.execAgent.mutate).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ slug: 'my-agent', prompt: 'Do something' }),
|
||||
);
|
||||
});
|
||||
|
||||
it('should exit when neither --agent-id nor --slug provided', async () => {
|
||||
const program = createProgram();
|
||||
await program.parseAsync(['node', 'test', 'agent', 'run', '--prompt', 'Hello']);
|
||||
|
||||
expect(log.error).toHaveBeenCalledWith(expect.stringContaining('--agent-id or --slug'));
|
||||
expect(exitSpy).toHaveBeenCalledWith(1);
|
||||
});
|
||||
|
||||
it('should exit when --prompt not provided', async () => {
|
||||
const program = createProgram();
|
||||
await program.parseAsync(['node', 'test', 'agent', 'run', '--agent-id', 'a1']);
|
||||
|
||||
expect(log.error).toHaveBeenCalledWith(expect.stringContaining('--prompt'));
|
||||
expect(exitSpy).toHaveBeenCalledWith(1);
|
||||
});
|
||||
|
||||
it('should exit when exec fails', async () => {
|
||||
mockTrpcClient.aiAgent.execAgent.mutate.mockResolvedValue({
|
||||
error: 'Agent not found',
|
||||
success: false,
|
||||
});
|
||||
|
||||
const program = createProgram();
|
||||
await program.parseAsync([
|
||||
'node',
|
||||
'test',
|
||||
'agent',
|
||||
'run',
|
||||
'--agent-id',
|
||||
'bad',
|
||||
'--prompt',
|
||||
'Hi',
|
||||
]);
|
||||
|
||||
expect(log.error).toHaveBeenCalledWith(expect.stringContaining('Agent not found'));
|
||||
expect(exitSpy).toHaveBeenCalledWith(1);
|
||||
});
|
||||
|
||||
it('should pass --topic-id as appContext', async () => {
|
||||
mockTrpcClient.aiAgent.execAgent.mutate.mockResolvedValue({
|
||||
operationId: 'op-789',
|
||||
success: true,
|
||||
});
|
||||
|
||||
const program = createProgram();
|
||||
await program.parseAsync([
|
||||
'node',
|
||||
'test',
|
||||
'agent',
|
||||
'run',
|
||||
'--agent-id',
|
||||
'a1',
|
||||
'--prompt',
|
||||
'Hi',
|
||||
'--topic-id',
|
||||
't1',
|
||||
]);
|
||||
|
||||
expect(mockTrpcClient.aiAgent.execAgent.mutate).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ appContext: { topicId: 't1' } }),
|
||||
);
|
||||
});
|
||||
|
||||
it('should pass --json to stream options', async () => {
|
||||
mockTrpcClient.aiAgent.execAgent.mutate.mockResolvedValue({
|
||||
operationId: 'op-j',
|
||||
success: true,
|
||||
});
|
||||
|
||||
const program = createProgram();
|
||||
await program.parseAsync([
|
||||
'node',
|
||||
'test',
|
||||
'agent',
|
||||
'run',
|
||||
'--agent-id',
|
||||
'a1',
|
||||
'--prompt',
|
||||
'Hi',
|
||||
'--json',
|
||||
]);
|
||||
|
||||
expect(mockStreamAgentEvents).toHaveBeenCalledWith(
|
||||
expect.any(String),
|
||||
expect.any(Object),
|
||||
expect.objectContaining({ json: true }),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('status', () => {
|
||||
it('should display operation status', async () => {
|
||||
mockTrpcClient.aiAgent.getOperationStatus.query.mockResolvedValue({
|
||||
cost: { total: 0.0042 },
|
||||
status: 'completed',
|
||||
stepCount: 3,
|
||||
usage: { total_tokens: 1500 },
|
||||
});
|
||||
|
||||
const program = createProgram();
|
||||
await program.parseAsync(['node', 'test', 'agent', 'status', 'op-123']);
|
||||
|
||||
expect(mockTrpcClient.aiAgent.getOperationStatus.query).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ operationId: 'op-123' }),
|
||||
);
|
||||
expect(consoleSpy).toHaveBeenCalledWith(expect.stringContaining('Operation Status'));
|
||||
});
|
||||
|
||||
it('should output JSON', async () => {
|
||||
const data = { status: 'completed', stepCount: 2 };
|
||||
mockTrpcClient.aiAgent.getOperationStatus.query.mockResolvedValue(data);
|
||||
|
||||
const program = createProgram();
|
||||
await program.parseAsync(['node', 'test', 'agent', 'status', 'op-123', '--json']);
|
||||
|
||||
expect(consoleSpy).toHaveBeenCalledWith(JSON.stringify(data, null, 2));
|
||||
});
|
||||
|
||||
it('should pass --history flag', async () => {
|
||||
mockTrpcClient.aiAgent.getOperationStatus.query.mockResolvedValue({ status: 'running' });
|
||||
|
||||
const program = createProgram();
|
||||
await program.parseAsync(['node', 'test', 'agent', 'status', 'op-123', '--history']);
|
||||
|
||||
expect(mockTrpcClient.aiAgent.getOperationStatus.query).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ includeHistory: true }),
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,9 +1,13 @@
|
||||
import { readFileSync } from 'node:fs';
|
||||
|
||||
import type { Command } from 'commander';
|
||||
import pc from 'picocolors';
|
||||
|
||||
import { getTrpcClient } from '../api/client';
|
||||
import { getAuthInfo } from '../api/http';
|
||||
import { replayAgentEvents, streamAgentEvents } from '../utils/agentStream';
|
||||
import { confirm, outputJson, printTable, truncate } from '../utils/format';
|
||||
import { log } from '../utils/logger';
|
||||
import { log, setVerbose } from '../utils/logger';
|
||||
|
||||
export function registerAgentCommand(program: Command) {
|
||||
const agent = program.command('agent').description('Manage agents');
|
||||
@@ -199,4 +203,143 @@ export function registerAgentCommand(program: Command) {
|
||||
const r = result as any;
|
||||
console.log(`${pc.green('✓')} Duplicated agent → ${pc.bold(r.agentId || r.id || 'done')}`);
|
||||
});
|
||||
|
||||
// ── run ──────────────────────────────────────────────
|
||||
|
||||
agent
|
||||
.command('run')
|
||||
.description('Run an agent with a prompt')
|
||||
.option('-a, --agent-id <id>', 'Agent ID')
|
||||
.option('-s, --slug <slug>', 'Agent slug')
|
||||
.option('-p, --prompt <text>', 'User prompt')
|
||||
.option('-t, --topic-id <id>', 'Reuse an existing topic')
|
||||
.option('--no-auto-start', 'Do not auto-start the agent')
|
||||
.option('--json', 'Output full JSON event stream')
|
||||
.option('-v, --verbose', 'Show detailed tool call info')
|
||||
.option('--replay <file>', 'Replay events from a saved JSON file (offline)')
|
||||
.action(
|
||||
async (options: {
|
||||
agentId?: string;
|
||||
autoStart?: boolean;
|
||||
json?: boolean;
|
||||
prompt?: string;
|
||||
replay?: string;
|
||||
slug?: string;
|
||||
topicId?: string;
|
||||
verbose?: boolean;
|
||||
}) => {
|
||||
if (options.verbose) setVerbose(true);
|
||||
|
||||
// Replay mode: render from saved JSON file, no network needed
|
||||
if (options.replay) {
|
||||
const data = readFileSync(options.replay, 'utf8');
|
||||
const events = JSON.parse(data);
|
||||
replayAgentEvents(events, { json: options.json, verbose: options.verbose });
|
||||
return;
|
||||
}
|
||||
|
||||
if (!options.agentId && !options.slug) {
|
||||
log.error('Either --agent-id or --slug is required.');
|
||||
process.exit(1);
|
||||
return;
|
||||
}
|
||||
if (!options.prompt) {
|
||||
log.error('--prompt is required.');
|
||||
process.exit(1);
|
||||
return;
|
||||
}
|
||||
|
||||
const client = await getTrpcClient();
|
||||
|
||||
// 1. Exec agent to get operationId
|
||||
const input: Record<string, any> = { prompt: options.prompt };
|
||||
if (options.agentId) input.agentId = options.agentId;
|
||||
if (options.slug) input.slug = options.slug;
|
||||
if (options.topicId) input.appContext = { topicId: options.topicId };
|
||||
if (options.autoStart === false) input.autoStart = false;
|
||||
|
||||
const result = await client.aiAgent.execAgent.mutate(input as any);
|
||||
const r = result as any;
|
||||
|
||||
if (!r.success) {
|
||||
log.error(`Failed to start agent: ${r.error || r.message || 'Unknown error'}`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const operationId = r.operationId;
|
||||
if (!options.json) {
|
||||
log.info(`Operation: ${pc.dim(operationId)} · Topic: ${pc.dim(r.topicId || 'n/a')}`);
|
||||
}
|
||||
|
||||
// 2. Connect to SSE stream
|
||||
const { serverUrl, headers } = await getAuthInfo();
|
||||
const streamUrl = `${serverUrl}/api/agent/stream?operationId=${encodeURIComponent(operationId)}`;
|
||||
|
||||
await streamAgentEvents(streamUrl, headers, {
|
||||
json: options.json,
|
||||
verbose: options.verbose,
|
||||
});
|
||||
},
|
||||
);
|
||||
|
||||
// ── status ──────────────────────────────────────────
|
||||
|
||||
agent
|
||||
.command('status <operationId>')
|
||||
.description('Check agent operation status')
|
||||
.option('--json [fields]', 'Output JSON, optionally specify fields (comma-separated)')
|
||||
.option('--history', 'Include step history')
|
||||
.option('--history-limit <n>', 'Number of history entries', '10')
|
||||
.action(
|
||||
async (
|
||||
operationId: string,
|
||||
options: { history?: boolean; historyLimit?: string; json?: string | boolean },
|
||||
) => {
|
||||
const client = await getTrpcClient();
|
||||
|
||||
const input: Record<string, any> = { operationId };
|
||||
if (options.history) input.includeHistory = true;
|
||||
if (options.historyLimit) input.historyLimit = Number.parseInt(options.historyLimit, 10);
|
||||
|
||||
const result = await client.aiAgent.getOperationStatus.query(input as any);
|
||||
const r = result as any;
|
||||
|
||||
if (options.json !== undefined) {
|
||||
const fields = typeof options.json === 'string' ? options.json : undefined;
|
||||
outputJson(r, fields);
|
||||
return;
|
||||
}
|
||||
|
||||
console.log(pc.bold('Operation Status'));
|
||||
console.log(` ID: ${operationId}`);
|
||||
console.log(` Status: ${colorStatus(r.status || r.state || 'unknown')}`);
|
||||
|
||||
if (r.stepCount !== undefined) console.log(` Steps: ${r.stepCount}`);
|
||||
if (r.usage?.total_tokens) console.log(` Tokens: ${r.usage.total_tokens}`);
|
||||
if (r.cost?.total !== undefined) console.log(` Cost: $${r.cost.total.toFixed(4)}`);
|
||||
if (r.error) console.log(` Error: ${pc.red(r.error)}`);
|
||||
if (r.createdAt) console.log(` Started: ${r.createdAt}`);
|
||||
if (r.completedAt) console.log(` Ended: ${r.completedAt}`);
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
function colorStatus(status: string): string {
|
||||
switch (status) {
|
||||
case 'completed':
|
||||
case 'success': {
|
||||
return pc.green(status);
|
||||
}
|
||||
case 'failed':
|
||||
case 'error': {
|
||||
return pc.red(status);
|
||||
}
|
||||
case 'processing':
|
||||
case 'running': {
|
||||
return pc.yellow(status);
|
||||
}
|
||||
default: {
|
||||
return pc.dim(status);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
195
apps/cli/src/utils/agentStream.test.ts
Normal file
195
apps/cli/src/utils/agentStream.test.ts
Normal file
@@ -0,0 +1,195 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import { streamAgentEvents } from './agentStream';
|
||||
|
||||
vi.mock('./logger', () => ({
|
||||
log: {
|
||||
error: vi.fn(),
|
||||
heartbeat: vi.fn(),
|
||||
info: vi.fn(),
|
||||
toolCall: vi.fn(),
|
||||
toolResult: vi.fn(),
|
||||
},
|
||||
}));
|
||||
|
||||
function createSSEStream(events: string[]): ReadableStream<Uint8Array> {
|
||||
const encoder = new TextEncoder();
|
||||
const payload = events.join('');
|
||||
|
||||
return new ReadableStream({
|
||||
start(controller) {
|
||||
controller.enqueue(encoder.encode(payload));
|
||||
controller.close();
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
/** Create a stream that delivers content in separate chunks to simulate network splitting */
|
||||
function createChunkedSSEStream(chunks: string[]): ReadableStream<Uint8Array> {
|
||||
const encoder = new TextEncoder();
|
||||
|
||||
return new ReadableStream({
|
||||
start(controller) {
|
||||
for (const chunk of chunks) {
|
||||
controller.enqueue(encoder.encode(chunk));
|
||||
}
|
||||
controller.close();
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
function sseMessage(type: string, data: Record<string, any>): string {
|
||||
return `event:${type}\ndata:${JSON.stringify(data)}\n\n`;
|
||||
}
|
||||
|
||||
describe('streamAgentEvents', () => {
|
||||
let fetchSpy: ReturnType<typeof vi.spyOn>;
|
||||
let stdoutSpy: ReturnType<typeof vi.spyOn>;
|
||||
let consoleSpy: ReturnType<typeof vi.spyOn>;
|
||||
|
||||
beforeEach(() => {
|
||||
fetchSpy = vi.spyOn(globalThis, 'fetch');
|
||||
stdoutSpy = vi.spyOn(process.stdout, 'write').mockImplementation(() => true);
|
||||
consoleSpy = vi.spyOn(console, 'log').mockImplementation(() => {});
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
fetchSpy.mockRestore();
|
||||
stdoutSpy.mockRestore();
|
||||
consoleSpy.mockRestore();
|
||||
});
|
||||
|
||||
it('should render text stream chunks', async () => {
|
||||
const body = createSSEStream([
|
||||
sseMessage('data', {
|
||||
data: null,
|
||||
operationId: 'op1',
|
||||
stepIndex: 0,
|
||||
timestamp: Date.now(),
|
||||
type: 'agent_runtime_init',
|
||||
}),
|
||||
sseMessage('data', {
|
||||
data: null,
|
||||
operationId: 'op1',
|
||||
stepIndex: 0,
|
||||
timestamp: Date.now(),
|
||||
type: 'step_start',
|
||||
}),
|
||||
sseMessage('data', {
|
||||
data: { chunkType: 'text', content: 'Hello ' },
|
||||
operationId: 'op1',
|
||||
stepIndex: 0,
|
||||
timestamp: Date.now(),
|
||||
type: 'stream_chunk',
|
||||
}),
|
||||
sseMessage('data', {
|
||||
data: { chunkType: 'text', content: 'world!' },
|
||||
operationId: 'op1',
|
||||
stepIndex: 0,
|
||||
timestamp: Date.now(),
|
||||
type: 'stream_chunk',
|
||||
}),
|
||||
sseMessage('data', {
|
||||
data: { stepCount: 1, usage: { total_tokens: 100 } },
|
||||
operationId: 'op1',
|
||||
stepIndex: 0,
|
||||
timestamp: Date.now(),
|
||||
type: 'agent_runtime_end',
|
||||
}),
|
||||
]);
|
||||
|
||||
fetchSpy.mockResolvedValue(new Response(body, { status: 200 }));
|
||||
|
||||
await streamAgentEvents('https://example.com/stream', {});
|
||||
|
||||
expect(stdoutSpy).toHaveBeenCalledWith('Hello ');
|
||||
expect(stdoutSpy).toHaveBeenCalledWith('world!');
|
||||
});
|
||||
|
||||
it('should output JSON when json option is true', async () => {
|
||||
const events = [
|
||||
{
|
||||
data: null,
|
||||
operationId: 'op1',
|
||||
stepIndex: 0,
|
||||
timestamp: 1000,
|
||||
type: 'agent_runtime_init',
|
||||
},
|
||||
{
|
||||
data: { stepCount: 1 },
|
||||
operationId: 'op1',
|
||||
stepIndex: 0,
|
||||
timestamp: 2000,
|
||||
type: 'agent_runtime_end',
|
||||
},
|
||||
];
|
||||
|
||||
const body = createSSEStream(events.map((e) => sseMessage('data', e)));
|
||||
fetchSpy.mockResolvedValue(new Response(body, { status: 200 }));
|
||||
|
||||
await streamAgentEvents('https://example.com/stream', {}, { json: true });
|
||||
|
||||
expect(consoleSpy).toHaveBeenCalledWith(expect.stringContaining('"agent_runtime_init"'));
|
||||
expect(consoleSpy).toHaveBeenCalledWith(expect.stringContaining('"agent_runtime_end"'));
|
||||
});
|
||||
|
||||
it('should handle heartbeat events', async () => {
|
||||
const { log } = await import('./logger');
|
||||
const body = createSSEStream([
|
||||
`event:heartbeat\ndata:{}\n\n`,
|
||||
sseMessage('data', {
|
||||
data: null,
|
||||
operationId: 'op1',
|
||||
stepIndex: 0,
|
||||
timestamp: Date.now(),
|
||||
type: 'agent_runtime_end',
|
||||
}),
|
||||
]);
|
||||
|
||||
fetchSpy.mockResolvedValue(new Response(body, { status: 200 }));
|
||||
|
||||
await streamAgentEvents('https://example.com/stream', {});
|
||||
|
||||
expect(log.heartbeat).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should preserve SSE frame state across read boundaries', async () => {
|
||||
const endEvent = JSON.stringify({
|
||||
data: { stepCount: 1 },
|
||||
operationId: 'op1',
|
||||
stepIndex: 0,
|
||||
timestamp: Date.now(),
|
||||
type: 'agent_runtime_end',
|
||||
});
|
||||
|
||||
// Split SSE message across two chunks: first chunk has event: + data:,
|
||||
// second chunk has the terminating blank line.
|
||||
const body = createChunkedSSEStream([`event:data\ndata:${endEvent}\n`, `\n`]);
|
||||
|
||||
fetchSpy.mockResolvedValue(new Response(body, { status: 200 }));
|
||||
|
||||
await streamAgentEvents('https://example.com/stream', {});
|
||||
|
||||
// If frame state was lost the event would be silently dropped,
|
||||
// and the stream would end without printing the finish line.
|
||||
expect(consoleSpy).toHaveBeenCalledWith(expect.stringContaining('Agent finished'));
|
||||
});
|
||||
|
||||
it('should exit on HTTP error', async () => {
|
||||
const exitSpy = vi.spyOn(process, 'exit').mockImplementation((() => {
|
||||
throw new Error('process.exit');
|
||||
}) as any);
|
||||
const { log } = await import('./logger');
|
||||
|
||||
fetchSpy.mockResolvedValue(new Response('Not Found', { status: 404 }));
|
||||
|
||||
await expect(streamAgentEvents('https://example.com/stream', {})).rejects.toThrow(
|
||||
'process.exit',
|
||||
);
|
||||
|
||||
expect(log.error).toHaveBeenCalledWith(expect.stringContaining('404'));
|
||||
expect(exitSpy).toHaveBeenCalledWith(1);
|
||||
|
||||
exitSpy.mockRestore();
|
||||
});
|
||||
});
|
||||
259
apps/cli/src/utils/agentStream.ts
Normal file
259
apps/cli/src/utils/agentStream.ts
Normal file
@@ -0,0 +1,259 @@
|
||||
import pc from 'picocolors';
|
||||
|
||||
import { log } from './logger';
|
||||
|
||||
export interface AgentStreamEvent {
|
||||
data: any;
|
||||
id?: string;
|
||||
operationId: string;
|
||||
stepIndex: number;
|
||||
timestamp: number;
|
||||
type: string;
|
||||
}
|
||||
|
||||
interface StreamOptions {
|
||||
json?: boolean;
|
||||
verbose?: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to the agent SSE stream and render events to the terminal.
|
||||
* Resolves when the stream ends (agent_runtime_end or connection close).
|
||||
*/
|
||||
export async function streamAgentEvents(
|
||||
url: string,
|
||||
headers: Record<string, string>,
|
||||
options: StreamOptions = {},
|
||||
): Promise<void> {
|
||||
const res = await fetch(url, { headers });
|
||||
|
||||
if (!res.ok) {
|
||||
const text = await res.text();
|
||||
log.error(`Agent stream failed: ${res.status} ${text}`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
if (!res.body) {
|
||||
log.error('No response body received from agent stream');
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const reader = res.body.getReader();
|
||||
const decoder = new TextDecoder();
|
||||
let buffer = '';
|
||||
const jsonEvents: AgentStreamEvent[] = [];
|
||||
const ctx = createRenderContext();
|
||||
|
||||
// Declared outside the read loop so partial SSE frames that span
|
||||
// chunk boundaries are not lost between reader.read() calls.
|
||||
let eventType = '';
|
||||
let eventData = '';
|
||||
|
||||
try {
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) break;
|
||||
|
||||
buffer += decoder.decode(value, { stream: true });
|
||||
const lines = buffer.split('\n');
|
||||
buffer = lines.pop() || '';
|
||||
|
||||
for (const line of lines) {
|
||||
if (line.startsWith('event:')) {
|
||||
eventType = line.slice(6).trim();
|
||||
continue;
|
||||
}
|
||||
|
||||
if (line.startsWith('data:')) {
|
||||
eventData = line.slice(5).trim();
|
||||
}
|
||||
|
||||
// Empty line = end of SSE message
|
||||
if (line === '' && eventData) {
|
||||
if (eventType === 'heartbeat') {
|
||||
log.heartbeat();
|
||||
eventType = '';
|
||||
eventData = '';
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
const event: AgentStreamEvent = JSON.parse(eventData);
|
||||
|
||||
if (options.json) {
|
||||
jsonEvents.push(event);
|
||||
} else {
|
||||
renderEvent(event, ctx, options);
|
||||
}
|
||||
|
||||
if (event.type === 'agent_runtime_end') {
|
||||
if (options.json) {
|
||||
console.log(JSON.stringify(jsonEvents, null, 2));
|
||||
} else {
|
||||
renderEnd(event);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (event.type === 'error') {
|
||||
if (options.json) {
|
||||
console.log(JSON.stringify(jsonEvents, null, 2));
|
||||
}
|
||||
log.error(
|
||||
`Agent error: ${event.data?.message || event.data?.error || 'Unknown error'}`,
|
||||
);
|
||||
process.exit(1);
|
||||
}
|
||||
} catch {
|
||||
// Not JSON, skip
|
||||
}
|
||||
|
||||
eventType = '';
|
||||
eventData = '';
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stream ended without agent_runtime_end
|
||||
if (options.json && jsonEvents.length > 0) {
|
||||
console.log(JSON.stringify(jsonEvents, null, 2));
|
||||
}
|
||||
} finally {
|
||||
reader.releaseLock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Replay previously saved JSON events (from --json output) to the terminal.
|
||||
* No network calls needed.
|
||||
*/
|
||||
export function replayAgentEvents(events: AgentStreamEvent[], options: StreamOptions = {}): void {
|
||||
if (options.json) {
|
||||
console.log(JSON.stringify(events, null, 2));
|
||||
return;
|
||||
}
|
||||
|
||||
const ctx = createRenderContext();
|
||||
|
||||
for (const event of events) {
|
||||
if (!event.type) continue;
|
||||
|
||||
renderEvent(event, ctx, options);
|
||||
|
||||
if (event.type === 'agent_runtime_end') {
|
||||
renderEnd(event);
|
||||
return;
|
||||
}
|
||||
|
||||
if (event.type === 'error') {
|
||||
log.error(`Agent error: ${event.data?.message || event.data?.error || 'Unknown error'}`);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── Render helpers ──────────────────────────────────────
|
||||
|
||||
interface RenderContext {
|
||||
/** Tool call IDs already printed from streaming tools_calling chunks */
|
||||
printedToolCalls: Set<string>;
|
||||
}
|
||||
|
||||
function createRenderContext(): RenderContext {
|
||||
return { printedToolCalls: new Set() };
|
||||
}
|
||||
|
||||
function renderEvent(event: AgentStreamEvent, ctx: RenderContext, options: StreamOptions): void {
|
||||
switch (event.type) {
|
||||
case 'agent_runtime_init': {
|
||||
log.info('Agent started');
|
||||
break;
|
||||
}
|
||||
|
||||
case 'step_start': {
|
||||
if (event.stepIndex > 0) console.log();
|
||||
console.log(pc.bold(pc.cyan(`── Step ${event.stepIndex + 1} ──`)));
|
||||
break;
|
||||
}
|
||||
|
||||
case 'stream_start': {
|
||||
// Quiet, content will follow
|
||||
break;
|
||||
}
|
||||
|
||||
case 'stream_chunk': {
|
||||
const data = event.data;
|
||||
if (!data) break;
|
||||
|
||||
if (data.chunkType === 'text' && data.content) {
|
||||
process.stdout.write(data.content);
|
||||
} else if (data.chunkType === 'reasoning' && data.reasoning) {
|
||||
process.stdout.write(pc.dim(data.reasoning));
|
||||
} else if (data.chunkType === 'tools_calling' && data.toolsCalling) {
|
||||
// tools_calling chunks arrive incrementally with the same tool ID.
|
||||
// Only print each tool call once (on first appearance).
|
||||
for (const tool of data.toolsCalling) {
|
||||
const id = tool.id || '';
|
||||
if (id && ctx.printedToolCalls.has(id)) continue;
|
||||
if (id) ctx.printedToolCalls.add(id);
|
||||
const name = tool.apiName || tool.function?.name || 'unknown';
|
||||
log.toolCall(name, id);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case 'stream_end': {
|
||||
process.stdout.write('\n');
|
||||
// Reset dedup set for next step's tool calls
|
||||
ctx.printedToolCalls.clear();
|
||||
break;
|
||||
}
|
||||
|
||||
case 'tool_start': {
|
||||
const tc = event.data?.toolCalling || event.data;
|
||||
const name = tc?.apiName || tc?.name || 'tool';
|
||||
const id = tc?.id || event.data?.requestId || '';
|
||||
log.toolCall(
|
||||
name,
|
||||
id,
|
||||
options.verbose ? tc?.arguments || JSON.stringify(tc?.args) : undefined,
|
||||
);
|
||||
break;
|
||||
}
|
||||
|
||||
case 'tool_end': {
|
||||
const payload = event.data?.payload || event.data;
|
||||
const tc = payload?.toolCalling || payload;
|
||||
const id = tc?.id || event.data?.requestId || '';
|
||||
const success = event.data?.isSuccess !== false;
|
||||
const time = event.data?.executionTime;
|
||||
const timeSuffix = time ? ` ${time}ms` : '';
|
||||
log.toolResult(id, success, options.verbose ? event.data?.result?.content : timeSuffix);
|
||||
break;
|
||||
}
|
||||
|
||||
case 'step_complete': {
|
||||
// Step finished, next step_start or agent_runtime_end will follow
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function renderEnd(event: AgentStreamEvent): void {
|
||||
console.log();
|
||||
const data = event.data || {};
|
||||
const parts: string[] = [`${pc.green('✓')} Agent finished`];
|
||||
|
||||
if (data.stepCount !== undefined) {
|
||||
parts.push(`${data.stepCount} step${data.stepCount !== 1 ? 's' : ''}`);
|
||||
}
|
||||
if (data.usage?.total_tokens) {
|
||||
parts.push(`${data.usage.total_tokens} tokens`);
|
||||
}
|
||||
if (data.cost?.total !== undefined) {
|
||||
parts.push(`$${data.cost.total.toFixed(4)}`);
|
||||
}
|
||||
|
||||
console.log(parts.join(pc.dim(' · ')));
|
||||
}
|
||||
Reference in New Issue
Block a user