Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,18 @@ STATISTICS_DB_TABLE=feed_statistics
KAFKA_TICKER_DETECTION_TOPIC=ticker-detection
DETECTION_ENGINE_VOLUME_Z_THRESHOLD=2.5
DETECTION_ENGINE_PRICE_Z_THRESHOLD=2.5
DETECTION_ARCHIVER_CONSUMER_GROUP=detection-archiver
DETECTION_ARCHIVER_METRICS_PORT=9700
DETECTION_ARCHIVER_DB_HOST=postgres
DETECTION_ARCHIVER_DB_PORT=5432
DETECTION_ARCHIVER_DB_NAME=tradestrike_db
DETECTION_ARCHIVER_DB_USER=admin
DETECTION_ARCHIVER_DB_PASSWORD=admin
DETECTION_ARCHIVER_DB_SCHEMA=public
DETECTION_ARCHIVER_DB_TABLE=feed_detection
MINIO_HOST=minio
MINIO_PORT=9000
MINIO_USER=minio_admin
MINIO_PASSWORD=minio_admin
MINIO_BUCKET=detections
MINIO_SECURE=false
21 changes: 12 additions & 9 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ FAKE_ALPACA_INTERVAL_SECONDS ?= 1

all: test

install: check_python venv install-feed-ingestor-deps
install: check_python venv install-service-deps

check_python:
@if [ -z "$(PYTHON)" ]; then echo "Python interpreter not found. Please install Python 3."; exit 1; fi
Expand All @@ -27,13 +27,16 @@ venv: requirements.txt
$(PYTHON) -m venv venv
$(VENV_PYTHON_PATH) -m pip install -r requirements.txt

install-feed-ingestor-deps: venv
@REQ_FILE=services/feed-ingestor/requirements.txt; \
if [ ! -f $$REQ_FILE ]; then \
echo "⚠️ No runtime requirements found for feed-ingestor at $$REQ_FILE"; \
install-service-deps: venv
@REQ_FILES=$$(find services -mindepth 2 -maxdepth 2 -name requirements.txt | sort); \
if [ -z "$$REQ_FILES" ]; then \
echo "⚠️ No runtime requirements found under services/"; \
else \
echo "📦 Installing dependencies for service 'feed-ingestor'..."; \
$(VENV_PYTHON_PATH) -m pip install -r $$REQ_FILE; \
for req in $$REQ_FILES; do \
service=$$(basename $$(dirname $$req)); \
echo "📦 Installing dependencies for service '$$service'..."; \
$(VENV_PYTHON_PATH) -m pip install -r $$req; \
done; \
fi

install-service: check_python venv
Expand All @@ -44,7 +47,7 @@ install-service: check_python venv
$(VENV_PYTHON_PATH) -m pip install -r $$REQ_FILE

test: install
$(VENV_PYTHON_PATH) -m pytest -s tst/ --cov-fail-under=100 --cov=src --cov-report=html
$(VENV_PYTHON_PATH) -m pytest -s tst/ --cov-fail-under=100 --cov=src --cov-report=term-missing --cov-report=html

clean: delete-stack
@echo "🧹 Cleaning environment..."
Expand Down Expand Up @@ -74,4 +77,4 @@ create-stack: install-docker test
USE_FAKE_ALPACA_CLIENT=$(MOCK_ALPACA_CLIENT) FAKE_ALPACA_INTERVAL_SECONDS=$(FAKE_ALPACA_INTERVAL_SECONDS) docker compose up -d --build
@echo "🎯 Environment setup complete — all services running."

.PHONY: all install install-feed-ingestor-deps install-service clean test install-docker create-stack delete-stack
.PHONY: all install install-service install-service-deps clean test install-docker create-stack delete-stack
83 changes: 81 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ services:
- /bin/sh
- -c
- |
set -euo pipefail
echo "Waiting for Kafka to become reachable..."
while ! KAFKA_OPTS="" /opt/kafka/bin/kafka-topics.sh --bootstrap-server "${KAFKA_BOOTSTRAP_SERVERS}" --list >/dev/null 2>&1; do
echo " still waiting..."
Expand Down Expand Up @@ -207,6 +208,46 @@ services:
retries: 5
restart: unless-stopped

# ============================================================
# 📊 Detection Archiver Worker
# ============================================================
# Consumes ticker-detection events from Kafka and archives them to MinIO
# and Postgres (see src/detection_archiver); serves Prometheus metrics on
# DETECTION_ARCHIVER_METRICS_PORT.
detection-archiver:
build:
context: .
dockerfile: services/detection-archiver/Dockerfile
container_name: detection-archiver
env_file:
- .env
# Inline defaults mirror the values in .env so the service also starts
# when .env is absent or a key is missing.
environment:
DETECTION_ARCHIVER_CONSUMER_GROUP: ${DETECTION_ARCHIVER_CONSUMER_GROUP:-detection-archiver}
DETECTION_ARCHIVER_METRICS_PORT: ${DETECTION_ARCHIVER_METRICS_PORT:-9700}
DETECTION_ARCHIVER_DB_HOST: ${DETECTION_ARCHIVER_DB_HOST:-postgres}
DETECTION_ARCHIVER_DB_PORT: ${DETECTION_ARCHIVER_DB_PORT:-5432}
DETECTION_ARCHIVER_DB_NAME: ${DETECTION_ARCHIVER_DB_NAME:-tradestrike_db}
DETECTION_ARCHIVER_DB_USER: ${DETECTION_ARCHIVER_DB_USER:-admin}
DETECTION_ARCHIVER_DB_PASSWORD: ${DETECTION_ARCHIVER_DB_PASSWORD:-admin}
DETECTION_ARCHIVER_DB_SCHEMA: ${DETECTION_ARCHIVER_DB_SCHEMA:-public}
DETECTION_ARCHIVER_DB_TABLE: ${DETECTION_ARCHIVER_DB_TABLE:-feed_detection}
# Publish only the metrics endpoint (scraped by Prometheus, see
# monitoring/prometheus.yml job 'detection-archiver').
ports:
- "${DETECTION_ARCHIVER_METRICS_PORT:-9700}:${DETECTION_ARCHIVER_METRICS_PORT:-9700}"
# Gate startup on every dependency being ready: Kafka healthy, topics
# created, Postgres healthy, MinIO healthy and its bucket initialized.
depends_on:
kafka:
condition: service_healthy
kafka-topic-init:
condition: service_completed_successfully
postgres:
condition: service_healthy
minio:
condition: service_healthy
minio-init-bucket:
condition: service_completed_successfully
# NOTE(review): assumes src.detection_archiver.healthcheck exists in the
# image (not visible in this diff) — confirm the module is present.
healthcheck:
test: ["CMD", "python", "-m", "src.detection_archiver.healthcheck"]
interval: 10s
timeout: 5s
retries: 5
restart: unless-stopped

# ============================================================
# 📊 Statistics Archiver Worker
# ============================================================
Expand Down Expand Up @@ -376,12 +417,18 @@ services:
container_name: minio
command: server /data --console-address ":9001"
ports:
- "9000:9000"
- "${MINIO_PORT:-9000}:${MINIO_PORT:-9000}"
- "9001:9001"
environment:
MINIO_PROMETHEUS_JOB_ID: minio
MINIO_PROMETHEUS_AUTH_TYPE: public
MINIO_PROMETHEUS_URL: http://prometheus:9090
MINIO_ROOT_USER: ${MINIO_USER:-minio_admin}
MINIO_ROOT_PASSWORD: ${MINIO_PASSWORD:-minio_admin}
MINIO_PORT: ${MINIO_PORT:-9000}
MINIO_HOST: ${MINIO_HOST:-localhost}
MINIO_BUCKET: ${MINIO_BUCKET:-detections}
MINIO_SECURE: ${MINIO_SECURE:-false}
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 10s
Expand All @@ -391,6 +438,38 @@ services:
- minio_data:/data
restart: unless-stopped

# ============================================================
# ☁️ MinIO Init Bucket
# ============================================================
# One-shot job: creates the detections bucket if it does not exist yet.
minio-init-bucket:
  image: minio/mc
  container_name: minio-init-bucket
  env_file:
    - .env
  depends_on:
    minio:
      condition: service_healthy
  # NOTE: `>` is a YAML *folded* scalar — newlines collapse to spaces, so
  # the script runs as ONE shell line and every statement MUST end in `;`.
  # The original was missing the `;` after `set -euo pipefail`, which made
  # the following `echo …` become arguments to `set` instead of a command.
  # NOTE(review): `pipefail` is not POSIX sh — confirm the mc image's
  # /bin/sh supports it.
  entrypoint: >
    /bin/sh -c "
    set -euo pipefail;
    echo 'Configuring MinIO client...';
    export MC_HOST_local=\"http://${MINIO_USER:-minio_admin}:${MINIO_PASSWORD:-minio_admin}@${MINIO_HOST:-minio}:${MINIO_PORT:-9000}\";
    /usr/bin/mc alias list;
    echo 'MinIO client configured successfully.';
    echo 'Checking if bucket ${MINIO_BUCKET:-detections} exists...';
    if /usr/bin/mc ls local/${MINIO_BUCKET:-detections} >/dev/null 2>&1; then
    echo 'Bucket ${MINIO_BUCKET:-detections} already exists. Skipping creation.';
    else
    echo 'Creating bucket ${MINIO_BUCKET:-detections}...';
    /usr/bin/mc mb -p local/${MINIO_BUCKET:-detections};
    echo 'Listing all buckets in local MinIO:';
    /usr/bin/mc ls local;
    echo 'MinIO bucket initialization complete.';
    exit 0;
    fi;
    "
  restart: "no"

# ============================================================
# 📬 RabbitMQ
# ============================================================
Expand Down Expand Up @@ -446,7 +525,7 @@ services:
- grafana_data:/var/lib/grafana
- ./monitoring/grafana/datasources.yml:/etc/grafana/provisioning/datasources/datasources.yml
- ./monitoring/grafana/dashboards.yml:/etc/grafana/provisioning/dashboards/dashboards.yml
- ./monitoring/grafana/dashboards:/etc/grafana/provisioning/dashboards
- ./monitoring/grafana/dashboards:/var/lib/grafana/dashboards
depends_on:
prometheus:
condition: service_healthy
Expand Down
2 changes: 1 addition & 1 deletion monitoring/grafana/dashboards.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ providers:
editable: true
updateIntervalSeconds: 15
options:
path: /etc/grafana/provisioning/dashboards
path: /var/lib/grafana/dashboards
Empty file.
2 changes: 1 addition & 1 deletion monitoring/grafana/dashboards/stocks.json
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@
"editorMode": "code",
"format": "time_series",
"rawQuery": true,
"rawSql": "SELECT window_end_time AS \"time\", avg_volume, std_dev_volume AS \"std_dev\" FROM feed_statistics WHERE ticker = ${ticker:sqlstring} AND WHERE window_length_minutes = ${window:int}",
"rawSql": "SELECT window_end_time AS \"time\", avg_volume, std_dev_volume AS \"std_dev\" FROM feed_statistics WHERE ticker = ${ticker:sqlstring} AND window_length_minutes = ${window:int}",
"refId": "A",
"sql": {
"columns": [
Expand Down
4 changes: 4 additions & 0 deletions monitoring/prometheus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ scrape_configs:
static_configs:
- targets: ['statistics-archiver:9500']

- job_name: 'detection-archiver'
static_configs:
- targets: ['detection-archiver:9700']

# Redis Exporter
- job_name: 'redis'
static_configs:
Expand Down
14 changes: 14 additions & 0 deletions services/detection-archiver/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Runtime image for the detection-archiver worker.
FROM python:3.12-slim

# PYTHONUNBUFFERED: flush stdout/stderr immediately so container logs stream.
# PYTHONDONTWRITEBYTECODE: don't write .pyc files into the image.
# PIP_NO_CACHE_DIR: skip pip's download cache to keep the image small.
ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1 \
    PIP_NO_CACHE_DIR=1

WORKDIR /app

# Copy and install requirements first so this layer stays cached until
# services/detection-archiver/requirements.txt actually changes.
COPY services/detection-archiver/requirements.txt /tmp/requirements.txt
RUN pip install --no-cache-dir -r /tmp/requirements.txt

# Shared source tree; the worker runs as a module out of src/.
COPY src /app/src

CMD ["python", "-m", "src.detection_archiver.main"]
5 changes: 5 additions & 0 deletions services/detection-archiver/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
aiokafka>=0.10,<0.11
asyncpg>=0.29,<0.30
pydantic>=2.6,<3.0
prometheus-client>=0.20.0,<0.21
minio>=7.2.18,<7.3
84 changes: 84 additions & 0 deletions src/detection_archiver/detection_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import asyncio
import logging
from aiokafka import AIOKafkaConsumer
import os
from src.models.ticker_detection_model import TickerDetection
from src.detection_archiver.metrics import NUM_DETECTIONS_ARCHIVED_SUCCEEDED, NUM_DETECTIONS_CONSUMED, NUM_DETECTIONS_ARCHIVED_FAILED, NUM_CONSUMER_START_FAILED, NUM_CONSUMER_START_SUCCEEDED
from src.detection_archiver.detection_repository import DetectionRepository
from io import BytesIO
from minio import Minio

logger = logging.getLogger(__name__)

CONTENT_TYPE = 'application/json'
ENCODING_SCHEME = 'utf-8'

class DetectionConsumer:
    """Kafka consumer that archives ticker detections.

    For every message on the subscribed detection topic it:
      1. validates the payload into a ``TickerDetection``,
      2. writes the raw JSON to MinIO under a Hive-style partitioned key
         (``ticker=<t>/year=YYYY/month=MM/day=DD/hour=HH/<iso-time>.json``),
      3. records the detection plus the MinIO object key via the repository,
      4. updates Prometheus counters at each step.
    """

    def __init__(
        self,
        consumer: AIOKafkaConsumer,
        repository: DetectionRepository,
        minio_client: Minio,
        bucket_name: str,
    ):
        self._consumer = consumer
        self._repository = repository
        self._minio_client = minio_client
        self._bucket_name = bucket_name

    @staticmethod
    def _object_key(ticker_detection: TickerDetection) -> str:
        """Build the MinIO object key for a detection.

        Joins the parts with explicit '/' (not os.path.join) because MinIO
        object keys always use '/' — os.path.join would produce backslashes
        on a Windows host.
        """
        ticker_time = ticker_detection.ticker.time
        return "/".join(
            (
                f"ticker={ticker_detection.ticker.ticker}",
                ticker_time.strftime("year=%Y/month=%m/day=%d/hour=%H"),
                f"{ticker_time.isoformat()}.json",
            )
        )

    async def run(self) -> None:
        """Start the consumer and archive messages until cancelled or failed.

        Raises:
            Exception: re-raised on startup failure, or on the first message
                that cannot be archived (the orchestrator's restart policy
                handles recovery and re-delivery). The Kafka consumer is
                always stopped on the way out.
        """
        try:
            await self._consumer.start()
        except Exception:
            NUM_CONSUMER_START_FAILED.inc()
            logger.exception("Failed to start kafka consumer.")
            raise
        NUM_CONSUMER_START_SUCCEEDED.inc()
        logger.info(
            "Kafka consumer connected for topics: %s",
            ", ".join(sorted(self._consumer.subscription() or [])),
        )
        try:
            async for message in self._consumer:
                NUM_DETECTIONS_CONSUMED.inc()
                try:
                    ticker_detection = TickerDetection.model_validate_json(message.value)
                    context_path = self._object_key(ticker_detection)
                    serialized_detection = ticker_detection.to_message()
                    # The minio client is synchronous: run it in a worker
                    # thread so the event loop is not blocked.
                    await asyncio.to_thread(
                        self._minio_client.put_object,
                        self._bucket_name,
                        context_path,
                        BytesIO(serialized_detection),
                        len(serialized_detection),
                        content_type=CONTENT_TYPE,
                    )
                    logger.info(
                        "Persisted detection for %s to MinIO bucket %s at %s",
                        ticker_detection,
                        self._bucket_name,
                        context_path,
                    )
                    await self._repository.insert_detection(
                        ticker_detection=ticker_detection,
                        context_path=context_path,
                    )
                    logger.info(
                        "Stored detection for %s in TimeScaleDB (context=%s)",
                        ticker_detection,
                        context_path,
                    )
                    NUM_DETECTIONS_ARCHIVED_SUCCEEDED.inc()
                except Exception:
                    NUM_DETECTIONS_ARCHIVED_FAILED.inc()
                    logger.exception(
                        "Failed to archive detection from partition %s offset %s",
                        message.partition,
                        message.offset,
                    )
                    raise
        finally:
            # Fix: the original leaked the consumer on failure — the inner
            # `raise` propagated out of run() without ever closing it. Always
            # leave the group / close connections cleanly.
            await self._consumer.stop()
Loading