perf: speed up test parallelism

This commit is contained in:
Peter Steinberger
2026-03-26 20:08:49 +00:00
parent 2fc017788c
commit 663ba5a3cd
13 changed files with 1517 additions and 99 deletions

View File

@@ -55,10 +55,14 @@ Think of the suites as “increasing realism” (and increasing flakiness/cost):
- Should be fast and stable
- Scheduler note:
- `pnpm test` now keeps a small checked-in behavioral manifest for true pool/isolation overrides and a separate timing snapshot for the slowest unit files.
- Extension-only local runs now also use a checked-in extensions timing snapshot so the shared extensions lane can split into a few measured batches instead of one oversized run.
- High-memory local channel runs now reuse the checked-in channel timing snapshot to split the shared channels lane into a few measured batches instead of one long shared worker.
- Shared unit, extension, channel, and gateway runs all stay on Vitest `forks`.
- The wrapper keeps measured fork-isolated exceptions and heavy singleton lanes explicit in `test/fixtures/test-parallel.behavior.json`.
- The wrapper peels the heaviest measured files into dedicated lanes instead of relying on a growing hand-maintained exclusion list.
- Refresh the timing snapshot with `pnpm test:perf:update-timings` after major suite shape changes.
- For surface-only local runs, unit, extension, and channel shared lanes can overlap their isolated hotspots instead of waiting behind one serial prefix.
- For multi-surface local runs, the wrapper keeps the shared surface phases ordered, but batches inside the same shared phase now fan out together, deferred isolated work can overlap the next shared phase, and spare `unit-fast` headroom now starts that deferred work earlier instead of leaving those slots idle.
- Refresh the timing snapshots with `pnpm test:perf:update-timings` and `pnpm test:perf:update-timings:extensions` after major suite shape changes.
- Embedded runner note:
- When you change message-tool discovery inputs or compaction runtime context,
keep both levels of coverage.

View File

@@ -750,6 +750,7 @@
"test:perf:profile:runner": "node scripts/run-vitest-profile.mjs runner",
"test:perf:update-memory-hotspots": "node scripts/test-update-memory-hotspots.mjs",
"test:perf:update-timings": "node scripts/test-update-timings.mjs",
"test:perf:update-timings:extensions": "node scripts/test-update-timings.mjs --config vitest.extensions.config.ts",
"test:sectriage": "pnpm exec vitest run --config vitest.gateway.config.ts && vitest run --config vitest.unit.config.ts --exclude src/daemon/launchd.integration.test.ts --exclude src/process/exec.test.ts",
"test:serial": "node scripts/test-parallel.mjs --profile serial",
"test:startup:memory": "node scripts/check-cli-startup-memory.mjs",

View File

@@ -132,6 +132,62 @@ export function formatExplanation(explanation) {
].join("\n");
}
const buildOrderedParallelSegments = (units) => {
const segments = [];
let deferredUnits = [];
for (const unit of units) {
if (unit.serialPhase) {
if (deferredUnits.length > 0) {
segments.push({ type: "deferred", units: deferredUnits });
deferredUnits = [];
}
const lastSegment = segments.at(-1);
if (lastSegment?.type === "serialPhase" && lastSegment.phase === unit.serialPhase) {
lastSegment.units.push(unit);
} else {
segments.push({ type: "serialPhase", phase: unit.serialPhase, units: [unit] });
}
continue;
}
deferredUnits.push(unit);
}
if (deferredUnits.length > 0) {
segments.push({ type: "deferred", units: deferredUnits });
}
return segments;
};
const prioritizeDeferredUnitsForPhase = (units, phase) => {
const preferredSurface =
phase === "extensions" || phase === "channels" ? phase : phase === "unit-fast" ? "unit" : null;
if (preferredSurface === null) {
return units;
}
const preferred = [];
const remaining = [];
for (const unit of units) {
if (unit.surface === preferredSurface) {
preferred.push(unit);
} else {
remaining.push(unit);
}
}
return preferred.length > 0 ? [...preferred, ...remaining] : units;
};
const partitionUnitsBySurface = (units, surface) => {
const matching = [];
const remaining = [];
for (const unit of units) {
if (unit.surface === surface) {
matching.push(unit);
} else {
remaining.push(unit);
}
}
return { matching, remaining };
};
export async function executePlan(plan, options = {}) {
const env = options.env ?? process.env;
const artifacts = options.artifacts ?? createExecutionArtifacts(env);
@@ -632,23 +688,116 @@ export async function executePlan(plan, options = {}) {
}
if (plan.serialPrefixUnits.length > 0) {
const failedSerialPrefix = await runUnitsWithLimit(
plan.serialPrefixUnits,
plan.passthroughOptionArgs,
1,
);
if (failedSerialPrefix !== undefined) {
return failedSerialPrefix;
const orderedSegments = buildOrderedParallelSegments(plan.parallelUnits);
let pendingDeferredSegment = null;
let carriedDeferredPromise = null;
let carriedDeferredSurface = null;
for (const segment of orderedSegments) {
if (segment.type === "deferred") {
pendingDeferredSegment = segment;
continue;
}
// Preserve phase ordering, but let batches inside the same shared phase use
// the normal top-level concurrency budget.
let deferredPromise = null;
let deferredCarryPromise = carriedDeferredPromise;
let deferredCarrySurface = carriedDeferredSurface;
if (
segment.phase === "unit-fast" &&
pendingDeferredSegment !== null &&
plan.topLevelParallelEnabled
) {
const availableSlots = Math.max(0, plan.topLevelParallelLimit - segment.units.length);
if (availableSlots > 0) {
const prePhaseDeferred = pendingDeferredSegment.units;
if (prePhaseDeferred.length > 0) {
deferredCarryPromise = runUnitsWithLimit(
prePhaseDeferred,
plan.passthroughOptionArgs,
availableSlots,
);
deferredCarrySurface = prePhaseDeferred.some((unit) => unit.surface === "channels")
? "channels"
: null;
pendingDeferredSegment = null;
}
}
}
if (pendingDeferredSegment !== null) {
const prioritizedDeferred = prioritizeDeferredUnitsForPhase(
pendingDeferredSegment.units,
segment.phase,
);
if (segment.phase === "extensions") {
const { matching: channelDeferred, remaining: otherDeferred } = partitionUnitsBySurface(
prioritizedDeferred,
"channels",
);
deferredPromise =
otherDeferred.length > 0
? runUnitsWithLimit(
otherDeferred,
plan.passthroughOptionArgs,
plan.deferredRunConcurrency ?? 1,
)
: null;
deferredCarryPromise =
channelDeferred.length > 0
? runUnitsWithLimit(
channelDeferred,
plan.passthroughOptionArgs,
plan.deferredRunConcurrency ?? 1,
)
: carriedDeferredPromise;
deferredCarrySurface = channelDeferred.length > 0 ? "channels" : carriedDeferredSurface;
} else {
deferredPromise = runUnitsWithLimit(
prioritizedDeferred,
plan.passthroughOptionArgs,
plan.deferredRunConcurrency ?? 1,
);
}
}
pendingDeferredSegment = null;
// eslint-disable-next-line no-await-in-loop
const failedSerialPhase = await runUnits(segment.units, plan.passthroughOptionArgs);
if (failedSerialPhase !== undefined) {
return failedSerialPhase;
}
if (deferredCarryPromise !== null && deferredCarrySurface === segment.phase) {
// eslint-disable-next-line no-await-in-loop
const failedCarriedDeferred = await deferredCarryPromise;
if (failedCarriedDeferred !== undefined) {
return failedCarriedDeferred;
}
deferredCarryPromise = null;
deferredCarrySurface = null;
}
if (deferredPromise !== null) {
// eslint-disable-next-line no-await-in-loop
const failedDeferredPhase = await deferredPromise;
if (failedDeferredPhase !== undefined) {
return failedDeferredPhase;
}
}
carriedDeferredPromise = deferredCarryPromise;
carriedDeferredSurface = deferredCarrySurface;
}
const failedDeferredParallel = plan.deferredRunConcurrency
? await runUnitsWithLimit(
plan.deferredParallelUnits,
plan.passthroughOptionArgs,
plan.deferredRunConcurrency,
)
: await runUnits(plan.deferredParallelUnits, plan.passthroughOptionArgs);
if (failedDeferredParallel !== undefined) {
return failedDeferredParallel;
if (pendingDeferredSegment !== null) {
const failedDeferredParallel = await runUnitsWithLimit(
pendingDeferredSegment.units,
plan.passthroughOptionArgs,
plan.deferredRunConcurrency ?? 1,
);
if (failedDeferredParallel !== undefined) {
return failedDeferredParallel;
}
}
if (carriedDeferredPromise !== null) {
const failedCarriedDeferred = await carriedDeferredPromise;
if (failedCarriedDeferred !== undefined) {
return failedCarriedDeferred;
}
}
} else {
const failedParallel = await runUnits(plan.parallelUnits, plan.passthroughOptionArgs);

View File

@@ -2,6 +2,7 @@ import path from "node:path";
import { isUnitConfigTestFile } from "../../vitest.unit-paths.mjs";
import {
loadChannelTimingManifest,
loadExtensionTimingManifest,
loadUnitMemoryHotspotManifest,
loadUnitTimingManifest,
packFilesByDuration,
@@ -145,6 +146,7 @@ const createPlannerContext = (request, options = {}) => {
const catalog = options.catalog ?? loadTestCatalog();
const unitTimingManifest = loadUnitTimingManifest();
const channelTimingManifest = loadChannelTimingManifest();
const extensionTimingManifest = loadExtensionTimingManifest();
const unitMemoryHotspotManifest = loadUnitMemoryHotspotManifest();
return {
env,
@@ -153,6 +155,7 @@ const createPlannerContext = (request, options = {}) => {
catalog,
unitTimingManifest,
channelTimingManifest,
extensionTimingManifest,
unitMemoryHotspotManifest,
};
};
@@ -198,11 +201,16 @@ const resolveEntryTimingEstimator = (entry, context) => {
context.unitTimingManifest.files[file]?.durationMs ??
context.unitTimingManifest.defaultDurationMs;
}
if (config === "vitest.channels.config.ts" || config === "vitest.extensions.config.ts") {
if (config === "vitest.channels.config.ts") {
return (file) =>
context.channelTimingManifest.files[file]?.durationMs ??
context.channelTimingManifest.defaultDurationMs;
}
if (config === "vitest.extensions.config.ts") {
return (file) =>
context.extensionTimingManifest.files[file]?.durationMs ??
context.extensionTimingManifest.defaultDurationMs;
}
return null;
};
@@ -233,6 +241,20 @@ const splitFilesByDurationBudget = (files, targetDurationMs, estimateDurationMs)
return batches;
};
const splitFilesByBalancedDurationBudget = (files, targetDurationMs, estimateDurationMs) => {
if (!Number.isFinite(targetDurationMs) || targetDurationMs <= 0 || files.length <= 1) {
return [files];
}
const totalDurationMs = files.reduce((sum, file) => sum + estimateDurationMs(file), 0);
const batchCount = clamp(Math.ceil(totalDurationMs / targetDurationMs), 1, files.length);
const originalOrder = new Map(files.map((file, index) => [file, index]));
return packFilesByDuration(files, batchCount, estimateDurationMs).map((batch) =>
[...batch].toSorted(
(left, right) => (originalOrder.get(left) ?? 0) - (originalOrder.get(right) ?? 0),
),
);
};
const resolveMaxWorkersForUnit = (unit, context) => {
const overrideWorkers = Number.parseInt(context.env.OPENCLAW_TEST_WORKERS ?? "", 10);
const resolvedOverride =
@@ -332,11 +354,20 @@ const resolveUnitHeavyFileGroups = (context) => {
};
const buildDefaultUnits = (context, request) => {
const { env, executionBudget, catalog, unitTimingManifest, channelTimingManifest } = context;
const {
env,
executionBudget,
catalog,
unitTimingManifest,
channelTimingManifest,
extensionTimingManifest,
} = context;
const noIsolateArgs = context.noIsolateArgs;
const selectedSurfaces = buildRequestedSurfaces(request, env);
const selectedSurfaceSet = new Set(selectedSurfaces);
const unitOnlyRun = selectedSurfaceSet.size === 1 && selectedSurfaceSet.has("unit");
const channelsOnlyRun = selectedSurfaceSet.size === 1 && selectedSurfaceSet.has("channels");
const extensionsOnlyRun = selectedSurfaceSet.size === 1 && selectedSurfaceSet.has("extensions");
const {
heavyUnitLaneCount,
@@ -361,6 +392,8 @@ const buildDefaultUnits = (context, request) => {
unitTimingManifest.files[file]?.durationMs ?? unitTimingManifest.defaultDurationMs;
const estimateChannelDurationMs = (file) =>
channelTimingManifest.files[file]?.durationMs ?? channelTimingManifest.defaultDurationMs;
const estimateExtensionDurationMs = (file) =>
extensionTimingManifest.files[file]?.durationMs ?? extensionTimingManifest.defaultDurationMs;
const unitFastCandidateFiles = catalog.allKnownUnitFiles.filter(
(file) => !new Set(unitFastExcludedFiles).has(file),
);
@@ -421,7 +454,7 @@ const buildDefaultUnits = (context, request) => {
id: unitId,
surface: "unit",
isolate: false,
serialPhase: "unit-fast",
serialPhase: unitOnlyRun ? undefined : "unit-fast",
includeFiles: batch,
estimatedDurationMs: estimateEntryFilesDurationMs(
{ args: ["vitest", "run", "--config", "vitest.unit.config.ts"] },
@@ -453,6 +486,7 @@ const buildDefaultUnits = (context, request) => {
id: `unit-${path.basename(file, ".test.ts")}-isolated`,
surface: "unit",
isolate: true,
estimatedDurationMs: estimateUnitDurationMs(file),
args: [
"vitest",
"run",
@@ -478,6 +512,7 @@ const buildDefaultUnits = (context, request) => {
id: `unit-heavy-${String(index + 1)}`,
surface: "unit",
isolate: false,
estimatedDurationMs: files.reduce((sum, file) => sum + estimateUnitDurationMs(file), 0),
args: [
"vitest",
"run",
@@ -498,6 +533,7 @@ const buildDefaultUnits = (context, request) => {
id: `unit-${path.basename(file, ".test.ts")}-memory-isolated`,
surface: "unit",
isolate: true,
estimatedDurationMs: estimateUnitDurationMs(file),
args: [
"vitest",
"run",
@@ -533,6 +569,29 @@ const buildDefaultUnits = (context, request) => {
}
}
if (selectedSurfaceSet.has("channels")) {
for (const file of catalog.channelIsolatedFiles) {
units.push(
createExecutionUnit(context, {
id: `${path.basename(file, ".test.ts")}-channels-isolated`,
surface: "channels",
isolate: true,
estimatedDurationMs: estimateChannelDurationMs(file),
args: [
"vitest",
"run",
"--config",
"vitest.channels.config.ts",
"--pool=forks",
...noIsolateArgs,
file,
],
reasons: ["channels-isolated-rule"],
}),
);
}
}
if (selectedSurfaceSet.has("extensions")) {
for (const file of catalog.extensionForkIsolatedFiles) {
units.push(
@@ -540,15 +599,16 @@ const buildDefaultUnits = (context, request) => {
id: `extensions-${path.basename(file, ".test.ts")}-isolated`,
surface: "extensions",
isolate: true,
estimatedDurationMs: estimateExtensionDurationMs(file),
args: ["vitest", "run", "--config", "vitest.extensions.config.ts", "--pool=forks", file],
reasons: ["extensions-isolated-manifest"],
}),
);
}
const extensionBatches = splitFilesByDurationBudget(
const extensionBatches = splitFilesByBalancedDurationBudget(
extensionSharedCandidateFiles,
extensionsBatchTargetMs,
estimateChannelDurationMs,
estimateExtensionDurationMs,
);
for (const [batchIndex, batch] of extensionBatches.entries()) {
if (batch.length === 0) {
@@ -561,7 +621,7 @@ const buildDefaultUnits = (context, request) => {
id: unitId,
surface: "extensions",
isolate: false,
serialPhase: "extensions",
serialPhase: extensionsOnlyRun ? undefined : "extensions",
includeFiles: batch,
estimatedDurationMs: estimateEntryFilesDurationMs(
{ args: ["vitest", "run", "--config", "vitest.extensions.config.ts"] },
@@ -581,25 +641,6 @@ const buildDefaultUnits = (context, request) => {
}
if (selectedSurfaceSet.has("channels")) {
for (const file of catalog.channelIsolatedFiles) {
units.push(
createExecutionUnit(context, {
id: `${path.basename(file, ".test.ts")}-channels-isolated`,
surface: "channels",
isolate: true,
args: [
"vitest",
"run",
"--config",
"vitest.channels.config.ts",
"--pool=forks",
...noIsolateArgs,
file,
],
reasons: ["channels-isolated-rule"],
}),
);
}
const channelBatches = splitFilesByDurationBudget(
channelSharedCandidateFiles,
channelsBatchTargetMs,

View File

@@ -102,6 +102,7 @@ const LOCAL_MEMORY_BUDGETS = {
heavyLaneCount: 3,
memoryHeavyFileLimit: 8,
unitFastBatchTargetMs: 10_000,
channelsBatchTargetMs: 0,
},
moderate: {
vitestCap: 3,
@@ -117,6 +118,7 @@ const LOCAL_MEMORY_BUDGETS = {
heavyLaneCount: 4,
memoryHeavyFileLimit: 12,
unitFastBatchTargetMs: 15_000,
channelsBatchTargetMs: 0,
},
mid: {
vitestCap: 4,
@@ -132,6 +134,7 @@ const LOCAL_MEMORY_BUDGETS = {
heavyLaneCount: 4,
memoryHeavyFileLimit: 16,
unitFastBatchTargetMs: 0,
channelsBatchTargetMs: 0,
},
high: {
vitestCap: 6,
@@ -140,13 +143,14 @@ const LOCAL_MEMORY_BUDGETS = {
unitHeavy: 2,
extensions: 4,
gateway: 3,
topLevelNoIsolate: 12,
topLevelNoIsolate: 14,
topLevelIsolated: 4,
deferred: 3,
deferred: 8,
heavyFileLimit: 80,
heavyLaneCount: 5,
memoryHeavyFileLimit: 16,
unitFastBatchTargetMs: 45_000,
channelsBatchTargetMs: 30_000,
},
};
@@ -296,8 +300,8 @@ export function resolveExecutionBudget(runtimeCapabilities) {
memoryHeavyUnitFileLimit: bandBudget.memoryHeavyFileLimit,
unitFastLaneCount: 1,
unitFastBatchTargetMs: bandBudget.unitFastBatchTargetMs,
channelsBatchTargetMs: 0,
extensionsBatchTargetMs: 0,
channelsBatchTargetMs: bandBudget.channelsBatchTargetMs ?? 0,
extensionsBatchTargetMs: 240_000,
};
const loadAdjustedBudget = {

View File

@@ -3,6 +3,7 @@ import { normalizeTrackedRepoPath, tryReadJsonFile } from "./test-report-utils.m
export const behaviorManifestPath = "test/fixtures/test-parallel.behavior.json";
export const unitTimingManifestPath = "test/fixtures/test-timings.unit.json";
export const channelTimingManifestPath = "test/fixtures/test-timings.channels.json";
export const extensionTimingManifestPath = "test/fixtures/test-timings.extensions.json";
export const unitMemoryHotspotManifestPath = "test/fixtures/test-memory-hotspots.unit.json";
const defaultTimingManifest = {
@@ -15,6 +16,11 @@ const defaultChannelTimingManifest = {
defaultDurationMs: 3000,
files: {},
};
const defaultExtensionTimingManifest = {
config: "vitest.extensions.config.ts",
defaultDurationMs: 1000,
files: {},
};
const defaultMemoryHotspotManifest = {
config: "vitest.unit.config.ts",
defaultMinDeltaKb: 256 * 1024,
@@ -137,6 +143,10 @@ export function loadChannelTimingManifest() {
return loadTimingManifest(channelTimingManifestPath, defaultChannelTimingManifest);
}
export function loadExtensionTimingManifest() {
return loadTimingManifest(extensionTimingManifestPath, defaultExtensionTimingManifest);
}
export function loadUnitMemoryHotspotManifest() {
const raw = tryReadJsonFile(unitMemoryHotspotManifestPath, defaultMemoryHotspotManifest);
const defaultMinDeltaKb =

View File

@@ -5,26 +5,42 @@ import {
normalizeTrackedRepoPath,
writeJsonFile,
} from "./test-report-utils.mjs";
import { unitTimingManifestPath } from "./test-runner-manifest.mjs";
import { extensionTimingManifestPath, unitTimingManifestPath } from "./test-runner-manifest.mjs";
const resolveDefaultManifestSettings = (config) => {
if (config === "vitest.extensions.config.ts") {
return {
out: extensionTimingManifestPath,
defaultDurationMs: 1000,
description: "extension",
};
}
return {
out: unitTimingManifestPath,
defaultDurationMs: 250,
description: "unit",
};
};
if (process.argv.slice(2).includes("--help")) {
console.log(
[
"Usage: node scripts/test-update-timings.mjs [options]",
"",
"Generate or refresh the unit test timing manifest from a Vitest JSON report.",
"Generate or refresh a test timing manifest from a Vitest JSON report.",
"",
"Options:",
" --config <path> Vitest config to run when no report is supplied",
" --report <path> Reuse an existing Vitest JSON report",
" --out <path> Output manifest path (default: test/fixtures/test-timings.unit.json)",
" --out <path> Output manifest path (default follows --config)",
" --limit <count> Max number of file timings to retain (default: 256)",
" --default-duration-ms <ms> Fallback duration for unknown files (default: 250)",
" --default-duration-ms <ms> Fallback duration for unknown files (default follows --config)",
" --help Show this help text",
"",
"Examples:",
" node scripts/test-update-timings.mjs",
" node scripts/test-update-timings.mjs --config vitest.unit.config.ts --limit 128",
" node scripts/test-update-timings.mjs --config vitest.extensions.config.ts",
" node scripts/test-update-timings.mjs --report /tmp/vitest-report.json --out /tmp/timings.json",
].join("\n"),
);
@@ -32,14 +48,14 @@ if (process.argv.slice(2).includes("--help")) {
}
function parseArgs(argv) {
return parseFlagArgs(
const parsed = parseFlagArgs(
argv,
{
config: "vitest.unit.config.ts",
limit: 256,
reportPath: "",
out: unitTimingManifestPath,
defaultDurationMs: 250,
out: "",
defaultDurationMs: 0,
},
[
stringFlag("--config", "config"),
@@ -49,6 +65,16 @@ function parseArgs(argv) {
intFlag("--default-duration-ms", "defaultDurationMs", { min: 1 }),
],
);
const defaults = resolveDefaultManifestSettings(parsed.config);
return {
...parsed,
out: parsed.out || defaults.out,
defaultDurationMs:
Number.isFinite(parsed.defaultDurationMs) && parsed.defaultDurationMs > 0
? parsed.defaultDurationMs
: defaults.defaultDurationMs,
description: defaults.description,
};
}
const opts = parseArgs(process.argv.slice(2));
@@ -75,5 +101,5 @@ const output = {
writeJsonFile(opts.out, output);
console.log(
`[test-update-timings] wrote ${String(Object.keys(files).length)} timings to ${opts.out}`,
`[test-update-timings] wrote ${String(Object.keys(files).length)} ${opts.description} timings to ${opts.out}`,
);

View File

@@ -159,6 +159,26 @@ export function clearSessionStoreCacheForTest(): void {
LOCK_QUEUES.clear();
}
export async function drainSessionStoreLockQueuesForTest(): Promise<void> {
while (LOCK_QUEUES.size > 0) {
const queues = [...LOCK_QUEUES.values()];
for (const queue of queues) {
for (const task of queue.pending) {
task.reject(new Error("session store queue cleared for test"));
}
queue.pending.length = 0;
}
const activeDrains = queues.flatMap((queue) =>
queue.drainPromise ? [queue.drainPromise] : [],
);
if (activeDrains.length === 0) {
LOCK_QUEUES.clear();
return;
}
await Promise.allSettled(activeDrains);
}
}
/** Expose lock queue size for tests. */
export function getSessionStoreLockQueueSizeForTest(): number {
return LOCK_QUEUES.size;
@@ -602,6 +622,7 @@ type SessionStoreLockTask = {
type SessionStoreLockQueue = {
running: boolean;
pending: SessionStoreLockTask[];
drainPromise: Promise<void> | null;
};
const LOCK_QUEUES = new Map<string, SessionStoreLockQueue>();
@@ -686,63 +707,71 @@ function getOrCreateLockQueue(storePath: string): SessionStoreLockQueue {
if (existing) {
return existing;
}
const created: SessionStoreLockQueue = { running: false, pending: [] };
const created: SessionStoreLockQueue = { running: false, pending: [], drainPromise: null };
LOCK_QUEUES.set(storePath, created);
return created;
}
async function drainSessionStoreLockQueue(storePath: string): Promise<void> {
const queue = LOCK_QUEUES.get(storePath);
if (!queue || queue.running) {
if (!queue) {
return;
}
if (queue.drainPromise) {
await queue.drainPromise;
return;
}
queue.running = true;
try {
while (queue.pending.length > 0) {
const task = queue.pending.shift();
if (!task) {
continue;
}
queue.drainPromise = (async () => {
try {
while (queue.pending.length > 0) {
const task = queue.pending.shift();
if (!task) {
continue;
}
const remainingTimeoutMs = task.timeoutMs ?? Number.POSITIVE_INFINITY;
if (task.timeoutMs != null && remainingTimeoutMs <= 0) {
task.reject(lockTimeoutError(storePath));
continue;
}
const remainingTimeoutMs = task.timeoutMs ?? Number.POSITIVE_INFINITY;
if (task.timeoutMs != null && remainingTimeoutMs <= 0) {
task.reject(lockTimeoutError(storePath));
continue;
}
let lock: { release: () => Promise<void> } | undefined;
let result: unknown;
let failed: unknown;
let hasFailure = false;
try {
lock = await acquireSessionWriteLock({
sessionFile: storePath,
timeoutMs: remainingTimeoutMs,
staleMs: task.staleMs,
let lock: { release: () => Promise<void> } | undefined;
let result: unknown;
let failed: unknown;
let hasFailure = false;
try {
lock = await acquireSessionWriteLock({
sessionFile: storePath,
timeoutMs: remainingTimeoutMs,
staleMs: task.staleMs,
});
result = await task.fn();
} catch (err) {
hasFailure = true;
failed = err;
} finally {
await lock?.release().catch(() => undefined);
}
if (hasFailure) {
task.reject(failed);
continue;
}
task.resolve(result);
}
} finally {
queue.running = false;
queue.drainPromise = null;
if (queue.pending.length === 0) {
LOCK_QUEUES.delete(storePath);
} else {
queueMicrotask(() => {
void drainSessionStoreLockQueue(storePath);
});
result = await task.fn();
} catch (err) {
hasFailure = true;
failed = err;
} finally {
await lock?.release().catch(() => undefined);
}
if (hasFailure) {
task.reject(failed);
continue;
}
task.resolve(result);
}
} finally {
queue.running = false;
if (queue.pending.length === 0) {
LOCK_QUEUES.delete(storePath);
} else {
queueMicrotask(() => {
void drainSessionStoreLockQueue(storePath);
});
}
}
})();
await queue.drainPromise;
}
async function withSessionStoreLock<T>(

View File

@@ -0,0 +1,58 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it } from "vitest";
import {
getSessionStoreLockQueueSizeForTest,
withSessionStoreLockForTest,
} from "../config/sessions/store.js";
import { cleanupSessionStateForTest } from "./session-state-cleanup.js";
function createDeferred<T>() {
let resolve!: (value: T | PromiseLike<T>) => void;
let reject!: (reason?: unknown) => void;
const promise = new Promise<T>((nextResolve, nextReject) => {
resolve = nextResolve;
reject = nextReject;
});
return { promise, resolve, reject };
}
describe("cleanupSessionStateForTest", () => {
afterEach(async () => {
await cleanupSessionStateForTest();
});
it("waits for in-flight session store locks before clearing test state", async () => {
const fixtureRoot = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-session-cleanup-"));
const storePath = path.join(fixtureRoot, "openclaw-sessions.json");
const started = createDeferred<void>();
const release = createDeferred<void>();
try {
const running = withSessionStoreLockForTest(storePath, async () => {
started.resolve();
await release.promise;
});
await started.promise;
expect(getSessionStoreLockQueueSizeForTest()).toBe(1);
let settled = false;
const cleanupPromise = cleanupSessionStateForTest().then(() => {
settled = true;
});
await new Promise((resolve) => setTimeout(resolve, 25));
expect(settled).toBe(false);
release.resolve();
await running;
await cleanupPromise;
expect(getSessionStoreLockQueueSizeForTest()).toBe(0);
} finally {
release.resolve();
await fs.rm(fixtureRoot, { recursive: true, force: true });
}
});
});

View File

@@ -1,8 +1,12 @@
import { drainSessionWriteLockStateForTest } from "../agents/session-write-lock.js";
import { clearSessionStoreCacheForTest } from "../config/sessions/store.js";
import {
clearSessionStoreCacheForTest,
drainSessionStoreLockQueuesForTest,
} from "../config/sessions/store.js";
import { drainFileLockStateForTest } from "../infra/file-lock.js";
export async function cleanupSessionStateForTest(): Promise<void> {
await drainSessionStoreLockQueuesForTest();
clearSessionStoreCacheForTest();
await drainFileLockStateForTest();
await drainSessionWriteLockStateForTest();

1031
test/fixtures/test-timings.extensions.json vendored Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -200,7 +200,46 @@ describe("scripts/test-parallel lane planning", () => {
expect(output).toContain("mode=local intent=normal memoryBand=mid");
expect(output).toContain("unit-fast filters=all maxWorkers=");
expect(output).toContain("extensions filters=all maxWorkers=");
expect(output).toMatch(/extensions(?:-batch-1)? filters=all maxWorkers=/);
});
it("starts isolated channel lanes before shared extension batches on high-memory local hosts", () => {
const repoRoot = path.resolve(import.meta.dirname, "../..");
const output = execFileSync(
"node",
[
"scripts/test-parallel.mjs",
"--plan",
"--surface",
"unit",
"--surface",
"extensions",
"--surface",
"channels",
],
{
cwd: repoRoot,
env: {
...clearPlannerShardEnv(process.env),
CI: "",
GITHUB_ACTIONS: "",
RUNNER_OS: "macOS",
OPENCLAW_TEST_HOST_CPU_COUNT: "12",
OPENCLAW_TEST_HOST_MEMORY_GIB: "128",
OPENCLAW_TEST_LOAD_AWARE: "0",
},
encoding: "utf8",
},
);
const firstChannelIsolated = output.indexOf(
"message-handler.preflight.acp-bindings-channels-isolated",
);
const firstExtensionBatch = output.indexOf("extensions-batch-1");
const firstChannelBatch = output.indexOf("channels-batch-1");
expect(firstChannelIsolated).toBeGreaterThanOrEqual(0);
expect(firstExtensionBatch).toBeGreaterThan(firstChannelIsolated);
expect(firstChannelBatch).toBeGreaterThan(firstExtensionBatch);
});
it("explains targeted file ownership and execution policy", () => {

View File

@@ -153,4 +153,26 @@ describe("resolveLocalVitestMaxWorkers", () => {
expect(budget.vitestMaxWorkers).toBe(2);
expect(budget.topLevelParallelLimit).toBe(2);
});
it("enables shared channel batching on high-memory local hosts", () => {
const runtime = resolveRuntimeCapabilities(
{
RUNNER_OS: "macOS",
},
{
cpuCount: 16,
totalMemoryBytes: 128 * 1024 ** 3,
platform: "darwin",
mode: "local",
loadAverage: [0.2, 0.2, 0.2],
},
);
const budget = resolveExecutionBudget(runtime);
expect(runtime.memoryBand).toBe("high");
expect(runtime.loadBand).toBe("idle");
expect(budget.channelsBatchTargetMs).toBe(30_000);
expect(budget.deferredRunConcurrency).toBe(8);
expect(budget.topLevelParallelLimitNoIsolate).toBe(14);
});
});