feat: add the cloudEndpoint & Klavis Tools Call in Excuation Task (#11627)

* feat: add klavis servers & excute Tools add klavis

* feat: support the cloud call mcp endpoint
This commit is contained in:
Shinji-Li
2026-01-19 22:35:18 +08:00
committed by GitHub
parent a3dedd5b04
commit 0ffe6c4af5
4 changed files with 338 additions and 15 deletions

View File

@@ -28,6 +28,7 @@ import {
import { AgentService } from '@/server/services/agent';
import { AgentRuntimeService } from '@/server/services/agentRuntime';
import type { StepLifecycleCallbacks } from '@/server/services/agentRuntime/types';
import { KlavisService } from '@/server/services/klavis';
import { MarketService } from '@/server/services/market';
const log = debug('lobe-server:ai-agent-service');
@@ -93,6 +94,7 @@ export class AiAgentService {
private readonly topicModel: TopicModel;
private readonly agentRuntimeService: AgentRuntimeService;
private readonly marketService: MarketService;
private readonly klavisService: KlavisService;
constructor(db: LobeChatDatabase, userId: string) {
this.userId = userId;
@@ -105,6 +107,7 @@ export class AiAgentService {
this.topicModel = new TopicModel(db, userId);
this.agentRuntimeService = new AgentRuntimeService(db, userId);
this.marketService = new MarketService({ userInfo: { userId } });
this.klavisService = new KlavisService({ db, userId });
}
/**
@@ -205,7 +208,16 @@ export class AiAgentService {
}
log('execAgent: got %d lobehub skill manifests', lobehubSkillManifests.length);
// 6. Create tools using Server AgentToolsEngine
// 6. Fetch Klavis tool manifests from database
let klavisManifests: LobeToolManifest[] = [];
try {
klavisManifests = await this.klavisService.getKlavisManifests();
} catch (error) {
log('execAgent: failed to fetch klavis manifests: %O', error);
}
log('execAgent: got %d klavis manifests', klavisManifests.length);
// 7. Create tools using Server AgentToolsEngine
const hasEnabledKnowledgeBases =
agentConfig.knowledgeBases?.some((kb: { enabled?: boolean | null }) => kb.enabled === true) ??
false;
@@ -216,7 +228,7 @@ export class AiAgentService {
};
const toolsEngine = createServerAgentToolsEngine(toolsContext, {
additionalManifests: lobehubSkillManifests,
additionalManifests: [...lobehubSkillManifests, ...klavisManifests],
agentConfig: {
chatConfig: agentConfig.chatConfig ?? undefined,
plugins: agentConfig.plugins ?? undefined,
@@ -254,15 +266,20 @@ export class AiAgentService {
for (const manifest of lobehubSkillManifests) {
toolSourceMap[manifest.identifier] = 'lobehubSkill';
}
// Mark klavis tools
for (const manifest of klavisManifests) {
toolSourceMap[manifest.identifier] = 'klavis';
}
log(
'execAgent: generated %d tools from %d configured plugins, %d lobehub skills',
'execAgent: generated %d tools from %d configured plugins, %d lobehub skills, %d klavis tools',
tools?.length ?? 0,
pluginIds.length,
lobehubSkillManifests.length,
klavisManifests.length,
);
// 6. Get existing messages if provided
// 8. Get existing messages if provided
let historyMessages: any[] = [];
if (existingMessageIds.length > 0) {
historyMessages = await this.messageModel.query({
@@ -275,7 +292,7 @@ export class AiAgentService {
}
}
// 7. Create user message in database
// 9. Create user message in database
// Include threadId if provided (for SubAgent task execution in isolated Thread)
const userMessageRecord = await this.messageModel.create({
agentId: resolvedAgentId,
@@ -286,7 +303,7 @@ export class AiAgentService {
});
log('execAgent: created user message %s', userMessageRecord.id);
// 8. Create assistant message placeholder in database
// 10. Create assistant message placeholder in database
// Include threadId if provided (for SubAgent task execution in isolated Thread)
const assistantMessageRecord = await this.messageModel.create({
agentId: resolvedAgentId,
@@ -306,7 +323,7 @@ export class AiAgentService {
// Combine history messages with user message
const allMessages = [...historyMessages, userMessage];
// 9. Process messages using Server ContextEngineering
// 11. Process messages using Server ContextEngineering
const processedMessages = await serverMessagesEngine({
capabilities: {
isCanUseFC: isModelSupportToolUse,
@@ -341,11 +358,11 @@ export class AiAgentService {
log('execAgent: processed %d messages', processedMessages.length);
// 10. Generate operation ID: agt_{timestamp}_{agentId}_{topicId}_{random}
// 12. Generate operation ID: agt_{timestamp}_{agentId}_{topicId}_{random}
const timestamp = Date.now();
const operationId = `op_${timestamp}_${resolvedAgentId}_${topicId}_${nanoid(8)}`;
// 11. Create initial context
// 13. Create initial context
const initialContext: AgentRuntimeContext = {
payload: {
// Pass assistant message ID so agent runtime knows which message to update
@@ -366,7 +383,7 @@ export class AiAgentService {
},
};
// 12. Log final operation parameters summary
// 14. Log final operation parameters summary
log(
'execAgent: creating operation %s with params: model=%s, provider=%s, tools=%d, messages=%d, manifests=%d',
operationId,
@@ -377,7 +394,7 @@ export class AiAgentService {
Object.keys(toolManifestMap).length,
);
// 13. Create operation using AgentRuntimeService
// 15. Create operation using AgentRuntimeService
// Wrap in try-catch to handle operation startup failures (e.g., QStash unavailable)
// If createOperation fails, we still have valid messages that need error info
try {

View File

@@ -0,0 +1,228 @@
import type { LobeToolManifest } from '@lobechat/context-engine';
import type { LobeChatDatabase } from '@lobechat/database';
import debug from 'debug';
import { PluginModel } from '@/database/models/plugin';
import { getKlavisClient, isKlavisClientAvailable } from '@/libs/klavis';
import { type ToolExecutionResult } from '@/server/services/toolExecution/types';
const log = debug('lobe-server:klavis-service');
export interface KlavisToolExecuteParams {
args: Record<string, any>;
/** Tool identifier (same as Klavis server identifier, e.g., 'google-calendar') */
identifier: string;
toolName: string;
}
export interface KlavisServiceOptions {
db?: LobeChatDatabase;
userId?: string;
}
/**
* Klavis Service
*
* Provides a unified interface to Klavis Client with business logic encapsulation.
* This service wraps Klavis Client methods to execute tools and fetch manifests.
*
* Usage:
* ```typescript
* // With database and userId (for manifest fetching)
* const service = new KlavisService({ db, userId });
* await service.executeKlavisTool({ identifier, toolName, args });
*
* // Without database (for tool execution only if you have serverUrl)
* const service = new KlavisService();
* ```
*/
export class KlavisService {
private db?: LobeChatDatabase;
private userId?: string;
private pluginModel?: PluginModel;
constructor(options: KlavisServiceOptions = {}) {
const { db, userId } = options;
this.db = db;
this.userId = userId;
if (db && userId) {
this.pluginModel = new PluginModel(db, userId);
}
log(
'KlavisService initialized: hasDB=%s, hasUserId=%s, isClientAvailable=%s',
!!db,
!!userId,
isKlavisClientAvailable(),
);
}
/**
* Execute a Klavis tool
* @param params - Tool execution parameters
* @returns Tool execution result
*/
async executeKlavisTool(params: KlavisToolExecuteParams): Promise<ToolExecutionResult> {
const { identifier, toolName, args } = params;
log('executeKlavisTool: %s/%s with args: %O', identifier, toolName, args);
// Check if Klavis client is available
if (!isKlavisClientAvailable()) {
return {
content: 'Klavis service is not configured on server',
error: { code: 'KLAVIS_NOT_CONFIGURED', message: 'Klavis API key not found' },
success: false,
};
}
// Get serverUrl from plugin database
if (!this.pluginModel) {
return {
content: 'Klavis service is not properly initialized',
error: {
code: 'KLAVIS_NOT_INITIALIZED',
message: 'Database and userId are required for Klavis tool execution',
},
success: false,
};
}
try {
// Get plugin from database to retrieve serverUrl
const plugin = await this.pluginModel.findById(identifier);
if (!plugin) {
return {
content: `Klavis server "${identifier}" not found in database`,
error: { code: 'KLAVIS_SERVER_NOT_FOUND', message: `Server ${identifier} not found` },
success: false,
};
}
const klavisParams = plugin.customParams?.klavis;
if (!klavisParams || !klavisParams.serverUrl) {
return {
content: `Klavis configuration not found for server "${identifier}"`,
error: {
code: 'KLAVIS_CONFIG_NOT_FOUND',
message: `Klavis configuration missing for ${identifier}`,
},
success: false,
};
}
const { serverUrl } = klavisParams;
log('executeKlavisTool: calling Klavis API with serverUrl=%s', serverUrl);
// Call Klavis client
const klavisClient = getKlavisClient();
const response = await klavisClient.mcpServer.callTools({
serverUrl,
toolArgs: args,
toolName,
});
log('executeKlavisTool: response: %O', response);
// Handle error case
if (!response.success || !response.result) {
return {
content: response.error || 'Unknown error',
error: { code: 'KLAVIS_EXECUTION_ERROR', message: response.error || 'Unknown error' },
success: false,
};
}
// Process the response
const content = response.result.content || [];
const isError = response.result.isError || false;
// Convert content array to string
let resultContent = '';
if (Array.isArray(content)) {
resultContent = content
.map((item: any) => {
if (typeof item === 'string') return item;
if (item.type === 'text' && item.text) return item.text;
return JSON.stringify(item);
})
.join('\n');
} else if (typeof content === 'string') {
resultContent = content;
} else {
resultContent = JSON.stringify(content);
}
return {
content: resultContent,
success: !isError,
};
} catch (error) {
const err = error as Error;
console.error('KlavisService.executeKlavisTool error %s/%s: %O', identifier, toolName, err);
return {
content: err.message,
error: { code: 'KLAVIS_ERROR', message: err.message },
success: false,
};
}
}
/**
* Fetch Klavis tool manifests from database
* Gets user's connected Klavis servers and builds tool manifests for agent execution
*
* @returns Array of tool manifests for connected Klavis servers
*/
async getKlavisManifests(): Promise<LobeToolManifest[]> {
if (!this.pluginModel) {
log('getKlavisManifests: pluginModel not available, returning empty array');
return [];
}
try {
// Get all plugins from database
const allPlugins = await this.pluginModel.query();
// Filter plugins that have klavis customParams and are authenticated
const klavisPlugins = allPlugins.filter(
(plugin) => plugin.customParams?.klavis?.isAuthenticated === true,
);
log('getKlavisManifests: found %d authenticated Klavis plugins', klavisPlugins.length);
// Convert to LobeToolManifest format
const manifests: LobeToolManifest[] = klavisPlugins
.map((plugin) => {
if (!plugin.manifest) return null;
return {
api: plugin.manifest.api || [],
author: 'Klavis',
homepage: 'https://klavis.ai',
identifier: plugin.identifier,
meta: plugin.manifest.meta || {
avatar: '☁️',
description: `Klavis MCP Server: ${plugin.customParams?.klavis?.serverName}`,
tags: ['klavis', 'mcp'],
title: plugin.customParams?.klavis?.serverName || plugin.identifier,
},
type: 'builtin',
version: '1.0.0',
};
})
.filter(Boolean) as LobeToolManifest[];
log('getKlavisManifests: returning %d manifests', manifests.length);
return manifests;
} catch (error) {
console.error('KlavisService.getKlavisManifests error: %O', error);
return [];
}
}
}

View File

@@ -3,6 +3,7 @@ import { type ChatToolPayload } from '@lobechat/types';
import { safeParseJSON } from '@lobechat/utils';
import debug from 'debug';
import { KlavisService } from '@/server/services/klavis';
import { MarketService } from '@/server/services/market';
import { getServerRuntime, hasServerRuntime } from './serverRuntimes';
@@ -12,9 +13,11 @@ const log = debug('lobe-server:builtin-tools-executor');
export class BuiltinToolsExecutor implements IToolExecutor {
private marketService: MarketService;
private klavisService: KlavisService;
constructor(db: LobeChatDatabase, userId: string) {
this.marketService = new MarketService({ userInfo: { userId } });
this.klavisService = new KlavisService({ db, userId });
}
async execute(
@@ -41,6 +44,15 @@ export class BuiltinToolsExecutor implements IToolExecutor {
});
}
// Route Klavis tools to KlavisService
if (source === 'klavis') {
return this.klavisService.executeKlavisTool({
args,
identifier,
toolName: apiName,
});
}
// Use server runtime registry (handles both pre-instantiated and per-request runtimes)
if (!hasServerRuntime(identifier)) {
throw new Error(`Builtin tool "${identifier}" is not implemented`);

View File

@@ -1,6 +1,11 @@
import { type ChatToolPayload } from '@lobechat/types';
import { safeParseJSON } from '@lobechat/utils';
import debug from 'debug';
import { type CloudMCPParams, type ToolCallContent } from '@/libs/mcp';
import { contentBlocksToString } from '@/server/services/mcp/contentProcessor';
import { DiscoverService } from '../discover';
import { type MCPService } from '../mcp';
import { type PluginGatewayService } from '../pluginGateway';
import { type BuiltinToolsExecutor } from './builtin';
@@ -121,12 +126,20 @@ export class ToolExecutionService {
};
}
// Construct MCPClientParams from the mcp config
log('Calling MCP service with params for: %s:%s', identifier, apiName);
log(
'Calling MCP service with params for: %s:%s (type: %s)',
identifier,
apiName,
mcpParams.type,
);
try {
// Call the MCP service
// Check if this is a cloud MCP endpoint
if (mcpParams.type === 'cloud') {
return await this.executeCloudMCPTool(payload, context, mcpParams);
}
// For stdio/http/sse types, use standard MCP service
const result = await this.mcpService.callTool({
argsStr: args,
clientParams: mcpParams,
@@ -152,6 +165,59 @@ export class ToolExecutionService {
};
}
}
private async executeCloudMCPTool(
payload: ChatToolPayload,
context: ToolExecutionContext,
// eslint-disable-next-line @typescript-eslint/no-unused-vars
_mcpParams: CloudMCPParams,
): Promise<ToolExecutionResult> {
const { identifier, apiName, arguments: args } = payload;
log('Executing Cloud MCP tool: %s:%s via cloud gateway', identifier, apiName);
try {
// Create DiscoverService with user context
const discoverService = new DiscoverService({
userInfo: context.userId ? { userId: context.userId } : undefined,
});
// Parse arguments
const apiParams = safeParseJSON(args) || {};
// Call cloud MCP endpoint via Market API
// Returns CloudGatewayResponse: { content: ToolCallContent[], isError?: boolean }
const cloudResult = await discoverService.callCloudMcpEndpoint({
apiParams,
identifier,
toolName: apiName,
});
const cloudResultContent = (cloudResult?.content ?? []) as ToolCallContent[];
// Convert content blocks to string (same as market router does)
const content = contentBlocksToString(cloudResultContent);
const state = { ...cloudResult, content: cloudResultContent };
log('Cloud MCP tool execution successful for: %s:%s', identifier, apiName);
return {
content,
state,
success: !cloudResult?.isError,
};
} catch (error) {
log('Cloud MCP tool execution failed for %s:%s: %O', identifier, apiName, error);
return {
content: (error as Error).message,
error: {
code: 'CLOUD_MCP_EXECUTION_ERROR',
message: (error as Error).message,
},
success: false,
};
}
}
}
export * from './types';