mirror of
https://github.com/lobehub/lobehub.git
synced 2026-03-27 13:29:15 +07:00
🐛 fix: surface streaming errors during mid-stream pulls (#11762)
🐛 fix: surface streaming errors during pulls
This commit is contained in:
@@ -1,7 +1,10 @@
|
||||
import { describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import {
|
||||
FIRST_CHUNK_ERROR_KEY,
|
||||
convertIterableToStream,
|
||||
createCallbacksTransformer,
|
||||
createFirstErrorHandleTransformer,
|
||||
createSSEDataExtractor,
|
||||
createSSEProtocolTransformer,
|
||||
createTokenSpeedCalculator,
|
||||
@@ -239,6 +242,32 @@ describe('createTokenSpeedCalculator', async () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe('convertIterableToStream', () => {
|
||||
it('should surface errors from subsequent pulls as error chunks', async () => {
|
||||
async function* erroringStream() {
|
||||
yield 'first';
|
||||
throw new Error('rate limit');
|
||||
}
|
||||
|
||||
const readable = convertIterableToStream(erroringStream()).pipeThrough(
|
||||
createFirstErrorHandleTransformer(),
|
||||
);
|
||||
|
||||
const reader = readable.getReader();
|
||||
const chunks: any[] = [];
|
||||
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) break;
|
||||
chunks.push(value);
|
||||
}
|
||||
|
||||
expect(chunks[0]).toBe('first');
|
||||
expect(chunks[1][FIRST_CHUNK_ERROR_KEY]).toBe(true);
|
||||
expect(chunks[1].message).toBe('rate limit');
|
||||
});
|
||||
});
|
||||
|
||||
describe('createSSEProtocolTransformer', () => {
|
||||
const processChunk = async (transformer: TransformStream, chunk: any) => {
|
||||
const results: any[] = [];
|
||||
|
||||
@@ -151,9 +151,19 @@ export function readableFromAsyncIterable<T>(iterable: AsyncIterable<T>) {
|
||||
},
|
||||
|
||||
async pull(controller) {
|
||||
const { done, value } = await it.next();
|
||||
if (done) controller.close();
|
||||
else controller.enqueue(value);
|
||||
try {
|
||||
const { done, value } = await it.next();
|
||||
if (done) controller.close();
|
||||
else controller.enqueue(value);
|
||||
} catch (e) {
|
||||
const error = e as Error;
|
||||
|
||||
controller.enqueue(
|
||||
(ERROR_CHUNK_PREFIX +
|
||||
JSON.stringify({ message: error.message, name: error.name, stack: error.stack })) as T,
|
||||
);
|
||||
controller.close();
|
||||
}
|
||||
},
|
||||
});
|
||||
}
|
||||
@@ -171,9 +181,19 @@ export const convertIterableToStream = <T>(stream: AsyncIterable<T>) => {
|
||||
await it.return?.(reason);
|
||||
},
|
||||
async pull(controller) {
|
||||
const { done, value } = await it.next();
|
||||
if (done) controller.close();
|
||||
else controller.enqueue(value);
|
||||
try {
|
||||
const { done, value } = await it.next();
|
||||
if (done) controller.close();
|
||||
else controller.enqueue(value);
|
||||
} catch (e) {
|
||||
const error = e as Error;
|
||||
|
||||
controller.enqueue(
|
||||
(ERROR_CHUNK_PREFIX +
|
||||
JSON.stringify({ message: error.message, name: error.name, stack: error.stack })) as T,
|
||||
);
|
||||
controller.close();
|
||||
}
|
||||
},
|
||||
|
||||
async start(controller) {
|
||||
|
||||
Reference in New Issue
Block a user