mirror of
https://github.com/lobehub/lobehub.git
synced 2026-03-28 13:39:28 +07:00
✨ feat(observability-otel,libs): include and propagate Traceparent header to tid (#11845)
This commit is contained in:
@@ -1,2 +1,2 @@
|
||||
export type { Attributes, Span } from '@opentelemetry/api';
|
||||
export { diag, metrics, SpanKind, SpanStatusCode, trace } from '@opentelemetry/api';
|
||||
export type { Attributes, Context, Span, TextMapGetter } from '@opentelemetry/api';
|
||||
export { context, diag, metrics, propagation,SpanKind, SpanStatusCode, trace } from '@opentelemetry/api';
|
||||
|
||||
@@ -5,12 +5,14 @@ import {
|
||||
} from '@lobechat/model-runtime';
|
||||
import { ChatErrorType, type ClientSecretPayload } from '@lobechat/types';
|
||||
import { getXorPayload } from '@lobechat/utils/server';
|
||||
import { context as otContext } from '@lobechat/observability-otel/api';
|
||||
|
||||
import { auth } from '@/auth';
|
||||
import { getServerDB } from '@/database/core/db-adaptor';
|
||||
import { type LobeChatDatabase } from '@/database/type';
|
||||
import { LOBE_CHAT_AUTH_HEADER, LOBE_CHAT_OIDC_AUTH_HEADER, OAUTH_AUTHORIZED } from '@/envs/auth';
|
||||
import { validateOIDCJWT } from '@/libs/oidc-provider/jwt';
|
||||
import { extractTraceContext, injectActiveTraceHeaders } from '@/libs/observability/traceparent';
|
||||
import { createErrorResponse } from '@/utils/errorResponse';
|
||||
|
||||
import { checkAuthMethod } from './utils';
|
||||
@@ -114,5 +116,17 @@ export const checkAuth =
|
||||
|
||||
const userId = jwtPayload.userId || '';
|
||||
|
||||
return handler(clonedReq, { ...options, jwtPayload, serverDB, userId });
|
||||
const extractedContext = extractTraceContext(req.headers);
|
||||
|
||||
const res = await otContext.with(extractedContext, () =>
|
||||
handler(clonedReq, { ...options, jwtPayload, serverDB, userId }),
|
||||
);
|
||||
|
||||
const headers = new Headers(res.headers);
|
||||
const traceparent = injectActiveTraceHeaders(headers);
|
||||
if (!traceparent) {
|
||||
return res;
|
||||
}
|
||||
|
||||
return new Response(res.body, { ...res, headers });
|
||||
};
|
||||
|
||||
@@ -1,5 +1,12 @@
|
||||
export async function register() {
|
||||
if (process.env.NEXT_RUNTIME === 'nodejs' && process.env.ENABLE_TELEMETRY) {
|
||||
await import('./instrumentation.node');
|
||||
if (process.env.NODE_ENV !== 'production' && !process.env.ENABLE_TELEMETRY_IN_DEV) {
|
||||
return;
|
||||
}
|
||||
|
||||
const shouldEnable = process.env.ENABLE_TELEMETRY && process.env.NEXT_RUNTIME === 'nodejs';
|
||||
if (!shouldEnable) {
|
||||
return;
|
||||
}
|
||||
|
||||
await import('./instrumentation.node');
|
||||
}
|
||||
|
||||
69
src/libs/observability/traceparent.test.ts
Normal file
69
src/libs/observability/traceparent.test.ts
Normal file
@@ -0,0 +1,69 @@
|
||||
import type { Mock } from 'vitest';
|
||||
import { afterEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
vi.mock('@lobechat/observability-otel/api', () => {
|
||||
const inject = vi.fn();
|
||||
const setSpan = vi.fn((_ctx, span) => span);
|
||||
|
||||
return {
|
||||
context: {
|
||||
active: vi.fn(() => ({})),
|
||||
},
|
||||
propagation: { inject },
|
||||
trace: { setSpan },
|
||||
};
|
||||
});
|
||||
|
||||
// eslint-disable-next-line import/first
|
||||
import { injectSpanTraceHeaders } from './traceparent';
|
||||
|
||||
const mockSpan = (traceId: string, spanId: string) =>
|
||||
({
|
||||
spanContext: () => ({
|
||||
traceId,
|
||||
spanId,
|
||||
traceFlags: 1,
|
||||
}),
|
||||
}) as any;
|
||||
|
||||
const headersWith = (...args: ConstructorParameters<typeof Headers>) => new Headers(...args);
|
||||
|
||||
describe('injectSpanTraceHeaders', () => {
|
||||
const api = vi.importMock<typeof import('@lobechat/observability-otel/api')>(
|
||||
'@lobechat/observability-otel/api',
|
||||
);
|
||||
|
||||
afterEach(() => {
|
||||
vi.resetAllMocks();
|
||||
});
|
||||
|
||||
it('uses propagator output when available', async () => {
|
||||
const { propagation } = await api;
|
||||
(propagation.inject as unknown as Mock<typeof propagation.inject<Record<string, string>>>).mockImplementation((_ctx, carrier) => {
|
||||
carrier.traceparent = 'from-propagator';
|
||||
carrier.tracestate = 'state';
|
||||
});
|
||||
|
||||
const headers = headersWith();
|
||||
const span = mockSpan('abc'.padEnd(32, '0'), '1234567890abcdef');
|
||||
|
||||
const tp = injectSpanTraceHeaders(headers, span);
|
||||
|
||||
expect(tp).toBe('from-propagator');
|
||||
expect(headers.get('traceparent')).toBe('from-propagator');
|
||||
expect(headers.get('tracestate')).toBe('state');
|
||||
});
|
||||
|
||||
it('falls back to manual traceparent formatting when propagator gives none', async () => {
|
||||
const { propagation } = await api;
|
||||
(propagation.inject as unknown as Mock<typeof propagation.inject<Record<string, string>>>).mockImplementation(() => undefined);
|
||||
|
||||
const headers = headersWith();
|
||||
const span = mockSpan('1'.repeat(32), '2'.repeat(16));
|
||||
|
||||
const tp = injectSpanTraceHeaders(headers, span);
|
||||
|
||||
expect(tp).toBe('00-11111111111111111111111111111111-2222222222222222-01');
|
||||
expect(headers.get('traceparent')).toBe('00-11111111111111111111111111111111-2222222222222222-01');
|
||||
});
|
||||
});
|
||||
110
src/libs/observability/traceparent.ts
Normal file
110
src/libs/observability/traceparent.ts
Normal file
@@ -0,0 +1,110 @@
|
||||
import type {
|
||||
Span,
|
||||
Context as OtContext,
|
||||
TextMapGetter
|
||||
} from '@lobechat/observability-otel/api';
|
||||
import {
|
||||
context as otContext,
|
||||
propagation,
|
||||
trace,
|
||||
} from '@lobechat/observability-otel/api';
|
||||
|
||||
// NOTICE: do not try to optimize this into .repeat(...) or similar,
|
||||
// here served for better search / semantic search purpose for further diagnostic
|
||||
// with either Agents or Human users without needed to understand how
|
||||
// many zeros are needed.
|
||||
const ZERO_TRACE_ID = '00000000000000000000000000000000';
|
||||
// NOTICE: do not try to optimize this into .repeat(...) or similar too.
|
||||
const ZERO_SPAN_ID = '0000000000000000';
|
||||
|
||||
const formatTraceFlags = (flags?: number) => (flags ?? 0).toString(16).padStart(2, '0');
|
||||
|
||||
const isValidContext = (span?: Span) => {
|
||||
if (!span) return false;
|
||||
|
||||
const context = span.spanContext();
|
||||
return (
|
||||
!!context.traceId &&
|
||||
context.traceId !== ZERO_TRACE_ID &&
|
||||
!!context.spanId &&
|
||||
context.spanId !== ZERO_SPAN_ID
|
||||
);
|
||||
};
|
||||
|
||||
export const toTraceparent = (span: Span) => {
|
||||
const { traceId, spanId, traceFlags } = span.spanContext();
|
||||
|
||||
return `00-${traceId}-${spanId}-${formatTraceFlags(traceFlags)}`;
|
||||
};
|
||||
|
||||
/**
|
||||
* Fetch the active span and format it as a W3C traceparent header value.
|
||||
*/
|
||||
export const getActiveTraceparent = () => {
|
||||
const span = trace.getActiveSpan();
|
||||
if (!isValidContext(span)) return undefined;
|
||||
|
||||
return toTraceparent(span as Span);
|
||||
};
|
||||
|
||||
/**
|
||||
* Injects the active context into headers using the configured propagator (W3C by default).
|
||||
* Also returns the traceparent for convenience.
|
||||
*/
|
||||
export const injectActiveTraceHeaders = (headers: Headers) => {
|
||||
const carrier: Record<string, string> = {};
|
||||
propagation.inject(otContext.active(), carrier);
|
||||
|
||||
// Fall back to manual formatting if the global propagator is not configured
|
||||
if (!carrier.traceparent) {
|
||||
const tp = getActiveTraceparent();
|
||||
if (tp) carrier.traceparent = tp;
|
||||
}
|
||||
|
||||
if (carrier.traceparent) headers.set('traceparent', carrier.traceparent);
|
||||
if (carrier.tracestate) headers.set('tracestate', carrier.tracestate);
|
||||
|
||||
return carrier.traceparent;
|
||||
};
|
||||
|
||||
/**
|
||||
* Injects the provided span into headers. Useful when a span is created before being active.
|
||||
*/
|
||||
export const injectSpanTraceHeaders = (headers: Headers, span: Span) => {
|
||||
const ctxWithSpan = trace.setSpan(otContext.active(), span);
|
||||
const carrier: Record<string, string> = {};
|
||||
propagation.inject(ctxWithSpan, carrier);
|
||||
|
||||
// Fall back to manual formatting if the global propagator is not configured
|
||||
if (!carrier.traceparent) {
|
||||
carrier.traceparent = toTraceparent(span);
|
||||
}
|
||||
|
||||
if (carrier.traceparent) headers.set('traceparent', carrier.traceparent);
|
||||
if (carrier.tracestate) headers.set('tracestate', carrier.tracestate);
|
||||
|
||||
return carrier.traceparent;
|
||||
};
|
||||
|
||||
const headerGetter: TextMapGetter<Headers> = {
|
||||
get(carrier, key) {
|
||||
const value = carrier.get(key);
|
||||
return value ? [value] : [];
|
||||
},
|
||||
keys(carrier) {
|
||||
return Array.from(carrier.keys());
|
||||
},
|
||||
};
|
||||
|
||||
/**
|
||||
* Extract trace context from incoming headers (traceparent/tracestate) using the OTEL propagator.
|
||||
* Useful for linking a downstream request to the upstream response’s traceparent header — read
|
||||
* the header on the client, send it back on the next request, and the backend will stitch spans
|
||||
* into one trace.
|
||||
*
|
||||
* @link {@see https://github.com/open-telemetry/opentelemetry.io/blob/a1dda51143cfbdf26cd320bea7ae43569c585cb3/content/en/docs/languages/js/propagation.md}
|
||||
*/
|
||||
export const extractTraceContext = (headers: Headers): OtContext => {
|
||||
const ctx = propagation.extract(otContext.active(), headers, headerGetter);
|
||||
return ctx;
|
||||
};
|
||||
@@ -87,4 +87,17 @@ describe('createContextInner', () => {
|
||||
expect(context1.resHeaders).toBeInstanceOf(Headers);
|
||||
expect(context2.resHeaders).toBeInstanceOf(Headers);
|
||||
});
|
||||
|
||||
it('should always provide resHeaders', async () => {
|
||||
const ctx = await createContextInner();
|
||||
|
||||
expect(ctx.resHeaders).toBeInstanceOf(Headers);
|
||||
});
|
||||
|
||||
it('should keep provided traceContext', async () => {
|
||||
const traceContext = { test: 'ctx' } as any;
|
||||
const ctx = await createContextInner({ traceContext });
|
||||
|
||||
expect(ctx.traceContext).toBe(traceContext);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -6,6 +6,8 @@ import { type NextRequest } from 'next/server';
|
||||
import { auth } from '@/auth';
|
||||
import { LOBE_CHAT_AUTH_HEADER, LOBE_CHAT_OIDC_AUTH_HEADER, authEnv } from '@/envs/auth';
|
||||
import { validateOIDCJWT } from '@/libs/oidc-provider/jwt';
|
||||
import { extractTraceContext } from '@/libs/observability/traceparent';
|
||||
import type { Context as OtContext } from '@lobechat/observability-otel/api';
|
||||
|
||||
// Create context logger namespace
|
||||
const log = debug('lobe-trpc:lambda:context');
|
||||
@@ -40,6 +42,7 @@ export interface AuthContext {
|
||||
// Add OIDC authentication information
|
||||
oidcAuth?: OIDCAuth | null;
|
||||
resHeaders?: Headers;
|
||||
traceContext?: OtContext;
|
||||
userAgent?: string;
|
||||
userId?: string | null;
|
||||
}
|
||||
@@ -53,6 +56,7 @@ export const createContextInner = async (params?: {
|
||||
clientIp?: string | null;
|
||||
marketAccessToken?: string;
|
||||
oidcAuth?: OIDCAuth | null;
|
||||
traceContext?: OtContext;
|
||||
userAgent?: string;
|
||||
userId?: string | null;
|
||||
}): Promise<AuthContext> => {
|
||||
@@ -65,6 +69,7 @@ export const createContextInner = async (params?: {
|
||||
marketAccessToken: params?.marketAccessToken,
|
||||
oidcAuth: params?.oidcAuth,
|
||||
resHeaders: responseHeaders,
|
||||
traceContext: params?.traceContext,
|
||||
userAgent: params?.userAgent,
|
||||
userId: params?.userId,
|
||||
};
|
||||
@@ -83,10 +88,10 @@ export const createLambdaContext = async (request: NextRequest): Promise<LambdaC
|
||||
const isMockUser = process.env.ENABLE_MOCK_DEV_USER === '1';
|
||||
|
||||
if (process.env.NODE_ENV === 'development' && (isDebugApi || isMockUser)) {
|
||||
return {
|
||||
return createContextInner({
|
||||
authorizationHeader: request.headers.get(LOBE_CHAT_AUTH_HEADER),
|
||||
userId: process.env.MOCK_DEV_USER_ID,
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
log('createLambdaContext called for request');
|
||||
@@ -100,6 +105,8 @@ export const createLambdaContext = async (request: NextRequest): Promise<LambdaC
|
||||
const cookieHeader = request.headers.get('cookie');
|
||||
const cookies = cookieHeader ? parse(cookieHeader) : {};
|
||||
const marketAccessToken = cookies['mp_token'];
|
||||
// Extract upstream trace context for parent linking
|
||||
const traceContext = extractTraceContext(request.headers);
|
||||
|
||||
log('marketAccessToken from cookie:', marketAccessToken ? '[HIDDEN]' : 'undefined');
|
||||
const commonContext = {
|
||||
@@ -137,9 +144,10 @@ export const createLambdaContext = async (request: NextRequest): Promise<LambdaC
|
||||
return createContextInner({
|
||||
oidcAuth,
|
||||
...commonContext,
|
||||
userId,
|
||||
});
|
||||
}
|
||||
traceContext,
|
||||
userId,
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
// If OIDC authentication fails, log error and continue with other authentication methods
|
||||
if (oidcAuthToken) {
|
||||
@@ -165,6 +173,7 @@ export const createLambdaContext = async (request: NextRequest): Promise<LambdaC
|
||||
|
||||
return createContextInner({
|
||||
...commonContext,
|
||||
traceContext,
|
||||
userId,
|
||||
});
|
||||
} catch (e) {
|
||||
@@ -177,5 +186,5 @@ export const createLambdaContext = async (request: NextRequest): Promise<LambdaC
|
||||
'All authentication methods attempted, returning final context, userId: %s',
|
||||
userId || 'not authenticated',
|
||||
);
|
||||
return createContextInner({ ...commonContext, userId });
|
||||
return createContextInner({ ...commonContext, traceContext, userId });
|
||||
};
|
||||
|
||||
93
src/libs/trpc/middleware/openTelemetry.test.ts
Normal file
93
src/libs/trpc/middleware/openTelemetry.test.ts
Normal file
@@ -0,0 +1,93 @@
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
const spanContext = {
|
||||
traceId: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa',
|
||||
spanId: 'bbbbbbbbbbbbbbbb',
|
||||
traceFlags: 1,
|
||||
};
|
||||
|
||||
const mocks = vi.hoisted(() => ({
|
||||
capturedMiddleware: undefined as any,
|
||||
}));
|
||||
|
||||
vi.mock('@lobechat/observability-otel/api', () => {
|
||||
const tracer = {
|
||||
startSpan: vi.fn(() => ({
|
||||
spanContext: () => spanContext,
|
||||
setStatus: vi.fn(),
|
||||
setAttribute: vi.fn(),
|
||||
end: vi.fn(),
|
||||
})),
|
||||
};
|
||||
|
||||
return {
|
||||
SpanKind: { SERVER: 'server' },
|
||||
SpanStatusCode: { OK: 1, ERROR: 2 },
|
||||
context: {
|
||||
active: vi.fn(() => ({})),
|
||||
with: vi.fn((_ctx, fn) => fn()),
|
||||
},
|
||||
diag: { debug: vi.fn(), error: vi.fn() },
|
||||
trace: {
|
||||
getTracer: vi.fn(() => tracer),
|
||||
setSpan: vi.fn((_ctx, span) => span),
|
||||
},
|
||||
propagation: { inject: vi.fn() },
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock('../lambda/init', () => {
|
||||
const middleware = (fn: any) => {
|
||||
mocks.capturedMiddleware = fn;
|
||||
return fn;
|
||||
};
|
||||
|
||||
return {
|
||||
trpc: {
|
||||
middleware,
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
import { injectSpanTraceHeaders } from '@/libs/observability/traceparent';
|
||||
|
||||
vi.mock('@/libs/observability/traceparent', async () => {
|
||||
const actual = await vi.importActual<typeof import('@/libs/observability/traceparent')>(
|
||||
'@/libs/observability/traceparent',
|
||||
);
|
||||
return {
|
||||
...actual,
|
||||
injectSpanTraceHeaders: vi.fn(actual.injectSpanTraceHeaders),
|
||||
};
|
||||
});
|
||||
|
||||
// eslint-disable-next-line import/first
|
||||
import { openTelemetry } from './openTelemetry';
|
||||
|
||||
describe('openTelemetry middleware', () => {
|
||||
beforeEach(() => {
|
||||
vi.resetAllMocks();
|
||||
process.env.ENABLE_TELEMETRY = 'true';
|
||||
});
|
||||
|
||||
it('injects trace headers into response headers', async () => {
|
||||
const ctx = { resHeaders: new Headers() };
|
||||
const middleware = mocks.capturedMiddleware || openTelemetry;
|
||||
|
||||
expect(typeof middleware).toBe('function');
|
||||
|
||||
const result = await middleware({
|
||||
ctx: ctx as any,
|
||||
getRawInput: () => undefined,
|
||||
next: vi.fn().mockResolvedValue({ ok: true, data: null }),
|
||||
path: 'foo.bar',
|
||||
type: 'query',
|
||||
});
|
||||
|
||||
expect(result).toEqual({ ok: true, data: null });
|
||||
expect(ctx.resHeaders?.get('traceparent')).toBe(
|
||||
'00-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-bbbbbbbbbbbbbbbb-01',
|
||||
);
|
||||
expect(injectSpanTraceHeaders).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
@@ -1,5 +1,5 @@
|
||||
import type { Attributes, Span } from '@lobechat/observability-otel/api';
|
||||
import { SpanKind, SpanStatusCode, diag, trace } from '@lobechat/observability-otel/api';
|
||||
import { SpanKind, SpanStatusCode, context, diag, trace } from '@lobechat/observability-otel/api';
|
||||
import {
|
||||
ATTR_ERROR_TYPE,
|
||||
ATTR_EXCEPTION_MESSAGE,
|
||||
@@ -20,6 +20,8 @@ import { TRPCError } from '@trpc/server';
|
||||
import { env } from 'node:process';
|
||||
|
||||
import { name } from '../../../../package.json';
|
||||
import { injectSpanTraceHeaders } from '@/libs/observability/traceparent';
|
||||
|
||||
import { trpc } from '../lambda/init';
|
||||
|
||||
const tracer = trace.getTracer('trpc-server');
|
||||
@@ -62,7 +64,7 @@ const finalizeSpanWithError = (span: Span, error: unknown) => {
|
||||
}
|
||||
};
|
||||
|
||||
export const openTelemetry = trpc.middleware(async ({ path, type, next, getRawInput }) => {
|
||||
export const openTelemetry = trpc.middleware(async ({ ctx, path, type, next, getRawInput }) => {
|
||||
if (!env.ENABLE_TELEMETRY) {
|
||||
diag.debug(name, 'telemetry disabled', env.ENABLE_TELEMETRY);
|
||||
|
||||
@@ -76,15 +78,24 @@ export const openTelemetry = trpc.middleware(async ({ path, type, next, getRawIn
|
||||
const input = getRawInput();
|
||||
const requestSize = getPayloadSize(input);
|
||||
|
||||
const span = tracer.startSpan(spanName, {
|
||||
attributes: baseAttributes,
|
||||
kind: SpanKind.SERVER,
|
||||
});
|
||||
const span = tracer.startSpan(
|
||||
spanName,
|
||||
{
|
||||
attributes: baseAttributes,
|
||||
kind: SpanKind.SERVER,
|
||||
},
|
||||
ctx?.traceContext,
|
||||
);
|
||||
|
||||
// attach trace headers for downstream consumers (traceparent/tracestate)
|
||||
if (ctx?.resHeaders) {
|
||||
injectSpanTraceHeaders(ctx.resHeaders, span);
|
||||
}
|
||||
|
||||
const startTimestamp = Date.now();
|
||||
|
||||
try {
|
||||
const result = await next();
|
||||
const result = await context.with(trace.setSpan(context.active(), span), async () => next());
|
||||
diag.debug(name, 'tRPC instrumentation', 'requestHandled');
|
||||
|
||||
const responseSize = getPayloadSize(result.ok ? result.data : result.error);
|
||||
|
||||
Reference in New Issue
Block a user