Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion shared/domain/correlation/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Telemetry correlation engine models, pipeline, and strategies."""
"""Telemetry correlation engine models, pipeline, scoring, and strategies."""

from shared.domain.correlation.engine import CorrelationEngine
from shared.domain.correlation.enums import CorrelationSignal, CorrelationStrategyType
Expand All @@ -7,8 +7,15 @@
CorrelationGroup,
CorrelationMatch,
CorrelationResult,
ScoreContribution,
)
from shared.domain.correlation.pipeline import CorrelationPipeline
from shared.domain.correlation.scoring import (
ConfidenceScorer,
ScoringPipeline,
ScoringStrategy,
WeightedProbabilisticScorer,
)
from shared.domain.correlation.strategies import (
CorrelationStrategy,
DependencyStrategy,
Expand All @@ -19,6 +26,7 @@
)

__all__ = [
"ConfidenceScorer",
"CorrelationContext",
"CorrelationEngine",
"CorrelationGroup",
Expand All @@ -31,6 +39,10 @@
"DependencyStrategy",
"ErrorSignatureStrategy",
"RequestIdStrategy",
"ScoreContribution",
"ScoringPipeline",
"ScoringStrategy",
"TimeWindowStrategy",
"TraceIdStrategy",
"WeightedProbabilisticScorer",
]
16 changes: 15 additions & 1 deletion shared/domain/correlation/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,26 @@ class CorrelationMatch(BaseModel):
metadata: dict[str, str] = Field(default_factory=dict, description="Strategy-specific details.")


class ScoreContribution(BaseModel):
"""Explains a single strategy's contribution to a group's composite score."""

strategy_name: str = Field(description="Strategy type name (e.g. trace_id).")
signal: CorrelationSignal = Field(description="Specific signal detected.")
raw_score: float = Field(ge=0.0, le=1.0, description="Original match score before weighting.")
weighted_score: float = Field(ge=0.0, le=1.0, description="Score after applying strategy weight.")
weight: float = Field(ge=0.0, le=1.0, description="Strategy weight used for attenuation.")
match_count: int = Field(ge=0, description="Number of raw matches contributing under this strategy.")
event_ids: list[str] = Field(default_factory=list, description="Event IDs involved in these matches.")


class CorrelationGroup(BaseModel):
group_id: str = Field(description="Unique group identifier.")
event_ids: list[str] = Field(description="Event IDs in this group.")
strategies_used: list[CorrelationStrategyType] = Field(description="Strategies that contributed.")
strategy_scores: dict[str, float] = Field(default_factory=dict, description="Max score per strategy.")
strategy_scores: dict[str, float] = Field(default_factory=dict, description="Max weighted score per strategy.")
composite_score: float = Field(ge=0.0, le=1.0, description="Aggregate correlation score.")
confidence: float = Field(default=0.0, ge=0.0, le=1.0, description="Confidence in the correlation quality.")
contributions: list[ScoreContribution] = Field(default_factory=list, description="Per-strategy scoring breakdown.")
window_start: datetime | None = Field(default=None, description="Earliest event timestamp.")
window_end: datetime | None = Field(default=None, description="Latest event timestamp.")

Expand Down
85 changes: 3 additions & 82 deletions shared/domain/correlation/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,16 @@
import time
from collections import defaultdict
from uuid import uuid4

from shared.domain.correlation.enums import CorrelationStrategyType
from shared.domain.correlation.models import CorrelationContext, CorrelationGroup, CorrelationMatch, CorrelationResult
from shared.domain.correlation.scoring.pipeline import ScoringPipeline
from shared.domain.correlation.strategies.base import CorrelationStrategy


def _composite_score(scores: list[float]) -> float:
if not scores:
return 0.0
result = 0.0
for s in scores:
result = 1.0 - (1.0 - result) * (1.0 - s)
return round(result, 4)


class CorrelationPipeline:
def __init__(self, strategies: list[CorrelationStrategy]) -> None:
if not strategies:
raise ValueError("At least one strategy is required")
self._strategies = strategies
self._scoring_pipeline = ScoringPipeline(strategies)

async def run(self, context: CorrelationContext) -> CorrelationResult:
start = time.perf_counter()
Expand All @@ -32,7 +22,7 @@ async def run(self, context: CorrelationContext) -> CorrelationResult:
all_matches.extend(matches)
strategy_counts[strategy.strategy_type.value] = len(matches)

groups = self._merge_into_groups(all_matches, context)
groups = self._scoring_pipeline.process(all_matches, context)
ungrouped = self._find_ungrouped(context.events, groups)

duration = (time.perf_counter() - start) * 1000
Expand All @@ -48,77 +38,8 @@ async def run(self, context: CorrelationContext) -> CorrelationResult:
duration_ms=round(duration, 2),
)

def _merge_into_groups(
self, matches: list[CorrelationMatch], context: CorrelationContext
) -> list[CorrelationGroup]:
adj: dict[str, set[str]] = defaultdict(set)
event_scores: dict[str, list[float]] = defaultdict(list)
event_strategies: dict[str, set[CorrelationStrategyType]] = defaultdict(set)
strategy_scores: dict[str, dict[str, float]] = defaultdict(dict)

for m in matches:
adj[m.event_id_a].add(m.event_id_b)
adj[m.event_id_b].add(m.event_id_a)
event_scores[m.event_id_a].append(m.score)
event_scores[m.event_id_b].append(m.score)
event_strategies[m.event_id_a].add(m.strategy_type)
event_strategies[m.event_id_b].add(m.strategy_type)
for eid in (m.event_id_a, m.event_id_b):
key = m.strategy_type.value
existing = strategy_scores[eid].get(key, 0.0)
strategy_scores[eid][key] = max(existing, m.score)

visited: set[str] = set()
groups: list[CorrelationGroup] = []
event_map = {ev.event_id: ev for ev in context.events}

for event_id in context.events:
eid = event_id.event_id
if eid in visited:
continue
group = self._bfs_group(eid, adj, visited)
if len(group) < 2:
continue
scores = [s for eid in group for s in event_scores.get(eid, [])]
strategies = list({st for eid in group for st in event_strategies.get(eid, set())})
timestamps = [event_map[eid].timestamp for eid in group if eid in event_map]
combined_scores: dict[str, float] = {}
for eid in group:
for strategy_key, sc in strategy_scores.get(eid, {}).items():
combined_scores[strategy_key] = max(combined_scores.get(strategy_key, 0.0), sc)
groups.append(
CorrelationGroup(
group_id=uuid4().hex,
event_ids=sorted(group),
strategies_used=sorted(strategies, key=lambda s: s.value),
strategy_scores=combined_scores,
composite_score=_composite_score(scores),
window_start=min(timestamps) if timestamps else None,
window_end=max(timestamps) if timestamps else None,
)
)

return [g for g in groups if g.composite_score >= context.min_score]

def _find_ungrouped(self, events: list, groups: list[CorrelationGroup]) -> list[str]:
grouped: set[str] = set()
for g in groups:
grouped.update(g.event_ids)
return [ev.event_id for ev in events if ev.event_id not in grouped]

@staticmethod
def _bfs_group(start: str, adj: dict[str, set[str]], visited: set[str]) -> set[str]:
from collections import deque

group: set[str] = set()
queue: deque[str] = deque([start])
while queue:
current = queue.popleft()
if current in visited:
continue
visited.add(current)
group.add(current)
for neighbor in adj.get(current, set()):
if neighbor not in visited:
queue.append(neighbor)
return group
19 changes: 19 additions & 0 deletions shared/domain/correlation/scoring/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""Correlation scoring model for weighted, explainable, and confidence-aware ranking."""

from shared.domain.correlation.models import ScoreContribution
from shared.domain.correlation.scoring.confidence import ConfidenceScorer
from shared.domain.correlation.scoring.models import ScoringResult
from shared.domain.correlation.scoring.pipeline import ScoringPipeline
from shared.domain.correlation.scoring.strategies import (
ScoringStrategy,
WeightedProbabilisticScorer,
)

__all__ = [
"ConfidenceScorer",
"ScoreContribution",
"ScoringPipeline",
"ScoringResult",
"ScoringStrategy",
"WeightedProbabilisticScorer",
]
81 changes: 81 additions & 0 deletions shared/domain/correlation/scoring/confidence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""Confidence scoring for correlation groups.

Combines composite score with diversity, coverage, and recency factors.
"""

from datetime import UTC, datetime

from shared.domain.correlation.scoring.models import ScoringResult


class ConfidenceScorer:
"""Computes a confidence score for a correlation group.

Confidence is a weighted combination of:
- composite_score (40%): the base scoring quality
- diversity_factor (30%): how many distinct strategies matched
- coverage_factor (20%): how many events in the group were matched
- recency_factor (10%): how recent the events are
"""

def compute(
self,
result: ScoringResult,
total_events_in_group: int,
event_timestamps: list[datetime] | None = None,
) -> float:
composite = result.composite_score

diversity = self._diversity_factor(result)
coverage = self._coverage_factor(result, total_events_in_group)
recency = self._recency_factor(event_timestamps)

confidence = 0.4 * composite + 0.3 * diversity + 0.2 * coverage + 0.1 * recency
return round(min(max(confidence, 0.0), 1.0), 4)

@staticmethod
def _diversity_factor(result: ScoringResult) -> float:
"""Score based on how many distinct strategies contributed.

More strategies = higher confidence.
0 strategies -> 0.0
1+ strategies -> scales toward 1.0, with diminishing returns.
"""
count = len(result.strategies_used)
if count == 0:
return 0.0
return min(count / 4.0, 1.0)

@staticmethod
def _coverage_factor(result: ScoringResult, total_events_in_group: int) -> float:
"""Score based on what fraction of group events have match data.

Higher coverage = higher confidence.
"""
if total_events_in_group == 0:
return 0.0
matched_events: set[str] = set()
for c in result.contributions:
matched_events.update(c.event_ids)
return len(matched_events) / total_events_in_group

@staticmethod
def _recency_factor(event_timestamps: list[datetime] | None) -> float:
"""Score based on recency of events.

Events within the last hour = 1.0.
Events older than 24 hours = 0.0.
Linear decay in between.
"""
if not event_timestamps:
return 1.0

now = datetime.now(UTC)
newest = max(event_timestamps)
age_seconds = (now - newest).total_seconds()

if age_seconds <= 3600: # 1 hour
return 1.0
if age_seconds >= 86400: # 24 hours
return 0.0
return round(1.0 - (age_seconds - 3600) / (86400 - 3600), 4)
18 changes: 18 additions & 0 deletions shared/domain/correlation/scoring/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
"""Scoring intermediate models for explainable correlation scoring."""

from pydantic import BaseModel, Field

from shared.domain.correlation.enums import CorrelationStrategyType
from shared.domain.correlation.models import ScoreContribution


class ScoringResult(BaseModel):
"""The computed scoring output for a single correlation group."""

composite_score: float = Field(ge=0.0, le=1.0, description="Aggregate weighted correlation score.")
confidence: float = Field(ge=0.0, le=1.0, description="Confidence in the correlation quality.")
contributions: list[ScoreContribution] = Field(default_factory=list, description="Per-strategy breakdown.")
strategy_scores: dict[str, float] = Field(default_factory=dict, description="Max weighted score per strategy.")
strategies_used: list[CorrelationStrategyType] = Field(
default_factory=list, description="Strategies that contributed."
)
Loading
Loading