chore: avoid loading all platform bots with signle webhook

This commit is contained in:
rdmclin2
2026-03-09 00:43:18 +08:00
parent 9f0ae47eab
commit e2d55e3433
2 changed files with 227 additions and 138 deletions

View File

@@ -152,6 +152,39 @@ export class AgentBotProviderModel {
// --------------- System-wide static methods ---------------
static findEnabledByPlatformAndAppId = async (
db: LobeChatDatabase,
platform: string,
applicationId: string,
gateKeeper?: GateKeeper,
): Promise<DecryptedBotProvider | null> => {
const [result] = await db
.select()
.from(agentBotProviders)
.where(
and(
eq(agentBotProviders.platform, platform),
eq(agentBotProviders.applicationId, applicationId),
eq(agentBotProviders.enabled, true),
),
)
.limit(1);
if (!result?.credentials) return null;
try {
const credentials = gateKeeper
? JSON.parse((await gateKeeper.decrypt(result.credentials)).plaintext)
: JSON.parse(result.credentials);
if (!credentials.botToken && !credentials.appSecret) return null;
return { ...result, credentials };
} catch {
return null;
}
};
static findEnabledByPlatform = async (
db: LobeChatDatabase,
platform: string,

View File

@@ -6,6 +6,7 @@ import { Chat, ConsoleLogger } from 'chat';
import debug from 'debug';
import { getServerDB } from '@/database/core/db-adaptor';
import type { DecryptedBotProvider } from '@/database/models/agentBotProvider';
import { AgentBotProviderModel } from '@/database/models/agentBotProvider';
import type { LobeChatDatabase } from '@/database/type';
import { appEnv } from '@/envs/app';
@@ -73,6 +74,9 @@ function createAdapterForPlatform(
/**
* Routes incoming webhook events to the correct Chat SDK Bot instance
* and triggers message processing via AgentBridgeService.
*
* Uses lazy per-bot loading: only the bot needed for an incoming webhook
* is loaded from DB, rather than eagerly loading all bots at startup.
*/
export class BotMessageRouter {
/** botToken → Chat instance (for Discord webhook routing via x-discord-gateway-token) */
@@ -87,6 +91,17 @@ export class BotMessageRouter {
/** "platform:applicationId" → credentials */
private credentialsByKey = new Map<string, StoredCredentials>();
/** Dedup concurrent loads for the same bot key */
private loadingPromises = new Map<string, Promise<Chat<any> | null>>();
/** Dedup concurrent loadPlatformBots calls */
private platformLoadPromises = new Map<string, Promise<void>>();
/** Lazily resolved shared dependencies */
private serverDB: LobeChatDatabase | null = null;
private gateKeeper: KeyVaultsGateKeeper | null = null;
private infraPromise: Promise<void> | null = null;
// ------------------------------------------------------------------
// Public API
// ------------------------------------------------------------------
@@ -99,7 +114,7 @@ export class BotMessageRouter {
*/
getWebhookHandler(platform: string, appId?: string): (req: Request) => Promise<Response> {
return async (req: Request) => {
await this.ensureInitialized();
await this.ensureInfra();
switch (platform) {
case 'discord': {
@@ -151,7 +166,15 @@ export class BotMessageRouter {
// ignore parse errors
}
const bot = this.botInstancesByToken.get(gatewayToken);
// Try cached token lookup first
let bot = this.botInstancesByToken.get(gatewayToken);
if (bot?.webhooks && 'discord' in bot.webhooks) {
return bot.webhooks.discord(this.cloneRequest(req, bodyBuffer));
}
// Fallback: load all Discord bots to find the one matching this token
await this.loadPlatformBots('discord');
bot = this.botInstancesByToken.get(gatewayToken);
if (bot?.webhooks && 'discord' in bot.webhooks) {
return bot.webhooks.discord(this.cloneRequest(req, bodyBuffer));
}
@@ -167,7 +190,7 @@ export class BotMessageRouter {
const appId = payload.application_id;
if (appId) {
const bot = this.botInstances.get(`discord:${appId}`);
const bot = await this.ensureBotLoaded('discord', appId);
if (bot?.webhooks && 'discord' in bot.webhooks) {
return bot.webhooks.discord(this.cloneRequest(req, bodyBuffer));
}
@@ -176,19 +199,6 @@ export class BotMessageRouter {
// Not valid JSON — fall through
}
// Fallback: try all registered Discord bots
for (const [key, bot] of this.botInstances) {
if (!key.startsWith('discord:')) continue;
if (bot.webhooks && 'discord' in bot.webhooks) {
try {
const resp = await bot.webhooks.discord(this.cloneRequest(req, bodyBuffer));
if (resp.status !== 401) return resp;
} catch {
// signature mismatch — try next
}
}
}
return new Response('No bot configured for Discord', { status: 404 });
}
@@ -228,33 +238,16 @@ export class BotMessageRouter {
// Direct lookup by applicationId (bot-specific endpoint: /webhooks/telegram/{appId})
if (appId) {
const key = `telegram:${appId}`;
const bot = this.botInstances.get(key);
const bot = await this.ensureBotLoaded('telegram', appId);
if (bot?.webhooks && 'telegram' in bot.webhooks) {
log('handleTelegramWebhook: direct lookup hit for %s', key);
log('handleTelegramWebhook: direct lookup hit for telegram:%s', appId);
return bot.webhooks.telegram(this.cloneRequest(req, bodyBuffer));
}
log('handleTelegramWebhook: no bot registered for %s', key);
log('handleTelegramWebhook: no bot registered for telegram:%s', appId);
return new Response('No bot configured for Telegram', { status: 404 });
}
// Fallback: iterate all registered Telegram bots (legacy /webhooks/telegram endpoint).
// Secret token verification will reject mismatches.
for (const [key, bot] of this.botInstances) {
if (!key.startsWith('telegram:')) continue;
if (bot.webhooks && 'telegram' in bot.webhooks) {
try {
log('handleTelegramWebhook: trying bot %s', key);
const resp = await bot.webhooks.telegram(this.cloneRequest(req, bodyBuffer));
log('handleTelegramWebhook: bot %s responded with status=%d', key, resp.status);
if (resp.status !== 401) return resp;
} catch (error) {
log('handleTelegramWebhook: bot %s webhook error: %O', key, error);
}
}
}
log('handleTelegramWebhook: no matching bot found');
log('handleTelegramWebhook: no appId provided, cannot route');
return new Response('No bot configured for Telegram', { status: 404 });
}
@@ -273,28 +266,14 @@ export class BotMessageRouter {
// Direct lookup by applicationId
if (appId) {
const key = `${platform}:${appId}`;
const bot = this.botInstances.get(key);
const bot = await this.ensureBotLoaded(platform, appId);
if (bot?.webhooks && platform in bot.webhooks) {
return (bot.webhooks as any)[platform](this.cloneRequest(req, bodyBuffer));
}
log('handleChatSdkWebhook: no bot registered for %s', key);
log('handleChatSdkWebhook: no bot registered for %s:%s', platform, appId);
return new Response(`No bot configured for ${platform}`, { status: 404 });
}
// Fallback: try all registered bots for this platform
for (const [key, bot] of this.botInstances) {
if (!key.startsWith(`${platform}:`)) continue;
if (bot.webhooks && platform in bot.webhooks) {
try {
const resp = await (bot.webhooks as any)[platform](this.cloneRequest(req, bodyBuffer));
if (resp.status !== 401) return resp;
} catch {
// try next
}
}
}
return new Response(`No bot configured for ${platform}`, { status: 404 });
}
@@ -307,117 +286,194 @@ export class BotMessageRouter {
}
// ------------------------------------------------------------------
// Initialisation
// Lazy loading infrastructure
// ------------------------------------------------------------------
private static REFRESH_INTERVAL_MS = 5 * 60_000;
private initPromise: Promise<void> | null = null;
private lastLoadedAt = 0;
private refreshPromise: Promise<void> | null = null;
private async ensureInitialized(): Promise<void> {
if (!this.initPromise) {
this.initPromise = this.initialize();
/**
* Ensure DB and gateKeeper are ready. Called once per webhook request.
* Also handles periodic cache invalidation so newly added bots are discovered.
*/
private async ensureInfra(): Promise<void> {
if (!this.infraPromise) {
this.infraPromise = this.initInfra();
}
await this.initPromise;
await this.infraPromise;
// Periodically refresh bot mappings in the background so newly added bots are discovered
// Periodically clear cache so newly added/changed bots are discovered on next request
if (
Date.now() - this.lastLoadedAt > BotMessageRouter.REFRESH_INTERVAL_MS &&
!this.refreshPromise
this.lastLoadedAt > 0 &&
Date.now() - this.lastLoadedAt > BotMessageRouter.REFRESH_INTERVAL_MS
) {
this.refreshPromise = this.loadAgentBots().finally(() => {
this.refreshPromise = null;
});
log('Cache expired, clearing bot instances for lazy reload');
this.botInstances.clear();
this.agentMap.clear();
this.credentialsByKey.clear();
this.botInstancesByToken.clear();
this.loadingPromises.clear();
this.platformLoadPromises.clear();
this.lastLoadedAt = 0;
}
}
async initialize(): Promise<void> {
log('Initializing BotMessageRouter');
await this.loadAgentBots();
log('Initialized: %d agent bots', this.botInstances.size);
private async initInfra(): Promise<void> {
log('Initializing BotMessageRouter infrastructure');
this.serverDB = await getServerDB();
this.gateKeeper = await KeyVaultsGateKeeper.initWithEnvKey();
log('Infrastructure ready');
}
// ------------------------------------------------------------------
// Per-agent bots from DB
// Lazy per-bot loading with dedup
// ------------------------------------------------------------------
private async loadAgentBots(): Promise<void> {
/**
* Ensure a single bot is loaded and cached. Returns the Chat instance or null.
* Deduplicates concurrent loads for the same platform:appId.
*/
private async ensureBotLoaded(platform: string, appId: string): Promise<Chat<any> | null> {
const key = `${platform}:${appId}`;
// Already cached
const existing = this.botInstances.get(key);
if (existing) return existing;
// Dedup: another request is already loading this bot
const pending = this.loadingPromises.get(key);
if (pending) return pending;
// Load from DB
const promise = this.loadBot(platform, appId).finally(() => {
this.loadingPromises.delete(key);
});
this.loadingPromises.set(key, promise);
return promise;
}
/**
* Load a single bot from DB, create Chat instance, register handlers, initialize.
*/
private async loadBot(platform: string, appId: string): Promise<Chat<any> | null> {
const key = `${platform}:${appId}`;
log('loadBot: loading %s', key);
try {
const serverDB = await getServerDB();
const gateKeeper = await KeyVaultsGateKeeper.initWithEnvKey();
const provider = await AgentBotProviderModel.findEnabledByPlatformAndAppId(
this.serverDB!,
platform,
appId,
this.gateKeeper ?? undefined,
);
// Load all supported platforms
for (const platform of ['discord', 'telegram', 'lark', 'feishu']) {
const providers = await AgentBotProviderModel.findEnabledByPlatform(
serverDB,
platform,
gateKeeper,
);
log('Found %d %s bot providers in DB', providers.length, platform);
for (const provider of providers) {
const { agentId, userId, applicationId, credentials } = provider;
const key = `${platform}:${applicationId}`;
if (this.agentMap.has(key)) {
log('Skipping provider %s: already registered', key);
continue;
}
const adapters = createAdapterForPlatform(platform, credentials, applicationId);
if (!adapters) {
log('Unsupported platform: %s', platform);
continue;
}
const bot = this.createBot(adapters, `agent-${agentId}`);
this.registerHandlers(bot, serverDB, {
agentId,
applicationId,
platform,
userId,
});
await bot.initialize();
this.botInstances.set(key, bot);
this.agentMap.set(key, { agentId, userId });
this.credentialsByKey.set(key, credentials);
// Discord-specific: also index by botToken for gateway forwarding
if (platform === 'discord' && credentials.botToken) {
this.botInstancesByToken.set(credentials.botToken, bot);
}
// Telegram: call setWebhook to ensure Telegram-side secret_token
// stays in sync with the adapter config (idempotent, safe on every init)
if (platform === 'telegram' && credentials.botToken) {
const baseUrl = (credentials.webhookProxyUrl || appEnv.APP_URL || '').replace(
/\/$/,
'',
);
const webhookUrl = `${baseUrl}/api/agent/webhooks/telegram/${applicationId}`;
setTelegramWebhook(credentials.botToken, webhookUrl, credentials.secretToken).catch(
(err) => {
log('Failed to set Telegram webhook for appId=%s: %O', applicationId, err);
},
);
}
log('Created %s bot for agent=%s, appId=%s', platform, agentId, applicationId);
}
if (!provider) {
log('loadBot: no enabled provider found for %s', key);
return null;
}
this.lastLoadedAt = Date.now();
return this.initializeBot(platform, provider);
} catch (error) {
log('Failed to load agent bots: %O', error);
log('loadBot: failed to load %s: %O', key, error);
return null;
}
}
/**
* Load all bots for a given platform with concurrent dedup.
* Used for Discord gateway token routing where we don't know the appId upfront.
*/
private async loadPlatformBots(platform: string): Promise<void> {
const pending = this.platformLoadPromises.get(platform);
if (pending) return pending;
const promise = this.doLoadPlatformBots(platform).finally(() => {
this.platformLoadPromises.delete(platform);
});
this.platformLoadPromises.set(platform, promise);
return promise;
}
private async doLoadPlatformBots(platform: string): Promise<void> {
log('loadPlatformBots: loading all %s bots', platform);
try {
const providers = await AgentBotProviderModel.findEnabledByPlatform(
this.serverDB!,
platform,
this.gateKeeper ?? undefined,
);
log('loadPlatformBots: found %d %s providers', providers.length, platform);
for (const provider of providers) {
const key = `${platform}:${provider.applicationId}`;
if (this.botInstances.has(key)) continue;
await this.initializeBot(platform, provider);
}
} catch (error) {
log('loadPlatformBots: failed for %s: %O', platform, error);
}
}
/**
* Shared bot initialization: create adapter, Chat instance, register handlers,
* populate caches, set webhooks.
*/
private async initializeBot(
platform: string,
provider: DecryptedBotProvider,
): Promise<Chat<any> | null> {
const { agentId, userId, applicationId, credentials } = provider;
const key = `${platform}:${applicationId}`;
// Double-check: might have been loaded by a concurrent call
const existing = this.botInstances.get(key);
if (existing) return existing;
const adapters = createAdapterForPlatform(platform, credentials, applicationId);
if (!adapters) {
log('initializeBot: unsupported platform %s', platform);
return null;
}
const bot = this.createBot(adapters, `agent-${agentId}`);
this.registerHandlers(bot, this.serverDB!, {
agentId,
applicationId,
platform,
userId,
});
await bot.initialize();
this.botInstances.set(key, bot);
this.agentMap.set(key, { agentId, userId });
this.credentialsByKey.set(key, credentials);
// Discord-specific: also index by botToken for gateway forwarding
if (platform === 'discord' && credentials.botToken) {
this.botInstancesByToken.set(credentials.botToken, bot);
}
// Telegram: call setWebhook to ensure Telegram-side secret_token
// stays in sync with the adapter config (idempotent, safe on every init)
if (platform === 'telegram' && credentials.botToken) {
const baseUrl = (credentials.webhookProxyUrl || appEnv.APP_URL || '').replace(/\/$/, '');
const webhookUrl = `${baseUrl}/api/agent/webhooks/telegram/${applicationId}`;
setTelegramWebhook(credentials.botToken, webhookUrl, credentials.secretToken).catch((err) => {
log('Failed to set Telegram webhook for appId=%s: %O', applicationId, err);
});
}
if (!this.lastLoadedAt) this.lastLoadedAt = Date.now();
log('Created %s bot for agent=%s, appId=%s', platform, agentId, applicationId);
return bot;
}
// ------------------------------------------------------------------
// Helpers
// ------------------------------------------------------------------