refactor: split discord monitor startup and lifecycle

This commit is contained in:
Peter Steinberger
2026-03-26 23:56:08 +00:00
parent e3cd209889
commit 2f43c6b334
8 changed files with 874 additions and 722 deletions

View File

@@ -1,13 +1,10 @@
import type { EventEmitter } from "node:events";
import type { DiscordGatewayHandle } from "./monitor/gateway-handle.js";
import type {
DiscordGatewayEvent,
DiscordGatewaySupervisor,
} from "./monitor/gateway-supervisor.js";
export type DiscordGatewayHandle = {
disconnect?: () => void;
};
export type WaitForDiscordGatewayStopParams = {
gateway?: DiscordGatewayHandle;
abortSignal?: AbortSignal;

View File

@@ -0,0 +1,31 @@
import type { EventEmitter } from "node:events";
import type { GatewayPlugin } from "@buape/carbon/gateway";
export type DiscordGatewayHandle = Pick<GatewayPlugin, "disconnect"> & {
emitter?: EventEmitter;
};
type GatewaySocketListener = (...args: unknown[]) => void;
export type DiscordGatewaySocket = {
on: (event: "close" | "error", listener: GatewaySocketListener) => unknown;
listeners: (event: "close" | "error") => GatewaySocketListener[];
removeListener: (event: "close" | "error", listener: GatewaySocketListener) => unknown;
terminate?: () => void;
};
export type MutableDiscordGateway = GatewayPlugin & {
emitter?: EventEmitter;
options: Record<string, unknown> & {
reconnect?: {
maxAttempts?: number;
};
};
state?: {
sessionId?: string | null;
resumeGatewayUrl?: string | null;
sequence?: number | null;
};
sequence?: number | null;
ws?: DiscordGatewaySocket | null;
};

View File

@@ -0,0 +1,497 @@
import { createArmableStallWatchdog } from "openclaw/plugin-sdk/channel-lifecycle";
import { createConnectedChannelStatusPatch } from "openclaw/plugin-sdk/gateway-runtime";
import { danger } from "openclaw/plugin-sdk/runtime-env";
import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
import type { MutableDiscordGateway } from "./gateway-handle.js";
import type { DiscordMonitorStatusSink } from "./status.js";
const DISCORD_GATEWAY_READY_TIMEOUT_MS = 15_000;
const DISCORD_GATEWAY_READY_POLL_MS = 250;
const DISCORD_GATEWAY_DISCONNECT_DRAIN_TIMEOUT_MS = 5_000;
const DISCORD_GATEWAY_FORCE_TERMINATE_CLOSE_TIMEOUT_MS = 1_000;
const DISCORD_GATEWAY_HELLO_TIMEOUT_MS = 30_000;
const DISCORD_GATEWAY_HELLO_CONNECTED_POLL_MS = 250;
const DISCORD_GATEWAY_MAX_CONSECUTIVE_HELLO_STALLS = 3;
const DISCORD_GATEWAY_RECONNECT_STALL_TIMEOUT_MS = 5 * 60_000;
type GatewayReadyWaitResult = "ready" | "timeout" | "stopped";
async function waitForDiscordGatewayReady(params: {
gateway?: Pick<MutableDiscordGateway, "isConnected">;
abortSignal?: AbortSignal;
timeoutMs: number;
beforePoll?: () => Promise<"continue" | "stop"> | "continue" | "stop";
}): Promise<GatewayReadyWaitResult> {
const deadlineAt = Date.now() + params.timeoutMs;
while (!params.abortSignal?.aborted) {
const pollDecision = await params.beforePoll?.();
if (pollDecision === "stop") {
return "stopped";
}
if (params.gateway?.isConnected) {
return "ready";
}
if (Date.now() >= deadlineAt) {
return "timeout";
}
await new Promise<void>((resolve) => {
const timeout = setTimeout(resolve, DISCORD_GATEWAY_READY_POLL_MS);
timeout.unref?.();
});
}
return "stopped";
}
export function createDiscordGatewayReconnectController(params: {
accountId: string;
gateway?: MutableDiscordGateway;
runtime: RuntimeEnv;
abortSignal?: AbortSignal;
pushStatus: (patch: Parameters<DiscordMonitorStatusSink>[0]) => void;
isLifecycleStopping: () => boolean;
drainPendingGatewayErrors: () => "continue" | "stop";
}) {
let forceStopHandler: ((err: unknown) => void) | undefined;
let queuedForceStopError: unknown;
let helloTimeoutId: ReturnType<typeof setTimeout> | undefined;
let helloConnectedPollId: ReturnType<typeof setInterval> | undefined;
let reconnectInFlight: Promise<void> | undefined;
let consecutiveHelloStalls = 0;
const shouldStop = () => params.isLifecycleStopping() || params.abortSignal?.aborted;
const resetHelloStallCounter = () => {
consecutiveHelloStalls = 0;
};
const clearHelloWatch = () => {
if (helloTimeoutId) {
clearTimeout(helloTimeoutId);
helloTimeoutId = undefined;
}
if (helloConnectedPollId) {
clearInterval(helloConnectedPollId);
helloConnectedPollId = undefined;
}
};
const parseGatewayCloseCode = (message: string): number | undefined => {
const match = /code\s+(\d{3,5})/i.exec(message);
if (!match?.[1]) {
return undefined;
}
const code = Number.parseInt(match[1], 10);
return Number.isFinite(code) ? code : undefined;
};
const clearResumeState = () => {
if (!params.gateway?.state) {
return;
}
params.gateway.state.sessionId = null;
params.gateway.state.resumeGatewayUrl = null;
params.gateway.state.sequence = null;
params.gateway.sequence = null;
};
const triggerForceStop = (err: unknown) => {
if (forceStopHandler) {
forceStopHandler(err);
return;
}
queuedForceStopError = err;
};
const reconnectStallWatchdog = createArmableStallWatchdog({
label: `discord:${params.accountId}:reconnect`,
timeoutMs: DISCORD_GATEWAY_RECONNECT_STALL_TIMEOUT_MS,
abortSignal: params.abortSignal,
runtime: params.runtime,
onTimeout: () => {
if (shouldStop()) {
return;
}
const at = Date.now();
const error = new Error(
`discord reconnect watchdog timeout after ${DISCORD_GATEWAY_RECONNECT_STALL_TIMEOUT_MS}ms`,
);
params.pushStatus({
connected: false,
lastEventAt: at,
lastDisconnect: {
at,
error: error.message,
},
lastError: error.message,
});
params.runtime.error?.(
danger(
`discord: reconnect watchdog timeout after ${DISCORD_GATEWAY_RECONNECT_STALL_TIMEOUT_MS}ms; force-stopping monitor task`,
),
);
triggerForceStop(error);
},
});
const pushConnectedStatus = (at: number) => {
params.pushStatus({
...createConnectedChannelStatusPatch(at),
lastDisconnect: null,
});
};
const disconnectGatewaySocketWithoutAutoReconnect = async () => {
if (!params.gateway) {
return;
}
const gateway = params.gateway;
const socket = gateway.ws;
if (!socket) {
gateway.disconnect();
return;
}
// Carbon reconnects from the socket close handler even for intentional
// disconnects. Drop the current socket's close/error listeners so a forced
// reconnect does not race the old socket's automatic resume path.
for (const listener of socket.listeners("close")) {
socket.removeListener("close", listener);
}
for (const listener of socket.listeners("error")) {
socket.removeListener("error", listener);
}
await new Promise<void>((resolve, reject) => {
let settled = false;
let drainTimeout: ReturnType<typeof setTimeout> | undefined;
let terminateCloseTimeout: ReturnType<typeof setTimeout> | undefined;
const ignoreSocketError = () => {};
const clearPendingTimers = () => {
if (drainTimeout) {
clearTimeout(drainTimeout);
drainTimeout = undefined;
}
if (terminateCloseTimeout) {
clearTimeout(terminateCloseTimeout);
terminateCloseTimeout = undefined;
}
};
const cleanup = () => {
clearPendingTimers();
socket.removeListener("close", onClose);
socket.removeListener("error", ignoreSocketError);
};
const onClose = () => {
cleanup();
if (settled) {
return;
}
settled = true;
resolve();
};
const resolveStoppedWait = () => {
if (settled) {
return;
}
settled = true;
clearPendingTimers();
resolve();
};
const rejectClose = (error: Error) => {
if (shouldStop()) {
resolveStoppedWait();
return;
}
if (settled) {
return;
}
settled = true;
clearPendingTimers();
reject(error);
};
drainTimeout = setTimeout(() => {
if (settled) {
return;
}
if (shouldStop()) {
resolveStoppedWait();
return;
}
params.runtime.error?.(
danger(
`discord: gateway socket did not close within ${DISCORD_GATEWAY_DISCONNECT_DRAIN_TIMEOUT_MS}ms before reconnect; attempting forced terminate before giving up`,
),
);
let terminateStarted = false;
try {
if (typeof socket.terminate === "function") {
socket.terminate();
terminateStarted = true;
}
} catch {
// Best-effort only. If terminate fails, fail closed instead of
// opening another socket on top of an unknown old one.
}
if (!terminateStarted) {
params.runtime.error?.(
danger(
`discord: gateway socket did not expose a working terminate() after ${DISCORD_GATEWAY_DISCONNECT_DRAIN_TIMEOUT_MS}ms; force-stopping instead of opening a parallel socket`,
),
);
rejectClose(
new Error(
`discord gateway socket did not close within ${DISCORD_GATEWAY_DISCONNECT_DRAIN_TIMEOUT_MS}ms before reconnect`,
),
);
return;
}
terminateCloseTimeout = setTimeout(() => {
if (settled) {
return;
}
if (shouldStop()) {
resolveStoppedWait();
return;
}
params.runtime.error?.(
danger(
`discord: gateway socket did not close ${DISCORD_GATEWAY_FORCE_TERMINATE_CLOSE_TIMEOUT_MS}ms after forced terminate; force-stopping instead of opening a parallel socket`,
),
);
rejectClose(
new Error(
`discord gateway socket did not close within ${DISCORD_GATEWAY_DISCONNECT_DRAIN_TIMEOUT_MS}ms before reconnect`,
),
);
}, DISCORD_GATEWAY_FORCE_TERMINATE_CLOSE_TIMEOUT_MS);
terminateCloseTimeout.unref?.();
}, DISCORD_GATEWAY_DISCONNECT_DRAIN_TIMEOUT_MS);
drainTimeout.unref?.();
socket.on("error", ignoreSocketError);
socket.on("close", onClose);
gateway.disconnect();
});
};
const reconnectGateway = async (reconnectParams: {
resume: boolean;
forceFreshIdentify?: boolean;
}) => {
if (reconnectInFlight) {
return await reconnectInFlight;
}
reconnectInFlight = (async () => {
if (reconnectParams.forceFreshIdentify) {
// Carbon still sends RESUME on HELLO when session state is populated,
// even after connect(false). Clear cached session data first so this
// path truly forces a fresh IDENTIFY.
clearResumeState();
}
if (shouldStop()) {
return;
}
await disconnectGatewaySocketWithoutAutoReconnect();
if (shouldStop()) {
return;
}
params.gateway?.connect(reconnectParams.resume);
})().finally(() => {
reconnectInFlight = undefined;
});
return await reconnectInFlight;
};
const reconnectGatewayFresh = async () => {
await reconnectGateway({ resume: false, forceFreshIdentify: true });
};
const onGatewayDebug = (msg: unknown) => {
const message = String(msg);
const at = Date.now();
params.pushStatus({ lastEventAt: at });
if (message.includes("WebSocket connection closed")) {
if (params.gateway?.isConnected) {
resetHelloStallCounter();
}
reconnectStallWatchdog.arm(at);
params.pushStatus({
connected: false,
lastDisconnect: {
at,
status: parseGatewayCloseCode(message),
},
});
clearHelloWatch();
return;
}
if (!message.includes("WebSocket connection opened")) {
return;
}
reconnectStallWatchdog.disarm();
clearHelloWatch();
let sawConnected = params.gateway?.isConnected === true;
if (sawConnected) {
pushConnectedStatus(at);
}
helloConnectedPollId = setInterval(() => {
if (!params.gateway?.isConnected) {
return;
}
sawConnected = true;
resetHelloStallCounter();
reconnectStallWatchdog.disarm();
pushConnectedStatus(Date.now());
if (helloConnectedPollId) {
clearInterval(helloConnectedPollId);
helloConnectedPollId = undefined;
}
}, DISCORD_GATEWAY_HELLO_CONNECTED_POLL_MS);
helloTimeoutId = setTimeout(() => {
helloTimeoutId = undefined;
void (async () => {
try {
if (helloConnectedPollId) {
clearInterval(helloConnectedPollId);
helloConnectedPollId = undefined;
}
if (sawConnected || params.gateway?.isConnected) {
resetHelloStallCounter();
return;
}
consecutiveHelloStalls += 1;
const forceFreshIdentify =
consecutiveHelloStalls >= DISCORD_GATEWAY_MAX_CONSECUTIVE_HELLO_STALLS;
const stalledAt = Date.now();
reconnectStallWatchdog.arm(stalledAt);
params.pushStatus({
connected: false,
lastEventAt: stalledAt,
lastDisconnect: {
at: stalledAt,
error: "hello-timeout",
},
});
params.runtime.log?.(
danger(
forceFreshIdentify
? `connection stalled: no HELLO within ${DISCORD_GATEWAY_HELLO_TIMEOUT_MS}ms (${consecutiveHelloStalls}/${DISCORD_GATEWAY_MAX_CONSECUTIVE_HELLO_STALLS}); forcing fresh identify`
: `connection stalled: no HELLO within ${DISCORD_GATEWAY_HELLO_TIMEOUT_MS}ms (${consecutiveHelloStalls}/${DISCORD_GATEWAY_MAX_CONSECUTIVE_HELLO_STALLS}); retrying resume`,
),
);
if (forceFreshIdentify) {
resetHelloStallCounter();
}
if (shouldStop()) {
return;
}
if (forceFreshIdentify) {
await reconnectGatewayFresh();
return;
}
await reconnectGateway({ resume: true });
} catch (err) {
params.runtime.error?.(
danger(`discord: failed to restart stalled gateway socket: ${String(err)}`),
);
triggerForceStop(err);
}
})();
}, DISCORD_GATEWAY_HELLO_TIMEOUT_MS);
};
const onAbort = () => {
reconnectStallWatchdog.disarm();
const at = Date.now();
params.pushStatus({ connected: false, lastEventAt: at });
if (!params.gateway) {
return;
}
params.gateway.options.reconnect = { maxAttempts: 0 };
params.gateway.disconnect();
};
const ensureStartupReady = async () => {
if (!params.gateway || params.gateway.isConnected || shouldStop()) {
if (params.gateway?.isConnected && !shouldStop()) {
pushConnectedStatus(Date.now());
}
return;
}
const initialReady = await waitForDiscordGatewayReady({
gateway: params.gateway,
abortSignal: params.abortSignal,
timeoutMs: DISCORD_GATEWAY_READY_TIMEOUT_MS,
beforePoll: params.drainPendingGatewayErrors,
});
if (initialReady === "stopped" || shouldStop()) {
return;
}
if (initialReady === "timeout") {
params.runtime.error?.(
danger(
`discord: gateway was not ready after ${DISCORD_GATEWAY_READY_TIMEOUT_MS}ms; forcing a fresh reconnect`,
),
);
const startupRetryAt = Date.now();
params.pushStatus({
connected: false,
lastEventAt: startupRetryAt,
lastDisconnect: {
at: startupRetryAt,
error: "startup-not-ready",
},
});
await reconnectGatewayFresh();
const reconnected = await waitForDiscordGatewayReady({
gateway: params.gateway,
abortSignal: params.abortSignal,
timeoutMs: DISCORD_GATEWAY_READY_TIMEOUT_MS,
beforePoll: params.drainPendingGatewayErrors,
});
if (reconnected === "stopped" || shouldStop()) {
return;
}
if (reconnected === "timeout") {
const error = new Error(
`discord gateway did not reach READY within ${DISCORD_GATEWAY_READY_TIMEOUT_MS}ms after a forced reconnect`,
);
const startupFailureAt = Date.now();
params.pushStatus({
connected: false,
lastEventAt: startupFailureAt,
lastDisconnect: {
at: startupFailureAt,
error: "startup-reconnect-timeout",
},
lastError: error.message,
});
throw error;
}
}
if (params.gateway.isConnected && !shouldStop()) {
pushConnectedStatus(Date.now());
}
};
if (params.abortSignal?.aborted) {
onAbort();
} else {
params.abortSignal?.addEventListener("abort", onAbort, { once: true });
}
return {
ensureStartupReady,
onAbort,
onGatewayDebug,
clearHelloWatch,
registerForceStop: (handler: (err: unknown) => void) => {
forceStopHandler = handler;
if (queuedForceStopError !== undefined) {
const queued = queuedForceStopError;
queuedForceStopError = undefined;
handler(queued);
}
},
dispose: () => {
reconnectStallWatchdog.stop();
clearHelloWatch();
params.abortSignal?.removeEventListener("abort", onAbort);
},
};
}

View File

@@ -4,6 +4,7 @@ import { beforeEach, describe, expect, it, vi, type Mock } from "vitest";
import type { RuntimeEnv } from "../../../../src/runtime.js";
import type { WaitForDiscordGatewayStopParams } from "../monitor.gateway.js";
import type { DiscordGatewayEvent } from "./gateway-supervisor.js";
type LifecycleParams = Parameters<
typeof import("./provider.lifecycle.js").runDiscordGatewayLifecycle
>[0];
@@ -164,7 +165,7 @@ describe("runDiscordGatewayLifecycle", () => {
};
sequence?: number | null;
ws?: EventEmitter & { terminate?: () => void };
}) {
}): { emitter: EventEmitter; gateway: MockGateway } {
const emitter = new EventEmitter();
const gateway: MockGateway = {
isConnected: false,
@@ -527,7 +528,7 @@ describe("runDiscordGatewayLifecycle", () => {
start,
stop,
threadStop,
waitCalls: 0,
waitCalls: 1,
gatewaySupervisor,
});
} finally {
@@ -865,7 +866,7 @@ describe("runDiscordGatewayLifecycle", () => {
disconnect: vi.fn(),
connect: vi.fn(),
emitter,
};
} as unknown as TestGateway;
getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter);
const abortController = new AbortController();

View File

@@ -1,13 +1,12 @@
import type { GatewayPlugin } from "@buape/carbon/gateway";
import { createArmableStallWatchdog } from "openclaw/plugin-sdk/channel-lifecycle";
import { createConnectedChannelStatusPatch } from "openclaw/plugin-sdk/gateway-runtime";
import { danger } from "openclaw/plugin-sdk/runtime-env";
import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
import { attachDiscordGatewayLogging } from "../gateway-logging.js";
import { getDiscordGatewayEmitter, waitForDiscordGatewayStop } from "../monitor.gateway.js";
import type { DiscordVoiceManager } from "../voice/manager.js";
import type { MutableDiscordGateway } from "./gateway-handle.js";
import { registerGateway, unregisterGateway } from "./gateway-registry.js";
import type { DiscordGatewayEvent, DiscordGatewaySupervisor } from "./gateway-supervisor.js";
import { createDiscordGatewayReconnectController } from "./provider.lifecycle.reconnect.js";
import type { DiscordMonitorStatusSink } from "./status.js";
type ExecApprovalsHandler = {
@@ -15,61 +14,9 @@ type ExecApprovalsHandler = {
stop: () => Promise<void>;
};
const DISCORD_GATEWAY_READY_TIMEOUT_MS = 15_000;
const DISCORD_GATEWAY_READY_POLL_MS = 250;
const DISCORD_GATEWAY_DISCONNECT_DRAIN_TIMEOUT_MS = 5_000;
const DISCORD_GATEWAY_FORCE_TERMINATE_CLOSE_TIMEOUT_MS = 1_000;
type GatewaySocketListener = (...args: unknown[]) => void;
type DiscordGatewaySocket = {
on: (event: "close" | "error", listener: GatewaySocketListener) => unknown;
listeners: (event: "close" | "error") => GatewaySocketListener[];
removeListener: (event: "close" | "error", listener: GatewaySocketListener) => unknown;
terminate?: () => void;
};
type MutableGateway = GatewayPlugin & {
state?: {
sessionId?: string | null;
resumeGatewayUrl?: string | null;
sequence?: number | null;
};
sequence?: number | null;
ws?: DiscordGatewaySocket | null;
};
type GatewayReadyWaitResult = "ready" | "timeout" | "stopped";
async function waitForDiscordGatewayReady(params: {
gateway?: Pick<GatewayPlugin, "isConnected">;
abortSignal?: AbortSignal;
timeoutMs: number;
beforePoll?: () => Promise<"continue" | "stop"> | "continue" | "stop";
}): Promise<GatewayReadyWaitResult> {
const deadlineAt = Date.now() + params.timeoutMs;
while (!params.abortSignal?.aborted) {
const pollDecision = await params.beforePoll?.();
if (pollDecision === "stop") {
return "stopped";
}
if (params.gateway?.isConnected) {
return "ready";
}
if (Date.now() >= deadlineAt) {
return "timeout";
}
await new Promise<void>((resolve) => {
const timeout = setTimeout(resolve, DISCORD_GATEWAY_READY_POLL_MS);
timeout.unref?.();
});
}
return "stopped";
}
export async function runDiscordGatewayLifecycle(params: {
accountId: string;
gateway?: GatewayPlugin;
gateway?: MutableDiscordGateway;
runtime: RuntimeEnv;
abortSignal?: AbortSignal;
isDisallowedIntentsError: (err: unknown) => boolean;
@@ -80,10 +27,6 @@ export async function runDiscordGatewayLifecycle(params: {
gatewaySupervisor: DiscordGatewaySupervisor;
statusSink?: DiscordMonitorStatusSink;
}) {
const HELLO_TIMEOUT_MS = 30000;
const HELLO_CONNECTED_POLL_MS = 250;
const MAX_CONSECUTIVE_HELLO_STALLS = 3;
const RECONNECT_STALL_TIMEOUT_MS = 5 * 60_000;
const gateway = params.gateway;
if (gateway) {
registerGateway(params.accountId, gateway);
@@ -94,384 +37,20 @@ export async function runDiscordGatewayLifecycle(params: {
runtime: params.runtime,
});
let lifecycleStopping = false;
let forceStopHandler: ((err: unknown) => void) | undefined;
let queuedForceStopError: unknown;
const pushStatus = (patch: Parameters<DiscordMonitorStatusSink>[0]) => {
params.statusSink?.(patch);
};
const triggerForceStop = (err: unknown) => {
if (forceStopHandler) {
forceStopHandler(err);
return;
}
queuedForceStopError = err;
};
const reconnectStallWatchdog = createArmableStallWatchdog({
label: `discord:${params.accountId}:reconnect`,
timeoutMs: RECONNECT_STALL_TIMEOUT_MS,
abortSignal: params.abortSignal,
const reconnectController = createDiscordGatewayReconnectController({
accountId: params.accountId,
gateway,
runtime: params.runtime,
onTimeout: () => {
if (params.abortSignal?.aborted || lifecycleStopping) {
return;
}
const at = Date.now();
const error = new Error(
`discord reconnect watchdog timeout after ${RECONNECT_STALL_TIMEOUT_MS}ms`,
);
pushStatus({
connected: false,
lastEventAt: at,
lastDisconnect: {
at,
error: error.message,
},
lastError: error.message,
});
params.runtime.error?.(
danger(
`discord: reconnect watchdog timeout after ${RECONNECT_STALL_TIMEOUT_MS}ms; force-stopping monitor task`,
),
);
triggerForceStop(error);
},
abortSignal: params.abortSignal,
pushStatus,
isLifecycleStopping: () => lifecycleStopping,
drainPendingGatewayErrors: () => drainPendingGatewayErrors(),
});
const onAbort = () => {
lifecycleStopping = true;
reconnectStallWatchdog.disarm();
const at = Date.now();
pushStatus({ connected: false, lastEventAt: at });
if (!gateway) {
return;
}
gateway.options.reconnect = { maxAttempts: 0 };
gateway.disconnect();
};
if (params.abortSignal?.aborted) {
onAbort();
} else {
params.abortSignal?.addEventListener("abort", onAbort, { once: true });
}
let helloTimeoutId: ReturnType<typeof setTimeout> | undefined;
let helloConnectedPollId: ReturnType<typeof setInterval> | undefined;
let consecutiveHelloStalls = 0;
const clearHelloWatch = () => {
if (helloTimeoutId) {
clearTimeout(helloTimeoutId);
helloTimeoutId = undefined;
}
if (helloConnectedPollId) {
clearInterval(helloConnectedPollId);
helloConnectedPollId = undefined;
}
};
const resetHelloStallCounter = () => {
consecutiveHelloStalls = 0;
};
const parseGatewayCloseCode = (message: string): number | undefined => {
const match = /code\s+(\d{3,5})/i.exec(message);
if (!match?.[1]) {
return undefined;
}
const code = Number.parseInt(match[1], 10);
return Number.isFinite(code) ? code : undefined;
};
const clearResumeState = () => {
const mutableGateway = gateway as MutableGateway | undefined;
if (!mutableGateway?.state) {
return;
}
mutableGateway.state.sessionId = null;
mutableGateway.state.resumeGatewayUrl = null;
mutableGateway.state.sequence = null;
mutableGateway.sequence = null;
};
const disconnectGatewaySocketWithoutAutoReconnect = async () => {
if (!gateway) {
return;
}
const mutableGateway = gateway as MutableGateway;
const socket = mutableGateway.ws;
if (!socket) {
gateway.disconnect();
return;
}
// Carbon reconnects from the socket close handler even for intentional
// disconnects. Drop the current socket's close/error listeners so a forced
// reconnect does not race the old socket's automatic resume path.
for (const listener of socket.listeners("close")) {
socket.removeListener("close", listener);
}
for (const listener of socket.listeners("error")) {
socket.removeListener("error", listener);
}
await new Promise<void>((resolve, reject) => {
let settled = false;
let drainTimeout: ReturnType<typeof setTimeout> | undefined;
let terminateCloseTimeout: ReturnType<typeof setTimeout> | undefined;
const ignoreSocketError = () => {};
const shouldStopWaiting = () => lifecycleStopping || params.abortSignal?.aborted;
const clearPendingTimers = () => {
if (drainTimeout) {
clearTimeout(drainTimeout);
drainTimeout = undefined;
}
if (terminateCloseTimeout) {
clearTimeout(terminateCloseTimeout);
terminateCloseTimeout = undefined;
}
};
const cleanup = () => {
clearPendingTimers();
socket.removeListener("close", onClose);
socket.removeListener("error", ignoreSocketError);
};
const onClose = () => {
cleanup();
if (settled) {
return;
}
settled = true;
resolve();
};
const resolveStoppedWait = () => {
if (settled) {
return;
}
settled = true;
clearPendingTimers();
// Keep suppressing late ws errors until the socket actually closes.
// The original Carbon listeners were removed above, and `terminate()`
// can still asynchronously emit "error" before "close".
resolve();
};
const rejectClose = (error: Error) => {
if (shouldStopWaiting()) {
resolveStoppedWait();
return;
}
if (settled) {
return;
}
settled = true;
clearPendingTimers();
// Keep suppressing late ws errors until the socket actually closes.
// The original Carbon listeners were removed above, and `terminate()`
// can still asynchronously emit "error" before "close".
reject(error);
};
drainTimeout = setTimeout(() => {
if (settled) {
return;
}
if (shouldStopWaiting()) {
resolveStoppedWait();
return;
}
params.runtime.error?.(
danger(
`discord: gateway socket did not close within ${DISCORD_GATEWAY_DISCONNECT_DRAIN_TIMEOUT_MS}ms before reconnect; attempting forced terminate before giving up`,
),
);
let terminateStarted = false;
try {
if (typeof socket.terminate === "function") {
socket.terminate();
terminateStarted = true;
}
} catch {
// Best-effort only. If terminate fails, fail closed instead of
// opening another socket on top of an unknown old one.
}
if (!terminateStarted) {
params.runtime.error?.(
danger(
`discord: gateway socket did not expose a working terminate() after ${DISCORD_GATEWAY_DISCONNECT_DRAIN_TIMEOUT_MS}ms; force-stopping instead of opening a parallel socket`,
),
);
rejectClose(
new Error(
`discord gateway socket did not close within ${DISCORD_GATEWAY_DISCONNECT_DRAIN_TIMEOUT_MS}ms before reconnect`,
),
);
return;
}
terminateCloseTimeout = setTimeout(() => {
if (settled) {
return;
}
if (shouldStopWaiting()) {
resolveStoppedWait();
return;
}
params.runtime.error?.(
danger(
`discord: gateway socket did not close ${DISCORD_GATEWAY_FORCE_TERMINATE_CLOSE_TIMEOUT_MS}ms after forced terminate; force-stopping instead of opening a parallel socket`,
),
);
rejectClose(
new Error(
`discord gateway socket did not close within ${DISCORD_GATEWAY_DISCONNECT_DRAIN_TIMEOUT_MS}ms before reconnect`,
),
);
}, DISCORD_GATEWAY_FORCE_TERMINATE_CLOSE_TIMEOUT_MS);
terminateCloseTimeout.unref?.();
}, DISCORD_GATEWAY_DISCONNECT_DRAIN_TIMEOUT_MS);
drainTimeout.unref?.();
socket.on("error", ignoreSocketError);
socket.on("close", onClose);
gateway.disconnect();
});
};
let reconnectInFlight: Promise<void> | undefined;
const reconnectGateway = async (reconnectParams: {
resume: boolean;
forceFreshIdentify?: boolean;
}) => {
if (reconnectInFlight) {
return await reconnectInFlight;
}
reconnectInFlight = (async () => {
if (reconnectParams.forceFreshIdentify) {
// Carbon still sends RESUME on HELLO when session state is populated,
// even after connect(false). Clear cached session data first so this
// path truly forces a fresh IDENTIFY.
clearResumeState();
}
if (lifecycleStopping || params.abortSignal?.aborted) {
return;
}
await disconnectGatewaySocketWithoutAutoReconnect();
if (lifecycleStopping || params.abortSignal?.aborted) {
return;
}
gateway?.connect(reconnectParams.resume);
})().finally(() => {
reconnectInFlight = undefined;
});
return await reconnectInFlight;
};
const reconnectGatewayFresh = async () => {
await reconnectGateway({ resume: false, forceFreshIdentify: true });
};
const onGatewayDebug = (msg: unknown) => {
const message = String(msg);
const at = Date.now();
pushStatus({ lastEventAt: at });
if (message.includes("WebSocket connection closed")) {
// Carbon marks `isConnected` true only after READY/RESUMED and flips it
// false during reconnect handling after this debug line is emitted.
if (gateway?.isConnected) {
resetHelloStallCounter();
}
reconnectStallWatchdog.arm(at);
pushStatus({
connected: false,
lastDisconnect: {
at,
status: parseGatewayCloseCode(message),
},
});
clearHelloWatch();
return;
}
if (!message.includes("WebSocket connection opened")) {
return;
}
reconnectStallWatchdog.disarm();
clearHelloWatch();
let sawConnected = gateway?.isConnected === true;
if (sawConnected) {
pushStatus({
...createConnectedChannelStatusPatch(at),
lastDisconnect: null,
});
}
helloConnectedPollId = setInterval(() => {
if (!gateway?.isConnected) {
return;
}
sawConnected = true;
resetHelloStallCounter();
const connectedAt = Date.now();
reconnectStallWatchdog.disarm();
pushStatus({
...createConnectedChannelStatusPatch(connectedAt),
lastDisconnect: null,
});
if (helloConnectedPollId) {
clearInterval(helloConnectedPollId);
helloConnectedPollId = undefined;
}
}, HELLO_CONNECTED_POLL_MS);
helloTimeoutId = setTimeout(() => {
helloTimeoutId = undefined;
void (async () => {
try {
if (helloConnectedPollId) {
clearInterval(helloConnectedPollId);
helloConnectedPollId = undefined;
}
if (sawConnected || gateway?.isConnected) {
resetHelloStallCounter();
return;
}
consecutiveHelloStalls += 1;
const forceFreshIdentify = consecutiveHelloStalls >= MAX_CONSECUTIVE_HELLO_STALLS;
const stalledAt = Date.now();
reconnectStallWatchdog.arm(stalledAt);
pushStatus({
connected: false,
lastEventAt: stalledAt,
lastDisconnect: {
at: stalledAt,
error: "hello-timeout",
},
});
params.runtime.log?.(
danger(
forceFreshIdentify
? `connection stalled: no HELLO within ${HELLO_TIMEOUT_MS}ms (${consecutiveHelloStalls}/${MAX_CONSECUTIVE_HELLO_STALLS}); forcing fresh identify`
: `connection stalled: no HELLO within ${HELLO_TIMEOUT_MS}ms (${consecutiveHelloStalls}/${MAX_CONSECUTIVE_HELLO_STALLS}); retrying resume`,
),
);
if (forceFreshIdentify) {
resetHelloStallCounter();
}
if (lifecycleStopping || params.abortSignal?.aborted) {
return;
}
if (forceFreshIdentify) {
await reconnectGatewayFresh();
return;
}
await reconnectGateway({ resume: true });
} catch (err) {
params.runtime.error?.(
danger(`discord: failed to restart stalled gateway socket: ${String(err)}`),
);
triggerForceStop(err);
}
})();
}, HELLO_TIMEOUT_MS);
};
const onGatewayDebug = reconnectController.onGatewayDebug;
gatewayEmitter?.on("debug", onGatewayDebug);
let sawDisallowedIntents = false;
@@ -524,76 +103,7 @@ export async function runDiscordGatewayLifecycle(params: {
return;
}
// Carbon starts the gateway during client construction, before OpenClaw can
// attach lifecycle listeners. Require a READY/RESUMED-connected gateway
// before continuing so the monitor does not look healthy while silently
// missing inbound events.
if (gateway && !gateway.isConnected && !lifecycleStopping) {
const initialReady = await waitForDiscordGatewayReady({
gateway,
abortSignal: params.abortSignal,
timeoutMs: DISCORD_GATEWAY_READY_TIMEOUT_MS,
beforePoll: drainPendingGatewayErrors,
});
if (initialReady === "stopped" || lifecycleStopping) {
return;
}
if (initialReady === "timeout" && !lifecycleStopping) {
params.runtime.error?.(
danger(
`discord: gateway was not ready after ${DISCORD_GATEWAY_READY_TIMEOUT_MS}ms; forcing a fresh reconnect`,
),
);
const startupRetryAt = Date.now();
pushStatus({
connected: false,
lastEventAt: startupRetryAt,
lastDisconnect: {
at: startupRetryAt,
error: "startup-not-ready",
},
});
await reconnectGatewayFresh();
const reconnected = await waitForDiscordGatewayReady({
gateway,
abortSignal: params.abortSignal,
timeoutMs: DISCORD_GATEWAY_READY_TIMEOUT_MS,
beforePoll: drainPendingGatewayErrors,
});
if (reconnected === "stopped" || lifecycleStopping) {
return;
}
if (reconnected === "timeout" && !lifecycleStopping) {
const error = new Error(
`discord gateway did not reach READY within ${DISCORD_GATEWAY_READY_TIMEOUT_MS}ms after a forced reconnect`,
);
const startupFailureAt = Date.now();
pushStatus({
connected: false,
lastEventAt: startupFailureAt,
lastDisconnect: {
at: startupFailureAt,
error: "startup-reconnect-timeout",
},
lastError: error.message,
});
throw error;
}
}
}
// If the gateway is already connected when the lifecycle starts (or becomes
// connected during the startup readiness guard), push the initial connected
// status now. Guard against lifecycleStopping: if the abortSignal was
// already aborted, onAbort() ran synchronously above and pushed connected:
// false, so don't contradict it with a spurious connected: true.
if (gateway?.isConnected && !lifecycleStopping) {
const at = Date.now();
pushStatus({
...createConnectedChannelStatusPatch(at),
lastDisconnect: null,
});
}
await reconnectController.ensureStartupReady();
if (drainPendingGatewayErrors() === "stop") {
return;
@@ -608,14 +118,7 @@ export async function runDiscordGatewayLifecycle(params: {
abortSignal: params.abortSignal,
gatewaySupervisor: params.gatewaySupervisor,
onGatewayEvent: handleGatewayEvent,
registerForceStop: (forceStop) => {
forceStopHandler = forceStop;
if (queuedForceStopError !== undefined) {
const queued = queuedForceStopError;
queuedForceStopError = undefined;
forceStop(queued);
}
},
registerForceStop: reconnectController.registerForceStop,
});
} catch (err) {
if (!sawDisallowedIntents && !params.isDisallowedIntentsError(err)) {
@@ -626,10 +129,8 @@ export async function runDiscordGatewayLifecycle(params: {
params.gatewaySupervisor.detachLifecycle();
unregisterGateway(params.accountId);
stopGatewayLogging();
reconnectStallWatchdog.stop();
clearHelloWatch();
reconnectController.dispose();
gatewayEmitter?.removeListener("debug", onGatewayDebug);
params.abortSignal?.removeEventListener("abort", onAbort);
if (params.voiceManager) {
await params.voiceManager.destroy();
params.voiceManagerRef.current = null;

View File

@@ -0,0 +1,237 @@
import {
Client,
ReadyListener,
type BaseCommand,
type BaseMessageInteractiveComponent,
type Modal,
type Plugin,
} from "@buape/carbon";
import type { GatewayPlugin } from "@buape/carbon/gateway";
import { VoicePlugin } from "@buape/carbon/voice";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime";
import { isDangerousNameMatchingEnabled } from "openclaw/plugin-sdk/config-runtime";
import { danger } from "openclaw/plugin-sdk/runtime-env";
import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
import type { DiscordGuildEntryResolved } from "./allow-list.js";
import { createDiscordAutoPresenceController } from "./auto-presence.js";
import type { DiscordDmPolicy } from "./dm-command-auth.js";
import type { MutableDiscordGateway } from "./gateway-handle.js";
import { createDiscordGatewayPlugin } from "./gateway-plugin.js";
import { createDiscordGatewaySupervisor } from "./gateway-supervisor.js";
import {
DiscordMessageListener,
DiscordPresenceListener,
DiscordReactionListener,
DiscordReactionRemoveListener,
DiscordThreadUpdateListener,
registerDiscordListener,
} from "./listeners.js";
import { resolveDiscordPresenceUpdate } from "./presence.js";
type DiscordAutoPresenceController = ReturnType<typeof createDiscordAutoPresenceController>;
type DiscordListenerConfig = {
dangerouslyAllowNameMatching?: boolean;
intents?: { presence?: boolean };
};
type CreateClientFn = (
options: ConstructorParameters<typeof Client>[0],
handlers: ConstructorParameters<typeof Client>[1],
plugins: ConstructorParameters<typeof Client>[2],
) => Client;
export function createDiscordStatusReadyListener(params: {
discordConfig: Parameters<typeof resolveDiscordPresenceUpdate>[0];
getAutoPresenceController: () => DiscordAutoPresenceController | null;
}): ReadyListener {
return new (class DiscordStatusReadyListener extends ReadyListener {
async handle(_data: unknown, client: Client) {
const autoPresenceController = params.getAutoPresenceController();
if (autoPresenceController?.enabled) {
autoPresenceController.refresh();
return;
}
const gateway = client.getPlugin<GatewayPlugin>("gateway");
if (!gateway) {
return;
}
const presence = resolveDiscordPresenceUpdate(params.discordConfig);
if (!presence) {
return;
}
gateway.updatePresence(presence);
}
})();
}
export function createDiscordMonitorClient(params: {
accountId: string;
applicationId: string;
token: string;
commands: BaseCommand[];
components: BaseMessageInteractiveComponent[];
modals: Modal[];
voiceEnabled: boolean;
discordConfig: Parameters<typeof resolveDiscordPresenceUpdate>[0] & {
eventQueue?: { listenerTimeout?: number };
};
runtime: RuntimeEnv;
createClient: CreateClientFn;
createGatewayPlugin: typeof createDiscordGatewayPlugin;
createGatewaySupervisor: typeof createDiscordGatewaySupervisor;
createAutoPresenceController: typeof createDiscordAutoPresenceController;
isDisallowedIntentsError: (err: unknown) => boolean;
}) {
let autoPresenceController: DiscordAutoPresenceController | null = null;
const clientPlugins: Plugin[] = [
params.createGatewayPlugin({
discordConfig: params.discordConfig,
runtime: params.runtime,
}),
];
if (params.voiceEnabled) {
clientPlugins.push(new VoicePlugin());
}
// Pass eventQueue config to Carbon so the gateway listener budget can be tuned.
// Default listenerTimeout is 120s (Carbon defaults to 30s, which is too short for some
// Discord normalization/enqueue work).
const eventQueueOpts = {
listenerTimeout: 120_000,
...params.discordConfig.eventQueue,
};
const readyListener = createDiscordStatusReadyListener({
discordConfig: params.discordConfig,
getAutoPresenceController: () => autoPresenceController,
});
const client = params.createClient(
{
baseUrl: "http://localhost",
deploySecret: "a",
clientId: params.applicationId,
publicKey: "a",
token: params.token,
autoDeploy: false,
eventQueue: eventQueueOpts,
},
{
commands: params.commands,
listeners: [readyListener],
components: params.components,
modals: params.modals,
},
clientPlugins,
);
const gateway = client.getPlugin<GatewayPlugin>("gateway") as MutableDiscordGateway | undefined;
const gatewaySupervisor = params.createGatewaySupervisor({
gateway,
isDisallowedIntentsError: params.isDisallowedIntentsError,
runtime: params.runtime,
});
if (gateway) {
autoPresenceController = params.createAutoPresenceController({
accountId: params.accountId,
discordConfig: params.discordConfig,
gateway,
log: (message) => params.runtime.log?.(message),
});
autoPresenceController.start();
}
return {
client,
gateway,
gatewaySupervisor,
autoPresenceController,
eventQueueOpts,
};
}
export async function fetchDiscordBotIdentity(params: {
client: Pick<Client, "fetchUser">;
runtime: RuntimeEnv;
logStartupPhase: (phase: string, details?: string) => void;
}) {
params.logStartupPhase("fetch-bot-identity:start");
try {
const botUser = await params.client.fetchUser("@me");
const botUserId = botUser?.id;
const botUserName = botUser?.username?.trim() || botUser?.globalName?.trim() || undefined;
params.logStartupPhase(
"fetch-bot-identity:done",
`botUserId=${botUserId ?? "<missing>"} botUserName=${botUserName ?? "<missing>"}`,
);
return { botUserId, botUserName };
} catch (err) {
params.runtime.error?.(danger(`discord: failed to fetch bot identity: ${String(err)}`));
params.logStartupPhase("fetch-bot-identity:error", String(err));
return { botUserId: undefined, botUserName: undefined };
}
}
export function registerDiscordMonitorListeners(params: {
cfg: OpenClawConfig;
client: Pick<Client, "listeners">;
accountId: string;
discordConfig: DiscordListenerConfig;
runtime: RuntimeEnv;
botUserId?: string;
dmEnabled: boolean;
groupDmEnabled: boolean;
groupDmChannels?: string[];
dmPolicy: DiscordDmPolicy;
allowFrom?: string[];
groupPolicy: "open" | "allowlist" | "disabled";
guildEntries?: Record<string, DiscordGuildEntryResolved>;
logger: NonNullable<ConstructorParameters<typeof DiscordMessageListener>[1]>;
messageHandler: ConstructorParameters<typeof DiscordMessageListener>[0];
trackInboundEvent?: () => void;
eventQueueListenerTimeoutMs?: number;
}) {
registerDiscordListener(
params.client.listeners,
new DiscordMessageListener(params.messageHandler, params.logger, params.trackInboundEvent, {
timeoutMs: params.eventQueueListenerTimeoutMs,
}),
);
const reactionListenerOptions: ConstructorParameters<typeof DiscordReactionListener>[0] = {
cfg: params.cfg,
accountId: params.accountId,
runtime: params.runtime,
botUserId: params.botUserId,
dmEnabled: params.dmEnabled,
groupDmEnabled: params.groupDmEnabled,
groupDmChannels: params.groupDmChannels ?? [],
dmPolicy: params.dmPolicy,
allowFrom: params.allowFrom ?? [],
groupPolicy: params.groupPolicy,
allowNameMatching: isDangerousNameMatchingEnabled(params.discordConfig),
guildEntries: params.guildEntries,
logger: params.logger,
onEvent: params.trackInboundEvent,
};
registerDiscordListener(
params.client.listeners,
new DiscordReactionListener(reactionListenerOptions),
);
registerDiscordListener(
params.client.listeners,
new DiscordReactionRemoveListener(reactionListenerOptions),
);
registerDiscordListener(
params.client.listeners,
new DiscordThreadUpdateListener(params.cfg, params.accountId, params.logger),
);
if (params.discordConfig.intents?.presence) {
registerDiscordListener(
params.client.listeners,
new DiscordPresenceListener({ logger: params.logger, accountId: params.accountId }),
);
params.runtime.log?.("discord: GuildPresences intent enabled — presence listener registered");
}
}

View File

@@ -2,14 +2,11 @@ import { inspect } from "node:util";
import {
Client,
RateLimitError,
ReadyListener,
type BaseCommand,
type BaseMessageInteractiveComponent,
type Modal,
type Plugin,
} from "@buape/carbon";
import { GatewayCloseCodes, type GatewayPlugin } from "@buape/carbon/gateway";
import { VoicePlugin } from "@buape/carbon/voice";
import { Routes } from "discord-api-types/v10";
import {
listNativeCommandSpecsForConfig,
@@ -23,7 +20,6 @@ import {
} from "openclaw/plugin-sdk/config-runtime";
import type { OpenClawConfig, ReplyToMode } from "openclaw/plugin-sdk/config-runtime";
import { loadConfig } from "openclaw/plugin-sdk/config-runtime";
import { isDangerousNameMatchingEnabled } from "openclaw/plugin-sdk/config-runtime";
import {
GROUP_POLICY_BLOCKED_LABEL,
resolveOpenProviderRuntimeGroupPolicy,
@@ -62,25 +58,23 @@ import {
import { createDiscordAutoPresenceController } from "./auto-presence.js";
import { resolveDiscordSlashCommandConfig } from "./commands.js";
import { createExecApprovalButton, DiscordExecApprovalHandler } from "./exec-approvals.js";
import type { MutableDiscordGateway } from "./gateway-handle.js";
import { createDiscordGatewayPlugin } from "./gateway-plugin.js";
import { createDiscordGatewaySupervisor } from "./gateway-supervisor.js";
import {
DiscordMessageListener,
DiscordPresenceListener,
DiscordReactionListener,
DiscordReactionRemoveListener,
DiscordThreadUpdateListener,
registerDiscordListener,
} from "./listeners.js";
import { registerDiscordListener } from "./listeners.js";
import {
createDiscordCommandArgFallbackButton,
createDiscordModelPickerFallbackButton,
createDiscordModelPickerFallbackSelect,
createDiscordNativeCommand,
} from "./native-command.js";
import { resolveDiscordPresenceUpdate } from "./presence.js";
import { resolveDiscordAllowlistConfig } from "./provider.allowlist.js";
import { runDiscordGatewayLifecycle } from "./provider.lifecycle.js";
import {
createDiscordMonitorClient,
fetchDiscordBotIdentity,
registerDiscordMonitorListeners,
} from "./provider.startup.js";
import { resolveDiscordRestFetch } from "./rest-fetch.js";
import { formatDiscordStartupStatusMessage } from "./startup-status.js";
import type { DiscordMonitorStatusSink } from "./status.js";
@@ -796,8 +790,10 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
let lifecycleStarted = false;
let gatewaySupervisor: ReturnType<typeof createDiscordGatewaySupervisor> | undefined;
let deactivateMessageHandler: (() => void) | undefined;
let autoPresenceController: ReturnType<typeof createDiscordAutoPresenceController> | null = null;
let lifecycleGateway: GatewayPlugin | undefined;
let autoPresenceController: ReturnType<
typeof createDiscordMonitorClient
>["autoPresenceController"] = null;
let lifecycleGateway: MutableDiscordGateway | undefined;
let earlyGatewayEmitter = gatewaySupervisor?.emitter;
let onEarlyGatewayDebug: ((msg: unknown) => void) | undefined;
try {
@@ -892,69 +888,32 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
modals.push(createDiscordComponentModal(componentContext));
}
class DiscordStatusReadyListener extends ReadyListener {
async handle(_data: unknown, client: Client) {
if (autoPresenceController?.enabled) {
autoPresenceController.refresh();
return;
}
const gateway = client.getPlugin<GatewayPlugin>("gateway");
if (!gateway) {
return;
}
const presence = resolveDiscordPresenceUpdate(discordCfg);
if (!presence) {
return;
}
gateway.updatePresence(presence);
}
}
const clientPlugins: Plugin[] = [
(createDiscordGatewayPluginForTesting ?? createDiscordGatewayPlugin)({
discordConfig: discordCfg,
runtime,
}),
];
if (voiceEnabled) {
clientPlugins.push(new VoicePlugin());
}
// Pass eventQueue config to Carbon so the gateway listener budget can be tuned.
// Default listenerTimeout is 120s (Carbon defaults to 30s, which is too short for some
// Discord normalization/enqueue work).
const eventQueueOpts = {
listenerTimeout: 120_000,
...discordCfg.eventQueue,
};
const client = (createClientForTesting ?? ((...args) => new Client(...args)))(
{
baseUrl: "http://localhost",
deploySecret: "a",
clientId: applicationId,
publicKey: "a",
token,
autoDeploy: false,
eventQueue: eventQueueOpts,
},
{
commands,
listeners: [new DiscordStatusReadyListener()],
components,
modals,
},
clientPlugins,
);
lifecycleGateway = client.getPlugin<GatewayPlugin>("gateway");
gatewaySupervisor = (
createDiscordGatewaySupervisorForTesting ?? createDiscordGatewaySupervisor
)({
gateway: lifecycleGateway,
isDisallowedIntentsError: isDiscordDisallowedIntentsError,
const {
client,
gateway,
gatewaySupervisor: createdGatewaySupervisor,
autoPresenceController: createdAutoPresenceController,
eventQueueOpts,
} = createDiscordMonitorClient({
accountId: account.accountId,
applicationId,
token,
commands,
components,
modals,
voiceEnabled,
discordConfig: discordCfg,
runtime,
createClient: createClientForTesting ?? ((...args) => new Client(...args)),
createGatewayPlugin: createDiscordGatewayPluginForTesting ?? createDiscordGatewayPlugin,
createGatewaySupervisor:
createDiscordGatewaySupervisorForTesting ?? createDiscordGatewaySupervisor,
createAutoPresenceController: createDiscordAutoPresenceController,
isDisallowedIntentsError: isDiscordDisallowedIntentsError,
});
lifecycleGateway = gateway;
gatewaySupervisor = createdGatewaySupervisor;
autoPresenceController = createdAutoPresenceController;
earlyGatewayEmitter = gatewaySupervisor.emitter;
onEarlyGatewayDebug = (msg: unknown) => {
@@ -966,15 +925,6 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
);
};
earlyGatewayEmitter?.on("debug", onEarlyGatewayDebug);
if (lifecycleGateway) {
autoPresenceController = createDiscordAutoPresenceController({
accountId: account.accountId,
discordConfig: discordCfg,
gateway: lifecycleGateway,
log: (message) => runtime.log?.(message),
});
autoPresenceController.start();
}
logDiscordStartupPhase({
runtime,
@@ -1004,8 +954,19 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
string,
import("openclaw/plugin-sdk/reply-history").HistoryEntry[]
>();
let botUserId: string | undefined;
let botUserName: string | undefined;
let { botUserId, botUserName } = await fetchDiscordBotIdentity({
client,
runtime,
logStartupPhase: (phase, details) =>
logDiscordStartupPhase({
runtime,
accountId: account.accountId,
phase,
startAt: startupStartedAt,
gateway: lifecycleGateway,
details,
}),
});
let voiceManager: DiscordVoiceManager | null = null;
if (nativeDisabledExplicit) {
@@ -1030,37 +991,6 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
});
}
logDiscordStartupPhase({
runtime,
accountId: account.accountId,
phase: "fetch-bot-identity:start",
startAt: startupStartedAt,
gateway: lifecycleGateway,
});
try {
const botUser = await client.fetchUser("@me");
botUserId = botUser?.id;
botUserName = botUser?.username?.trim() || botUser?.globalName?.trim() || undefined;
logDiscordStartupPhase({
runtime,
accountId: account.accountId,
phase: "fetch-bot-identity:done",
startAt: startupStartedAt,
gateway: lifecycleGateway,
details: `botUserId=${botUserId ?? "<missing>"} botUserName=${botUserName ?? "<missing>"}`,
});
} catch (err) {
runtime.error?.(danger(`discord: failed to fetch bot identity: ${String(err)}`));
logDiscordStartupPhase({
runtime,
accountId: account.accountId,
phase: "fetch-bot-identity:error",
startAt: startupStartedAt,
gateway: lifecycleGateway,
details: String(err),
});
}
if (voiceEnabled) {
const { DiscordVoiceManager, DiscordVoiceReadyListener } = await loadDiscordVoiceRuntime();
voiceManager = new DiscordVoiceManager({
@@ -1105,47 +1035,25 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
opts.setStatus?.({ lastEventAt: at, lastInboundAt: at });
}
: undefined;
registerDiscordListener(
client.listeners,
new DiscordMessageListener(messageHandler, logger, trackInboundEvent, {
timeoutMs: eventQueueOpts.listenerTimeout,
}),
);
const reactionListenerOptions = {
registerDiscordMonitorListeners({
cfg,
client,
accountId: account.accountId,
discordConfig: discordCfg,
runtime,
botUserId,
dmEnabled,
groupDmEnabled,
groupDmChannels: groupDmChannels ?? [],
groupDmChannels,
dmPolicy,
allowFrom: allowFrom ?? [],
allowFrom,
groupPolicy,
allowNameMatching: isDangerousNameMatchingEnabled(discordCfg),
guildEntries,
logger,
onEvent: trackInboundEvent,
};
registerDiscordListener(client.listeners, new DiscordReactionListener(reactionListenerOptions));
registerDiscordListener(
client.listeners,
new DiscordReactionRemoveListener(reactionListenerOptions),
);
registerDiscordListener(
client.listeners,
new DiscordThreadUpdateListener(cfg, account.accountId, logger),
);
if (discordCfg.intents?.presence) {
registerDiscordListener(
client.listeners,
new DiscordPresenceListener({ logger, accountId: account.accountId }),
);
runtime.log?.("discord: GuildPresences intent enabled — presence listener registered");
}
messageHandler,
trackInboundEvent,
eventQueueListenerTimeoutMs: eventQueueOpts.listenerTimeout,
});
const botIdentity =
botUserId && botUserName ? `${botUserId} (${botUserName})` : (botUserId ?? botUserName ?? "");

View File

@@ -117,6 +117,22 @@ function parseNumericPlanField(line: string, key: string): number {
return Number(match[1]);
}
function runManifestOutputWriter(workflow: string, envOverrides: NodeJS.ProcessEnv = {}): string {
const outputPath = path.join(os.tmpdir(), `openclaw-${workflow}-output-${Date.now()}.txt`);
try {
execFileSync("node", ["scripts/ci-write-manifest-outputs.mjs", "--workflow", workflow], {
cwd: REPO_ROOT,
env: createPlannerEnv({
GITHUB_OUTPUT: outputPath,
...envOverrides,
}),
encoding: "utf8",
});
return fs.readFileSync(outputPath, "utf8");
} finally {
fs.rmSync(outputPath, { force: true });
}
}
describe("scripts/test-parallel fatal output guard", () => {
it("fails a zero exit when V8 reports an out-of-memory fatal", () => {
const output = [
@@ -389,77 +405,41 @@ describe("scripts/test-parallel lane planning", () => {
});
it("writes CI workflow outputs in ci mode", () => {
const repoRoot = path.resolve(import.meta.dirname, "../..");
const outputPath = path.join(os.tmpdir(), `openclaw-ci-output-${Date.now()}.txt`);
execFileSync("node", ["scripts/ci-write-manifest-outputs.mjs", "--workflow", "ci"], {
cwd: repoRoot,
env: {
...clearPlannerShardEnv(process.env),
GITHUB_OUTPUT: outputPath,
GITHUB_EVENT_NAME: "pull_request",
OPENCLAW_CI_DOCS_ONLY: "false",
OPENCLAW_CI_DOCS_CHANGED: "false",
OPENCLAW_CI_RUN_NODE: "true",
OPENCLAW_CI_RUN_MACOS: "true",
OPENCLAW_CI_RUN_ANDROID: "true",
OPENCLAW_CI_RUN_WINDOWS: "true",
OPENCLAW_CI_RUN_SKILLS_PYTHON: "false",
OPENCLAW_CI_HAS_CHANGED_EXTENSIONS: "false",
OPENCLAW_CI_CHANGED_EXTENSIONS_MATRIX: '{"include":[]}',
},
encoding: "utf8",
const outputs = runManifestOutputWriter("ci", {
GITHUB_EVENT_NAME: "pull_request",
OPENCLAW_CI_DOCS_ONLY: "false",
OPENCLAW_CI_DOCS_CHANGED: "false",
OPENCLAW_CI_RUN_NODE: "true",
OPENCLAW_CI_RUN_MACOS: "true",
OPENCLAW_CI_RUN_ANDROID: "true",
OPENCLAW_CI_RUN_WINDOWS: "true",
OPENCLAW_CI_RUN_SKILLS_PYTHON: "false",
OPENCLAW_CI_HAS_CHANGED_EXTENSIONS: "false",
OPENCLAW_CI_CHANGED_EXTENSIONS_MATRIX: '{"include":[]}',
});
const outputs = fs.readFileSync(outputPath, "utf8");
expect(outputs).toContain("run_build_artifacts=true");
expect(outputs).toContain("run_checks_windows=true");
expect(outputs).toContain("run_macos_node=true");
expect(outputs).toContain("android_matrix=");
fs.rmSync(outputPath, { force: true });
});
it("writes install-smoke outputs in install-smoke mode", () => {
const repoRoot = path.resolve(import.meta.dirname, "../..");
const outputPath = path.join(os.tmpdir(), `openclaw-install-output-${Date.now()}.txt`);
execFileSync("node", ["scripts/ci-write-manifest-outputs.mjs", "--workflow", "install-smoke"], {
cwd: repoRoot,
env: {
...clearPlannerShardEnv(process.env),
GITHUB_OUTPUT: outputPath,
OPENCLAW_CI_DOCS_ONLY: "false",
OPENCLAW_CI_RUN_CHANGED_SMOKE: "true",
},
encoding: "utf8",
const outputs = runManifestOutputWriter("install-smoke", {
OPENCLAW_CI_DOCS_ONLY: "false",
OPENCLAW_CI_RUN_CHANGED_SMOKE: "true",
});
const outputs = fs.readFileSync(outputPath, "utf8");
expect(outputs).toContain("run_install_smoke=true");
expect(outputs).not.toContain("run_checks=");
fs.rmSync(outputPath, { force: true });
});
it("writes bun outputs in ci-bun mode", () => {
const repoRoot = path.resolve(import.meta.dirname, "../..");
const outputPath = path.join(os.tmpdir(), `openclaw-bun-output-${Date.now()}.txt`);
execFileSync("node", ["scripts/ci-write-manifest-outputs.mjs", "--workflow", "ci-bun"], {
cwd: repoRoot,
env: {
...clearPlannerShardEnv(process.env),
GITHUB_OUTPUT: outputPath,
OPENCLAW_CI_DOCS_ONLY: "false",
OPENCLAW_CI_RUN_NODE: "true",
},
encoding: "utf8",
const outputs = runManifestOutputWriter("ci-bun", {
OPENCLAW_CI_DOCS_ONLY: "false",
OPENCLAW_CI_RUN_NODE: "true",
});
const outputs = fs.readFileSync(outputPath, "utf8");
expect(outputs).toContain("run_bun_checks=true");
expect(outputs).toContain("bun_checks_matrix=");
expect(outputs).not.toContain("run_install_smoke=");
fs.rmSync(outputPath, { force: true });
});
it("passes through vitest --mode values that are not wrapper runtime overrides", () => {