diff --git a/.env b/.env index e407436..fba3198 100644 --- a/.env +++ b/.env @@ -33,3 +33,18 @@ STATISTICS_DB_TABLE=feed_statistics KAFKA_TICKER_DETECTION_TOPIC=ticker-detection DETECTION_ENGINE_VOLUME_Z_THRESHOLD=2.5 DETECTION_ENGINE_PRICE_Z_THRESHOLD=2.5 +DETECTION_ARCHIVER_CONSUMER_GROUP=detection-archiver +DETECTION_ARCHIVER_METRICS_PORT=9700 +DETECTION_ARCHIVER_DB_HOST=postgres +DETECTION_ARCHIVER_DB_PORT=5432 +DETECTION_ARCHIVER_DB_NAME=tradestrike_db +DETECTION_ARCHIVER_DB_USER=admin +DETECTION_ARCHIVER_DB_PASSWORD=admin +DETECTION_ARCHIVER_DB_SCHEMA=public +DETECTION_ARCHIVER_DB_TABLE=feed_detection +MINIO_HOST=minio +MINIO_PORT=9000 +MINIO_USER=minio_admin +MINIO_PASSWORD=minio_admin +MINIO_BUCKET=detections +MINIO_SECURE=false \ No newline at end of file diff --git a/Makefile b/Makefile index e406921..c6e074d 100644 --- a/Makefile +++ b/Makefile @@ -14,7 +14,7 @@ FAKE_ALPACA_INTERVAL_SECONDS ?= 1 all: test -install: check_python venv install-feed-ingestor-deps +install: check_python venv install-service-deps check_python: @if [ -z "$(PYTHON)" ]; then echo "Python interpreter not found. Please install Python 3."; exit 1; fi @@ -27,13 +27,16 @@ venv: requirements.txt $(PYTHON) -m venv venv $(VENV_PYTHON_PATH) -m pip install -r requirements.txt -install-feed-ingestor-deps: venv - @REQ_FILE=services/feed-ingestor/requirements.txt; \ - if [ ! -f $$REQ_FILE ]; then \ - echo "โš ๏ธ No runtime requirements found for feed-ingestor at $$REQ_FILE"; \ +install-service-deps: venv + @REQ_FILES=$$(find services -mindepth 2 -maxdepth 2 -name requirements.txt | sort); \ + if [ -z "$$REQ_FILES" ]; then \ + echo "โš ๏ธ No runtime requirements found under services/"; \ else \ - echo "๐Ÿ“ฆ Installing dependencies for service 'feed-ingestor'..."; \ - $(VENV_PYTHON_PATH) -m pip install -r $$REQ_FILE; \ + for req in $$REQ_FILES; do \ + service=$$(basename $$(dirname $$req)); \ + echo "๐Ÿ“ฆ Installing dependencies for service '$$service'..."; \ + $(VENV_PYTHON_PATH) -m pip install -r $$req; \ + done; \ fi install-service: check_python venv @@ -44,7 +47,7 @@ install-service: check_python venv $(VENV_PYTHON_PATH) -m pip install -r $$REQ_FILE test: install - $(VENV_PYTHON_PATH) -m pytest -s tst/ --cov-fail-under=100 --cov=src --cov-report=html + $(VENV_PYTHON_PATH) -m pytest -s tst/ --cov-fail-under=100 --cov=src --cov-report=term-missing --cov-report=html clean: delete-stack @echo "๐Ÿงน Cleaning environment..." @@ -74,4 +77,4 @@ create-stack: install-docker test USE_FAKE_ALPACA_CLIENT=$(MOCK_ALPACA_CLIENT) FAKE_ALPACA_INTERVAL_SECONDS=$(FAKE_ALPACA_INTERVAL_SECONDS) docker compose up -d --build @echo "๐ŸŽฏ Environment setup complete โ€” all services running." -.PHONY: all install install-feed-ingestor-deps install-service clean test install-docker create-stack delete-stack +.PHONY: all install install-service install-service-deps clean test install-docker create-stack delete-stack diff --git a/docker-compose.yml b/docker-compose.yml index af9827d..b0ad76d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -70,6 +70,7 @@ services: - /bin/sh - -c - | + set -euo pipefail echo "Waiting for Kafka to become reachable..." while ! KAFKA_OPTS="" /opt/kafka/bin/kafka-topics.sh --bootstrap-server "${KAFKA_BOOTSTRAP_SERVERS}" --list >/dev/null 2>&1; do echo " still waiting..." @@ -207,6 +208,46 @@ services: retries: 5 restart: unless-stopped + # ============================================================ + # ๐Ÿ“Š Detection Archiver Worker + # ============================================================ + detection-archiver: + build: + context: . + dockerfile: services/detection-archiver/Dockerfile + container_name: detection-archiver + env_file: + - .env + environment: + DETECTION_ARCHIVER_CONSUMER_GROUP: ${DETECTION_ARCHIVER_CONSUMER_GROUP:-detection-archiver} + DETECTION_ARCHIVER_METRICS_PORT: ${DETECTION_ARCHIVER_METRICS_PORT:-9700} + DETECTION_ARCHIVER_DB_HOST: ${DETECTION_ARCHIVER_DB_HOST:-postgres} + DETECTION_ARCHIVER_DB_PORT: ${DETECTION_ARCHIVER_DB_PORT:-5432} + DETECTION_ARCHIVER_DB_NAME: ${DETECTION_ARCHIVER_DB_NAME:-tradestrike_db} + DETECTION_ARCHIVER_DB_USER: ${DETECTION_ARCHIVER_DB_USER:-admin} + DETECTION_ARCHIVER_DB_PASSWORD: ${DETECTION_ARCHIVER_DB_PASSWORD:-admin} + DETECTION_ARCHIVER_DB_SCHEMA: ${DETECTION_ARCHIVER_DB_SCHEMA:-public} + DETECTION_ARCHIVER_DB_TABLE: ${DETECTION_ARCHIVER_DB_TABLE:-feed_detection} + ports: + - "${DETECTION_ARCHIVER_METRICS_PORT:-9700}:${DETECTION_ARCHIVER_METRICS_PORT:-9700}" + depends_on: + kafka: + condition: service_healthy + kafka-topic-init: + condition: service_completed_successfully + postgres: + condition: service_healthy + minio: + condition: service_healthy + minio-init-bucket: + condition: service_completed_successfully + healthcheck: + test: ["CMD", "python", "-m", "src.detection_archiver.healthcheck"] + interval: 10s + timeout: 5s + retries: 5 + restart: unless-stopped + # ============================================================ # ๐Ÿ“Š Statistics Archiver Worker # ============================================================ @@ -376,12 +417,18 @@ services: container_name: minio command: server /data --console-address ":9001" ports: - - "9000:9000" + - "${MINIO_PORT:-9000}:${MINIO_PORT:-9000}" - "9001:9001" environment: MINIO_PROMETHEUS_JOB_ID: minio MINIO_PROMETHEUS_AUTH_TYPE: public MINIO_PROMETHEUS_URL: http://prometheus:9090 + MINIO_ROOT_USER: ${MINIO_USER:-minio_admin} + MINIO_ROOT_PASSWORD: ${MINIO_PASSWORD:-minio_admin} + MINIO_PORT: ${MINIO_PORT:-9000} + MINIO_HOST: ${MINIO_HOST:-localhost} + MINIO_BUCKET: ${MINIO_BUCKET:-detections} + MINIO_SECURE: ${MINIO_SECURE:-false} healthcheck: test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"] interval: 10s @@ -391,6 +438,38 @@ services: - minio_data:/data restart: unless-stopped + # ============================================================ + # โ˜๏ธ MinIO Init Bucket + # ============================================================ + minio-init-bucket: + image: minio/mc + container_name: minio-init-bucket + env_file: + - .env + depends_on: + minio: + condition: service_healthy + entrypoint: > + /bin/sh -c " + set -euo pipefail + echo 'Configuring MinIO client...'; + export MC_HOST_local=\"http://${MINIO_USER:-minio_admin}:${MINIO_PASSWORD:-minio_admin}@${MINIO_HOST:-minio}:${MINIO_PORT:-9000}\"; + /usr/bin/mc alias list; + echo 'MinIO client configured successfully.'; + echo 'Checking if bucket ${MINIO_BUCKET:-detections} exists...'; + if /usr/bin/mc ls local/${MINIO_BUCKET:-detections} >/dev/null 2>&1; then + echo 'Bucket ${MINIO_BUCKET:-detections} already exists. Skipping creation.'; + else + echo 'Creating bucket ${MINIO_BUCKET:-detections}...'; + /usr/bin/mc mb -p local/${MINIO_BUCKET:-detections}; + echo 'Listing all buckets in local MinIO:'; + /usr/bin/mc ls local; + echo 'MinIO bucket initialization complete.'; + exit 0; + fi; + " + restart: "no" + # ============================================================ # ๐Ÿ“ฌ RabbitMQ # ============================================================ @@ -446,7 +525,7 @@ services: - grafana_data:/var/lib/grafana - ./monitoring/grafana/datasources.yml:/etc/grafana/provisioning/datasources/datasources.yml - ./monitoring/grafana/dashboards.yml:/etc/grafana/provisioning/dashboards/dashboards.yml - - ./monitoring/grafana/dashboards:/etc/grafana/provisioning/dashboards + - ./monitoring/grafana/dashboards:/var/lib/grafana/dashboards depends_on: prometheus: condition: service_healthy diff --git a/monitoring/grafana/dashboards.yml b/monitoring/grafana/dashboards.yml index fab4768..279f72a 100644 --- a/monitoring/grafana/dashboards.yml +++ b/monitoring/grafana/dashboards.yml @@ -9,4 +9,4 @@ providers: editable: true updateIntervalSeconds: 15 options: - path: /etc/grafana/provisioning/dashboards + path: /var/lib/grafana/dashboards diff --git a/monitoring/grafana/dashboards/dashboards.yml b/monitoring/grafana/dashboards/dashboards.yml deleted file mode 100644 index e69de29..0000000 diff --git a/monitoring/grafana/dashboards/stocks.json b/monitoring/grafana/dashboards/stocks.json index bd38e88..79b7a90 100644 --- a/monitoring/grafana/dashboards/stocks.json +++ b/monitoring/grafana/dashboards/stocks.json @@ -472,7 +472,7 @@ "editorMode": "code", "format": "time_series", "rawQuery": true, - "rawSql": "SELECT window_end_time AS \"time\", avg_volume, std_dev_volume AS \"std_dev\" FROM feed_statistics WHERE ticker = ${ticker:sqlstring} AND WHERE window_length_minutes = ${window:int}", + "rawSql": "SELECT window_end_time AS \"time\", avg_volume, std_dev_volume AS \"std_dev\" FROM feed_statistics WHERE ticker = ${ticker:sqlstring} AND window_length_minutes = ${window:int}", "refId": "A", "sql": { "columns": [ diff --git a/monitoring/prometheus.yml b/monitoring/prometheus.yml index 9f4b36d..db1b85b 100644 --- a/monitoring/prometheus.yml +++ b/monitoring/prometheus.yml @@ -36,6 +36,10 @@ scrape_configs: static_configs: - targets: ['statistics-archiver:9500'] + - job_name: 'detection-archiver' + static_configs: + - targets: ['detection-archiver:9700'] + # Redis Exporter - job_name: 'redis' static_configs: diff --git a/services/detection-archiver/Dockerfile b/services/detection-archiver/Dockerfile new file mode 100644 index 0000000..8d410c4 --- /dev/null +++ b/services/detection-archiver/Dockerfile @@ -0,0 +1,14 @@ +FROM python:3.12-slim + +ENV PYTHONUNBUFFERED=1 \ + PYTHONDONTWRITEBYTECODE=1 \ + PIP_NO_CACHE_DIR=1 + +WORKDIR /app + +COPY services/detection-archiver/requirements.txt /tmp/requirements.txt +RUN pip install --no-cache-dir -r /tmp/requirements.txt + +COPY src /app/src + +CMD ["python", "-m", "src.detection_archiver.main"] diff --git a/services/detection-archiver/requirements.txt b/services/detection-archiver/requirements.txt new file mode 100644 index 0000000..9d4aa89 --- /dev/null +++ b/services/detection-archiver/requirements.txt @@ -0,0 +1,5 @@ +aiokafka>=0.10,<0.11 +asyncpg>=0.29,<0.30 +pydantic>=2.6,<3.0 +prometheus-client>=0.20.0,<0.21 +minio>=7.2.18,<7.3 \ No newline at end of file diff --git a/src/detection_archiver/detection_consumer.py b/src/detection_archiver/detection_consumer.py new file mode 100644 index 0000000..80a05ce --- /dev/null +++ b/src/detection_archiver/detection_consumer.py @@ -0,0 +1,84 @@ +import asyncio +import logging +from aiokafka import AIOKafkaConsumer +import os +from src.models.ticker_detection_model import TickerDetection +from src.detection_archiver.metrics import NUM_DETECTIONS_ARCHIVED_SUCCEEDED, NUM_DETECTIONS_CONSUMED, NUM_DETECTIONS_ARCHIVED_FAILED, NUM_CONSUMER_START_FAILED, NUM_CONSUMER_START_SUCCEEDED +from src.detection_archiver.detection_repository import DetectionRepository +from io import BytesIO +from minio import Minio + +logger = logging.getLogger(__name__) + +CONTENT_TYPE = 'application/json' +ENCODING_SCHEME = 'utf-8' + +class DetectionConsumer: + def __init__( + self, + consumer: AIOKafkaConsumer, + repository: DetectionRepository, + minio_client: Minio, + bucket_name: str, + ): + self._consumer = consumer + self._repository = repository + self._minio_client = minio_client + self._bucket_name = bucket_name + + async def run(self) -> None: + try: + await self._consumer.start() + except Exception: + NUM_CONSUMER_START_FAILED.inc() + logger.exception("Failed to start kafka consumer.") + raise + NUM_CONSUMER_START_SUCCEEDED.inc() + logger.info( + "Kafka consumer connected for topics: %s", + ", ".join(sorted(self._consumer.subscription() or [])), + ) + async for message in self._consumer: + NUM_DETECTIONS_CONSUMED.inc() + try: + ticker_detection = TickerDetection.model_validate_json(message.value) + ticker_time = ticker_detection.ticker.time + context_path = os.path.join( + f"ticker={ticker_detection.ticker.ticker}", + ticker_time.strftime("year=%Y/month=%m/day=%d/hour=%H"), + f"{ticker_time.isoformat()}.json", + ) + serialized_detection = ticker_detection.to_message() + stream = BytesIO(serialized_detection) + await asyncio.to_thread( + self._minio_client.put_object, + self._bucket_name, + context_path, + stream, + len(serialized_detection), + content_type=CONTENT_TYPE, + ) + logger.info( + "Persisted detection for %s to MinIO bucket %s at %s", + ticker_detection, + self._bucket_name, + context_path, + ) + await self._repository.insert_detection( + ticker_detection=ticker_detection, + context_path=context_path + ) + logger.info( + "Stored detection for %s in TimeScaleDB (context=%s)", + ticker_detection, + context_path, + ) + NUM_DETECTIONS_ARCHIVED_SUCCEEDED.inc() + except Exception: + NUM_DETECTIONS_ARCHIVED_FAILED.inc() + logger.exception( + "Failed to archive detection from partition %s offset %s", + message.partition, + message.offset, + ) + raise diff --git a/src/detection_archiver/detection_repository.py b/src/detection_archiver/detection_repository.py new file mode 100644 index 0000000..b334b2c --- /dev/null +++ b/src/detection_archiver/detection_repository.py @@ -0,0 +1,97 @@ +import logging +from typing import Final +import asyncpg +from src.models.ticker_detection_model import TickerDetection +logger = logging.getLogger(__name__) + +CREATE_EXTENSION_SQL: Final[str] = "CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;" + +CREATE_TABLE_SQL: Final[str] = """ +CREATE TABLE IF NOT EXISTS {qualified_table} ( + ticker TEXT NOT NULL, + time TIMESTAMPTZ NOT NULL, + window_length_minutes INTEGER NOT NULL, + window_end_time TIMESTAMPTZ NOT NULL, + detection_time TIMESTAMPTZ NOT NULL, + close_price DOUBLE PRECISION, + volume BIGINT NOT NULL, + price_percent_change DOUBLE PRECISION, + reason TEXT NOT NULL, + context_path TEXT NOT NULL, + PRIMARY KEY (ticker, time, window_length_minutes, window_end_time) +); +""" + +CREATE_HYPERTABLE_SQL: Final[str] = """ +SELECT create_hypertable('{qualified_table}', 'time', if_not_exists => TRUE); +""" + +INSERT_SQL: Final[str] = """ +INSERT INTO {qualified_table} ( + ticker, + time, + window_length_minutes, + window_end_time, + detection_time, + close_price, + volume, + price_percent_change, + reason, + context_path +) VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, $10 +) ON CONFLICT (ticker, time, window_length_minutes, window_end_time) DO UPDATE SET + detection_time = EXCLUDED.detection_time, + close_price = EXCLUDED.close_price, + volume = EXCLUDED.volume, + price_percent_change = EXCLUDED.price_percent_change, + reason = EXCLUDED.reason, + context_path = EXCLUDED.context_path; +""" + +class DetectionRepository: + def __init__(self, pool: asyncpg.Pool, schema: str, table: str): + self._pool = pool + self._schema = schema + self._table = table + self._qualified_table = f"{schema}.{table}" + self._create_table_sql = CREATE_TABLE_SQL.format(qualified_table=self._qualified_table) + self._create_hypertable_sql = CREATE_HYPERTABLE_SQL.format(qualified_table=self._qualified_table) + self._insert_sql = INSERT_SQL.format(qualified_table=self._qualified_table) + + @classmethod + async def create(cls, pool: asyncpg.Pool, schema: str, table: str) -> "DetectionRepository": + repository = cls(pool=pool, schema=schema, table=table) + await repository._initialize() + return repository + + async def _initialize(self) -> None: + async with self._pool.acquire() as connection: + await connection.execute(CREATE_EXTENSION_SQL) + await connection.execute(self._create_table_sql) + await connection.execute(self._create_hypertable_sql) + logger.info("Initialized detection hypertable %s", self._qualified_table) + + async def insert_detection(self, ticker_detection: TickerDetection, context_path: str) -> None: + async with self._pool.acquire() as connection: + for detection_detail in ticker_detection.detections: + await connection.execute( + self._insert_sql, + ticker_detection.ticker.ticker, + ticker_detection.ticker.time, + detection_detail.statistic.window_length_minutes, + detection_detail.statistic.window_end_time, + ticker_detection.detection_time, + ticker_detection.ticker.close_price, + ticker_detection.ticker.volume, + ticker_detection.ticker.price_percent_change, + detection_detail.reason, + context_path, + ) + logger.info( + "Archived detection for ticker: %s, timestamp: %s, window_length_minutes: %s window_end_time: %s", + ticker_detection.ticker.ticker, + ticker_detection.ticker.time, + detection_detail.statistic.window_length_minutes, + detection_detail.statistic.window_end_time, + ) diff --git a/src/detection_archiver/healthcheck.py b/src/detection_archiver/healthcheck.py new file mode 100644 index 0000000..3e210de --- /dev/null +++ b/src/detection_archiver/healthcheck.py @@ -0,0 +1,65 @@ +import asyncio +import logging +import asyncpg +from src.util.environment import get_env +from minio import Minio + +logger = logging.getLogger(__name__) + +TIMESCALE_DB_HOST_ENV = "DETECTION_ARCHIVER_DB_HOST" +TIMESCALE_DB_PORT_ENV = "DETECTION_ARCHIVER_DB_PORT" +TIMESCALE_DB_NAME_ENV = "DETECTION_ARCHIVER_DB_NAME" +TIMESCALE_DB_USER_ENV = "DETECTION_ARCHIVER_DB_USER" +TIMESCALE_DB_PASSWORD_ENV = "DETECTION_ARCHIVER_DB_PASSWORD" +MINIO_HOST_ENV = "MINIO_HOST" +MINIO_PORT_ENV = "MINIO_PORT" +MINIO_USER_ENV = "MINIO_USER" +MINIO_PASSWORD_ENV = "MINIO_PASSWORD" +MINIO_BUCKET_ENV = "MINIO_BUCKET" +MINIO_SECURE_ENV = "MINIO_SECURE" + +async def _run_healthcheck() -> None: + env = get_env([ + TIMESCALE_DB_HOST_ENV, + TIMESCALE_DB_PORT_ENV, + TIMESCALE_DB_NAME_ENV, + TIMESCALE_DB_USER_ENV, + TIMESCALE_DB_PASSWORD_ENV, + MINIO_HOST_ENV, + MINIO_PORT_ENV, + MINIO_USER_ENV, + MINIO_PASSWORD_ENV, + MINIO_BUCKET_ENV, + MINIO_SECURE_ENV, + ]) + + connection = await asyncpg.connect( + host=env[TIMESCALE_DB_HOST_ENV], + port=int(env[TIMESCALE_DB_PORT_ENV]), + database=env[TIMESCALE_DB_NAME_ENV], + user=env[TIMESCALE_DB_USER_ENV], + password=env[TIMESCALE_DB_PASSWORD_ENV], + command_timeout=5, + ) + + minio_client = Minio( + f"{env[MINIO_HOST_ENV]}:{env[MINIO_PORT_ENV]}", + access_key=env[MINIO_USER_ENV], + secret_key=env[MINIO_PASSWORD_ENV], + secure=env[MINIO_SECURE_ENV].lower() == "true", + ) + + try: + await connection.execute("SELECT 1;") + finally: + await connection.close() + logger.info("Detection archiver TimescaleDB healthcheck succeeded") + + buckets = minio_client.list_buckets() + print(f"Detection archiver MinIO healthcheck succeeded and found buckets: {buckets}.") + +def main() -> None: # pragma: no cover + asyncio.run(_run_healthcheck()) + +if __name__ == "__main__": # pragma: no cover + main() diff --git a/src/detection_archiver/main.py b/src/detection_archiver/main.py new file mode 100644 index 0000000..9b4743d --- /dev/null +++ b/src/detection_archiver/main.py @@ -0,0 +1,103 @@ +import asyncio +import logging + +import asyncpg +from aiokafka import AIOKafkaConsumer +from minio import Minio +from prometheus_client import start_http_server + +from src.detection_archiver.detection_consumer import DetectionConsumer +from src.detection_archiver.detection_repository import DetectionRepository +from src.util.environment import get_env + +logger = logging.getLogger(__name__) +logging.basicConfig( + level=logging.INFO, + format="[%(asctime)s][%(levelname)s][%(name)s] %(message)s", +) + +KAFKA_BOOTSTRAP_SERVERS_ENV = "KAFKA_BOOTSTRAP_SERVERS" +KAFKA_TICKER_DETECTION_TOPIC_ENV = "KAFKA_TICKER_DETECTION_TOPIC" +DETECTION_ARCHIVER_CONSUMER_GROUP_ENV = "DETECTION_ARCHIVER_CONSUMER_GROUP" +DETECTION_ARCHIVER_METRICS_PORT_ENV = "DETECTION_ARCHIVER_METRICS_PORT" +DETECTION_ARCHIVER_DB_HOST_ENV = "DETECTION_ARCHIVER_DB_HOST" +DETECTION_ARCHIVER_DB_PORT_ENV = "DETECTION_ARCHIVER_DB_PORT" +DETECTION_ARCHIVER_DB_NAME_ENV = "DETECTION_ARCHIVER_DB_NAME" +DETECTION_ARCHIVER_DB_USER_ENV = "DETECTION_ARCHIVER_DB_USER" +DETECTION_ARCHIVER_DB_PASSWORD_ENV = "DETECTION_ARCHIVER_DB_PASSWORD" +DETECTION_ARCHIVER_DB_SCHEMA_ENV = "DETECTION_ARCHIVER_DB_SCHEMA" +DETECTION_ARCHIVER_DB_TABLE_ENV = "DETECTION_ARCHIVER_DB_TABLE" +MINIO_HOST_ENV = "MINIO_HOST" +MINIO_PORT_ENV = "MINIO_PORT" +MINIO_USER_ENV = "MINIO_USER" +MINIO_PASSWORD_ENV = "MINIO_PASSWORD" +MINIO_BUCKET_ENV = "MINIO_BUCKET" +MINIO_SECURE_ENV = "MINIO_SECURE" + +async def main() -> None: + env = get_env([ + KAFKA_BOOTSTRAP_SERVERS_ENV, + KAFKA_TICKER_DETECTION_TOPIC_ENV, + DETECTION_ARCHIVER_CONSUMER_GROUP_ENV, + DETECTION_ARCHIVER_METRICS_PORT_ENV, + DETECTION_ARCHIVER_DB_HOST_ENV, + DETECTION_ARCHIVER_DB_PORT_ENV, + DETECTION_ARCHIVER_DB_NAME_ENV, + DETECTION_ARCHIVER_DB_USER_ENV, + DETECTION_ARCHIVER_DB_PASSWORD_ENV, + DETECTION_ARCHIVER_DB_SCHEMA_ENV, + DETECTION_ARCHIVER_DB_TABLE_ENV, + MINIO_HOST_ENV, + MINIO_PORT_ENV, + MINIO_USER_ENV, + MINIO_PASSWORD_ENV, + MINIO_BUCKET_ENV, + MINIO_SECURE_ENV, + ]) + + start_http_server(int(env[DETECTION_ARCHIVER_METRICS_PORT_ENV])) + + pool = await asyncpg.create_pool( + host=env[DETECTION_ARCHIVER_DB_HOST_ENV], + port=int(env[DETECTION_ARCHIVER_DB_PORT_ENV]), + database=env[DETECTION_ARCHIVER_DB_NAME_ENV], + user=env[DETECTION_ARCHIVER_DB_USER_ENV], + password=env[DETECTION_ARCHIVER_DB_PASSWORD_ENV], + command_timeout=10, + ) + + try: + repository = await DetectionRepository.create( + pool=pool, + schema=env[DETECTION_ARCHIVER_DB_SCHEMA_ENV], + table=env[DETECTION_ARCHIVER_DB_TABLE_ENV], + ) + + consumer = AIOKafkaConsumer( + env[KAFKA_TICKER_DETECTION_TOPIC_ENV], + bootstrap_servers=env[KAFKA_BOOTSTRAP_SERVERS_ENV], + group_id=env[DETECTION_ARCHIVER_CONSUMER_GROUP_ENV], + enable_auto_commit=True, + ) + + minio_client = Minio( + f"{env[MINIO_HOST_ENV]}:{env[MINIO_PORT_ENV]}", + access_key=env[MINIO_USER_ENV], + secret_key=env[MINIO_PASSWORD_ENV], + secure=env[MINIO_SECURE_ENV].lower() == "true", + ) + + detection_consumer = DetectionConsumer( + consumer=consumer, + repository=repository, + minio_client=minio_client, + bucket_name=env[MINIO_BUCKET_ENV], + ) + + await detection_consumer.run() + finally: + await pool.close() + + +if __name__ == "__main__": # pragma: no cover + asyncio.run(main()) diff --git a/src/detection_archiver/metrics.py b/src/detection_archiver/metrics.py new file mode 100644 index 0000000..9f388de --- /dev/null +++ b/src/detection_archiver/metrics.py @@ -0,0 +1,26 @@ +from prometheus_client import Counter + +NUM_DETECTIONS_CONSUMED = Counter( + "detection_archiver_stats_consumed_total", + "Number of statistic messages consumed from Kafka", +) + +NUM_DETECTIONS_ARCHIVED_SUCCEEDED = Counter( + "detection_archiver_stats_archived_total", + "Number of statistic records written to TimescaleDB", +) + +NUM_DETECTIONS_ARCHIVED_FAILED = Counter( + "detection_archiver_stats_failed_total", + "Number of statistic messages that failed to process", +) + +NUM_CONSUMER_START_SUCCEEDED = Counter( + "detection_archiver_consumer_start_succeeded_total", + "Number of times consumer start succeeded", +) + +NUM_CONSUMER_START_FAILED = Counter( + "detection_archiver_consumer_failed_total", + "Number of times consumer start failed", +) \ No newline at end of file diff --git a/src/models/statistic_model.py b/src/models/statistic_model.py index 661e00c..f0979c4 100644 --- a/src/models/statistic_model.py +++ b/src/models/statistic_model.py @@ -7,7 +7,7 @@ class Statistic(BaseModel): ticker: str window_length_minutes: int - window_end_time: str + window_end_time: datetime avg_volume: float std_dev_volume: float avg_price_percent_change: float diff --git a/src/statistics_archiver/statistic_repository.py b/src/statistics_archiver/statistic_repository.py index 5da1ad6..da0d362 100644 --- a/src/statistics_archiver/statistic_repository.py +++ b/src/statistics_archiver/statistic_repository.py @@ -1,5 +1,4 @@ import logging -from datetime import datetime from typing import Final import asyncpg @@ -72,13 +71,12 @@ async def _initialize(self) -> None: logger.info("Initialized statistics hypertable %s", self._qualified_table) async def insert_statistic(self, stat: Statistic) -> None: - window_end = _parse_time(stat.window_end_time) async with self._pool.acquire() as connection: await connection.execute( self._insert_sql, stat.ticker, stat.window_length_minutes, - window_end, + stat.window_end_time, stat.avg_volume, stat.std_dev_volume, stat.avg_price_percent_change, @@ -86,9 +84,3 @@ async def insert_statistic(self, stat: Statistic) -> None: stat.calculation_time, ) logger.info("Archived statistic for %s window %s at %s: %s", stat.ticker, stat.window_length_minutes, stat.window_end_time, stat) - -def _parse_time(value: str) -> datetime: - try: - return datetime.fromisoformat(value) - except ValueError as exc: # pragma: no cover - defensive guard - raise ValueError(f"Invalid ISO timestamp: {value}") from exc diff --git a/src/statistics_calculator/rolling_statistics.py b/src/statistics_calculator/rolling_statistics.py index 2310e7c..286730f 100644 --- a/src/statistics_calculator/rolling_statistics.py +++ b/src/statistics_calculator/rolling_statistics.py @@ -87,7 +87,7 @@ def update( volume: float, price_percent_change: float, event_time: datetime, - window_end_time: str, + window_end_time: datetime, ) -> list[Statistic]: last_time = self._last_event_times.get(ticker) if last_time is not None: diff --git a/src/statistics_calculator/statistics_processor.py b/src/statistics_calculator/statistics_processor.py index 8108f24..8b3352c 100644 --- a/src/statistics_calculator/statistics_processor.py +++ b/src/statistics_calculator/statistics_processor.py @@ -73,6 +73,5 @@ async def _persist(self, stat: Statistic) -> None: logger.info("Published to Kafka, topic=%s, stat=%s", self._statistics_topic, stat) @staticmethod - def _normalize_time(bar_time: datetime) -> str: - trimmed = bar_time.replace(second=0, microsecond=0) - return trimmed.isoformat() + def _normalize_time(bar_time: datetime) -> datetime: + return bar_time.replace(second=0, microsecond=0) diff --git a/tst/detection_archiver/test_detection_consumer.py b/tst/detection_archiver/test_detection_consumer.py new file mode 100644 index 0000000..979d987 --- /dev/null +++ b/tst/detection_archiver/test_detection_consumer.py @@ -0,0 +1,223 @@ +import asyncio +from datetime import datetime, timezone +from types import SimpleNamespace + +import pytest + +from src.detection_archiver.detection_consumer import DetectionConsumer +from src.detection_archiver.metrics import ( + NUM_CONSUMER_START_FAILED, + NUM_CONSUMER_START_SUCCEEDED, + NUM_DETECTIONS_ARCHIVED_FAILED, + NUM_DETECTIONS_ARCHIVED_SUCCEEDED, + NUM_DETECTIONS_CONSUMED, +) +from src.models.bar_model import ( + CLOSE_PRICE, + HIGH_PRICE, + LOW_PRICE, + NUM_TRADES, + OPEN_PRICE, + STOCK_SYMBOL, + TIMESTAMP, + VOLUME, + VOLUME_WEIGHTED_PRICE, + Bar, +) +from src.models.statistic_model import Statistic +from src.models.ticker_detection_model import DetectionDetail, TickerDetection +from src.models.ticker_model import Ticker + + +def _build_detection() -> TickerDetection: + bar_time = datetime(2024, 1, 1, 15, 30, tzinfo=timezone.utc) + bar_payload = { + STOCK_SYMBOL: "AAPL", + OPEN_PRICE: 190.0, + HIGH_PRICE: 191.0, + LOW_PRICE: 189.5, + CLOSE_PRICE: 190.5, + VOLUME: 2_000, + VOLUME_WEIGHTED_PRICE: 190.3, + NUM_TRADES: 55, + TIMESTAMP: bar_time.isoformat(), + } + bar = Bar.build(bar_payload) + ticker = Ticker.from_bar(bar=bar, price_percent_change=0.05) + stat = Statistic( + ticker="AAPL", + window_length_minutes=5, + window_end_time=bar_time, + avg_volume=1_200.0, + std_dev_volume=100.0, + avg_price_percent_change=0.01, + std_dev_price_percent_change=0.02, + calculation_time=datetime(2024, 1, 1, 15, 31, tzinfo=timezone.utc), + ) + return TickerDetection( + detection_time=datetime(2024, 1, 1, 15, 31, tzinfo=timezone.utc), + ticker=ticker, + statistics=[stat], + detections=[DetectionDetail(reason="volume spike", statistic=stat)], + ) + + +class DummyConsumer: + def __init__(self, messages, start_error: Exception | None = None): + self._messages = list(messages) + self._start_error = start_error + self.started = False + + async def start(self): + if self._start_error: + raise self._start_error + self.started = True + + def subscription(self): + return {"ticker-detection"} + + def __aiter__(self): + return self + + async def __anext__(self): + if not self._messages: + raise StopAsyncIteration + return self._messages.pop(0) + + +class DummyRepository: + def __init__(self, should_fail: bool = False): + self.should_fail = should_fail + self.calls = [] + + async def insert_detection(self, ticker_detection, context_path: str): + if self.should_fail: + raise RuntimeError("db unavailable") + self.calls.append((ticker_detection, context_path)) + + +class DummyMinio: + def __init__(self): + self.calls = [] + + def put_object(self, bucket, path, stream, length, content_type): + self.calls.append((bucket, path, length, content_type)) + + +def _message_from_detection(detection: TickerDetection): + return SimpleNamespace(value=detection.to_message(), partition=0, offset=1) + + +def test_detection_consumer_archives_detection(monkeypatch): + detection = _build_detection() + message = _message_from_detection(detection) + consumer = DummyConsumer([message]) + repository = DummyRepository() + minio_client = DummyMinio() + + async def immediate_to_thread(func, *args, **kwargs): + return func(*args, **kwargs) + + monkeypatch.setattr( + "src.detection_archiver.detection_consumer.asyncio.to_thread", + immediate_to_thread, + ) + + before_consumed = NUM_DETECTIONS_CONSUMED._value.get() + before_succeeded = NUM_DETECTIONS_ARCHIVED_SUCCEEDED._value.get() + before_failed = NUM_DETECTIONS_ARCHIVED_FAILED._value.get() + before_start_success = NUM_CONSUMER_START_SUCCEEDED._value.get() + before_start_failed = NUM_CONSUMER_START_FAILED._value.get() + + asyncio.run( + DetectionConsumer( + consumer=consumer, + repository=repository, + minio_client=minio_client, + bucket_name="detections", + ).run() + ) + + assert consumer.started is True + assert len(repository.calls) == 1 + assert len(minio_client.calls) == 1 + + _, context_path = repository.calls[0] + assert context_path.startswith("ticker=AAPL/year=2024/month=01/day=01/hour=15") + + bucket, stored_path, length, content_type = minio_client.calls[0] + assert bucket == "detections" + assert stored_path == context_path + assert length == len(detection.to_message()) + assert content_type == "application/json" + + assert NUM_DETECTIONS_CONSUMED._value.get() == before_consumed + 1 + assert NUM_DETECTIONS_ARCHIVED_SUCCEEDED._value.get() == before_succeeded + 1 + assert NUM_DETECTIONS_ARCHIVED_FAILED._value.get() == before_failed + assert NUM_CONSUMER_START_SUCCEEDED._value.get() == before_start_success + 1 + assert NUM_CONSUMER_START_FAILED._value.get() == before_start_failed + + +def test_detection_consumer_records_failure(monkeypatch): + detection = _build_detection() + message = _message_from_detection(detection) + consumer = DummyConsumer([message]) + repository = DummyRepository(should_fail=True) + minio_client = DummyMinio() + + async def immediate_to_thread(func, *args, **kwargs): + return func(*args, **kwargs) + + monkeypatch.setattr( + "src.detection_archiver.detection_consumer.asyncio.to_thread", + immediate_to_thread, + ) + + before_consumed = NUM_DETECTIONS_CONSUMED._value.get() + before_failed = NUM_DETECTIONS_ARCHIVED_FAILED._value.get() + + with pytest.raises(RuntimeError): + asyncio.run( + DetectionConsumer( + consumer=consumer, + repository=repository, + minio_client=minio_client, + bucket_name="detections", + ).run() + ) + + assert len(minio_client.calls) == 1 # upload attempted before failure + assert NUM_DETECTIONS_CONSUMED._value.get() == before_consumed + 1 + assert NUM_DETECTIONS_ARCHIVED_FAILED._value.get() == before_failed + 1 + + +def test_detection_consumer_start_failure(monkeypatch): + consumer = DummyConsumer([], start_error=RuntimeError("unavailable")) + repository = DummyRepository() + minio_client = DummyMinio() + + async def immediate_to_thread(func, *args, **kwargs): + return func(*args, **kwargs) + + monkeypatch.setattr( + "src.detection_archiver.detection_consumer.asyncio.to_thread", + immediate_to_thread, + ) + + before_fail = NUM_CONSUMER_START_FAILED._value.get() + before_success = NUM_CONSUMER_START_SUCCEEDED._value.get() + before_consumed = NUM_DETECTIONS_CONSUMED._value.get() + + with pytest.raises(RuntimeError): + asyncio.run( + DetectionConsumer( + consumer=consumer, + repository=repository, + minio_client=minio_client, + bucket_name="detections", + ).run() + ) + + assert NUM_CONSUMER_START_FAILED._value.get() == before_fail + 1 + assert NUM_CONSUMER_START_SUCCEEDED._value.get() == before_success + assert NUM_DETECTIONS_CONSUMED._value.get() == before_consumed diff --git a/tst/detection_archiver/test_detection_repository.py b/tst/detection_archiver/test_detection_repository.py new file mode 100644 index 0000000..a3fef40 --- /dev/null +++ b/tst/detection_archiver/test_detection_repository.py @@ -0,0 +1,121 @@ +import asyncio +from datetime import datetime, timezone + +import pytest + +from src.detection_archiver.detection_repository import DetectionRepository +from src.models.bar_model import ( + CLOSE_PRICE, + HIGH_PRICE, + LOW_PRICE, + NUM_TRADES, + OPEN_PRICE, + STOCK_SYMBOL, + TIMESTAMP, + VOLUME, + VOLUME_WEIGHTED_PRICE, + Bar, +) +from src.models.statistic_model import Statistic +from src.models.ticker_detection_model import DetectionDetail, TickerDetection +from src.models.ticker_model import Ticker + + +class DummyConnection: + def __init__(self): + self.queries = [] + + async def execute(self, sql, *params): + self.queries.append((sql, params)) + + +class DummyAcquire: + def __init__(self, connection): + self._connection = connection + + async def __aenter__(self): + return self._connection + + async def __aexit__(self, exc_type, exc, tb): + return False + + +class DummyPool: + def __init__(self): + self.connection = DummyConnection() + + def acquire(self): + return DummyAcquire(self.connection) + + +def _build_detection() -> TickerDetection: + bar_time = datetime(2024, 1, 1, 15, 0, tzinfo=timezone.utc) + bar_payload = { + STOCK_SYMBOL: "TSLA", + OPEN_PRICE: 200.0, + HIGH_PRICE: 201.0, + LOW_PRICE: 199.5, + CLOSE_PRICE: 200.5, + VOLUME: 1_500, + VOLUME_WEIGHTED_PRICE: 200.2, + NUM_TRADES: 42, + TIMESTAMP: bar_time.isoformat(), + } + bar = Bar.build(bar_payload) + ticker = Ticker.from_bar(bar=bar, price_percent_change=0.03) + stat = Statistic( + ticker="TSLA", + window_length_minutes=5, + window_end_time=bar_time, + avg_volume=1_100.0, + std_dev_volume=80.0, + avg_price_percent_change=0.01, + std_dev_price_percent_change=0.02, + calculation_time=datetime(2024, 1, 1, 15, 5, tzinfo=timezone.utc), + ) + detection = TickerDetection( + detection_time=datetime(2024, 1, 1, 15, 5, tzinfo=timezone.utc), + ticker=ticker, + statistics=[stat], + detections=[DetectionDetail(reason="threshold exceeded", statistic=stat)], + ) + return detection + + +def test_detection_repository_initialize_creates_schema(): + pool = DummyPool() + + repository = asyncio.run(DetectionRepository.create(pool=pool, schema="analytics", table="detections")) + + expected_table = "analytics.detections" + create_queries = [sql for sql, _ in pool.connection.queries] + + assert any("CREATE EXTENSION" in sql for sql in create_queries) + assert any(expected_table in sql for sql in create_queries) + assert isinstance(repository, DetectionRepository) + + +def test_detection_repository_insert_detection_executes_insert(): + pool = DummyPool() + repository = asyncio.run(DetectionRepository.create(pool=pool, schema="public", table="detections")) + pool.connection.queries.clear() + + detection = _build_detection() + context_path = "ticker=TSLA/year=2024/month=01/day=01/hour=15/2024-01-01T15:00:00+00:00.json" + + asyncio.run(repository.insert_detection(ticker_detection=detection, context_path=context_path)) + + assert len(pool.connection.queries) == 1 + sql, params = pool.connection.queries[0] + assert "INSERT INTO public.detections" in sql + assert len(params) == 10 + assert params[0] == "TSLA" + assert isinstance(params[1], datetime) + assert params[2] == 5 + assert params[3] == detection.detections[0].statistic.window_end_time + assert params[4] == detection.detection_time + assert params[5] == pytest.approx(detection.ticker.close_price) + assert params[6] == detection.ticker.volume + assert params[7] == detection.ticker.price_percent_change + assert params[8] == detection.detections[0].reason + assert params[9] == context_path diff --git a/tst/detection_archiver/test_healthcheck_and_main.py b/tst/detection_archiver/test_healthcheck_and_main.py new file mode 100644 index 0000000..aa4a674 --- /dev/null +++ b/tst/detection_archiver/test_healthcheck_and_main.py @@ -0,0 +1,143 @@ +import asyncio + +from src.detection_archiver import healthcheck +from src.detection_archiver import main as detection_main + + +def test_healthcheck_validates_db_and_minio(monkeypatch): + env = { + healthcheck.TIMESCALE_DB_HOST_ENV: "postgres", + healthcheck.TIMESCALE_DB_PORT_ENV: "5432", + healthcheck.TIMESCALE_DB_NAME_ENV: "tradestrike_db", + healthcheck.TIMESCALE_DB_USER_ENV: "admin", + healthcheck.TIMESCALE_DB_PASSWORD_ENV: "admin", + healthcheck.MINIO_HOST_ENV: "minio", + healthcheck.MINIO_PORT_ENV: "9000", + healthcheck.MINIO_USER_ENV: "minio-user", + healthcheck.MINIO_PASSWORD_ENV: "minio-pass", + healthcheck.MINIO_BUCKET_ENV: "detections", + healthcheck.MINIO_SECURE_ENV: "false", + } + monkeypatch.setattr(healthcheck, "get_env", lambda names: env) + + executed_queries = [] + + class DummyConnection: + async def execute(self, query): + executed_queries.append(query) + + async def close(self): + executed_queries.append("close") + + async def fake_connect(**kwargs): + assert kwargs["host"] == env[healthcheck.TIMESCALE_DB_HOST_ENV] + assert kwargs["port"] == int(env[healthcheck.TIMESCALE_DB_PORT_ENV]) + return DummyConnection() + + monkeypatch.setattr(healthcheck.asyncpg, "connect", fake_connect) + + listed_buckets = [] + + class DummyMinio: + def __init__(self, endpoint, access_key, secret_key, secure): + assert endpoint == f"{env[healthcheck.MINIO_HOST_ENV]}:{env[healthcheck.MINIO_PORT_ENV]}" + assert access_key == env[healthcheck.MINIO_USER_ENV] + assert secret_key == env[healthcheck.MINIO_PASSWORD_ENV] + assert secure is False + + def list_buckets(self): + listed_buckets.append("bucket-a") + return ["bucket-a"] + + monkeypatch.setattr(healthcheck, "Minio", DummyMinio) + + asyncio.run(healthcheck._run_healthcheck()) + + assert executed_queries == ["SELECT 1;", "close"] + assert listed_buckets == ["bucket-a"] + + +def test_main_initializes_dependencies_and_runs_consumer(monkeypatch): + env = { + detection_main.KAFKA_BOOTSTRAP_SERVERS_ENV: "kafka:9092", + detection_main.KAFKA_TICKER_DETECTION_TOPIC_ENV: "ticker-detection", + detection_main.DETECTION_ARCHIVER_CONSUMER_GROUP_ENV: "detection-archiver", + detection_main.DETECTION_ARCHIVER_METRICS_PORT_ENV: "9700", + detection_main.DETECTION_ARCHIVER_DB_HOST_ENV: "postgres", + detection_main.DETECTION_ARCHIVER_DB_PORT_ENV: "5432", + detection_main.DETECTION_ARCHIVER_DB_NAME_ENV: "tradestrike_db", + detection_main.DETECTION_ARCHIVER_DB_USER_ENV: "admin", + detection_main.DETECTION_ARCHIVER_DB_PASSWORD_ENV: "admin", + detection_main.DETECTION_ARCHIVER_DB_SCHEMA_ENV: "public", + detection_main.DETECTION_ARCHIVER_DB_TABLE_ENV: "detections", + detection_main.MINIO_HOST_ENV: "minio", + detection_main.MINIO_PORT_ENV: "9000", + detection_main.MINIO_USER_ENV: "minio-user", + detection_main.MINIO_PASSWORD_ENV: "minio-pass", + detection_main.MINIO_BUCKET_ENV: "detections", + detection_main.MINIO_SECURE_ENV: "false", + } + monkeypatch.setattr(detection_main, "get_env", lambda names: env) + + started_ports = [] + monkeypatch.setattr(detection_main, "start_http_server", lambda port: started_ports.append(port)) + + class DummyPool: + async def close(self): + self.closed = True + + pool_instance = DummyPool() + + async def fake_create_pool(**kwargs): + assert kwargs["host"] == env[detection_main.DETECTION_ARCHIVER_DB_HOST_ENV] + return pool_instance + + monkeypatch.setattr(detection_main.asyncpg, "create_pool", fake_create_pool) + + class DummyRepository: + created_with = None + + @classmethod + async def create(cls, pool, schema, table): + cls.created_with = (pool, schema, table) + return cls() + + monkeypatch.setattr(detection_main, "DetectionRepository", DummyRepository) + + class DummyKafkaConsumer: + def __init__(self, *args, **kwargs): + self.args = args + self.kwargs = kwargs + + monkeypatch.setattr(detection_main, "AIOKafkaConsumer", DummyKafkaConsumer) + + class DummyMinio: + def __init__(self, endpoint, access_key, secret_key, secure): + self.endpoint = endpoint + self.access_key = access_key + self.secret_key = secret_key + self.secure = secure + + monkeypatch.setattr(detection_main, "Minio", DummyMinio) + + consumer_run_called = [] + + class DummyDetectionConsumer: + def __init__(self, **kwargs): + self.kwargs = kwargs + + async def run(self): + consumer_run_called.append(self.kwargs) + + monkeypatch.setattr(detection_main, "DetectionConsumer", DummyDetectionConsumer) + + asyncio.run(detection_main.main()) + + assert started_ports == [int(env[detection_main.DETECTION_ARCHIVER_METRICS_PORT_ENV])] + assert DummyRepository.created_with == ( + pool_instance, + env[detection_main.DETECTION_ARCHIVER_DB_SCHEMA_ENV], + env[detection_main.DETECTION_ARCHIVER_DB_TABLE_ENV], + ) + assert consumer_run_called # run invoked + assert getattr(pool_instance, "closed", False) is True diff --git a/tst/detection_engine/test_detection_engine.py b/tst/detection_engine/test_detection_engine.py index 7bcd206..9a2d00f 100644 --- a/tst/detection_engine/test_detection_engine.py +++ b/tst/detection_engine/test_detection_engine.py @@ -36,7 +36,7 @@ def _build_statistic(window: int) -> Statistic: return Statistic( ticker="AAPL", window_length_minutes=window, - window_end_time="2024-01-01T10:01:00+00:00", + window_end_time=datetime(2024, 1, 1, 10, 1, tzinfo=timezone.utc), avg_volume=900.0, std_dev_volume=100.0, avg_price_percent_change=0.01, @@ -116,7 +116,7 @@ def test_detection_engine_skips_when_price_z_below_threshold(): stat = Statistic( ticker="AAPL", window_length_minutes=5, - window_end_time="2024-01-01T10:01:00+00:00", + window_end_time=datetime(2024, 1, 1, 10, 1, tzinfo=timezone.utc), avg_volume=1000.0, std_dev_volume=10.0, avg_price_percent_change=0.0, @@ -140,7 +140,7 @@ def test_detection_engine_z_score_zero_when_stddev_missing(): Statistic( ticker="AAPL", window_length_minutes=5, - window_end_time="2024-01-01T10:01:00+00:00", + window_end_time=datetime(2024, 1, 1, 10, 1, tzinfo=timezone.utc), avg_volume=1000.0, std_dev_volume=0.0, avg_price_percent_change=0.0, diff --git a/tst/detection_engine/test_detection_engine_statistic_store.py b/tst/detection_engine/test_detection_engine_statistic_store.py index 1dea4e3..d663d7c 100644 --- a/tst/detection_engine/test_detection_engine_statistic_store.py +++ b/tst/detection_engine/test_detection_engine_statistic_store.py @@ -1,4 +1,5 @@ import asyncio +from datetime import datetime, timezone from unittest.mock import AsyncMock from src.detection_engine.statistic_store import StatisticStore @@ -10,7 +11,7 @@ def test_statistic_store_fetch_deduplicates_windows(): stat = Statistic( ticker="AAPL", window_length_minutes=5, - window_end_time="2024-01-01T10:05:00+00:00", + window_end_time=datetime(2024, 1, 1, 10, 5, tzinfo=timezone.utc), avg_volume=1000.0, std_dev_volume=100.0, avg_price_percent_change=0.01, diff --git a/tst/detection_engine/test_detection_processor.py b/tst/detection_engine/test_detection_processor.py index e9a3651..bd4d60c 100644 --- a/tst/detection_engine/test_detection_processor.py +++ b/tst/detection_engine/test_detection_processor.py @@ -106,7 +106,7 @@ def test_detection_processor_emits_detection_when_threshold_exceeded(): stats = Statistic( ticker="TSLA", window_length_minutes=5, - window_end_time=(base_time + timedelta(minutes=1)).isoformat(), + window_end_time=base_time + timedelta(minutes=1), avg_volume=1_100.0, std_dev_volume=100.0, avg_price_percent_change=0.01, @@ -178,7 +178,7 @@ def test_detection_processor_skips_when_threshold_not_met(): stats = Statistic( ticker="AAPL", window_length_minutes=5, - window_end_time=(base_time + timedelta(minutes=1)).isoformat(), + window_end_time=base_time + timedelta(minutes=1), avg_volume=1_000.0, std_dev_volume=120.0, avg_price_percent_change=0.02, diff --git a/tst/models/test_statistic_model.py b/tst/models/test_statistic_model.py index 61bd857..b930495 100644 --- a/tst/models/test_statistic_model.py +++ b/tst/models/test_statistic_model.py @@ -1,31 +1,36 @@ import json +from datetime import datetime, timezone import pytest from src.models.statistic_model import Statistic +def _parse_ts(value: str) -> datetime: + # Pydantic may emit Z suffix for UTC; normalize so fromisoformat works. + cleaned = value.replace("Z", "+00:00") + return datetime.fromisoformat(cleaned) + + def test_statistic_to_message_round_trip(): stat = Statistic( ticker="AAPL", window_length_minutes=5, - window_end_time="2024-01-01T10:05:00+00:00", + window_end_time=datetime(2024, 1, 1, 10, 5, tzinfo=timezone.utc), avg_volume=123.45, std_dev_volume=6.78, avg_price_percent_change=0.0123, std_dev_price_percent_change=0.0045, - calculation_time="2024-01-01T10:05:01+00:00", + calculation_time=datetime(2024, 1, 1, 10, 5, 1, tzinfo=timezone.utc), ) payload = json.loads(stat.to_message()) - assert payload == { - "ticker": "AAPL", - "window_length_minutes": 5, - "window_end_time": "2024-01-01T10:05:00+00:00", - "avg_volume": 123.45, - "std_dev_volume": 6.78, - "avg_price_percent_change": 0.0123, - "std_dev_price_percent_change": 0.0045, - "calculation_time": "2024-01-01T10:05:01Z", - } + assert payload["ticker"] == "AAPL" + assert payload["window_length_minutes"] == 5 + assert pytest.approx(payload["avg_volume"]) == 123.45 + assert pytest.approx(payload["std_dev_volume"]) == 6.78 + assert pytest.approx(payload["avg_price_percent_change"]) == 0.0123 + assert pytest.approx(payload["std_dev_price_percent_change"]) == 0.0045 + assert _parse_ts(payload["window_end_time"]) == datetime(2024, 1, 1, 10, 5, tzinfo=timezone.utc) + assert _parse_ts(payload["calculation_time"]) == datetime(2024, 1, 1, 10, 5, 1, tzinfo=timezone.utc) diff --git a/tst/models/test_ticker_detection_model.py b/tst/models/test_ticker_detection_model.py index e167d5e..4cf73cf 100644 --- a/tst/models/test_ticker_detection_model.py +++ b/tst/models/test_ticker_detection_model.py @@ -37,7 +37,7 @@ def _build_statistic() -> Statistic: return Statistic( ticker="AAPL", window_length_minutes=5, - window_end_time=datetime(2024, 1, 1, 14, 30, tzinfo=timezone.utc).isoformat(), + window_end_time=datetime(2024, 1, 1, 14, 30, tzinfo=timezone.utc), avg_volume=1_200.0, std_dev_volume=100.0, avg_price_percent_change=0.01, diff --git a/tst/statistics_archiver/test_statistic_consumer.py b/tst/statistics_archiver/test_statistic_consumer.py index 446d551..2a9656b 100644 --- a/tst/statistics_archiver/test_statistic_consumer.py +++ b/tst/statistics_archiver/test_statistic_consumer.py @@ -1,13 +1,19 @@ import asyncio import sys import types +from datetime import datetime, timezone from types import SimpleNamespace -from src.statistics_archiver.metrics import NUM_STATS_ARCHIVED_SUCCEEDED, NUM_STATS_CONSUMED, NUM_STATS_ARCHIVED_FAILED, NUM_CONSUMER_START_FAILED, NUM_CONSUMER_START_SUCCEEDED - import pytest from src.models.statistic_model import Statistic +from src.statistics_archiver.metrics import ( + NUM_STATS_ARCHIVED_FAILED, + NUM_STATS_ARCHIVED_SUCCEEDED, + NUM_STATS_CONSUMED, + NUM_CONSUMER_START_FAILED, + NUM_CONSUMER_START_SUCCEEDED, +) class _CounterValue: @@ -70,7 +76,7 @@ def build_message(**overrides): stat = Statistic( ticker="AAPL", window_length_minutes=5, - window_end_time="2024-01-01T10:05:00+00:00", + window_end_time=datetime(2024, 1, 1, 10, 5, tzinfo=timezone.utc), avg_volume=100.0, std_dev_volume=10.0, avg_price_percent_change=0.01, diff --git a/tst/statistics_archiver/test_statistic_repository.py b/tst/statistics_archiver/test_statistic_repository.py index 00a220c..300c7d4 100644 --- a/tst/statistics_archiver/test_statistic_repository.py +++ b/tst/statistics_archiver/test_statistic_repository.py @@ -4,7 +4,7 @@ import pytest from src.models.statistic_model import Statistic -from src.statistics_archiver.statistic_repository import StatisticRepository, _parse_time +from src.statistics_archiver.statistic_repository import StatisticRepository class DummyConnection: @@ -59,7 +59,7 @@ def test_statistic_repository_insert_statistic_executes_insert(): stat = Statistic( ticker="AAPL", window_length_minutes=5, - window_end_time="2024-01-01T10:05:00+00:00", + window_end_time=datetime(2024, 1, 1, 10, 5, tzinfo=timezone.utc), avg_volume=123.0, std_dev_volume=4.5, avg_price_percent_change=0.01, @@ -79,8 +79,3 @@ def test_statistic_repository_insert_statistic_executes_insert(): assert params[2].tzinfo is not None assert params[6] == pytest.approx(0.002) assert params[7] == datetime(2024, 1, 1, 10, 5, tzinfo=timezone.utc) - - -def test_parse_time_invalid_value(): - with pytest.raises(ValueError): - _parse_time("invalid") diff --git a/tst/statistics_calculator/test_rolling_statistics.py b/tst/statistics_calculator/test_rolling_statistics.py index b10e03c..615a28c 100644 --- a/tst/statistics_calculator/test_rolling_statistics.py +++ b/tst/statistics_calculator/test_rolling_statistics.py @@ -16,7 +16,7 @@ def test_rolling_statistics_calculator_computes_metrics(): volume=10, price_percent_change=0.05, event_time=base_time, - window_end_time=base_time.isoformat(), + window_end_time=base_time, ) assert len(stats) == 2 for stat in stats: @@ -32,7 +32,7 @@ def test_rolling_statistics_calculator_computes_metrics(): volume=20, price_percent_change=0.1, event_time=next_time, - window_end_time=next_time.isoformat(), + window_end_time=next_time, ) assert len(stats) == 2 stats_by_window = {stat.window_length_minutes: stat for stat in stats} @@ -59,14 +59,14 @@ def test_rolling_statistics_skips_large_gap(caplog): calculator = RollingStatisticsCalculator([2]) base_time = datetime(2024, 1, 1, 10, 0, tzinfo=timezone.utc) - calculator.update("AAPL", 10, 0.05, base_time, base_time.isoformat()) + calculator.update("AAPL", 10, 0.05, base_time, base_time) caplog.clear() result = calculator.update( "AAPL", 20, 0.07, base_time + timedelta(minutes=5), - (base_time + timedelta(minutes=5)).isoformat(), + base_time + timedelta(minutes=5), ) assert result == [] assert any("gap too large" in record.message for record in caplog.records) @@ -100,7 +100,7 @@ def test_rolling_statistics_zero_volume_returns_zero_metrics(): calculator = RollingStatisticsCalculator([2]) base_time = datetime(2024, 1, 1, 11, 0, tzinfo=timezone.utc) - stats = calculator.update("MSFT", 0, 0.0, base_time, base_time.isoformat()) + stats = calculator.update("MSFT", 0, 0.0, base_time, base_time) assert len(stats) == 1 stat = stats[0] @@ -115,13 +115,13 @@ def test_rolling_statistics_skips_out_of_order_event_time(caplog): calculator = RollingStatisticsCalculator([2]) base_time = datetime(2024, 1, 1, 13, 0, tzinfo=timezone.utc) - calculator.update("GOOG", 10, 0.02, base_time, base_time.isoformat()) + calculator.update("GOOG", 10, 0.02, base_time, base_time) calculator.update( "GOOG", 20, 0.03, base_time + timedelta(minutes=1), - (base_time + timedelta(minutes=1)).isoformat(), + base_time + timedelta(minutes=1), ) caplog.clear() @@ -130,7 +130,7 @@ def test_rolling_statistics_skips_out_of_order_event_time(caplog): 30, 0.01, base_time + timedelta(seconds=30), - (base_time + timedelta(seconds=30)).isoformat(), + base_time + timedelta(seconds=30), ) assert result == [] assert any("not newer" in record.message for record in caplog.records) @@ -140,13 +140,13 @@ def test_rolling_statistics_skips_when_event_time_not_increasing(): calculator = RollingStatisticsCalculator([2]) base_time = datetime(2024, 1, 1, 14, 0, tzinfo=timezone.utc) - calculator.update("NFLX", 10, 0.02, base_time, base_time.isoformat()) + calculator.update("NFLX", 10, 0.02, base_time, base_time) calculator.update( "NFLX", 20, 0.03, base_time + timedelta(minutes=1), - (base_time + timedelta(minutes=1)).isoformat(), + base_time + timedelta(minutes=1), ) # Non-increasing timestamp should be ignored @@ -156,7 +156,7 @@ def test_rolling_statistics_skips_when_event_time_not_increasing(): 25, 0.04, base_time + timedelta(minutes=1), - (base_time + timedelta(minutes=1)).isoformat(), + base_time + timedelta(minutes=1), ) == [] ) diff --git a/tst/statistics_calculator/test_statistics_processor.py b/tst/statistics_calculator/test_statistics_processor.py index 34d2c46..b9b709b 100644 --- a/tst/statistics_calculator/test_statistics_processor.py +++ b/tst/statistics_calculator/test_statistics_processor.py @@ -111,8 +111,9 @@ def test_statistics_processor_writes_to_redis_and_kafka(): assert message["std_dev_price_percent_change"] == 0 assert "calculation_time" in message - expected_window_end = (base_time + timedelta(minutes=1)).replace(second=0, microsecond=0).isoformat() - assert message["window_end_time"] == expected_window_end + expected_window_end = (base_time + timedelta(minutes=1)).replace(second=0, microsecond=0) + normalized = message["window_end_time"].replace("Z", "+00:00") + assert datetime.fromisoformat(normalized) == expected_window_end def test_statistics_processor_increments_failure_on_error():