Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion shared/domain/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Domain models, graph primitives, timeline, and correlation for RootPilot."""
"""Domain models, graph primitives, timeline, correlation, and incident context for RootPilot."""

from shared.domain.correlation import (
CorrelationEngine,
Expand All @@ -15,6 +15,13 @@
TimeWindowStrategy,
TraceIdStrategy,
)
from shared.domain.incident.context import (
AggregatedCorrelationGroup,
AggregatedTimeline,
ImpactAnalysis,
IncidentContext,
IncidentContextAggregator,
)
from shared.domain.timeline import (
EventClassifier,
IncidentTimeline,
Expand All @@ -26,6 +33,8 @@
)

__all__ = [
"AggregatedCorrelationGroup",
"AggregatedTimeline",
"CorrelationEngine",
"CorrelationGroup",
"CorrelationMatch",
Expand All @@ -37,6 +46,9 @@
"DependencyStrategy",
"ErrorSignatureStrategy",
"EventClassifier",
"ImpactAnalysis",
"IncidentContext",
"IncidentContextAggregator",
"IncidentTimeline",
"RequestIdStrategy",
"TimeWindowStrategy",
Expand Down
Empty file.
17 changes: 17 additions & 0 deletions shared/domain/incident/context/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""Incident context aggregation for investigation-ready payloads."""

from shared.domain.incident.context.aggregator import IncidentContextAggregator
from shared.domain.incident.context.models import (
AggregatedCorrelationGroup,
AggregatedTimeline,
ImpactAnalysis,
IncidentContext,
)

__all__ = [
"AggregatedCorrelationGroup",
"AggregatedTimeline",
"ImpactAnalysis",
"IncidentContext",
"IncidentContextAggregator",
]
52 changes: 52 additions & 0 deletions shared/domain/incident/context/aggregator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""IncidentContextAggregator — the orchestrator for the aggregation pipeline."""

from datetime import UTC, datetime

from shared.domain.incident.context.builders import ContextBuilder, ContextBuilderState
from shared.domain.incident.context.models import IncidentContext
from shared.domain.timeline.models import TimelineEvent


class IncidentContextAggregator:
"""Aggregate telemetry events into a full investigation-ready incident context."""

def __init__(self, builders: list[ContextBuilder] | None = None) -> None:
self._builders = sorted(builders or [], key=lambda b: b.weight)

async def aggregate(
self,
incident_id: str,
primary_service: str,
events: list[TimelineEvent],
severity: str = "UNKNOWN",
title: str = "",
detected_at: datetime | None = None,
) -> IncidentContext:
state = ContextBuilderState(
incident_id=incident_id,
primary_service=primary_service,
severity=severity,
title=title,
detected_at=detected_at,
events=events,
)

for builder in self._builders:
await builder.build(state)

return IncidentContext(
incident_id=incident_id,
primary_service=primary_service,
severity=severity,
title=title,
detected_at=detected_at or datetime.now(UTC),
timeline=state.timeline,
correlation_groups=state.correlation_groups,
ungrouped_events=state.ungrouped_events,
impacts=state.impacts,
trace_groups=state.trace_groups,
event_count=len(events),
service_count=len({ev.service_name for ev in events if ev.service_name}),
trace_count=len({ev.trace_id for ev in events if ev.trace_id}),
aggregated_at=datetime.now(UTC),
)
173 changes: 173 additions & 0 deletions shared/domain/incident/context/builders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
"""Extensible context builders for the incident aggregation pipeline."""

from abc import ABC, abstractmethod
from datetime import datetime

from pydantic import BaseModel, Field

from shared.domain.correlation.engine import CorrelationEngine
from shared.domain.correlation.enums import CorrelationSignal
from shared.domain.correlation.grouping import TraceGroupingService
from shared.domain.correlation.grouping.models import TraceGroup
from shared.domain.incident.context.models import (
AggregatedCorrelationGroup,
AggregatedTimeline,
ImpactAnalysis,
)
from shared.domain.timeline.models import TimelineEvent
from shared.domain.timeline.services.reconstructor import TimelineReconstructor

_STRATEGY_TO_SIGNAL: dict[str, CorrelationSignal] = {
"time_window": CorrelationSignal.TIME_PROXIMITY,
"trace_id": CorrelationSignal.TRACE_MATCH,
"span_relation": CorrelationSignal.SPAN_PARENT_CHILD,
"request_id": CorrelationSignal.REQUEST_MATCH,
"dependency": CorrelationSignal.DEPENDENCY_CHAIN,
"error_signature": CorrelationSignal.ERROR_PATTERN,
}


class ContextBuilderState(BaseModel):
"""Mutable state carried through the builder pipeline."""

incident_id: str = ""
primary_service: str = ""
severity: str = "UNKNOWN"
title: str = ""
detected_at: datetime | None = None
events: list[TimelineEvent] = Field(default_factory=list)

timeline: AggregatedTimeline | None = None
correlation_groups: list[AggregatedCorrelationGroup] = Field(default_factory=list)
ungrouped_events: list[str] = Field(default_factory=list)
impacts: list[ImpactAnalysis] = Field(default_factory=list)
trace_groups: list[TraceGroup] = Field(default_factory=list)


class ContextBuilder(ABC):
"""Extensible step in the aggregation pipeline."""

weight: int = 0

@abstractmethod
async def build(self, state: ContextBuilderState) -> None:
"""Mutate *state* by adding or enriching context fields."""


class TimelineBuilder(ContextBuilder):
"""Build the incident timeline from raw events."""

weight = 10

def __init__(self, reconstructor: TimelineReconstructor | None = None) -> None:
self._reconstructor = reconstructor or TimelineReconstructor()

async def build(self, state: ContextBuilderState) -> None:
if not state.events:
return
timeline = self._reconstructor.build_timeline(
incident_id=state.incident_id,
service=state.primary_service,
events=state.events,
)
state.timeline = AggregatedTimeline(
incident_id=timeline.incident_id,
primary_service=timeline.service,
windows=timeline.windows,
total_events=timeline.event_count,
window_count=timeline.window_count,
start_time=timeline.start_time,
end_time=timeline.end_time,
duration_seconds=(
(timeline.end_time - timeline.start_time).total_seconds()
if timeline.start_time and timeline.end_time
else None
),
)


class CorrelationBuilder(ContextBuilder):
"""Run the correlation engine and transform results into aggregated groups."""

weight = 20

def __init__(self, engine: CorrelationEngine | None = None) -> None:
self._engine = engine or CorrelationEngine()

async def build(self, state: ContextBuilderState) -> None:
if not state.events:
return
result = await self._engine.correlate(state.events)
if not result.groups and not result.ungrouped_event_ids:
return

groups = [
AggregatedCorrelationGroup(
group_id=g.group_id,
event_ids=g.event_ids,
composite_score=g.composite_score,
signals=[_STRATEGY_TO_SIGNAL[s] for s in g.strategy_scores if s in _STRATEGY_TO_SIGNAL],
services=list(
{ev.service_name for ev in state.events if ev.event_id in g.event_ids and ev.service_name}
),
window_start=g.window_start,
window_end=g.window_end,
)
for g in result.groups
]
state.correlation_groups = groups
state.ungrouped_events = result.ungrouped_event_ids


class TraceBuilder(ContextBuilder):
"""Build trace groups from events with trace identifiers."""

weight = 30

def __init__(self, grouping_service: TraceGroupingService | None = None) -> None:
self._grouping = grouping_service or TraceGroupingService()

async def build(self, state: ContextBuilderState) -> None:
if not any(ev.trace_id for ev in state.events):
return
state.trace_groups = self._grouping.build_trace_groups(state.events)


class ImpactBuilder(ContextBuilder):
"""Analyze upstream causes and downstream impact for affected services."""

weight = 40

def __init__(self, traversal) -> None:
self._traversal = traversal

async def build(self, state: ContextBuilderState) -> None:
if state.timeline is None or not state.timeline.windows:
return

affected_services: set[str] = set()
for window in state.timeline.windows:
for ev in window.events:
if ev.service_name:
affected_services.add(ev.service_name)

if not affected_services:
return

impacts: list[ImpactAnalysis] = []
for svc in sorted(affected_services):
upstream = await self._traversal.get_upstream(svc)
downstream = await self._traversal.get_downstream(svc)
paths: list[list[str]] = []
for cause in upstream:
found = await self._traversal.find_paths(cause, svc, max_depth=10)
paths.extend(found)
impacts.append(
ImpactAnalysis(
service=svc,
upstream_causes=upstream,
downstream_impact=downstream,
propagation_paths=paths,
)
)
state.impacts = impacts
79 changes: 79 additions & 0 deletions shared/domain/incident/context/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from datetime import datetime

from pydantic import BaseModel, Field

from shared.domain.correlation.enums import CorrelationSignal
from shared.domain.correlation.grouping.models import TraceGroup
from shared.domain.timeline.models import TimelineWindow


class AggregatedCorrelationGroup(BaseModel):
"""A correlation group enriched with service and trace metadata."""

group_id: str = Field(description="Correlation group identifier.")
event_ids: list[str] = Field(description="Event IDs belonging to this group.")
composite_score: float = Field(ge=0.0, le=1.0, description="Aggregate correlation score.")
signals: list[CorrelationSignal] = Field(
default_factory=list, description="Detection signals that formed this group."
)
services: list[str] = Field(default_factory=list, description="Unique service names in this group.")
trace_id: str | None = Field(default=None, description="Shared trace identifier, if any.")
span_count: int = Field(default=0, description="Number of distinct spans across group events.")
window_start: datetime | None = Field(default=None, description="Earliest event timestamp.")
window_end: datetime | None = Field(default=None, description="Latest event timestamp.")


class AggregatedTimeline(BaseModel):
"""An incident timeline with computed duration and event density."""

incident_id: str = Field(description="Incident identifier.")
primary_service: str = Field(description="Primary affected service.")
windows: list[TimelineWindow] = Field(default_factory=list, description="Time-windowed event buckets.")
total_events: int = Field(default=0, description="Total event count.")
window_count: int = Field(default=0, description="Number of time windows.")
start_time: datetime | None = Field(default=None, description="Earliest event across all windows.")
end_time: datetime | None = Field(default=None, description="Latest event across all windows.")
duration_seconds: float | None = Field(default=None, description="Total incident duration in seconds.")


class ImpactAnalysis(BaseModel):
"""Upstream causes and downstream blast radius for an affected service."""

service: str = Field(description="The affected service.")
upstream_causes: list[str] = Field(
default_factory=list, description="Services that could be root causes (ancestors)."
)
downstream_impact: list[str] = Field(
default_factory=list, description="Services affected by this failure (descendants)."
)
propagation_paths: list[list[str]] = Field(
default_factory=list, description="Explicit dependency propagation paths."
)


class IncidentContext(BaseModel):
"""Complete, AI-ready incident context assembled by the aggregation pipeline."""

incident_id: str = Field(description="Incident identifier.")
primary_service: str = Field(description="Service where the incident was detected.")
severity: str = Field(default="UNKNOWN", description="Incident severity level.")
title: str = Field(default="", description="Short human-readable incident summary.")
detected_at: datetime = Field(description="When the incident was detected (UTC).")

timeline: AggregatedTimeline | None = Field(default=None, description="Structured timeline of events.")
correlation_groups: list[AggregatedCorrelationGroup] = Field(
default_factory=list, description="Correlated event groups."
)
ungrouped_events: list[str] = Field(
default_factory=list, description="Event IDs that fell below correlation threshold."
)

impacts: list[ImpactAnalysis] = Field(default_factory=list, description="Impact analysis per affected service.")
trace_groups: list[TraceGroup] = Field(default_factory=list, description="Span trees found in the event set.")

event_count: int = Field(default=0, description="Total input event count.")
service_count: int = Field(default=0, description="Unique services involved.")
trace_count: int = Field(default=0, description="Unique traces found.")
aggregated_at: datetime = Field(
default_factory=lambda: datetime.now(), description="When this context was assembled."
)
Empty file.
Loading
Loading