High-performance research toolkit for streaming anomaly detection and adaptive ensembling. This repository provides data preprocessing, dataset partitioning, multiple anomaly detection algorithms, and example integrations with Kafka for real-time evaluation and throughput benchmarking.
Table of Contents
- Highlights
- Features
- Repository Structure
- Prerequisites
- Installation
- Datasets
- Quickstart
- Running With Kafka
- Scripts & Examples
- Development & Contributing
- License
Highlights
- Designed for high-throughput streaming evaluation using partitioned HIGGS dataset splits.
- Implements multiple drift- and anomaly-detection strategies and ensembling pipelines.
- Includes tools for dataset partitioning, throughput benchmarking, and Kafka producer/consumer examples.
Features
- Data partitioning and split management for large CSV datasets.
- Multiple detection modules (single-node and ensemble approaches).
- Integration examples with Kafka for realistic streaming workloads.
- Scripts for benchmarking throughput and measuring end-to-end performance.
Repository Structure
- download_higgs_dataset.py: helper to fetch the HIGGS dataset.
- split_higgs_100.py: splits the large HIGGS CSV into 100 parts (used in experiments).
- higgs_100_splits/: example dataset splits used for streaming experiments.
- kafka_producer_real.py, kafka_consumer_simple.py: simple Kafka integration examples.
- ensemble_pipeline.py, ensemble_nids.py: ensemble-based anomaly detection pipelines.
- cdfrs.py, cdfrs_higgs.py, cdfrs_nids.py: core CDFRS algorithm implementations and dataset-specific wrappers.
- swd.py, swd_nids.py, swd2.py, swd3.py: sliding-window / drift-detection variants and experiments.
- throughput.md: notes and measurements on throughput experiments.
- comparison/: implementation comparisons and feature tracking.
Refer to the scripts' top-of-file docstrings for details on arguments and usage.
This repository is organized around a modular streaming-evaluation architecture:
- Data ingestion & partitioning: CSV split utilities and Kafka producers load partitioned HIGGS/NIDS data into streams.
- Detection modules: Core algorithms (cdfrs, swd, etc.) implement single-model detectors and dataset-specific wrappers.
- Ensembling & pipelines: ensemble_pipeline.py and ensemble_nids.py compose detectors into ensemble workflows with aggregation and rollback logic.
- Integration & IO: Kafka producer/consumer examples decouple transport from detection logic for realistic experiments.
- Benchmarking & tooling: Throughput benchmarks, split validators, and helper scripts measure performance and ensure reproducible experiments.
The design favors small, testable components that can be wired together for experiments or production-like streaming tests.
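As a rough illustration of how small detector components might be wired into an ensemble, the sketch below combines per-detector anomaly flags by majority vote. The names `ThresholdDetector` and `majority_vote` are hypothetical and are not taken from ensemble_pipeline.py; the repository's actual aggregation and rollback logic may differ.

```python
from dataclasses import dataclass
from typing import Sequence


@dataclass
class ThresholdDetector:
    """Hypothetical detector: flags a record when one feature exceeds a limit."""
    feature_index: int
    limit: float

    def is_anomaly(self, record: Sequence[float]) -> bool:
        return record[self.feature_index] > self.limit


def majority_vote(detectors, record) -> bool:
    """Return True when more than half of the detectors flag the record."""
    votes = sum(1 for d in detectors if d.is_anomaly(record))
    return votes * 2 > len(detectors)


# Three illustrative detectors, each watching a different feature column.
detectors = [
    ThresholdDetector(0, 1.0),
    ThresholdDetector(1, 5.0),
    ThresholdDetector(2, 0.5),
]
records = ([2.0, 6.0, 0.1], [0.2, 9.0, 0.3])
flags = [majority_vote(detectors, r) for r in records]
print(flags)
```

Because each detector only needs an `is_anomaly` method here, a new detector can be tested on its own and then added to the list without changing the aggregation step.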
If you prefer editable source, a Mermaid version of the architecture is available below and in docs/architecture.mmd.
flowchart LR
subgraph Ingestion[Data Ingestion & Partitioning]
A[CSV splits] -->|publish| K[Kafka Producer]
end
subgraph Detection[Detection Modules]
D1[`cdfrs`] & D2[`swd`] & D3[dataset wrappers]
end
subgraph Ensemble[Ensembling & Pipelines]
E[Orchestrator / Aggregator]
end
subgraph IO[Integration & IO]
C[Kafka Consumer / Connectors]
end
subgraph Tools[Benchmarking & Tooling]
B[Throughput benchmarks]
end
A --> D1
A --> D2
A --> D3
D1 --> E
D2 --> E
D3 --> E
E --> C
C --> B
Prerequisites
- Python 3.10+ recommended
- Linux or macOS (development tested on Ubuntu 24.04 LTS)
- Java & Kafka for streaming examples (if using provided Kafka scripts)
- Install Python dependencies listed in requirements.txt
Installation
- Create and activate a virtual environment:
python -m venv .venv
source .venv/bin/activate
- Install dependencies:
pip install --upgrade pip
pip install -r requirements.txt
- (Optional) If you plan to run the Kafka examples, install and start Kafka and ZooKeeper per your environment.
Datasets
This project commonly uses the HIGGS dataset split into 100 parts for streaming experiments. The higgs_100_splits/ directory contains example split files. Use download_higgs_dataset.py to fetch the raw dataset and split_higgs_100.py to partition it.
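To make the partitioning step concrete, here is a minimal sketch of splitting a large CSV into a fixed number of parts without loading it into memory. It is not the logic of split_higgs_100.py: the function name `split_csv`, the `prefix` parameter, and the output naming are assumptions for illustration, and the sketch assumes a headerless CSV with one record per line.

```python
import os
import tempfile


def split_csv(src_path, out_dir, parts, prefix="part"):
    """Split a headerless CSV into `parts` files with near-equal line counts."""
    # First pass: count lines so the source never has to fit in memory.
    with open(src_path, "r", encoding="utf-8") as src:
        total = sum(1 for _ in src)
    base, extra = divmod(total, parts)
    os.makedirs(out_dir, exist_ok=True)
    written = []
    # Second pass: stream lines into consecutive output files.
    with open(src_path, "r", encoding="utf-8") as src:
        for i in range(parts):
            # The first `extra` files get one additional line each.
            count = base + (1 if i < extra else 0)
            out_path = os.path.join(out_dir, f"{prefix}_{i + 1:03d}.csv")
            with open(out_path, "w", encoding="utf-8") as out:
                for _ in range(count):
                    out.write(src.readline())
            written.append(out_path)
    return written


# Small self-contained demo on a temporary 10-line file.
with tempfile.TemporaryDirectory() as tmp:
    source = os.path.join(tmp, "demo.csv")
    with open(source, "w", encoding="utf-8") as f:
        for n in range(10):
            f.write(f"{n},0.5,1.5\n")
    paths = split_csv(source, os.path.join(tmp, "splits"), parts=3)
    sizes = []
    for p in paths:
        with open(p, encoding="utf-8") as f:
            sizes.append(sum(1 for _ in f))
    print(sizes)
```

Counting lines first costs an extra read of the file but keeps the parts balanced; a single-pass variant with a fixed number of lines per file would also work if the total is known in advance.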
Quickstart
- Prepare the dataset (download it or point the scripts to your CSV files).
- Run a single-model test or the ensemble pipeline to validate end-to-end flow:
python cdfrs_higgs.py --input higgs_100_splits/HIGGS_part_001.csv
# or run the ensemble
python ensemble_pipeline.py --config configs/ensemble_higgs.yaml
- Check the output logs and metrics produced by the scripts. Many scripts print concise performance summaries to stdout.
Running With Kafka
Start Kafka and ZooKeeper according to your distribution (Confluent Platform, the Apache Kafka binary distribution, or Docker Compose). This repo includes docker-compose.yml to run Kafka locally for testing.
Producer example (publish split files to a topic):
python kafka_producer_real.py --topic higgs-splits --dir higgs_100_splits
Consumer example (simple consumer used by detection pipelines):
python kafka_consumer_simple.py --topic higgs-splits
See README_KAFKA.md for additional notes and recommended Kafka settings for high-throughput testing.
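The producer side can be pictured as a loop that reads rows from a split file and hands each one to a transport. The sketch below is an assumption-laden illustration, not the code in kafka_producer_real.py: `publish_rows` and its `send` callback are hypothetical names, and the transport is an in-memory list so the example runs without a broker.

```python
import io
from typing import Callable, Iterable


def publish_rows(lines: Iterable[str], topic: str,
                 send: Callable[[str, bytes], None]) -> int:
    """Send each non-empty CSV line to `topic` as UTF-8 bytes; return the count."""
    sent = 0
    for line in lines:
        row = line.strip()
        if not row:
            continue  # skip blank lines
        send(topic, row.encode("utf-8"))
        sent += 1
    return sent


# Demo with an in-memory "transport" standing in for a Kafka client.
outbox = []
demo_file = io.StringIO("1.0,0.3,0.7\n\n0.0,1.2,0.4\n")
count = publish_rows(demo_file, "higgs-splits",
                     lambda topic, value: outbox.append((topic, value)))
print(count, outbox[0])
```

With a real client, `send` could wrap the client's own send call (for example, kafka-python's `KafkaProducer.send(topic, value=...)`, if that is the library in use). Keeping the transport behind a callback mirrors the decoupling of transport from detection logic described earlier.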
Scripts & Examples
- benchmark_kafka_throughput.py: microbenchmarks for Kafka end-to-end throughput.
- partition_nids.py: partitioning utilities for NIDS-style datasets.
- ensemble_nids.py: ensemble experiments for network intrusion detection data.
- check_100_split_sizes.py: validates the sizes of split CSVs.
Each script exposes CLI flags. Run python <script>.py --help for usage details.
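For readers who want a feel for what a throughput measurement involves, the following sketch times a processing function over a stream of records and reports records per second. It is a generic illustration and not the method used by benchmark_kafka_throughput.py; `measure_throughput` and its `clock` parameter are hypothetical names introduced here.

```python
import time


def measure_throughput(records, process, clock=time.perf_counter):
    """Return (count, elapsed_seconds, records_per_second) for one pass."""
    start = clock()
    count = 0
    for record in records:
        process(record)
        count += 1
    elapsed = clock() - start
    # Guard against a zero interval on very small inputs.
    rate = count / elapsed if elapsed > 0 else float("inf")
    return count, elapsed, rate


# Time a trivial per-record operation over 100,000 synthetic records.
count, elapsed, rate = measure_throughput(range(100_000), lambda r: r * 2)
print(f"{count} records in {elapsed:.4f}s ({rate:,.0f} records/s)")
```

The injectable `clock` makes the function easy to unit-test with a fake timer. For end-to-end numbers, the same pattern would wrap the consume-and-detect loop rather than a local function.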
Development & Contributing
Contributions are welcome. Recommended workflow:
- Fork the repository and create a feature branch.
- Run tests and linters, and add unit tests for new logic.
- Open a pull request describing the change and rationale.
Please follow the existing code style and keep changes minimal and focused.
- If Kafka producers or consumers fail, verify broker connectivity and topic creation.
- When running large splits, ensure sufficient disk and memory resources.
- For dependency errors, try upgrading pip and reinstalling the packages from requirements.txt.
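One quick way to check broker connectivity before debugging producer or consumer code is a plain TCP connection test. The helper below is a hypothetical sketch (the name `broker_reachable` is not part of this repository), and it only shows that the port accepts connections, not that Kafka itself is healthy or that a topic exists.

```python
import socket


def broker_reachable(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures.
        return False


# 9092 is Kafka's conventional default listener port; adjust to your setup.
print(broker_reachable("localhost", 9092))
```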
License
This project is licensed under the MIT License; see LICENSE for details.