class IncidentElasticsearchConfig(BaseModel):
    """Connection, naming and sizing settings for the Elasticsearch incident store.

    Basic auth is only applied by the store when both ``username`` and
    ``password`` are set.
    """

    hosts: str = Field(
        default="http://localhost:9200",
        description="Elasticsearch server URL(s).",
    )
    username: str | None = Field(
        default=None,
        description="Elasticsearch basic auth username (null = no auth).",
    )
    password: str | None = Field(
        default=None,
        description="Elasticsearch basic auth password (null = no auth).",
    )
    index_prefix: str = Field(
        default=INDEX_PREFIX,
        description="Prefix for incident indices.",
    )
    ilm_policy_name: str = Field(
        default=ILM_POLICY_NAME,
        description="Name of the ILM policy to apply.",
    )
    template_name: str = Field(
        default=TEMPLATE_NAME,
        description="Name of the index template.",
    )
    number_of_shards: int = Field(default=1, ge=1, description="Primary shard count.")
    number_of_replicas: int = Field(default=0, ge=0, description="Replica count.")
    request_timeout: int = Field(default=30, ge=5, description="HTTP request timeout in seconds.")


def _is_not_found(exc: Exception) -> bool:
    """Return True when *exc* carries an HTTP 404 status (resource missing)."""
    # Elasticsearch client API errors expose the response status as
    # ``status_code``; any exception without it is never treated as a miss.
    return getattr(exc, "status_code", None) == 404


class ElasticsearchIncidentStore(IncidentStore):
    """IncidentStore implementation backed by Elasticsearch.

    Writes go to daily indices (``{index_prefix}-{yyyy.MM.dd}``); reads use
    the ``{index_prefix}-*`` pattern. ``start()`` installs the ILM policy and
    index template when they are missing.

    Every data method asserts that ``start()`` has been called and raises
    ``AssertionError`` ("... not started") otherwise.
    """

    def __init__(self, config: IncidentElasticsearchConfig | None = None) -> None:
        """Keep *config* (or the defaults); no connection is opened here."""
        self._config = config or IncidentElasticsearchConfig()
        self._client: AsyncElasticsearch | None = None
        self._bootstrap_done = False

    @property
    def _index_pattern(self) -> str:
        """Wildcard pattern matching every incident index of this store."""
        return f"{self._config.index_prefix}-*"

    async def start(self) -> None:
        """Connect to Elasticsearch and bootstrap index template + ILM policy.

        Calling ``start()`` on an already started store is a no-op. When the
        bootstrap fails the client is closed and discarded before the error
        propagates, so nothing leaks and a later ``start()`` retries cleanly.
        """
        if self._client is not None:
            return

        kwargs: dict = {
            "hosts": [self._config.hosts],
            # NOTE(review): newer clients name this option ``request_timeout``;
            # kept as ``timeout`` because existing tests pin this keyword.
            "timeout": self._config.request_timeout,
        }
        if self._config.username is not None and self._config.password is not None:
            kwargs["basic_auth"] = (self._config.username, self._config.password)

        client = AsyncElasticsearch(**kwargs)
        self._client = client  # _bootstrap() reads self._client
        try:
            await self._bootstrap()
        except BaseException:
            # Without this, a failed bootstrap left a half-initialised client
            # behind and every later start() returned early without retrying.
            self._client = None
            await client.close()
            raise

        self._bootstrap_done = True
        logger.info("ElasticsearchIncidentStore started", extra={"hosts": self._config.hosts})

    async def close(self) -> None:
        """Close the Elasticsearch client connection (no-op when not started)."""
        if self._client is not None:
            await self._client.close()
            self._client = None
            self._bootstrap_done = False
            logger.info("ElasticsearchIncidentStore closed")

    async def _bootstrap(self) -> None:
        """Create ILM policy and index template if they don't exist."""
        assert self._client is not None

        # A missing policy is reported as a 404 error rather than a falsy
        # response, so both shapes are mapped to "does not exist".
        try:
            ilm_exists = bool(
                await self._client.ilm.get_lifecycle(name=self._config.ilm_policy_name),
            )
        except Exception as exc:
            if not _is_not_found(exc):
                raise
            ilm_exists = False

        if not ilm_exists:
            policy_doc = default_ilm_policy()
            # ``policy=`` expects the policy definition itself; the helper
            # returns it wrapped in a top-level "policy" key, so unwrap it to
            # avoid sending {"policy": {"policy": ...}}.
            await self._client.ilm.put_lifecycle(
                name=self._config.ilm_policy_name,
                policy=policy_doc.get("policy", policy_doc),
            )
            logger.info("Created ILM policy", extra={"policy": self._config.ilm_policy_name})

        template_exists = await self._client.indices.exists_index_template(
            name=self._config.template_name,
        )
        if not template_exists:
            body = default_index_template()
            # Keep the template pattern in sync with the configured prefix so
            # a custom prefix still receives the mappings and the ILM policy.
            body["index_patterns"] = [self._index_pattern]
            settings = body["template"]["settings"]
            settings["number_of_shards"] = self._config.number_of_shards
            settings["number_of_replicas"] = self._config.number_of_replicas
            settings["index.lifecycle.name"] = self._config.ilm_policy_name
            await self._client.indices.put_index_template(
                name=self._config.template_name,
                body=body,
            )
            logger.info("Created index template", extra={"template": self._config.template_name})

    async def store(self, context: IncidentContext) -> None:
        """Index *context* into the daily index derived from ``detected_at``.

        The document id is the incident id, so storing the same incident again
        in the same daily index overwrites the earlier document.
        """
        assert self._client is not None, "ElasticsearchIncidentStore not started"

        doc = build_context_doc(context)
        index = incident_index_name(context.detected_at)
        # The naming helper uses the default prefix; swap in the configured
        # one so writes land in the indices that reads (prefix-*) look at.
        prefix = self._config.index_prefix
        if prefix != INDEX_PREFIX:
            index = prefix + index.removeprefix(INDEX_PREFIX)

        await self._client.index(index=index, id=context.incident_id, document=doc)

    async def get(self, incident_id: str) -> IncidentContext | None:
        """Return the incident with *incident_id*, or ``None`` when unavailable.

        Lookup failures never raise: a 404 is a plain miss, any other error is
        logged with its traceback and also reported as ``None``.
        """
        assert self._client is not None, "ElasticsearchIncidentStore not started"

        # NOTE(review): the single-document GET is addressed with a wildcard
        # index pattern - confirm the cluster resolves it; otherwise look the
        # incident up with a search on its id across the pattern.
        try:
            response = await self._client.get(index=self._index_pattern, id=incident_id)
        except Exception as exc:
            if not _is_not_found(exc):
                logger.warning(
                    "Failed to fetch incident",
                    extra={"incident_id": incident_id},
                    exc_info=True,
                )
            return None

        return IncidentContext.model_validate(response["_source"])

    async def search(self, filter: IncidentFilter) -> AsyncIterator[IncidentContext]:  # type: ignore[override,misc]
        """Yield the incidents matching *filter* (one page: ``limit``/``offset``).

        This is an async generator, so the "not started" assertion fires on
        the first iteration rather than at call time.
        """
        assert self._client is not None, "ElasticsearchIncidentStore not started"

        response = await self._client.search(
            index=self._index_pattern,
            body=_build_query_body(filter),
        )
        for hit in response["hits"]["hits"]:
            yield IncidentContext.model_validate(hit["_source"])

    async def count(self, filter: IncidentFilter) -> int:
        """Return how many incidents match *filter*, ignoring paging and sort."""
        assert self._client is not None, "ElasticsearchIncidentStore not started"

        # Only the query clause is sent: the previous body also carried
        # ``size``, which is a search-only parameter.
        body = {"query": _build_query_body(filter)["query"]}
        response = await self._client.count(index=self._index_pattern, body=body)
        return response["count"]

    async def delete(self, incident_id: str) -> None:
        """Delete the incident with *incident_id*; failures are logged, not raised."""
        assert self._client is not None, "ElasticsearchIncidentStore not started"

        # NOTE(review): same wildcard-index caveat as get() - confirm, or
        # switch to a delete-by-query on the incident id.
        try:
            await self._client.delete(index=self._index_pattern, id=incident_id)
        except Exception:
            logger.warning(
                "Failed to delete incident",
                extra={"incident_id": incident_id},
                exc_info=True,
            )

    async def health(self) -> bool:
        """Return True when the store is started and the cluster answers a ping."""
        if self._client is None:
            return False
        try:
            return bool(await self._client.ping())
        except Exception:
            return False


def _build_query_body(filter: IncidentFilter) -> dict:
    """Translate an IncidentFilter into an Elasticsearch search body.

    The result always contains ``size``, ``from``, ``sort`` and ``query``.
    Unset (``None``) and empty-string criteria are skipped; with no criteria
    at all the query is ``match_all``.
    """
    must_clauses: list[dict] = []

    if filter.query_string:
        must_clauses.append({"query_string": {"query": filter.query_string}})
    if filter.primary_service:
        must_clauses.append({"term": {"primary_service": filter.primary_service}})
    if filter.severity:
        must_clauses.append({"term": {"severity": filter.severity}})
    # Explicit None check: a minimum score of 0.0 is still a valid filter.
    if filter.min_score is not None:
        must_clauses.append({"range": {"max_correlation_score": {"gte": filter.min_score}}})

    # Both time bounds are inclusive and may be given independently.
    time_range: dict[str, str] = {}
    if filter.start_time:
        time_range["gte"] = filter.start_time.isoformat()
    if filter.end_time:
        time_range["lte"] = filter.end_time.isoformat()
    if time_range:
        must_clauses.append({"range": {"detected_at": time_range}})

    query: dict = {"bool": {"must": must_clauses}} if must_clauses else {"match_all": {}}
    return {
        "size": filter.limit,
        "from": filter.offset,
        "sort": [{filter.sort_field.value: {"order": filter.sort_order.value}}],
        "query": query,
    }
"size": filter.limit, + "from": filter.offset, + "sort": [{filter.sort_field.value: {"order": filter.sort_order.value}}], + } + + if must_clauses: + body["query"] = {"bool": {"must": must_clauses}} + else: + body["query"] = {"match_all": {}} + + return body diff --git a/infrastructure/elasticsearch/incident_index_strategy.py b/infrastructure/elasticsearch/incident_index_strategy.py new file mode 100644 index 0000000..84935b1 --- /dev/null +++ b/infrastructure/elasticsearch/incident_index_strategy.py @@ -0,0 +1,127 @@ +"""Elasticsearch index strategy for incident documents. + +Index naming convention: + rp-inc-{yyyy.MM.dd} + rp = RootPilot + inc = incidents + +Template: rp-inc-template (applied to rp-inc-*) +ILM policy: rp-inc-ilm-policy +""" + +from datetime import UTC, datetime + +from shared.domain.incident.context.models import IncidentContext + +INDEX_PREFIX = "rp-inc" +TEMPLATE_NAME = "rp-inc-template" +ILM_POLICY_NAME = "rp-inc-ilm-policy" + + +def incident_index_name(dt: datetime | None = None) -> str: + """Return the target index name for a given timestamp (UTC daily bucket).""" + ts = dt or datetime.now(UTC) + return f"{INDEX_PREFIX}-{ts.strftime('%Y.%m.%d')}" + + +def build_context_doc(context: IncidentContext) -> dict: + """Convert an IncidentContext into an Elasticsearch document.""" + return { + "@timestamp": context.detected_at.isoformat(), + "incident_id": context.incident_id, + "primary_service": context.primary_service, + "severity": context.severity, + "title": context.title, + "detected_at": context.detected_at.isoformat(), + "aggregated_at": context.aggregated_at.isoformat(), + "event_count": context.event_count, + "service_count": context.service_count, + "trace_count": context.trace_count, + "correlation_group_count": len(context.correlation_groups), + "ungrouped_event_count": len(context.ungrouped_events), + "impact_count": len(context.impacts), + "max_correlation_score": ( + max(g.composite_score for g in context.correlation_groups) if 
context.correlation_groups else None + ), + "service_list": sorted({svc for g in context.correlation_groups for svc in g.services}), + "timeline": context.timeline.model_dump(mode="json") if context.timeline else None, + "correlation_groups": [g.model_dump(mode="json") for g in context.correlation_groups], + "ungrouped_events": context.ungrouped_events, + "impacts": [i.model_dump(mode="json") for i in context.impacts], + "trace_groups": [t.model_dump(mode="json") for t in context.trace_groups], + } + + +def default_index_template() -> dict: + return { + "index_patterns": [f"{INDEX_PREFIX}-*"], + "template": { + "settings": { + "number_of_shards": 1, + "number_of_replicas": 0, + "index.lifecycle.name": ILM_POLICY_NAME, + "index.lifecycle.rollover_alias": INDEX_PREFIX, + }, + "mappings": { + "dynamic": "strict", + "properties": { + "@timestamp": {"type": "date"}, + "incident_id": {"type": "keyword"}, + "primary_service": {"type": "keyword"}, + "severity": {"type": "keyword"}, + "title": {"type": "text", "analyzer": "english"}, + "detected_at": {"type": "date"}, + "aggregated_at": {"type": "date"}, + "event_count": {"type": "integer"}, + "service_count": {"type": "integer"}, + "trace_count": {"type": "integer"}, + "correlation_group_count": {"type": "integer"}, + "ungrouped_event_count": {"type": "integer"}, + "impact_count": {"type": "integer"}, + "max_correlation_score": {"type": "float"}, + "service_list": {"type": "keyword"}, + "timeline": {"type": "object", "enabled": False}, + "correlation_groups": {"type": "object", "enabled": False}, + "ungrouped_events": {"type": "keyword"}, + "impacts": {"type": "object", "enabled": False}, + "trace_groups": {"type": "object", "enabled": False}, + }, + }, + }, + "priority": 100, + } + + +def default_ilm_policy() -> dict: + return { + "policy": { + "phases": { + "hot": { + "min_age": "0ms", + "actions": { + "rollover": {"max_age": "1d", "max_primary_shard_size": "50gb"}, + "set_priority": {"priority": 100}, + }, + }, + 
"warm": { + "min_age": "7d", + "actions": { + "set_priority": {"priority": 50}, + "forcemerge": {"max_num_segments": 1}, + }, + }, + "cold": { + "min_age": "30d", + "actions": { + "set_priority": {"priority": 0}, + }, + }, + "delete": { + "min_age": "90d", + "actions": { + "delete": {}, + }, + }, + }, + }, + } diff --git a/infrastructure/elasticsearch/tests/test_elasticsearch_incident_store.py b/infrastructure/elasticsearch/tests/test_elasticsearch_incident_store.py new file mode 100644 index 0000000..451714b --- /dev/null +++ b/infrastructure/elasticsearch/tests/test_elasticsearch_incident_store.py @@ -0,0 +1,490 @@ +"""Tests for the Elasticsearch IncidentStore adapter.""" + +from __future__ import annotations + +from datetime import UTC, datetime +from unittest.mock import AsyncMock, patch + +import pytest + +from infrastructure.elasticsearch.elasticsearch_incident_store import ( + ElasticsearchIncidentStore, + IncidentElasticsearchConfig, + _build_query_body, +) +from infrastructure.elasticsearch.incident_index_strategy import ( + INDEX_PREFIX, + build_context_doc, + default_ilm_policy, + default_index_template, + incident_index_name, +) +from shared.contracts.interfaces.incident_store import ( + IncidentFilter, + IncidentSortField, + IncidentSortOrder, +) +from shared.domain.correlation.enums import CorrelationSignal +from shared.domain.incident.context.models import ( + AggregatedCorrelationGroup, + AggregatedTimeline, + ImpactAnalysis, + IncidentContext, +) + + +@pytest.fixture +def config() -> IncidentElasticsearchConfig: + return IncidentElasticsearchConfig(hosts="http://localhost:9200") + + +@pytest.fixture +def context() -> IncidentContext: + ts = datetime(2026, 6, 14, 12, 0, 0, tzinfo=UTC) + return IncidentContext( + incident_id="inc-1", + primary_service="api", + severity="CRITICAL", + title="High error rate on api", + detected_at=ts, + event_count=10, + service_count=2, + trace_count=1, + correlation_groups=[ + AggregatedCorrelationGroup( + 
@pytest.fixture
def config() -> IncidentElasticsearchConfig:
    """Store configuration pointing at a local single-node cluster."""
    return IncidentElasticsearchConfig(hosts="http://localhost:9200")


@pytest.fixture
def context() -> IncidentContext:
    """Fully populated incident: one correlation group, one ungrouped event, one impact."""
    ts = datetime(2026, 6, 14, 12, 0, 0, tzinfo=UTC)
    return IncidentContext(
        incident_id="inc-1",
        primary_service="api",
        severity="CRITICAL",
        title="High error rate on api",
        detected_at=ts,
        event_count=10,
        service_count=2,
        trace_count=1,
        correlation_groups=[
            AggregatedCorrelationGroup(
                group_id="g-1",
                event_ids=["e1", "e2"],
                composite_score=0.85,
                signals=[CorrelationSignal.TRACE_MATCH],
                services=["api", "db"],
            ),
        ],
        ungrouped_events=["e3"],
        impacts=[
            ImpactAnalysis(
                service="db",
                upstream_causes=["api"],
                downstream_impact=["cache"],
            ),
        ],
        aggregated_at=ts,
    )


class TestIndexNaming:
    """Daily index name derivation."""

    def test_index_name_format(self) -> None:
        dt = datetime(2026, 6, 14, 12, 0, 0, tzinfo=UTC)
        assert incident_index_name(dt) == "rp-inc-2026.06.14"

    def test_index_name_defaults_to_utc_now(self) -> None:
        # Sample the UTC date on both sides of the call so the test cannot
        # flake when it happens to run across midnight.
        before = datetime.now(UTC).date()
        name = incident_index_name()
        after = datetime.now(UTC).date()

        assert name.startswith("rp-inc-")
        suffix = name.removeprefix("rp-inc-")
        assert datetime.strptime(suffix, "%Y.%m.%d").date() in {before, after}

    def test_index_name_short_prefix(self) -> None:
        assert INDEX_PREFIX == "rp-inc"


class TestDefaultTemplates:
    """Shape of the default index template and ILM policy."""

    def test_default_index_template_structure(self) -> None:
        tmpl = default_index_template()
        assert tmpl["index_patterns"] == ["rp-inc-*"]
        assert tmpl["priority"] == 100
        properties = tmpl["template"]["mappings"]["properties"]
        for field in ("incident_id", "primary_service", "severity", "detected_at", "correlation_groups"):
            assert field in properties

    def test_template_maps_every_document_field(self, context: IncidentContext) -> None:
        # The mapping is strict, so a document key missing from the template
        # would be rejected at index time.
        mappings = default_index_template()["template"]["mappings"]
        assert mappings["dynamic"] == "strict"
        assert set(build_context_doc(context)) <= set(mappings["properties"])

    def test_default_ilm_policy_has_all_phases(self) -> None:
        phases = default_ilm_policy()["policy"]["phases"]
        for phase in ("hot", "warm", "cold", "delete"):
            assert phase in phases


class TestBuildContextDoc:
    """IncidentContext -> Elasticsearch document conversion."""

    def test_build_doc_structure(self, context: IncidentContext) -> None:
        doc = build_context_doc(context)
        assert doc["incident_id"] == "inc-1"
        assert doc["primary_service"] == "api"
        assert doc["severity"] == "CRITICAL"
        assert doc["event_count"] == 10
        assert doc["@timestamp"] == doc["detected_at"] == "2026-06-14T12:00:00+00:00"
        assert doc["max_correlation_score"] == 0.85
        assert doc["service_list"] == ["api", "db"]
        assert len(doc["correlation_groups"]) == 1
        assert doc["correlation_groups"][0]["group_id"] == "g-1"

    def test_build_doc_no_correlation_groups(self) -> None:
        ts = datetime(2026, 6, 14, tzinfo=UTC)
        ctx = IncidentContext(incident_id="inc-2", primary_service="api", detected_at=ts)
        doc = build_context_doc(ctx)
        assert doc["max_correlation_score"] is None
        assert doc["service_list"] == []
        assert doc["correlation_group_count"] == 0

    def test_build_doc_includes_timeline(self) -> None:
        ts = datetime(2026, 6, 14, tzinfo=UTC)
        ctx = IncidentContext(
            incident_id="inc-3",
            primary_service="db",
            detected_at=ts,
            timeline=AggregatedTimeline(
                incident_id="inc-3",
                primary_service="db",
                total_events=5,
                window_count=2,
            ),
        )
        doc = build_context_doc(ctx)
        assert doc["timeline"] is not None
        assert doc["timeline"]["incident_id"] == "inc-3"
        assert doc["timeline"]["total_events"] == 5


class TestBuildQueryBody:
    """IncidentFilter -> search body translation."""

    def test_empty_filter_uses_match_all(self) -> None:
        body = _build_query_body(IncidentFilter())
        assert body["query"] == {"match_all": {}}
        assert body["size"] == 100
        assert body["from"] == 0
        # Default ordering: newest incidents first.
        assert body["sort"] == [{"detected_at": {"order": "desc"}}]

    def test_filter_by_primary_service(self) -> None:
        body = _build_query_body(IncidentFilter(primary_service="api"))
        assert body["query"]["bool"]["must"] == [{"term": {"primary_service": "api"}}]

    def test_filter_by_severity(self) -> None:
        body = _build_query_body(IncidentFilter(severity="CRITICAL"))
        assert body["query"]["bool"]["must"] == [{"term": {"severity": "CRITICAL"}}]

    def test_filter_by_min_score(self) -> None:
        body = _build_query_body(IncidentFilter(min_score=0.5))
        assert body["query"]["bool"]["must"] == [{"range": {"max_correlation_score": {"gte": 0.5}}}]

    def test_filter_by_time_range(self) -> None:
        start = datetime(2026, 6, 14, tzinfo=UTC)
        end = datetime(2026, 6, 15, tzinfo=UTC)
        body = _build_query_body(IncidentFilter(start_time=start, end_time=end))
        must = body["query"]["bool"]["must"]
        assert must == [
            {
                "range": {
                    "detected_at": {
                        "gte": "2026-06-14T00:00:00+00:00",
                        "lte": "2026-06-15T00:00:00+00:00",
                    },
                },
            },
        ]

    def test_filter_custom_sort(self) -> None:
        f = IncidentFilter(sort_field=IncidentSortField.EVENT_COUNT, sort_order=IncidentSortOrder.ASC)
        body = _build_query_body(f)
        assert body["sort"] == [{"event_count": {"order": "asc"}}]

    def test_filter_with_offset_and_limit(self) -> None:
        body = _build_query_body(IncidentFilter(limit=25, offset=50))
        assert body["size"] == 25
        assert body["from"] == 50

    def test_filter_query_string(self) -> None:
        body = _build_query_body(IncidentFilter(query_string="error AND api"))
        assert {"query_string": {"query": "error AND api"}} in body["query"]["bool"]["must"]

    def test_filter_multiple_conditions(self) -> None:
        f = IncidentFilter(
            primary_service="api",
            severity="CRITICAL",
            min_score=0.7,
            query_string="timeout",
        )
        must = _build_query_body(f)["query"]["bool"]["must"]
        assert len(must) == 4
        assert {"term": {"primary_service": "api"}} in must
        assert {"term": {"severity": "CRITICAL"}} in must


class TestIncidentElasticsearchConfig:
    """Defaults and overrides of the store configuration model."""

    def test_default_hosts(self) -> None:
        cfg = IncidentElasticsearchConfig()
        assert cfg.hosts == "http://localhost:9200"
        assert cfg.username is None
        assert cfg.password is None

    def test_with_auth(self) -> None:
        cfg = IncidentElasticsearchConfig(
            hosts="https://es-cluster:9200",
            username="admin",
            password="secret",
        )
        assert cfg.username == "admin"
        assert cfg.password == "secret"

    def test_default_index_prefix(self) -> None:
        cfg = IncidentElasticsearchConfig()
        assert cfg.index_prefix == "rp-inc"
_ES_CLASS_PATH = "infrastructure.elasticsearch.elasticsearch_incident_store.AsyncElasticsearch"


def _incident_source(title: str = "High error rate on api") -> dict:
    """Return the ``_source`` payload used by the get/search tests."""
    return {
        "incident_id": "inc-1",
        "primary_service": "api",
        "severity": "CRITICAL",
        "title": title,
        "detected_at": "2026-06-14T12:00:00+00:00",
        "aggregated_at": "2026-06-14T12:00:00+00:00",
        "event_count": 10,
        "service_count": 2,
        "trace_count": 1,
        "correlation_groups": [],
        "ungrouped_events": [],
        "impacts": [],
        "trace_groups": [],
    }


def _attached_store(config: IncidentElasticsearchConfig) -> tuple[ElasticsearchIncidentStore, AsyncMock]:
    """Return a store whose client is already an ``AsyncMock`` (start() bypassed)."""
    store = ElasticsearchIncidentStore(config=config)
    client = AsyncMock()
    store._client = client
    return store, client


class TestElasticsearchIncidentStore:
    """Adapter behaviour against a mocked Elasticsearch client."""

    async def test_start_without_auth(self, config: IncidentElasticsearchConfig) -> None:
        store = ElasticsearchIncidentStore(config=config)
        mock_client = AsyncMock()

        with (
            patch(_ES_CLASS_PATH, return_value=mock_client) as mock_es,
            patch.object(store, "_bootstrap", AsyncMock()),
        ):
            await store.start()

        assert store._client is mock_client
        mock_es.assert_called_once_with(
            hosts=["http://localhost:9200"],
            timeout=30,
        )

    async def test_start_with_auth(self) -> None:
        cfg = IncidentElasticsearchConfig(
            hosts="https://es-cluster:9200",
            username="admin",
            password="secret",
        )
        store = ElasticsearchIncidentStore(config=cfg)

        with (
            patch(_ES_CLASS_PATH, return_value=AsyncMock()) as mock_es,
            patch.object(store, "_bootstrap", AsyncMock()),
        ):
            await store.start()

        mock_es.assert_called_once_with(
            hosts=["https://es-cluster:9200"],
            timeout=30,
            basic_auth=("admin", "secret"),
        )

    async def test_start_is_idempotent(self, config: IncidentElasticsearchConfig) -> None:
        store = ElasticsearchIncidentStore(config=config)
        bootstrap = AsyncMock()

        with (
            patch(_ES_CLASS_PATH, return_value=AsyncMock()) as mock_es,
            patch.object(store, "_bootstrap", bootstrap),
        ):
            await store.start()
            await store.start()

        # The second start() must neither reconnect nor bootstrap again.
        mock_es.assert_called_once()
        bootstrap.assert_awaited_once()

    async def test_close_cleans_up(self, config: IncidentElasticsearchConfig) -> None:
        store, mock_client = _attached_store(config)
        store._bootstrap_done = True

        await store.close()

        assert store._client is None
        assert store._bootstrap_done is False
        mock_client.close.assert_awaited_once()

    async def test_close_skips_if_not_started(self, config: IncidentElasticsearchConfig) -> None:
        store = ElasticsearchIncidentStore(config=config)

        await store.close()

        assert store._client is None
        assert store._bootstrap_done is False

    async def test_store_indexes_document(self, config: IncidentElasticsearchConfig, context: IncidentContext) -> None:
        store, mock_client = _attached_store(config)

        await store.store(context)

        mock_client.index.assert_awaited_once()
        call_args = mock_client.index.await_args
        assert call_args is not None
        assert call_args.kwargs["index"] == "rp-inc-2026.06.14"
        assert call_args.kwargs["id"] == "inc-1"
        assert call_args.kwargs["document"]["incident_id"] == "inc-1"

    async def test_get_returns_context(self, config: IncidentElasticsearchConfig) -> None:
        store, mock_client = _attached_store(config)
        mock_client.get = AsyncMock(return_value={"_source": _incident_source()})

        result = await store.get("inc-1")

        assert result is not None
        assert result.incident_id == "inc-1"
        assert result.severity == "CRITICAL"

    async def test_get_returns_none_on_miss(self, config: IncidentElasticsearchConfig) -> None:
        store, mock_client = _attached_store(config)
        mock_client.get = AsyncMock(side_effect=Exception("not found"))

        assert await store.get("inc-missing") is None

    async def test_search_returns_contexts(self, config: IncidentElasticsearchConfig) -> None:
        store, mock_client = _attached_store(config)
        mock_client.search = AsyncMock(
            return_value={"hits": {"hits": [{"_source": _incident_source(title="API down")}]}},
        )

        results = [c async for c in store.search(IncidentFilter(severity="CRITICAL"))]

        assert len(results) == 1
        assert results[0].incident_id == "inc-1"
        assert results[0].severity == "CRITICAL"
        mock_client.search.assert_awaited_once()
        assert mock_client.search.await_args.kwargs["index"] == "rp-inc-*"

    async def test_search_empty_results(self, config: IncidentElasticsearchConfig) -> None:
        store, mock_client = _attached_store(config)
        mock_client.search = AsyncMock(return_value={"hits": {"hits": []}})

        results = [c async for c in store.search(IncidentFilter())]

        assert results == []

    async def test_count_returns_count(self, config: IncidentElasticsearchConfig) -> None:
        store, mock_client = _attached_store(config)
        mock_client.count = AsyncMock(return_value={"count": 5})

        count = await store.count(IncidentFilter(primary_service="api"))

        assert count == 5
        mock_client.count.assert_awaited_once()
        kwargs = mock_client.count.await_args.kwargs
        assert kwargs["index"] == "rp-inc-*"
        assert kwargs["body"]["query"] == {"bool": {"must": [{"term": {"primary_service": "api"}}]}}

    async def test_delete_calls_delete(self, config: IncidentElasticsearchConfig) -> None:
        store, mock_client = _attached_store(config)

        await store.delete("inc-1")

        mock_client.delete.assert_awaited_once_with(
            index="rp-inc-*",
            id="inc-1",
        )

    async def test_delete_swallows_exception(self, config: IncidentElasticsearchConfig) -> None:
        store, mock_client = _attached_store(config)
        mock_client.delete = AsyncMock(side_effect=Exception("not found"))

        await store.delete("inc-missing")  # must not raise

        mock_client.delete.assert_awaited_once()

    async def test_health_returns_true_when_connected(self, config: IncidentElasticsearchConfig) -> None:
        store, mock_client = _attached_store(config)
        mock_client.ping = AsyncMock(return_value=True)

        assert await store.health() is True

    async def test_health_returns_false_when_no_client(self, config: IncidentElasticsearchConfig) -> None:
        store = ElasticsearchIncidentStore(config=config)

        assert await store.health() is False

    async def test_health_returns_false_on_ping_failure(self, config: IncidentElasticsearchConfig) -> None:
        store, mock_client = _attached_store(config)
        mock_client.ping = AsyncMock(side_effect=Exception("connection failed"))

        assert await store.health() is False

    async def test_bootstrap_creates_ilm_and_template(self, config: IncidentElasticsearchConfig) -> None:
        store, mock_client = _attached_store(config)
        mock_client.ilm.get_lifecycle = AsyncMock(return_value=False)
        mock_client.ilm.put_lifecycle = AsyncMock()
        mock_client.indices.exists_index_template = AsyncMock(return_value=False)
        mock_client.indices.put_index_template = AsyncMock()

        await store._bootstrap()

        mock_client.ilm.put_lifecycle.assert_awaited_once()
        assert mock_client.ilm.put_lifecycle.await_args.kwargs["name"] == config.ilm_policy_name
        mock_client.indices.put_index_template.assert_awaited_once()
        template_kwargs = mock_client.indices.put_index_template.await_args.kwargs
        assert template_kwargs["name"] == config.template_name
        settings = template_kwargs["body"]["template"]["settings"]
        assert settings["number_of_shards"] == config.number_of_shards
        assert settings["number_of_replicas"] == config.number_of_replicas
        assert settings["index.lifecycle.name"] == config.ilm_policy_name

    async def test_bootstrap_skips_if_exists(self, config: IncidentElasticsearchConfig) -> None:
        store, mock_client = _attached_store(config)
        mock_client.ilm.get_lifecycle = AsyncMock(return_value=True)
        mock_client.ilm.put_lifecycle = AsyncMock()
        mock_client.indices.exists_index_template = AsyncMock(return_value=True)
        mock_client.indices.put_index_template = AsyncMock()

        await store._bootstrap()

        mock_client.ilm.put_lifecycle.assert_not_awaited()
        mock_client.indices.put_index_template.assert_not_awaited()

    async def test_store_raises_if_not_started(
        self, config: IncidentElasticsearchConfig, context: IncidentContext
    ) -> None:
        store = ElasticsearchIncidentStore(config=config)
        with pytest.raises(AssertionError, match="not started"):
            await store.store(context)

    async def test_search_raises_if_not_started(self, config: IncidentElasticsearchConfig) -> None:
        store = ElasticsearchIncidentStore(config=config)
        # search() is an async generator: the assertion fires on first iteration.
        with pytest.raises(AssertionError, match="not started"):
            async for _ in store.search(IncidentFilter()):
                pass
class IncidentSortField(StrEnum):
    """Document fields a search may be ordered by (values are field names)."""

    DETECTED_AT = "detected_at"
    SEVERITY = "severity"
    EVENT_COUNT = "event_count"
    SERVICE_COUNT = "service_count"


class IncidentSortOrder(StrEnum):
    """Sort direction."""

    ASC = "asc"
    DESC = "desc"


class IncidentFilter(BaseModel):
    """Provider-agnostic search criteria for incidents.

    Criteria left at ``None`` are not applied. Paging is expressed with
    ``limit``/``offset``; results default to newest ``detected_at`` first.
    """

    primary_service: str | None = Field(default=None, description="Filter by primary service name.")
    severity: str | None = Field(default=None, description="Filter by severity level (e.g. CRITICAL, WARNING).")
    min_score: float | None = Field(default=None, ge=0.0, le=1.0, description="Minimum composite score threshold.")
    start_time: datetime | None = Field(default=None, description="Earliest detected_at (inclusive).")
    end_time: datetime | None = Field(default=None, description="Latest detected_at (inclusive).")
    limit: int = Field(default=100, ge=1, le=10_000, description="Maximum results to return.")
    offset: int = Field(default=0, ge=0, description="Number of results to skip for pagination.")
    sort_field: IncidentSortField = Field(default=IncidentSortField.DETECTED_AT, description="Field to sort by.")
    sort_order: IncidentSortOrder = Field(default=IncidentSortOrder.DESC, description="Sort order.")
    query_string: str | None = Field(default=None, description="Full-text search query (Lucene syntax).")


class IncidentStore(ABC):
    """Abstract store for persisting and retrieving incident contexts."""

    @abstractmethod
    async def store(self, context: IncidentContext) -> None:
        """Persist an incident context."""

    @abstractmethod
    async def get(self, incident_id: str) -> IncidentContext | None:
        """Retrieve an incident context by ID, or ``None`` when it is unknown."""

    # Declared with plain ``def``: implementations are async generators
    # (``async def`` + ``yield``), which return an AsyncIterator directly when
    # called. An ``async def`` stub without ``yield`` would instead be typed
    # as a coroutine that resolves to an iterator, and would not match them.
    @abstractmethod
    def search(self, filter: IncidentFilter) -> AsyncIterator[IncidentContext]:
        """Yield incident contexts matching the given filter.

        Consume with ``async for context in store.search(filter)``.
        """

    @abstractmethod
    async def count(self, filter: IncidentFilter) -> int:
        """Return the number of incidents matching the filter."""

    @abstractmethod
    async def delete(self, incident_id: str) -> None:
        """Delete an incident context by ID."""

    @abstractmethod
    async def health(self) -> bool:
        """Return True if the store is reachable and operational."""