Merged
7 changes: 7 additions & 0 deletions .pre-commit-config.yaml
@@ -0,0 +1,7 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.6.9
hooks:
- id: ruff
args: ["--fix"]
- id: ruff-format
16 changes: 13 additions & 3 deletions Makefile
@@ -12,7 +12,7 @@ SERVICE ?=
MOCK_ALPACA_CLIENT ?= false
FAKE_ALPACA_INTERVAL_SECONDS ?= 1

all: test
all: install lint test

install: check_python venv install-service-deps

@@ -46,9 +46,19 @@ install-service: check_python venv
echo "📦 Installing dependencies for service '$(SERVICE)'..."; \
$(VENV_PYTHON_PATH) -m pip install -r $$REQ_FILE

test: install
test: install lint
$(VENV_PYTHON_PATH) -m pytest -s tst/ --cov-fail-under=100 --cov=src --cov-report=term-missing --cov-report=html

lint: install
$(VENV_PYTHON_PATH) -m ruff check src tst

lint-fix: install
$(VENV_PYTHON_PATH) -m ruff format src tst
$(VENV_PYTHON_PATH) -m ruff check src tst --fix

format: install
$(VENV_PYTHON_PATH) -m ruff format src tst

clean: delete-stack
@echo "🧹 Cleaning environment..."
@if [ -f .coverage ]; then rm .coverage; fi
@@ -77,4 +87,4 @@ create-stack: install-docker test
USE_FAKE_ALPACA_CLIENT=$(MOCK_ALPACA_CLIENT) FAKE_ALPACA_INTERVAL_SECONDS=$(FAKE_ALPACA_INTERVAL_SECONDS) docker compose up -d --build
@echo "🎯 Environment setup complete — all services running."

.PHONY: all install install-service install-service-deps clean test install-docker create-stack delete-stack
.PHONY: all install install-service install-service-deps clean test lint lint-fix format install-docker create-stack delete-stack
53 changes: 38 additions & 15 deletions README.md
@@ -12,20 +12,28 @@ Professional traders have access to expensive terminals and fast proprietary sca

TradeStrike runs as a Docker Compose stack with the following services:

| Service | Description | Ports (host) |
|------------------|-----------------------------------------------------------------------------|-----------------------|
| `kafka` | Apache Kafka 4.x (KRaft mode) with JMX exporter baked in | `9092`, `19092`, `9102` |
| `kafka-exporter` | Prometheus Kafka lag exporter (`danielqsj/kafka-exporter`) | `9308` |
| `feed-ingestor` | Python worker that streams Alpaca market data and writes to Kafka | `9200` (Prometheus metrics) |
| `detection-engine` | Kafka worker that evaluates anomalies and emits ticker detections | `9600` (Prometheus metrics) |
| `redis` | In-memory cache/queue supporting downstream services | `6379` |
| `postgres` | Relational datastore for persisted application state | `5432` |
| `rabbitmq` | Message broker with management and metrics endpoints | `5672`, `15672`, `15692` |
| `minio` | S3-compatible object storage for artifacts and historical data | `9000`, `9001` |
| `prometheus` | Metrics collection | `9090` |
| `grafana` | Dashboards and alerting | `3000` |
| `loki` | Centralized log aggregation backend | `3100` |
| `promtail` | Log shipper that forwards container logs to Loki | `9080` |
| Service | Description | Ports (host) |
|----------------------|-----------------------------------------------------------------------------|---------------------------|
| `kafka` | Apache Kafka 4.x (KRaft mode) with JMX exporter baked in | `9092`, `19092`, `9102` |
| `kafka-exporter` | Prometheus Kafka lag exporter (`danielqsj/kafka-exporter`) | `9308` |
| `kafka-topic-init` | One-shot container that creates required Kafka topics idempotently | n/a |
| `feed-ingestor` | Streams Alpaca market data (real or fake) into Kafka | `9200` (Prom metrics) |
| `feed-archiver` | Writes Kafka bars into TimescaleDB | `9300` (Prom metrics) |
| `detection-engine` | Evaluates anomalies per ticker window | `9600` (Prom metrics) |
| `detection-archiver` | Stores detections in TimescaleDB + MinIO | `9700` (Prom metrics) |
| `statistics-calculator` | Maintains rolling stats and publishes aggregates | `9400` (Prom metrics) |
| `statistics-archiver` | Persists rolling stats to TimescaleDB | `9500` (Prom metrics) |
| `redis` | In-memory store for ephemeral stats caches | `6379` |
| `redis-exporter` | Prometheus exporter for Redis metrics | `9121` |
| `postgres` | TimescaleDB instance for persistent application state | `5432` |
| `postgres-exporter` | Prometheus exporter for Timescale metrics | `9187` |
| `minio` | S3-compatible object storage | `9000`, `9001` |
| `minio-init-bucket` | Initializes MinIO bucket during stack startup | n/a |
| `prometheus` | Metrics collection | `9090` |
| `grafana` | Dashboards + alerting | `3000` |
| `loki` | Centralized log aggregation backend | `3100` |
| `promtail` | Log shipper forwarding container logs to Loki | `9080` |
| `rabbitmq` | Message broker with management UI | `5672`, `15672`, `15692` |

Source code and Dockerfiles live under `services/<name>/`, while shared monitoring configuration stays in `monitoring/`.

@@ -89,6 +97,7 @@ To get started with the repository, follow these steps:
5. **Access monitoring tools**:
- Prometheus: http://localhost:9090
- Grafana: http://localhost:3000 (default credentials `admin` / `admin`)
- MinIO: http://localhost:9001 (default credentials `minio_admin` / `minio_admin`)

6. **Tail logs**:
Promtail ships container logs to Loki. You can browse them through Grafana (`Explore` tab) or via `docker compose logs <service>`.
@@ -100,12 +109,26 @@ Even though services run in containers, a local virtual environment remains usef
```
make install # create venv + install shared dev dependencies
make install-service SERVICE=feed-ingestor # install runtime deps for a specific service
make test # run pytest suite with 100% coverage gate
make lint # run Ruff lint (read-only)
make lint-fix # run formatter + lint autofix for quick cleanup
make format # Ruff formatter only (no lint)
make test # run lint + pytest with 100% coverage gate
make clean # tear down stack + remove venv, caches, coverage artifacts
```

Each Python microservice keeps its runtime dependencies under `services/<name>/requirements.txt`. Use `make install-service SERVICE=<name>` to layer that service's dependencies into your local venv when needed.

### Optional Git Hooks

Install the bundled pre-commit hooks to automatically run Ruff before each commit:

```
pip install pre-commit
pre-commit install
```

This registers the hooks defined in `.pre-commit-config.yaml`. You can always run them manually with `pre-commit run --all-files`.

## Maintenance Commands

- Stop and remove the entire stack (including named volumes):
1 change: 1 addition & 0 deletions requirements.txt
@@ -1,2 +1,3 @@
pytest>=8.3.5,<9.0
pytest-cov>=5.0.0,<6.0
ruff>=0.6.9,<0.7
10 changes: 10 additions & 0 deletions ruff.toml
@@ -0,0 +1,10 @@
line-length = 140
target-version = "py312"
fix = false

[lint]
select = ["E", "F", "B", "I"]
ignore = []

[lint.isort]
combine-as-imports = true
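As an illustration of the new isort settings (not part of the diff itself): with `combine-as-imports = true`, Ruff's import sorter (the `I` rules selected above) merges aliased imports from the same module onto a single line rather than emitting one `from … import` statement per alias. A minimal sketch:

```python
# Before formatting, isort-style rules would see two statements:
#
#   from collections import OrderedDict as OD
#   from collections import defaultdict as dd
#
# With combine-as-imports = true they are merged into one:

from collections import OrderedDict as OD, defaultdict as dd

# The aliases behave exactly as before; only the import layout changes.
counts = dd(int)
counts["ticker"] += 1
ordered = OD(sorted(counts.items()))
print(dict(ordered))  # {'ticker': 1}
```

This matches the consolidated multi-name imports visible in the Python diffs below.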
26 changes: 16 additions & 10 deletions src/detection_archiver/detection_consumer.py
@@ -1,17 +1,26 @@
import asyncio
import logging
from aiokafka import AIOKafkaConsumer
import os
from src.models.ticker_detection_model import TickerDetection
from src.detection_archiver.metrics import NUM_DETECTIONS_ARCHIVED_SUCCEEDED, NUM_DETECTIONS_CONSUMED, NUM_DETECTIONS_ARCHIVED_FAILED, NUM_CONSUMER_START_FAILED, NUM_CONSUMER_START_SUCCEEDED
from src.detection_archiver.detection_repository import DetectionRepository
from io import BytesIO

from aiokafka import AIOKafkaConsumer
from minio import Minio

from src.detection_archiver.detection_repository import DetectionRepository
from src.detection_archiver.metrics import (
NUM_CONSUMER_START_FAILED,
NUM_CONSUMER_START_SUCCEEDED,
NUM_DETECTIONS_ARCHIVED_FAILED,
NUM_DETECTIONS_ARCHIVED_SUCCEEDED,
NUM_DETECTIONS_CONSUMED,
)
from src.models.ticker_detection_model import TickerDetection

logger = logging.getLogger(__name__)

CONTENT_TYPE = 'application/json'
ENCODING_SCHEME = 'utf-8'
CONTENT_TYPE = "application/json"
ENCODING_SCHEME = "utf-8"


class DetectionConsumer:
def __init__(
@@ -64,10 +73,7 @@ async def run(self) -> None:
self._bucket_name,
context_path,
)
await self._repository.insert_detection(
ticker_detection=ticker_detection,
context_path=context_path
)
await self._repository.insert_detection(ticker_detection=ticker_detection, context_path=context_path)
logger.info(
"Stored detection for %s in TimeScaleDB (context=%s)",
ticker_detection,
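For context on why `BytesIO` joins the imports here: MinIO's `put_object` takes a readable stream plus an explicit byte length, so the consumer must serialize each detection before uploading. A hypothetical helper sketching that pattern (the actual consumer inlines this logic; the function name and payload shape are illustrative, not from the PR):

```python
import json
from io import BytesIO

ENCODING_SCHEME = "utf-8"  # mirrors the constant in detection_consumer.py

def build_context_payload(detection: dict) -> tuple[BytesIO, int]:
    """Serialize a detection's context into the (stream, length) pair that
    MinIO's put_object expects."""
    raw = json.dumps(detection).encode(ENCODING_SCHEME)
    return BytesIO(raw), len(raw)

payload, size = build_context_payload({"ticker": "AAPL", "z_score": 4.2})
```

The upload then looks roughly like `client.put_object(bucket, context_path, payload, size, content_type=CONTENT_TYPE)`, after which the path is persisted alongside the row in TimescaleDB.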
6 changes: 5 additions & 1 deletion src/detection_archiver/detection_repository.py
@@ -1,7 +1,10 @@
import logging
from typing import Final

import asyncpg

from src.models.ticker_detection_model import TickerDetection

logger = logging.getLogger(__name__)

CREATE_EXTENSION_SQL: Final[str] = "CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;"
@@ -49,6 +52,7 @@
context_path = EXCLUDED.context_path;
"""


class DetectionRepository:
def __init__(self, pool: asyncpg.Pool, schema: str, table: str):
self._pool = pool
@@ -89,7 +93,7 @@ async def insert_detection(self, ticker_detection: TickerDetection, context_path
context_path,
)
logger.info(
"Archived detection for ticker: %s, timestamp: %s, window_length_minutes: %s window_end_time: %s",
"Archived detection for ticker: %s, timestamp: %s, window_length_minutes: %s window_end_time: %s",
ticker_detection.ticker.ticker,
ticker_detection.ticker.time,
detection_detail.statistic.window_length_minutes,
37 changes: 22 additions & 15 deletions src/detection_archiver/healthcheck.py
@@ -1,9 +1,11 @@
import asyncio
import logging

import asyncpg
from src.util.environment import get_env
from minio import Minio

from src.util.environment import get_env

logger = logging.getLogger(__name__)

TIMESCALE_DB_HOST_ENV = "DETECTION_ARCHIVER_DB_HOST"
@@ -18,20 +20,23 @@
MINIO_BUCKET_ENV = "MINIO_BUCKET"
MINIO_SECURE_ENV = "MINIO_SECURE"


async def _run_healthcheck() -> None:
env = get_env([
TIMESCALE_DB_HOST_ENV,
TIMESCALE_DB_PORT_ENV,
TIMESCALE_DB_NAME_ENV,
TIMESCALE_DB_USER_ENV,
TIMESCALE_DB_PASSWORD_ENV,
MINIO_HOST_ENV,
MINIO_PORT_ENV,
MINIO_USER_ENV,
MINIO_PASSWORD_ENV,
MINIO_BUCKET_ENV,
MINIO_SECURE_ENV,
])
env = get_env(
[
TIMESCALE_DB_HOST_ENV,
TIMESCALE_DB_PORT_ENV,
TIMESCALE_DB_NAME_ENV,
TIMESCALE_DB_USER_ENV,
TIMESCALE_DB_PASSWORD_ENV,
MINIO_HOST_ENV,
MINIO_PORT_ENV,
MINIO_USER_ENV,
MINIO_PASSWORD_ENV,
MINIO_BUCKET_ENV,
MINIO_SECURE_ENV,
]
)

connection = await asyncpg.connect(
host=env[TIMESCALE_DB_HOST_ENV],
@@ -58,8 +63,10 @@ async def _run_healthcheck() -> None:
buckets = minio_client.list_buckets()
print(f"Detection archiver MinIO healthcheck succeeded and found buckets: {buckets}.")

def main() -> None: # pragma: no cover

def main() -> None: # pragma: no cover
asyncio.run(_run_healthcheck())


if __name__ == "__main__": # pragma: no cover
main()
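The healthcheck leans on the repo's `get_env` helper, whose implementation is not shown in this diff. Its contract is plausibly something like the following sketch (an assumption based on usage; the real code lives in `src/util/environment.py`): take a list of variable names, fail fast if any is unset, and return a name-to-value dict.

```python
import os

def get_env(names: list[str]) -> dict[str, str]:
    """Assumed contract of src.util.environment.get_env: validate that every
    required environment variable is present, then return them as a dict."""
    missing = [name for name in names if name not in os.environ]
    if missing:
        raise RuntimeError(f"Missing required environment variables: {missing}")
    return {name: os.environ[name] for name in names}

os.environ["DEMO_VAR"] = "demo"
print(get_env(["DEMO_VAR"]))  # {'DEMO_VAR': 'demo'}
```

Failing fast here means a misconfigured container dies at startup (or at healthcheck time) rather than limping along with `None` credentials.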
41 changes: 22 additions & 19 deletions src/detection_archiver/main.py
@@ -34,26 +34,29 @@
MINIO_BUCKET_ENV = "MINIO_BUCKET"
MINIO_SECURE_ENV = "MINIO_SECURE"


async def main() -> None:
env = get_env([
KAFKA_BOOTSTRAP_SERVERS_ENV,
KAFKA_TICKER_DETECTION_TOPIC_ENV,
DETECTION_ARCHIVER_CONSUMER_GROUP_ENV,
DETECTION_ARCHIVER_METRICS_PORT_ENV,
DETECTION_ARCHIVER_DB_HOST_ENV,
DETECTION_ARCHIVER_DB_PORT_ENV,
DETECTION_ARCHIVER_DB_NAME_ENV,
DETECTION_ARCHIVER_DB_USER_ENV,
DETECTION_ARCHIVER_DB_PASSWORD_ENV,
DETECTION_ARCHIVER_DB_SCHEMA_ENV,
DETECTION_ARCHIVER_DB_TABLE_ENV,
MINIO_HOST_ENV,
MINIO_PORT_ENV,
MINIO_USER_ENV,
MINIO_PASSWORD_ENV,
MINIO_BUCKET_ENV,
MINIO_SECURE_ENV,
])
env = get_env(
[
KAFKA_BOOTSTRAP_SERVERS_ENV,
KAFKA_TICKER_DETECTION_TOPIC_ENV,
DETECTION_ARCHIVER_CONSUMER_GROUP_ENV,
DETECTION_ARCHIVER_METRICS_PORT_ENV,
DETECTION_ARCHIVER_DB_HOST_ENV,
DETECTION_ARCHIVER_DB_PORT_ENV,
DETECTION_ARCHIVER_DB_NAME_ENV,
DETECTION_ARCHIVER_DB_USER_ENV,
DETECTION_ARCHIVER_DB_PASSWORD_ENV,
DETECTION_ARCHIVER_DB_SCHEMA_ENV,
DETECTION_ARCHIVER_DB_TABLE_ENV,
MINIO_HOST_ENV,
MINIO_PORT_ENV,
MINIO_USER_ENV,
MINIO_PASSWORD_ENV,
MINIO_BUCKET_ENV,
MINIO_SECURE_ENV,
]
)

start_http_server(int(env[DETECTION_ARCHIVER_METRICS_PORT_ENV]))

2 changes: 1 addition & 1 deletion src/detection_archiver/metrics.py
@@ -23,4 +23,4 @@
NUM_CONSUMER_START_FAILED = Counter(
"detection_archiver_consumer_failed_total",
"Number of times consumer start failed",
)
)