Skip to content

OpenTelemetry Metrics 类型与全局 Provider 管理


一、OpenTelemetry Metrics 类型详解

OpenTelemetry Metrics API 提供了丰富的数据类型,每种类型都有特定的应用场景和 API 模式:

类型 功能 应用场景 API 示例 导出格式
Counter 单增加计数器 请求总数、错误次数 create_counter()
.add(value, attributes)
metric_name_total
UpDownCounter 可增减计数器 队列长度、连接数 create_up_down_counter()
.add(value)
metric_name
Histogram 值分布统计 响应时间分布、请求体大小 create_histogram()
.record(value)
metric_name_bucket
metric_name_sum
ObservableCounter 异步单调递增值 CPU 使用时间、网络字节总量 create_observable_counter()
注册回调函数
metric_name_total
ObservableUpDownCounter 异步可变化值 内存使用量、线程池活跃数 create_observable_up_down_counter() metric_name
ObservableGauge 异步瞬时值 CPU 利用率、温度、队列长度 create_observable_gauge() metric_name

具体实现代码示例

from opentelemetry import metrics
from opentelemetry.metrics import CallbackOptions, Observation

# 获取全局 meter (最佳实践见第二部分)
meter = metrics.get_meter(__name__)

# 1. 计数器示例:记录任务完成情况
task_counter = meter.create_counter(
    name="tasks.completed",
    description="Total completed tasks",
    unit="1"
)
# 使用
task_counter.add(1, {"status": "success", "task_type": "import"})

# 2. 直方图示例:记录请求延迟
request_latency = meter.create_histogram(
    name="http.request.duration",
    description="HTTP request duration",
    unit="ms"
)
# 使用
request_latency.record(150, {
    "method": "GET",
    "endpoint": "/api/data"
})

# 3. 可观测仪表示例:记录内存使用
def memory_callback(options: CallbackOptions):
    import psutil
    mem = psutil.virtual_memory()
    return [
        Observation(mem.used, {"type": "used"}),
        Observation(mem.available, {"type": "available"})
    ]

meter.create_observable_gauge(
    name="system.memory.usage",
    callbacks=[memory_callback],
    unit="bytes"
)

二、全局 Provider 最佳实践

应当允许外部调用方控制 Provider 的设置,而不是在库代码中硬编码初始化。OpenTelemetry 完美支持这种模式。

解决方案架构

graph LR
    A[调用方/运维] -->|初始化设置| B[全局 MeterProvider]
    C[库代码/业务模块] -->|获取| B
    D[第三方库] -->|获取| B
    B -->|导出| E[Collector]

实现代码

在库/业务代码中(如 Celery 任务):

# tfrobot/telemetry/metrics.py
from opentelemetry import metrics

def get_metrics_meter(name: str) -> metrics.Meter:
    """
    安全获取全局 Meter 实例
    无需担心 Provider 是否已初始化
    """
    provider = metrics.get_meter_provider()
    return provider.get_meter(
        name,
        version="1.0.0"  # 可选添加版本信息
    )

在应用入口(由运维控制):

# app/startup.py
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.resources import Resource

def configure_otel_metrics():
    """由运维在应用启动时调用"""
    resource = Resource.create({
        "service.name": "tfrobot-prod",
        "environment": "production"
    })
    exporter = OTLPMetricExporter(
        endpoint="otel-collector:4317"
    )
    provider = MeterProvider(
        resource=resource,
        metric_readers=[PeriodicExportingMetricReader(exporter)]
    )
    metrics.set_meter_provider(provider)

# 应用启动时调用
if __name__ == "__main__":
    configure_otel_metrics()
    start_celery_worker()  # 启动业务代码

在业务任务中使用:

# tfrobot/tasks.py
from tfrobot.telemetry.metrics import get_metrics_meter

class MyTask(CeleryTask):
    def __init__(self):
        self.meter = get_metrics_meter("celery.tasks")
        self.task_counter = self.meter.create_counter("tasks.executed")
    def __call__(self):
        self.task_counter.add(1, {"task_name": self.name})
        # ...任务逻辑

关键优势

  1. 关注点分离:
  2. 库/业务代码只负责 使用 指标
  3. 运维负责 配置初始化
  4. 弹性设计:
  5. 未初始化时的安全后备
from opentelemetry.metrics import MeterProvider, NoOpMeterProvider
provider = metrics.get_meter_provider()
assert isinstance(provider, (MeterProvider, NoOpMeterProvider))
  1. 动态更新:
  2. 支持运行时动态更新 Provider (高级场景)
def update_metrics_provider(new_provider):
    metrics.set_meter_provider(new_provider)  # 原子切换

三、运维友好型初始化推荐

对于 Kubernetes 环境,最佳实践是使用环境变量配置:

# app/startup.py
import os
from opentelemetry.metrics import set_meter_provider

def auto_configure_otel():
    """根据环境变量自动配置"""
    if os.getenv("OTEL_METRICS_ENABLED", "false").lower() == "true":
        endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "otel-collector:4317")
        provider = create_provider(endpoint)
        set_meter_provider(provider)

def create_provider(endpoint):
    """创建 Provider 实例"""
    # ...基于 endpoint 创建 Provider 的代码

K8s Deployment 配置

# k8s Deployment 配置
env:
  - name: OTEL_METRICS_ENABLED
    value: "true"
  - name: OTEL_EXPORTER_OTLP_ENDPOINT
    value: "otel-collector.tfrobotserver:4317"
  - name: OTEL_SERVICE_NAME
    value: "tfrobot-worker"

四、第三方库集成验证

确认第三方库是否遵循标准实践:

# 第三方库代码示例(正确方式)
from opentelemetry.metrics import get_meter
# 正确:使用全局获取方式
third_party_meter = get_meter_provider().get_meter("third.party")

通过这种设计: 1. 您的 Celery 任务指标 2. 框架内部指标 3. 第三方库指标 全部通过同一个全局 MeterProvider 收集

graph LR
    A[任务自定义指标] -->|报告| B[全局 MeterProvider]
    C[Celery 框架指标] -->|报告| B
    D[DB 客户端指标] -->|报告| B
    B -->|导出| E[Collector]
    E --> F[Prometheus]
    E --> G[Jaeger]

五、故障排除指南

当指标未出现时检查点:

  1. 验证 Provider 是否设置:
from opentelemetry.metrics import get_meter_provider
print(type(get_meter_provider()))  # 应显示 SDK 实现
  1. 检查环境变量:
kubectl exec -it <pod> -- env | grep OTEL_
  1. 查看 Collector 日志:
kubectl logs -n tfrobotserver deploy/otel-collector
  1. 直接访问 Prometheus 端点:
kubectl port-forward -n tfrobotserver svc/otel-collector 8889:8889
curl localhost:8889/metrics | grep 'celery_'

本文档遵循 OpenTelemetry 官方最佳实践,欢迎反馈建议。