From 44c33ea7b1dd9461a88bf284477d5c03e410459b Mon Sep 17 00:00:00 2001 From: saurabh batham Date: Sun, 14 Jun 2026 19:04:01 +0530 Subject: [PATCH] feat: implement incident context aggregation pipeline (RP-17) - Add IncidentContext, AggregatedCorrelationGroup, AggregatedTimeline, ImpactAnalysis models - Add ContextBuilder ABC + TimelineBuilder, CorrelationBuilder, TraceBuilder, ImpactBuilder - Add IncidentContextAggregator orchestrator - Wire exports into shared/domain/__init__.py - Add tests (15 tests, all passing) - All ruff and mypy checks pass --- shared/domain/__init__.py | 14 +- shared/domain/incident/__init__.py | 0 shared/domain/incident/context/__init__.py | 17 ++ shared/domain/incident/context/aggregator.py | 52 +++++ shared/domain/incident/context/builders.py | 173 ++++++++++++++++ shared/domain/incident/context/models.py | 79 +++++++ .../domain/incident/context/tests/__init__.py | 0 .../incident/context/tests/test_aggregator.py | 196 ++++++++++++++++++ .../incident/context/tests/test_models.py | 145 +++++++++++++ 9 files changed, 675 insertions(+), 1 deletion(-) create mode 100644 shared/domain/incident/__init__.py create mode 100644 shared/domain/incident/context/__init__.py create mode 100644 shared/domain/incident/context/aggregator.py create mode 100644 shared/domain/incident/context/builders.py create mode 100644 shared/domain/incident/context/models.py create mode 100644 shared/domain/incident/context/tests/__init__.py create mode 100644 shared/domain/incident/context/tests/test_aggregator.py create mode 100644 shared/domain/incident/context/tests/test_models.py diff --git a/shared/domain/__init__.py b/shared/domain/__init__.py index 71144d8..000c7d7 100644 --- a/shared/domain/__init__.py +++ b/shared/domain/__init__.py @@ -1,4 +1,4 @@ -"""Domain models, graph primitives, timeline, and correlation for RootPilot.""" +"""Domain models, graph primitives, timeline, correlation, and incident context for RootPilot.""" from shared.domain.correlation import ( CorrelationEngine, @@ -15,6 +15,13 @@ TimeWindowStrategy, TraceIdStrategy, ) +from shared.domain.incident.context import ( + AggregatedCorrelationGroup, + AggregatedTimeline, + ImpactAnalysis, + IncidentContext, + IncidentContextAggregator, +) from shared.domain.timeline import ( EventClassifier, IncidentTimeline, @@ -26,6 +33,8 @@ ) __all__ = [ + "AggregatedCorrelationGroup", + "AggregatedTimeline", "CorrelationEngine", "CorrelationGroup", "CorrelationMatch", @@ -37,6 +46,9 @@ "DependencyStrategy", "ErrorSignatureStrategy", "EventClassifier", + "ImpactAnalysis", + "IncidentContext", + "IncidentContextAggregator", "IncidentTimeline", "RequestIdStrategy", "TimeWindowStrategy", diff --git a/shared/domain/incident/__init__.py b/shared/domain/incident/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/shared/domain/incident/context/__init__.py b/shared/domain/incident/context/__init__.py new file mode 100644 index 0000000..0c1cb38 --- /dev/null +++ b/shared/domain/incident/context/__init__.py @@ -0,0 +1,17 @@ +"""Incident context aggregation for investigation-ready payloads.""" + +from shared.domain.incident.context.aggregator import IncidentContextAggregator +from shared.domain.incident.context.models import ( + AggregatedCorrelationGroup, + AggregatedTimeline, + ImpactAnalysis, + IncidentContext, +) + +__all__ = [ + "AggregatedCorrelationGroup", + "AggregatedTimeline", + "ImpactAnalysis", + "IncidentContext", + "IncidentContextAggregator", +] diff --git a/shared/domain/incident/context/aggregator.py b/shared/domain/incident/context/aggregator.py new file mode 100644 index 0000000..f131da5 --- /dev/null +++ b/shared/domain/incident/context/aggregator.py @@ -0,0 +1,52 @@ +"""IncidentContextAggregator — the orchestrator for the aggregation pipeline.""" + +from datetime import UTC, datetime + +from shared.domain.incident.context.builders import ContextBuilder, ContextBuilderState +from shared.domain.incident.context.models import IncidentContext +from shared.domain.timeline.models import TimelineEvent + + +class IncidentContextAggregator: + """Aggregate telemetry events into a full investigation-ready incident context.""" + + def __init__(self, builders: list[ContextBuilder] | None = None) -> None: + self._builders = sorted(builders or [], key=lambda b: b.weight) + + async def aggregate( + self, + incident_id: str, + primary_service: str, + events: list[TimelineEvent], + severity: str = "UNKNOWN", + title: str = "", + detected_at: datetime | None = None, + ) -> IncidentContext: + state = ContextBuilderState( + incident_id=incident_id, + primary_service=primary_service, + severity=severity, + title=title, + detected_at=detected_at, + events=events, + ) + + for builder in self._builders: + await builder.build(state) + + return IncidentContext( + incident_id=incident_id, + primary_service=primary_service, + severity=severity, + title=title, + detected_at=detected_at or datetime.now(UTC), + timeline=state.timeline, + correlation_groups=state.correlation_groups, + ungrouped_events=state.ungrouped_events, + impacts=state.impacts, + trace_groups=state.trace_groups, + event_count=len(events), + service_count=len({ev.service_name for ev in events if ev.service_name}), + trace_count=len({ev.trace_id for ev in events if ev.trace_id}), + aggregated_at=datetime.now(UTC), + ) diff --git a/shared/domain/incident/context/builders.py b/shared/domain/incident/context/builders.py new file mode 100644 index 0000000..050fa79 --- /dev/null +++ b/shared/domain/incident/context/builders.py @@ -0,0 +1,173 @@ +"""Extensible context builders for the incident aggregation pipeline.""" + +from abc import ABC, abstractmethod +from datetime import datetime + +from pydantic import BaseModel, Field + +from shared.domain.correlation.engine import CorrelationEngine +from shared.domain.correlation.enums import CorrelationSignal +from shared.domain.correlation.grouping import TraceGroupingService +from shared.domain.correlation.grouping.models import TraceGroup +from shared.domain.incident.context.models import ( + AggregatedCorrelationGroup, + AggregatedTimeline, + ImpactAnalysis, +) +from shared.domain.timeline.models import TimelineEvent +from shared.domain.timeline.services.reconstructor import TimelineReconstructor + +_STRATEGY_TO_SIGNAL: dict[str, CorrelationSignal] = { + "time_window": CorrelationSignal.TIME_PROXIMITY, + "trace_id": CorrelationSignal.TRACE_MATCH, + "span_relation": CorrelationSignal.SPAN_PARENT_CHILD, + "request_id": CorrelationSignal.REQUEST_MATCH, + "dependency": CorrelationSignal.DEPENDENCY_CHAIN, + "error_signature": CorrelationSignal.ERROR_PATTERN, +} + + +class ContextBuilderState(BaseModel): + """Mutable state carried through the builder pipeline.""" + + incident_id: str = "" + primary_service: str = "" + severity: str = "UNKNOWN" + title: str = "" + detected_at: datetime | None = None + events: list[TimelineEvent] = Field(default_factory=list) + + timeline: AggregatedTimeline | None = None + correlation_groups: list[AggregatedCorrelationGroup] = Field(default_factory=list) + ungrouped_events: list[str] = Field(default_factory=list) + impacts: list[ImpactAnalysis] = Field(default_factory=list) + trace_groups: list[TraceGroup] = Field(default_factory=list) + + +class ContextBuilder(ABC): + """Extensible step in the aggregation pipeline.""" + + weight: int = 0 + + @abstractmethod + async def build(self, state: ContextBuilderState) -> None: + """Mutate *state* by adding or enriching context fields.""" + + +class TimelineBuilder(ContextBuilder): + """Build the incident timeline from raw events.""" + + weight = 10 + + def __init__(self, reconstructor: TimelineReconstructor | None = None) -> None: + self._reconstructor = reconstructor or TimelineReconstructor() + + async def build(self, state: ContextBuilderState) -> None: + if not state.events: + return + timeline = self._reconstructor.build_timeline( + incident_id=state.incident_id, + service=state.primary_service, + events=state.events, + ) + state.timeline = AggregatedTimeline( + incident_id=timeline.incident_id, + primary_service=timeline.service, + windows=timeline.windows, + total_events=timeline.event_count, + window_count=timeline.window_count, + start_time=timeline.start_time, + end_time=timeline.end_time, + duration_seconds=( + (timeline.end_time - timeline.start_time).total_seconds() + if timeline.start_time and timeline.end_time + else None + ), + ) + + +class CorrelationBuilder(ContextBuilder): + """Run the correlation engine and transform results into aggregated groups.""" + + weight = 20 + + def __init__(self, engine: CorrelationEngine | None = None) -> None: + self._engine = engine or CorrelationEngine() + + async def build(self, state: ContextBuilderState) -> None: + if not state.events: + return + result = await self._engine.correlate(state.events) + if not result.groups and not result.ungrouped_event_ids: + return + + groups = [ + AggregatedCorrelationGroup( + group_id=g.group_id, + event_ids=g.event_ids, + composite_score=g.composite_score, + signals=[_STRATEGY_TO_SIGNAL[s] for s in g.strategy_scores if s in _STRATEGY_TO_SIGNAL], + services=list( + {ev.service_name for ev in state.events if ev.event_id in g.event_ids and ev.service_name} + ), + window_start=g.window_start, + window_end=g.window_end, + ) + for g in result.groups + ] + state.correlation_groups = groups + state.ungrouped_events = result.ungrouped_event_ids + + +class TraceBuilder(ContextBuilder): + """Build trace groups from events with trace identifiers.""" + + weight = 30 + + def __init__(self, grouping_service: TraceGroupingService | None = None) -> None: + self._grouping = grouping_service or TraceGroupingService() + + async def build(self, state: ContextBuilderState) -> None: + if not any(ev.trace_id for ev in state.events): + return + state.trace_groups = self._grouping.build_trace_groups(state.events) + + +class ImpactBuilder(ContextBuilder): + """Analyze upstream causes and downstream impact for affected services.""" + + weight = 40 + + def __init__(self, traversal) -> None: + self._traversal = traversal + + async def build(self, state: ContextBuilderState) -> None: + if state.timeline is None or not state.timeline.windows: + return + + affected_services: set[str] = set() + for window in state.timeline.windows: + for ev in window.events: + if ev.service_name: + affected_services.add(ev.service_name) + + if not affected_services: + return + + impacts: list[ImpactAnalysis] = [] + for svc in sorted(affected_services): + upstream = await self._traversal.get_upstream(svc) + downstream = await self._traversal.get_downstream(svc) + paths: list[list[str]] = [] + for cause in upstream: + found = await self._traversal.find_paths(cause, svc, max_depth=10) + paths.extend(found) + impacts.append( + ImpactAnalysis( + service=svc, + upstream_causes=upstream, + downstream_impact=downstream, + propagation_paths=paths, + ) + ) + state.impacts = impacts diff --git a/shared/domain/incident/context/models.py b/shared/domain/incident/context/models.py new file mode 100644 index 0000000..6cbe577 --- /dev/null +++ b/shared/domain/incident/context/models.py @@ -0,0 +1,79 @@ +from datetime import datetime + +from pydantic import BaseModel, Field + +from shared.domain.correlation.enums import CorrelationSignal +from shared.domain.correlation.grouping.models import TraceGroup +from shared.domain.timeline.models import TimelineWindow + + +class AggregatedCorrelationGroup(BaseModel): + """A correlation group enriched with service and trace metadata.""" + + group_id: str = Field(description="Correlation group identifier.") + event_ids: list[str] = Field(description="Event IDs belonging to this group.") + composite_score: float = Field(ge=0.0, le=1.0, description="Aggregate correlation score.") + signals: list[CorrelationSignal] = Field( + default_factory=list, description="Detection signals that formed this group." + ) + services: list[str] = Field(default_factory=list, description="Unique service names in this group.") + trace_id: str | None = Field(default=None, description="Shared trace identifier, if any.") + span_count: int = Field(default=0, description="Number of distinct spans across group events.") + window_start: datetime | None = Field(default=None, description="Earliest event timestamp.") + window_end: datetime | None = Field(default=None, description="Latest event timestamp.") + + +class AggregatedTimeline(BaseModel): + """An incident timeline with computed duration and event density.""" + + incident_id: str = Field(description="Incident identifier.") + primary_service: str = Field(description="Primary affected service.") + windows: list[TimelineWindow] = Field(default_factory=list, description="Time-windowed event buckets.") + total_events: int = Field(default=0, description="Total event count.") + window_count: int = Field(default=0, description="Number of time windows.") + start_time: datetime | None = Field(default=None, description="Earliest event across all windows.") + end_time: datetime | None = Field(default=None, description="Latest event across all windows.") + duration_seconds: float | None = Field(default=None, description="Total incident duration in seconds.") + + +class ImpactAnalysis(BaseModel): + """Upstream causes and downstream blast radius for an affected service.""" + + service: str = Field(description="The affected service.") + upstream_causes: list[str] = Field( + default_factory=list, description="Services that could be root causes (ancestors)." + ) + downstream_impact: list[str] = Field( + default_factory=list, description="Services affected by this failure (descendants)." + ) + propagation_paths: list[list[str]] = Field( + default_factory=list, description="Explicit dependency propagation paths." + ) + + +class IncidentContext(BaseModel): + """Complete, AI-ready incident context assembled by the aggregation pipeline.""" + + incident_id: str = Field(description="Incident identifier.") + primary_service: str = Field(description="Service where the incident was detected.") + severity: str = Field(default="UNKNOWN", description="Incident severity level.") + title: str = Field(default="", description="Short human-readable incident summary.") + detected_at: datetime = Field(description="When the incident was detected (UTC).") + + timeline: AggregatedTimeline | None = Field(default=None, description="Structured timeline of events.") + correlation_groups: list[AggregatedCorrelationGroup] = Field( + default_factory=list, description="Correlated event groups." + ) + ungrouped_events: list[str] = Field( + default_factory=list, description="Event IDs that fell below correlation threshold." + ) + + impacts: list[ImpactAnalysis] = Field(default_factory=list, description="Impact analysis per affected service.") + trace_groups: list[TraceGroup] = Field(default_factory=list, description="Span trees found in the event set.") + + event_count: int = Field(default=0, description="Total input event count.") + service_count: int = Field(default=0, description="Unique services involved.") + trace_count: int = Field(default=0, description="Unique traces found.") + aggregated_at: datetime = Field( + default_factory=lambda: datetime.now(), description="When this context was assembled." + ) diff --git a/shared/domain/incident/context/tests/__init__.py b/shared/domain/incident/context/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/shared/domain/incident/context/tests/test_aggregator.py b/shared/domain/incident/context/tests/test_aggregator.py new file mode 100644 index 0000000..107ccc5 --- /dev/null +++ b/shared/domain/incident/context/tests/test_aggregator.py @@ -0,0 +1,196 @@ +"""Integration tests for the incident context aggregation pipeline.""" + +from datetime import UTC, datetime, timedelta + +import pytest + +from shared.domain.graph.enums import DependencyType +from shared.domain.graph.models import DependencyEdge +from shared.domain.graph.store import InMemoryGraphStore +from shared.domain.graph.traversal import GraphTraversal +from shared.domain.incident.context.aggregator import IncidentContextAggregator +from shared.domain.incident.context.builders import ( + ContextBuilder, + ContextBuilderState, + CorrelationBuilder, + ImpactBuilder, + TimelineBuilder, + TraceBuilder, +) +from shared.domain.timeline.enums import TimelineEventCategory, TimelineEventSource +from shared.domain.timeline.models import TimelineEvent + + +@pytest.fixture +def events() -> list[TimelineEvent]: + base = datetime(2026, 6, 14, 10, 0, 0, tzinfo=UTC) + return [ + TimelineEvent( + event_id="e1", + category=TimelineEventCategory.FAILURE, + source=TimelineEventSource.TELEMETRY, + timestamp=base, + service_name="api", + title="error burst", + trace_id="trace-abc", + span_id="span-1", + ), + TimelineEvent( + event_id="e2", + category=TimelineEventCategory.FAILURE, + source=TimelineEventSource.TELEMETRY, + timestamp=base + timedelta(seconds=30), + service_name="api", + title="error burst", + trace_id="trace-abc", + span_id="span-2", + parent_span_id="span-1", + ), + TimelineEvent( + event_id="e3", + category=TimelineEventCategory.RETRY, + source=TimelineEventSource.TELEMETRY, + timestamp=base + timedelta(minutes=3), + service_name="db", + title="retry", + trace_id="trace-xyz", + ), + TimelineEvent( + event_id="e4", + category=TimelineEventCategory.RECOVERY, + source=TimelineEventSource.TELEMETRY, + timestamp=base + timedelta(minutes=6), + service_name="api", + title="recovered", + ), + ] + + +class TestAggregatorNoBuilders: + async def test_minimal_context_with_no_builders(self) -> None: + ts = datetime.now(UTC) + aggregator = IncidentContextAggregator(builders=[]) + context = await aggregator.aggregate( + incident_id="inc-1", + primary_service="api", + events=[], + detected_at=ts, + ) + assert context.incident_id == "inc-1" + assert context.timeline is None + assert context.correlation_groups == [] + assert context.ungrouped_events == [] + assert context.impacts == [] + assert context.trace_groups == [] + assert context.event_count == 0 + + async def test_context_counts_metadata(self) -> None: + ts = datetime.now(UTC) + aggregator = IncidentContextAggregator(builders=[]) + context = await aggregator.aggregate( + incident_id="inc-1", + primary_service="api", + events=[], + severity="CRITICAL", + title="API is down", + detected_at=ts, + ) + assert context.severity == "CRITICAL" + assert context.title == "API is down" + + +class TestAggregatorAllBuilders: + @pytest.fixture + async def graph_store(self) -> InMemoryGraphStore: + s = InMemoryGraphStore() + await s.add_edge(DependencyEdge(source="web", target="api", dependency_type=DependencyType.SYNCHRONOUS)) + await s.add_edge(DependencyEdge(source="api", target="db", dependency_type=DependencyType.DATABASE)) + return s + + @pytest.fixture + def aggregator(self, graph_store: InMemoryGraphStore) -> IncidentContextAggregator: + traversal = GraphTraversal(graph_store) + return IncidentContextAggregator( + builders=[ + TimelineBuilder(), + CorrelationBuilder(), + TraceBuilder(), + ImpactBuilder(traversal), + ] + ) + + async def test_aggregator_populates_all_sections( + self, + aggregator: IncidentContextAggregator, + events: list[TimelineEvent], + ) -> None: + ts = datetime.now(UTC) + context = await aggregator.aggregate( + incident_id="inc-agg-1", + primary_service="api", + events=events, + detected_at=ts, + ) + + assert context.incident_id == "inc-agg-1" + assert context.event_count == 4 + + assert context.timeline is not None + assert context.timeline.total_events == 4 + assert context.timeline.primary_service == "api" + + assert 1 <= len(context.timeline.windows) <= 3 + assert len(context.ungrouped_events) >= 0 + + assert len(context.trace_groups) >= 1 + trace_ids = {g.trace_id for g in context.trace_groups} + assert "trace-abc" in trace_ids + + assert len(context.impacts) >= 1 + svcs = {i.service for i in context.impacts} + assert "api" in svcs + + async def test_aggregator_handles_empty_events( + self, + aggregator: IncidentContextAggregator, + ) -> None: + ts = datetime.now(UTC) + context = await aggregator.aggregate( + incident_id="inc-empty", + primary_service="api", + events=[], + detected_at=ts, + ) + assert context.event_count == 0 + assert context.timeline is None + assert context.correlation_groups == [] + assert context.impacts == [] + assert context.trace_groups == [] + + +class TestCustomBuilder: + async def test_builders_executed_by_weight_order(self) -> None: + order: list[int] = [] + + class BuilderA(ContextBuilder): + weight = 20 + + async def build(self, state: ContextBuilderState) -> None: # noqa: ARG002 + order.append(20) + + class BuilderB(ContextBuilder): + weight = 10 + + async def build(self, state: ContextBuilderState) -> None: # noqa: ARG002 + order.append(10) + + class BuilderC(ContextBuilder): + weight = 30 + + async def build(self, state: ContextBuilderState) -> None: # noqa: ARG002 + order.append(30) + + aggregator = IncidentContextAggregator(builders=[BuilderA(), BuilderB(), BuilderC()]) + ts = datetime.now(UTC) + await aggregator.aggregate("inc-1", "api", [], detected_at=ts) + assert order == [10, 20, 30] diff --git a/shared/domain/incident/context/tests/test_models.py b/shared/domain/incident/context/tests/test_models.py new file mode 100644 index 0000000..a315396 --- /dev/null +++ b/shared/domain/incident/context/tests/test_models.py @@ -0,0 +1,145 @@ +"""Tests for incident context models.""" + +from datetime import UTC, datetime + +from shared.domain.correlation.enums import CorrelationSignal +from shared.domain.incident.context.models import ( + AggregatedCorrelationGroup, + AggregatedTimeline, + ImpactAnalysis, + IncidentContext, +) + + +class TestAggregatedCorrelationGroup: + async def test_minimal_group(self) -> None: + group = AggregatedCorrelationGroup( + group_id="g-1", + event_ids=["e1", "e2"], + composite_score=0.85, + ) + assert group.group_id == "g-1" + assert group.composite_score == 0.85 + assert group.signals == [] + assert group.services == [] + assert group.trace_id is None + assert group.span_count == 0 + + async def test_group_with_all_fields(self) -> None: + ts = datetime.now(UTC) + group = AggregatedCorrelationGroup( + group_id="g-2", + event_ids=["e1"], + composite_score=0.9, + signals=[CorrelationSignal.TRACE_MATCH, CorrelationSignal.ERROR_PATTERN], + services=["api", "db"], + trace_id="trace-abc", + span_count=3, + window_start=ts, + window_end=ts, + ) + assert CorrelationSignal.TRACE_MATCH in group.signals + assert group.services == ["api", "db"] + + async def test_json_round_trip(self) -> None: + original = AggregatedCorrelationGroup( + group_id="g-3", + event_ids=["e1"], + composite_score=0.5, + ) + restored = AggregatedCorrelationGroup.model_validate_json(original.model_dump_json()) + assert restored == original + + +class TestAggregatedTimeline: + async def test_empty_timeline(self) -> None: + timeline = AggregatedTimeline(incident_id="inc-1", primary_service="api") + assert timeline.total_events == 0 + assert timeline.window_count == 0 + assert timeline.duration_seconds is None + + async def test_timeline_with_duration(self) -> None: + start = datetime(2026, 6, 14, 10, 0, 0, tzinfo=UTC) + end = datetime(2026, 6, 14, 10, 10, 0, tzinfo=UTC) + timeline = AggregatedTimeline( + incident_id="inc-1", + primary_service="api", + total_events=5, + window_count=2, + start_time=start, + end_time=end, + duration_seconds=600.0, + ) + assert timeline.duration_seconds == 600.0 + assert timeline.start_time == start + + +class TestImpactAnalysis: + async def test_empty_impact(self) -> None: + impact = ImpactAnalysis(service="api") + assert impact.service == "api" + assert impact.upstream_causes == [] + assert impact.downstream_impact == [] + + async def test_impact_with_paths(self) -> None: + impact = ImpactAnalysis( + service="db", + upstream_causes=["api-gateway", "user-service"], + downstream_impact=["cache"], + propagation_paths=[["api-gateway", "user-service", "db"]], + ) + assert len(impact.propagation_paths) == 1 + assert impact.propagation_paths[0] == ["api-gateway", "user-service", "db"] + + +class TestIncidentContext: + async def test_minimal_context(self) -> None: + ts = datetime.now(UTC) + context = IncidentContext( + incident_id="inc-1", + primary_service="api", + detected_at=ts, + ) + assert context.incident_id == "inc-1" + assert context.severity == "UNKNOWN" + assert context.event_count == 0 + assert context.timeline is None + + async def test_inherited_counts_from_event_data(self) -> None: + ts = datetime.now(UTC) + context = IncidentContext( + incident_id="inc-2", + primary_service="db", + detected_at=ts, + event_count=100, + service_count=3, + trace_count=2, + correlation_groups=[ + AggregatedCorrelationGroup( + group_id="g-1", + event_ids=["e1", "e2"], + composite_score=0.75, + ), + ], + ungrouped_events=["e3"], + ) + assert context.event_count == 100 + assert context.service_count == 3 + assert context.trace_count == 2 + assert len(context.correlation_groups) == 1 + assert len(context.ungrouped_events) == 1 + assert context.aggregated_at is not None + + async def test_json_round_trip(self) -> None: + ts = datetime.now(UTC) + original = IncidentContext( + incident_id="inc-3", + primary_service="api", + detected_at=ts, + title="High error rate on api", + severity="CRITICAL", + ) + restored = IncidentContext.model_validate_json(original.model_dump_json()) + assert restored.incident_id == original.incident_id + assert restored.title == original.title + assert restored.severity == original.severity