feat: generate agent_cron_jobs in agents (#11349)

* feat: generate agent_cron_job in agents

* feat: update the db fields

* feat: add trigger/mode in topics table & add group id in cronjob

* feat: update sql

* fix: fixed db migration

* feat: update the test
This commit is contained in:
Shinji-Li
2026-01-09 14:22:35 +08:00
committed by GitHub
parent 5a8b02ebb0
commit eefb6cb185
10 changed files with 10766 additions and 2 deletions

View File

@@ -67,6 +67,37 @@ table agents_knowledge_bases {
}
}
table agent_cron_jobs {
id text [pk, not null]
agent_id text [not null]
group_id text
user_id text [not null]
name text
description text
enabled boolean [default: true]
cron_pattern text [not null]
timezone text [default: 'UTC']
content text [not null]
edit_data jsonb
max_executions integer
remaining_executions integer
execution_conditions jsonb
last_executed_at timestamp
total_executions integer [default: 0]
accessed_at "timestamp with time zone" [not null, default: `now()`]
created_at "timestamp with time zone" [not null, default: `now()`]
updated_at "timestamp with time zone" [not null, default: `now()`]
indexes {
agent_id [name: 'agent_cron_jobs_agent_id_idx']
group_id [name: 'agent_cron_jobs_group_id_idx']
user_id [name: 'agent_cron_jobs_user_id_idx']
enabled [name: 'agent_cron_jobs_enabled_idx']
remaining_executions [name: 'agent_cron_jobs_remaining_executions_idx']
last_executed_at [name: 'agent_cron_jobs_last_executed_at_idx']
}
}
table ai_models {
id varchar(150) [not null]
display_name varchar(200)
@@ -269,7 +300,6 @@ table chat_groups_agents {
table documents {
id varchar(255) [pk, not null]
slug varchar(255)
title text
description text
content text
@@ -287,6 +317,7 @@ table documents {
user_id text [not null]
client_id text
editor_data jsonb
slug varchar(255)
accessed_at "timestamp with time zone" [not null, default: `now()`]
created_at "timestamp with time zone" [not null, default: `now()`]
updated_at "timestamp with time zone" [not null, default: `now()`]
@@ -1071,6 +1102,8 @@ table topics {
client_id text
history_summary text
metadata jsonb
trigger text
mode text
accessed_at "timestamp with time zone" [not null, default: `now()`]
created_at "timestamp with time zone" [not null, default: `now()`]
updated_at "timestamp with time zone" [not null, default: `now()`]

View File

@@ -0,0 +1,54 @@
CREATE TABLE IF NOT EXISTS "agent_cron_jobs" (
"id" text PRIMARY KEY NOT NULL,
"agent_id" text NOT NULL,
"group_id" text,
"user_id" text NOT NULL,
"name" text,
"description" text,
"enabled" boolean DEFAULT true,
"cron_pattern" text NOT NULL,
"timezone" text DEFAULT 'UTC',
"content" text NOT NULL,
"edit_data" jsonb,
"max_executions" integer,
"remaining_executions" integer,
"execution_conditions" jsonb,
"last_executed_at" timestamp,
"total_executions" integer DEFAULT 0,
"accessed_at" timestamp with time zone DEFAULT now() NOT NULL,
"created_at" timestamp with time zone DEFAULT now() NOT NULL,
"updated_at" timestamp with time zone DEFAULT now() NOT NULL
);
--> statement-breakpoint
ALTER TABLE "topics" ADD COLUMN IF NOT EXISTS "trigger" text;--> statement-breakpoint
ALTER TABLE "topics" ADD COLUMN IF NOT EXISTS "mode" text;--> statement-breakpoint
DO $$ BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_constraint
WHERE conname = 'agent_cron_jobs_agent_id_agents_id_fk'
) THEN
ALTER TABLE "agent_cron_jobs" ADD CONSTRAINT "agent_cron_jobs_agent_id_agents_id_fk" FOREIGN KEY ("agent_id") REFERENCES "public"."agents"("id") ON DELETE cascade ON UPDATE no action;
END IF;
END $$;--> statement-breakpoint
DO $$ BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_constraint
WHERE conname = 'agent_cron_jobs_group_id_chat_groups_id_fk'
) THEN
ALTER TABLE "agent_cron_jobs" ADD CONSTRAINT "agent_cron_jobs_group_id_chat_groups_id_fk" FOREIGN KEY ("group_id") REFERENCES "public"."chat_groups"("id") ON DELETE cascade ON UPDATE no action;
END IF;
END $$;--> statement-breakpoint
DO $$ BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_constraint
WHERE conname = 'agent_cron_jobs_user_id_users_id_fk'
) THEN
ALTER TABLE "agent_cron_jobs" ADD CONSTRAINT "agent_cron_jobs_user_id_users_id_fk" FOREIGN KEY ("user_id") REFERENCES "public"."users"("id") ON DELETE cascade ON UPDATE no action;
END IF;
END $$;--> statement-breakpoint
CREATE INDEX IF NOT EXISTS "agent_cron_jobs_agent_id_idx" ON "agent_cron_jobs" USING btree ("agent_id");--> statement-breakpoint
CREATE INDEX IF NOT EXISTS "agent_cron_jobs_group_id_idx" ON "agent_cron_jobs" USING btree ("group_id");--> statement-breakpoint
CREATE INDEX IF NOT EXISTS "agent_cron_jobs_user_id_idx" ON "agent_cron_jobs" USING btree ("user_id");--> statement-breakpoint
CREATE INDEX IF NOT EXISTS "agent_cron_jobs_enabled_idx" ON "agent_cron_jobs" USING btree ("enabled");--> statement-breakpoint
CREATE INDEX IF NOT EXISTS "agent_cron_jobs_remaining_executions_idx" ON "agent_cron_jobs" USING btree ("remaining_executions");--> statement-breakpoint
CREATE INDEX IF NOT EXISTS "agent_cron_jobs_last_executed_at_idx" ON "agent_cron_jobs" USING btree ("last_executed_at");

File diff suppressed because it is too large Load Diff

View File

@@ -469,7 +469,14 @@
"when": 1766474494249,
"tag": "0066_add_document_fields",
"breakpoints": true
},
{
"idx": 67,
"version": "7",
"when": 1767929492232,
"tag": "0067_add_agent_cron_tables",
"breakpoints": true
}
],
"version": "6"
}
}

View File

@@ -57,6 +57,8 @@ describe('TopicModel - Create', () => {
agentId: null,
content: null,
editorData: null,
trigger: null,
mode: null,
createdAt: expect.any(Date),
updatedAt: expect.any(Date),
accessedAt: expect.any(Date),
@@ -96,6 +98,8 @@ describe('TopicModel - Create', () => {
groupId: null,
historySummary: null,
metadata: null,
trigger: null,
mode: null,
sessionId,
userId,
createdAt: expect.any(Date),

View File

@@ -0,0 +1,286 @@
import { and, desc, eq, gt, isNull, or, sql } from 'drizzle-orm';
import {
type AgentCronJob,
type CreateAgentCronJobData,
type NewAgentCronJob,
type UpdateAgentCronJobData,
agentCronJobs,
} from '../schemas/agentCronJob';
import type { LobeChatDatabase } from '../type';
export class AgentCronJobModel {
private readonly userId: string;
private readonly db: LobeChatDatabase;
constructor(db: LobeChatDatabase, userId?: string) {
this.db = db;
this.userId = userId!;
}
// Create a new cron job
async create(data: CreateAgentCronJobData): Promise<AgentCronJob> {
const cronJob = await this.db
.insert(agentCronJobs)
.values({
...data,
// Initialize remaining executions to match max executions
remainingExecutions: data.maxExecutions,
userId: this.userId,
} as NewAgentCronJob)
.returning();
return cronJob[0];
}
// Find cron job by ID (with user ownership check)
async findById(id: string): Promise<AgentCronJob | null> {
const result = await this.db
.select()
.from(agentCronJobs)
.where(and(eq(agentCronJobs.id, id), eq(agentCronJobs.userId, this.userId)))
.limit(1);
return result[0] || null;
}
// Find all cron jobs for a specific agent
async findByAgentId(agentId: string): Promise<AgentCronJob[]> {
return this.db
.select()
.from(agentCronJobs)
.where(and(eq(agentCronJobs.agentId, agentId), eq(agentCronJobs.userId, this.userId)))
.orderBy(desc(agentCronJobs.createdAt));
}
// Find all cron jobs for the user (across all agents)
async findByUserId(): Promise<AgentCronJob[]> {
return this.db
.select()
.from(agentCronJobs)
.where(eq(agentCronJobs.userId, this.userId))
.orderBy(desc(agentCronJobs.lastExecutedAt));
}
// Get all enabled cron jobs (system-wide for execution)
static async getEnabledJobs(db: LobeChatDatabase): Promise<AgentCronJob[]> {
return db
.select()
.from(agentCronJobs)
.where(
and(
eq(agentCronJobs.enabled, true),
or(gt(agentCronJobs.remainingExecutions, 0), isNull(agentCronJobs.remainingExecutions)),
),
)
.orderBy(agentCronJobs.lastExecutedAt);
}
// Update cron job
async update(id: string, data: UpdateAgentCronJobData): Promise<AgentCronJob | null> {
const result = await this.db
.update(agentCronJobs)
.set({
...data,
updatedAt: new Date(),
})
.where(and(eq(agentCronJobs.id, id), eq(agentCronJobs.userId, this.userId)))
.returning();
return result[0] || null;
}
// Delete cron job
async delete(id: string): Promise<boolean> {
const result = await this.db
.delete(agentCronJobs)
.where(and(eq(agentCronJobs.id, id), eq(agentCronJobs.userId, this.userId)))
.returning();
return result.length > 0;
}
// Update execution statistics after job execution
static async updateExecutionStats(
db: LobeChatDatabase,
jobId: string,
): Promise<AgentCronJob | null> {
// Update execution statistics and decrement remaining executions
const result = await db
.update(agentCronJobs)
.set({
lastExecutedAt: new Date(),
remainingExecutions: sql`
CASE
WHEN ${agentCronJobs.remainingExecutions} IS NULL THEN NULL
ELSE ${agentCronJobs.remainingExecutions} - 1
END
`,
totalExecutions: sql`${agentCronJobs.totalExecutions} + 1`,
updatedAt: new Date(),
})
.where(eq(agentCronJobs.id, jobId))
.returning();
const updatedJob = result[0];
// Auto-disable job if remaining executions reached 0
if (updatedJob && updatedJob.remainingExecutions === 0) {
await db
.update(agentCronJobs)
.set({
enabled: false,
updatedAt: new Date(),
})
.where(eq(agentCronJobs.id, jobId));
// Return updated job with enabled = false
return { ...updatedJob, enabled: false };
}
return updatedJob || null;
}
// Reset execution counts and re-enable job
async resetExecutions(id: string, newMaxExecutions?: number): Promise<AgentCronJob | null> {
const result = await this.db
.update(agentCronJobs)
.set({
enabled: true,
// Re-enable job when resetting
lastExecutedAt: null,
maxExecutions: newMaxExecutions,
remainingExecutions: newMaxExecutions,
totalExecutions: 0,
updatedAt: new Date(),
})
.where(and(eq(agentCronJobs.id, id), eq(agentCronJobs.userId, this.userId)))
.returning();
return result[0] || null;
}
// Get jobs that are near depletion (for warnings)
async getTasksNearDepletion(threshold: number = 5): Promise<AgentCronJob[]> {
return this.db
.select()
.from(agentCronJobs)
.where(
and(
eq(agentCronJobs.userId, this.userId),
eq(agentCronJobs.enabled, true),
gt(agentCronJobs.remainingExecutions, 0),
sql`${agentCronJobs.remainingExecutions} <= ${threshold}`,
),
)
.orderBy(agentCronJobs.remainingExecutions);
}
// Get jobs by execution status
async findByStatus(enabled: boolean): Promise<AgentCronJob[]> {
return this.db
.select()
.from(agentCronJobs)
.where(and(eq(agentCronJobs.userId, this.userId), eq(agentCronJobs.enabled, enabled)))
.orderBy(desc(agentCronJobs.updatedAt));
}
// Get execution statistics for dashboard
async getExecutionStats(): Promise<{
activeJobs: number;
completedExecutions: number;
pendingExecutions: number;
totalJobs: number;
}> {
const result = await this.db
.select({
activeJobs: sql<number>`sum(case when ${agentCronJobs.enabled} then 1 else 0 end)`,
completedExecutions: sql<number>`sum(${agentCronJobs.totalExecutions})`,
pendingExecutions: sql<number>`
sum(
case when ${agentCronJobs.remainingExecutions} is null then 999999
else coalesce(${agentCronJobs.remainingExecutions}, 0) end
)
`,
totalJobs: sql<number>`count(*)`,
})
.from(agentCronJobs)
.where(eq(agentCronJobs.userId, this.userId));
const stats = result[0];
return {
activeJobs: Number(stats.activeJobs),
completedExecutions: Number(stats.completedExecutions),
pendingExecutions: Number(stats.pendingExecutions === 999_999 ? 0 : stats.pendingExecutions),
totalJobs: Number(stats.totalJobs),
};
}
// Batch enable/disable jobs
async batchUpdateStatus(ids: string[], enabled: boolean): Promise<number> {
const result = await this.db
.update(agentCronJobs)
.set({
enabled,
updatedAt: new Date(),
})
.where(and(sql`${agentCronJobs.id} = ANY(${ids})`, eq(agentCronJobs.userId, this.userId)))
.returning();
return result.length;
}
// Count total jobs for pagination
async countByAgentId(agentId: string): Promise<number> {
const result = await this.db
.select({ count: sql<number>`count(*)` })
.from(agentCronJobs)
.where(and(eq(agentCronJobs.agentId, agentId), eq(agentCronJobs.userId, this.userId)));
return Number(result[0].count);
}
// Find jobs with pagination
async findWithPagination(options: {
agentId?: string;
enabled?: boolean;
limit?: number;
offset?: number;
}): Promise<{ jobs: AgentCronJob[]; total: number }> {
const { agentId, enabled, limit = 20, offset = 0 } = options;
const whereConditions = [eq(agentCronJobs.userId, this.userId)];
if (agentId) {
whereConditions.push(eq(agentCronJobs.agentId, agentId));
}
if (enabled !== undefined) {
whereConditions.push(eq(agentCronJobs.enabled, enabled));
}
const whereClause = and(...whereConditions);
// Get total count
const countResult = await this.db
.select({ count: sql<number>`count(*)` })
.from(agentCronJobs)
.where(whereClause);
const total = Number(countResult[0].count);
// Get paginated results
const jobs = await this.db
.select()
.from(agentCronJobs)
.where(whereClause)
.orderBy(desc(agentCronJobs.createdAt))
.limit(limit)
.offset(offset);
return { jobs, total };
}
}

View File

@@ -0,0 +1,138 @@
/* eslint-disable sort-keys-fix/sort-keys-fix */
import { boolean, index, integer, jsonb, pgTable, text, timestamp } from 'drizzle-orm/pg-core';
import { createInsertSchema } from 'drizzle-zod';
import { z } from 'zod';
import { idGenerator } from '../utils/idGenerator';
import { timestamps } from './_helpers';
import { agents } from './agent';
import { chatGroups } from './chatGroup';
import { users } from './user';
// Execution conditions type for JSONB field
export interface ExecutionConditions {
maxExecutionsPerDay?: number;
timeRange?: {
end: string; // "18:00"
start: string; // "09:00"
};
weekdays?: number[]; // [1,2,3,4,5] (Monday=1, Sunday=0)
}
// Agent cron jobs table - supports multiple cron jobs per agent
export const agentCronJobs = pgTable(
'agent_cron_jobs',
{
id: text('id')
.primaryKey()
.$defaultFn(() => idGenerator('agentCronJobs'))
.notNull(),
// Foreign keys
agentId: text('agent_id')
.references(() => agents.id, { onDelete: 'cascade' })
.notNull(),
groupId: text('group_id')
.references(() => chatGroups.id, { onDelete: 'cascade' }),
userId: text('user_id')
.references(() => users.id, { onDelete: 'cascade' })
.notNull(),
// Task identification
name: text('name'), // Optional task name like "Daily Report", "Data Monitoring"
description: text('description'), // Optional task description
// Core configuration
enabled: boolean('enabled').default(true),
cronPattern: text('cron_pattern').notNull(), // e.g., "0 */30 * * *"
timezone: text('timezone').default('UTC'),
// Content fields
content: text('content').notNull(), // Simple text content
editData: jsonb('edit_data'), // Rich content data (markdown, files, images, etc.)
// Execution count management
maxExecutions: integer('max_executions'), // null = unlimited
remainingExecutions: integer('remaining_executions'), // null = unlimited
// Execution conditions (stored as JSONB)
executionConditions: jsonb('execution_conditions').$type<ExecutionConditions>(),
// Execution statistics
lastExecutedAt: timestamp('last_executed_at'),
totalExecutions: integer('total_executions').default(0),
...timestamps,
},
(t) => [
// Indexes for performance
index('agent_cron_jobs_agent_id_idx').on(t.agentId),
index('agent_cron_jobs_group_id_idx').on(t.groupId),
index('agent_cron_jobs_user_id_idx').on(t.userId),
index('agent_cron_jobs_enabled_idx').on(t.enabled),
index('agent_cron_jobs_remaining_executions_idx').on(t.remainingExecutions),
index('agent_cron_jobs_last_executed_at_idx').on(t.lastExecutedAt),
],
);
// Validation schemas
export const cronPatternSchema = z
.string()
.regex(
/^(@(annually|yearly|monthly|weekly|daily|hourly|reboot))|(@every (\d+(ns|us|µs|ms|s|m|h))+)|((((\d+,)+\d+|(\d+(\/|-)\d+)|\d+|\*) ?){5,7})$/,
'Invalid cron pattern',
);
// Minimum 30 minutes validation
export const minimumIntervalSchema = z.string().refine((pattern) => {
// For simplicity, we'll validate common patterns
// More complex validation can be added later
const thirtyMinPatterns = [
'0 */30 * * *', // Every 30 minutes
'0 0 * * *', // Every hour
'0 0 */2 * *', // Every 2 hours
'0 0 */6 * *', // Every 6 hours
'0 0 0 * *', // Daily
'0 0 0 * * 1', // Weekly
'0 0 0 1 *', // Monthly
];
// Check if it matches allowed patterns or follows 30+ minute intervals
return (
thirtyMinPatterns.includes(pattern) ||
pattern.includes('*/30') ||
pattern.includes('*/60') ||
/0 \d+ \* \* \*/.test(pattern)
); // Hours pattern
}, 'Minimum execution interval is 30 minutes');
export const executionConditionsSchema = z
.object({
maxExecutionsPerDay: z.number().min(1).max(100).optional(),
timeRange: z
.object({
end: z.string().regex(/^([01]?\d|2[0-3]):[0-5]\d$/, 'Invalid time format'),
start: z.string().regex(/^([01]?\d|2[0-3]):[0-5]\d$/, 'Invalid time format'),
})
.optional(),
weekdays: z.array(z.number().min(0).max(6)).optional(),
})
.optional();
export const insertAgentCronJobSchema = createInsertSchema(agentCronJobs, {
cronPattern: minimumIntervalSchema,
content: z.string().min(1).max(2000),
editData: z.record(z.any()).optional(), // Allow any JSON structure for rich content
name: z.string().max(100).optional(),
description: z.string().max(500).optional(),
maxExecutions: z.number().min(1).max(10_000).optional(),
executionConditions: executionConditionsSchema,
});
export const updateAgentCronJobSchema = insertAgentCronJobSchema.partial();
// Type exports
export type NewAgentCronJob = typeof agentCronJobs.$inferInsert;
export type AgentCronJob = typeof agentCronJobs.$inferSelect;
export type CreateAgentCronJobData = z.infer<typeof insertAgentCronJobSchema>;
export type UpdateAgentCronJobData = z.infer<typeof updateAgentCronJobSchema>;

View File

@@ -1,4 +1,5 @@
export * from './agent';
export * from './agentCronJob';
export * from './aiInfra';
export * from './apiKey';
export * from './asyncTask';

View File

@@ -31,6 +31,8 @@ export const topics = pgTable(
clientId: text('client_id'),
historySummary: text('history_summary'),
metadata: jsonb('metadata').$type<ChatTopicMetadata | undefined>(),
trigger: text('trigger'), // 'cron' | 'chat' | 'api' - topic creation trigger source
mode: text('mode'), // 'temp' | 'test' | 'default' - topic usage scenario
...timestamps,
},
(t) => [

View File

@@ -6,6 +6,7 @@ export const createNanoId = (size = 8) =>
customAlphabet('1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ', size);
const prefixes = {
agentCronJobs: 'cron',
agents: 'agt',
budget: 'bgt',
chatGroups: 'cg',