feat(userMemories): support to use customized Qstash client with extra header for workflows (#11378)

This commit is contained in:
Neko
2026-01-11 04:44:33 +08:00
committed by GitHub
parent 7d95853f46
commit 3417af4ccd
3 changed files with 50 additions and 6 deletions

View File

@@ -12,6 +12,10 @@ import {
type MemoryExtractionPayloadInput,
normalizeMemoryExtractionPayload,
} from '@/server/services/memory/userMemory/extract';
import { Client } from '@upstash/qstash'
import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryExtractionConfig';
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
const CEP_LAYERS: LayersEnum[] = [LayersEnum.Context, LayersEnum.Experience, LayersEnum.Preference];
const IDENTITY_LAYERS: LayersEnum[] = [LayersEnum.Identity];
@@ -136,5 +140,17 @@ export const { POST } = serve<MemoryExtractionPayloadInput>((context) =>
span.end();
}
},
),
);
), {
// NOTICE(@nekomeowww): Here as scenarios like Vercel Deployment Protection,
// intermediate context.run(...) won't offer customizable headers like context.trigger(...) / client.trigger(...)
// for passing additional headers, we have to provide a custom QStash client with the required headers here.
//
// Refer to the doc for more details:
// https://upstash.com/docs/workflow/troubleshooting/vercel#step-2-pass-header-when-triggering
qstashClient: new Client({
headers: {
...upstashWorkflowExtraHeaders,
},
token: process.env.QSTASH_TOKEN!
})
});

View File

@@ -10,13 +10,14 @@ import {
import { forEachBatchSequential } from '@/server/services/memory/userMemory/topicBatching';
import { MemorySourceType } from '@lobechat/types';
import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryExtractionConfig';
import { Client } from '@upstash/qstash'
const TOPIC_PAGE_SIZE = 50;
const TOPIC_BATCH_SIZE = 4;
export const { POST } = serve<MemoryExtractionPayloadInput>(async (context) => {
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
export const { POST } = serve<MemoryExtractionPayloadInput>(async (context) => {
const params = normalizeMemoryExtractionPayload(context.requestPayload || {});
if (!params.userIds.length) {
return { message: 'No user ids provided for topic processing.' };
@@ -125,4 +126,17 @@ export const { POST } = serve<MemoryExtractionPayloadInput>(async (context) => {
}
return { processedUsers: params.userIds.length };
}, {
// NOTICE(@nekomeowww): Here as scenarios like Vercel Deployment Protection,
// intermediate context.run(...) won't offer customizable headers like context.trigger(...) / client.trigger(...)
// for passing additional headers, we have to provide a custom QStash client with the required headers here.
//
// Refer to the doc for more details:
// https://upstash.com/docs/workflow/troubleshooting/vercel#step-2-pass-header-when-triggering
qstashClient: new Client({
headers: {
...upstashWorkflowExtraHeaders,
},
token: process.env.QSTASH_TOKEN!
})
});

View File

@@ -1,5 +1,6 @@
import { serve } from '@upstash/workflow/nextjs';
import { chunk } from 'es-toolkit/compat';
import { Client } from '@upstash/qstash'
import {
MemoryExtractionExecutor,
@@ -13,9 +14,9 @@ import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryEx
const USER_PAGE_SIZE = 50;
const USER_BATCH_SIZE = 10;
export const { POST } = serve<MemoryExtractionPayloadInput>(async (context) => {
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
export const { POST } = serve<MemoryExtractionPayloadInput>(async (context) => {
const params = normalizeMemoryExtractionPayload(context.requestPayload || {});
if (params.sources.length === 0) {
return { message: 'No sources provided, skip memory extraction.' };
@@ -73,4 +74,17 @@ export const { POST } = serve<MemoryExtractionPayloadInput>(async (context) => {
nextCursor: cursor ? cursor.id : null,
processedUsers: ids.length,
};
}, {
// NOTICE(@nekomeowww): Here as scenarios like Vercel Deployment Protection,
// intermediate context.run(...) won't offer customizable headers like context.trigger(...) / client.trigger(...)
// for passing additional headers, we have to provide a custom QStash client with the required headers here.
//
// Refer to the doc for more details:
// https://upstash.com/docs/workflow/troubleshooting/vercel#step-2-pass-header-when-triggering
qstashClient: new Client({
headers: {
...upstashWorkflowExtraHeaders,
},
token: process.env.QSTASH_TOKEN!
})
});