Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion infrastructure/elasticsearch/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
"""Elasticsearch LogStore adapter implementation."""
"""Elasticsearch adapter implementations."""

from infrastructure.elasticsearch.elasticsearch_incident_store import (
ElasticsearchIncidentStore,
IncidentElasticsearchConfig,
)
from infrastructure.elasticsearch.elasticsearch_log_store import (
ElasticsearchConfig,
ElasticsearchLogStore,
)

__all__ = [
"ElasticsearchConfig",
"ElasticsearchIncidentStore",
"ElasticsearchLogStore",
"IncidentElasticsearchConfig",
]
227 changes: 227 additions & 0 deletions infrastructure/elasticsearch/elasticsearch_incident_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
"""Elasticsearch IncidentStore adapter for incident persistence and retrieval."""

from __future__ import annotations

import logging
from collections.abc import AsyncIterator

from elasticsearch import AsyncElasticsearch
from pydantic import BaseModel, Field

from infrastructure.elasticsearch.incident_index_strategy import (
ILM_POLICY_NAME,
INDEX_PREFIX,
TEMPLATE_NAME,
build_context_doc,
default_ilm_policy,
default_index_template,
incident_index_name,
)
from shared.contracts.interfaces.incident_store import IncidentFilter, IncidentStore
from shared.domain.incident.context.models import IncidentContext

logger = logging.getLogger(__name__)


class IncidentElasticsearchConfig(BaseModel):
hosts: str = Field(
default="http://localhost:9200",
description="Elasticsearch server URL(s).",
)
username: str | None = Field(
default=None,
description="Elasticsearch basic auth username (null = no auth).",
)
password: str | None = Field(
default=None,
description="Elasticsearch basic auth password (null = no auth).",
)
index_prefix: str = Field(
default=INDEX_PREFIX,
description="Prefix for incident indices.",
)
ilm_policy_name: str = Field(
default=ILM_POLICY_NAME,
description="Name of the ILM policy to apply.",
)
template_name: str = Field(
default=TEMPLATE_NAME,
description="Name of the index template.",
)
number_of_shards: int = Field(default=1, ge=1, description="Primary shard count.")
number_of_replicas: int = Field(default=0, ge=0, description="Replica count.")
request_timeout: int = Field(default=30, ge=5, description="HTTP request timeout in seconds.")


class ElasticsearchIncidentStore(IncidentStore):
"""IncidentStore implementation backed by Elasticsearch.

Manages daily rolling indices (``rp-inc-{yyyy.MM.dd}``) with an index
template that applies consistent mappings and an ILM policy for
automated retention.
"""

def __init__(self, config: IncidentElasticsearchConfig | None = None) -> None:
self._config = config or IncidentElasticsearchConfig()
self._client: AsyncElasticsearch | None = None
self._bootstrap_done = False

async def start(self) -> None:
"""Connect to Elasticsearch and bootstrap index template + ILM policy."""
if self._client is not None:
return

kwargs: dict = {
"hosts": [self._config.hosts],
"timeout": self._config.request_timeout,
}
if self._config.username is not None and self._config.password is not None:
kwargs["basic_auth"] = (self._config.username, self._config.password)

self._client = AsyncElasticsearch(**kwargs)

await self._bootstrap()
self._bootstrap_done = True
logger.info("ElasticsearchIncidentStore started", extra={"hosts": self._config.hosts})

async def close(self) -> None:
"""Close the Elasticsearch client connection."""
if self._client is not None:
await self._client.close()
self._client = None
self._bootstrap_done = False
logger.info("ElasticsearchIncidentStore closed")

async def _bootstrap(self) -> None:
"""Create ILM policy and index template if they don't exist."""
assert self._client is not None

ilm_exists = await self._client.ilm.get_lifecycle(
name=self._config.ilm_policy_name,
)
if not ilm_exists:
await self._client.ilm.put_lifecycle(
name=self._config.ilm_policy_name,
policy=default_ilm_policy(),
)
logger.info("Created ILM policy", extra={"policy": self._config.ilm_policy_name})

template_exists = await self._client.indices.exists_index_template(
name=self._config.template_name,
)
if not template_exists:
body = default_index_template()
body["template"]["settings"]["number_of_shards"] = self._config.number_of_shards
body["template"]["settings"]["number_of_replicas"] = self._config.number_of_replicas
body["template"]["settings"]["index.lifecycle.name"] = self._config.ilm_policy_name
await self._client.indices.put_index_template(
name=self._config.template_name,
body=body,
)
logger.info("Created index template", extra={"template": self._config.template_name})

async def store(self, context: IncidentContext) -> None:
assert self._client is not None, "ElasticsearchIncidentStore not started"

doc = build_context_doc(context)
index = incident_index_name(context.detected_at)

await self._client.index(index=index, id=context.incident_id, document=doc)

async def get(self, incident_id: str) -> IncidentContext | None:
assert self._client is not None, "ElasticsearchIncidentStore not started"

try:
response = await self._client.get(
index=f"{self._config.index_prefix}-*",
id=incident_id,
)
except Exception:
return None

source = response["_source"]
return IncidentContext.model_validate(source)

async def search(self, filter: IncidentFilter) -> AsyncIterator[IncidentContext]: # type: ignore[override,misc]
assert self._client is not None, "ElasticsearchIncidentStore not started"

body = _build_query_body(filter)
response = await self._client.search(
index=f"{self._config.index_prefix}-*",
body=body,
)

for hit in response["hits"]["hits"]:
yield IncidentContext.model_validate(hit["_source"])

async def count(self, filter: IncidentFilter) -> int:
assert self._client is not None, "ElasticsearchIncidentStore not started"

body = _build_query_body(filter)
body.pop("size", None)
body.pop("from", None)
body.pop("sort", None)
body["size"] = 0

response = await self._client.count(
index=f"{self._config.index_prefix}-*",
body=body,
)
return response["count"]

async def delete(self, incident_id: str) -> None:
assert self._client is not None, "ElasticsearchIncidentStore not started"

try:
await self._client.delete(
index=f"{self._config.index_prefix}-*",
id=incident_id,
)
except Exception:
logger.warning("Failed to delete incident", extra={"incident_id": incident_id})

async def health(self) -> bool:
if self._client is None:
return False
try:
return await self._client.ping()
except Exception:
return False


def _build_query_body(filter: IncidentFilter) -> dict:
"""Translate an IncidentFilter into an Elasticsearch query body."""
must_clauses: list[dict] = []

if filter.query_string:
must_clauses.append({"query_string": {"query": filter.query_string}})

if filter.primary_service:
must_clauses.append({"term": {"primary_service": filter.primary_service}})

if filter.severity:
must_clauses.append({"term": {"severity": filter.severity}})

if filter.min_score is not None:
must_clauses.append({"range": {"max_correlation_score": {"gte": filter.min_score}}})

if filter.start_time or filter.end_time:
time_range: dict[str, str] = {}
if filter.start_time:
time_range["gte"] = filter.start_time.isoformat()
if filter.end_time:
time_range["lte"] = filter.end_time.isoformat()
must_clauses.append({"range": {"detected_at": time_range}})

body: dict = {
"size": filter.limit,
"from": filter.offset,
"sort": [{filter.sort_field.value: {"order": filter.sort_order.value}}],
}

if must_clauses:
body["query"] = {"bool": {"must": must_clauses}}
else:
body["query"] = {"match_all": {}}

return body
127 changes: 127 additions & 0 deletions infrastructure/elasticsearch/incident_index_strategy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""Elasticsearch index strategy for incident documents.

Index naming convention:
rp-inc-{yyyy.MM.dd}
rp = RootPilot
inc = incidents

Template: rp-inc-template (applied to rp-inc-*)
ILM policy: rp-inc-ilm-policy
"""

from datetime import UTC, datetime

from shared.domain.incident.context.models import IncidentContext

INDEX_PREFIX = "rp-inc"
TEMPLATE_NAME = "rp-inc-template"
ILM_POLICY_NAME = "rp-inc-ilm-policy"


def incident_index_name(dt: datetime | None = None) -> str:
"""Return the target index name for a given timestamp (UTC daily bucket)."""
ts = dt or datetime.now(UTC)
return f"{INDEX_PREFIX}-{ts.strftime('%Y.%m.%d')}"


def build_context_doc(context: IncidentContext) -> dict:
"""Convert an IncidentContext into an Elasticsearch document."""
return {
"@timestamp": context.detected_at.isoformat(),
"incident_id": context.incident_id,
"primary_service": context.primary_service,
"severity": context.severity,
"title": context.title,
"detected_at": context.detected_at.isoformat(),
"aggregated_at": context.aggregated_at.isoformat(),
"event_count": context.event_count,
"service_count": context.service_count,
"trace_count": context.trace_count,
"correlation_group_count": len(context.correlation_groups),
"ungrouped_event_count": len(context.ungrouped_events),
"impact_count": len(context.impacts),
"max_correlation_score": (
max(g.composite_score for g in context.correlation_groups) if context.correlation_groups else None
),
"service_list": sorted({svc for g in context.correlation_groups for svc in g.services}),
"timeline": context.timeline.model_dump(mode="json") if context.timeline else None,
"correlation_groups": [g.model_dump(mode="json") for g in context.correlation_groups],
"ungrouped_events": context.ungrouped_events,
"impacts": [i.model_dump(mode="json") for i in context.impacts],
"trace_groups": [t.model_dump(mode="json") for t in context.trace_groups],
}


def default_index_template() -> dict:
return {
"index_patterns": [f"{INDEX_PREFIX}-*"],
"template": {
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0,
"index.lifecycle.name": ILM_POLICY_NAME,
"index.lifecycle.rollover_alias": INDEX_PREFIX,
},
"mappings": {
"dynamic": "strict",
"properties": {
"@timestamp": {"type": "date"},
"incident_id": {"type": "keyword"},
"primary_service": {"type": "keyword"},
"severity": {"type": "keyword"},
"title": {"type": "text", "analyzer": "english"},
"detected_at": {"type": "date"},
"aggregated_at": {"type": "date"},
"event_count": {"type": "integer"},
"service_count": {"type": "integer"},
"trace_count": {"type": "integer"},
"correlation_group_count": {"type": "integer"},
"ungrouped_event_count": {"type": "integer"},
"impact_count": {"type": "integer"},
"max_correlation_score": {"type": "float"},
"service_list": {"type": "keyword"},
"timeline": {"type": "object", "enabled": False},
"correlation_groups": {"type": "object", "enabled": False},
"ungrouped_events": {"type": "keyword"},
"impacts": {"type": "object", "enabled": False},
"trace_groups": {"type": "object", "enabled": False},
},
},
},
"priority": 100,
}


def default_ilm_policy() -> dict:
return {
"policy": {
"phases": {
"hot": {
"min_age": "0ms",
"actions": {
"rollover": {"max_age": "1d", "max_primary_shard_size": "50gb"},
"set_priority": {"priority": 100},
},
},
"warm": {
"min_age": "7d",
"actions": {
"set_priority": {"priority": 50},
"forcemerge": {"max_num_segments": 1},
},
},
"cold": {
"min_age": "30d",
"actions": {
"set_priority": {"priority": 0},
},
},
"delete": {
"min_age": "90d",
"actions": {
"delete": {},
},
},
},
},
}
Loading
Loading