Trace
一、模块功能概述¶
1.1 为什么需要这个扩展?¶
原生的 OpenTelemetry 采集数据(Spans)的方式通常是批量地记录数据,在一次调用结束后统一导出。这种方式虽然能够满足大部分性能需求,但在某些场景下(尤其是需要实时监控或多系统联动时),我们希望能够在调用过程中实时地触发某些事件并将数据传回至业务系统。
因此,为了满足实时事件触发及实时数据传输的需求,我们对 OpenTelemetry 中的 TracerProvider、Tracer 和 Span 做了相应的扩展:
- TFSpan:继承自 OpenTelemetry 的
Span,主要覆盖了add_event方法,能够在事件添加时,触发自定义的Hook,将当前上下文中的数据实时发送出去。 - TFTracer:继承自 OpenTelemetry 的
Tracer,以便在创建新的Span时返回我们的自定义TFSpan实例,从而使得所有在其上创建的 Span 都具备实时事件触发的能力。 - TFTracerProvider:继承自 OpenTelemetry 的
TracerProvider,保证在全局或需要的上下文中,获取Tracer时能够返回定制版的 TFTracer。
1.2 扩展的核心点¶
- 实时触发 Hook:当
TFSpan.add_event()被调用时,会调用HookManager中的钩子函数,将 Span 的属性、上下文信息实时传递给业务逻辑。 - 与原生 OpenTelemetry 高度兼容:除了上述的定制点,其余的创建、管理、导出流程和原生 OpenTelemetry 保持一致。
- 全局设置:如果在全局或应用初始化(如 FastAPI 的启动脚本中)没有通过
set_tf_tracer_provider显式设置我们的 TFTracerProvider,则默认的 OpenTelemetryTracerProvider不能与本扩展联通,无法获得实时事件触发能力。
二、如何使用¶
2.1 安装与引用¶
将该模块(如 customer.py)放到你的项目中,或者打包成单独的 Python 包后,使用以下方式进行导入(示例):
from tfrobot.telemetry.tracer import (
TFTracerProvider,
set_tf_tracer_provider,
get_tf_tracer,
tracer # 如果你需要直接使用全局 tracer
)
2.2 设置自定义的 TracerProvider¶
在应用启动时(例如使用 FastAPI 时),需要手动设置此自定义的 TFTracerProvider 为全局,以确保你的业务代码在使用 OpenTelemetry 创建 Span 时,拿到的是我们改造后的 Span(即 TFSpan)。
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from your_telemetry_package.customer import TFTracerProvider, set_tf_tracer_provider
# 假设有一个配置对象 otlp_config
def setup_tracing():
# 1. 创建自定义 TFTracerProvider
resource = Resource(attributes={"service.name": "my_service"})
provider = TFTracerProvider(resource=resource)
# 2. 设置为全局默认
trace.set_tracer_provider(provider)
# 为TFRobot设置TracerProvider
set_tf_tracer_provider(provider)
# 3. 添加数据导出方式(例如控制台、OTLP、Jaeger 等)
console_exporter = ConsoleSpanExporter()
span_processor = BatchSpanProcessor(console_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)
# 现在全局都可以使用这个 TracerProvider 了
注意:
trace.set_tracer_provider(provider)只会让 OpenTelemetry 的全局函数(如trace.get_tracer_provider())拿到这个 provider,而set_tf_tracer_provider(provider)则是我们扩展的,必须在业务启动时调用,以确保与自定义的 TFTracerProvider 全局保持一致。否则的话,你在业务中如果直接使用trace.get_tracer(),拿到的 Tracer 将不会是TFTracer。
2.3 在业务代码中使用实时事件触发¶
设置好上述的全局之后,在你的业务逻辑中创建和操作 Span 的方式几乎与原生 OpenTelemetry 一致。唯一的区别是,如果这个 Span 是通过 TFTracer 创建的,那么在调用 span.add_event() 时,会立刻触发 Hook。
from opentelemetry import trace
def business_logic():
# 拿到我们自定义的 TFTracer(它会返回一个 TFSpan)
current_tracer = trace.get_tracer(__name__)
with current_tracer.start_as_current_span("my_span") as span:
# 业务逻辑...
do_something()
# 调用 add_event,实时触发 Hook
span.add_event("some_event", attributes={"key": "value"})
# 后续业务逻辑...
只要当前的
TracerProvider是我们设置的TFTracerProvider,那么通过trace.get_tracer(...)返回的便是TFTracer,从而生成的Span即是TFSpan。
三、FastAPI 集成示例¶
以下是一段结合 FastAPI、OTLP 配置等综合使用的示例(与您提供的示例类似):
from fastapi import FastAPI
from opentelemetry import trace
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from your_telemetry_package.customer import TFTracerProvider, set_tf_tracer_provider
from your_telemetry_package.customer import get_tf_tracer_provider
# 假设有一个 OTLPConfig 类,负责解析 OTLP 配置
from my_config_file import OTLPConfig
app = FastAPI()
def setup_tracing(tf_app: FastAPI) -> None:
otlp_config = OTLPConfig().otlp
trace_resource = Resource(attributes={"service.name": otlp_config.service_name})
# 创建自定义的 TracerProvider
provider = TFTracerProvider(resource=trace_resource)
if otlp_config:
# 如果配置了 OTLP(例如 Jaeger),则使用 OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
trace.set_tracer_provider(provider)
set_tf_tracer_provider(provider)
otlp_exporter = OTLPSpanExporter(endpoint=otlp_config.endpoint, insecure=True)
span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)
# 如果需要,可在此处添加自定义中间件,比如 AddTraceIDMiddleware
# tf_app.add_middleware(AddTraceIDMiddleware)
else:
# 如果未配置 Jaeger,则使用 ConsoleSpanExporter
trace.set_tracer_provider(provider)
set_tf_tracer_provider(provider)
console_exporter = ConsoleSpanExporter()
span_processor = BatchSpanProcessor(console_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)
# 通过 FastAPI 官方的自动打桩
FastAPIInstrumentor.instrument_app(tf_app)
# 在启动脚本或 main 函数中调用此函数
setup_tracing(app)
@app.get("/hello")
def hello_world():
# 获取自定义 tracer
current_tracer = trace.get_tracer("sample_app")
with current_tracer.start_as_current_span("hello_span") as span:
span.add_event("processing_request", attributes={"endpoint": "/hello"})
return {"message": "Hello, world!"}
通过上述示例,我们可以看到:
- 全局设置:使用
set_tf_tracer_provider将我们的TFTracerProvider设置为全局。 - 事件触发:在
span.add_event()时,即可实时触发逻辑(调用 HookManager 的钩子函数等)。 - 原生使用方式:在业务逻辑中依然使用
trace.get_tracer(...)和span.add_event(...)等原生方式,不会有额外的改动。
四、常见问题(FAQ)¶
-
Q:如果不调用
set_tf_tracer_provider会发生什么?
A:那么 OpenTelemetry 的全局 TracerProvider 仍会是默认实现(TracerProvider),而不是我们扩展的TFTracerProvider。这意味着span.add_event()不会再触发 HookManager 的钩子,无法实现实时数据传输。 -
Q:我的钩子函数什么时候会被触发?
A:当你调用span.add_event("event_name", attributes={...})时,会立即触发HookManager.execute_hooks(...)并执行你配置好的钩子函数。 -
Q:除了实时传输,还有什么其他作用?
A:实时触发事件也可用于多系统协同,如在监控系统中实时提示异常,或在业务系统中实时触发后续流程(如调度其他微服务)。而原生 OTel 通常只能在批量导出后进行后置分析。 -
Q:能否继续使用现有的 Jaeger、Zipkin、Prometheus 等监控后端?
A:当然可以。本质上我们只对TracerProvider、Tracer、Span做了扩展,其他导出流程(Exporters、Processors)依然与 OpenTelemetry 保持一致,如 Jaeger Exporter、Zipkin Exporter、OTLP Exporter 等都可直接集成。
五、小结¶
通过 TFTracerProvider、TFTracer 和 TFSpan 对 OpenTelemetry 进行扩展,可以在保留大部分原生使用方式的同时,增强其在实时事件传输场景下的能力。在集成过程中,需要注意显式调用 set_tf_tracer_provider 来替换默认的全局 TracerProvider,否则无法获得实时事件触发能力。
希望本示例和文档能帮助您快速上手并集成到现有的 FastAPI 或其他 Python Web 框架中。如果您有进一步的问题或建议,欢迎随时提出。祝使用愉快!