mirror of
https://github.com/lobehub/lobehub.git
synced 2026-03-27 13:29:15 +07:00
✨ feat(userMemories): support to assign for extra headers when invoking upstash workflows (#11374)
This commit is contained in:
@@ -10,7 +10,7 @@ import {
|
||||
} from '@/server/services/memory/userMemory/extract';
|
||||
|
||||
export const POST = async (req: Request) => {
|
||||
const { webhookHeaders } = parseMemoryExtractionConfig();
|
||||
const { webhookHeaders, upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
|
||||
|
||||
if (webhookHeaders && Object.keys(webhookHeaders).length > 0) {
|
||||
for (const [key, value] of Object.entries(webhookHeaders)) {
|
||||
@@ -43,6 +43,7 @@ export const POST = async (req: Request) => {
|
||||
if (params.mode === 'workflow') {
|
||||
const { workflowRunId } = await MemoryExtractionWorkflowService.triggerProcessUsers(
|
||||
buildWorkflowPayloadInput(params),
|
||||
{ extraHeaders: upstashWorkflowExtraHeaders },
|
||||
);
|
||||
return NextResponse.json(
|
||||
{ message: 'Memory extraction scheduled via workflow.', workflowRunId },
|
||||
|
||||
@@ -9,11 +9,14 @@ import {
|
||||
} from '@/server/services/memory/userMemory/extract';
|
||||
import { forEachBatchSequential } from '@/server/services/memory/userMemory/topicBatching';
|
||||
import { MemorySourceType } from '@lobechat/types';
|
||||
import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryExtractionConfig';
|
||||
|
||||
const TOPIC_PAGE_SIZE = 50;
|
||||
const TOPIC_BATCH_SIZE = 4;
|
||||
|
||||
export const { POST } = serve<MemoryExtractionPayloadInput>(async (context) => {
|
||||
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
|
||||
|
||||
const params = normalizeMemoryExtractionPayload(context.requestPayload || {});
|
||||
if (!params.userIds.length) {
|
||||
return { message: 'No user ids provided for topic processing.' };
|
||||
@@ -37,7 +40,7 @@ export const { POST } = serve<MemoryExtractionPayloadInput>(async (context) => {
|
||||
userId,
|
||||
userIds: [userId],
|
||||
}),
|
||||
});
|
||||
}, { extraHeaders: upstashWorkflowExtraHeaders });
|
||||
};
|
||||
|
||||
for (const userId of params.userIds) {
|
||||
@@ -99,7 +102,7 @@ export const { POST } = serve<MemoryExtractionPayloadInput>(async (context) => {
|
||||
topicIds,
|
||||
userId,
|
||||
userIds: [userId],
|
||||
}),
|
||||
}, { extraHeaders: upstashWorkflowExtraHeaders }),
|
||||
);
|
||||
});
|
||||
|
||||
|
||||
@@ -8,11 +8,14 @@ import {
|
||||
buildWorkflowPayloadInput,
|
||||
normalizeMemoryExtractionPayload,
|
||||
} from '@/server/services/memory/userMemory/extract';
|
||||
import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryExtractionConfig';
|
||||
|
||||
const USER_PAGE_SIZE = 50;
|
||||
const USER_BATCH_SIZE = 10;
|
||||
|
||||
export const { POST } = serve<MemoryExtractionPayloadInput>(async (context) => {
|
||||
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
|
||||
|
||||
const params = normalizeMemoryExtractionPayload(context.requestPayload || {});
|
||||
if (params.sources.length === 0) {
|
||||
return { message: 'No sources provided, skip memory extraction.' };
|
||||
@@ -49,7 +52,7 @@ export const { POST } = serve<MemoryExtractionPayloadInput>(async (context) => {
|
||||
topicCursor: undefined,
|
||||
userId: userIds[0],
|
||||
userIds,
|
||||
}),
|
||||
}, { extraHeaders: upstashWorkflowExtraHeaders},),
|
||||
),
|
||||
),
|
||||
);
|
||||
@@ -61,7 +64,7 @@ export const { POST } = serve<MemoryExtractionPayloadInput>(async (context) => {
|
||||
...params,
|
||||
userCursor: { createdAt: cursor.createdAt.toISOString(), id: cursor.id },
|
||||
}),
|
||||
}),
|
||||
}, { extraHeaders: upstashWorkflowExtraHeaders }),
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -48,6 +48,7 @@ export interface MemoryExtractionPrivateConfig {
|
||||
region?: string;
|
||||
secretAccessKey?: string;
|
||||
};
|
||||
upstashWorkflowExtraHeaders?: Record<string, string>;
|
||||
webhookHeaders?: Record<string, string>;
|
||||
whitelistUsers?: string[];
|
||||
}
|
||||
@@ -190,6 +191,16 @@ export const parseMemoryExtractionConfig = (): MemoryExtractionPrivateConfig =>
|
||||
return acc;
|
||||
}, {});
|
||||
|
||||
const upstashWorkflowExtraHeaders = process.env.MEMORY_USER_MEMORY_WORKFLOW_EXTRA_HEADERS?.split(',')
|
||||
.filter(Boolean)
|
||||
.reduce<Record<string, string>>((acc, pair) => {
|
||||
const [key, value] = pair.split('=').map((s) => s.trim());
|
||||
if (key && value) {
|
||||
acc[key] = value;
|
||||
}
|
||||
return acc;
|
||||
}, {});
|
||||
|
||||
return {
|
||||
agentGateKeeper,
|
||||
agentLayerExtractor,
|
||||
@@ -197,6 +208,7 @@ export const parseMemoryExtractionConfig = (): MemoryExtractionPrivateConfig =>
|
||||
embedding,
|
||||
featureFlags,
|
||||
observabilityS3: extractorObservabilityS3,
|
||||
upstashWorkflowExtraHeaders,
|
||||
webhookHeaders,
|
||||
whitelistUsers,
|
||||
};
|
||||
|
||||
@@ -1841,30 +1841,30 @@ export class MemoryExtractionWorkflowService {
|
||||
return this.client;
|
||||
}
|
||||
|
||||
static triggerProcessUsers(payload: MemoryExtractionPayloadInput) {
|
||||
static triggerProcessUsers(payload: MemoryExtractionPayloadInput, options?: { extraHeaders?: Record<string, string> }) {
|
||||
if (!payload.baseUrl) {
|
||||
throw new Error('Missing baseUrl for workflow trigger');
|
||||
}
|
||||
|
||||
const url = getWorkflowUrl(WORKFLOW_PATHS.users, payload.baseUrl);
|
||||
return this.getClient().trigger({ body: payload, url });
|
||||
return this.getClient().trigger({ body: payload, headers: options?.extraHeaders, url });
|
||||
}
|
||||
|
||||
static triggerProcessUserTopics(payload: UserTopicWorkflowPayload) {
|
||||
static triggerProcessUserTopics(payload: UserTopicWorkflowPayload, options?: { extraHeaders?: Record<string, string> }) {
|
||||
if (!payload.baseUrl) {
|
||||
throw new Error('Missing baseUrl for workflow trigger');
|
||||
}
|
||||
|
||||
const url = getWorkflowUrl(WORKFLOW_PATHS.userTopics, payload.baseUrl);
|
||||
return this.getClient().trigger({ body: payload, url });
|
||||
return this.getClient().trigger({ body: payload, headers: options?.extraHeaders, url });
|
||||
}
|
||||
|
||||
static triggerProcessTopics(payload: MemoryExtractionPayloadInput) {
|
||||
static triggerProcessTopics(payload: MemoryExtractionPayloadInput, options?: { extraHeaders?: Record<string, string> }) {
|
||||
if (!payload.baseUrl) {
|
||||
throw new Error('Missing baseUrl for workflow trigger');
|
||||
}
|
||||
|
||||
const url = getWorkflowUrl(WORKFLOW_PATHS.topicBatch, payload.baseUrl);
|
||||
return this.getClient().trigger({ body: payload, url });
|
||||
return this.getClient().trigger({ body: payload, headers: options?.extraHeaders, url });
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user