refactor: dedupe gateway and binding helpers

This commit is contained in:
Peter Steinberger
2026-03-26 17:47:36 +00:00
parent 1c45123231
commit 48167a69b9
5 changed files with 270 additions and 303 deletions

View File

@@ -16,6 +16,36 @@ import {
} from "./configured-binding-match.js";
import { resolveConfiguredBindingRecordBySessionKeyFromRegistry } from "./configured-binding-session-lookup.js";
function resolveMaterializedConfiguredBinding(params: {
cfg: OpenClawConfig;
conversation: ConversationRef;
}) {
const conversation = toConfiguredBindingConversationRef(params.conversation);
if (!conversation) {
return null;
}
const rules = resolveCompiledBindingRegistry(params.cfg).rulesByChannel.get(conversation.channel);
if (!rules || rules.length === 0) {
return null;
}
const resolved = resolveMatchingConfiguredBinding({
rules,
conversation,
});
if (!resolved) {
return null;
}
return {
conversation,
resolved,
materializedTarget: materializeConfiguredBindingRecord({
rule: resolved.rule,
accountId: conversation.accountId,
conversation: resolved.match,
}),
};
}
export function primeConfiguredBindingRegistry(params: { cfg: OpenClawConfig }): {
bindingCount: number;
channelCount: number;
@@ -49,59 +79,26 @@ export function resolveConfiguredBindingRecordForConversation(params: {
cfg: OpenClawConfig;
conversation: ConversationRef;
}): ConfiguredBindingRecordResolution | null {
const conversation = toConfiguredBindingConversationRef(params.conversation);
if (!conversation) {
return null;
}
const registry = resolveCompiledBindingRegistry(params.cfg);
const rules = registry.rulesByChannel.get(conversation.channel);
if (!rules || rules.length === 0) {
return null;
}
const resolved = resolveMatchingConfiguredBinding({
rules,
conversation,
});
const resolved = resolveMaterializedConfiguredBinding(params);
if (!resolved) {
return null;
}
return materializeConfiguredBindingRecord({
rule: resolved.rule,
accountId: conversation.accountId,
conversation: resolved.match,
});
return resolved.materializedTarget;
}
export function resolveConfiguredBinding(params: {
cfg: OpenClawConfig;
conversation: ConversationRef;
}): ConfiguredBindingResolution | null {
const conversation = toConfiguredBindingConversationRef(params.conversation);
if (!conversation) {
return null;
}
const registry = resolveCompiledBindingRegistry(params.cfg);
const rules = registry.rulesByChannel.get(conversation.channel);
if (!rules || rules.length === 0) {
return null;
}
const resolved = resolveMatchingConfiguredBinding({
rules,
conversation,
});
const resolved = resolveMaterializedConfiguredBinding(params);
if (!resolved) {
return null;
}
const materializedTarget = materializeConfiguredBindingRecord({
rule: resolved.rule,
accountId: conversation.accountId,
conversation: resolved.match,
});
return {
conversation,
compiledBinding: resolved.rule,
match: resolved.match,
...materializedTarget,
conversation: resolved.conversation,
compiledBinding: resolved.resolved.rule,
match: resolved.resolved.match,
...resolved.materializedTarget,
};
}

View File

@@ -491,6 +491,38 @@ function cloneIfObject<T>(value: T): T {
return value;
}
function moveSingleAccountKeysIntoAccount(params: {
cfg: OpenClawConfig;
channelKey: string;
channel: ChannelSectionRecord;
accounts: Record<string, Record<string, unknown>>;
keysToMove: string[];
targetAccountId: string;
baseAccount?: Record<string, unknown>;
}): OpenClawConfig {
const nextAccount: Record<string, unknown> = { ...params.baseAccount };
for (const key of params.keysToMove) {
nextAccount[key] = cloneIfObject(params.channel[key]);
}
const nextChannel: ChannelSectionRecord = { ...params.channel };
for (const key of params.keysToMove) {
delete nextChannel[key];
}
return {
...params.cfg,
channels: {
...params.cfg.channels,
[params.channelKey]: {
...nextChannel,
accounts: {
...params.accounts,
[params.targetAccountId]: nextAccount,
},
},
},
} as OpenClawConfig;
}
// When promoting a single-account channel config to multi-account,
// move top-level account settings into accounts.default so the original
// account keeps working without duplicate account values at channel root.
@@ -523,56 +555,26 @@ export function moveSingleAccountChannelSectionToDefaultAccount(params: {
channelKey: params.channelKey,
channel: base,
});
const defaultAccount: Record<string, unknown> = {
...accounts[targetAccountId],
};
for (const key of keysToMove) {
const value = base[key];
defaultAccount[key] = cloneIfObject(value);
}
const nextChannel: ChannelSectionRecord = { ...base };
for (const key of keysToMove) {
delete nextChannel[key];
}
return {
...params.cfg,
channels: {
...params.cfg.channels,
[params.channelKey]: {
...nextChannel,
accounts: {
...accounts,
[targetAccountId]: defaultAccount,
},
},
},
} as OpenClawConfig;
return moveSingleAccountKeysIntoAccount({
cfg: params.cfg,
channelKey: params.channelKey,
channel: base,
accounts,
keysToMove,
targetAccountId,
baseAccount: accounts[targetAccountId],
});
}
const keysToMove = resolveSingleAccountKeysToMove({
channelKey: params.channelKey,
channel: base,
});
const defaultAccount: Record<string, unknown> = {};
for (const key of keysToMove) {
const value = base[key];
defaultAccount[key] = cloneIfObject(value);
}
const nextChannel: ChannelSectionRecord = { ...base };
for (const key of keysToMove) {
delete nextChannel[key];
}
return {
...params.cfg,
channels: {
...params.cfg.channels,
[params.channelKey]: {
...nextChannel,
accounts: {
...accounts,
[DEFAULT_ACCOUNT_ID]: defaultAccount,
},
},
},
} as OpenClawConfig;
return moveSingleAccountKeysIntoAccount({
cfg: params.cfg,
channelKey: params.channelKey,
channel: base,
accounts,
keysToMove,
targetAccountId: DEFAULT_ACCOUNT_ID,
});
}

View File

@@ -137,6 +137,72 @@ async function invokeSecureGatewayRoute(params: { gatewayAuthSatisfied: boolean
return { handled, exactPluginHandler, prefixGatewayHandler };
}
function mockOperatorAdminScopeFailure() {
loadOpenClawPlugins.mockReset();
handleGatewayRequest.mockReset();
handleGatewayRequest.mockImplementation(async (opts: HandleGatewayRequestOptions) => {
const scopes = opts.client?.connect.scopes ?? [];
if (opts.req.method === "sessions.delete" && !scopes.includes("operator.admin")) {
opts.respond(false, undefined, {
code: "invalid_request",
message: "missing scope: operator.admin",
});
return;
}
opts.respond(true, {});
});
}
async function invokeLeastPrivilegeDeleteRoute(params: {
path: string;
auth: "gateway" | "plugin";
gatewayAuthSatisfied: boolean;
}) {
mockOperatorAdminScopeFailure();
const subagent = await createSubagentRuntime();
const log = createPluginLog();
const handler = createGatewayPluginRequestHandler({
registry: createTestRegistry({
httpRoutes: [
createRoute({
path: params.path,
auth: params.auth,
handler: async () => {
await subagent.deleteSession({ sessionKey: "agent:main:subagent:child" });
return true;
},
}),
],
}),
log,
});
const response = makeMockHttpResponse();
const handled = await handler({ url: params.path } as IncomingMessage, response.res, undefined, {
gatewayAuthSatisfied: params.gatewayAuthSatisfied,
});
return { handled, log, ...response };
}
function expectLeastPrivilegeDeleteRouteFailure(params: {
handled: boolean;
setHeader: ReturnType<typeof makeMockHttpResponse>["setHeader"];
end: ReturnType<typeof makeMockHttpResponse>["end"];
log: ReturnType<typeof createPluginLog>;
}) {
expect(params.handled).toBe(true);
expect(handleGatewayRequest).toHaveBeenCalledTimes(1);
expect(handleGatewayRequest.mock.calls[0]?.[0]?.client?.connect.scopes).toEqual([
"operator.write",
]);
expect(params.setHeader).toHaveBeenCalledWith("Content-Type", "text/plain; charset=utf-8");
expect(params.end).toHaveBeenCalledWith("Internal Server Error");
expect(params.log.warn).toHaveBeenCalledWith(
expect.stringContaining("missing scope: operator.admin"),
);
}
describe("createGatewayPluginRequestHandler", () => {
afterEach(() => {
releasePinnedPluginHttpRouteRegistry();
@@ -144,101 +210,25 @@ describe("createGatewayPluginRequestHandler", () => {
});
it("caps unauthenticated plugin routes to non-admin subagent scopes", async () => {
loadOpenClawPlugins.mockReset();
handleGatewayRequest.mockReset();
handleGatewayRequest.mockImplementation(async (opts: HandleGatewayRequestOptions) => {
const scopes = opts.client?.connect.scopes ?? [];
if (opts.req.method === "sessions.delete" && !scopes.includes("operator.admin")) {
opts.respond(false, undefined, {
code: "invalid_request",
message: "missing scope: operator.admin",
});
return;
}
opts.respond(true, {});
});
const subagent = await createSubagentRuntime();
const log = createPluginLog();
const handler = createGatewayPluginRequestHandler({
registry: createTestRegistry({
httpRoutes: [
createRoute({
path: "/hook",
auth: "plugin",
handler: async (_req, _res) => {
await subagent.deleteSession({ sessionKey: "agent:main:subagent:child" });
return true;
},
}),
],
}),
log,
});
const { res, setHeader, end } = makeMockHttpResponse();
const handled = await handler({ url: "/hook" } as IncomingMessage, res, undefined, {
const { handled, res, setHeader, end, log } = await invokeLeastPrivilegeDeleteRoute({
path: "/hook",
auth: "plugin",
gatewayAuthSatisfied: false,
});
expect(handled).toBe(true);
expect(handleGatewayRequest).toHaveBeenCalledTimes(1);
expect(handleGatewayRequest.mock.calls[0]?.[0]?.client?.connect.scopes).toEqual([
"operator.write",
]);
expect(res.statusCode).toBe(500);
expect(setHeader).toHaveBeenCalledWith("Content-Type", "text/plain; charset=utf-8");
expect(end).toHaveBeenCalledWith("Internal Server Error");
expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("missing scope: operator.admin"));
expectLeastPrivilegeDeleteRouteFailure({ handled, setHeader, end, log });
});
it("keeps gateway-authenticated plugin routes on least-privilege runtime scopes", async () => {
loadOpenClawPlugins.mockReset();
handleGatewayRequest.mockReset();
handleGatewayRequest.mockImplementation(async (opts: HandleGatewayRequestOptions) => {
const scopes = opts.client?.connect.scopes ?? [];
if (opts.req.method === "sessions.delete" && !scopes.includes("operator.admin")) {
opts.respond(false, undefined, {
code: "invalid_request",
message: "missing scope: operator.admin",
});
return;
}
opts.respond(true, {});
});
const subagent = await createSubagentRuntime();
const log = createPluginLog();
const handler = createGatewayPluginRequestHandler({
registry: createTestRegistry({
httpRoutes: [
createRoute({
path: "/secure-hook",
auth: "gateway",
handler: async (_req, _res) => {
await subagent.deleteSession({ sessionKey: "agent:main:subagent:child" });
return true;
},
}),
],
}),
log,
});
const { res, setHeader, end } = makeMockHttpResponse();
const handled = await handler({ url: "/secure-hook" } as IncomingMessage, res, undefined, {
const { handled, res, setHeader, end, log } = await invokeLeastPrivilegeDeleteRoute({
path: "/secure-hook",
auth: "gateway",
gatewayAuthSatisfied: true,
});
expect(handled).toBe(true);
expect(handleGatewayRequest).toHaveBeenCalledTimes(1);
expect(handleGatewayRequest.mock.calls[0]?.[0]?.client?.connect.scopes).toEqual([
"operator.write",
]);
expect(res.statusCode).toBe(500);
expect(setHeader).toHaveBeenCalledWith("Content-Type", "text/plain; charset=utf-8");
expect(end).toHaveBeenCalledWith("Internal Server Error");
expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("missing scope: operator.admin"));
expectLeastPrivilegeDeleteRouteFailure({ handled, setHeader, end, log });
});
it("returns false when no routes are registered", async () => {

View File

@@ -32,6 +32,33 @@ async function createSessionStoreFile(): Promise<string> {
return storePath;
}
async function withOperatorSessionSubscriber<T>(
harness: Awaited<ReturnType<typeof createGatewaySuiteHarness>>,
run: (ws: Awaited<ReturnType<typeof harness.openWs>>) => Promise<T>,
) {
const ws = await harness.openWs();
try {
await connectOk(ws, { scopes: ["operator.read"] });
await rpcReq(ws, "sessions.subscribe");
return await run(ws);
} finally {
ws.close();
}
}
function waitForSessionMessageEvent(
ws: Awaited<ReturnType<Awaited<ReturnType<typeof createGatewaySuiteHarness>>["openWs"]>>,
sessionKey: string,
) {
return onceMessage(
ws,
(message) =>
message.type === "event" &&
message.event === "session.message" &&
(message.payload as { sessionKey?: string } | undefined)?.sessionKey === sessionKey,
);
}
async function expectNoMessageWithin(params: {
action?: () => Promise<void> | void;
watch: () => Promise<unknown>;
@@ -220,19 +247,8 @@ describe("session.message websocket events", () => {
const harness = await createGatewaySuiteHarness();
try {
const ws = await harness.openWs();
try {
await connectOk(ws, { scopes: ["operator.read"] });
await rpcReq(ws, "sessions.subscribe");
const messageEventPromise = onceMessage(
ws,
(message) =>
message.type === "event" &&
message.event === "session.message" &&
(message.payload as { sessionKey?: string } | undefined)?.sessionKey ===
"agent:main:main",
);
await withOperatorSessionSubscriber(harness, async (ws) => {
const messageEventPromise = waitForSessionMessageEvent(ws, "agent:main:main");
const changedEventPromise = onceMessage(
ws,
(message) =>
@@ -278,9 +294,7 @@ describe("session.message websocket events", () => {
modelProvider: "openai",
model: "gpt-5.4",
});
} finally {
ws.close();
}
});
} finally {
await harness.close();
}
@@ -314,14 +328,7 @@ describe("session.message websocket events", () => {
expect(subscribeRes.payload?.subscribed).toBe(true);
expect(subscribeRes.payload?.key).toBe("agent:main:main");
const mainEvent = onceMessage(
ws,
(message) =>
message.type === "event" &&
message.event === "session.message" &&
(message.payload as { sessionKey?: string } | undefined)?.sessionKey ===
"agent:main:main",
);
const mainEvent = waitForSessionMessageEvent(ws, "agent:main:main");
const [mainAppend] = await Promise.all([
appendAssistantMessageToSessionTranscript({
sessionKey: "agent:main:main",
@@ -423,19 +430,8 @@ describe("session.message websocket events", () => {
const harness = await createGatewaySuiteHarness();
try {
const ws = await harness.openWs();
try {
await connectOk(ws, { scopes: ["operator.read"] });
await rpcReq(ws, "sessions.subscribe");
const messageEventPromise = onceMessage(
ws,
(message) =>
message.type === "event" &&
message.event === "session.message" &&
(message.payload as { sessionKey?: string } | undefined)?.sessionKey ===
"agent:main:newer",
);
await withOperatorSessionSubscriber(harness, async (ws) => {
const messageEventPromise = waitForSessionMessageEvent(ws, "agent:main:newer");
emitSessionTranscriptUpdate({
sessionFile: transcriptPath,
@@ -453,9 +449,7 @@ describe("session.message websocket events", () => {
messageId: "msg-shared",
messageSeq: 1,
});
} finally {
ws.close();
}
});
} finally {
await harness.close();
}

View File

@@ -54,6 +54,53 @@ async function seedSession(params?: { text?: string }) {
return { storePath };
}
async function fetchSessionHistory(
port: number,
sessionKey: string,
params?: {
query?: string;
headers?: HeadersInit;
},
) {
const headers = new Headers(AUTH_HEADER);
for (const [key, value] of new Headers(READ_SCOPE_HEADER).entries()) {
headers.set(key, value);
}
for (const [key, value] of new Headers(params?.headers).entries()) {
headers.set(key, value);
}
return fetch(
`http://127.0.0.1:${port}/sessions/${encodeURIComponent(sessionKey)}/history${params?.query ?? ""}`,
{
headers,
},
);
}
async function withGatewayHarness<T>(
run: (harness: Awaited<ReturnType<typeof createGatewaySuiteHarness>>) => Promise<T>,
) {
const harness = await createGatewaySuiteHarness();
try {
return await run(harness);
} finally {
await harness.close();
}
}
async function expectSessionHistoryText(params: { sessionKey: string; expectedText: string }) {
await withGatewayHarness(async (harness) => {
const res = await fetchSessionHistory(harness.port, params.sessionKey);
expect(res.status).toBe(200);
const body = (await res.json()) as {
sessionKey?: string;
messages?: Array<{ content?: Array<{ text?: string }> }>;
};
expect(body.sessionKey).toBe(params.sessionKey);
expect(body.messages?.[0]?.content?.[0]?.text).toBe(params.expectedText);
});
}
async function readSseEvent(
reader: ReadableStreamDefaultReader<Uint8Array>,
state: { buffer: string },
@@ -90,16 +137,8 @@ async function readSseEvent(
describe("session history HTTP endpoints", () => {
test("returns session history over direct REST", async () => {
await seedSession({ text: "hello from history" });
const harness = await createGatewaySuiteHarness();
try {
const res = await fetch(
`http://127.0.0.1:${harness.port}/sessions/${encodeURIComponent("agent:main:main")}/history`,
{
headers: { ...AUTH_HEADER, ...READ_SCOPE_HEADER },
},
);
await withGatewayHarness(async (harness) => {
const res = await fetchSessionHistory(harness.port, "agent:main:main");
expect(res.status).toBe(200);
const body = (await res.json()) as {
sessionKey?: string;
@@ -117,23 +156,13 @@ describe("session history HTTP endpoints", () => {
).toMatchObject({
seq: 1,
});
} finally {
await harness.close();
}
});
});
test("returns 404 for unknown sessions", async () => {
await createSessionStoreFile();
const harness = await createGatewaySuiteHarness();
try {
const res = await fetch(
`http://127.0.0.1:${harness.port}/sessions/${encodeURIComponent("agent:main:missing")}/history`,
{
headers: { ...AUTH_HEADER, ...READ_SCOPE_HEADER },
},
);
await withGatewayHarness(async (harness) => {
const res = await fetchSessionHistory(harness.port, "agent:main:missing");
expect(res.status).toBe(404);
await expect(res.json()).resolves.toMatchObject({
ok: false,
@@ -142,9 +171,7 @@ describe("session history HTTP endpoints", () => {
message: "Session not found: agent:main:missing",
},
});
} finally {
await harness.close();
}
});
});
test("prefers the freshest duplicate row for direct history reads", async () => {
@@ -193,25 +220,10 @@ describe("session history HTTP endpoints", () => {
"utf-8",
);
const harness = await createGatewaySuiteHarness();
try {
const res = await fetch(
`http://127.0.0.1:${harness.port}/sessions/${encodeURIComponent("agent:main:main")}/history`,
{
headers: { ...AUTH_HEADER, ...READ_SCOPE_HEADER },
},
);
expect(res.status).toBe(200);
const body = (await res.json()) as {
sessionKey?: string;
messages?: Array<{ content?: Array<{ text?: string }> }>;
};
expect(body.sessionKey).toBe("agent:main:main");
expect(body.messages?.[0]?.content?.[0]?.text).toBe("fresh history");
} finally {
await harness.close();
}
await expectSessionHistoryText({
sessionKey: "agent:main:main",
expectedText: "fresh history",
});
});
test("supports cursor pagination over direct REST while preserving the messages field", async () => {
@@ -229,14 +241,10 @@ describe("session history HTTP endpoints", () => {
});
expect(third.ok).toBe(true);
const harness = await createGatewaySuiteHarness();
try {
const firstPage = await fetch(
`http://127.0.0.1:${harness.port}/sessions/${encodeURIComponent("agent:main:main")}/history?limit=2`,
{
headers: { ...AUTH_HEADER, ...READ_SCOPE_HEADER },
},
);
await withGatewayHarness(async (harness) => {
const firstPage = await fetchSessionHistory(harness.port, "agent:main:main", {
query: "?limit=2",
});
expect(firstPage.status).toBe(200);
const firstBody = (await firstPage.json()) as {
sessionKey?: string;
@@ -254,12 +262,9 @@ describe("session history HTTP endpoints", () => {
expect(firstBody.hasMore).toBe(true);
expect(firstBody.nextCursor).toBe("2");
const secondPage = await fetch(
`http://127.0.0.1:${harness.port}/sessions/${encodeURIComponent("agent:main:main")}/history?limit=2&cursor=${encodeURIComponent(firstBody.nextCursor ?? "")}`,
{
headers: { ...AUTH_HEADER, ...READ_SCOPE_HEADER },
},
);
const secondPage = await fetchSessionHistory(harness.port, "agent:main:main", {
query: `?limit=2&cursor=${encodeURIComponent(firstBody.nextCursor ?? "")}`,
});
expect(secondPage.status).toBe(200);
const secondBody = (await secondPage.json()) as {
items?: Array<{ content?: Array<{ text?: string }>; __openclaw?: { seq?: number } }>;
@@ -273,9 +278,7 @@ describe("session history HTTP endpoints", () => {
expect(secondBody.messages?.map((message) => message.__openclaw?.seq)).toEqual([1]);
expect(secondBody.hasMore).toBe(false);
expect(secondBody.nextCursor).toBeUndefined();
} finally {
await harness.close();
}
});
});
test("streams bounded history windows over SSE", async () => {
@@ -287,18 +290,11 @@ describe("session history HTTP endpoints", () => {
});
expect(second.ok).toBe(true);
const harness = await createGatewaySuiteHarness();
try {
const res = await fetch(
`http://127.0.0.1:${harness.port}/sessions/${encodeURIComponent("agent:main:main")}/history?limit=1`,
{
headers: {
...AUTH_HEADER,
...READ_SCOPE_HEADER,
Accept: "text/event-stream",
},
},
);
await withGatewayHarness(async (harness) => {
const res = await fetchSessionHistory(harness.port, "agent:main:main", {
query: "?limit=1",
headers: { Accept: "text/event-stream" },
});
expect(res.status).toBe(200);
const reader = res.body?.getReader();
@@ -326,26 +322,16 @@ describe("session history HTTP endpoints", () => {
).toBe("third message");
await reader?.cancel();
} finally {
await harness.close();
}
});
});
test("streams session history updates over SSE", async () => {
const { storePath } = await seedSession({ text: "first message" });
const harness = await createGatewaySuiteHarness();
try {
const res = await fetch(
`http://127.0.0.1:${harness.port}/sessions/${encodeURIComponent("agent:main:main")}/history`,
{
headers: {
...AUTH_HEADER,
...READ_SCOPE_HEADER,
Accept: "text/event-stream",
},
},
);
await withGatewayHarness(async (harness) => {
const res = await fetchSessionHistory(harness.port, "agent:main:main", {
headers: { Accept: "text/event-stream" },
});
expect(res.status).toBe(200);
expect(res.headers.get("content-type") ?? "").toContain("text/event-stream");
@@ -396,9 +382,7 @@ describe("session history HTTP endpoints", () => {
});
await reader?.cancel();
} finally {
await harness.close();
}
});
});
test("rejects session history when operator.read is not requested", async () => {