feat(observability-otel,libs): include and propagate Traceparent header to tid (#11845)

This commit is contained in:
Neko
2026-01-26 14:54:19 +08:00
committed by GitHub
parent c54b09182f
commit 0d101dad72
9 changed files with 344 additions and 18 deletions

View File

@@ -1,2 +1,2 @@
export type { Attributes, Span } from '@opentelemetry/api';
export { diag, metrics, SpanKind, SpanStatusCode, trace } from '@opentelemetry/api';
export type { Attributes, Context, Span, TextMapGetter } from '@opentelemetry/api';
export { context, diag, metrics, propagation,SpanKind, SpanStatusCode, trace } from '@opentelemetry/api';

View File

@@ -5,12 +5,14 @@ import {
} from '@lobechat/model-runtime';
import { ChatErrorType, type ClientSecretPayload } from '@lobechat/types';
import { getXorPayload } from '@lobechat/utils/server';
import { context as otContext } from '@lobechat/observability-otel/api';
import { auth } from '@/auth';
import { getServerDB } from '@/database/core/db-adaptor';
import { type LobeChatDatabase } from '@/database/type';
import { LOBE_CHAT_AUTH_HEADER, LOBE_CHAT_OIDC_AUTH_HEADER, OAUTH_AUTHORIZED } from '@/envs/auth';
import { validateOIDCJWT } from '@/libs/oidc-provider/jwt';
import { extractTraceContext, injectActiveTraceHeaders } from '@/libs/observability/traceparent';
import { createErrorResponse } from '@/utils/errorResponse';
import { checkAuthMethod } from './utils';
@@ -114,5 +116,17 @@ export const checkAuth =
const userId = jwtPayload.userId || '';
return handler(clonedReq, { ...options, jwtPayload, serverDB, userId });
const extractedContext = extractTraceContext(req.headers);
const res = await otContext.with(extractedContext, () =>
handler(clonedReq, { ...options, jwtPayload, serverDB, userId }),
);
const headers = new Headers(res.headers);
const traceparent = injectActiveTraceHeaders(headers);
if (!traceparent) {
return res;
}
return new Response(res.body, { ...res, headers });
};

View File

@@ -1,5 +1,12 @@
export async function register() {
if (process.env.NEXT_RUNTIME === 'nodejs' && process.env.ENABLE_TELEMETRY) {
await import('./instrumentation.node');
if (process.env.NODE_ENV !== 'production' && !process.env.ENABLE_TELEMETRY_IN_DEV) {
return;
}
const shouldEnable = process.env.ENABLE_TELEMETRY && process.env.NEXT_RUNTIME === 'nodejs';
if (!shouldEnable) {
return;
}
await import('./instrumentation.node');
}

View File

@@ -0,0 +1,69 @@
import type { Mock } from 'vitest';
import { afterEach, describe, expect, it, vi } from 'vitest';
vi.mock('@lobechat/observability-otel/api', () => {
const inject = vi.fn();
const setSpan = vi.fn((_ctx, span) => span);
return {
context: {
active: vi.fn(() => ({})),
},
propagation: { inject },
trace: { setSpan },
};
});
// eslint-disable-next-line import/first
import { injectSpanTraceHeaders } from './traceparent';
const mockSpan = (traceId: string, spanId: string) =>
({
spanContext: () => ({
traceId,
spanId,
traceFlags: 1,
}),
}) as any;
const headersWith = (...args: ConstructorParameters<typeof Headers>) => new Headers(...args);
describe('injectSpanTraceHeaders', () => {
const api = vi.importMock<typeof import('@lobechat/observability-otel/api')>(
'@lobechat/observability-otel/api',
);
afterEach(() => {
vi.resetAllMocks();
});
it('uses propagator output when available', async () => {
const { propagation } = await api;
(propagation.inject as unknown as Mock<typeof propagation.inject<Record<string, string>>>).mockImplementation((_ctx, carrier) => {
carrier.traceparent = 'from-propagator';
carrier.tracestate = 'state';
});
const headers = headersWith();
const span = mockSpan('abc'.padEnd(32, '0'), '1234567890abcdef');
const tp = injectSpanTraceHeaders(headers, span);
expect(tp).toBe('from-propagator');
expect(headers.get('traceparent')).toBe('from-propagator');
expect(headers.get('tracestate')).toBe('state');
});
it('falls back to manual traceparent formatting when propagator gives none', async () => {
const { propagation } = await api;
(propagation.inject as unknown as Mock<typeof propagation.inject<Record<string, string>>>).mockImplementation(() => undefined);
const headers = headersWith();
const span = mockSpan('1'.repeat(32), '2'.repeat(16));
const tp = injectSpanTraceHeaders(headers, span);
expect(tp).toBe('00-11111111111111111111111111111111-2222222222222222-01');
expect(headers.get('traceparent')).toBe('00-11111111111111111111111111111111-2222222222222222-01');
});
});

View File

@@ -0,0 +1,110 @@
import type {
Span,
Context as OtContext,
TextMapGetter
} from '@lobechat/observability-otel/api';
import {
context as otContext,
propagation,
trace,
} from '@lobechat/observability-otel/api';
// NOTICE: do not try to optimize this into .repeat(...) or similar,
// here served for better search / semantic search purpose for further diagnostic
// with either Agents or Human users without needed to understand how
// many zeros are needed.
const ZERO_TRACE_ID = '00000000000000000000000000000000';
// NOTICE: do not try to optimize this into .repeat(...) or similar too.
const ZERO_SPAN_ID = '0000000000000000';
const formatTraceFlags = (flags?: number) => (flags ?? 0).toString(16).padStart(2, '0');
const isValidContext = (span?: Span) => {
if (!span) return false;
const context = span.spanContext();
return (
!!context.traceId &&
context.traceId !== ZERO_TRACE_ID &&
!!context.spanId &&
context.spanId !== ZERO_SPAN_ID
);
};
export const toTraceparent = (span: Span) => {
const { traceId, spanId, traceFlags } = span.spanContext();
return `00-${traceId}-${spanId}-${formatTraceFlags(traceFlags)}`;
};
/**
* Fetch the active span and format it as a W3C traceparent header value.
*/
export const getActiveTraceparent = () => {
const span = trace.getActiveSpan();
if (!isValidContext(span)) return undefined;
return toTraceparent(span as Span);
};
/**
* Injects the active context into headers using the configured propagator (W3C by default).
* Also returns the traceparent for convenience.
*/
export const injectActiveTraceHeaders = (headers: Headers) => {
const carrier: Record<string, string> = {};
propagation.inject(otContext.active(), carrier);
// Fall back to manual formatting if the global propagator is not configured
if (!carrier.traceparent) {
const tp = getActiveTraceparent();
if (tp) carrier.traceparent = tp;
}
if (carrier.traceparent) headers.set('traceparent', carrier.traceparent);
if (carrier.tracestate) headers.set('tracestate', carrier.tracestate);
return carrier.traceparent;
};
/**
* Injects the provided span into headers. Useful when a span is created before being active.
*/
export const injectSpanTraceHeaders = (headers: Headers, span: Span) => {
const ctxWithSpan = trace.setSpan(otContext.active(), span);
const carrier: Record<string, string> = {};
propagation.inject(ctxWithSpan, carrier);
// Fall back to manual formatting if the global propagator is not configured
if (!carrier.traceparent) {
carrier.traceparent = toTraceparent(span);
}
if (carrier.traceparent) headers.set('traceparent', carrier.traceparent);
if (carrier.tracestate) headers.set('tracestate', carrier.tracestate);
return carrier.traceparent;
};
const headerGetter: TextMapGetter<Headers> = {
get(carrier, key) {
const value = carrier.get(key);
return value ? [value] : [];
},
keys(carrier) {
return Array.from(carrier.keys());
},
};
/**
* Extract trace context from incoming headers (traceparent/tracestate) using the OTEL propagator.
* Useful for linking a downstream request to the upstream responses traceparent header — read
* the header on the client, send it back on the next request, and the backend will stitch spans
* into one trace.
*
* @link {@see https://github.com/open-telemetry/opentelemetry.io/blob/a1dda51143cfbdf26cd320bea7ae43569c585cb3/content/en/docs/languages/js/propagation.md}
*/
export const extractTraceContext = (headers: Headers): OtContext => {
const ctx = propagation.extract(otContext.active(), headers, headerGetter);
return ctx;
};

View File

@@ -87,4 +87,17 @@ describe('createContextInner', () => {
expect(context1.resHeaders).toBeInstanceOf(Headers);
expect(context2.resHeaders).toBeInstanceOf(Headers);
});
it('should always provide resHeaders', async () => {
const ctx = await createContextInner();
expect(ctx.resHeaders).toBeInstanceOf(Headers);
});
it('should keep provided traceContext', async () => {
const traceContext = { test: 'ctx' } as any;
const ctx = await createContextInner({ traceContext });
expect(ctx.traceContext).toBe(traceContext);
});
});

View File

@@ -6,6 +6,8 @@ import { type NextRequest } from 'next/server';
import { auth } from '@/auth';
import { LOBE_CHAT_AUTH_HEADER, LOBE_CHAT_OIDC_AUTH_HEADER, authEnv } from '@/envs/auth';
import { validateOIDCJWT } from '@/libs/oidc-provider/jwt';
import { extractTraceContext } from '@/libs/observability/traceparent';
import type { Context as OtContext } from '@lobechat/observability-otel/api';
// Create context logger namespace
const log = debug('lobe-trpc:lambda:context');
@@ -40,6 +42,7 @@ export interface AuthContext {
// Add OIDC authentication information
oidcAuth?: OIDCAuth | null;
resHeaders?: Headers;
traceContext?: OtContext;
userAgent?: string;
userId?: string | null;
}
@@ -53,6 +56,7 @@ export const createContextInner = async (params?: {
clientIp?: string | null;
marketAccessToken?: string;
oidcAuth?: OIDCAuth | null;
traceContext?: OtContext;
userAgent?: string;
userId?: string | null;
}): Promise<AuthContext> => {
@@ -65,6 +69,7 @@ export const createContextInner = async (params?: {
marketAccessToken: params?.marketAccessToken,
oidcAuth: params?.oidcAuth,
resHeaders: responseHeaders,
traceContext: params?.traceContext,
userAgent: params?.userAgent,
userId: params?.userId,
};
@@ -83,10 +88,10 @@ export const createLambdaContext = async (request: NextRequest): Promise<LambdaC
const isMockUser = process.env.ENABLE_MOCK_DEV_USER === '1';
if (process.env.NODE_ENV === 'development' && (isDebugApi || isMockUser)) {
return {
return createContextInner({
authorizationHeader: request.headers.get(LOBE_CHAT_AUTH_HEADER),
userId: process.env.MOCK_DEV_USER_ID,
};
});
}
log('createLambdaContext called for request');
@@ -100,6 +105,8 @@ export const createLambdaContext = async (request: NextRequest): Promise<LambdaC
const cookieHeader = request.headers.get('cookie');
const cookies = cookieHeader ? parse(cookieHeader) : {};
const marketAccessToken = cookies['mp_token'];
// Extract upstream trace context for parent linking
const traceContext = extractTraceContext(request.headers);
log('marketAccessToken from cookie:', marketAccessToken ? '[HIDDEN]' : 'undefined');
const commonContext = {
@@ -137,9 +144,10 @@ export const createLambdaContext = async (request: NextRequest): Promise<LambdaC
return createContextInner({
oidcAuth,
...commonContext,
userId,
});
}
traceContext,
userId,
});
}
} catch (error) {
// If OIDC authentication fails, log error and continue with other authentication methods
if (oidcAuthToken) {
@@ -165,6 +173,7 @@ export const createLambdaContext = async (request: NextRequest): Promise<LambdaC
return createContextInner({
...commonContext,
traceContext,
userId,
});
} catch (e) {
@@ -177,5 +186,5 @@ export const createLambdaContext = async (request: NextRequest): Promise<LambdaC
'All authentication methods attempted, returning final context, userId: %s',
userId || 'not authenticated',
);
return createContextInner({ ...commonContext, userId });
return createContextInner({ ...commonContext, traceContext, userId });
};

View File

@@ -0,0 +1,93 @@
import { beforeEach, describe, expect, it, vi } from 'vitest';
const spanContext = {
traceId: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa',
spanId: 'bbbbbbbbbbbbbbbb',
traceFlags: 1,
};
const mocks = vi.hoisted(() => ({
capturedMiddleware: undefined as any,
}));
vi.mock('@lobechat/observability-otel/api', () => {
const tracer = {
startSpan: vi.fn(() => ({
spanContext: () => spanContext,
setStatus: vi.fn(),
setAttribute: vi.fn(),
end: vi.fn(),
})),
};
return {
SpanKind: { SERVER: 'server' },
SpanStatusCode: { OK: 1, ERROR: 2 },
context: {
active: vi.fn(() => ({})),
with: vi.fn((_ctx, fn) => fn()),
},
diag: { debug: vi.fn(), error: vi.fn() },
trace: {
getTracer: vi.fn(() => tracer),
setSpan: vi.fn((_ctx, span) => span),
},
propagation: { inject: vi.fn() },
};
});
vi.mock('../lambda/init', () => {
const middleware = (fn: any) => {
mocks.capturedMiddleware = fn;
return fn;
};
return {
trpc: {
middleware,
},
};
});
import { injectSpanTraceHeaders } from '@/libs/observability/traceparent';
vi.mock('@/libs/observability/traceparent', async () => {
const actual = await vi.importActual<typeof import('@/libs/observability/traceparent')>(
'@/libs/observability/traceparent',
);
return {
...actual,
injectSpanTraceHeaders: vi.fn(actual.injectSpanTraceHeaders),
};
});
// eslint-disable-next-line import/first
import { openTelemetry } from './openTelemetry';
describe('openTelemetry middleware', () => {
beforeEach(() => {
vi.resetAllMocks();
process.env.ENABLE_TELEMETRY = 'true';
});
it('injects trace headers into response headers', async () => {
const ctx = { resHeaders: new Headers() };
const middleware = mocks.capturedMiddleware || openTelemetry;
expect(typeof middleware).toBe('function');
const result = await middleware({
ctx: ctx as any,
getRawInput: () => undefined,
next: vi.fn().mockResolvedValue({ ok: true, data: null }),
path: 'foo.bar',
type: 'query',
});
expect(result).toEqual({ ok: true, data: null });
expect(ctx.resHeaders?.get('traceparent')).toBe(
'00-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-bbbbbbbbbbbbbbbb-01',
);
expect(injectSpanTraceHeaders).toHaveBeenCalled();
});
});

View File

@@ -1,5 +1,5 @@
import type { Attributes, Span } from '@lobechat/observability-otel/api';
import { SpanKind, SpanStatusCode, diag, trace } from '@lobechat/observability-otel/api';
import { SpanKind, SpanStatusCode, context, diag, trace } from '@lobechat/observability-otel/api';
import {
ATTR_ERROR_TYPE,
ATTR_EXCEPTION_MESSAGE,
@@ -20,6 +20,8 @@ import { TRPCError } from '@trpc/server';
import { env } from 'node:process';
import { name } from '../../../../package.json';
import { injectSpanTraceHeaders } from '@/libs/observability/traceparent';
import { trpc } from '../lambda/init';
const tracer = trace.getTracer('trpc-server');
@@ -62,7 +64,7 @@ const finalizeSpanWithError = (span: Span, error: unknown) => {
}
};
export const openTelemetry = trpc.middleware(async ({ path, type, next, getRawInput }) => {
export const openTelemetry = trpc.middleware(async ({ ctx, path, type, next, getRawInput }) => {
if (!env.ENABLE_TELEMETRY) {
diag.debug(name, 'telemetry disabled', env.ENABLE_TELEMETRY);
@@ -76,15 +78,24 @@ export const openTelemetry = trpc.middleware(async ({ path, type, next, getRawIn
const input = getRawInput();
const requestSize = getPayloadSize(input);
const span = tracer.startSpan(spanName, {
attributes: baseAttributes,
kind: SpanKind.SERVER,
});
const span = tracer.startSpan(
spanName,
{
attributes: baseAttributes,
kind: SpanKind.SERVER,
},
ctx?.traceContext,
);
// attach trace headers for downstream consumers (traceparent/tracestate)
if (ctx?.resHeaders) {
injectSpanTraceHeaders(ctx.resHeaders, span);
}
const startTimestamp = Date.now();
try {
const result = await next();
const result = await context.with(trace.setSpan(context.active(), span), async () => next());
diag.debug(name, 'tRPC instrumentation', 'requestHandled');
const responseSize = getPayloadSize(result.ok ? result.data : result.error);