mirror of
https://github.com/lobehub/lobehub.git
synced 2026-03-27 13:29:15 +07:00
✨ feat(userMemories): support to use customized Qstash client with extra header for workflows (#11378)
This commit is contained in:
@@ -12,6 +12,10 @@ import {
|
||||
type MemoryExtractionPayloadInput,
|
||||
normalizeMemoryExtractionPayload,
|
||||
} from '@/server/services/memory/userMemory/extract';
|
||||
import { Client } from '@upstash/qstash'
|
||||
import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryExtractionConfig';
|
||||
|
||||
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
|
||||
|
||||
const CEP_LAYERS: LayersEnum[] = [LayersEnum.Context, LayersEnum.Experience, LayersEnum.Preference];
|
||||
const IDENTITY_LAYERS: LayersEnum[] = [LayersEnum.Identity];
|
||||
@@ -136,5 +140,17 @@ export const { POST } = serve<MemoryExtractionPayloadInput>((context) =>
|
||||
span.end();
|
||||
}
|
||||
},
|
||||
),
|
||||
);
|
||||
), {
|
||||
// NOTICE(@nekomeowww): Here as scenarios like Vercel Deployment Protection,
|
||||
// intermediate context.run(...) won't offer customizable headers like context.trigger(...) / client.trigger(...)
|
||||
// for passing additional headers, we have to provide a custom QStash client with the required headers here.
|
||||
//
|
||||
// Refer to the doc for more details:
|
||||
// https://upstash.com/docs/workflow/troubleshooting/vercel#step-2-pass-header-when-triggering
|
||||
qstashClient: new Client({
|
||||
headers: {
|
||||
...upstashWorkflowExtraHeaders,
|
||||
},
|
||||
token: process.env.QSTASH_TOKEN!
|
||||
})
|
||||
});
|
||||
|
||||
@@ -10,13 +10,14 @@ import {
|
||||
import { forEachBatchSequential } from '@/server/services/memory/userMemory/topicBatching';
|
||||
import { MemorySourceType } from '@lobechat/types';
|
||||
import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryExtractionConfig';
|
||||
import { Client } from '@upstash/qstash'
|
||||
|
||||
const TOPIC_PAGE_SIZE = 50;
|
||||
const TOPIC_BATCH_SIZE = 4;
|
||||
|
||||
export const { POST } = serve<MemoryExtractionPayloadInput>(async (context) => {
|
||||
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
|
||||
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
|
||||
|
||||
export const { POST } = serve<MemoryExtractionPayloadInput>(async (context) => {
|
||||
const params = normalizeMemoryExtractionPayload(context.requestPayload || {});
|
||||
if (!params.userIds.length) {
|
||||
return { message: 'No user ids provided for topic processing.' };
|
||||
@@ -125,4 +126,17 @@ export const { POST } = serve<MemoryExtractionPayloadInput>(async (context) => {
|
||||
}
|
||||
|
||||
return { processedUsers: params.userIds.length };
|
||||
}, {
|
||||
// NOTICE(@nekomeowww): Here as scenarios like Vercel Deployment Protection,
|
||||
// intermediate context.run(...) won't offer customizable headers like context.trigger(...) / client.trigger(...)
|
||||
// for passing additional headers, we have to provide a custom QStash client with the required headers here.
|
||||
//
|
||||
// Refer to the doc for more details:
|
||||
// https://upstash.com/docs/workflow/troubleshooting/vercel#step-2-pass-header-when-triggering
|
||||
qstashClient: new Client({
|
||||
headers: {
|
||||
...upstashWorkflowExtraHeaders,
|
||||
},
|
||||
token: process.env.QSTASH_TOKEN!
|
||||
})
|
||||
});
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { serve } from '@upstash/workflow/nextjs';
|
||||
import { chunk } from 'es-toolkit/compat';
|
||||
import { Client } from '@upstash/qstash'
|
||||
|
||||
import {
|
||||
MemoryExtractionExecutor,
|
||||
@@ -13,9 +14,9 @@ import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryEx
|
||||
const USER_PAGE_SIZE = 50;
|
||||
const USER_BATCH_SIZE = 10;
|
||||
|
||||
export const { POST } = serve<MemoryExtractionPayloadInput>(async (context) => {
|
||||
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
|
||||
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
|
||||
|
||||
export const { POST } = serve<MemoryExtractionPayloadInput>(async (context) => {
|
||||
const params = normalizeMemoryExtractionPayload(context.requestPayload || {});
|
||||
if (params.sources.length === 0) {
|
||||
return { message: 'No sources provided, skip memory extraction.' };
|
||||
@@ -73,4 +74,17 @@ export const { POST } = serve<MemoryExtractionPayloadInput>(async (context) => {
|
||||
nextCursor: cursor ? cursor.id : null,
|
||||
processedUsers: ids.length,
|
||||
};
|
||||
}, {
|
||||
// NOTICE(@nekomeowww): Here as scenarios like Vercel Deployment Protection,
|
||||
// intermediate context.run(...) won't offer customizable headers like context.trigger(...) / client.trigger(...)
|
||||
// for passing additional headers, we have to provide a custom QStash client with the required headers here.
|
||||
//
|
||||
// Refer to the doc for more details:
|
||||
// https://upstash.com/docs/workflow/troubleshooting/vercel#step-2-pass-header-when-triggering
|
||||
qstashClient: new Client({
|
||||
headers: {
|
||||
...upstashWorkflowExtraHeaders,
|
||||
},
|
||||
token: process.env.QSTASH_TOKEN!
|
||||
})
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user