fix(gateway): pin channel registry at startup to survive registry swaps

Channel plugin resolution fails with 'Channel is unavailable: <channel>'
after the active plugin registry is replaced at runtime. The root cause is
that getChannelPlugin() resolves against the live registry snapshot, which
is replaced when non-primary registry loads (e.g., config-schema reads)
call loadOpenClawPlugins(). If the replacement registry does not carry the
same channel entries, outbound message delivery and subagent announce
silently break.

This mirrors the existing pinActivePluginHttpRouteRegistry pattern: the
channel registry is pinned at gateway startup and released on shutdown.
Subsequent setActivePluginRegistry calls no longer evict the channel
snapshot, so getChannelPlugin() always resolves against the registry that
was active when the gateway booted.
This commit is contained in:
affsantos
2026-03-24 21:22:42 +01:00
committed by Peter Steinberger
parent 4d41b8664c
commit 3a4c860798
4 changed files with 149 additions and 3 deletions

View File

@@ -1,6 +1,6 @@
import { import {
getActivePluginRegistryVersion, getActivePluginRegistryVersion,
requireActivePluginRegistry, requireActivePluginChannelRegistry,
} from "../../plugins/runtime.js"; } from "../../plugins/runtime.js";
import { CHAT_CHANNEL_ORDER, type ChatChannelId, normalizeAnyChannelId } from "../registry.js"; import { CHAT_CHANNEL_ORDER, type ChatChannelId, normalizeAnyChannelId } from "../registry.js";
import type { ChannelId, ChannelPlugin } from "./types.js"; import type { ChannelId, ChannelPlugin } from "./types.js";
@@ -34,7 +34,7 @@ const EMPTY_CHANNEL_PLUGIN_CACHE: CachedChannelPlugins = {
let cachedChannelPlugins = EMPTY_CHANNEL_PLUGIN_CACHE; let cachedChannelPlugins = EMPTY_CHANNEL_PLUGIN_CACHE;
function resolveCachedChannelPlugins(): CachedChannelPlugins { function resolveCachedChannelPlugins(): CachedChannelPlugins {
const registry = requireActivePluginRegistry(); const registry = requireActivePluginChannelRegistry();
const registryVersion = getActivePluginRegistryVersion(); const registryVersion = getActivePluginRegistryVersion();
const cached = cachedChannelPlugins; const cached = cachedChannelPlugins;
if (cached.registryVersion === registryVersion) { if (cached.registryVersion === registryVersion) {

View File

@@ -6,7 +6,9 @@ import type { CliDeps } from "../cli/deps.js";
import type { createSubsystemLogger } from "../logging/subsystem.js"; import type { createSubsystemLogger } from "../logging/subsystem.js";
import type { PluginRegistry } from "../plugins/registry.js"; import type { PluginRegistry } from "../plugins/registry.js";
import { import {
pinActivePluginChannelRegistry,
pinActivePluginHttpRouteRegistry, pinActivePluginHttpRouteRegistry,
releasePinnedPluginChannelRegistry,
releasePinnedPluginHttpRouteRegistry, releasePinnedPluginHttpRouteRegistry,
resolveActivePluginHttpRouteRegistry, resolveActivePluginHttpRouteRegistry,
} from "../plugins/runtime.js"; } from "../plugins/runtime.js";
@@ -99,6 +101,7 @@ export async function createGatewayRuntimeState(params: {
toolEventRecipients: ReturnType<typeof createToolEventRecipientRegistry>; toolEventRecipients: ReturnType<typeof createToolEventRecipientRegistry>;
}> { }> {
pinActivePluginHttpRouteRegistry(params.pluginRegistry); pinActivePluginHttpRouteRegistry(params.pluginRegistry);
pinActivePluginChannelRegistry(params.pluginRegistry);
try { try {
let canvasHost: CanvasHostHandler | null = null; let canvasHost: CanvasHostHandler | null = null;
if (params.canvasHostEnabled) { if (params.canvasHostEnabled) {
@@ -230,7 +233,10 @@ export async function createGatewayRuntimeState(params: {
return { return {
canvasHost, canvasHost,
releasePluginRouteRegistry: () => releasePinnedPluginHttpRouteRegistry(params.pluginRegistry), releasePluginRouteRegistry: () => {
releasePinnedPluginHttpRouteRegistry(params.pluginRegistry);
releasePinnedPluginChannelRegistry(params.pluginRegistry);
},
httpServer, httpServer,
httpServers, httpServers,
httpBindHosts, httpBindHosts,
@@ -251,6 +257,7 @@ export async function createGatewayRuntimeState(params: {
}; };
} catch (err) { } catch (err) {
releasePinnedPluginHttpRouteRegistry(params.pluginRegistry); releasePinnedPluginHttpRouteRegistry(params.pluginRegistry);
releasePinnedPluginChannelRegistry(params.pluginRegistry);
throw err; throw err;
} }
} }

View File

@@ -0,0 +1,93 @@
import { afterEach, describe, expect, it } from "vitest";
import { createEmptyPluginRegistry } from "./registry-empty.js";
import {
getActivePluginChannelRegistry,
pinActivePluginChannelRegistry,
releasePinnedPluginChannelRegistry,
requireActivePluginChannelRegistry,
resetPluginRuntimeStateForTest,
setActivePluginRegistry,
} from "./runtime.js";
describe("channel registry pinning", () => {
afterEach(() => {
resetPluginRuntimeStateForTest();
});
it("returns the active registry when not pinned", () => {
const registry = createEmptyPluginRegistry();
setActivePluginRegistry(registry);
expect(getActivePluginChannelRegistry()).toBe(registry);
});
it("preserves pinned channel registry across setActivePluginRegistry calls", () => {
const startup = createEmptyPluginRegistry();
startup.channels = [{ plugin: { id: "slack" } }] as never;
setActivePluginRegistry(startup);
pinActivePluginChannelRegistry(startup);
// A subsequent registry swap (e.g., config-schema load) must not evict channels.
const replacement = createEmptyPluginRegistry();
setActivePluginRegistry(replacement);
expect(getActivePluginChannelRegistry()).toBe(startup);
expect(getActivePluginChannelRegistry()!.channels).toHaveLength(1);
});
it("updates channel registry on swap when not pinned", () => {
const first = createEmptyPluginRegistry();
setActivePluginRegistry(first);
expect(getActivePluginChannelRegistry()).toBe(first);
const second = createEmptyPluginRegistry();
setActivePluginRegistry(second);
expect(getActivePluginChannelRegistry()).toBe(second);
});
it("release restores live-tracking behavior", () => {
const startup = createEmptyPluginRegistry();
setActivePluginRegistry(startup);
pinActivePluginChannelRegistry(startup);
const replacement = createEmptyPluginRegistry();
setActivePluginRegistry(replacement);
expect(getActivePluginChannelRegistry()).toBe(startup);
releasePinnedPluginChannelRegistry(startup);
// After release, the channel registry should follow the active registry.
expect(getActivePluginChannelRegistry()).toBe(replacement);
});
it("release is a no-op when the pinned registry does not match", () => {
const startup = createEmptyPluginRegistry();
setActivePluginRegistry(startup);
pinActivePluginChannelRegistry(startup);
const unrelated = createEmptyPluginRegistry();
releasePinnedPluginChannelRegistry(unrelated);
// Pin is still held — unrelated release was ignored.
const replacement = createEmptyPluginRegistry();
setActivePluginRegistry(replacement);
expect(getActivePluginChannelRegistry()).toBe(startup);
});
it("requireActivePluginChannelRegistry creates a registry when none exists", () => {
resetPluginRuntimeStateForTest();
const registry = requireActivePluginChannelRegistry();
expect(registry).toBeDefined();
expect(registry.channels).toEqual([]);
});
it("resetPluginRuntimeStateForTest clears channel pin", () => {
const startup = createEmptyPluginRegistry();
setActivePluginRegistry(startup);
pinActivePluginChannelRegistry(startup);
resetPluginRuntimeStateForTest();
const fresh = createEmptyPluginRegistry();
setActivePluginRegistry(fresh);
expect(getActivePluginChannelRegistry()).toBe(fresh);
});
});

View File

@@ -7,6 +7,8 @@ type RegistryState = {
registry: PluginRegistry | null; registry: PluginRegistry | null;
httpRouteRegistry: PluginRegistry | null; httpRouteRegistry: PluginRegistry | null;
httpRouteRegistryPinned: boolean; httpRouteRegistryPinned: boolean;
channelRegistry: PluginRegistry | null;
channelRegistryPinned: boolean;
key: string | null; key: string | null;
version: number; version: number;
}; };
@@ -20,6 +22,8 @@ const state: RegistryState = (() => {
registry: null, registry: null,
httpRouteRegistry: null, httpRouteRegistry: null,
httpRouteRegistryPinned: false, httpRouteRegistryPinned: false,
channelRegistry: null,
channelRegistryPinned: false,
key: null, key: null,
version: 0, version: 0,
}; };
@@ -32,6 +36,9 @@ export function setActivePluginRegistry(registry: PluginRegistry, cacheKey?: str
if (!state.httpRouteRegistryPinned) { if (!state.httpRouteRegistryPinned) {
state.httpRouteRegistry = registry; state.httpRouteRegistry = registry;
} }
if (!state.channelRegistryPinned) {
state.channelRegistry = registry;
}
state.key = cacheKey ?? null; state.key = cacheKey ?? null;
state.version += 1; state.version += 1;
} }
@@ -46,6 +53,9 @@ export function requireActivePluginRegistry(): PluginRegistry {
if (!state.httpRouteRegistryPinned) { if (!state.httpRouteRegistryPinned) {
state.httpRouteRegistry = state.registry; state.httpRouteRegistry = state.registry;
} }
if (!state.channelRegistryPinned) {
state.channelRegistry = state.registry;
}
state.version += 1; state.version += 1;
} }
return state.registry; return state.registry;
@@ -91,6 +101,40 @@ export function resolveActivePluginHttpRouteRegistry(fallback: PluginRegistry):
return routeRegistry; return routeRegistry;
} }
/** Pin the channel registry so that subsequent `setActivePluginRegistry` calls
* do not replace the channel snapshot used by `getChannelPlugin`. Call at
* gateway startup after the initial plugin load so that config-schema reads
* and other non-primary registry loads cannot evict channel plugins. */
export function pinActivePluginChannelRegistry(registry: PluginRegistry) {
state.channelRegistry = registry;
state.channelRegistryPinned = true;
}
export function releasePinnedPluginChannelRegistry(registry?: PluginRegistry) {
if (registry && state.channelRegistry !== registry) {
return;
}
state.channelRegistryPinned = false;
state.channelRegistry = state.registry;
}
/** Return the registry that should be used for channel plugin resolution.
* When pinned, this returns the startup registry regardless of subsequent
* `setActivePluginRegistry` calls. */
export function getActivePluginChannelRegistry(): PluginRegistry | null {
return state.channelRegistry ?? state.registry;
}
export function requireActivePluginChannelRegistry(): PluginRegistry {
const existing = getActivePluginChannelRegistry();
if (existing) {
return existing;
}
const created = requireActivePluginRegistry();
state.channelRegistry = created;
return created;
}
export function getActivePluginRegistryKey(): string | null { export function getActivePluginRegistryKey(): string | null {
return state.key; return state.key;
} }
@@ -103,6 +147,8 @@ export function resetPluginRuntimeStateForTest(): void {
state.registry = null; state.registry = null;
state.httpRouteRegistry = null; state.httpRouteRegistry = null;
state.httpRouteRegistryPinned = false; state.httpRouteRegistryPinned = false;
state.channelRegistry = null;
state.channelRegistryPinned = false;
state.key = null; state.key = null;
state.version += 1; state.version += 1;
} }