mirror of
https://github.com/lobehub/lobehub.git
synced 2026-03-27 13:29:15 +07:00
✨ feat: add the cloudEndpoint & Klavis Tools Call in Excuation Task (#11627)
* feat: add klavis servers & excute Tools add klavis * feat: support the cloud call mcp endpoint
This commit is contained in:
@@ -28,6 +28,7 @@ import {
|
||||
import { AgentService } from '@/server/services/agent';
|
||||
import { AgentRuntimeService } from '@/server/services/agentRuntime';
|
||||
import type { StepLifecycleCallbacks } from '@/server/services/agentRuntime/types';
|
||||
import { KlavisService } from '@/server/services/klavis';
|
||||
import { MarketService } from '@/server/services/market';
|
||||
|
||||
const log = debug('lobe-server:ai-agent-service');
|
||||
@@ -93,6 +94,7 @@ export class AiAgentService {
|
||||
private readonly topicModel: TopicModel;
|
||||
private readonly agentRuntimeService: AgentRuntimeService;
|
||||
private readonly marketService: MarketService;
|
||||
private readonly klavisService: KlavisService;
|
||||
|
||||
constructor(db: LobeChatDatabase, userId: string) {
|
||||
this.userId = userId;
|
||||
@@ -105,6 +107,7 @@ export class AiAgentService {
|
||||
this.topicModel = new TopicModel(db, userId);
|
||||
this.agentRuntimeService = new AgentRuntimeService(db, userId);
|
||||
this.marketService = new MarketService({ userInfo: { userId } });
|
||||
this.klavisService = new KlavisService({ db, userId });
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -205,7 +208,16 @@ export class AiAgentService {
|
||||
}
|
||||
log('execAgent: got %d lobehub skill manifests', lobehubSkillManifests.length);
|
||||
|
||||
// 6. Create tools using Server AgentToolsEngine
|
||||
// 6. Fetch Klavis tool manifests from database
|
||||
let klavisManifests: LobeToolManifest[] = [];
|
||||
try {
|
||||
klavisManifests = await this.klavisService.getKlavisManifests();
|
||||
} catch (error) {
|
||||
log('execAgent: failed to fetch klavis manifests: %O', error);
|
||||
}
|
||||
log('execAgent: got %d klavis manifests', klavisManifests.length);
|
||||
|
||||
// 7. Create tools using Server AgentToolsEngine
|
||||
const hasEnabledKnowledgeBases =
|
||||
agentConfig.knowledgeBases?.some((kb: { enabled?: boolean | null }) => kb.enabled === true) ??
|
||||
false;
|
||||
@@ -216,7 +228,7 @@ export class AiAgentService {
|
||||
};
|
||||
|
||||
const toolsEngine = createServerAgentToolsEngine(toolsContext, {
|
||||
additionalManifests: lobehubSkillManifests,
|
||||
additionalManifests: [...lobehubSkillManifests, ...klavisManifests],
|
||||
agentConfig: {
|
||||
chatConfig: agentConfig.chatConfig ?? undefined,
|
||||
plugins: agentConfig.plugins ?? undefined,
|
||||
@@ -254,15 +266,20 @@ export class AiAgentService {
|
||||
for (const manifest of lobehubSkillManifests) {
|
||||
toolSourceMap[manifest.identifier] = 'lobehubSkill';
|
||||
}
|
||||
// Mark klavis tools
|
||||
for (const manifest of klavisManifests) {
|
||||
toolSourceMap[manifest.identifier] = 'klavis';
|
||||
}
|
||||
|
||||
log(
|
||||
'execAgent: generated %d tools from %d configured plugins, %d lobehub skills',
|
||||
'execAgent: generated %d tools from %d configured plugins, %d lobehub skills, %d klavis tools',
|
||||
tools?.length ?? 0,
|
||||
pluginIds.length,
|
||||
lobehubSkillManifests.length,
|
||||
klavisManifests.length,
|
||||
);
|
||||
|
||||
// 6. Get existing messages if provided
|
||||
// 8. Get existing messages if provided
|
||||
let historyMessages: any[] = [];
|
||||
if (existingMessageIds.length > 0) {
|
||||
historyMessages = await this.messageModel.query({
|
||||
@@ -275,7 +292,7 @@ export class AiAgentService {
|
||||
}
|
||||
}
|
||||
|
||||
// 7. Create user message in database
|
||||
// 9. Create user message in database
|
||||
// Include threadId if provided (for SubAgent task execution in isolated Thread)
|
||||
const userMessageRecord = await this.messageModel.create({
|
||||
agentId: resolvedAgentId,
|
||||
@@ -286,7 +303,7 @@ export class AiAgentService {
|
||||
});
|
||||
log('execAgent: created user message %s', userMessageRecord.id);
|
||||
|
||||
// 8. Create assistant message placeholder in database
|
||||
// 10. Create assistant message placeholder in database
|
||||
// Include threadId if provided (for SubAgent task execution in isolated Thread)
|
||||
const assistantMessageRecord = await this.messageModel.create({
|
||||
agentId: resolvedAgentId,
|
||||
@@ -306,7 +323,7 @@ export class AiAgentService {
|
||||
// Combine history messages with user message
|
||||
const allMessages = [...historyMessages, userMessage];
|
||||
|
||||
// 9. Process messages using Server ContextEngineering
|
||||
// 11. Process messages using Server ContextEngineering
|
||||
const processedMessages = await serverMessagesEngine({
|
||||
capabilities: {
|
||||
isCanUseFC: isModelSupportToolUse,
|
||||
@@ -341,11 +358,11 @@ export class AiAgentService {
|
||||
|
||||
log('execAgent: processed %d messages', processedMessages.length);
|
||||
|
||||
// 10. Generate operation ID: agt_{timestamp}_{agentId}_{topicId}_{random}
|
||||
// 12. Generate operation ID: agt_{timestamp}_{agentId}_{topicId}_{random}
|
||||
const timestamp = Date.now();
|
||||
const operationId = `op_${timestamp}_${resolvedAgentId}_${topicId}_${nanoid(8)}`;
|
||||
|
||||
// 11. Create initial context
|
||||
// 13. Create initial context
|
||||
const initialContext: AgentRuntimeContext = {
|
||||
payload: {
|
||||
// Pass assistant message ID so agent runtime knows which message to update
|
||||
@@ -366,7 +383,7 @@ export class AiAgentService {
|
||||
},
|
||||
};
|
||||
|
||||
// 12. Log final operation parameters summary
|
||||
// 14. Log final operation parameters summary
|
||||
log(
|
||||
'execAgent: creating operation %s with params: model=%s, provider=%s, tools=%d, messages=%d, manifests=%d',
|
||||
operationId,
|
||||
@@ -377,7 +394,7 @@ export class AiAgentService {
|
||||
Object.keys(toolManifestMap).length,
|
||||
);
|
||||
|
||||
// 13. Create operation using AgentRuntimeService
|
||||
// 15. Create operation using AgentRuntimeService
|
||||
// Wrap in try-catch to handle operation startup failures (e.g., QStash unavailable)
|
||||
// If createOperation fails, we still have valid messages that need error info
|
||||
try {
|
||||
|
||||
228
src/server/services/klavis/index.ts
Normal file
228
src/server/services/klavis/index.ts
Normal file
@@ -0,0 +1,228 @@
|
||||
import type { LobeToolManifest } from '@lobechat/context-engine';
|
||||
import type { LobeChatDatabase } from '@lobechat/database';
|
||||
import debug from 'debug';
|
||||
|
||||
import { PluginModel } from '@/database/models/plugin';
|
||||
import { getKlavisClient, isKlavisClientAvailable } from '@/libs/klavis';
|
||||
import { type ToolExecutionResult } from '@/server/services/toolExecution/types';
|
||||
|
||||
const log = debug('lobe-server:klavis-service');
|
||||
|
||||
export interface KlavisToolExecuteParams {
|
||||
args: Record<string, any>;
|
||||
/** Tool identifier (same as Klavis server identifier, e.g., 'google-calendar') */
|
||||
identifier: string;
|
||||
toolName: string;
|
||||
}
|
||||
|
||||
export interface KlavisServiceOptions {
|
||||
db?: LobeChatDatabase;
|
||||
userId?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Klavis Service
|
||||
*
|
||||
* Provides a unified interface to Klavis Client with business logic encapsulation.
|
||||
* This service wraps Klavis Client methods to execute tools and fetch manifests.
|
||||
*
|
||||
* Usage:
|
||||
* ```typescript
|
||||
* // With database and userId (for manifest fetching)
|
||||
* const service = new KlavisService({ db, userId });
|
||||
* await service.executeKlavisTool({ identifier, toolName, args });
|
||||
*
|
||||
* // Without database (for tool execution only if you have serverUrl)
|
||||
* const service = new KlavisService();
|
||||
* ```
|
||||
*/
|
||||
export class KlavisService {
|
||||
private db?: LobeChatDatabase;
|
||||
private userId?: string;
|
||||
private pluginModel?: PluginModel;
|
||||
|
||||
constructor(options: KlavisServiceOptions = {}) {
|
||||
const { db, userId } = options;
|
||||
|
||||
this.db = db;
|
||||
this.userId = userId;
|
||||
|
||||
if (db && userId) {
|
||||
this.pluginModel = new PluginModel(db, userId);
|
||||
}
|
||||
|
||||
log(
|
||||
'KlavisService initialized: hasDB=%s, hasUserId=%s, isClientAvailable=%s',
|
||||
!!db,
|
||||
!!userId,
|
||||
isKlavisClientAvailable(),
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a Klavis tool
|
||||
* @param params - Tool execution parameters
|
||||
* @returns Tool execution result
|
||||
*/
|
||||
async executeKlavisTool(params: KlavisToolExecuteParams): Promise<ToolExecutionResult> {
|
||||
const { identifier, toolName, args } = params;
|
||||
|
||||
log('executeKlavisTool: %s/%s with args: %O', identifier, toolName, args);
|
||||
|
||||
// Check if Klavis client is available
|
||||
if (!isKlavisClientAvailable()) {
|
||||
return {
|
||||
content: 'Klavis service is not configured on server',
|
||||
error: { code: 'KLAVIS_NOT_CONFIGURED', message: 'Klavis API key not found' },
|
||||
success: false,
|
||||
};
|
||||
}
|
||||
|
||||
// Get serverUrl from plugin database
|
||||
if (!this.pluginModel) {
|
||||
return {
|
||||
content: 'Klavis service is not properly initialized',
|
||||
error: {
|
||||
code: 'KLAVIS_NOT_INITIALIZED',
|
||||
message: 'Database and userId are required for Klavis tool execution',
|
||||
},
|
||||
success: false,
|
||||
};
|
||||
}
|
||||
|
||||
try {
|
||||
// Get plugin from database to retrieve serverUrl
|
||||
const plugin = await this.pluginModel.findById(identifier);
|
||||
if (!plugin) {
|
||||
return {
|
||||
content: `Klavis server "${identifier}" not found in database`,
|
||||
error: { code: 'KLAVIS_SERVER_NOT_FOUND', message: `Server ${identifier} not found` },
|
||||
success: false,
|
||||
};
|
||||
}
|
||||
|
||||
const klavisParams = plugin.customParams?.klavis;
|
||||
if (!klavisParams || !klavisParams.serverUrl) {
|
||||
return {
|
||||
content: `Klavis configuration not found for server "${identifier}"`,
|
||||
error: {
|
||||
code: 'KLAVIS_CONFIG_NOT_FOUND',
|
||||
message: `Klavis configuration missing for ${identifier}`,
|
||||
},
|
||||
success: false,
|
||||
};
|
||||
}
|
||||
|
||||
const { serverUrl } = klavisParams;
|
||||
|
||||
log('executeKlavisTool: calling Klavis API with serverUrl=%s', serverUrl);
|
||||
|
||||
// Call Klavis client
|
||||
const klavisClient = getKlavisClient();
|
||||
const response = await klavisClient.mcpServer.callTools({
|
||||
serverUrl,
|
||||
toolArgs: args,
|
||||
toolName,
|
||||
});
|
||||
|
||||
log('executeKlavisTool: response: %O', response);
|
||||
|
||||
// Handle error case
|
||||
if (!response.success || !response.result) {
|
||||
return {
|
||||
content: response.error || 'Unknown error',
|
||||
error: { code: 'KLAVIS_EXECUTION_ERROR', message: response.error || 'Unknown error' },
|
||||
success: false,
|
||||
};
|
||||
}
|
||||
|
||||
// Process the response
|
||||
const content = response.result.content || [];
|
||||
const isError = response.result.isError || false;
|
||||
|
||||
// Convert content array to string
|
||||
let resultContent = '';
|
||||
if (Array.isArray(content)) {
|
||||
resultContent = content
|
||||
.map((item: any) => {
|
||||
if (typeof item === 'string') return item;
|
||||
if (item.type === 'text' && item.text) return item.text;
|
||||
return JSON.stringify(item);
|
||||
})
|
||||
.join('\n');
|
||||
} else if (typeof content === 'string') {
|
||||
resultContent = content;
|
||||
} else {
|
||||
resultContent = JSON.stringify(content);
|
||||
}
|
||||
|
||||
return {
|
||||
content: resultContent,
|
||||
success: !isError,
|
||||
};
|
||||
} catch (error) {
|
||||
const err = error as Error;
|
||||
console.error('KlavisService.executeKlavisTool error %s/%s: %O', identifier, toolName, err);
|
||||
|
||||
return {
|
||||
content: err.message,
|
||||
error: { code: 'KLAVIS_ERROR', message: err.message },
|
||||
success: false,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch Klavis tool manifests from database
|
||||
* Gets user's connected Klavis servers and builds tool manifests for agent execution
|
||||
*
|
||||
* @returns Array of tool manifests for connected Klavis servers
|
||||
*/
|
||||
async getKlavisManifests(): Promise<LobeToolManifest[]> {
|
||||
if (!this.pluginModel) {
|
||||
log('getKlavisManifests: pluginModel not available, returning empty array');
|
||||
return [];
|
||||
}
|
||||
|
||||
try {
|
||||
// Get all plugins from database
|
||||
const allPlugins = await this.pluginModel.query();
|
||||
|
||||
// Filter plugins that have klavis customParams and are authenticated
|
||||
const klavisPlugins = allPlugins.filter(
|
||||
(plugin) => plugin.customParams?.klavis?.isAuthenticated === true,
|
||||
);
|
||||
|
||||
log('getKlavisManifests: found %d authenticated Klavis plugins', klavisPlugins.length);
|
||||
|
||||
// Convert to LobeToolManifest format
|
||||
const manifests: LobeToolManifest[] = klavisPlugins
|
||||
.map((plugin) => {
|
||||
if (!plugin.manifest) return null;
|
||||
|
||||
return {
|
||||
api: plugin.manifest.api || [],
|
||||
author: 'Klavis',
|
||||
homepage: 'https://klavis.ai',
|
||||
identifier: plugin.identifier,
|
||||
meta: plugin.manifest.meta || {
|
||||
avatar: '☁️',
|
||||
description: `Klavis MCP Server: ${plugin.customParams?.klavis?.serverName}`,
|
||||
tags: ['klavis', 'mcp'],
|
||||
title: plugin.customParams?.klavis?.serverName || plugin.identifier,
|
||||
},
|
||||
type: 'builtin',
|
||||
version: '1.0.0',
|
||||
};
|
||||
})
|
||||
.filter(Boolean) as LobeToolManifest[];
|
||||
|
||||
log('getKlavisManifests: returning %d manifests', manifests.length);
|
||||
|
||||
return manifests;
|
||||
} catch (error) {
|
||||
console.error('KlavisService.getKlavisManifests error: %O', error);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3,6 +3,7 @@ import { type ChatToolPayload } from '@lobechat/types';
|
||||
import { safeParseJSON } from '@lobechat/utils';
|
||||
import debug from 'debug';
|
||||
|
||||
import { KlavisService } from '@/server/services/klavis';
|
||||
import { MarketService } from '@/server/services/market';
|
||||
|
||||
import { getServerRuntime, hasServerRuntime } from './serverRuntimes';
|
||||
@@ -12,9 +13,11 @@ const log = debug('lobe-server:builtin-tools-executor');
|
||||
|
||||
export class BuiltinToolsExecutor implements IToolExecutor {
|
||||
private marketService: MarketService;
|
||||
private klavisService: KlavisService;
|
||||
|
||||
constructor(db: LobeChatDatabase, userId: string) {
|
||||
this.marketService = new MarketService({ userInfo: { userId } });
|
||||
this.klavisService = new KlavisService({ db, userId });
|
||||
}
|
||||
|
||||
async execute(
|
||||
@@ -41,6 +44,15 @@ export class BuiltinToolsExecutor implements IToolExecutor {
|
||||
});
|
||||
}
|
||||
|
||||
// Route Klavis tools to KlavisService
|
||||
if (source === 'klavis') {
|
||||
return this.klavisService.executeKlavisTool({
|
||||
args,
|
||||
identifier,
|
||||
toolName: apiName,
|
||||
});
|
||||
}
|
||||
|
||||
// Use server runtime registry (handles both pre-instantiated and per-request runtimes)
|
||||
if (!hasServerRuntime(identifier)) {
|
||||
throw new Error(`Builtin tool "${identifier}" is not implemented`);
|
||||
|
||||
@@ -1,6 +1,11 @@
|
||||
import { type ChatToolPayload } from '@lobechat/types';
|
||||
import { safeParseJSON } from '@lobechat/utils';
|
||||
import debug from 'debug';
|
||||
|
||||
import { type CloudMCPParams, type ToolCallContent } from '@/libs/mcp';
|
||||
import { contentBlocksToString } from '@/server/services/mcp/contentProcessor';
|
||||
|
||||
import { DiscoverService } from '../discover';
|
||||
import { type MCPService } from '../mcp';
|
||||
import { type PluginGatewayService } from '../pluginGateway';
|
||||
import { type BuiltinToolsExecutor } from './builtin';
|
||||
@@ -121,12 +126,20 @@ export class ToolExecutionService {
|
||||
};
|
||||
}
|
||||
|
||||
// Construct MCPClientParams from the mcp config
|
||||
|
||||
log('Calling MCP service with params for: %s:%s', identifier, apiName);
|
||||
log(
|
||||
'Calling MCP service with params for: %s:%s (type: %s)',
|
||||
identifier,
|
||||
apiName,
|
||||
mcpParams.type,
|
||||
);
|
||||
|
||||
try {
|
||||
// Call the MCP service
|
||||
// Check if this is a cloud MCP endpoint
|
||||
if (mcpParams.type === 'cloud') {
|
||||
return await this.executeCloudMCPTool(payload, context, mcpParams);
|
||||
}
|
||||
|
||||
// For stdio/http/sse types, use standard MCP service
|
||||
const result = await this.mcpService.callTool({
|
||||
argsStr: args,
|
||||
clientParams: mcpParams,
|
||||
@@ -152,6 +165,59 @@ export class ToolExecutionService {
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
private async executeCloudMCPTool(
|
||||
payload: ChatToolPayload,
|
||||
context: ToolExecutionContext,
|
||||
// eslint-disable-next-line @typescript-eslint/no-unused-vars
|
||||
_mcpParams: CloudMCPParams,
|
||||
): Promise<ToolExecutionResult> {
|
||||
const { identifier, apiName, arguments: args } = payload;
|
||||
|
||||
log('Executing Cloud MCP tool: %s:%s via cloud gateway', identifier, apiName);
|
||||
|
||||
try {
|
||||
// Create DiscoverService with user context
|
||||
const discoverService = new DiscoverService({
|
||||
userInfo: context.userId ? { userId: context.userId } : undefined,
|
||||
});
|
||||
|
||||
// Parse arguments
|
||||
const apiParams = safeParseJSON(args) || {};
|
||||
|
||||
// Call cloud MCP endpoint via Market API
|
||||
// Returns CloudGatewayResponse: { content: ToolCallContent[], isError?: boolean }
|
||||
const cloudResult = await discoverService.callCloudMcpEndpoint({
|
||||
apiParams,
|
||||
identifier,
|
||||
toolName: apiName,
|
||||
});
|
||||
|
||||
const cloudResultContent = (cloudResult?.content ?? []) as ToolCallContent[];
|
||||
|
||||
// Convert content blocks to string (same as market router does)
|
||||
const content = contentBlocksToString(cloudResultContent);
|
||||
const state = { ...cloudResult, content: cloudResultContent };
|
||||
|
||||
log('Cloud MCP tool execution successful for: %s:%s', identifier, apiName);
|
||||
|
||||
return {
|
||||
content,
|
||||
state,
|
||||
success: !cloudResult?.isError,
|
||||
};
|
||||
} catch (error) {
|
||||
log('Cloud MCP tool execution failed for %s:%s: %O', identifier, apiName, error);
|
||||
return {
|
||||
content: (error as Error).message,
|
||||
error: {
|
||||
code: 'CLOUD_MCP_EXECUTION_ERROR',
|
||||
message: (error as Error).message,
|
||||
},
|
||||
success: false,
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export * from './types';
|
||||
|
||||
Reference in New Issue
Block a user