From 3a4c8607986155490cbc4a0bb5f6ce1efc91f4cb Mon Sep 17 00:00:00 2001 From: affsantos Date: Tue, 24 Mar 2026 21:22:42 +0100 Subject: [PATCH] fix(gateway): pin channel registry at startup to survive registry swaps Channel plugin resolution fails with 'Channel is unavailable: ' after the active plugin registry is replaced at runtime. The root cause is that getChannelPlugin() resolves against the live registry snapshot, which is replaced when non-primary registry loads (e.g., config-schema reads) call loadOpenClawPlugins(). If the replacement registry does not carry the same channel entries, outbound message delivery and subagent announce silently break. This mirrors the existing pinActivePluginHttpRouteRegistry pattern: the channel registry is pinned at gateway startup and released on shutdown. Subsequent setActivePluginRegistry calls no longer evict the channel snapshot, so getChannelPlugin() always resolves against the registry that was active when the gateway booted. --- src/channels/plugins/registry.ts | 4 +- src/gateway/server-runtime-state.ts | 9 ++- src/plugins/runtime.channel-pin.test.ts | 93 +++++++++++++++++++++++++ src/plugins/runtime.ts | 46 ++++++++++++ 4 files changed, 149 insertions(+), 3 deletions(-) create mode 100644 src/plugins/runtime.channel-pin.test.ts diff --git a/src/channels/plugins/registry.ts b/src/channels/plugins/registry.ts index 3f170cbfb92..50b55711c05 100644 --- a/src/channels/plugins/registry.ts +++ b/src/channels/plugins/registry.ts @@ -1,6 +1,6 @@ import { getActivePluginRegistryVersion, - requireActivePluginRegistry, + requireActivePluginChannelRegistry, } from "../../plugins/runtime.js"; import { CHAT_CHANNEL_ORDER, type ChatChannelId, normalizeAnyChannelId } from "../registry.js"; import type { ChannelId, ChannelPlugin } from "./types.js"; @@ -34,7 +34,7 @@ const EMPTY_CHANNEL_PLUGIN_CACHE: CachedChannelPlugins = { let cachedChannelPlugins = EMPTY_CHANNEL_PLUGIN_CACHE; function resolveCachedChannelPlugins(): CachedChannelPlugins { - const registry = requireActivePluginRegistry(); + const registry = requireActivePluginChannelRegistry(); const registryVersion = getActivePluginRegistryVersion(); const cached = cachedChannelPlugins; if (cached.registryVersion === registryVersion) { diff --git a/src/gateway/server-runtime-state.ts b/src/gateway/server-runtime-state.ts index aa354c24c6c..1a93e9396ed 100644 --- a/src/gateway/server-runtime-state.ts +++ b/src/gateway/server-runtime-state.ts @@ -6,7 +6,9 @@ import type { CliDeps } from "../cli/deps.js"; import type { createSubsystemLogger } from "../logging/subsystem.js"; import type { PluginRegistry } from "../plugins/registry.js"; import { + pinActivePluginChannelRegistry, pinActivePluginHttpRouteRegistry, + releasePinnedPluginChannelRegistry, releasePinnedPluginHttpRouteRegistry, resolveActivePluginHttpRouteRegistry, } from "../plugins/runtime.js"; @@ -99,6 +101,7 @@ export async function createGatewayRuntimeState(params: { toolEventRecipients: ReturnType; }> { pinActivePluginHttpRouteRegistry(params.pluginRegistry); + pinActivePluginChannelRegistry(params.pluginRegistry); try { let canvasHost: CanvasHostHandler | null = null; if (params.canvasHostEnabled) { @@ -230,7 +233,10 @@ export async function createGatewayRuntimeState(params: { return { canvasHost, - releasePluginRouteRegistry: () => releasePinnedPluginHttpRouteRegistry(params.pluginRegistry), + releasePluginRouteRegistry: () => { + releasePinnedPluginHttpRouteRegistry(params.pluginRegistry); + releasePinnedPluginChannelRegistry(params.pluginRegistry); + }, httpServer, httpServers, httpBindHosts, @@ -251,6 +257,7 @@ export async function createGatewayRuntimeState(params: { }; } catch (err) { releasePinnedPluginHttpRouteRegistry(params.pluginRegistry); + releasePinnedPluginChannelRegistry(params.pluginRegistry); throw err; } } diff --git a/src/plugins/runtime.channel-pin.test.ts b/src/plugins/runtime.channel-pin.test.ts new file mode 100644 index 00000000000..62f7a6f59ec --- /dev/null +++ b/src/plugins/runtime.channel-pin.test.ts @@ -0,0 +1,93 @@ +import { afterEach, describe, expect, it } from "vitest"; +import { createEmptyPluginRegistry } from "./registry-empty.js"; +import { + getActivePluginChannelRegistry, + pinActivePluginChannelRegistry, + releasePinnedPluginChannelRegistry, + requireActivePluginChannelRegistry, + resetPluginRuntimeStateForTest, + setActivePluginRegistry, +} from "./runtime.js"; + +describe("channel registry pinning", () => { + afterEach(() => { + resetPluginRuntimeStateForTest(); + }); + + it("returns the active registry when not pinned", () => { + const registry = createEmptyPluginRegistry(); + setActivePluginRegistry(registry); + expect(getActivePluginChannelRegistry()).toBe(registry); + }); + + it("preserves pinned channel registry across setActivePluginRegistry calls", () => { + const startup = createEmptyPluginRegistry(); + startup.channels = [{ plugin: { id: "slack" } }] as never; + setActivePluginRegistry(startup); + pinActivePluginChannelRegistry(startup); + + // A subsequent registry swap (e.g., config-schema load) must not evict channels. + const replacement = createEmptyPluginRegistry(); + setActivePluginRegistry(replacement); + + expect(getActivePluginChannelRegistry()).toBe(startup); + expect(getActivePluginChannelRegistry()!.channels).toHaveLength(1); + }); + + it("updates channel registry on swap when not pinned", () => { + const first = createEmptyPluginRegistry(); + setActivePluginRegistry(first); + expect(getActivePluginChannelRegistry()).toBe(first); + + const second = createEmptyPluginRegistry(); + setActivePluginRegistry(second); + expect(getActivePluginChannelRegistry()).toBe(second); + }); + + it("release restores live-tracking behavior", () => { + const startup = createEmptyPluginRegistry(); + setActivePluginRegistry(startup); + pinActivePluginChannelRegistry(startup); + + const replacement = createEmptyPluginRegistry(); + setActivePluginRegistry(replacement); + expect(getActivePluginChannelRegistry()).toBe(startup); + + releasePinnedPluginChannelRegistry(startup); + // After release, the channel registry should follow the active registry. + expect(getActivePluginChannelRegistry()).toBe(replacement); + }); + + it("release is a no-op when the pinned registry does not match", () => { + const startup = createEmptyPluginRegistry(); + setActivePluginRegistry(startup); + pinActivePluginChannelRegistry(startup); + + const unrelated = createEmptyPluginRegistry(); + releasePinnedPluginChannelRegistry(unrelated); + + // Pin is still held — unrelated release was ignored. + const replacement = createEmptyPluginRegistry(); + setActivePluginRegistry(replacement); + expect(getActivePluginChannelRegistry()).toBe(startup); + }); + + it("requireActivePluginChannelRegistry creates a registry when none exists", () => { + resetPluginRuntimeStateForTest(); + const registry = requireActivePluginChannelRegistry(); + expect(registry).toBeDefined(); + expect(registry.channels).toEqual([]); + }); + + it("resetPluginRuntimeStateForTest clears channel pin", () => { + const startup = createEmptyPluginRegistry(); + setActivePluginRegistry(startup); + pinActivePluginChannelRegistry(startup); + + resetPluginRuntimeStateForTest(); + + const fresh = createEmptyPluginRegistry(); + setActivePluginRegistry(fresh); + expect(getActivePluginChannelRegistry()).toBe(fresh); + }); +}); diff --git a/src/plugins/runtime.ts b/src/plugins/runtime.ts index 861fba6a7ce..a422956d452 100644 --- a/src/plugins/runtime.ts +++ b/src/plugins/runtime.ts @@ -7,6 +7,8 @@ type RegistryState = { registry: PluginRegistry | null; httpRouteRegistry: PluginRegistry | null; httpRouteRegistryPinned: boolean; + channelRegistry: PluginRegistry | null; + channelRegistryPinned: boolean; key: string | null; version: number; }; @@ -20,6 +22,8 @@ const state: RegistryState = (() => { registry: null, httpRouteRegistry: null, httpRouteRegistryPinned: false, + channelRegistry: null, + channelRegistryPinned: false, key: null, version: 0, }; @@ -32,6 +36,9 @@ export function setActivePluginRegistry(registry: PluginRegistry, cacheKey?: str if (!state.httpRouteRegistryPinned) { state.httpRouteRegistry = registry; } + if (!state.channelRegistryPinned) { + state.channelRegistry = registry; + } state.key = cacheKey ?? null; state.version += 1; } @@ -46,6 +53,9 @@ export function requireActivePluginRegistry(): PluginRegistry { if (!state.httpRouteRegistryPinned) { state.httpRouteRegistry = state.registry; } + if (!state.channelRegistryPinned) { + state.channelRegistry = state.registry; + } state.version += 1; } return state.registry; @@ -91,6 +101,40 @@ export function resolveActivePluginHttpRouteRegistry(fallback: PluginRegistry): return routeRegistry; } +/** Pin the channel registry so that subsequent `setActivePluginRegistry` calls + * do not replace the channel snapshot used by `getChannelPlugin`. Call at + * gateway startup after the initial plugin load so that config-schema reads + * and other non-primary registry loads cannot evict channel plugins. */ +export function pinActivePluginChannelRegistry(registry: PluginRegistry) { + state.channelRegistry = registry; + state.channelRegistryPinned = true; +} + +export function releasePinnedPluginChannelRegistry(registry?: PluginRegistry) { + if (registry && state.channelRegistry !== registry) { + return; + } + state.channelRegistryPinned = false; + state.channelRegistry = state.registry; +} + +/** Return the registry that should be used for channel plugin resolution. + * When pinned, this returns the startup registry regardless of subsequent + * `setActivePluginRegistry` calls. */ +export function getActivePluginChannelRegistry(): PluginRegistry | null { + return state.channelRegistry ?? state.registry; +} + +export function requireActivePluginChannelRegistry(): PluginRegistry { + const existing = getActivePluginChannelRegistry(); + if (existing) { + return existing; + } + const created = requireActivePluginRegistry(); + state.channelRegistry = created; + return created; +} + export function getActivePluginRegistryKey(): string | null { return state.key; } @@ -103,6 +147,8 @@ export function resetPluginRuntimeStateForTest(): void { state.registry = null; state.httpRouteRegistry = null; state.httpRouteRegistryPinned = false; + state.channelRegistry = null; + state.channelRegistryPinned = false; state.key = null; state.version += 1; }