-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathotel_fastapi.py
More file actions
271 lines (229 loc) · 13.6 KB
/
Copy pathotel_fastapi.py
File metadata and controls
271 lines (229 loc) · 13.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
# https://opentelemetry.io/docs/zero-code/python/example/#:~:text=Note:%20To%20use%20automatic%20instrumentation,details%2C%20see%20the%20API%20reference.
### opentelemetry-instrument 环境变量参数启动
## OTEL 数据在 python 程序中以 console 输出的配置
# OTEL_METRIC_EXPORT_INTERVAL=10000 OTEL_PYTHON_LOG_CORRELATION=true OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED=true OTEL_TRACES_EXPORTER=console OTEL_METRICS_EXPORTER=console OTEL_LOGS_EXPORTER=console OTEL_SERVICE_NAME=brde opentelemetry-instrument uvicorn otel_fastapi:app --port 8000 --host 0.0.0.0
## 4317 端口配置
# OTEL_METRIC_EXPORT_INTERVAL=60000 OTEL_PYTHON_LOG_CORRELATION=true OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED=true OTEL_TRACES_EXPORTER=otlp OTEL_METRICS_EXPORTER=otlp OTEL_LOGS_EXPORTER=otlp OTEL_EXPORTER_OTLP_PROTOCOL=grpc OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4317" OTEL_SERVICE_NAME=brde opentelemetry-instrument uvicorn otel_fastapi:app --port 8000 --host 0.0.0.0
## 4318 端口配置
# OTEL_METRIC_EXPORT_INTERVAL=60000 OTEL_PYTHON_LOG_CORRELATION=true OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED=true OTEL_TRACES_EXPORTER=otlp OTEL_METRICS_EXPORTER=otlp OTEL_LOGS_EXPORTER=otlp OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4318" OTEL_SERVICE_NAME=brde opentelemetry-instrument uvicorn otel_fastapi:app --port 8000 --host 0.0.0.0
# opentelemetry-instrument 环境变量解释
# 1.opentelemetry-instrument-logging 接收两个环境变量, 在opentelemetry-instrument自动插桩程序中可以设置布尔值:
# 1.1 OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED=ture 此环境变量给 root logger 设置 LoggingHandler.
# 1.2 OTEL_PYTHON_LOG_CORRELATION=true 用来在日志 Record 中加入 trace_id 和 span_id.
# 等价于 set_logging_format=True, 会把日志的 handler 的格式设置为 DEFAULT_LOGGING_FORMAT
# 2. EXPORTER
# OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4317"
# OTEL_TRACES_EXPORTER=otlp
# OTEL_LOGS_EXPORTER=otlp
# OTEL_METRICS_EXPORTER=otlp
# OTEL_METRIC_EXPORT_INTERVAL=60000
# OTEL_EXPORTER_OTLP_PROTOCOL=grpc
# 3. OTEL_SERVICE_NAME
### opentelemetry-instrument 命令行参数启动
# opentelemetry-instrument \
# --traces_exporter console,otlp \
# --metrics_exporter console \
# --service_name brde \
# --exporter_otlp_endpoint http://localhost:4317 \
# uvicorn otel_fastapi:app --port 8000 --host 0.0.0.0
### 手动导入 opentelemetry api sdk 的启动命令:
# uvicorn otel_fastapi:app --port 8000 --host 0.0.0.0
### 检测 grpc 的服务是否开启:
# nc -vz localhost 4317
import os
# for key, value in os.environ.items():
# root_logger.info(f'{key}={value}')
import logging
import aiohttp
from fastapi import FastAPI
from opentelemetry import trace, metrics
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader, ConsoleMetricExporter
from opentelemetry._logs import set_logger_provider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, ConsoleLogRecordExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.aiohttp_client import AioHttpClientInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor
from opentelemetry.instrumentation.logging import DEFAULT_LOGGING_FORMAT
# Basic logging configuration
# logging.basicConfig(level=logging.NOTSET)
# 这里的格式化只是为了演示trace关联日志效果
# opentelemetry.sdk._logs.LoggingHandler 实际上这个handler 不需要设置日志格式,默认就转化为json格式(protobuf)
# 只要 root logger 配置了 opentelemetry.sdk._logs.LoggingHandler, 那么trace关联log就会自动包含 trace_id 和 span_id.
logging.basicConfig(format=DEFAULT_LOGGING_FORMAT, level="DEBUG")
logger = logging.getLogger(__name__)
OTLP_ENDPOINT = os.environ.get("OTLP_ENDPOINT", "http://localhost:4317")
DEBUG_OTLP_CONSOLE = os.environ.get("DEBUG_OTLP_CONSOLE", False)
def inpsect_logging():
import logging
root_logger = logging.getLogger()
msg = []
for i, handler in enumerate(root_logger.handlers):
formatter = handler.formatter
msg.append(f"\nHandler {i}: {handler} for '{root_logger.name}':")
if formatter:
# Accessing the format string (using the internal _fmt attribute, as there's no public getter)
# Note: Accessing internal attributes like _fmt is generally discouraged but common for format strings.
# The public attribute is `formatter`, which is the object itself.
msg.append(f" Formatter object: {formatter}\n Format string: {formatter._fmt}")
else:
if isinstance(handler, LoggingHandler):
msg.append(f" A handler class which writes logging records,in OTLP format, to a network destination or file.")
else:
msg.append(f" No formatter.")
root_logger_msg = "\n".join(msg)
root_logger.warning(f"root logger handler and formatters:{root_logger_msg}")
# non_root_logger_msg = []
# for k,v in logging.Logger.manager.loggerDict.items() :
# non_root_logger_msg.append(f'+ [{str.ljust(k, 20)}] ({str(v.__class__)[8:-2]})')
# if not isinstance(v, logging.PlaceHolder):
# for h in v.handlers:
# non_root_logger_msg.append(f' +++ {str(h.__class__)[8:-2]}')
# _msg = "\n".join(non_root_logger_msg)
# root_logger.warning(f"other logger and its handlers:\n{_msg}")
def setup_telemetry_log(resource, tracer_provider,
log_level=logging.DEBUG,
debug_otlp_console=DEBUG_OTLP_CONSOLE):
"""
Docstring for setup_telemetry_log
:param resource: Description
:param level: Description
如果使用 opentelemetry api 和 sdk, 建议给日志handler格式中包含 trace_id 和 span_id.
注意: 只有在 span 上下文中的日志才会包含 trace_id 和 span_id.
"""
from opentelemetry.instrumentation.logging import LoggingInstrumentor
from opentelemetry.instrumentation.logging import DEFAULT_LOGGING_FORMAT
## Define the processing of logs
logger_provider = LoggerProvider(resource=resource)
set_logger_provider(logger_provider) # Overriding of current LoggerProvider is not allowed
if debug_otlp_console:
logger_provider.add_log_record_processor(BatchLogRecordProcessor(ConsoleLogRecordExporter()))
exporter = OTLPLogExporter(
endpoint=OTLP_ENDPOINT
# headers={"Authorization": "Bearer $SOURCE_TOKEN"},
)
logger_provider.add_log_record_processor(BatchLogRecordProcessor(exporter))
### 把 trace 信息记录到日志中,这个是为了兼容原来的日志系统,修改原来 logger handler 的 Formatter.
### 注意,这个会覆盖原来的日志的格式. 这个不影响上面的 telemetry push 信息的设置.
# import logging
# from opentelemetry.instrumentation.logging import DEFAULT_LOGGING_FORMAT
# logging.basicConfig(format=DEFAULT_LOGGING_FORMAT, level="DEBUG")
LoggingInstrumentor().instrument(tracer_provider=tracer_provider,
set_logging_format=True,
log_level=log_level)
handler = LoggingHandler(level=log_level, logger_provider=logger_provider)
logging.getLogger().addHandler(handler)
return logger_provider
def setup_telemetry_trace(resource, debug_otlp_console=DEBUG_OTLP_CONSOLE):
tracer_provider = TracerProvider(resource=resource)
# Use OTLP exporter to send traces (adjust endpoint if needed, default is gRPC)
if debug_otlp_console:
## 设置环境变量来判断是否加载控制台console输出trace信息
tracer_provider.add_span_processor(
BatchSpanProcessor(ConsoleSpanExporter())
)
otlp_exporter = OTLPSpanExporter(
endpoint=OTLP_ENDPOINT, insecure=True
# headers={"Authorization": "Bearer $SOURCE_TOKEN"},
)
tracer_provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
trace.set_tracer_provider(tracer_provider)
return tracer_provider
def setup_telemetry_metrics(resource, debug_otlp_console=DEBUG_OTLP_CONSOLE):
## Metrics configuration (using console exporter for demonstration)
# Define the processing of metrics
metrics_otlp_exporter = OTLPMetricExporter(
endpoint=OTLP_ENDPOINT, insecure=True
# headers={"Authorization": "Bearer $SOURCE_TOKEN"},
)
periodic_metrics_grpc_exporter = PeriodicExportingMetricReader(
metrics_otlp_exporter,
export_interval_millis=10000
)
metric_readers = [periodic_metrics_grpc_exporter]
if debug_otlp_console:
periodic_metrics_console_exporter = PeriodicExportingMetricReader(
ConsoleMetricExporter(),
export_interval_millis=10000 # 10秒钟 push 一次指标
)
metric_readers.append(periodic_metrics_console_exporter)
meter_provider = MeterProvider(resource=resource,
metric_readers=metric_readers
)
metrics.set_meter_provider(meter_provider)
return meter_provider
def setup_fastapi_opentelemetry(app: FastAPI, service_name="brde"):
"""
如果在 opentelemetry-instrument 命令启动的程序中去设置相关 provider, 会有如下报错:
trace.set_tracer_provider(trace.get_tracer_provider()) -> Overriding of current TracerProvider is not allowed
FastAPIInstrumentor.instrument_app(app) -> Attempting to instrument FastAPI app while already instrumented
LoggingInstrumentor().instrument(set_logging_format=True, log_level=logging.NOTSET) -> Attempting to instrument while already instrumented
:param app: Description
:type app: FastAPI
"""
# 判断是否由 opentelemetry-instrument uvicorn 启动程序
if getattr(app, "_is_instrumented_by_opentelemetry", False):
# 如果是由 opentelemetry-instrument 来自动插桩, 那么不需要去显式设置 provider.
logger.info("OpenTelemetry already configured. Skipping configuration.")
else:
# 手动使用 OpenTelemetry api 和 sdk 来设置插桩程序搜集遥测数据
# uvicorn otel_fastapi:app --port 8000 --host 0.0.0.0
resource = Resource(attributes={"service.name": service_name})
tracer_provider = setup_telemetry_trace(resource)
# 以前运行日志是在磁盘上写文件(虚拟机)或者容器的标准输出(k8s, docker, podman), 然后使用 vector 来搜集运行日志.
# 以后使用 opentelemetry, 使用 grpc 或者 http/protobuf 来往下游可观测性存储来写日志, 比如 openobserve 等.
# 这样能够更好的关联 trace, log, metrics 信息. 可以更好的利用 otelcol-contrib 等工具来批量处理, 采样等处理流程
# 提升整体可观测性的能力.
# 至于数据日志, 审计日志, 最好还是使用 数据文件 -> vector -> s3,这套搜集流程.
logger_provider = setup_telemetry_log(resource, tracer_provider)
meter_provider = setup_telemetry_metrics(resource)
# Instrument the app
FastAPIInstrumentor.instrument_app(app)
AioHttpClientInstrumentor().instrument()
### 检查是否在 root logger 设置 LoggingHandler
inpsect_logging()
app = FastAPI()
setup_fastapi_opentelemetry(app=app, service_name="otel_fastapi")
# Optional: create a simple counter metric
meter = metrics.get_meter("otel_fastapi.demo.meter")
request_counter = meter.create_counter("otel_fastapi.requests.total", description="Total number of requests")
async def http_post():
import aiohttp
url = 'http://httpbin.org/post' # An example endpoint that echoes back the post data
payload = {'key': 'value', 'list_of_items': [1, 2, 3]}
async with aiohttp.ClientSession() as session:
async with session.post(url, json=payload) as resp:
logger.info(f"Status: {resp.status}")
# Parse the response JSON
response_json = await resp.json()
logger.info(f"Received JSON: {response_json['json']}")
return response_json
# curl 'http://127.0.0.1:8000/
@app.get("/")
def read_root():
logger.info("Handling a request to the root endpoint.")
request_counter.add(1, attributes={"endpoint": "/"})
return {"Hello": "World"}
# curl 'http://127.0.0.1:8000/items/33333'
@app.get("/items/{item_id}")
async def read_item(item_id: int):
# Manual tracing example: create a custom span
with trace.get_tracer(__name__).start_as_current_span("read_item_function"):
logger.info(f"Accessing item ID: {item_id}")
request_counter.add(1, attributes={"endpoint": "/items/{item_id}"})
res = await http_post()
return {"item_id": item_id, "response": res}
if __name__ == "__main__":
# Run with uvicorn programmatically for simplicity here
# For production, run with `uvicorn main:app` and configure OTel via env vars
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)