diff --git a/shared/domain/correlation/__init__.py b/shared/domain/correlation/__init__.py index 9276783..48f2b56 100644 --- a/shared/domain/correlation/__init__.py +++ b/shared/domain/correlation/__init__.py @@ -1,4 +1,4 @@ -"""Telemetry correlation engine models, pipeline, and strategies.""" +"""Telemetry correlation engine models, pipeline, scoring, and strategies.""" from shared.domain.correlation.engine import CorrelationEngine from shared.domain.correlation.enums import CorrelationSignal, CorrelationStrategyType @@ -7,8 +7,15 @@ CorrelationGroup, CorrelationMatch, CorrelationResult, + ScoreContribution, ) from shared.domain.correlation.pipeline import CorrelationPipeline +from shared.domain.correlation.scoring import ( + ConfidenceScorer, + ScoringPipeline, + ScoringStrategy, + WeightedProbabilisticScorer, +) from shared.domain.correlation.strategies import ( CorrelationStrategy, DependencyStrategy, @@ -19,6 +26,7 @@ ) __all__ = [ + "ConfidenceScorer", "CorrelationContext", "CorrelationEngine", "CorrelationGroup", @@ -31,6 +39,10 @@ "DependencyStrategy", "ErrorSignatureStrategy", "RequestIdStrategy", + "ScoreContribution", + "ScoringPipeline", + "ScoringStrategy", "TimeWindowStrategy", "TraceIdStrategy", + "WeightedProbabilisticScorer", ] diff --git a/shared/domain/correlation/models.py b/shared/domain/correlation/models.py index fcd0f7a..b9e69e5 100644 --- a/shared/domain/correlation/models.py +++ b/shared/domain/correlation/models.py @@ -15,12 +15,26 @@ class CorrelationMatch(BaseModel): metadata: dict[str, str] = Field(default_factory=dict, description="Strategy-specific details.") +class ScoreContribution(BaseModel): + """Explains a single strategy's contribution to a group's composite score.""" + + strategy_name: str = Field(description="Strategy type name (e.g. 
trace_id).") + signal: CorrelationSignal = Field(description="Specific signal detected.") + raw_score: float = Field(ge=0.0, le=1.0, description="Original match score before weighting.") + weighted_score: float = Field(ge=0.0, le=1.0, description="Score after applying strategy weight.") + weight: float = Field(ge=0.0, le=1.0, description="Strategy weight used for attenuation.") + match_count: int = Field(ge=0, description="Number of raw matches contributing under this strategy.") + event_ids: list[str] = Field(default_factory=list, description="Event IDs involved in these matches.") + + class CorrelationGroup(BaseModel): group_id: str = Field(description="Unique group identifier.") event_ids: list[str] = Field(description="Event IDs in this group.") strategies_used: list[CorrelationStrategyType] = Field(description="Strategies that contributed.") - strategy_scores: dict[str, float] = Field(default_factory=dict, description="Max score per strategy.") + strategy_scores: dict[str, float] = Field(default_factory=dict, description="Max weighted score per strategy.") composite_score: float = Field(ge=0.0, le=1.0, description="Aggregate correlation score.") + confidence: float = Field(default=0.0, ge=0.0, le=1.0, description="Confidence in the correlation quality.") + contributions: list[ScoreContribution] = Field(default_factory=list, description="Per-strategy scoring breakdown.") window_start: datetime | None = Field(default=None, description="Earliest event timestamp.") window_end: datetime | None = Field(default=None, description="Latest event timestamp.") diff --git a/shared/domain/correlation/pipeline.py b/shared/domain/correlation/pipeline.py index 6db989d..530b8dc 100644 --- a/shared/domain/correlation/pipeline.py +++ b/shared/domain/correlation/pipeline.py @@ -1,26 +1,16 @@ import time -from collections import defaultdict -from uuid import uuid4 -from shared.domain.correlation.enums import CorrelationStrategyType from shared.domain.correlation.models import 
CorrelationContext, CorrelationGroup, CorrelationMatch, CorrelationResult +from shared.domain.correlation.scoring.pipeline import ScoringPipeline from shared.domain.correlation.strategies.base import CorrelationStrategy -def _composite_score(scores: list[float]) -> float: - if not scores: - return 0.0 - result = 0.0 - for s in scores: - result = 1.0 - (1.0 - result) * (1.0 - s) - return round(result, 4) - - class CorrelationPipeline: def __init__(self, strategies: list[CorrelationStrategy]) -> None: if not strategies: raise ValueError("At least one strategy is required") self._strategies = strategies + self._scoring_pipeline = ScoringPipeline(strategies) async def run(self, context: CorrelationContext) -> CorrelationResult: start = time.perf_counter() @@ -32,7 +22,7 @@ async def run(self, context: CorrelationContext) -> CorrelationResult: all_matches.extend(matches) strategy_counts[strategy.strategy_type.value] = len(matches) - groups = self._merge_into_groups(all_matches, context) + groups = self._scoring_pipeline.process(all_matches, context) ungrouped = self._find_ungrouped(context.events, groups) duration = (time.perf_counter() - start) * 1000 @@ -48,77 +38,8 @@ async def run(self, context: CorrelationContext) -> CorrelationResult: duration_ms=round(duration, 2), ) - def _merge_into_groups( - self, matches: list[CorrelationMatch], context: CorrelationContext - ) -> list[CorrelationGroup]: - adj: dict[str, set[str]] = defaultdict(set) - event_scores: dict[str, list[float]] = defaultdict(list) - event_strategies: dict[str, set[CorrelationStrategyType]] = defaultdict(set) - strategy_scores: dict[str, dict[str, float]] = defaultdict(dict) - - for m in matches: - adj[m.event_id_a].add(m.event_id_b) - adj[m.event_id_b].add(m.event_id_a) - event_scores[m.event_id_a].append(m.score) - event_scores[m.event_id_b].append(m.score) - event_strategies[m.event_id_a].add(m.strategy_type) - event_strategies[m.event_id_b].add(m.strategy_type) - for eid in (m.event_id_a, 
m.event_id_b): - key = m.strategy_type.value - existing = strategy_scores[eid].get(key, 0.0) - strategy_scores[eid][key] = max(existing, m.score) - - visited: set[str] = set() - groups: list[CorrelationGroup] = [] - event_map = {ev.event_id: ev for ev in context.events} - - for event_id in context.events: - eid = event_id.event_id - if eid in visited: - continue - group = self._bfs_group(eid, adj, visited) - if len(group) < 2: - continue - scores = [s for eid in group for s in event_scores.get(eid, [])] - strategies = list({st for eid in group for st in event_strategies.get(eid, set())}) - timestamps = [event_map[eid].timestamp for eid in group if eid in event_map] - combined_scores: dict[str, float] = {} - for eid in group: - for strategy_key, sc in strategy_scores.get(eid, {}).items(): - combined_scores[strategy_key] = max(combined_scores.get(strategy_key, 0.0), sc) - groups.append( - CorrelationGroup( - group_id=uuid4().hex, - event_ids=sorted(group), - strategies_used=sorted(strategies, key=lambda s: s.value), - strategy_scores=combined_scores, - composite_score=_composite_score(scores), - window_start=min(timestamps) if timestamps else None, - window_end=max(timestamps) if timestamps else None, - ) - ) - - return [g for g in groups if g.composite_score >= context.min_score] - def _find_ungrouped(self, events: list, groups: list[CorrelationGroup]) -> list[str]: grouped: set[str] = set() for g in groups: grouped.update(g.event_ids) return [ev.event_id for ev in events if ev.event_id not in grouped] - - @staticmethod - def _bfs_group(start: str, adj: dict[str, set[str]], visited: set[str]) -> set[str]: - from collections import deque - - group: set[str] = set() - queue: deque[str] = deque([start]) - while queue: - current = queue.popleft() - if current in visited: - continue - visited.add(current) - group.add(current) - for neighbor in adj.get(current, set()): - if neighbor not in visited: - queue.append(neighbor) - return group diff --git 
a/shared/domain/correlation/scoring/__init__.py b/shared/domain/correlation/scoring/__init__.py new file mode 100644 index 0000000..adce75d --- /dev/null +++ b/shared/domain/correlation/scoring/__init__.py @@ -0,0 +1,19 @@ +"""Correlation scoring model for weighted, explainable, and confidence-aware ranking.""" + +from shared.domain.correlation.models import ScoreContribution +from shared.domain.correlation.scoring.confidence import ConfidenceScorer +from shared.domain.correlation.scoring.models import ScoringResult +from shared.domain.correlation.scoring.pipeline import ScoringPipeline +from shared.domain.correlation.scoring.strategies import ( + ScoringStrategy, + WeightedProbabilisticScorer, +) + +__all__ = [ + "ConfidenceScorer", + "ScoreContribution", + "ScoringPipeline", + "ScoringResult", + "ScoringStrategy", + "WeightedProbabilisticScorer", +] diff --git a/shared/domain/correlation/scoring/confidence.py b/shared/domain/correlation/scoring/confidence.py new file mode 100644 index 0000000..d2155d2 --- /dev/null +++ b/shared/domain/correlation/scoring/confidence.py @@ -0,0 +1,81 @@ +"""Confidence scoring for correlation groups. + +Combines composite score with diversity, coverage, and recency factors. +""" + +from datetime import UTC, datetime + +from shared.domain.correlation.scoring.models import ScoringResult + + +class ConfidenceScorer: + """Computes a confidence score for a correlation group. 
+ + Confidence is a weighted combination of: + - composite_score (40%): the base scoring quality + - diversity_factor (30%): how many distinct strategies matched + - coverage_factor (20%): how many events in the group were matched + - recency_factor (10%): how recent the events are + """ + + def compute( + self, + result: ScoringResult, + total_events_in_group: int, + event_timestamps: list[datetime] | None = None, + ) -> float: + composite = result.composite_score + + diversity = self._diversity_factor(result) + coverage = self._coverage_factor(result, total_events_in_group) + recency = self._recency_factor(event_timestamps) + + confidence = 0.4 * composite + 0.3 * diversity + 0.2 * coverage + 0.1 * recency + return round(min(max(confidence, 0.0), 1.0), 4) + + @staticmethod + def _diversity_factor(result: ScoringResult) -> float: + """Score based on how many distinct strategies contributed. + + More strategies = higher confidence. + 0 strategies -> 0.0 + 1+ strategies -> count / 4, growing linearly and capped at 1.0 (reached at 4 strategies). + """ + count = len(result.strategies_used) + if count == 0: + return 0.0 + return min(count / 4.0, 1.0) + + @staticmethod + def _coverage_factor(result: ScoringResult, total_events_in_group: int) -> float: + """Score based on what fraction of group events have match data. + + Higher coverage = higher confidence. + """ + if total_events_in_group == 0: + return 0.0 + matched_events: set[str] = set() + for c in result.contributions: + matched_events.update(c.event_ids) + return len(matched_events) / total_events_in_group + + @staticmethod + def _recency_factor(event_timestamps: list[datetime] | None) -> float: + """Score based on recency of the newest event. + + Events within the last hour = 1.0. + Events older than 24 hours = 0.0. + Linear decay in between. 
+ """ + if not event_timestamps: + return 1.0 + + now = datetime.now(UTC) + newest = max(event_timestamps) + age_seconds = (now - newest).total_seconds() + + if age_seconds <= 3600: # 1 hour + return 1.0 + if age_seconds >= 86400: # 24 hours + return 0.0 + return round(1.0 - (age_seconds - 3600) / (86400 - 3600), 4) diff --git a/shared/domain/correlation/scoring/models.py b/shared/domain/correlation/scoring/models.py new file mode 100644 index 0000000..46ec7c2 --- /dev/null +++ b/shared/domain/correlation/scoring/models.py @@ -0,0 +1,18 @@ +"""Scoring intermediate models for explainable correlation scoring.""" + +from pydantic import BaseModel, Field + +from shared.domain.correlation.enums import CorrelationStrategyType +from shared.domain.correlation.models import ScoreContribution + + +class ScoringResult(BaseModel): + """The computed scoring output for a single correlation group.""" + + composite_score: float = Field(ge=0.0, le=1.0, description="Aggregate weighted correlation score.") + confidence: float = Field(ge=0.0, le=1.0, description="Confidence in the correlation quality.") + contributions: list[ScoreContribution] = Field(default_factory=list, description="Per-strategy breakdown.") + strategy_scores: dict[str, float] = Field(default_factory=dict, description="Max weighted score per strategy.") + strategies_used: list[CorrelationStrategyType] = Field( + default_factory=list, description="Strategies that contributed." 
+ ) diff --git a/shared/domain/correlation/scoring/pipeline.py b/shared/domain/correlation/scoring/pipeline.py new file mode 100644 index 0000000..90a6f03 --- /dev/null +++ b/shared/domain/correlation/scoring/pipeline.py @@ -0,0 +1,96 @@ +"""ScoringPipeline — orchestrates group detection, weighted scoring, and confidence computation.""" + +from collections import defaultdict, deque +from uuid import uuid4 + +from shared.domain.correlation.enums import CorrelationStrategyType +from shared.domain.correlation.models import CorrelationContext, CorrelationGroup, CorrelationMatch +from shared.domain.correlation.scoring.confidence import ConfidenceScorer +from shared.domain.correlation.scoring.strategies import ScoringStrategy, WeightedProbabilisticScorer +from shared.domain.correlation.strategies.base import CorrelationStrategy + + +class ScoringPipeline: + """Orchestrates the full scoring flow: group detection, weighted scoring, confidence computation. + + Accepts a list of correlation strategies (for weight resolution) and + delegates the per-group score computation to a pluggable *ScoringStrategy*. 
+ """ + + def __init__( + self, + strategies: list[CorrelationStrategy], + scorer: ScoringStrategy | None = None, + confidence_scorer: ConfidenceScorer | None = None, + ) -> None: + self._strategy_map: dict[CorrelationStrategyType, CorrelationStrategy] = { + s.strategy_type: s for s in strategies + } + self._scorer = scorer or WeightedProbabilisticScorer() + self._confidence_scorer = confidence_scorer or ConfidenceScorer() + + def process( + self, + matches: list[CorrelationMatch], + context: CorrelationContext, + ) -> list[CorrelationGroup]: + """Group matches by connected components and score each group.""" + adj: dict[str, set[str]] = defaultdict(set) + event_map = {ev.event_id: ev for ev in context.events} + + for m in matches: + adj[m.event_id_a].add(m.event_id_b) + adj[m.event_id_b].add(m.event_id_a) + + visited: set[str] = set() + groups: list[CorrelationGroup] = [] + + for event_id in context.events: + eid = event_id.event_id + if eid in visited: + continue + group = self._bfs_group(eid, adj, visited) + if len(group) < 2: + continue + + group_matches = [m for m in matches if m.event_id_a in group or m.event_id_b in group] + + scoring_result = self._scorer.score_group(group, group_matches, self._strategy_map) + + timestamps = [event_map[eid].timestamp for eid in group if eid in event_map] + confidence = self._confidence_scorer.compute( + result=scoring_result, + total_events_in_group=len(group), + event_timestamps=timestamps, + ) + + groups.append( + CorrelationGroup( + group_id=uuid4().hex, + event_ids=sorted(group), + strategies_used=scoring_result.strategies_used, + strategy_scores=scoring_result.strategy_scores, + composite_score=scoring_result.composite_score, + confidence=confidence, + contributions=scoring_result.contributions, + window_start=min(timestamps) if timestamps else None, + window_end=max(timestamps) if timestamps else None, + ) + ) + + return [g for g in groups if g.composite_score >= context.min_score] + + @staticmethod + def 
_bfs_group(start: str, adj: dict[str, set[str]], visited: set[str]) -> set[str]: + group: set[str] = set() + queue: deque[str] = deque([start]) + while queue: + current = queue.popleft() + if current in visited: + continue + visited.add(current) + group.add(current) + for neighbor in adj.get(current, set()): + if neighbor not in visited: + queue.append(neighbor) + return group diff --git a/shared/domain/correlation/scoring/strategies.py b/shared/domain/correlation/scoring/strategies.py new file mode 100644 index 0000000..3cb81a9 --- /dev/null +++ b/shared/domain/correlation/scoring/strategies.py @@ -0,0 +1,113 @@ +"""Scoring strategies for weighted probabilistic correlation scoring.""" + +from abc import ABC, abstractmethod +from collections import defaultdict +from collections.abc import Mapping + +from shared.domain.correlation.enums import CorrelationStrategyType +from shared.domain.correlation.models import CorrelationMatch, ScoreContribution +from shared.domain.correlation.scoring.models import ScoringResult +from shared.domain.correlation.strategies.base import CorrelationStrategy + + +def _probabilistic_union(scores: list[float]) -> float: + """Combine scores using probabilistic union: 1 - ∏(1 - sᵢ).""" + if not scores: + return 0.0 + result = 0.0 + for s in scores: + result = 1.0 - (1.0 - result) * (1.0 - s) + return round(result, 4) + + +class ScoringStrategy(ABC): + """Pluggable strategy for computing composite scores from correlation matches.""" + + @abstractmethod + def score_group( + self, + _group_event_ids: set[str], + matches: list[CorrelationMatch], + strategy_map: Mapping[CorrelationStrategyType, CorrelationStrategy], + ) -> ScoringResult: ... + + +class WeightedProbabilisticScorer(ScoringStrategy): + """Default scorer using strategy weights to attenuate match scores before probabilistic combination. 
+ + Each match score is attenuated by its strategy's weight: + weighted_score = 1 - (1 - raw_score * weight), i.e. raw_score * weight + + The composite score is the probabilistic union of every match's weighted score (not only the per-strategy maxima). + """ + + def score_group( + self, + _group_event_ids: set[str], + matches: list[CorrelationMatch], + strategy_map: Mapping[CorrelationStrategyType, CorrelationStrategy], + ) -> ScoringResult: + per_strategy_matches: dict[str, list[CorrelationMatch]] = defaultdict(list) + for m in matches: + per_strategy_matches[m.strategy_type.value].append(m) + + weighted_scores: list[float] = [] + contributions: list[ScoreContribution] = [] + strategy_scores: dict[str, float] = {} + strategies_used: set[CorrelationStrategyType] = set() + + for strategy_key, strategy_matches in per_strategy_matches.items(): + strategy_type = strategy_matches[0].strategy_type + strategies_used.add(strategy_type) + signal = strategy_matches[0].signal + + weight = _resolve_weight(strategy_type, strategy_map) + raw_max = max(m.score for m in strategy_matches) + + # Attenuate by weight + weighted_strategy_score = 1.0 - (1.0 - raw_max * weight) + weighted_strategy_score = max(0.0, round(weighted_strategy_score, 4)) + strategy_scores[strategy_key] = weighted_strategy_score + + event_ids: list[str] = [] + for m in strategy_matches: + if m.event_id_a not in event_ids: + event_ids.append(m.event_id_a) + if m.event_id_b not in event_ids: + event_ids.append(m.event_id_b) + + contributions.append( + ScoreContribution( + strategy_name=strategy_key, + signal=signal, + raw_score=raw_max, + weighted_score=weighted_strategy_score, + weight=weight, + match_count=len(strategy_matches), + event_ids=event_ids, + ) + ) + + for m in strategy_matches: + ws = 1.0 - (1.0 - m.score * weight) + weighted_scores.append(max(0.0, round(ws, 4))) + + composite = _probabilistic_union(weighted_scores) + + return ScoringResult( + composite_score=composite, + confidence=0.0, + contributions=contributions, + strategy_scores=strategy_scores, + 
strategies_used=sorted(strategies_used, key=lambda s: s.value), + ) + + +def _resolve_weight( + strategy_type: CorrelationStrategyType, + strategy_map: Mapping[CorrelationStrategyType, CorrelationStrategy], +) -> float: + strategy = strategy_map.get(strategy_type) + if strategy is not None: + return strategy.weight + return 0.5 diff --git a/shared/domain/correlation/scoring/tests/__init__.py b/shared/domain/correlation/scoring/tests/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/shared/domain/correlation/scoring/tests/__init__.py @@ -0,0 +1 @@ + diff --git a/shared/domain/correlation/scoring/tests/test_confidence.py b/shared/domain/correlation/scoring/tests/test_confidence.py new file mode 100644 index 0000000..95429a7 --- /dev/null +++ b/shared/domain/correlation/scoring/tests/test_confidence.py @@ -0,0 +1,155 @@ +"""Tests for the ConfidenceScorer.""" + +from datetime import UTC, datetime, timedelta + +from shared.domain.correlation.enums import CorrelationSignal, CorrelationStrategyType +from shared.domain.correlation.scoring.confidence import ConfidenceScorer +from shared.domain.correlation.scoring.models import ScoreContribution, ScoringResult + + +class TestConfidenceScorer: + def setup_method(self) -> None: + self.scorer = ConfidenceScorer() + + def test_perfect_confidence(self) -> None: + result = ScoringResult( + composite_score=1.0, + confidence=0.0, + contributions=[ + ScoreContribution( + strategy_name="trace_id", + signal=CorrelationSignal.TRACE_MATCH, + raw_score=1.0, + weighted_score=1.0, + weight=1.0, + match_count=1, + event_ids=["e1", "e2"], + ), + ScoreContribution( + strategy_name="time_window", + signal=CorrelationSignal.TIME_PROXIMITY, + raw_score=1.0, + weighted_score=1.0, + weight=1.0, + match_count=1, + event_ids=["e1", "e2"], + ), + ScoreContribution( + strategy_name="request_id", + signal=CorrelationSignal.REQUEST_MATCH, + raw_score=1.0, + weighted_score=1.0, + weight=1.0, + match_count=1, + event_ids=["e1", 
"e2"], + ), + ScoreContribution( + strategy_name="error_signature", + signal=CorrelationSignal.ERROR_PATTERN, + raw_score=1.0, + weighted_score=1.0, + weight=1.0, + match_count=1, + event_ids=["e1", "e2"], + ), + ], + strategy_scores={"trace_id": 1.0}, + strategies_used=[ + CorrelationStrategyType.TRACE_ID, + CorrelationStrategyType.TIME_WINDOW, + CorrelationStrategyType.REQUEST_ID, + CorrelationStrategyType.ERROR_SIGNATURE, + ], + ) + ts = datetime.now(UTC) + confidence = self.scorer.compute(result, total_events_in_group=2, event_timestamps=[ts]) + # composite=1.0 * 0.4 = 0.4, diversity=4/4=1.0 * 0.3 = 0.3, coverage=2/2=1.0 * 0.2 = 0.2, recency=1.0 * 0.1 = 0.1 + # total = 0.4 + 0.3 + 0.2 + 0.1 = 1.0 + assert confidence == 1.0 + + def test_single_strategy_lowers_confidence(self) -> None: + result = ScoringResult( + composite_score=0.5, + confidence=0.0, + contributions=[ + ScoreContribution( + strategy_name="trace_id", + signal=CorrelationSignal.TRACE_MATCH, + raw_score=1.0, + weighted_score=0.5, + weight=0.5, + match_count=1, + event_ids=["e1", "e2"], + ), + ], + strategy_scores={"trace_id": 0.5}, + strategies_used=[CorrelationStrategyType.TRACE_ID], + ) + confidence = self.scorer.compute(result, total_events_in_group=2, event_timestamps=None) + # composite=0.5 * 0.4 = 0.2, diversity=1/4=0.25 * 0.3 = 0.075, coverage=2/2=1.0 * 0.2 = 0.2, recency=1.0 * 0.1 = 0.1 + # total = 0.2 + 0.075 + 0.2 + 0.1 = 0.575 + assert confidence == 0.575 + + def test_low_coverage_lowers_confidence(self) -> None: + result = ScoringResult( + composite_score=0.5, + confidence=0.0, + contributions=[ + ScoreContribution( + strategy_name="trace_id", + signal=CorrelationSignal.TRACE_MATCH, + raw_score=1.0, + weighted_score=0.5, + weight=0.5, + match_count=1, + event_ids=["e1"], + ), + ], + strategy_scores={"trace_id": 0.5}, + strategies_used=[CorrelationStrategyType.TRACE_ID], + ) + confidence = self.scorer.compute(result, total_events_in_group=10, event_timestamps=None) + # coverage = 1/10 = 
0.1 + # composite=0.5 * 0.4 = 0.2, diversity=0.25*0.3=0.075, coverage=0.1*0.2=0.02, recency=1.0*0.1=0.1 + # total = 0.395 + assert confidence == 0.395 + + def test_old_events_reduce_recency(self) -> None: + result = ScoringResult( + composite_score=0.5, + confidence=0.0, + contributions=[ + ScoreContribution( + strategy_name="trace_id", + signal=CorrelationSignal.TRACE_MATCH, + raw_score=1.0, + weighted_score=0.5, + weight=0.5, + match_count=1, + event_ids=["e1", "e2"], + ), + ], + strategy_scores={"trace_id": 0.5}, + strategies_used=[CorrelationStrategyType.TRACE_ID], + ) + old_ts = datetime.now(UTC) - timedelta(hours=12) + confidence = self.scorer.compute(result, total_events_in_group=2, event_timestamps=[old_ts]) + # recency: 12h = 43200s -> 1.0 - (43200-3600)/(86400-3600) = 1.0 - 39600/82800 = 1.0 - 0.478 = 0.522 + assert confidence < 0.6 + assert confidence > 0.4 + + def test_empty_strategies_returns_zero_diversity(self) -> None: + result = ScoringResult( + composite_score=0.5, + confidence=0.0, + contributions=[], + strategy_scores={}, + strategies_used=[], + ) + confidence = self.scorer.compute(result, total_events_in_group=2, event_timestamps=None) + # composite=0.5*0.4=0.2, diversity=0*0.3=0, coverage=0*0.2=0, recency=1*0.1=0.1 = 0.3 + assert confidence == 0.3 + + def test_empty_events_returns_full_recency(self) -> None: + assert ConfidenceScorer._recency_factor(None) == 1.0 + assert ConfidenceScorer._recency_factor([]) == 1.0 diff --git a/shared/domain/correlation/scoring/tests/test_pipeline_integration.py b/shared/domain/correlation/scoring/tests/test_pipeline_integration.py new file mode 100644 index 0000000..b1dd681 --- /dev/null +++ b/shared/domain/correlation/scoring/tests/test_pipeline_integration.py @@ -0,0 +1,153 @@ +"""Integration tests for ScoringPipeline — group detection, scoring, confidence.""" + +from datetime import UTC, datetime + +from shared.domain.correlation.enums import CorrelationSignal, CorrelationStrategyType +from 
shared.domain.correlation.models import CorrelationContext, CorrelationMatch +from shared.domain.correlation.scoring.pipeline import ScoringPipeline +from shared.domain.correlation.strategies import TimeWindowStrategy, TraceIdStrategy +from shared.domain.timeline.enums import TimelineEventCategory, TimelineEventSource +from shared.domain.timeline.models import TimelineEvent + + +def _event(event_id: str, ts_offset: int = 0, trace_id: str | None = None) -> TimelineEvent: + from datetime import timedelta + + base = datetime(2026, 6, 14, 10, 0, 0, tzinfo=UTC) + return TimelineEvent( + event_id=event_id, + category=TimelineEventCategory.METRIC_ANOMALY, + source=TimelineEventSource.TELEMETRY, + timestamp=base + timedelta(seconds=ts_offset), + service_name="api", + title=f"event {event_id}", + trace_id=trace_id, + ) + + +class TestScoringPipeline: + def test_empty_matches_returns_no_groups(self) -> None: + pipeline = ScoringPipeline(strategies=[TimeWindowStrategy(60)]) + ctx = CorrelationContext(events=[_event("a"), _event("b")]) + groups = pipeline.process([], ctx) + assert groups == [] + + def test_singleton_events_are_skipped(self) -> None: + pipeline = ScoringPipeline(strategies=[TraceIdStrategy()]) + ctx = CorrelationContext(events=[_event("a", trace_id="t1"), _event("b", trace_id="t2")]) + # No shared trace -> no matches + groups = pipeline.process([], ctx) + assert groups == [] + + def test_weighted_composite_from_matches(self) -> None: + trace_strat = TraceIdStrategy() + pipeline = ScoringPipeline(strategies=[trace_strat]) + ctx = CorrelationContext(events=[_event("a", trace_id="t1"), _event("b", trace_id="t1")]) + matches = [ + CorrelationMatch( + event_id_a="a", + event_id_b="b", + strategy_type=CorrelationStrategyType.TRACE_ID, + signal=CorrelationSignal.TRACE_MATCH, + score=1.0, + ), + ] + groups = pipeline.process(matches, ctx) + assert len(groups) == 1 + g = groups[0] + assert g.composite_score == 0.9 + assert g.confidence > 0.0 + assert 
len(g.contributions) == 1 + assert g.contributions[0].strategy_name == "trace_id" + + def test_min_score_filters_low_scoring_groups(self) -> None: + pipeline = ScoringPipeline(strategies=[TimeWindowStrategy(60)]) + ctx = CorrelationContext( + events=[_event("a", ts_offset=0), _event("b", ts_offset=55)], + min_score=0.5, + ) + matches = [ + CorrelationMatch( + event_id_a="a", + event_id_b="b", + strategy_type=CorrelationStrategyType.TIME_WINDOW, + signal=CorrelationSignal.TIME_PROXIMITY, + score=1.0 - 55 / 60, + ), + ] + groups = pipeline.process(matches, ctx) + assert len(groups) == 0 + + def test_group_includes_all_event_ids(self) -> None: + pipeline = ScoringPipeline(strategies=[TraceIdStrategy()]) + ctx = CorrelationContext( + events=[ + _event("a", trace_id="t1"), + _event("b", trace_id="t1"), + _event("c", trace_id="t1"), + ] + ) + matches = [ + CorrelationMatch( + event_id_a="a", + event_id_b="b", + strategy_type=CorrelationStrategyType.TRACE_ID, + signal=CorrelationSignal.TRACE_MATCH, + score=1.0, + ), + CorrelationMatch( + event_id_a="a", + event_id_b="c", + strategy_type=CorrelationStrategyType.TRACE_ID, + signal=CorrelationSignal.TRACE_MATCH, + score=1.0, + ), + ] + groups = pipeline.process(matches, ctx) + assert len(groups) == 1 + assert sorted(groups[0].event_ids) == ["a", "b", "c"] + + def test_custom_scorer_can_be_injected(self) -> None: + from shared.domain.correlation.scoring.strategies import ScoringStrategy, WeightedProbabilisticScorer + + class FixedScorer(ScoringStrategy): + def score_group(self, group_event_ids, matches, strategy_map): + return WeightedProbabilisticScorer().score_group(group_event_ids, matches, strategy_map) + + pipeline = ScoringPipeline(strategies=[TraceIdStrategy()], scorer=FixedScorer()) + ctx = CorrelationContext(events=[_event("a", trace_id="t1"), _event("b", trace_id="t1")]) + matches = [ + CorrelationMatch( + event_id_a="a", + event_id_b="b", + strategy_type=CorrelationStrategyType.TRACE_ID, + 
signal=CorrelationSignal.TRACE_MATCH, + score=1.0, + ), + ] + groups = pipeline.process(matches, ctx) + assert len(groups) == 1 + + def test_window_start_end_from_timestamps(self) -> None: + pipeline = ScoringPipeline(strategies=[TraceIdStrategy()]) + ctx = CorrelationContext( + events=[ + _event("a", ts_offset=10, trace_id="t1"), + _event("b", ts_offset=20, trace_id="t1"), + ] + ) + matches = [ + CorrelationMatch( + event_id_a="a", + event_id_b="b", + strategy_type=CorrelationStrategyType.TRACE_ID, + signal=CorrelationSignal.TRACE_MATCH, + score=1.0, + ), + ] + groups = pipeline.process(matches, ctx) + assert len(groups) == 1 + g = groups[0] + assert g.window_start is not None + assert g.window_end is not None + assert g.window_start < g.window_end diff --git a/shared/domain/correlation/scoring/tests/test_strategies.py b/shared/domain/correlation/scoring/tests/test_strategies.py new file mode 100644 index 0000000..54bb2f6 --- /dev/null +++ b/shared/domain/correlation/scoring/tests/test_strategies.py @@ -0,0 +1,166 @@ +"""Tests for the WeightedProbabilisticScorer.""" + +from shared.domain.correlation.enums import CorrelationSignal, CorrelationStrategyType +from shared.domain.correlation.models import CorrelationMatch +from shared.domain.correlation.scoring.strategies import WeightedProbabilisticScorer +from shared.domain.correlation.strategies.base import CorrelationStrategy + + +class _FakeStrategy(CorrelationStrategy): + strategy_type: CorrelationStrategyType + signal: CorrelationSignal + weight: float + + def __init__(self, strategy_type: CorrelationStrategyType, weight: float, signal: CorrelationSignal) -> None: + self.strategy_type = strategy_type + self.weight = weight + self.signal = signal + + async def correlate(self, context): # noqa: ARG002 + return [] + + +class TestWeightedProbabilisticScorer: + def setup_method(self) -> None: + self.scorer = WeightedProbabilisticScorer() + + def test_empty_matches_returns_zero_composite(self) -> None: + result = 
self.scorer.score_group(set(), [], {}) + assert result.composite_score == 0.0 + assert result.contributions == [] + + def test_single_match_high_weight(self) -> None: + trace_strat = _FakeStrategy(CorrelationStrategyType.TRACE_ID, 0.9, CorrelationSignal.TRACE_MATCH) + matches = [ + CorrelationMatch( + event_id_a="e1", + event_id_b="e2", + strategy_type=CorrelationStrategyType.TRACE_ID, + signal=CorrelationSignal.TRACE_MATCH, + score=1.0, + ), + ] + strategy_map = {CorrelationStrategyType.TRACE_ID: trace_strat} + result = self.scorer.score_group({"e1", "e2"}, matches, strategy_map) + + assert result.composite_score == 0.9 + assert len(result.contributions) == 1 + contrib = result.contributions[0] + assert contrib.strategy_name == "trace_id" + assert contrib.raw_score == 1.0 + assert contrib.weighted_score == 0.9 + assert contrib.weight == 0.9 + + def test_low_weight_attenuates_score(self) -> None: + time_strat = _FakeStrategy(CorrelationStrategyType.TIME_WINDOW, 0.3, CorrelationSignal.TIME_PROXIMITY) + matches = [ + CorrelationMatch( + event_id_a="e1", + event_id_b="e2", + strategy_type=CorrelationStrategyType.TIME_WINDOW, + signal=CorrelationSignal.TIME_PROXIMITY, + score=1.0, + ), + ] + strategy_map = {CorrelationStrategyType.TIME_WINDOW: time_strat} + result = self.scorer.score_group({"e1", "e2"}, matches, strategy_map) + + assert result.composite_score == 0.3 + assert result.contributions[0].weighted_score == 0.3 + + def test_multiple_strategies_combine_probabilistically(self) -> None: + trace_strat = _FakeStrategy(CorrelationStrategyType.TRACE_ID, 0.9, CorrelationSignal.TRACE_MATCH) + time_strat = _FakeStrategy(CorrelationStrategyType.TIME_WINDOW, 0.3, CorrelationSignal.TIME_PROXIMITY) + matches = [ + CorrelationMatch( + event_id_a="e1", + event_id_b="e2", + strategy_type=CorrelationStrategyType.TRACE_ID, + signal=CorrelationSignal.TRACE_MATCH, + score=1.0, + ), + CorrelationMatch( + event_id_a="e1", + event_id_b="e2", + 
strategy_type=CorrelationStrategyType.TIME_WINDOW, + signal=CorrelationSignal.TIME_PROXIMITY, + score=0.9, + ), + ] + strategy_map = { + CorrelationStrategyType.TRACE_ID: trace_strat, + CorrelationStrategyType.TIME_WINDOW: time_strat, + } + result = self.scorer.score_group({"e1", "e2"}, matches, strategy_map) + + # Weighted: trace = 0.9, time = 1 - (1 - 0.9*0.3) = 0.27 + # Composite: 1 - (1-0.9)*(1-0.27) = 1 - 0.1*0.73 = 1 - 0.073 = 0.927 + assert result.composite_score == 0.927 + + def test_missing_strategy_falls_back_to_default_weight(self) -> None: + matches = [ + CorrelationMatch( + event_id_a="e1", + event_id_b="e2", + strategy_type=CorrelationStrategyType.DEPENDENCY, + signal=CorrelationSignal.DEPENDENCY_CHAIN, + score=1.0, + ), + ] + # Empty strategy map -> fallback to 0.5 + result = self.scorer.score_group({"e1", "e2"}, matches, {}) + assert result.composite_score == 0.5 + assert result.contributions[0].weight == 0.5 + + def test_strategy_scores_uses_max_per_strategy(self) -> None: + trace_strat = _FakeStrategy(CorrelationStrategyType.TRACE_ID, 0.9, CorrelationSignal.TRACE_MATCH) + matches = [ + CorrelationMatch( + event_id_a="e1", + event_id_b="e2", + strategy_type=CorrelationStrategyType.TRACE_ID, + signal=CorrelationSignal.TRACE_MATCH, + score=0.5, + ), + CorrelationMatch( + event_id_a="e1", + event_id_b="e3", + strategy_type=CorrelationStrategyType.TRACE_ID, + signal=CorrelationSignal.TRACE_MATCH, + score=1.0, + ), + ] + strategy_map = {CorrelationStrategyType.TRACE_ID: trace_strat} + result = self.scorer.score_group({"e1", "e2", "e3"}, matches, strategy_map) + + assert result.strategy_scores["trace_id"] == 0.9 + assert result.contributions[0].raw_score == 1.0 + + def test_strategies_used_is_sorted(self) -> None: + time_strat = _FakeStrategy(CorrelationStrategyType.TIME_WINDOW, 0.3, CorrelationSignal.TIME_PROXIMITY) + trace_strat = _FakeStrategy(CorrelationStrategyType.TRACE_ID, 0.9, CorrelationSignal.TRACE_MATCH) + matches = [ + CorrelationMatch( + 
event_id_a="e1", + event_id_b="e2", + strategy_type=CorrelationStrategyType.TIME_WINDOW, + signal=CorrelationSignal.TIME_PROXIMITY, + score=0.5, + ), + CorrelationMatch( + event_id_a="e1", + event_id_b="e2", + strategy_type=CorrelationStrategyType.TRACE_ID, + signal=CorrelationSignal.TRACE_MATCH, + score=1.0, + ), + ] + strategy_map = { + CorrelationStrategyType.TIME_WINDOW: time_strat, + CorrelationStrategyType.TRACE_ID: trace_strat, + } + result = self.scorer.score_group({"e1", "e2"}, matches, strategy_map) + assert result.strategies_used == sorted( + [CorrelationStrategyType.TIME_WINDOW, CorrelationStrategyType.TRACE_ID], + key=lambda s: s.value, + )