Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.11.6
hooks:
- id: ruff
args: [--fix]
- id: ruff-format

- repo: local
hooks:
- id: mypy
name: mypy
entry: .venv/Scripts/python.exe -m mypy
language: system
types: [python]
require_serial: true
pass_filenames: false
args: [--no-error-summary, shared, infrastructure]
7 changes: 3 additions & 4 deletions examples/event_contracts.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Serialization and usage examples for RootPilot event contracts."""

import json
from datetime import datetime, timezone
from datetime import UTC, datetime

from shared.contracts import (
Event,
Expand All @@ -11,7 +11,6 @@
TelemetryEvent,
)


# ---------------------------------------------------------------------------
# TelemetryEvent – serialize / deserialize
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -55,7 +54,7 @@
title="Gateway timeout spike above threshold",
description="p99 latency exceeded 5s for 3 consecutive minutes.",
source_event_ids=[envelope.id],
detected_at=datetime.now(timezone.utc),
detected_at=datetime.now(UTC),
)

incident_json = incident.model_dump_json()
Expand Down Expand Up @@ -122,4 +121,4 @@
with open("examples/_sample_events.json", "w") as f:
json.dump(all_events, f, indent=2, default=str)

print(f"Bulk export written to examples/_sample_events.json")
print("Bulk export written to examples/_sample_events.json")
10 changes: 7 additions & 3 deletions examples/provider_interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@

import asyncio
from collections.abc import AsyncIterator
from datetime import datetime, timezone
from datetime import UTC, datetime

from pydantic import BaseModel
from shared.contracts import Event, EventBus, LLMMessage, LLMProvider, LLMResponse, LogEntry, LogFilter, LogStore

from shared.contracts import Event, EventBus, LLMMessage, LLMProvider, LLMResponse, LogEntry, LogFilter, LogStore

# ---------------------------------------------------------------------------
# Example EventBus implementation
# ---------------------------------------------------------------------------


class PrintBus(EventBus):
async def publish(self, event: Event, topic: str | None = None) -> None:
print(f"[{topic or event.topic}] {event.source}: {event.payload}")
Expand All @@ -33,6 +34,7 @@ async def health(self) -> bool:
# Example LogStore implementation
# ---------------------------------------------------------------------------


class MemoryLogStore(LogStore):
def __init__(self) -> None:
self._logs: list[LogEntry] = []
Expand All @@ -56,6 +58,7 @@ async def health(self) -> bool:
# Example LLMProvider implementation
# ---------------------------------------------------------------------------


class EchoProvider(LLMProvider):
async def generate(
self,
Expand All @@ -82,12 +85,13 @@ async def embed(self, text: str, model: str | None = None) -> list[float]:
# Usage
# ---------------------------------------------------------------------------


async def main() -> None:
bus = PrintBus()
await bus.publish(Event(source="test", topic="ping", payload={"msg": "hello"}))

store = MemoryLogStore()
await store.write(LogEntry(timestamp=datetime.now(timezone.utc), service="svc", level="INFO", message="started"))
await store.write(LogEntry(timestamp=datetime.now(UTC), service="svc", level="INFO", message="started"))
async for entry in store.query(LogFilter(service="svc")):
print(f" Log: {entry.message}")

Expand Down
21 changes: 14 additions & 7 deletions infrastructure/elasticsearch/elasticsearch_log_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@

import logging
from collections.abc import AsyncIterator
from datetime import datetime, timezone
from datetime import UTC, datetime
from typing import Any

from elasticsearch import AsyncElasticsearch # type: ignore
from elasticsearch.helpers import async_bulk # type: ignore
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk
from pydantic import BaseModel, Field

from shared.contracts.interfaces.log_store import LogEntry, LogFilter, LogStore, SortOrder
from shared.contracts.interfaces.log_store import LogEntry, LogFilter, LogStore

logger = logging.getLogger(__name__)

Expand All @@ -29,7 +30,7 @@

def _index_name(dt: datetime | None = None) -> str:
"""Return the target index name for a given timestamp (UTC daily bucket)."""
ts = dt or datetime.now(timezone.utc)
ts = dt or datetime.now(UTC)
return f"{INDEX_PREFIX}-{ts.strftime('%Y.%m.%d')}"


Expand All @@ -51,6 +52,7 @@ def _build_es_doc(entry: LogEntry) -> dict:
# Applied automatically at startup to ensure consistent mappings.
# ─────────────────────────────────────────────────────────────────────────


def _default_index_template() -> dict:
return {
"index_patterns": [f"{INDEX_PREFIX}-*"],
Expand Down Expand Up @@ -117,6 +119,7 @@ def _default_ilm_policy() -> dict:

# ── Query builder ─────────────────────────────────────────────────────────


def _build_query_body(filter: LogFilter) -> dict:
"""Translate a LogFilter into an Elasticsearch query body."""
must_clauses: list[dict] = []
Expand Down Expand Up @@ -157,6 +160,7 @@ def _build_query_body(filter: LogFilter) -> dict:

# ── Elasticsearch Configuration ───────────────────────────────────────────


class ElasticsearchConfig(BaseModel):
hosts: str = Field(
default="http://localhost:9200",
Expand All @@ -182,6 +186,7 @@ class ElasticsearchConfig(BaseModel):

# ── Elasticsearch LogStore Adapter ────────────────────────────────────────


class ElasticsearchLogStore(LogStore):
"""LogStore implementation backed by Elasticsearch.

Expand Down Expand Up @@ -266,7 +271,9 @@ async def _generate_actions():
index = _index_name(entry.timestamp)
yield {"_index": index, "_source": doc}

success, errors = await async_bulk(
success: int
errors: list[Any]
success, errors = await async_bulk( # type: ignore[assignment]
client=self._client,
actions=_generate_actions(),
chunk_size=self._config.bulk_batch_size,
Expand All @@ -281,7 +288,7 @@ async def _generate_actions():
else:
logger.debug("Bulk write succeeded", extra={"count": success})

async def query(self, filter: LogFilter) -> AsyncIterator[LogEntry]:
async def query(self, filter: LogFilter) -> AsyncIterator[LogEntry]: # type: ignore[override,misc]
assert self._client is not None, "ElasticsearchLogStore not started"

body = _build_query_body(filter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

from __future__ import annotations

from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, patch
from datetime import UTC, datetime
from unittest.mock import AsyncMock, patch

import pytest

Expand All @@ -27,7 +27,7 @@ def config() -> ElasticsearchConfig:
@pytest.fixture
def entry() -> LogEntry:
return LogEntry(
timestamp=datetime(2026, 6, 13, 12, 0, 0, tzinfo=timezone.utc),
timestamp=datetime(2026, 6, 13, 12, 0, 0, tzinfo=UTC),
service="ingestion-service",
level="ERROR",
message="Connection refused",
Expand All @@ -39,7 +39,7 @@ def entry() -> LogEntry:

class TestIndexNaming:
def test_index_name_format(self) -> None:
dt = datetime(2026, 6, 13, 12, 0, 0, tzinfo=timezone.utc)
dt = datetime(2026, 6, 13, 12, 0, 0, tzinfo=UTC)
name = _index_name(dt)
assert name == "rp-tl-2026.06.13"

Expand All @@ -48,11 +48,11 @@ def test_index_name_defaults_to_utc_now(self) -> None:
assert name.startswith("rp-tl-")

def test_index_name_pads_single_digit_month(self) -> None:
dt = datetime(2026, 1, 5, tzinfo=timezone.utc)
dt = datetime(2026, 1, 5, tzinfo=UTC)
assert _index_name(dt) == "rp-tl-2026.01.05"

def test_index_name_pads_single_digit_day(self) -> None:
dt = datetime(2026, 12, 1, tzinfo=timezone.utc)
dt = datetime(2026, 12, 1, tzinfo=UTC)
assert _index_name(dt) == "rp-tl-2026.12.01"


Expand All @@ -70,7 +70,7 @@ def test_build_doc_structure(self, entry: LogEntry) -> None:

def test_build_doc_without_trace_span(self) -> None:
entry = LogEntry(
timestamp=datetime(2026, 6, 13, tzinfo=timezone.utc),
timestamp=datetime(2026, 6, 13, tzinfo=UTC),
service="test",
level="INFO",
message="hello",
Expand Down Expand Up @@ -128,8 +128,8 @@ def test_filter_by_trace_id(self) -> None:
assert body["query"]["bool"]["must"] == [{"term": {"trace_id": "abc123"}}]

def test_filter_by_time_range(self) -> None:
start = datetime(2026, 6, 13, tzinfo=timezone.utc)
end = datetime(2026, 6, 14, tzinfo=timezone.utc)
start = datetime(2026, 6, 13, tzinfo=UTC)
end = datetime(2026, 6, 14, tzinfo=UTC)
f = LogFilter(start_time=start, end_time=end)
body = _build_query_body(f)
time_range = body["query"]["bool"]["must"][0]["range"]["@timestamp"]
Expand Down
4 changes: 2 additions & 2 deletions infrastructure/monitoring/otel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

from infrastructure.monitoring.otel.instrumentation import (
OpenTelemetryMiddleware,
setup_tracing,
get_trace_context,
setup_tracing,
)
from infrastructure.monitoring.otel.otel_tracer_provider import (
OTelSpan,
Expand All @@ -16,6 +16,6 @@
"OTelTracer",
"OTelTracerProvider",
"OpenTelemetryMiddleware",
"setup_tracing",
"get_trace_context",
"setup_tracing",
]
6 changes: 1 addition & 5 deletions infrastructure/monitoring/otel/instrumentation.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
"""Tracing setup utilities and ASGI middleware for FastAPI services."""

import logging
from collections.abc import Awaitable, Callable
from typing import Any

from fastapi import FastAPI
from opentelemetry import trace as otel_trace
Expand Down Expand Up @@ -59,9 +57,7 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
await self.app(scope, receive, send)
return

provider: OTelTracerProvider | None = getattr(
getattr(self.app, "state", None), "tracer_provider", None
)
provider: OTelTracerProvider | None = getattr(getattr(self.app, "state", None), "tracer_provider", None)
if provider is None:
await self.app(scope, receive, send)
return
Expand Down
16 changes: 8 additions & 8 deletions infrastructure/monitoring/otel/otel_tracer_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
from opentelemetry.propagators.composite import CompositeHTTPPropagator
from opentelemetry.propagators.textmap import Setter, TextMapPropagator
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import Span as OTelSDKSpan
from opentelemetry.sdk.trace import TracerProvider as OTelSDKTracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import NonRecordingSpan, SpanContext as OTelSpanContext
from opentelemetry.trace import NonRecordingSpan
from opentelemetry.trace import SpanContext as OTelSpanContext
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

from shared.observability.tracing import Span, SpanContext, SpanKind, SpanStatus, Tracer, TracerProvider
Expand Down Expand Up @@ -108,9 +108,7 @@ def start_span(
if context is not None:
parent_octx = _to_span_context(context)
if parent_octx is not None:
otel_ctx = otel_trace.set_span_in_context(
NonRecordingSpan(parent_octx)
)
otel_ctx = otel_trace.set_span_in_context(NonRecordingSpan(parent_octx))

otel_span = self._tracer.start_span(
name=name,
Expand All @@ -137,9 +135,11 @@ def __init__(
sdk_provider.add_span_processor(span_processor)

self._provider = sdk_provider
self._propagator: TextMapPropagator = CompositeHTTPPropagator([
TraceContextTextMapPropagator(),
])
self._propagator: TextMapPropagator = CompositeHTTPPropagator(
[
TraceContextTextMapPropagator(),
]
)

otel_trace.set_tracer_provider(sdk_provider)

Expand Down
10 changes: 3 additions & 7 deletions infrastructure/rabbitmq/rabbitmq_event_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class RabbitMQConfig(BaseModel):


class _SubscriberInfo:
__slots__ = ("queue_name", "channel", "consumer_tag", "topic")
__slots__ = ("channel", "consumer_tag", "queue_name", "topic")

def __init__(
self,
Expand Down Expand Up @@ -174,9 +174,7 @@ async def subscribe(self, topic: str, handler: EventHandler) -> None:
async def health(self) -> bool:
if self._closed:
return False
if self._connection is None or self._connection.is_closed:
return False
return True
return not (self._connection is None or self._connection.is_closed)

async def _resolve_exchange(self) -> AbstractExchange:
async with self._lock:
Expand All @@ -192,9 +190,7 @@ async def _resolve_exchange(self) -> AbstractExchange:
await channel.close()
return self._exchange

def _make_handler(
self, handler: EventHandler
) -> Callable[[AbstractIncomingMessage], Awaitable[None]]:
def _make_handler(self, handler: EventHandler) -> Callable[[AbstractIncomingMessage], Awaitable[None]]:
async def _on_message(message: AbstractIncomingMessage) -> None:
async with message.process(requeue=True):
try:
Expand Down
Loading
Loading