cron: queue isolated delivery awareness

This commit is contained in:
Sparkyrider
2026-03-25 03:15:37 -05:00
committed by Ayaan Zaidi
parent 2a40612058
commit 55dc6a8bb2
3 changed files with 243 additions and 1 deletions

View File

@@ -0,0 +1,107 @@
import fs from "node:fs/promises";
import path from "node:path";
import "./isolated-agent.mocks.js";
import { beforeEach, describe, expect, it } from "vitest";
import type { CliDeps } from "../cli/deps.js";
import { resolveDefaultSessionStorePath } from "../config/sessions.js";
import { peekSystemEvents, resetSystemEventsForTest } from "../infra/system-events.js";
import { createCliDeps, mockAgentPayloads } from "./isolated-agent.delivery.test-helpers.js";
import { runCronIsolatedAgentTurn } from "./isolated-agent.js";
import { makeCfg, makeJob, withTempCronHome } from "./isolated-agent.test-harness.js";
import { setupIsolatedAgentTurnMocks } from "./isolated-agent.test-setup.js";
import { resetCompletedDirectCronDeliveriesForTests } from "./isolated-agent/delivery-dispatch.js";
async function writeDefaultAgentSessionStoreEntries(
entries: Record<string, Record<string, unknown>>,
): Promise<string> {
const storePath = resolveDefaultSessionStorePath("main");
await fs.mkdir(path.dirname(storePath), { recursive: true });
await fs.writeFile(storePath, JSON.stringify(entries, null, 2), "utf-8");
return storePath;
}
async function runAnnounceTurn(params: {
home: string;
storePath: string;
sessionKey: string;
deps?: CliDeps;
cfgOverrides?: Partial<ReturnType<typeof makeCfg>>;
delivery: {
mode: "announce";
channel: "last" | "telegram";
to?: string;
bestEffort?: boolean;
};
}) {
return await runCronIsolatedAgentTurn({
cfg: makeCfg(params.home, params.storePath, params.cfgOverrides),
deps: params.deps ?? createCliDeps(),
job: {
...makeJob({ kind: "agentTurn", message: "do it" }),
sessionTarget: "isolated",
delivery: params.delivery,
},
message: "do it",
sessionKey: params.sessionKey,
lane: "cron",
});
}
describe("runCronIsolatedAgentTurn cron delivery awareness", () => {
beforeEach(() => {
setupIsolatedAgentTurnMocks();
resetCompletedDirectCronDeliveriesForTests();
resetSystemEventsForTest();
});
it("queues delivered isolated cron text for the next main-session turn", async () => {
await withTempCronHome(async (home) => {
const storePath = await writeDefaultAgentSessionStoreEntries({});
const deps = createCliDeps();
mockAgentPayloads([{ text: "hello from cron" }]);
const result = await runAnnounceTurn({
home,
storePath,
sessionKey: "cron:job-1",
deps,
delivery: {
mode: "announce",
channel: "telegram",
to: "123",
},
});
expect(result.status).toBe("ok");
expect(result.delivered).toBe(true);
expect(peekSystemEvents("agent:main:main")).toEqual(["hello from cron"]);
});
});
it("uses the global main queue when session scope is global", async () => {
await withTempCronHome(async (home) => {
const storePath = await writeDefaultAgentSessionStoreEntries({});
const deps = createCliDeps();
mockAgentPayloads([{ text: "global cron digest" }]);
const result = await runAnnounceTurn({
home,
storePath,
sessionKey: "cron:job-1",
deps,
cfgOverrides: {
session: { scope: "global", store: storePath, mainKey: "main" },
},
delivery: {
mode: "announce",
channel: "telegram",
to: "123",
},
});
expect(result.status).toBe("ok");
expect(result.delivered).toBe(true);
expect(peekSystemEvents("global")).toEqual(["global cron digest"]);
});
});
});

View File

@@ -14,6 +14,11 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
// --- Module mocks (must be hoisted before imports) ---
vi.mock("../../config/sessions.js", () => ({
resolveAgentMainSessionKey: vi.fn(({ agentId }: { agentId: string }) => `agent:${agentId}:main`),
resolveMainSessionKey: vi.fn(() => "global"),
}));
vi.mock("../../agents/subagent-registry.js", () => ({
countActiveDescendantRuns: vi.fn().mockReturnValue(0),
}));
@@ -39,6 +44,10 @@ vi.mock("../../logger.js", () => ({
logError: vi.fn(),
}));
vi.mock("../../infra/system-events.js", () => ({
enqueueSystemEvent: vi.fn(),
}));
vi.mock("./subagent-followup.js", () => ({
expectsSubagentFollowup: vi.fn().mockReturnValue(false),
isLikelyInterimCronMessage: vi.fn().mockReturnValue(false),
@@ -49,6 +58,7 @@ vi.mock("./subagent-followup.js", () => ({
// Import after mocks
import { countActiveDescendantRuns } from "../../agents/subagent-registry.js";
import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js";
import { enqueueSystemEvent } from "../../infra/system-events.js";
import { shouldEnqueueCronMainSummary } from "../heartbeat-policy.js";
import {
dispatchCronDelivery,
@@ -93,6 +103,8 @@ function makeBaseParams(overrides: {
synthesizedText?: string;
deliveryRequested?: boolean;
runSessionId?: string;
sessionTarget?: string;
deliveryBestEffort?: boolean;
}) {
const resolvedDelivery = makeResolvedDelivery();
return {
@@ -102,6 +114,7 @@ function makeBaseParams(overrides: {
job: {
id: "test-job",
name: "Test Job",
sessionTarget: overrides.sessionTarget ?? "isolated",
deleteAfterRun: false,
payload: { kind: "agentTurn", message: "hello" },
} as never,
@@ -114,7 +127,7 @@ function makeBaseParams(overrides: {
resolvedDelivery,
deliveryRequested: overrides.deliveryRequested ?? true,
skipHeartbeatDelivery: false,
deliveryBestEffort: false,
deliveryBestEffort: overrides.deliveryBestEffort ?? false,
deliveryPayloadHasStructuredContent: false,
deliveryPayloads: overrides.synthesizedText ? [{ text: overrides.synthesizedText }] : [],
synthesizedText: overrides.synthesizedText ?? "on it",
@@ -257,6 +270,73 @@ describe("dispatchCronDelivery — double-announce guard", () => {
).toBe(false);
});
it("queues main-session awareness for isolated cron jobs after delivery", async () => {
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
const params = makeBaseParams({ synthesizedText: "Morning briefing complete." });
const state = await dispatchCronDelivery(params);
expect(state.result).toBeUndefined();
expect(state.delivered).toBe(true);
expect(state.deliveryAttempted).toBe(true);
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
expect(enqueueSystemEvent).toHaveBeenCalledWith("Morning briefing complete.", {
sessionKey: "agent:main:main",
contextKey: "cron-direct-delivery:v1:run-123:telegram::123456:",
});
});
it("keeps the cron run successful when awareness queueing throws after delivery", async () => {
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
vi.mocked(enqueueSystemEvent).mockImplementation(() => {
throw new Error("queue unavailable");
});
const params = makeBaseParams({ synthesizedText: "Morning briefing complete." });
const state = await dispatchCronDelivery(params);
expect(state.result).toBeUndefined();
expect(state.delivered).toBe(true);
expect(state.deliveryAttempted).toBe(true);
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
});
it("skips main-session awareness for session-bound cron jobs", async () => {
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
const params = makeBaseParams({
synthesizedText: "Session-bound cron update.",
sessionTarget: "session:agent:main:main:thread:9999",
});
const state = await dispatchCronDelivery(params);
expect(state.result).toBeUndefined();
expect(state.delivered).toBe(true);
expect(state.deliveryAttempted).toBe(true);
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
expect(enqueueSystemEvent).not.toHaveBeenCalled();
});
it("skips main-session awareness for best-effort deliveries", async () => {
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
const params = makeBaseParams({
synthesizedText: "Best-effort cron update.",
deliveryBestEffort: true,
});
const state = await dispatchCronDelivery(params);
expect(state.result).toBeUndefined();
expect(state.delivered).toBe(true);
expect(state.deliveryAttempted).toBe(true);
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
expect(enqueueSystemEvent).not.toHaveBeenCalled();
});
it("skips stale cron deliveries while still suppressing fallback main summary", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-03-18T17:00:00.000Z"));

View File

@@ -3,6 +3,7 @@ import { SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js";
import type { ReplyPayload } from "../../auto-reply/types.js";
import { createOutboundSendDeps, type CliDeps } from "../../cli/outbound-send-deps.js";
import type { OpenClawConfig } from "../../config/config.js";
import { resolveAgentMainSessionKey, resolveMainSessionKey } from "../../config/sessions.js";
import { callGateway } from "../../gateway/call.js";
import { sleepWithAbort } from "../../infra/backoff.js";
import {
@@ -11,6 +12,7 @@ import {
} from "../../infra/outbound/deliver.js";
import { resolveAgentOutboundIdentity } from "../../infra/outbound/identity.js";
import { buildOutboundSessionContext } from "../../infra/outbound/session-context.js";
import { enqueueSystemEvent } from "../../infra/system-events.js";
import { logWarn, logError } from "../../logger.js";
import type { CronJob, CronRunTelemetry } from "../types.js";
import type { DeliveryTargetResolution } from "./delivery-target.js";
@@ -229,6 +231,49 @@ function buildDirectCronDeliveryIdempotencyKey(params: {
return `cron-direct-delivery:v1:${params.runSessionId}:${params.delivery.channel}:${accountId}:${normalizedTo}:${threadId}`;
}
function shouldQueueCronAwareness(job: CronJob, deliveryBestEffort: boolean): boolean {
// Keep issue #52136 scoped to isolated runs. Session-bound cron jobs keep
// their existing behavior, and best-effort sends may only partially deliver.
return job.sessionTarget === "isolated" && !deliveryBestEffort;
}
function resolveCronAwarenessMainSessionKey(params: {
cfg: OpenClawConfig;
agentId: string;
}): string {
return params.cfg.session?.scope === "global"
? resolveMainSessionKey(params.cfg)
: resolveAgentMainSessionKey({ cfg: params.cfg, agentId: params.agentId });
}
function queueCronAwarenessSystemEvent(params: {
cfg: OpenClawConfig;
jobId: string;
agentId: string;
deliveryIdempotencyKey: string;
outputText?: string;
synthesizedText?: string;
}): void {
const text = params.outputText?.trim() || params.synthesizedText?.trim() || undefined;
if (!text) {
return;
}
try {
enqueueSystemEvent(text, {
sessionKey: resolveCronAwarenessMainSessionKey({
cfg: params.cfg,
agentId: params.agentId,
}),
contextKey: params.deliveryIdempotencyKey,
});
} catch (err) {
logWarn(
`[cron:${params.jobId}] failed to queue isolated cron awareness for the main session: ${err instanceof Error ? err.message : String(err)}`,
);
}
}
export function resetCompletedDirectCronDeliveriesForTests() {
COMPLETED_DIRECT_CRON_DELIVERIES.clear();
}
@@ -436,6 +481,16 @@ export async function dispatchCronDelivery(
// Intentionally leave partial success uncached: replay may duplicate the
// successful subset, but caching it here would permanently drop the
// failed payloads by converting the replay into delivered=true.
if (delivered && shouldQueueCronAwareness(params.job, params.deliveryBestEffort)) {
queueCronAwarenessSystemEvent({
cfg: params.cfgWithAgentDefaults,
jobId: params.job.id,
agentId: params.agentId,
deliveryIdempotencyKey,
outputText,
synthesizedText,
});
}
if (delivered) {
rememberCompletedDirectCronDelivery(deliveryIdempotencyKey, deliveryResults);
}