A Spark-native federated learning pipeline with Byzantine-resilient aggregation built on 86.2 million smart-meter records across 1,400 Saudi households
| Metric | Value |
|---|---|
| Loss | 0.0841 |
| Accuracy | 91.2% |
| F1 Score | 0.868 |
| Recall Lift | +10.8% over baseline |
| Speedup | 1.7x faster than gRPC at scale |
| Byzantine Resilience | 80% acceptance at 20% attack fraction |
NeuroGrid is a decentralized, privacy-preserving framework leveraging Apache Spark for high-velocity telemetry processing and resilient distributed federated learning. Built on the HEAPO dataset — 1,400 Saudi Arabian households producing 86.2 million telemetry records at 15-minute resolution — the system delivers a full Spark-native ML pipeline from raw CSV ingestion to Byzantine-hardened model convergence.
Research Question: How can a Spark-native federated learning pipeline using RDD mapPartitions for agent parallelism and Spark MLlib KMeans for stratified cohort selection maintain Byzantine resilience under adversarial gradient attacks, while delivering measurable speedup and constant-cost scaling across 86.2M records?
| # | Name |
|---|---|
| 01 | Fahad Dubush |
| 02 | Mohammed Albarazi |
| 03 | Hassan Alsayed |
| 04 | Nadine Elhaddad |
CS4074 Big Data Analytics · Dr. Naila Marir · Effat University · Spring 2026
+==================================================================+
| NEUROGRID — SPARK NATIVE FL ARCHITECTURE |
+==================================================================+
+-------------------------------------------------------------+
| RAW DATA — HEAPO Dataset |
| 1,400 households · 86.2M records · 15-min cadence |
| Smart meter CSVs + Hourly weather CSVs |
+------------------------+------------------------------------+
|
[Stage 1: ETL]
etl_pipeline.py
|
v
+-------------------------------------------------------------+
| SPARK ETL PIPELINE (PySpark) |
| 1. Parallel CSV ingestion (1,400+ files) |
| 2. Schema enforcement Timestamp / Float / String |
| 3. Resampling via groupBy(window('15 minutes')) |
| 4. 3-tier adversarial imputation: |
| T1: 5-slot rolling mean (transient sensor dropouts) |
| T2: household-wide average (long-term gaps) |
| T3: zero-fill (safety fallback) |
| Output: partitioned Parquet (Weather_ID / Household_ID) |
+------------------------+------------------------------------+
|
[Stage 2: MLlib Sampling]
mllib_sampling.py
|
v
+-------------------------------------------------------------+
| SPARK MLlib — KMeans Stratified Sampling |
| KMeans(k=4) on per-household consumption profiles |
| |
| Cluster 0 · Industrial-Heavy ████░░░░░░ 18% |
| Cluster 1 · Residential-Light ██████████ 42% |
| Cluster 2 · Mixed-Variable ██████░░░░ 25% |
| Cluster 3 · Off-Peak Solar ████░░░░░░ 15% |
| |
| Stratified manifest.json (15 agents = proxy for 1,400) |
+------------------------+------------------------------------+
|
[Stage 3: FL Rounds]
fl_round_driver.py
|
v
+-------------------------------------------------------------+
| SPARK RDD FEDERATED LEARNING |
| |
| sc.parallelize(manifest, numSlices=15) |
| .mapPartitions(spark_rank_kernel) |
| |
| +------+ +------+ +------+ +------+ |
| | A01 | | A02 | | A03 | ... | A15 | Building Agents |
| |local | |local | |local | |local | (1 per HH shard) |
| | SGD | | SGD | | SGD | | SGD | |
| +--+---+ +--+---+ +--+---+ +--+---+ |
| | | | | |
| v v v v |
| Delta-theta parameter deltas (delta.pt per agent) |
+------------------------+------------------------------------+
|
[Aggregation: Driver]
|
+-------------+-------------+
v v v
District 1 District 2 District 3
(5 agents) (5 agents) (5 agents)
| | |
Multi-Krum Multi-Krum Multi-Krum
+ Trim Mean + Trim Mean + Trim Mean
| | |
+-------------+-------------+
|
City FedAvg
(sample-weighted)
|
v
W* (global model) Parquet audit
round_metrics.parquet · W_final.pt
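
Stage 1 in the diagram reduces to a handful of DataFrame operations. The sketch below is illustrative only: the column names (household_id, ts, kwh) are assumptions, and the weather join and DQ checks are omitted. The real job is spark_jobs/etl_pipeline.py, with the schema in utils/schema.py.

```python
# Illustrative Stage 1 sketch (assumed column names; weather join omitted).
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("neurogrid-etl-sketch").getOrCreate()

raw = (
    spark.read.csv("path/to/15min/csvs", header=True, inferSchema=True)
         .withColumn("ts", F.to_timestamp("ts"))  # schema enforcement
)

# Resample onto a strict 15-minute grid (diagram step 3).
grid = (
    raw.groupBy("household_id", F.window("ts", "15 minutes").alias("slot"))
       .agg(F.avg("kwh").alias("kwh"))
       .select("household_id", F.col("slot.start").alias("ts"), "kwh")
)

# 3-tier imputation (diagram step 4).
w5 = Window.partitionBy("household_id").orderBy("ts").rowsBetween(-5, -1)
w_all = Window.partitionBy("household_id")
clean = (
    grid.withColumn("kwh", F.coalesce("kwh", F.avg("kwh").over(w5)))     # T1
        .withColumn("kwh", F.coalesce("kwh", F.avg("kwh").over(w_all)))  # T2
        .fillna({"kwh": 0.0})                                            # T3
)

clean.write.mode("overwrite").partitionBy("household_id").parquet(
    "outputs/etl/telemetry.parquet"
)
```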
20% Byzantine Attack Scenario (3 of 15 agents are malicious)
A01 [OK] A02 [OK] A03 [OK] A04 [OK] A05 [OK]
A06 [OK] A07 [OK] A08 [OK] A09 [OK] A10 [OK]
A11 [OK] A12 [BYZ] A13 [OK] A14 [BYZ] A15 [BYZ]
^^^^^^^^^^^^^^^^^^^^^^^^^^^
Byzantine agents
(scale / flip / noise attack)
Multi-Krum Select --> 80% acceptance rate
Global convergence maintained
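
Condensed into code, the round that both diagrams describe looks like the sketch below. This is a minimal illustration, not the repo's fl_round_driver.py: the local trainer is stubbed with random deltas, and multi_krum implements the standard pairwise-distance rule (keep the m = n - f lowest-scoring updates, so f = 3 of n = 15 yields the 80% acceptance shown above).

```python
# Minimal one-round sketch: RDD dispatch + Multi-Krum + trimmed mean.
# Stubs stand in for the real kernels in fl_local_train.py / utils/krum.py.
import numpy as np
from pyspark.sql import SparkSession

DIM = 1_000  # stand-in for the flattened parameter-delta size

def spark_rank_kernel(agents):
    """Per-partition kernel: run local SGD per agent, yield a flat delta.
    A random vector stands in for the real PyTorch training step."""
    rng = np.random.default_rng()
    for agent in agents:
        yield agent["id"], rng.normal(size=DIM)

def multi_krum(deltas, f, m):
    """Score each update by summed squared distance to its n - f - 2
    nearest peers; keep the indices of the m lowest-scoring updates."""
    n = len(deltas)
    dist = np.array([[np.sum((a - b) ** 2) for b in deltas] for a in deltas])
    scores = np.sort(dist, axis=1)[:, 1 : n - f - 1].sum(axis=1)  # drop self
    return np.argsort(scores)[:m]

def trimmed_mean(x, trim=0.2):
    """Coordinate-wise trimmed mean: drop the top/bottom trim fraction."""
    k = int(trim * len(x))
    return np.sort(x, axis=0)[k : len(x) - k].mean(axis=0)

sc = SparkSession.builder.getOrCreate().sparkContext
manifest = [{"id": f"A{i:02d}"} for i in range(1, 16)]  # 15 toy agents

results = (
    sc.parallelize(manifest, numSlices=15)
      .mapPartitions(spark_rank_kernel)
      .collect()  # deltas return to the driver for aggregation
)

deltas = np.stack([d for _, d in results])
keep = multi_krum(deltas, f=3, m=12)       # 12/15 kept -> 80% acceptance
global_delta = trimmed_mean(deltas[keep])  # city-level FedAvg applies this
```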
| Metric | NeuroGrid Spark FL | gRPC Baseline | Delta |
|---|---|---|---|
| Loss | 0.0841 | 0.094 | -0.0099 (lower is better) |
| Accuracy | 91.2% | 82.3% | +8.9% |
| Precision | 89.2% | — | — |
| Recall | 84.5% | 76.2% | +10.8% |
| F1 Score | 0.868 | 0.791 | +9.7% |
The +10.8% recall lift is the critical metric: NeuroGrid detects energy surges that the baseline misses, and that can be the difference between grid stability and a blackout.
| Agents | Spark FL | gRPC | Speedup |
|---|---|---|---|
| 8 | 44 s | 62 s | 1.4x |
| 15 | 51 s | 90 s | 1.8x |
| 25+ (projected) | ~55 s | 140 s+ | 1.7x+ |
Spark FL wall-clock time stays near-constant because agents are packed into RDD partitions; the gRPC baseline grows roughly linearly with agent count.
neurogrid-spark/
|
+-- spark_jobs/ Core Spark pipeline scripts
| +-- etl_pipeline.py Stage 1: HEAPO CSVs to Parquet
| +-- mllib_sampling.py Stage 2: MLlib KMeans cohort selection
| +-- fl_local_train.py FL rank kernel (pure PyTorch, no Spark deps)
| +-- byzantine_defense.py Spark-backed Multi-Krum round
| +-- fl_round_driver.py Stage 3: RDD mapPartitions FL orchestrator
| +-- ddp_teacher_trainer.py TorchDistributor + DDP teacher pretraining
| +-- scalability_benchmark.py Agent-count sweep + comparison report
|
+-- models/
| +-- city_lstm.py Encoder-decoder LSTM (all 3 tiers)
|
+-- utils/
| +-- schema.py Feature schema (11-dim, kWh at index 0)
| +-- krum.py Krum / Multi-Krum Byzantine scoring
| +-- aggregation.py FedAvg + trimmed mean
| +-- clipping.py L2 gradient clipping
| +-- dp.py Differential privacy (DP-SGD)
| +-- sparsification.py TopK uplink sparsification
| +-- convergence.py Convergence monitor
| +-- etl_quality.py DQ validation + report writers
| +-- audit_helpers.py Federated audit row builders
|
+-- configs/
| +-- spark_fl_config.yaml All hyperparameters + measured results
|
+-- notebooks/
| +-- spark_fl_demo.ipynb Interactive demo (ETL to sampling to FL)
|
+-- docs/
| +-- spark_fl_architecture.md Full architecture + Q&A defense guide
|
+-- data/
| +-- README_dataset.md HEAPO schema + access instructions
|
+-- outputs/ Generated artifacts (gitkeep)
+-- run_pipeline.py End-to-end CLI runner
+-- requirements.txt
+-- README.md
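
For orientation, a hypothetical skeleton of what models/city_lstm.py describes: an encoder-decoder LSTM with attention over encoder states, taking the 11-dim feature vector (kWh at index 0) from utils/schema.py. Layer sizes and the decoding loop are illustrative assumptions, not the repo's implementation.

```python
# Hypothetical CityLSTM skeleton (dimensions illustrative).
import torch
import torch.nn as nn

class CityLSTM(nn.Module):
    def __init__(self, n_features=11, hidden=64, horizon=4):
        super().__init__()
        self.encoder = nn.LSTM(n_features, hidden, batch_first=True)
        self.decoder = nn.LSTMCell(1, hidden)  # autoregressive kWh decoder
        self.attn = nn.Linear(hidden, hidden)  # dot-attention projection
        self.head = nn.Linear(2 * hidden, 1)
        self.horizon = horizon

    def forward(self, x):                      # x: (batch, seq, 11)
        enc_out, (h, c) = self.encoder(x)      # enc_out: (batch, seq, hidden)
        h, c = h[-1], c[-1]
        y = x[:, -1, :1]                       # seed with the last kWh reading
        preds = []
        for _ in range(self.horizon):
            h, c = self.decoder(y, (h, c))
            # Dot attention over encoder states, then predict the next step.
            scores = torch.bmm(self.attn(enc_out), h.unsqueeze(-1)).squeeze(-1)
            ctx = torch.bmm(scores.softmax(-1).unsqueeze(1), enc_out).squeeze(1)
            y = self.head(torch.cat([h, ctx], dim=-1))
            preds.append(y)
        return torch.cat(preds, dim=1)         # (batch, horizon) kWh forecast
```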
| Technology | Purpose |
|---|---|
| PySpark | Distributed CSV ingestion, 86.2M-row ETL, RDD mapPartitions for FL agent dispatch |
| Spark MLlib | KMeans(k=4) clustering — stratified 15-agent manifest from 1,400 households |
| PyTorch | CityLSTM (encoder-decoder + attention), local SGD, parameter delta computation |
| TorchDistributor + DDP | Native all_reduce for centralized city-teacher pretraining only |
| Multi-Krum | Byzantine gradient rejection via pairwise distance scoring |
| DP-SGD | Differential privacy: Gaussian noise + L2 clipping on building uplinks |
| TopK Sparsification | Uplink bandwidth reduction — keep top-k% of parameter coordinates |
| Parquet / PyArrow | Columnar storage for training data and per-round audit trails |
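
Three rows of the table (L2 clipping, DP-SGD, TopK sparsification) form the building uplink path. A minimal PyTorch sketch of that stack follows; the hyperparameter values are illustrative, and the repo's real versions live in utils/clipping.py, utils/dp.py, and utils/sparsification.py.

```python
# Sketch of the building uplink: L2 clip -> Gaussian noise -> TopK sparsify.
import torch

def clip_l2(delta: torch.Tensor, max_norm: float = 1.0) -> torch.Tensor:
    """Scale the delta so its L2 norm is at most max_norm."""
    scale = torch.clamp(max_norm / (delta.norm(p=2) + 1e-12), max=1.0)
    return delta * scale

def add_dp_noise(delta: torch.Tensor, sigma: float = 0.01) -> torch.Tensor:
    """DP-SGD-style Gaussian noise, calibrated to the clipping bound."""
    return delta + torch.randn_like(delta) * sigma

def topk_sparsify(delta: torch.Tensor, k_frac: float = 0.1) -> torch.Tensor:
    """Keep the top-k% coordinates by magnitude; zero out the rest."""
    k = max(1, int(k_frac * delta.numel()))
    idx = delta.abs().topk(k).indices
    out = torch.zeros_like(delta)
    out[idx] = delta[idx]
    return out

uplink = topk_sparsify(add_dp_noise(clip_l2(torch.randn(1_000))))
```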
# Clone
git clone https://github.com/faodubush/neurogrid-spark.git
cd neurogrid-spark
# Requirements: Python 3.9+, Java 8+
java -version # must succeed
# Install
pip install -r requirements.txt

# All 3 stages: ETL → MLlib sampling → FL rounds
python run_pipeline.py --step all --data_dir path/to/15min/csvs
# Stage by stage
python run_pipeline.py --step etl --data_dir path/to/15min/csvs
python run_pipeline.py --step sample --data_dir path/to/15min/csvs
python run_pipeline.py --step fl --manifest_path outputs/sampling/<run_id>/manifest.json

# Clean baseline
python run_pipeline.py --step fl --byzantine_fraction 0.0 --rounds 5
# 20% scale attack
python run_pipeline.py --step fl --byzantine_fraction 0.2 --byzantine_attack scale --rounds 5
# Flip and noise attacks
python run_pipeline.py --step fl --byzantine_fraction 0.2 --byzantine_attack flip --rounds 5
python run_pipeline.py --step fl --byzantine_fraction 0.2 --byzantine_attack noise --rounds 5

python run_pipeline.py --step benchmark --agent_counts 4 8 15 25
# Output: outputs/spark_fl/benchmarks/<run_id>/scalability_results.md

jupyter notebook notebooks/spark_fl_demo.ipynb

After a run, artifacts land in outputs/spark_fl/runs/<run_id>/:
<run_id>/
+-- round_metrics.parquet Per-round: loss, acceptance rate, wall-clock
+-- run_summary.json Git SHA, config, final metrics
+-- run_summary.md Human-readable table
+-- W_final.pt Final global model weights
+-- audit/
| +-- agent_round_audit/ Per-agent: is_byzantine, accepted, delta_L2
| +-- district_round_summary/ Per-district: Krum scores, acceptance counts
+-- W_global/round=RRRR/W.pt Per-round checkpoint (resume support)
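
A quick way to eyeball a finished run with pandas. The column names here (round, loss, acceptance_rate, wall_clock_s) are assumptions inferred from the fields listed above; check the Parquet schema of your own run first.

```python
# Inspect a run's audit trail (column names assumed; verify against your run).
import pandas as pd

metrics = pd.read_parquet("outputs/spark_fl/runs/<run_id>/round_metrics.parquet")
print(metrics[["round", "loss", "acceptance_rate", "wall_clock_s"]])
print("final loss:", metrics["loss"].iloc[-1])
```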
Pandas and single-process scripts hit hard limits when joining weather data to 86.2M meter records: OOM crashes, multi-hour runtimes, and no fault tolerance.
| Problem | Spark Solution |
|---|---|
| Single-process bottleneck | Parallel partitioned compute |
| Sequential I/O on 1,400+ files | Distributed DAG execution |
| No worker recovery | RDD lineage-based recovery |
| Socket crashes on Windows | RDD mapPartitions — Spark's own task scheduler |
| Random cohort sampling (profile skew) | MLlib KMeans stratified selection |
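
The last row deserves a sketch: stratified cohort selection keeps all four consumption profiles represented in the 15-agent proxy instead of letting a random draw skew toward the dominant cluster. A minimal MLlib version with hypothetical feature columns; the real job is spark_jobs/mllib_sampling.py.

```python
# Sketch of Stage 2: KMeans(k=4) over per-household profiles, then
# per-cluster sampling. Feature/column names are assumptions.
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
profiles = spark.read.parquet("outputs/etl/household_profiles.parquet")

feats = VectorAssembler(
    inputCols=["mean_kwh", "peak_kwh", "night_ratio"],  # hypothetical features
    outputCol="features",
).transform(profiles)

model = KMeans(k=4, seed=42, featuresCol="features").fit(feats)
clustered = model.transform(feats)  # adds a 'prediction' cluster column

# Sample every cluster at the same rate so ~15 agents mirror all 1,400 homes.
fractions = {c: 15 / 1400 for c in range(4)}
manifest = clustered.sampleBy("prediction", fractions=fractions, seed=42)
manifest.select("household_id", "prediction").write.mode("overwrite").json(
    "outputs/sampling/manifest"
)
```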
NeuroGrid lays the data foundation for Saudi Arabia's Smart City and Sustainable Energy agenda — a federated learning fabric that turns 1,400 households into a self-defending, privacy-preserving grid intelligence layer, aligned with the Kingdom's digital transformation goals.
Academic use only. Dataset (HEAPO) used under course academic license. Source code: MIT License for all files in this repository.
NeuroGrid · CS4074 Big Data Analytics · Effat University · Spring 2026
High-performance · Resilient · Vision 2030-aligned distributed learning for Saudi smart cities