feat(userMemories): support assigning extra headers when invoking upstash workflows (#11374)

This commit is contained in:
Neko
2026-01-09 16:15:35 +08:00
committed by GitHub
parent 2e53db375d
commit 895e15ec21
5 changed files with 30 additions and 11 deletions

View File

@@ -10,7 +10,7 @@ import {
} from '@/server/services/memory/userMemory/extract';
export const POST = async (req: Request) => {
const { webhookHeaders } = parseMemoryExtractionConfig();
const { webhookHeaders, upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
if (webhookHeaders && Object.keys(webhookHeaders).length > 0) {
for (const [key, value] of Object.entries(webhookHeaders)) {
@@ -43,6 +43,7 @@ export const POST = async (req: Request) => {
if (params.mode === 'workflow') {
const { workflowRunId } = await MemoryExtractionWorkflowService.triggerProcessUsers(
buildWorkflowPayloadInput(params),
{ extraHeaders: upstashWorkflowExtraHeaders },
);
return NextResponse.json(
{ message: 'Memory extraction scheduled via workflow.', workflowRunId },

View File

@@ -9,11 +9,14 @@ import {
} from '@/server/services/memory/userMemory/extract';
import { forEachBatchSequential } from '@/server/services/memory/userMemory/topicBatching';
import { MemorySourceType } from '@lobechat/types';
import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryExtractionConfig';
const TOPIC_PAGE_SIZE = 50;
const TOPIC_BATCH_SIZE = 4;
export const { POST } = serve<MemoryExtractionPayloadInput>(async (context) => {
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
const params = normalizeMemoryExtractionPayload(context.requestPayload || {});
if (!params.userIds.length) {
return { message: 'No user ids provided for topic processing.' };
@@ -37,7 +40,7 @@ export const { POST } = serve<MemoryExtractionPayloadInput>(async (context) => {
userId,
userIds: [userId],
}),
});
}, { extraHeaders: upstashWorkflowExtraHeaders });
};
for (const userId of params.userIds) {
@@ -99,7 +102,7 @@ export const { POST } = serve<MemoryExtractionPayloadInput>(async (context) => {
topicIds,
userId,
userIds: [userId],
}),
}, { extraHeaders: upstashWorkflowExtraHeaders }),
);
});

View File

@@ -8,11 +8,14 @@ import {
buildWorkflowPayloadInput,
normalizeMemoryExtractionPayload,
} from '@/server/services/memory/userMemory/extract';
import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryExtractionConfig';
const USER_PAGE_SIZE = 50;
const USER_BATCH_SIZE = 10;
export const { POST } = serve<MemoryExtractionPayloadInput>(async (context) => {
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
const params = normalizeMemoryExtractionPayload(context.requestPayload || {});
if (params.sources.length === 0) {
return { message: 'No sources provided, skip memory extraction.' };
@@ -49,7 +52,7 @@ export const { POST } = serve<MemoryExtractionPayloadInput>(async (context) => {
topicCursor: undefined,
userId: userIds[0],
userIds,
}),
}, { extraHeaders: upstashWorkflowExtraHeaders},),
),
),
);
@@ -61,7 +64,7 @@ export const { POST } = serve<MemoryExtractionPayloadInput>(async (context) => {
...params,
userCursor: { createdAt: cursor.createdAt.toISOString(), id: cursor.id },
}),
}),
}, { extraHeaders: upstashWorkflowExtraHeaders }),
);
}

View File

@@ -48,6 +48,7 @@ export interface MemoryExtractionPrivateConfig {
region?: string;
secretAccessKey?: string;
};
upstashWorkflowExtraHeaders?: Record<string, string>;
webhookHeaders?: Record<string, string>;
whitelistUsers?: string[];
}
@@ -190,6 +191,16 @@ export const parseMemoryExtractionConfig = (): MemoryExtractionPrivateConfig =>
return acc;
}, {});
const upstashWorkflowExtraHeaders = process.env.MEMORY_USER_MEMORY_WORKFLOW_EXTRA_HEADERS?.split(',')
.filter(Boolean)
.reduce<Record<string, string>>((acc, pair) => {
const [key, value] = pair.split('=').map((s) => s.trim());
if (key && value) {
acc[key] = value;
}
return acc;
}, {});
return {
agentGateKeeper,
agentLayerExtractor,
@@ -197,6 +208,7 @@ export const parseMemoryExtractionConfig = (): MemoryExtractionPrivateConfig =>
embedding,
featureFlags,
observabilityS3: extractorObservabilityS3,
upstashWorkflowExtraHeaders,
webhookHeaders,
whitelistUsers,
};

View File

@@ -1841,30 +1841,30 @@ export class MemoryExtractionWorkflowService {
return this.client;
}
static triggerProcessUsers(payload: MemoryExtractionPayloadInput) {
static triggerProcessUsers(payload: MemoryExtractionPayloadInput, options?: { extraHeaders?: Record<string, string> }) {
if (!payload.baseUrl) {
throw new Error('Missing baseUrl for workflow trigger');
}
const url = getWorkflowUrl(WORKFLOW_PATHS.users, payload.baseUrl);
return this.getClient().trigger({ body: payload, url });
return this.getClient().trigger({ body: payload, headers: options?.extraHeaders, url });
}
static triggerProcessUserTopics(payload: UserTopicWorkflowPayload) {
static triggerProcessUserTopics(payload: UserTopicWorkflowPayload, options?: { extraHeaders?: Record<string, string> }) {
if (!payload.baseUrl) {
throw new Error('Missing baseUrl for workflow trigger');
}
const url = getWorkflowUrl(WORKFLOW_PATHS.userTopics, payload.baseUrl);
return this.getClient().trigger({ body: payload, url });
return this.getClient().trigger({ body: payload, headers: options?.extraHeaders, url });
}
static triggerProcessTopics(payload: MemoryExtractionPayloadInput) {
static triggerProcessTopics(payload: MemoryExtractionPayloadInput, options?: { extraHeaders?: Record<string, string> }) {
if (!payload.baseUrl) {
throw new Error('Missing baseUrl for workflow trigger');
}
const url = getWorkflowUrl(WORKFLOW_PATHS.topicBatch, payload.baseUrl);
return this.getClient().trigger({ body: payload, url });
return this.getClient().trigger({ body: payload, headers: options?.extraHeaders, url });
}
}