From 55dc6a8bb28dd9b6f4c4c1a8e8497fa48f7cc254 Mon Sep 17 00:00:00 2001 From: Sparkyrider Date: Wed, 25 Mar 2026 03:15:37 -0500 Subject: [PATCH] cron: queue isolated delivery awareness --- .../isolated-agent.delivery-awareness.test.ts | 107 ++++++++++++++++++ .../delivery-dispatch.double-announce.test.ts | 82 +++++++++++++- src/cron/isolated-agent/delivery-dispatch.ts | 55 +++++++++ 3 files changed, 243 insertions(+), 1 deletion(-) create mode 100644 src/cron/isolated-agent.delivery-awareness.test.ts diff --git a/src/cron/isolated-agent.delivery-awareness.test.ts b/src/cron/isolated-agent.delivery-awareness.test.ts new file mode 100644 index 00000000000..dead730e07f --- /dev/null +++ b/src/cron/isolated-agent.delivery-awareness.test.ts @@ -0,0 +1,107 @@ +import fs from "node:fs/promises"; +import path from "node:path"; +import "./isolated-agent.mocks.js"; +import { beforeEach, describe, expect, it } from "vitest"; +import type { CliDeps } from "../cli/deps.js"; +import { resolveDefaultSessionStorePath } from "../config/sessions.js"; +import { peekSystemEvents, resetSystemEventsForTest } from "../infra/system-events.js"; +import { createCliDeps, mockAgentPayloads } from "./isolated-agent.delivery.test-helpers.js"; +import { runCronIsolatedAgentTurn } from "./isolated-agent.js"; +import { makeCfg, makeJob, withTempCronHome } from "./isolated-agent.test-harness.js"; +import { setupIsolatedAgentTurnMocks } from "./isolated-agent.test-setup.js"; +import { resetCompletedDirectCronDeliveriesForTests } from "./isolated-agent/delivery-dispatch.js"; + +async function writeDefaultAgentSessionStoreEntries( + entries: Record>, +): Promise { + const storePath = resolveDefaultSessionStorePath("main"); + await fs.mkdir(path.dirname(storePath), { recursive: true }); + await fs.writeFile(storePath, JSON.stringify(entries, null, 2), "utf-8"); + return storePath; +} + +async function runAnnounceTurn(params: { + home: string; + storePath: string; + sessionKey: string; + deps?: CliDeps; + cfgOverrides?: Partial>; + delivery: { + mode: "announce"; + channel: "last" | "telegram"; + to?: string; + bestEffort?: boolean; + }; +}) { + return await runCronIsolatedAgentTurn({ + cfg: makeCfg(params.home, params.storePath, params.cfgOverrides), + deps: params.deps ?? createCliDeps(), + job: { + ...makeJob({ kind: "agentTurn", message: "do it" }), + sessionTarget: "isolated", + delivery: params.delivery, + }, + message: "do it", + sessionKey: params.sessionKey, + lane: "cron", + }); +} + +describe("runCronIsolatedAgentTurn cron delivery awareness", () => { + beforeEach(() => { + setupIsolatedAgentTurnMocks(); + resetCompletedDirectCronDeliveriesForTests(); + resetSystemEventsForTest(); + }); + + it("queues delivered isolated cron text for the next main-session turn", async () => { + await withTempCronHome(async (home) => { + const storePath = await writeDefaultAgentSessionStoreEntries({}); + const deps = createCliDeps(); + mockAgentPayloads([{ text: "hello from cron" }]); + + const result = await runAnnounceTurn({ + home, + storePath, + sessionKey: "cron:job-1", + deps, + delivery: { + mode: "announce", + channel: "telegram", + to: "123", + }, + }); + + expect(result.status).toBe("ok"); + expect(result.delivered).toBe(true); + expect(peekSystemEvents("agent:main:main")).toEqual(["hello from cron"]); + }); + }); + + it("uses the global main queue when session scope is global", async () => { + await withTempCronHome(async (home) => { + const storePath = await writeDefaultAgentSessionStoreEntries({}); + const deps = createCliDeps(); + mockAgentPayloads([{ text: "global cron digest" }]); + + const result = await runAnnounceTurn({ + home, + storePath, + sessionKey: "cron:job-1", + deps, + cfgOverrides: { + session: { scope: "global", store: storePath, mainKey: "main" }, + }, + delivery: { + mode: "announce", + channel: "telegram", + to: "123", + }, + }); + + expect(result.status).toBe("ok"); + expect(result.delivered).toBe(true); + expect(peekSystemEvents("global")).toEqual(["global cron digest"]); + }); + }); +}); diff --git a/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts b/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts index 82b7c03661a..427d10d99a9 100644 --- a/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts +++ b/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts @@ -14,6 +14,11 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; // --- Module mocks (must be hoisted before imports) --- +vi.mock("../../config/sessions.js", () => ({ + resolveAgentMainSessionKey: vi.fn(({ agentId }: { agentId: string }) => `agent:${agentId}:main`), + resolveMainSessionKey: vi.fn(() => "global"), +})); + vi.mock("../../agents/subagent-registry.js", () => ({ countActiveDescendantRuns: vi.fn().mockReturnValue(0), })); @@ -39,6 +44,10 @@ vi.mock("../../logger.js", () => ({ logError: vi.fn(), })); +vi.mock("../../infra/system-events.js", () => ({ + enqueueSystemEvent: vi.fn(), +})); + vi.mock("./subagent-followup.js", () => ({ expectsSubagentFollowup: vi.fn().mockReturnValue(false), isLikelyInterimCronMessage: vi.fn().mockReturnValue(false), @@ -49,6 +58,7 @@ vi.mock("./subagent-followup.js", () => ({ // Import after mocks import { countActiveDescendantRuns } from "../../agents/subagent-registry.js"; import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js"; +import { enqueueSystemEvent } from "../../infra/system-events.js"; import { shouldEnqueueCronMainSummary } from "../heartbeat-policy.js"; import { dispatchCronDelivery, @@ -93,6 +103,8 @@ function makeBaseParams(overrides: { synthesizedText?: string; deliveryRequested?: boolean; runSessionId?: string; + sessionTarget?: string; + deliveryBestEffort?: boolean; }) { const resolvedDelivery = makeResolvedDelivery(); return { @@ -102,6 +114,7 @@ function makeBaseParams(overrides: { job: { id: "test-job", name: "Test Job", + sessionTarget: overrides.sessionTarget ?? "isolated", deleteAfterRun: false, payload: { kind: "agentTurn", message: "hello" }, } as never, @@ -114,7 +127,7 @@ function makeBaseParams(overrides: { resolvedDelivery, deliveryRequested: overrides.deliveryRequested ?? true, skipHeartbeatDelivery: false, - deliveryBestEffort: false, + deliveryBestEffort: overrides.deliveryBestEffort ?? false, deliveryPayloadHasStructuredContent: false, deliveryPayloads: overrides.synthesizedText ? [{ text: overrides.synthesizedText }] : [], synthesizedText: overrides.synthesizedText ?? "on it", @@ -257,6 +270,73 @@ describe("dispatchCronDelivery — double-announce guard", () => { ).toBe(false); }); + it("queues main-session awareness for isolated cron jobs after delivery", async () => { + vi.mocked(countActiveDescendantRuns).mockReturnValue(0); + vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); + + const params = makeBaseParams({ synthesizedText: "Morning briefing complete." }); + const state = await dispatchCronDelivery(params); + + expect(state.result).toBeUndefined(); + expect(state.delivered).toBe(true); + expect(state.deliveryAttempted).toBe(true); + expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1); + expect(enqueueSystemEvent).toHaveBeenCalledWith("Morning briefing complete.", { + sessionKey: "agent:main:main", + contextKey: "cron-direct-delivery:v1:run-123:telegram::123456:", + }); + }); + + it("keeps the cron run successful when awareness queueing throws after delivery", async () => { + vi.mocked(countActiveDescendantRuns).mockReturnValue(0); + vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); + vi.mocked(enqueueSystemEvent).mockImplementation(() => { + throw new Error("queue unavailable"); + }); + + const params = makeBaseParams({ synthesizedText: "Morning briefing complete." }); + const state = await dispatchCronDelivery(params); + + expect(state.result).toBeUndefined(); + expect(state.delivered).toBe(true); + expect(state.deliveryAttempted).toBe(true); + expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1); + }); + + it("skips main-session awareness for session-bound cron jobs", async () => { + vi.mocked(countActiveDescendantRuns).mockReturnValue(0); + vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); + + const params = makeBaseParams({ + synthesizedText: "Session-bound cron update.", + sessionTarget: "session:agent:main:main:thread:9999", + }); + const state = await dispatchCronDelivery(params); + + expect(state.result).toBeUndefined(); + expect(state.delivered).toBe(true); + expect(state.deliveryAttempted).toBe(true); + expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1); + expect(enqueueSystemEvent).not.toHaveBeenCalled(); + }); + + it("skips main-session awareness for best-effort deliveries", async () => { + vi.mocked(countActiveDescendantRuns).mockReturnValue(0); + vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); + + const params = makeBaseParams({ + synthesizedText: "Best-effort cron update.", + deliveryBestEffort: true, + }); + const state = await dispatchCronDelivery(params); + + expect(state.result).toBeUndefined(); + expect(state.delivered).toBe(true); + expect(state.deliveryAttempted).toBe(true); + expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1); + expect(enqueueSystemEvent).not.toHaveBeenCalled(); + }); + it("skips stale cron deliveries while still suppressing fallback main summary", async () => { vi.useFakeTimers(); vi.setSystemTime(new Date("2026-03-18T17:00:00.000Z")); diff --git a/src/cron/isolated-agent/delivery-dispatch.ts b/src/cron/isolated-agent/delivery-dispatch.ts index 68083bfb5fd..25aa807afe8 100644 --- a/src/cron/isolated-agent/delivery-dispatch.ts +++ b/src/cron/isolated-agent/delivery-dispatch.ts @@ -3,6 +3,7 @@ import { SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js"; import type { ReplyPayload } from "../../auto-reply/types.js"; import { createOutboundSendDeps, type CliDeps } from "../../cli/outbound-send-deps.js"; import type { OpenClawConfig } from "../../config/config.js"; +import { resolveAgentMainSessionKey, resolveMainSessionKey } from "../../config/sessions.js"; import { callGateway } from "../../gateway/call.js"; import { sleepWithAbort } from "../../infra/backoff.js"; import { @@ -11,6 +12,7 @@ import { } from "../../infra/outbound/deliver.js"; import { resolveAgentOutboundIdentity } from "../../infra/outbound/identity.js"; import { buildOutboundSessionContext } from "../../infra/outbound/session-context.js"; +import { enqueueSystemEvent } from "../../infra/system-events.js"; import { logWarn, logError } from "../../logger.js"; import type { CronJob, CronRunTelemetry } from "../types.js"; import type { DeliveryTargetResolution } from "./delivery-target.js"; @@ -229,6 +231,49 @@ function buildDirectCronDeliveryIdempotencyKey(params: { return `cron-direct-delivery:v1:${params.runSessionId}:${params.delivery.channel}:${accountId}:${normalizedTo}:${threadId}`; } +function shouldQueueCronAwareness(job: CronJob, deliveryBestEffort: boolean): boolean { + // Keep issue #52136 scoped to isolated runs. Session-bound cron jobs keep + // their existing behavior, and best-effort sends may only partially deliver. + return job.sessionTarget === "isolated" && !deliveryBestEffort; +} + +function resolveCronAwarenessMainSessionKey(params: { + cfg: OpenClawConfig; + agentId: string; +}): string { + return params.cfg.session?.scope === "global" + ? resolveMainSessionKey(params.cfg) + : resolveAgentMainSessionKey({ cfg: params.cfg, agentId: params.agentId }); +} + +function queueCronAwarenessSystemEvent(params: { + cfg: OpenClawConfig; + jobId: string; + agentId: string; + deliveryIdempotencyKey: string; + outputText?: string; + synthesizedText?: string; +}): void { + const text = params.outputText?.trim() || params.synthesizedText?.trim() || undefined; + if (!text) { + return; + } + + try { + enqueueSystemEvent(text, { + sessionKey: resolveCronAwarenessMainSessionKey({ + cfg: params.cfg, + agentId: params.agentId, + }), + contextKey: params.deliveryIdempotencyKey, + }); + } catch (err) { + logWarn( + `[cron:${params.jobId}] failed to queue isolated cron awareness for the main session: ${err instanceof Error ? err.message : String(err)}`, + ); + } +} + export function resetCompletedDirectCronDeliveriesForTests() { COMPLETED_DIRECT_CRON_DELIVERIES.clear(); } @@ -436,6 +481,16 @@ export async function dispatchCronDelivery( // Intentionally leave partial success uncached: replay may duplicate the // successful subset, but caching it here would permanently drop the // failed payloads by converting the replay into delivered=true. + if (delivered && shouldQueueCronAwareness(params.job, params.deliveryBestEffort)) { + queueCronAwarenessSystemEvent({ + cfg: params.cfgWithAgentDefaults, + jobId: params.job.id, + agentId: params.agentId, + deliveryIdempotencyKey, + outputText, + synthesizedText, + }); + } if (delivered) { rememberCompletedDirectCronDelivery(deliveryIdempotencyKey, deliveryResults); }