🐛 fix: surface streaming errors during mid-stream pulls (#11762)

🐛 fix: surface streaming errors during pulls
This commit is contained in:
Arvin Xu
2026-01-24 09:55:43 +08:00
committed by GitHub
parent 42339cd6d0
commit 74a88d3a61
2 changed files with 55 additions and 6 deletions

View File

@@ -1,7 +1,10 @@
import { describe, expect, it, vi } from 'vitest';
import {
FIRST_CHUNK_ERROR_KEY,
convertIterableToStream,
createCallbacksTransformer,
createFirstErrorHandleTransformer,
createSSEDataExtractor,
createSSEProtocolTransformer,
createTokenSpeedCalculator,
@@ -239,6 +242,32 @@ describe('createTokenSpeedCalculator', async () => {
});
});
describe('convertIterableToStream', () => {
it('should surface errors from subsequent pulls as error chunks', async () => {
async function* erroringStream() {
yield 'first';
throw new Error('rate limit');
}
const readable = convertIterableToStream(erroringStream()).pipeThrough(
createFirstErrorHandleTransformer(),
);
const reader = readable.getReader();
const chunks: any[] = [];
while (true) {
const { done, value } = await reader.read();
if (done) break;
chunks.push(value);
}
expect(chunks[0]).toBe('first');
expect(chunks[1][FIRST_CHUNK_ERROR_KEY]).toBe(true);
expect(chunks[1].message).toBe('rate limit');
});
});
describe('createSSEProtocolTransformer', () => {
const processChunk = async (transformer: TransformStream, chunk: any) => {
const results: any[] = [];

View File

@@ -151,9 +151,19 @@ export function readableFromAsyncIterable<T>(iterable: AsyncIterable<T>) {
},
async pull(controller) {
const { done, value } = await it.next();
if (done) controller.close();
else controller.enqueue(value);
try {
const { done, value } = await it.next();
if (done) controller.close();
else controller.enqueue(value);
} catch (e) {
const error = e as Error;
controller.enqueue(
(ERROR_CHUNK_PREFIX +
JSON.stringify({ message: error.message, name: error.name, stack: error.stack })) as T,
);
controller.close();
}
},
});
}
@@ -171,9 +181,19 @@ export const convertIterableToStream = <T>(stream: AsyncIterable<T>) => {
await it.return?.(reason);
},
async pull(controller) {
const { done, value } = await it.next();
if (done) controller.close();
else controller.enqueue(value);
try {
const { done, value } = await it.next();
if (done) controller.close();
else controller.enqueue(value);
} catch (e) {
const error = e as Error;
controller.enqueue(
(ERROR_CHUNK_PREFIX +
JSON.stringify({ message: error.message, name: error.name, stack: error.stack })) as T,
);
controller.close();
}
},
async start(controller) {