diff --git a/.gcp/terraforms/iam_bindings.tf b/.gcp/terraforms/iam_bindings.tf index a024fed..f42dcf8 100644 --- a/.gcp/terraforms/iam_bindings.tf +++ b/.gcp/terraforms/iam_bindings.tf @@ -12,18 +12,21 @@ resource "google_service_account_iam_member" "github_deployer_sa" { # Roles for github-actions-deployer locals { deployer_roles = [ - "roles/run.developer", - "roles/workflows.editor", - "roles/cloudscheduler.admin", - "roles/iam.serviceAccountUser" + "roles/run.developer", # Manage Cloud Run jobs + "roles/workflows.editor", # Manage Workflows + "roles/cloudscheduler.admin", # Manage Scheduler + "roles/iam.serviceAccountUser", # Act as SAs for jobs + "roles/artifactregistry.admin", # Manage Artifact Registry + "roles/eventarc.admin", # Manage Eventarc triggers + "roles/storage.admin", # Manage buckets and state locking + "roles/resourcemanager.projectIamAdmin", # Manage the IAM bindings in this code + "roles/iam.workloadIdentityPoolAdmin", # Manage WIF in wif.tf + "roles/monitoring.admin", # Manage Monitoring in monitoring.tf + "roles/iam.serviceAccountAdmin", # Manage Alert policies in monitoring.tf + "roles/iam.admin" # Manage Iam roles ] } -# Enables to: -# Deploy Cloud Run containers -# Deploy pipeline-dispatcher.yml -# Update the cron job -# Attach SAs to Cloud Run resource "google_project_iam_member" "github_deployer_permissions" { for_each = toset(local.deployer_roles) project = var.project_id diff --git a/.github/workflows/ci-infra.yml b/.github/workflows/ci-infra.yml index b9977c1..83ccb58 100644 --- a/.github/workflows/ci-infra.yml +++ b/.github/workflows/ci-infra.yml @@ -12,6 +12,11 @@ on: workflow_dispatch: workflow_call: +# Add concurrency control to prevent state lock issues +concurrency: + group: terraform-${{ env.ENV }} + cancel-in-progress: false + permissions: contents: 'read' id-token: 'write' @@ -50,4 +55,4 @@ jobs: TF_VAR_region: ${{ env.REGION }} TF_VAR_github_repo: ${{ env.GITHUB_REPO }} TF_VAR_alert_email_map: ${{ secrets.ALERT_EMAIL_MAP }} - run: terraform apply -auto-approve -lock=false \ No newline at end of file + run: terraform apply -auto-approve \ No newline at end of file diff --git a/README.md b/README.md index 5a4e089..f536a0c 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ Small to mid-sized organizations are trapped in a cycle where they outgrow the c This project solves that challenge by delivering a highly resilient, event-driven data pipeline on Google Cloud Platform for reliable operational analytics. It guarantees data integrity through a strict Medallion architecture (Bronze, Silver, Gold) that relies on rigid data contracts and validation gates to catch and isolate bad data early in the lifecycle. ### Defensive Pipeline Architecture -![WIP_pipeline_diagram_picture](https://still-working-on-it.need-to-finish-readme.first) +![pipeline-orchestration-diagram](/assets/diagrams/01-pipeline-orchestration-diagram.png) To eliminate the risk of cross-run data contamination and memory bloat, the pipeline employs a defensive state-management strategy where local compute environments are strictly temporary: * **Stateless Orchestration:** Every execution operates within an isolated, deterministic `run_id` workspace that is aggressively purged post-run. @@ -67,29 +67,60 @@ The pipeline does not just move data; it actively defends the analytical layer f The pipeline is explicitly engineered to process massive datasets within the rigid memory constraints of serverless compute (Cloud Run). By leveraging the Polars Rust engine (Lazy API & Streaming), the system achieves near-perfect memory density, operating consistently at the physical hardware ceiling. -**GCP Stress-Test Metrics (18 Million Row Snapshot)** +### GCP Stress-Test Metrics (Scaling Efficiency) -![engine-performance-8gb](/assets/screenshots/engine-performance-8gb-2cpu.png) +| 18M Snapshot (8GiB / 2 vCPU) | 36M Snapshot (16GiB / 4 vCPU) | +| :---: | :---: | +| ![engine-performance-8gb](/assets/screenshots/engine-performance-8gb-2cpu.png) | ![engine-performance-16gb](/assets/screenshots/engine-performance-16gb-4cpu.png) | -> The data used for this chart [`benchmarks/`](/assets/benchmarks/polars/18mrows_dataset_stats_log.csv) and the 18m rows dataset can be found her [`data/`](/data/) +> Benchmark data: [`18m_stats_log.csv`](/assets/benchmarks/polars/18mrows_dataset_stats_log.csv) and [`36m_stats_log.csv`](/assets/benchmarks/polars/36mrows_dataset_stats_log.csv) +| Metric | 18M Rows (8GB / 2 vCPU) | 36M Rows (16GB / 4 vCPU) | +| :--- | :--- | :--- | +| **Throughput (Processing)** | ~116,000 Rows / Second | ~220,000 Rows / Second | +| **Total Runtime (Wall-Clock)** | 02m 34s | 02m 43s | +| **Memory Tax (Fixed)** | ~1.5 GiB | ~1.5 GiB | +| **Effective Data Headroom** | ~6.5 GiB | ~14.5 GiB | -| Metric | Value (18M Row Peak Load) | -| :--- | :--- | -| **Throughput (Processing)** | ~116,000 Rows / Second | -| **Total Runtime (Wall-Clock)** | 02m 34s | -| **Compute Provision** | 2 vCPU / 8 GiB | -| **Memory Tax (Fixed)** | ~1.5 GiB (OS / Sandbox / IO Buffers) | -| **Effective Data Headroom** | ~6.5 GiB (Active Transformation) | - -* **Linear Vertical Scaling:** Bumping the Cloud Run provision to 32GiB allows the same architecture to process ~72 Million rows without code changes. -* **Predictable Capacity:** Identifying the 1.5GB "Memory Tax" allows for precise resource governance, ensuring jobs never fail due to unpredictable Signal 9 (OOM) events. +* **Near-Linear Performance Scaling:** Doubling the compute and dataset size results in only a 9-second increase in wall-clock time, effectively doubling the throughput as the Polars engine saturates the additional vCPUs. +* **Predictable Capacity:** Identifying the "Memory Tax" (OS/IO overhead) allows for precise resource governance, ensuring jobs never fail due to unpredictable Signal 9 (OOM) events. * **Zero-Idle Economics:** 100% serverless execution ensures zero billable time during idle periods, significantly reducing the Total Cost of Ownership (TCO) compared to dedicated cluster solutions. -**Measurement Methodology** +### Cost Efficiency & Free-Tier + +The pipeline's processing speed allows for a full analytical rebuild of 36M rows while remaining comfortably within the **GCP Cloud Run Free Tier** (180k vCPU-sec, 360k GiB-sec). This means a small-to-mid-sized organization can run this production-grade pipeline multiple times a day with **zero compute costs.** + +| Compute Provision | Dataset | vCPU-Seconds / Run | GiB-Seconds / Run | Monthly Free-Tier Runs | +| :--- | :--- | :--- | :--- | :--- | +| **8 GiB / 2 vCPU** | ~18m rows | 308 | 1,232 | **~292 Runs / Month** | +| **16 GiB / 4 vCPU** | ~36m rows | 652 | 2,608 | **~138 Runs / Month** | +| **32 GiB / 8 vCPU** | ~72m rows | 1,304 | 5,216 | **~69 Runs / Month** | + +> *Calculations based on verified benchmarks. Even at the highest 32GiB tier, the pipeline can execute a full state rebuild twice daily for $0* + +### Measurement Methodology * **Performance Profiling:** Captured from production telemetry via the pipeline's native `run_duration` metadata, calculating the precise delta between `started_at` and `completed_at` timestamps. -* **Memory Utilization:** Monitored via an integrated [`psutil.virtual_memory().used`](/assets/benchmarks/polars/README.md) profiling implementation to verify the actual resource footprint and confirm the physical ceiling for an 8GiB provision. -* **Throughput Efficiency:** Leverages Polars' streaming evaluation to maintain high throughput and minimize CPU idle time during GCS I/O, providing a significant performance advantage over traditional eager-loading engines. +* **Memory Utilization:** Monitored via an integrated [`psutil.virtual_memory().used`](/assets/benchmarks/polars/README.md) profiling implementation to verify the actual resource footprint and confirm the physical ceiling for 8GiB/16GiB provision. +* **Throughput Efficiency:** Leverages Polars streaming evaluation to maintain high throughput and minimize CPU idle time during GCS I/O, providing a significant performance advantage over traditional eager-loading engines. + +### **Scaling Roadmap: From Serverless to Enterprise Lakehouse** + +To ensure the architecture survives the transition from millions to billions of rows, the pipeline is designed to evolve across three validated scaling paths. This roadmap prioritizes cost-efficiency at low volumes while providing a clear architectural pivot for enterprise-scale workloads. + +#### **Stage 1: Temporal Sharding (Vertical Efficiency)** +* **Strategy:** Refactor the `Assemble` stage to iterate through **yearly batch partitions** while `Semantic` stage to **streams output directly** to a GCS staging location. +* **Publish Evolution:** Moves to a **Partitioned Atomic Swap**. Yearly shards are streamed directly to a staged GCS version prefix. The `Integrity Gate` validates cloud-side completeness before the `latest_version.json` pointer is updated. +* **Trade-off:** **Latency vs. Memory.** Significantly increases total wall-clock time due to repeated I/O cycles, but allows 32GiB instances to process 100M+ rows by isolating join-intensity to specific temporal shards. + +#### **Stage 2: Incremental Delta Architecture (Event-Driven)** +* **Strategy:** Transition from a "Full Rebuild" batch model to a **Stateless Delta Propagation** model, processing only active deltas. +* **Publish Evolution:** Moves to a **Checkpoint-based Commit**. Folder-based versioning is replaced by an atomic merge into the Gold layer. The "Pointer" evolves into a metadata watermark signifying data freshness to downstream consumers. +* **Trade-off:** **Simplicity vs. Scale.** Eliminates memory constraints and reduces runtime costs, but sacrifices easy "point-in-time" folder recovery. Requires "Last-Mile" deduplication logic (e.g., SQL Views) for downstream consumers. + +#### **Stage 3: BigQuery "Engine-as-a-Service" (The Enterprise Pivot)** +* **Strategy:** Offload the `Assemble` and `Semantic` compute layers entirely to **BigQuery (ELT Pattern)**. +* **Publish Evolution:** Moves to a **Atomic View Redirection**. The Python "Gatekeeper" builds semantics in a staging dataset and runs SQL-driven integrity checks. Publication is achieved by an atomic swap of a BigQuery Authorized View, replacing the file-based pointer system. +* **Trade-off:** **Cost vs. Capability.** Provides an infinite scaling ceiling and removes all local infrastructure bounds, but introduces higher cost-per-query overhead and requires transitioning from local Parquet files to managed cloud storage. ## Observability & Alerting diff --git a/assets/benchmarks/polars/36mrows_dataset_stats_log.csv b/assets/benchmarks/polars/36mrows_dataset_stats_log.csv new file mode 100644 index 0000000..8f0913d --- /dev/null +++ b/assets/benchmarks/polars/36mrows_dataset_stats_log.csv @@ -0,0 +1,165 @@ +view,timestamp,logger,memory,unit +DEFAULT,2026-04-09T21:43:40.178678Z,METRIC_MEM:,2812.83,MB +DEFAULT,2026-04-09T21:43:41.178984Z,METRIC_MEM:,2868.2,MB +DEFAULT,2026-04-09T21:43:42.179341Z,METRIC_MEM:,2931.05,MB +DEFAULT,2026-04-09T21:43:43.179687Z,METRIC_MEM:,2994.48,MB +DEFAULT,2026-04-09T21:43:44.179962Z,METRIC_MEM:,3049.48,MB +DEFAULT,2026-04-09T21:43:45.180339Z,METRIC_MEM:,3111.88,MB +DEFAULT,2026-04-09T21:43:46.180721Z,METRIC_MEM:,3175.25,MB +DEFAULT,2026-04-09T21:43:47.181107Z,METRIC_MEM:,3246.77,MB +DEFAULT,2026-04-09T21:43:48.181558Z,METRIC_MEM:,3309.72,MB +DEFAULT,2026-04-09T21:43:49.181470Z,METRIC_MEM:,3364.92,MB +DEFAULT,2026-04-09T21:43:50.181366Z,METRIC_MEM:,3427.7,MB +DEFAULT,2026-04-09T21:43:51.181351Z,METRIC_MEM:,3483.11,MB +DEFAULT,2026-04-09T21:43:52.181694Z,METRIC_MEM:,3554.01,MB +DEFAULT,2026-04-09T21:43:53.182108Z,METRIC_MEM:,3609.08,MB +DEFAULT,2026-04-09T21:43:54.182588Z,METRIC_MEM:,3672.5,MB +DEFAULT,2026-04-09T21:43:55.182979Z,METRIC_MEM:,3726.8,MB +DEFAULT,2026-04-09T21:43:56.183486Z,METRIC_MEM:,3790.45,MB +DEFAULT,2026-04-09T21:43:57.183846Z,METRIC_MEM:,3854.01,MB +DEFAULT,2026-04-09T21:43:58.184338Z,METRIC_MEM:,3916.96,MB +DEFAULT,2026-04-09T21:43:59.184674Z,METRIC_MEM:,3980.06,MB +DEFAULT,2026-04-09T21:44:00.185069Z,METRIC_MEM:,4043.16,MB +DEFAULT,2026-04-09T21:44:01.185349Z,METRIC_MEM:,4098.34,MB +DEFAULT,2026-04-09T21:44:02.185794Z,METRIC_MEM:,4161.24,MB +DEFAULT,2026-04-09T21:44:03.190091Z,METRIC_MEM:,4224.66,MB +DEFAULT,2026-04-09T21:44:04.186657Z,METRIC_MEM:,4287.29,MB +DEFAULT,2026-04-09T21:44:05.187081Z,METRIC_MEM:,4342.34,MB +DEFAULT,2026-04-09T21:44:06.187557Z,METRIC_MEM:,4405.66,MB +DEFAULT,2026-04-09T21:44:07.187831Z,METRIC_MEM:,4467.94,MB +DEFAULT,2026-04-09T21:44:08.188119Z,METRIC_MEM:,4523.98,MB +DEFAULT,2026-04-09T21:44:09.188007Z,METRIC_MEM:,4586.83,MB +DEFAULT,2026-04-09T21:44:10.187799Z,METRIC_MEM:,4649.84,MB +DEFAULT,2026-04-09T21:44:11.187840Z,METRIC_MEM:,4705.04,MB +DEFAULT,2026-04-09T21:44:12.187947Z,METRIC_MEM:,4768.01,MB +DEFAULT,2026-04-09T21:44:13.188380Z,METRIC_MEM:,4831.27,MB +DEFAULT,2026-04-09T21:44:14.189078Z,METRIC_MEM:,4894.65,MB +DEFAULT,2026-04-09T21:44:15.189546Z,METRIC_MEM:,4958.21,MB +DEFAULT,2026-04-09T21:44:16.189898Z,METRIC_MEM:,5020.09,MB +DEFAULT,2026-04-09T21:44:17.190829Z,METRIC_MEM:,5075.54,MB +DEFAULT,2026-04-09T21:44:18.191150Z,METRIC_MEM:,5138.73,MB +DEFAULT,2026-04-09T21:44:19.191518Z,METRIC_MEM:,5182.56,MB +DEFAULT,2026-04-09T21:44:20.197169Z,METRIC_MEM:,5623.91,MB +DEFAULT,2026-04-09T21:44:21.197587Z,METRIC_MEM:,6033.96,MB +DEFAULT,2026-04-09T21:44:22.201299Z,METRIC_MEM:,6488.75,MB +DEFAULT,2026-04-09T21:44:23.198486Z,METRIC_MEM:,6785.55,MB +DEFAULT,2026-04-09T21:44:24.198818Z,METRIC_MEM:,6895.43,MB +DEFAULT,2026-04-09T21:44:25.199286Z,METRIC_MEM:,7071.41,MB +DEFAULT,2026-04-09T21:44:26.199620Z,METRIC_MEM:,7220.95,MB +DEFAULT,2026-04-09T21:44:27.200130Z,METRIC_MEM:,7458.04,MB +DEFAULT,2026-04-09T21:44:28.200477Z,METRIC_MEM:,7530.45,MB +DEFAULT,2026-04-09T21:44:29.200378Z,METRIC_MEM:,7652.43,MB +DEFAULT,2026-04-09T21:44:30.200421Z,METRIC_MEM:,7838.43,MB +DEFAULT,2026-04-09T21:44:31.200275Z,METRIC_MEM:,8050.17,MB +DEFAULT,2026-04-09T21:44:32.200504Z,METRIC_MEM:,8270.6,MB +DEFAULT,2026-04-09T21:44:33.200943Z,METRIC_MEM:,8439.16,MB +DEFAULT,2026-04-09T21:44:34.201362Z,METRIC_MEM:,8777.46,MB +DEFAULT,2026-04-09T21:44:35.201918Z,METRIC_MEM:,8934.88,MB +DEFAULT,2026-04-09T21:44:36.202212Z,METRIC_MEM:,8910.29,MB +DEFAULT,2026-04-09T21:44:37.202609Z,METRIC_MEM:,7358.61,MB +DEFAULT,2026-04-09T21:44:38.204970Z,METRIC_MEM:,7661.91,MB +DEFAULT,2026-04-09T21:44:39.209060Z,METRIC_MEM:,7835.91,MB +DEFAULT,2026-04-09T21:44:40.209130Z,METRIC_MEM:,7907.15,MB +DEFAULT,2026-04-09T21:44:41.204203Z,METRIC_MEM:,8154.41,MB +DEFAULT,2026-04-09T21:44:42.209313Z,METRIC_MEM:,8405.19,MB +DEFAULT,2026-04-09T21:44:43.209838Z,METRIC_MEM:,7903.66,MB +DEFAULT,2026-04-09T21:44:44.210360Z,METRIC_MEM:,8222.37,MB +DEFAULT,2026-04-09T21:44:45.210692Z,METRIC_MEM:,8588.28,MB +DEFAULT,2026-04-09T21:44:46.213686Z,METRIC_MEM:,9032.81,MB +DEFAULT,2026-04-09T21:44:47.211559Z,METRIC_MEM:,9398.2,MB +DEFAULT,2026-04-09T21:44:48.211871Z,METRIC_MEM:,9777.2,MB +DEFAULT,2026-04-09T21:44:49.211810Z,METRIC_MEM:,10187.28,MB +DEFAULT,2026-04-09T21:44:50.211790Z,METRIC_MEM:,10544.79,MB +DEFAULT,2026-04-09T21:44:51.211702Z,METRIC_MEM:,10918.25,MB +DEFAULT,2026-04-09T21:44:52.226469Z,METRIC_MEM:,11645.3,MB +DEFAULT,2026-04-09T21:44:53.226946Z,METRIC_MEM:,12692.11,MB +DEFAULT,2026-04-09T21:44:54.227541Z,METRIC_MEM:,13956.78,MB +DEFAULT,2026-04-09T21:44:55.228051Z,METRIC_MEM:,13874.34,MB +DEFAULT,2026-04-09T21:44:56.228511Z,METRIC_MEM:,12523.92,MB +DEFAULT,2026-04-09T21:44:57.228953Z,METRIC_MEM:,12711.82,MB +DEFAULT,2026-04-09T21:44:58.229409Z,METRIC_MEM:,12737.88,MB +DEFAULT,2026-04-09T21:44:59.229860Z,METRIC_MEM:,12783.1,MB +DEFAULT,2026-04-09T21:45:00.230271Z,METRIC_MEM:,12872.66,MB +DEFAULT,2026-04-09T21:45:01.230679Z,METRIC_MEM:,12877.08,MB +DEFAULT,2026-04-09T21:45:02.231040Z,METRIC_MEM:,12811.36,MB +DEFAULT,2026-04-09T21:45:03.231455Z,METRIC_MEM:,12722.5,MB +DEFAULT,2026-04-09T21:45:04.237488Z,METRIC_MEM:,12728.71,MB +DEFAULT,2026-04-09T21:45:05.232293Z,METRIC_MEM:,12771.59,MB +DEFAULT,2026-04-09T21:45:06.232823Z,METRIC_MEM:,12869.63,MB +DEFAULT,2026-04-09T21:45:07.237757Z,METRIC_MEM:,12988.25,MB +DEFAULT,2026-04-09T21:45:08.233418Z,METRIC_MEM:,13095.33,MB +DEFAULT,2026-04-09T21:45:09.233396Z,METRIC_MEM:,13201.29,MB +DEFAULT,2026-04-09T21:45:10.236817Z,METRIC_MEM:,13288.67,MB +DEFAULT,2026-04-09T21:45:11.237004Z,METRIC_MEM:,13399.59,MB +DEFAULT,2026-04-09T21:45:12.237337Z,METRIC_MEM:,13502.8,MB +DEFAULT,2026-04-09T21:45:13.237766Z,METRIC_MEM:,13616.84,MB +DEFAULT,2026-04-09T21:45:14.240824Z,METRIC_MEM:,13711.62,MB +DEFAULT,2026-04-09T21:45:15.241266Z,METRIC_MEM:,13820.56,MB +DEFAULT,2026-04-09T21:45:16.241645Z,METRIC_MEM:,13937.32,MB +DEFAULT,2026-04-09T21:45:17.242003Z,METRIC_MEM:,14028.76,MB +DEFAULT,2026-04-09T21:45:18.242372Z,METRIC_MEM:,14126.38,MB +DEFAULT,2026-04-09T21:45:19.245015Z,METRIC_MEM:,14220.46,MB +DEFAULT,2026-04-09T21:45:20.243002Z,METRIC_MEM:,14323.86,MB +DEFAULT,2026-04-09T21:45:21.245997Z,METRIC_MEM:,14424.54,MB +DEFAULT,2026-04-09T21:45:22.247559Z,METRIC_MEM:,7915.41,MB +DEFAULT,2026-04-09T21:45:23.246817Z,METRIC_MEM:,8489.96,MB +DEFAULT,2026-04-09T21:45:24.247168Z,METRIC_MEM:,8911.7,MB +DEFAULT,2026-04-09T21:45:25.249494Z,METRIC_MEM:,9354.19,MB +DEFAULT,2026-04-09T21:45:26.249975Z,METRIC_MEM:,9780.51,MB +DEFAULT,2026-04-09T21:45:27.250390Z,METRIC_MEM:,10217.01,MB +DEFAULT,2026-04-09T21:45:28.250624Z,METRIC_MEM:,10676.55,MB +DEFAULT,2026-04-09T21:45:29.250558Z,METRIC_MEM:,11116.09,MB +DEFAULT,2026-04-09T21:45:30.250662Z,METRIC_MEM:,11445.64,MB +DEFAULT,2026-04-09T21:45:31.250691Z,METRIC_MEM:,11473.34,MB +DEFAULT,2026-04-09T21:45:32.256485Z,METRIC_MEM:,10635.77,MB +DEFAULT,2026-04-09T21:45:33.256709Z,METRIC_MEM:,9477.71,MB +DEFAULT,2026-04-09T21:45:34.256519Z,METRIC_MEM:,8338.32,MB +DEFAULT,2026-04-09T21:45:35.260632Z,METRIC_MEM:,8521.45,MB +DEFAULT,2026-04-09T21:45:36.260800Z,METRIC_MEM:,8241.01,MB +DEFAULT,2026-04-09T21:45:37.260872Z,METRIC_MEM:,8706.99,MB +DEFAULT,2026-04-09T21:45:38.258259Z,METRIC_MEM:,8889.8,MB +DEFAULT,2026-04-09T21:45:39.258646Z,METRIC_MEM:,9144.98,MB +DEFAULT,2026-04-09T21:45:40.261191Z,METRIC_MEM:,9366.04,MB +DEFAULT,2026-04-09T21:45:41.259351Z,METRIC_MEM:,9586.33,MB +DEFAULT,2026-04-09T21:45:42.259706Z,METRIC_MEM:,9807.19,MB +DEFAULT,2026-04-09T21:45:43.260068Z,METRIC_MEM:,10050.3,MB +DEFAULT,2026-04-09T21:45:44.260485Z,METRIC_MEM:,10235.79,MB +DEFAULT,2026-04-09T21:45:45.260779Z,METRIC_MEM:,10445.2,MB +DEFAULT,2026-04-09T21:45:46.261136Z,METRIC_MEM:,10634.31,MB +DEFAULT,2026-04-09T21:45:47.261490Z,METRIC_MEM:,10886.51,MB +DEFAULT,2026-04-09T21:45:48.265978Z,METRIC_MEM:,11324.49,MB +DEFAULT,2026-04-09T21:45:49.266059Z,METRIC_MEM:,12017.47,MB +DEFAULT,2026-04-09T21:45:50.266025Z,METRIC_MEM:,11914.96,MB +DEFAULT,2026-04-09T21:45:51.266082Z,METRIC_MEM:,11647.12,MB +DEFAULT,2026-04-09T21:45:52.272392Z,METRIC_MEM:,11648.13,MB +DEFAULT,2026-04-09T21:45:53.269008Z,METRIC_MEM:,9406.49,MB +DEFAULT,2026-04-09T21:45:54.269512Z,METRIC_MEM:,9458.97,MB +DEFAULT,2026-04-09T21:45:55.269918Z,METRIC_MEM:,9560.43,MB +DEFAULT,2026-04-09T21:45:56.270231Z,METRIC_MEM:,9660.94,MB +DEFAULT,2026-04-09T21:45:57.270653Z,METRIC_MEM:,9766.71,MB +DEFAULT,2026-04-09T21:45:58.271050Z,METRIC_MEM:,9849.17,MB +DEFAULT,2026-04-09T21:45:59.303917Z,METRIC_MEM:,9507.84,MB +DEFAULT,2026-04-09T21:46:00.313037Z,METRIC_MEM:,9122.2,MB +DEFAULT,2026-04-09T21:46:01.309538Z,METRIC_MEM:,9528.65,MB +DEFAULT,2026-04-09T21:46:02.310012Z,METRIC_MEM:,9809.38,MB +DEFAULT,2026-04-09T21:46:03.310421Z,METRIC_MEM:,10074.81,MB +DEFAULT,2026-04-09T21:46:04.310900Z,METRIC_MEM:,10459.12,MB +DEFAULT,2026-04-09T21:46:05.311218Z,METRIC_MEM:,10760.92,MB +DEFAULT,2026-04-09T21:46:06.311655Z,METRIC_MEM:,11069.43,MB +DEFAULT,2026-04-09T21:46:07.312043Z,METRIC_MEM:,11441.79,MB +DEFAULT,2026-04-09T21:46:08.312403Z,METRIC_MEM:,11540.78,MB +DEFAULT,2026-04-09T21:46:09.312344Z,METRIC_MEM:,11546.04,MB +DEFAULT,2026-04-09T21:46:10.312541Z,METRIC_MEM:,10711.32,MB +DEFAULT,2026-04-09T21:46:11.312428Z,METRIC_MEM:,9515.68,MB +DEFAULT,2026-04-09T21:46:12.312786Z,METRIC_MEM:,8679.4,MB +DEFAULT,2026-04-09T21:46:13.313152Z,METRIC_MEM:,8724.61,MB +DEFAULT,2026-04-09T21:46:14.313501Z,METRIC_MEM:,8833.48,MB +DEFAULT,2026-04-09T21:46:15.313877Z,METRIC_MEM:,8843.11,MB +DEFAULT,2026-04-09T21:46:16.314200Z,METRIC_MEM:,8844.81,MB +DEFAULT,2026-04-09T21:46:17.315061Z,METRIC_MEM:,8844.75,MB +DEFAULT,2026-04-09T21:46:18.315452Z,METRIC_MEM:,8845.94,MB +DEFAULT,2026-04-09T21:46:19.315830Z,METRIC_MEM:,8850.8,MB +DEFAULT,2026-04-09T21:46:20.316259Z,METRIC_MEM:,8860.11,MB +DEFAULT,2026-04-09T21:46:21.316618Z,METRIC_MEM:,8866.31,MB +DEFAULT,2026-04-09T21:46:22.316957Z,METRIC_MEM:,8858.84,MB +DEFAULT,2026-04-09T21:46:23.317325Z,METRIC_MEM:,8746.22,MB \ No newline at end of file diff --git a/assets/benchmarks/polars/README.md b/assets/benchmarks/polars/README.md index 26429cd..2de9c2f 100644 --- a/assets/benchmarks/polars/README.md +++ b/assets/benchmarks/polars/README.md @@ -1,6 +1,6 @@ # Measurement Methodology -This section provides proof that the memory metrics in the root README were captured from a real Cloud Run execution of the 18M row dataset. +This section details the methodology used to capture the memory metrics in the [`GCP Stress-Test Metrics (Scaling Efficiency)`](/README.md#gcp-stress-test-metrics-scaling-efficiency) The telemetry logger below was added **temporarily** to the orchestrator for a specific benchmarking run. This code was pushed directly to the Cloud Artifact Registry as an experimental image tag (`mem-record`) and is not part of the permanent git repository history. @@ -28,6 +28,25 @@ finally: stop_event.set() logger_thread.join() ``` +Since `psutil` requires C-extensions to compile, the **Dockerfile** was modified to include the necessary build tools and the package itself. This allowed for benchmarking without altering the project's permanent `requirements.txt`. + +```docker +FROM python:3.11-slim +ENV # Environments... + +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + gcc \ + python3-dev && \ + pip install --no-cache-dir -r requirements.txt psutil && \ + apt-get purge -y --auto-remove gcc python3-dev && \ + rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +# the rest of docker code... + +``` ### Data Collection * **Source:** Real-time stdout logs from the Cloud Run job execution. diff --git a/assets/diagrams/00-operations-anlytics-pipeline-diagram.png b/assets/diagrams/00-operations-anlytics-pipeline-diagram.png deleted file mode 100644 index 8c3b5b4..0000000 Binary files a/assets/diagrams/00-operations-anlytics-pipeline-diagram.png and /dev/null differ diff --git a/assets/diagrams/01-pipeline-orchestration-diagram.png b/assets/diagrams/01-pipeline-orchestration-diagram.png new file mode 100644 index 0000000..26f3237 Binary files /dev/null and b/assets/diagrams/01-pipeline-orchestration-diagram.png differ diff --git a/assets/diagrams/01-pipeline-runner-diagram.png b/assets/diagrams/01-pipeline-runner-diagram.png deleted file mode 100644 index 5cf5608..0000000 Binary files a/assets/diagrams/01-pipeline-runner-diagram.png and /dev/null differ diff --git a/assets/diagrams/02-validation-stage-diagram.png b/assets/diagrams/02-validation-stage-diagram.png index 6a8d719..308cc8d 100644 Binary files a/assets/diagrams/02-validation-stage-diagram.png and b/assets/diagrams/02-validation-stage-diagram.png differ diff --git a/assets/diagrams/03-contract-stage-diagram.png b/assets/diagrams/03-contract-stage-diagram.png index 2b0baaa..64a2b90 100644 Binary files a/assets/diagrams/03-contract-stage-diagram.png and b/assets/diagrams/03-contract-stage-diagram.png differ diff --git a/assets/diagrams/04-assemble-stage-diagram.png b/assets/diagrams/04-assemble-stage-diagram.png index 514de79..9199465 100644 Binary files a/assets/diagrams/04-assemble-stage-diagram.png and b/assets/diagrams/04-assemble-stage-diagram.png differ diff --git a/assets/diagrams/05-semantic-stage-diagram.png b/assets/diagrams/05-semantic-stage-diagram.png index 72bfa5e..9c0fa0a 100644 Binary files a/assets/diagrams/05-semantic-stage-diagram.png and b/assets/diagrams/05-semantic-stage-diagram.png differ diff --git a/assets/screenshots/engine-performance-16gb-4cpu.png b/assets/screenshots/engine-performance-16gb-4cpu.png new file mode 100644 index 0000000..dbf638b Binary files /dev/null and b/assets/screenshots/engine-performance-16gb-4cpu.png differ diff --git a/data/README.md b/data/README.md index 001cebb..7ad4e8c 100644 --- a/data/README.md +++ b/data/README.md @@ -3,12 +3,27 @@ This directory serves as the local state provider for the pipeline when executing in a non-cloud environment. It mimics the structure of the Google Cloud Storage (GCS) buckets, allowing for high-fidelity local simulation and performance benchmarking. ## Synthetic Dataset -To replicate the high-volume environment described in the [Performance & Scale](/README#performance-&-scale) section, you can download the 15M-row synthetic dataset here: [**Kaggle Dataset Link**](https://15m-row-synthetic-dataset.com) +To replicate the high-volume environment described in the [GCP Stress-Test Metrics (Scaling Efficiency)](/README.md#gcp-stress-test-metrics-scaling-efficiency) section, you can download the 36M-row synthetic dataset here: [**Kaggle Dataset Link**](https://www.kaggle.com/datasets/melvidabryan/e-commerce-synthetic-dataset) -### Dataset Structure -The downloaded archive contains the following partitions: -* **`raw/`**: Represents the **Bronze Layer**. Contains daily delta CSV snapshots. The `RunContext` class expects this directory to be populated when running locally. (~4.5GB total) -* **`contracted/`**: Represents the **Silver Layer**. Contains accumulated, schema-enforced Parquet files. This acts as the authoritative state for Gold-layer assembly. (~1.55GB total) +>*Note: This upload contains the **Contracted Version** of the dataset. The original "Raw" state—totaling approximately 24GB of unrefined CSVs was omitted to prioritize transfer efficiency.* + +### File Structure & Purpose +The dataset is divided into two primary directories to facilitate different stages of pipeline testing: + +| Directory | Files | Description | +| :--- | :--- | :--- | +| `contracted/` | 110 files | **Production-Scale Test:** The full 36M row dataset (~4.04 GB) formatted to strict enterprise schema requirements. | +| `raw/` | 5 files | **Delta Sample (Validation):** Small-scale samples (~10k rows each) representing **daily incoming deltas**. These files are intentionally "noisy" to exhibit the full range of injected data quality errors. | + +### Included Tables + +The dataset provides a complete relational snapshot of an e-commerce ecosystem: + + * **`df_orders`**: Fact table with lifecycle timestamps (Purchase, Approved, Delivered, Estimated). + * **`df_order_items`**: Bridge table linking orders to products and sellers. + * **`df_payments`**: Transactional data including sequential payment tracking. + * **`df_products`**: Dimensions including weight, dimensions, and fragility indexes. + * **`df_customers`**: Geographic data and business segments (D2C, SMB, Enterprise). ## Local Execution Setup 1. Extract the downloaded dataset archive. diff --git a/data_pipeline/assembly/assembly_executor.py b/data_pipeline/assembly/assembly_executor.py index 8f0af0b..e088b60 100644 --- a/data_pipeline/assembly/assembly_executor.py +++ b/data_pipeline/assembly/assembly_executor.py @@ -3,6 +3,8 @@ # ============================================================================= import gc +import ctypes +import platform from typing import Dict from data_pipeline.shared.run_context import RunContext from data_pipeline.shared.loader_exporter import load_historical_table, export_file @@ -22,6 +24,28 @@ ) +def force_gc(): + """ + Force the Linux allocator to release memory back to the OS. + + Workflow: + 1. Purge: Triggers Python's global garbage collection. + 2. Purge: Invokes libc malloc_trim if on a Linux system to reclaim heap memory. + + Operational Guarantees: + - Memory Safety: Minimizes memory fragmentation and peak RSS. + + Side Effects: + - Instructs the OS to reclaim unused memory from the process heap. + """ + gc.collect() + if platform.system() == "Linux": + try: + ctypes.CDLL("libc.so.6").malloc_trim(0) + except Exception as e: + print(f"[WARNING] Force gc and malloc_trim (release memory) failed: {e}") + + # ------------------------------------------------------------ # EVENT ASSEMBLY ORCHESTRATION # ------------------------------------------------------------ @@ -31,23 +55,24 @@ def orchestrate_event_assembly(run_context: RunContext, report: Dict) -> bool: """ Coordinates the linear transformation pipeline for order-grain events. - Execution Flow: - 1. Load: Fetch 'orders', 'items', and 'payments'. - 2. Merge: Join into a single row-per-order grain. - 3. Derive: Calculate analytical time-deltas and lineage. - 4. Freeze: Enforce semantic schema and dtypes. - 5. Export: Persist to the assembly zone. + Workflow: + 1. Hydrate: Fetches core event tables (Orders, Items, Payments). + 2. Delegate: Joins and aggregates into a single row-per-order grain. + 3. Delegate: Calculates analytical time-deltas and lineage. + 4. Validate: Enforces semantic schema and dtypes. + 5. Promote: Persists the unified event table to the assembly zone. + 6. Purge: Triggers explicit memory reclamation via force_gc(). - Memory Management: - - Explicitly deletes intermediate DataFrames and triggers gc.collect() - after export to minimize peak memory footprint. + Operational Guarantees: + - Grain Integrity: Strictly enforces one row per order_id. + - Fail-Fast: Halts immediately if any sub-task fails. - Failures: - - Returns False immediately (fail-fast) if any sub-task wrapper fails. + Side Effects: + - Persists 'assembled_events' Parquet file. + - Mutates 'report' with event assembly metrics. Failure Behavior: - - Traps unexpected exceptions during the assembly pipeline via a try-except block. - - Logs errors to the report and ensures memory reclamation via a finally block. + - Returns False and logs errors to the report upon any step failure. """ report["assembled_events"] = { @@ -78,6 +103,7 @@ def orchestrate_event_assembly(run_context: RunContext, report: Dict) -> bool: return False del tables + force_gc() ok, lf_derived = task_wrapper( report=report, @@ -85,7 +111,6 @@ def orchestrate_event_assembly(run_context: RunContext, report: Dict) -> bool: status_tracker=tracker, func=derive_fields, lf=lf_merged, - run_id=run_context.run_id, ) if not ok: return False @@ -118,7 +143,7 @@ def orchestrate_event_assembly(run_context: RunContext, report: Dict) -> bool: log_error(f"Unexpected error processing event assembly: {e}", report) finally: - gc.collect() + force_gc() return True @@ -132,22 +157,20 @@ def orchestrate_dimension_refs(run_context: RunContext, report: Dict) -> bool: """ Iteratively extracts and exports dimension reference tables. - Contract: - - Processes every table defined in the DIMENSION_REFERENCES registry. - - Performs one-to-one extraction from Silver (contracted) to Gold (assembled). + Workflow: + 1. Hydrate: Loads historical source table from the contracted zone. + 2. Delegate: Extracts unique dimension keys and required columns. + 3. Promote: Persists the reference table to the assembly zone. + 4. Purge: Clears intermediate dataframes and triggers garbage collection. - Invariants: - - Fail-Fast: If a single dimension fails to load or validate, the - entire orchestration terminates and returns False. + Operational Guarantees: + - Fail-Fast: Termination of the entire loop if any dimension extraction fails. Side Effects: - - Performs per-iteration memory cleanup (del/gc.collect) to prevent - accumulation of large dimension frames. + - Persists multiple dimension reference Parquet files. Failure Behavior: - - Traps FileNotFoundError and general Exceptions during each iteration. - - Logs specific table-level failures and terminates the orchestration to - maintain consistency across dimension snapshots. + - Logs table-specific errors and returns False. """ for table, config in DIMENSION_REFERENCES.items(): @@ -177,7 +200,6 @@ def orchestrate_dimension_refs(run_context: RunContext, report: Dict) -> bool: status_tracker=tracker, func=dimension_references, lf=lf_raw, - table_name=table, primary_key=primary_key, req_column=require_col, ) @@ -229,29 +251,19 @@ def assemble_events(run_context: RunContext) -> dict: """ Main entry point for the Silver-to-Gold Assembly stage. - This component coordinates the transformation of normalized relational - tables into contract-compliant analytical datasets. - - Workflow I: Event Assembly (Order Grain) - 1. Load: Fetches core event tables (Orders, Items, Payments). - 2. Merge: Join datasets with strict 1:1 order_id cardinality enforcement. - 3. Derive: Calculate temporal metrics (lead times) and lineage attributes. - 4. Freeze: Project final schema and enforce strictly defined dtypes. - 5. Export: Persist the unified event table to the Gold zone. - - Workflow II: Dimension Reference Extraction - 1. Iterate: Process Customer and Product registries. - 2. Extract: Select required columns and deduplicate by primary key. - 3. Export: Persist independent reference tables to the Gold zone. + Workflow: + 1. Delegate: Triggers Workflow I (Event Assembly). + 2. Delegate: Triggers Workflow II (Dimension Reference Extraction). Operational Guarantees: - - Grain: Strictly one row per 'order_id' for the event dataset. - - Failure: Fail-fast; any task failure halts the stage and returns a 'failed' status. - - Context: Relies on 'run_context' for deterministic path resolution. + - Sequential Dependency: Dimension references only process after event assembly attempts. + - Stage Atomicity: Any failure in either workflow marks the stage as 'failed'. + + Side Effects: + - Orchestrates the creation of the entire Gold/Assembled layer. Failure Behavior: - - Cascades orchestration failures: If either Workflow I or Workflow II returns - False, the stage status is set to 'failed' and the report is returned immediately. + - Returns a report with status='failed' if either orchestration branch returns False. Returns: dict: A stage report containing 'status' and step-level execution logs. diff --git a/data_pipeline/assembly/assembly_logic.py b/data_pipeline/assembly/assembly_logic.py index 473d6de..08303e2 100644 --- a/data_pipeline/assembly/assembly_logic.py +++ b/data_pipeline/assembly/assembly_logic.py @@ -52,19 +52,21 @@ def merge_data(tables: Dict) -> pl.LazyFrame: Contract: - Inner joins 'df_orders' with 'df_order_items' to ensure analytical relevance. - Left joins 'df_payments' to capture financial metadata. + - Subtractive Filtering: Discards orders lacking corresponding item records. Optimization Logic: - Hash-Join: Maps high-cardinality UUIDs to UInt64 hashes to reduce Join Hash Table memory. - - Pre-aggregation: Sums payments and deduplicates items BEFORE joining to guarantee - a strict 1:1 grain and prevent Cartesian row explosions. + - Pre-aggregation: Sums payments and deduplicates items BEFORE joining to guarantee a strict 1:1 grain and prevent Cartesian row explosions. - Early Projection: Selects required columns at the source to minimize join width. Invariants: - Dataset Grain: Strictly one row per 'order_id'. - - Referential Integrity: Orders lacking corresponding item records are discarded. + + Outputs: + - Merged LazyFrame containing joined order, item, and payment data. Failures: - - Potential for cardinality explosion if pre-aggregation logic is bypassed. + - [Structural] Crashes if required tables ('df_orders', 'df_order_items') are missing from input Dict. """ pl.enable_string_cache() @@ -119,7 +121,7 @@ def merge_data(tables: Dict) -> pl.LazyFrame: return df_merged -def derive_fields(lf: pl.LazyFrame, run_id: str) -> pl.LazyFrame: +def derive_fields(lf: pl.LazyFrame) -> pl.LazyFrame: """ Analytical enrichment and temporal metric derivation layer. @@ -129,16 +131,17 @@ def derive_fields(lf: pl.LazyFrame, run_id: str) -> pl.LazyFrame: Optimization Logic: - Memory-Efficient Casting: Forces durations and years to Int16 (2 bytes) to minimize row width. - - Categorical Compression: Casts repetitive strings (year_week, run_id) to Categorical. + - Categorical Compression: Casts repetitive strings (year_week) to Categorical. - Early Drop: Purges non-contract columns (e.g., estimated_delivery) immediately after use. Invariants: - - Lineage: Every row is stamped with the current 'run_id' for traceability. - Temporal Grain: Metrics (lead_time, lags, delays) are represented as integer days. + Outputs: + - Enriched LazyFrame with derived analytical fields. + Failures: - - Raises ComputeError (from Polars) if date subtraction logic encounters nulls - in non-nullable date columns. + - [Structural] Crashes if input LazyFrame lacks required timestamp columns. """ lf_derived = lf.with_columns( @@ -159,12 +162,9 @@ def derive_fields(lf: pl.LazyFrame, run_id: str) -> pl.LazyFrame: .dt.total_days() .cast(pl.Int16), order_date=pl.col("order_purchase_timestamp").dt.date(), - order_year=pl.col("order_purchase_timestamp").dt.year(), - order_week_iso=pl.col("order_purchase_timestamp").dt.strftime("W%V"), order_year_week=pl.col("order_purchase_timestamp") .dt.strftime("%G-W%V") .cast(pl.Categorical), - run_id=pl.lit(run_id).cast(pl.Categorical), ).drop("order_estimated_delivery_date") return lf_derived @@ -179,14 +179,16 @@ def freeze_schema(lf: pl.LazyFrame) -> pl.LazyFrame: - Type Enforcement: Casts remaining columns to the formats defined in 'ASSEMBLE_DTYPES'. Optimization Logic: - - Zero-Copy Streaming: Omits sorting to maintain the non-blocking execution plan - required for efficient sink_parquet() operations in memory-constrained environments. + - Zero-Copy Streaming: Omits sorting to maintain the non-blocking execution plan required for efficient sink_parquet() operations. Invariants: - Structure: Final execution plan exactly matches the modeling configuration spec. + Outputs: + - Schema-compliant LazyFrame ready for persistence. + Failures: - - Raises RuntimeError if the input frame lacks columns required by 'ASSEMBLE_SCHEMA'. + - [Structural] Raises RuntimeError if input frame lacks columns required by 'ASSEMBLE_SCHEMA'. """ current_columns = lf.collect_schema().names() @@ -206,7 +208,6 @@ def freeze_schema(lf: pl.LazyFrame) -> pl.LazyFrame: def dimension_references( lf: pl.LazyFrame, - table_name: str, primary_key: list[str], req_column: list[str], ) -> pl.LazyFrame: @@ -214,25 +215,19 @@ def dimension_references( Extracts a unique reference dataset from a historical source. Contract: - - Filters input to specified 'req_column' set. - - Enforces uniqueness based on 'primary_key'. + - Subtractive Filtering: Selects specified 'req_column' set and enforces uniqueness. Invariants: - Dataset Grain: Strictly one row per 'primary_key'. - - Sorting: Inherits source order (deterministic behavior not guaranteed). + + Outputs: + - Unique reference LazyFrame. Failures: - - Raises RuntimeError if 'primary_key' duplicates persist after extraction. + - [Structural] Crashes if input LazyFrame lacks 'primary_key' or 'req_column'. """ - lf_dim = lf.select(req_column).unique(subset=primary_key).sort(primary_key) - - if ( - lf_dim.select(pl.col(primary_key).is_duplicated().any()) - .collect(engine="streaming") - .item() - ): - raise RuntimeError(f"Duplicated {primary_key} detected in {table_name}") + lf_dim = lf.select(req_column).unique(subset=primary_key) return lf_dim @@ -254,23 +249,20 @@ def task_wrapper( Unified task runner that handles logging, reporting, and execution. Workflow: - 1. Dispatch: Executes the logic function 'func' with provided arguments. + 1. Delegate: Executes the logic function 'func' with provided arguments. 2. Monitor: Traps any exceptions raised during execution. - 3. Report: Updates 'status_tracker' and logs success/failure to the report. + 3. Promote: Updates 'status_tracker' and logs success/failure to the report. Operational Guarantees: - - Fail-Safe: Traps all exceptions to prevent pipeline termination, converting - errors into 'failed' status reports. - - Integrity: Updates the 'status' field in the report for the given step to - either True (success) or False (failed) regardless of outcome. + - Fail-Safe: Traps all exceptions to prevent pipeline termination. + - Integrity: Updates step-level status regardless of outcome. Side Effects: - Mutates 'report' and 'status_tracker' dictionaries in-place. - Prints informational/error logs to stdout. Failure Behavior: - - Catches all Exceptions, logs the traceback via 'log_error', and returns - (False, None) to signal a controlled sub-task failure. + - Returns (False, None) upon any exception. Returns: tuple[bool, Any]: (Success Boolean, Result Data or None). @@ -303,11 +295,13 @@ def load_event_table(run_context: RunContext, report: Dict) -> Any: Batch-loads core event tables required for assembly. Contract: - - Iterates through the global EVENT_TABLES registry. - - Loads Parquet files from the provided 'contracted_path'. + - Hydrate: Iterates through EVENT_TABLES and loads Parquet files from 'contracted_path'. Outputs: - - Returns a dictionary keyed by table name. + - Dict keyed by table name containing loaded LazyFrames. + + Failures: + - [Operational] Returns None if any required table is missing or fails to load. """ contracted_path = run_context.contracted_path @@ -339,11 +333,16 @@ def export_path(run_context: RunContext, file_name: str) -> Path: Generates a deterministic destination path for assembled artifacts. Contract: - - Parses the 'run_id' (format: YYYYMMDD...) to extract date partitions. - - Constructs a filename with the pattern: {file_name}_{YYYY}_{MM}_{DD}.parquet. + - Transformation: Parses 'run_id' to extract date partitions and constructs a timestamped filename. Invariants: - - Output location is always relative to 'run_context.assembled_path'. + - Path Integrity: Output location is always relative to 'run_context.assembled_path'. + + Outputs: + - Path object for the target file. + + Failures: + - [Structural] Crashes if 'run_id' format is invalid. """ year = run_context.run_id[:4] diff --git a/data_pipeline/contract/contract_executor.py b/data_pipeline/contract/contract_executor.py index 47c1525..beeecbd 100644 --- a/data_pipeline/contract/contract_executor.py +++ b/data_pipeline/contract/contract_executor.py @@ -17,27 +17,24 @@ def apply_contract( """ Main entry point for the Raw-to-Contracted Stage. - This component enforces structural data quality gates based on the logical - role of the table. It acts as a subtractive filter and schema-freezer, - ensuring only compliant rows and columns reach the Silver (contracted) layer. - Workflow: - 1. Resolve: Determines table configuration and role (event_fact, entity_reference, etc.). - 2. Load: Fetches the raw snapshot from the lake's snapshot zone. - 3. Sequence: Iteratively applies atomic filtering rules (Deduplication, Null-checks, etc.). - 4. Track: Captures row-level telemetry and identifies compromised 'order_id's. - 5. Propagate: Returns validated/invalidated IDs to maintain referential integrity. - 6. Freeze: Executes 'enforce_schema' as the terminal step to project approved columns. - 7. Export: Persists the contract-compliant dataset to the Silver zone. + 1. Resolve: Identifies table metadata (role, schema, keys) from the central registry. + 2. Hydrate: Fetches the raw snapshot from the lake's snapshot zone. + 3. Delegate: Iteratively applies atomic logic rules (Deduplication, Chronology, Null-checks). + 4. Validate: Executes 'enforce_schema' as the terminal structural gate. + 5. Promote: Persists the contract-compliant dataset to the Silver (contracted) zone. Operational Guarantees: - - Subtractive Only: Filters rows first; never mutates row values (only column types). - - Finality: The 'enforce_schema' step guarantees the artifact matches the system registry. - - Referential Integrity: Tables processed after 'df_orders' use its output for parent-check filtering. + - Subtractive Only: Exclusively filters rows or casts types; never mutates business values. + - Referential Safety: Propagates invalidated keys across table boundaries to ensure consistent pruning. + - Structural Finality: Guarantees output parity with the ASSEMBLE_SCHEMA specification. + + Side Effects: + - Persists a Parquet artifact to the contracted directory. + - Updates newly invalidated 'order_id' sets for downstream cross-table pruning. Failure Behavior: - - Traps logic-step exceptions via a try-except block within the ROLE_STEPS loop. - - Marks stage status as 'failed' and returns early upon encountering any transformation error. + - Traps logic-step exceptions; logs errors to the report and halts the current table's processing. Returns: tuple: (Stage Report Dict, Newly Invalidated IDs Set, Validated Order IDs Set) diff --git a/data_pipeline/contract/contract_logic.py b/data_pipeline/contract/contract_logic.py index 9e1f7c8..6a826f8 100644 --- a/data_pipeline/contract/contract_logic.py +++ b/data_pipeline/contract/contract_logic.py @@ -16,8 +16,14 @@ def deduplicate_exact_events(df: pd.DataFrame) -> tuple[pd.DataFrame, int]: - Identifies and removes rows where every column value is an exact match. - Retains the 'first' encountered instance of the record. - Returns: - tuple: (Filtered DataFrame, Integer count of dropped rows) + Invariants: + - Grain: Preserves the original semantic grain while purging physical duplicates. + + Outputs: + - Tuple: (Filtered DataFrame, Integer count of dropped rows). + + Failures: + - [Structural] Crashes if input is not a pandas DataFrame. """ initial_count = len(df) @@ -40,14 +46,17 @@ def remove_unparsable_timestamps(df: pd.DataFrame) -> tuple[pd.DataFrame, int, s Contract: - Evaluates all columns defined in REQUIRED_TIMESTAMPS. - - Drops any row containing at least one NaT/unparsable value in these columns. + - Subtractive Filtering: Drops any row containing at least one NaT/unparsable value in target columns. Invariants: - - Does not cast types permanently; performs internal validation only. - - Emits 'order_id' of failing rows to prevent orphan processing downstream. + - Type Safety: Does not cast types permanently; performs internal validation only. + - Lineage: Emits 'order_id' of failing rows to enable cascade pruning downstream. - Returns: - tuple: (Filtered DataFrame, Count of dropped rows, Set of invalid order_ids) + Outputs: + - Tuple: (Filtered DataFrame, Count of dropped rows, Set of invalid order_ids). + + Failures: + - [Structural] Crashes if REQUIRED_TIMESTAMPS columns are missing from the DataFrame. """ initial_count = len(df) @@ -82,12 +91,17 @@ def remove_impossible_timestamps(df: pd.DataFrame) -> tuple[pd.DataFrame, int, s Enforces logical chronology for the order lifecycle. Contract: - - Invariant I: Order Approval Date >= Order Purchase Date. - - Invariant II: Order Delivery Date >= Order Purchase Date. - - Drops rows where the temporal sequence is physically impossible. + - Chronological Gate: Order Approval Date >= Order Purchase Date AND Order Delivery Date >= Order Purchase Date. + - Subtractive Filtering: Drops rows where the temporal sequence violates physical reality. - Returns: - tuple: (Filtered DataFrame, Count of dropped rows, Set of invalid order_ids) + Invariants: + - Temporal Alignment: Ensures all orders have a positive or zero lead time. + + Outputs: + - Tuple: (Filtered DataFrame, Count of dropped rows, Set of invalid order_ids). + + Failures: + - [Structural] Crashes if lifecycle timestamp columns are missing. """ purchase_ts = pd.to_datetime(df["order_purchase_timestamp"]) @@ -118,11 +132,17 @@ def remove_rows_with_null_constraint( Enforces mandatory data presence (NOT NULL) for a dynamic column list. Contract: - - Evaluates the subset of columns provided in 'non_nullable_column'. - - Drops any row where at least one target column contains a Null/NaN. + - Subset Validation: Evaluates only columns provided in 'non_nullable_column'. + - Subtractive Filtering: Drops any row where at least one target column contains Null/NaN. + + Invariants: + - Data Integrity: Guarantees 100% population for critical join keys and metrics. - Returns: - tuple: (Filtered DataFrame, Count of dropped rows, Set of invalid order_ids) + Outputs: + - Tuple: (Filtered DataFrame, Count of dropped rows, Set of invalid order_ids). + + Failures: + - [Structural] Crashes if 'non_nullable_column' names are not in the DataFrame. """ initial_count = len(df) @@ -150,11 +170,17 @@ def cascade_drop_by_order_id( Enforces referential cleanup based on a blacklist of compromised keys. Contract: - - Drops any row whose 'order_id' exists in the 'invalid_order_ids' set. + - Blacklist Filtering: Drops any row whose 'order_id' exists in 'invalid_order_ids'. - Purpose: Prunes child records (items/payments) whose parent orders failed validation. - Returns: - tuple: (Filtered DataFrame, Integer count of dropped rows) + Invariants: + - Referential Integrity: Prevents orphan records from reaching the assembly stage. + + Outputs: + - Tuple: (Filtered DataFrame, Integer count of dropped rows). + + Failures: + - [Structural] Crashes if 'order_id' column is missing. """ initial_count = len(df) @@ -172,11 +198,17 @@ def enforce_parent_reference( Enforces referential integrity based on a whitelist of validated keys. Contract: - - Drops any row whose 'order_id' is NOT present in the 'valid_order_ids' set. + - Whitelist Filtering: Drops any row whose 'order_id' is NOT present in 'valid_order_ids'. - Purpose: Final referential gate to ensure total alignment with the 'orders' grain. - Returns: - tuple: (Filtered DataFrame, Integer count of dropped rows) + Invariants: + - Data Reliability: Guarantees that every child record has a corresponding valid parent. + + Outputs: + - Tuple: (Filtered DataFrame, Integer count of dropped rows). + + Failures: + - [Structural] Crashes if 'order_id' column is missing. """ initial_count = len(df) @@ -200,11 +232,14 @@ def enforce_schema( - Type Enforcement: Casts remaining columns to the formats defined in 'dtypes'. Invariants: - - Column Integrity: The output column count and order strictly match 'required_column'. - - Type Safety: Ensures the dataset is ready for downstream analytical joins (e.g., matching IDs). + - Structural Integrity: Output exactly matches the modeling specification. + - Grain: Preserves the input row count. + + Outputs: + - Tuple: (Filtered DataFrame, Integer count of columns removed). - Returns: - tuple: (Filtered DataFrame, Integer count of columns removed). + Failures: + - [Structural] Crashes if required columns are missing or if dtypes are incompatible. """ initial_col_count = len(df.columns) diff --git a/data_pipeline/publish/publish_executor.py b/data_pipeline/publish/publish_executor.py index 2b0aa36..4fffc23 100644 --- a/data_pipeline/publish/publish_executor.py +++ b/data_pipeline/publish/publish_executor.py @@ -16,26 +16,22 @@ def execute_publish_lifecycle(run_context: RunContext) -> Dict: """ Main entry point for the Pipeline Publish Stage. - This component manages the transition of analytical artifacts from - the internal assembly zones to the production-facing BI environment. - Workflow: - 1. Integrity Gate: Verifies that the current run has produced all - required semantic modules and tables defined in the registry. - 2. Promotion: Moves/copies artifacts into a permanent, read-only - versioned directory (v{run_id}). - 3. Activation: Performs an atomic update of the 'latest' pointer - to switch BI/Reporting traffic to the new version. + 1. Validate: Executes the 'Integrity Gate' to ensure all semantic artifacts exist and are schema-compliant. + 2. Promote: Transfers validated artifacts to the permanent versioned publication zone. + 3. Delegate: Triggers the atomic pointer swap to activate the new version for BI consumers. Operational Guarantees: - - Atomicity: The 'latest' pointer is updated ONLY if all prior - validation and promotion steps succeed. - - Immutability: Promoted versions are treated as static snapshots. - - Fail-Fast: Any failure in the lifecycle prevents version activation. + - Atomicity: The 'latest' version pointer is updated ONLY after successful promotion of all artifacts. + - Immutability: Once published, a versioned directory is treated as a static, read-only snapshot. + - Fail-Fast: Any failure in validation or promotion immediately halts the lifecycle. + + Side Effects: + - Persists a new versioned directory (v{run_id}) in the publication zone. + - Mutates the 'latest_version.json' manifest to update the global version pointer. Failure Behavior: - - Explicit Fail-Fast: Uses 'fail_step' helper to terminate the lifecycle and - mark status as 'failed' immediately after any step failure. + - Traps step-level failures; logs errors and returns a report with status='failed', preventing version activation. Returns: Dict: A global publish report containing status and step-level logs. diff --git a/data_pipeline/publish/publish_logic.py b/data_pipeline/publish/publish_logic.py index fab36c1..91dad08 100644 --- a/data_pipeline/publish/publish_logic.py +++ b/data_pipeline/publish/publish_logic.py @@ -46,15 +46,18 @@ def run_integrity_gate(run_context: RunContext) -> Dict: Enforces the pre-publication structural completeness contract. Contract: - - Scans the runtime semantic directory for existence. - - Validates that every Module and Table defined in SEMANTIC_MODULES - exists as a physical artifact. + - Structural Validation: Scans the runtime semantic directory and verifies 1:1 parity with SEMANTIC_MODULES registry. + - Schema Enforcement: Validates that all physical Parquet files contain the required column set. Invariants: - - Failure is triggered if any expected Parquet file is missing. + - Completeness: Halts publication if any expected module or table is missing from the file system. + - Version Alignment: Ensures all files follow the current run_id timestamp convention. - Returns: - Dict: A report object containing the success status and findings. + Outputs: + - Dict: Report containing 'status' and detailed findings. + + Failures: + - [Structural] Returns status='failed' if directories are missing, modules mismatch, or schemas are incomplete. """ report = init_report() @@ -129,15 +132,18 @@ def promote_semantic_version(run_context: RunContext) -> Dict: Manages the archival of the current run into the publication zone. Contract: - - Creates a permanent directory following the 'v{run_id}' convention. - - Transfers all semantic artifacts to the versioned destination. + - Promote: Transfers validated semantic artifacts from the runtime zone to a permanent versioned destination. + - Versioning: Creates a new directory following the 'v{run_id}' physical convention. Invariants: - - Destination is derived from run_context.published_path. - - Relies on the storage_adapter for Local/GCS transparency. + - Immutability: Once promoted, artifacts are treated as static, read-only snapshots. + - Path Integrity: Destination is derived strictly from run_context.published_path. + + Outputs: + - Dict: Report logging the promotion status and any transfer errors. - Returns: - Dict: A report object logging the promotion status. + Failures: + - [Operational] Returns status='failed' if the version directory already exists or upload fails. """ report = init_report() @@ -172,16 +178,18 @@ def activate_published_version(run_context: RunContext) -> Dict: Atomically updates the system-wide 'latest' version pointer. Contract: - - Generates a JSON manifest containing run_id and publication metadata. - - Overwrites the root 'latest_version.json' in the published zone. + - Atomic Update: Overwrites the root 'latest_version.json' to shift downstream consumers to the new run. + - BI Consistency: Guarantees that analytical tools see the new version only after successful promotion. Invariants: - - Atomic Update: Local updates use write-and-replace to prevent corruption. - - BI Consistency: Downstream consumers see the new version only after - this atomic swap is complete. + - Pointer Integrity: Manifest always contains current run_id and ISO-8601 publication timestamps. + - Atomicity: Local updates use a write-and-replace (os.replace) strategy to prevent manifest corruption. + + Outputs: + - Dict: Report logging the activation status. - Returns: - Dict: A report object logging the activation status. + Failures: + - [Operational] Returns status='failed' if manifest generation or storage upload (Local/GCS) fails. """ report = init_report() diff --git a/data_pipeline/semantic/semantic_executor.py b/data_pipeline/semantic/semantic_executor.py index 8da155c..ee2b631 100644 --- a/data_pipeline/semantic/semantic_executor.py +++ b/data_pipeline/semantic/semantic_executor.py @@ -26,13 +26,17 @@ def validate_and_freeze_table(lf: pl.LazyFrame, table: dict) -> pl.LazyFrame: - Types: Explicitly casts columns to types defined in table['dtypes']. Optimization Logic: - - Lazy Contract Enforcement: Defers all validation and casting to the final - streaming sink, avoiding intermediate materialization passes. - - Zero-Copy Sort Omission: Bypasses sorting to maintain compatibility with - non-blocking streaming engines. + - Lazy Contract Enforcement: Defers all validation and casting to the final streaming sink, avoiding intermediate materialization passes. + - Zero-Copy Sort Omission: Bypasses sorting to maintain compatibility with non-blocking streaming engines. - Behavior: - - Fast-Fail: Raises RuntimeError on schema violations during plan construction. + Invariants: + - Schema Integrity: Output exactly matches the registry specification. + + Outputs: + - Validated LazyFrame ready for export. + + Failures: + - [Structural] Raises RuntimeError on schema violations during plan construction. """ current_columns = lf.collect_schema().names() @@ -62,26 +66,25 @@ def orchestrate_module( Coordinates the construction, validation, and export of a semantic module. Workflow: - 1. Build: Executes the module-specific builder logic. - 2. Loop: Iterates through each returned table in the builder output. - 3. Validate: Enforces technical contracts (schema, dtypes). - 4. Export: Persists validated artifacts to the semantic zone. - 5. Cleanup: Manages memory via explicit deletion and garbage collection. + 1. Delegate: Executes module-specific builder logic with assembled data. + 2. Validate: Enforces technical contracts (schema, dtypes) for each table. + 3. Promote: Persists validated artifacts to the semantic zone. + 4. Purge: Manages memory via explicit deletion and garbage collection. Optimization Logic: - - Linear Streaming Propagation: Maintains the LazyFrame chain from builder to - export, ensuring constant memory usage regardless of dataset scale. - - Incremental Resource Reclamation: Triggers explicit Python garbage collection - after every table export to purge intermediate metadata and plan overhead. + - Linear Streaming Propagation: Maintains the LazyFrame chain from builder to export, ensuring constant memory usage regardless of dataset scale. + - Incremental Resource Reclamation: Triggers explicit Python garbage collection after every table export to purge intermediate metadata and plan overhead. - Invariants: - - Fail-Fast: Any error in building or table-level processing halts the module. - - Strict Config: Builder output must match keys in 'module_config["tables"]'. + Operational Guarantees: + - Fail-Fast: Any error in building or table-level processing halts the module immediately. + - Atomic Module Status: Marks the global report as 'failed' upon any internal exception. + + Side Effects: + - Persists multiple Parquet files to the semantic directory. + - Mutates the 'report' dictionary with step-level statuses. Failure Behavior: - - Traps builder-level and table-level exceptions via try-except blocks. - - Logs specific errors (e.g., FileExistsError, general Exceptions) and marks - the module and global report status as 'failed' before returning False. + - Traps builder-level and table-level exceptions; logs errors and returns False. Returns: bool: True if the module and all its tables were successfully processed. @@ -127,12 +130,14 @@ def orchestrate_module( day = run_context.run_id[6:8] # Validate, Freeze, and Export Each Table - for table_name, df_table in builder_output.items(): + table_names = list(builder_output.keys()) + for table_name in table_names: + lf_frozen = None + df_table = builder_output.pop(table_name) try: if table_name not in module_config["tables"]: report["status"] = "failed" - return False table_config = module_config["tables"][table_name] @@ -140,7 +145,7 @@ def orchestrate_module( ok, lf_frozen = task_wrapper( report=report, - step_name="validate_and_freeze", + step_name="validate_stage", status_tracker=tracker, func=validate_and_freeze_table, lf=df_table, @@ -168,10 +173,14 @@ def orchestrate_module( log_error(f"Unexpected error processing {table_name}: {e}", report) finally: - if df_table is not None: - del df_table + if "lf_frozen" in locals(): + del lf_frozen + del df_table gc.collect() + del builder_output + gc.collect() + log_info(f"Export Module: {module_name} Successfully", report) module_report[module_name]["export"] = True @@ -183,19 +192,20 @@ def build_semantic_layer(run_context: RunContext) -> Dict: Main entry point for the Gold-to-Semantic stage. Workflow: - 1. Source Verification: Loads 'assembled_events' and halts if empty/missing. - 2. Registry Execution: Iterates through 'SEMANTIC_MODULES'. - 3. Orchestration: Triggers builder logic followed by contract enforcement. - 4. Cleanup: Purges memory after each module export. + 1. Hydrate: Loads 'assembled_events' from the assembly zone. + 2. Delegate: Iterates through SEMANTIC_MODULES registry and triggers orchestration. + 3. Purge: Clears the assembled event frame and triggers garbage collection. + + Operational Guarantees: + - Stage-Level Atomicity: Any module failure halts the entire stage. + - Deterministic Lineage: Uses run_id for output partitioning. - Guarantees: - - Atomicity: Module failures are trapped but mark the entire stage as 'failed'. - - Lineage: Uses 'run_id' for deterministic output partitioning. + Side Effects: + - Creates semantic module subdirectories. + - Generates a comprehensive stage report. Failure Behavior: - - Fail-Fast on missing source: Returns immediately if 'assembled_events' is missing. - - Bubbles up module-level failures: If any module orchestration returns False, - the stage status is set to 'failed' and the report is returned. + - Returns a report with status='failed' if source files are missing or any module fails. Returns: Dict: A global report of module statuses and error logs. diff --git a/data_pipeline/semantic/semantic_logic.py b/data_pipeline/semantic/semantic_logic.py index 9086c94..5f743d1 100644 --- a/data_pipeline/semantic/semantic_logic.py +++ b/data_pipeline/semantic/semantic_logic.py @@ -17,57 +17,70 @@ def build_seller_semantic(lf: pl.LazyFrame, run_context: RunContext) -> Dict: Constructs the Seller-centric analytical layer from assembled events. Contract: - - Transforms order-grain events into weekly seller performance snapshots. - - Aggregates metrics including revenue, lead times, and fulfillment lag. + - Subtractive Filtering: Selects strictly required columns for performance. + - Transformation: Derives week_start_date and boolean status flags. + - Aggregation: Computes weekly performance metrics (revenue, lead times, delays) per seller. Optimization Logic: - - Narrow Aggregation: Casts counts and sums to Int16/Int32 and revenues to Float32 - immediately within the agg() block to minimize the memory footprint of the result set. - - Categorical Handling: Relies on pre-cast Categorical columns for grouping keys. + - Streaming Projection: Selects required columns for aggregation, allowing the streaming engine to push projection through the plan. + - Non-Blocking Aggregation: Executes aggregations in a streaming fashion, maintaining a constant memory profile. + - Categorical Handling: Utilizes categorical grouping keys to maintain optimized performance during non-blocking aggregation. Invariants: - - Lineage: Enforces a single 'run_id' across the input dataset. - Fact Grain: Strictly 1 row per ('seller_id', 'order_year_week'). - Dimension Grain: Strictly 1 row per 'seller_id'. - Temporal: Aligns all metrics to ISO-week start dates (Monday). + Outputs: + - Dict containing 'seller_weekly_fact' (LazyFrame) and 'seller_dim' (LazyFrame). + Failures: - - Raises RuntimeError if multiple 'run_id' values are detected in the source LazyFrame. + - [Structural] Crashes if input LazyFrame lacks required columns. """ - if lf.select(pl.col("run_id").n_unique()).collect(engine="streaming").item() != 1: - raise RuntimeError("Multiple run_ids detected in source") + needed_cols = [ + "seller_id", + "order_year_week", + "order_date", + "order_status", + "order_id", + "order_revenue", + "lead_time_days", + "delivery_delay_days", + "approval_lag_days", + ] + + lf_filtered = lf.select(needed_cols).with_columns( + seller_id=pl.col("seller_id").cast(pl.Categorical), + order_year_week=pl.col("order_year_week").cast(pl.Categorical), + ) seller_weekly_fact = ( - lf.with_columns( + lf_filtered.with_columns( week_start_date=pl.col("order_date").dt.truncate("1w"), is_delivered=pl.col("order_status").eq("delivered"), is_cancelled=pl.col("order_status").eq("cancelled"), ) .group_by(["seller_id", "order_year_week"]) .agg( - run_id=pl.col("run_id").first().cast(pl.Categorical), week_start_date=pl.col("week_start_date").min(), - weekly_order_count=pl.col("order_id").count().cast(pl.Int16()), - weekly_delivered_orders=pl.col("is_delivered").sum().cast(pl.Int16()), - weekly_cancelled_orders=pl.col("is_cancelled").sum().cast(pl.Int16()), - weekly_revenue=pl.col("order_revenue").sum().cast(pl.Float32()), - weekly_avg_lead_time=pl.col("lead_time_days").mean().cast(pl.Float32()), - weekly_total_lead_time=pl.col("lead_time_days").sum().cast(pl.Int16()), + weekly_order_count=pl.col("order_id").count().cast(pl.Int16), + weekly_delivered_orders=pl.col("is_delivered").sum().cast(pl.Int16), + weekly_cancelled_orders=pl.col("is_cancelled").sum().cast(pl.Int16), + weekly_revenue=pl.col("order_revenue").sum().cast(pl.Float32), + weekly_avg_lead_time=pl.col("lead_time_days").mean().cast(pl.Float32), + weekly_total_lead_time=pl.col("lead_time_days").sum().cast(pl.Int16), weekly_avg_delivery_delay=pl.col("delivery_delay_days") .mean() - .cast(pl.Float32()), + .cast(pl.Float32), weekly_total_delivery_delay=pl.col("delivery_delay_days") .sum() - .cast(pl.Int16()), - weekly_avg_approval_lag=pl.col("approval_lag_days") - .mean() - .cast(pl.Float32()), + .cast(pl.Int16), + weekly_avg_approval_lag=pl.col("approval_lag_days").mean().cast(pl.Float32), ) ) - seller_dim = lf.group_by("seller_id").agg( - run_id=pl.col("run_id").first(), + seller_dim = lf_filtered.group_by("seller_id").agg( first_order_date=pl.col("order_date").min(), first_order_year_week=pl.col("order_year_week").min(), ) @@ -90,60 +103,74 @@ def build_customer_semantic(lf: pl.LazyFrame, run_context: RunContext) -> Dict: Constructs the Customer-centric analytical layer from assembled events. Contract: - - Aggregates consumer behavior metrics into a weekly temporal grain. - - Calculates lifetime-to-date attributes and first-purchase markers. + - Subtractive Filtering: Selects strictly required columns for performance. + - Transformation: Derives week_start_date and boolean status flags. + - Aggregation: Computes weekly performance metrics (revenue, lead times, delays) per customer. + - Hydration: Loads historical customer dimension table from the assembly zone. Optimization Logic: - - Narrow Aggregation: Casts counts and sums to Int16/Int32 and revenues to Float32 - immediately within the agg() block to minimize the memory footprint of the result set. - - Categorical Handling: Relies on pre-cast Categorical columns for grouping keys. + - Streaming Projection: Selects required columns for aggregation, allowing the streaming engine to push projection through the plan. + - Non-Blocking Aggregation: Executes aggregations in a streaming fashion, maintaining a constant memory profile. + - Categorical Handling: Utilizes categorical grouping keys to maintain optimized performance during non-blocking aggregation. Invariants: - - Lineage: Requires a unified 'run_id' for consistent partitioning. - Fact Grain: Strictly 1 row per ('customer_id', 'order_year_week'). - Dimension Grain: Strictly 1 row per 'customer_id'. + Outputs: + - Dict containing 'customer_weekly_fact' (LazyFrame) and 'customer_dim' (LazyFrame). + Failures: - - Raises RuntimeError if multiple 'run_id' values are detected in the source LazyFrame. + - [Structural] Crashes if input LazyFrame lacks required columns. + - [Operational] Crashes if 'df_customers' cannot be loaded from the assembly zone. """ - if lf.select(pl.col("run_id").n_unique()).collect(engine="streaming").item() != 1: - raise RuntimeError("Multiple run_ids detected in source") + needed_cols = [ + "customer_id", + "order_year_week", + "order_date", + "order_status", + "order_id", + "order_revenue", + "lead_time_days", + "delivery_delay_days", + "approval_lag_days", + ] + + # Cast grouping keys to Categorical to reduce hash table memory pressure + lf_filtered = lf.select(needed_cols).with_columns( + customer_id=pl.col("customer_id").cast(pl.Categorical), + order_year_week=pl.col("order_year_week").cast(pl.Categorical), + ) customer_weekly_fact = ( - lf.with_columns( + lf_filtered.with_columns( week_start_date=pl.col("order_date").dt.truncate("1w"), is_delivered=pl.col("order_status").eq("delivered"), is_cancelled=pl.col("order_status").eq("cancelled"), ) .group_by(["customer_id", "order_year_week"]) .agg( - run_id=pl.col("run_id").first().cast(pl.Categorical), week_start_date=pl.col("week_start_date").min(), - weekly_order_count=pl.col("order_id").count().cast(pl.Int16()), - weekly_delivered_orders=pl.col("is_delivered").sum().cast(pl.Int16()), - weekly_cancelled_orders=pl.col("is_cancelled").sum().cast(pl.Int16()), - weekly_revenue=pl.col("order_revenue").sum().cast(pl.Float32()), - weekly_avg_lead_time=pl.col("lead_time_days").mean().cast(pl.Float32()), - weekly_total_lead_time=pl.col("lead_time_days").sum().cast(pl.Int16()), + weekly_order_count=pl.col("order_id").count().cast(pl.Int16), + weekly_delivered_orders=pl.col("is_delivered").sum().cast(pl.Int16), + weekly_cancelled_orders=pl.col("is_cancelled").sum().cast(pl.Int16), + weekly_revenue=pl.col("order_revenue").sum().cast(pl.Float32), + weekly_avg_lead_time=pl.col("lead_time_days").mean().cast(pl.Float32), + weekly_total_lead_time=pl.col("lead_time_days").sum().cast(pl.Int16), weekly_avg_delivery_delay=pl.col("delivery_delay_days") .mean() - .cast(pl.Float32()), + .cast(pl.Float32), weekly_total_delivery_delay=pl.col("delivery_delay_days") .sum() - .cast(pl.Int16()), - weekly_avg_approval_lag=pl.col("approval_lag_days") - .mean() - .cast(pl.Float32()), + .cast(pl.Int16), + weekly_avg_approval_lag=pl.col("approval_lag_days").mean().cast(pl.Float32), ) ) customer_dim = load_historical_table( base_path=run_context.assembled_path, table_name="df_customers" ) - customer_dim = customer_dim.with_columns( - run_id=pl.lit(run_context.run_id).cast(pl.Categorical) - ) customer_semantic = { "customer_weekly_fact": customer_weekly_fact, @@ -163,60 +190,73 @@ def build_product_semantic(lf: pl.LazyFrame, run_context: RunContext) -> Dict: Constructs the Product-centric analytical layer from assembled events. Contract: - - Aggregates sales velocity and fulfillment health per product. - - Merges category metadata with weekly transaction volumes. + - Subtractive Filtering: Selects strictly required columns for performance. + - Transformation: Derives week_start_date and boolean status flags. + - Aggregation: Computes weekly performance metrics (revenue, lead times, delays) per product. + - Hydration: Loads historical product dimension table from the assembly zone. Optimization Logic: - - Narrow Aggregation: Casts counts and sums to Int16/Int32 and revenues to Float32 - immediately within the agg() block to minimize the memory footprint of the result set. - - Categorical Handling: Relies on pre-cast Categorical columns for grouping keys. + - Streaming Projection: Selects required columns for aggregation, allowing the streaming engine to push projection through the plan. + - Non-Blocking Aggregation: Executes aggregations in a streaming fashion, maintaining a constant memory profile. + - Categorical Handling: Utilizes categorical grouping keys to maintain optimized performance during non-blocking aggregation. Invariants: - - Lineage: Validates input homogeneity via 'run_id' check. - Fact Grain: Strictly 1 row per ('product_id', 'order_year_week'). - Dimension Grain: Strictly 1 row per 'product_id'. + Outputs: + - Dict containing 'product_weekly_fact' (LazyFrame) and 'product_dim' (LazyFrame). + Failures: - - Raises RuntimeError if multiple 'run_id' values are detected in the source LazyFrame. + - [Structural] Crashes if input LazyFrame lacks required columns. + - [Operational] Crashes if 'df_products' cannot be loaded from the assembly zone. """ - if lf.select(pl.col("run_id").n_unique()).collect(engine="streaming").item() != 1: - raise RuntimeError("Multiple run_ids detected in source") + needed_cols = [ + "product_id", + "order_year_week", + "order_date", + "order_status", + "order_id", + "order_revenue", + "lead_time_days", + "delivery_delay_days", + "approval_lag_days", + ] + + lf_filtered = lf.select(needed_cols).with_columns( + product_id=pl.col("product_id").cast(pl.Categorical), + order_year_week=pl.col("order_year_week").cast(pl.Categorical), + ) product_weekly_fact = ( - lf.with_columns( + lf_filtered.with_columns( week_start_date=pl.col("order_date").dt.truncate("1w"), is_delivered=pl.col("order_status").eq("delivered"), is_cancelled=pl.col("order_status").eq("cancelled"), ) .group_by(["product_id", "order_year_week"]) .agg( - run_id=pl.col("run_id").first().cast(pl.Categorical), week_start_date=pl.col("week_start_date").min(), - weekly_order_count=pl.col("order_id").count().cast(pl.Int16()), - weekly_delivered_orders=pl.col("is_delivered").sum().cast(pl.Int16()), - weekly_cancelled_orders=pl.col("is_cancelled").sum().cast(pl.Int16()), - weekly_revenue=pl.col("order_revenue").sum().cast(pl.Float32()), - weekly_avg_lead_time=pl.col("lead_time_days").mean().cast(pl.Float32()), - weekly_total_lead_time=pl.col("lead_time_days").sum().cast(pl.Int16()), + weekly_order_count=pl.col("order_id").count().cast(pl.Int16), + weekly_delivered_orders=pl.col("is_delivered").sum().cast(pl.Int16), + weekly_cancelled_orders=pl.col("is_cancelled").sum().cast(pl.Int16), + weekly_revenue=pl.col("order_revenue").sum().cast(pl.Float32), + weekly_avg_lead_time=pl.col("lead_time_days").mean().cast(pl.Float32), + weekly_total_lead_time=pl.col("lead_time_days").sum().cast(pl.Int16), weekly_avg_delivery_delay=pl.col("delivery_delay_days") .mean() - .cast(pl.Float32()), + .cast(pl.Float32), weekly_total_delivery_delay=pl.col("delivery_delay_days") .sum() - .cast(pl.Int16()), - weekly_avg_approval_lag=pl.col("approval_lag_days") - .mean() - .cast(pl.Float32()), + .cast(pl.Int16), + weekly_avg_approval_lag=pl.col("approval_lag_days").mean().cast(pl.Float32), ) ) product_dim = load_historical_table( base_path=run_context.assembled_path, table_name="df_products" ) - product_dim = product_dim.with_columns( - run_id=pl.lit(run_context.run_id).cast(pl.Categorical) - ) product_semantic = { "product_weekly_fact": product_weekly_fact, diff --git a/data_pipeline/shared/loader_exporter.py b/data_pipeline/shared/loader_exporter.py index b52c46a..49cd2f9 100644 --- a/data_pipeline/shared/loader_exporter.py +++ b/data_pipeline/shared/loader_exporter.py @@ -137,7 +137,7 @@ def export_file( row_count = len(df) elif isinstance(df, pl.LazyFrame): - df.sink_parquet(output_path, compression="brotli") + df.sink_parquet(output_path, compression="snappy") row_count = "streaming" else: diff --git a/data_pipeline/shared/modeling_configs.py b/data_pipeline/shared/modeling_configs.py index 425ae12..22da458 100644 --- a/data_pipeline/shared/modeling_configs.py +++ b/data_pipeline/shared/modeling_configs.py @@ -25,9 +25,7 @@ "approval_lag_days", "delivery_delay_days", "order_date", - "order_year", "order_year_week", - "run_id", ] ASSEMBLE_DTYPES: Mapping[str, pl.DataType] = { @@ -44,9 +42,7 @@ "approval_lag_days": pl.Int16(), "delivery_delay_days": pl.Int16(), "order_date": pl.Datetime(), - "order_year": pl.Int16(), "order_year_week": pl.String(), - "run_id": pl.Categorical(), } # ------------------------------------------------------------ @@ -72,14 +68,12 @@ "seller_id", "first_order_date", "first_order_year_week", - "run_id", ] SELLER_DIM_DTYPES: Mapping[str, pl.DataType] = { "seller_id": pl.String(), "first_order_date": pl.Datetime(), "first_order_year_week": pl.String(), - "run_id": pl.Categorical(), } @@ -88,7 +82,6 @@ "seller_id", "order_year_week", "week_start_date", - "run_id", "weekly_order_count", "weekly_delivered_orders", "weekly_cancelled_orders", @@ -104,15 +97,14 @@ "seller_id": pl.String(), "order_year_week": pl.String(), "week_start_date": pl.Datetime(), - "run_id": pl.Categorical(), "weekly_order_count": pl.Int16(), "weekly_delivered_orders": pl.Int16(), "weekly_cancelled_orders": pl.Int16(), "weekly_revenue": pl.Float32(), "weekly_avg_lead_time": pl.Float32(), - "weekly_total_lead_time": pl.Int32(), + "weekly_total_lead_time": pl.Int16(), "weekly_avg_delivery_delay": pl.Float32(), - "weekly_total_delivery_delay": pl.Int32(), + "weekly_total_delivery_delay": pl.Int16(), "weekly_avg_approval_lag": pl.Float32(), } @@ -128,7 +120,6 @@ "customer_city", "customer_segment", "account_creation_date", - "run_id", ] CUSTOMER_DIM_DTYPES: Mapping[str, pl.DataType] = { @@ -137,7 +128,6 @@ "customer_city": pl.Categorical(), "customer_segment": pl.Categorical(), "account_creation_date": pl.Datetime(), - "run_id": pl.Categorical(), } # Customer Fact and dtypes @@ -145,7 +135,6 @@ "customer_id", "order_year_week", "week_start_date", - "run_id", "weekly_order_count", "weekly_delivered_orders", "weekly_cancelled_orders", @@ -161,15 +150,14 @@ "customer_id": pl.String(), "order_year_week": pl.String(), "week_start_date": pl.Datetime(), - "run_id": pl.Categorical(), "weekly_order_count": pl.Int16(), "weekly_delivered_orders": pl.Int16(), "weekly_cancelled_orders": pl.Int16(), "weekly_revenue": pl.Float32(), "weekly_avg_lead_time": pl.Float32(), - "weekly_total_lead_time": pl.Int32(), + "weekly_total_lead_time": pl.Int16(), "weekly_avg_delivery_delay": pl.Float32(), - "weekly_total_delivery_delay": pl.Int32(), + "weekly_total_delivery_delay": pl.Int16(), "weekly_avg_approval_lag": pl.Float32(), } @@ -188,7 +176,6 @@ "product_fragility_index", "product_weight_g", "supplier_tier", - "run_id", ] PRODUCT_DIM_DTYPES: Mapping[str, pl.DataType] = { @@ -200,7 +187,6 @@ "product_fragility_index": pl.Categorical(), "product_weight_g": pl.Float32(), "supplier_tier": pl.Categorical(), - "run_id": pl.Categorical(), } @@ -209,7 +195,6 @@ "product_id", "order_year_week", "week_start_date", - "run_id", "weekly_order_count", "weekly_delivered_orders", "weekly_cancelled_orders", @@ -226,14 +211,13 @@ "product_id": pl.String(), "order_year_week": pl.String(), "week_start_date": pl.Datetime(), - "run_id": pl.Categorical(), "weekly_order_count": pl.Int16(), "weekly_delivered_orders": pl.Int16(), "weekly_cancelled_orders": pl.Int16(), "weekly_revenue": pl.Float32(), "weekly_avg_lead_time": pl.Float32(), - "weekly_total_lead_time": pl.Int32(), + "weekly_total_lead_time": pl.Int16(), "weekly_avg_delivery_delay": pl.Float32(), - "weekly_total_delivery_delay": pl.Int32(), + "weekly_total_delivery_delay": pl.Int16(), "weekly_avg_approval_lag": pl.Float32(), } diff --git a/data_pipeline/validation/validation_executor.py b/data_pipeline/validation/validation_executor.py index 88838fa..55fc2f6 100644 --- a/data_pipeline/validation/validation_executor.py +++ b/data_pipeline/validation/validation_executor.py @@ -23,24 +23,19 @@ def apply_validation(run_context: RunContext, base_path: Path | None = None) -> """ Main entry point for the Pipeline Validation Stage. - This component serves as the primary diagnostic gate for the data pipeline, - ensuring that raw snapshots meet the structural requirements for the - subsequent Contract and Assembly stages. - Workflow: - 1. Loading: Iteratively fetches logical tables from the snapshot zone. - 2. Base Check: Enforces schema, uniqueness, and null constraints via 'run_base_validations'. - 3. Role Dispatch: Executes specialized logic (Event/Transaction) based on 'TABLE_CONFIG'. - 4. Referential Check: Evaluates inter-table integrity (orphans) via 'run_cross_table_validations'. + 1. Hydrate: Iteratively fetches logical tables from the snapshot zone. + 2. Delegate: Enforces base structural integrity (Schema, PK, Nulls) for each table. + 3. Delegate: Executes role-specific domain checks (Event Chronology, Transaction Ranges). + 4. Delegate: Performs cross-table referential analysis (Orphan Detection). Operational Guarantees: - - Diagnostic Only: This function is read-only and will never mutate the source data. - - Comprehensive Reporting: Captures all failures across all tables before returning; does not fail-fast on the first table error. - - Severity: Structural issues are logged as 'errors' while referential issues are 'warnings'. + - Diagnostic Only: Read-only; never mutates source snapshots. + - Non-Blocking: Processes all tables regardless of individual base validation failures. + - Severity Model: Distinguishes between fatal Structural Errors and non-fatal Referential Warnings. Failure Behavior: - - Non-Blocking: Continues processing remaining tables even if one fails base validations. - - Status Update: Sets global report status to 'failed' if any errors or warnings are accumulated. + - Sets the global report status to 'failed' if any errors or warnings are accumulated across the dataset. Returns: Dict: A unified validation report containing 'status' and detailed finding lists. diff --git a/data_pipeline/validation/validation_logic.py b/data_pipeline/validation/validation_logic.py index 17f82b8..6122fd3 100644 --- a/data_pipeline/validation/validation_logic.py +++ b/data_pipeline/validation/validation_logic.py @@ -56,13 +56,17 @@ def run_base_validations( Contract: - Mandatory Schema: All 'required_column' names must exist in the DataFrame. - - Uniqueness: Values in 'primary_key' columns must be unique. + - Uniqueness: Enforces primary key uniqueness and detects conflicting duplicates. - Non-Nullability: Columns in 'non_nullable_column' must not contain NaN values. - - Presence: The table must contain at least one row. + + Invariants: + - Diagnostic Safety: Read-only; does not mutate the input DataFrame. + + Outputs: + - Boolean: True if all mandatory structural checks pass. Failures: - - Logs findings to 'report["errors"]'. - - Returns False if any mandatory check fails, signaling that the table is unusable. + - [Structural] Logs findings to 'report["errors"]' and returns False for missing columns, empty datasets, or PK conflicts. """ if df.empty: @@ -156,13 +160,17 @@ def run_event_fact_validations( Enforces business-logic chronology for Event-Role tables. Contract: - - Evaluates temporal sequence: Purchase <= Approval <= Delivery. - - Validates that timestamps are logically situated in the past. + - Chronological Check: Evaluates temporal sequence (Purchase <= Approval <= Delivery). + - Parseability: Validates timestamp string compatibility with system formats. + + Invariants: + - Temporal Consistency: Flags records where delivery precedes purchase as Warnings. + + Outputs: + - Boolean: True if all temporal checks are executed. Failures: - - Logs violations to 'report["warnings"]'. These are non-fatal to the stage - but indicate data quality degradation. - - Logs 'report["errors"]' if required timestamp columns are missing. + - [Structural] Returns False if required timestamp columns are missing. """ missing_ts_columns = [col for col in REQUIRED_TIMESTAMPS if col not in df.columns] @@ -220,11 +228,13 @@ def run_transaction_detail_validations( Enforces domain and range constraints for Transaction-Role tables. Contract: - - Values: Ensures 'price' and 'freight_value' are non-negative. - - Payments: Ensures 'payment_installments' and 'payment_value' are >= 0. + - Range Check: Ensures financial metrics (price, freight, payments) are non-negative. + + Outputs: + - Boolean: True if domain validations complete. Failures: - - Logs out-of-range values to 'report["errors"]'. + - [Operational] Logs out-of-range values to 'report["errors"]'. """ numeric_columns = df.select_dtypes(include=["number"]).columns.tolist() @@ -247,16 +257,16 @@ def run_cross_table_validations( Enforces referential integrity (Foreign Key) across the dataset. Contract: - - Orphan Detection: Ensures all Order Items and Payments have a valid parent - in the 'orders' table. - - Set Theory: Uses 'order_id' set intersection to identify disconnected children. + - Orphan Detection: Identifies child records (Items, Payments) lacking a valid parent Order. Invariants: - - Skip-Safe: If 'df_orders' is missing from the input dict, the check is skipped - and logged as 'info'. + - Referential Grain: Uses 'order_id' as the primary join key for set intersection. + + Outputs: + - Boolean: True if cross-table analysis completes. Failures: - - Logs orphan counts to 'report["warnings"]'. + - [Operational] Logs orphan counts as Warnings to the report. """ required_tables = ["df_orders", "df_order_items", "df_payments"] diff --git a/docs/data_pipeline/assembly_stage.md b/docs/data_pipeline/assembly_stage.md index f023d96..bf42e9e 100644 --- a/docs/data_pipeline/assembly_stage.md +++ b/docs/data_pipeline/assembly_stage.md @@ -6,6 +6,8 @@ **Role:** Data Integration and Analytical Flattening. +![assembled-stage-diagram](/assets/diagrams/04-assemble-stage-diagram.png) + ## **System Contract** **Purpose** @@ -46,6 +48,13 @@ The **Executor** coordinates two distinct sub-orchestrations: 2. **Deduplication:** Extracts the required column subset and drops duplicate primary keys. 3. **Export:** Persists each dimension (e.g., `df_customers`) as an independent artifact. +## **Optimization & Memory Invariants** + +* **Primitive Integer Pipeline:** To operate within 4GB RAM, the pipeline converts 36-byte UUID strings into 8-byte `UInt64` hashes for joins, and 4-byte `UInt32` categoricals for payloads. This is the primary driver of memory efficiency for 36M+ row datasets. +* **Streaming-First Join:** By deferring aggregations until after raw joins on `order_id`, we leverage Polars' streaming engine to avoid massive, materialized hash tables. +* **Low-Level Memory Reclamation:** The executor utilizes `ctypes.CDLL('libc.so.6').malloc_trim(0)` at high-water mark transitions. This forces the Linux allocator to release free memory back to the OS, preventing Cloud Run from terminating the process due to bloated (but unused) heap memory. +* **Zero-Copy Streaming:** `sink_parquet()` is used to prevent the pipeline from fully materializing the assembly result set in memory. + ## **Boundaries** | This component **DOES** | This component **DOES NOT** | @@ -66,6 +75,4 @@ The **Executor** coordinates two distinct sub-orchestrations: * **Export Failure:** Disk I/O errors or path resolution issues during the `export_file` call halt the lifecycle. ### **Functional Findings (Data Level)** -* **Cardinality Explosion:** If `merge_data` detects multiple rows for a single `order_id`, it raises a `RuntimeError`. This is caught by the `task_wrapper` and treated as a fatal violation. -* **Reference Duplication:** If a dimension table contains duplicate primary keys after extraction, it raises a `RuntimeError` which is trapped by the `task_wrapper`. * **Partial Payments:** Orders without payments are allowed (via Left Join); the system fills these with `None/NaN`, which is considered a valid business state rather than a failure. \ No newline at end of file diff --git a/docs/data_pipeline/contract_stage.md b/docs/data_pipeline/contract_stage.md index 66299eb..7dd13bd 100644 --- a/docs/data_pipeline/contract_stage.md +++ b/docs/data_pipeline/contract_stage.md @@ -7,6 +7,8 @@ **Role:** Structural Enforcement and Subtractive Filtering. +![contract-stage-diagram](/assets/diagrams/03-contract-stage-diagram.png) + ## **System Contract** **Purpose** diff --git a/docs/data_pipeline/pipeline_orchestrator.md b/docs/data_pipeline/pipeline_orchestrator.md index 82e7e6a..18806b0 100644 --- a/docs/data_pipeline/pipeline_orchestrator.md +++ b/docs/data_pipeline/pipeline_orchestrator.md @@ -4,6 +4,8 @@ **Role:** End-to-End Lifecycle, Resource, and Persistence Manager. +![pipeline-orchestration-diagram](/assets/diagrams/01-pipeline-orchestration-diagram.png) + ## **System Contract** **Purpose** diff --git a/docs/data_pipeline/semantic_stage.md b/docs/data_pipeline/semantic_stage.md index 9783bdb..92ce4a5 100644 --- a/docs/data_pipeline/semantic_stage.md +++ b/docs/data_pipeline/semantic_stage.md @@ -7,6 +7,8 @@ **Role:** Analytical Module Construction. +![semantic-stage-diagram](/assets/diagrams/05-semantic-stage-diagram.png) + ## **System Contract** **Purpose** @@ -15,7 +17,6 @@ Transforms the unified Gold-layer "Order-Grain" event table into entity-centric **Invariants** -* **Lineage Integrity:** Strictly enforces that all data within a builder execution belongs to a single `run_id`. Cross-run data contamination triggers a terminal failure. * **Temporal Grain:** All fact tables are aggregated at the ISO-Week level, aligned deterministically to the Monday of each week (`W-MON`). * **Entity Grain:** * **Fact Tables:** Strictly 1 row per `(Entity_ID, order_year_week)`. @@ -46,6 +47,13 @@ The **Executor** coordinates the semantic build through a modular, registry-driv * **Optimization:** Utilizes `sink_parquet` for `LazyFrame` exports, ensuring zero-copy streaming and constant memory usage. 6. **Memory Management:** Explicitly deletes `LazyFrames` and triggers `gc.collect()` after every individual table export (Fact and Dim) to purge intermediate memory usage. +## **Optimization & Memory Invariants** + +* **Local Categorical Aggregation:** To optimize memory during grouping operations, builders cast high-cardinality grouping keys (e.g., `seller_id`) to `pl.Categorical` locally. This creates a temporary, localized dictionary optimized specifically for that module's aggregation plan, bypassing the need for a persistent global string cache. +* **Narrow Aggregation Payloads:** All aggregation results (counts, sums) are immediately cast to `Int16` or `Float32` within the `agg()` block. This prevents the materialized result set from expanding in memory. +* **Schema Hand-off:** While the building process uses `Categorical` for performance, the final output is cast back to `pl.String()` via the registry/freezing process. This ensures downstream compatibility with BI tools and prevents "dictionary leakage" between pipeline runs. +* **Streaming Export:** `sink_parquet()` is utilized for all fact and dimension table exports, enabling zero-copy streaming of results directly from the query plan to storage. + ## **Boundaries** | This component **DOES** | This component **DOES NOT** | @@ -64,6 +72,4 @@ The **Executor** coordinates the semantic build through a modular, registry-driv * **Registry Mismatch:** If a builder returns a table name not defined in the `SEMANTIC_MODULES` registry, the executor raises a `RuntimeError`. ### **Functional Findings (Data Level)** -* **Lineage Violation:** If the source data contains more than one `run_id`, the logic builders raise a `RuntimeError` to prevent multi-run pollution. This is trapped by the executor. -* **Grain Breach:** If a builder produces duplicate rows for the defined primary grain (e.g., multiple rows for the same Seller/Week), the validation step raises a `RuntimeError`, which is trapped. * **Schema Violation:** If a required column defined in the registry is missing from the builder's output, the freeze step raises a `KeyError` or `RuntimeError`, which is trapped. \ No newline at end of file diff --git a/docs/data_pipeline/validation_stage.md b/docs/data_pipeline/validation_stage.md index ba23b0b..b98463d 100644 --- a/docs/data_pipeline/validation_stage.md +++ b/docs/data_pipeline/validation_stage.md @@ -6,6 +6,8 @@ **Role:** Structural Data Quality Gatekeeper. +![validation-stage-diagram](/assets/diagrams/02-validation-stage-diagram.png) + ## **System Contract** **Purpose** @@ -35,7 +37,7 @@ The **Executor** coordinates the validation lifecycle through the following dete 2. **Data Loading:** Attempts to load each table as a DataFrame. If a table is missing, an `error` is logged to the report. 3. **Base Validation:** Dispatches the DataFrame to `run_base_validations` to check for: * Presence of required columns. - * Uniqueness of Primary Keys. + * Uniqueness of Primary Keys and column names. * Compliance with non-nullable constraints. 4. **Role-Specific Dispatch:** If base validations pass, the executor applies specialized rules: * `event_fact`: Triggers `run_event_fact_validations` (temporal chronology). diff --git a/docs/terraform/gcp-iac.md b/docs/terraform/gcp-iac.md new file mode 100644 index 0000000..3523dd2 --- /dev/null +++ b/docs/terraform/gcp-iac.md @@ -0,0 +1,94 @@ +# GCP Infrastructure: Operations Analytics Pipeline + +This repository contains the Terraform configuration for the Operations Analytics data pipeline. The infrastructure is designed to be serverless, event-driven, and highly secure, utilizing Google Cloud Run, Workflows, and Eventarc. + +## Architecture Overview +The pipeline follows a **Trigger-Action-Archive** flow: +1. **Extraction:** A Cloud Scheduler job triggers the `drive-extractor` Cloud Run job at midnight (PHT). +2. **Archival:** The extractor saves raw data into the **Archival Bucket** (Coldline storage for 3 years). +3. **Dispatch:** An Eventarc trigger detects the new file and invokes a Google Workflow (`pipeline-dispatcher`). +4. **Processing:** The Workflow triggers the main `operations-pipeline` Cloud Run job (2 vCPU, 8Gi RAM) for heavy-duty data processing. +5. **Transient Storage:** Intermediate files are stored in the **Pipeline Bucket** with a 7-day TTL on raw data to minimize costs and exposure. + +## Prerequisites +* **Terraform:** Version `~> 1.5.0` +* **Provider:** `hashicorp/google` version `~> 7.0` +* **Backend:** GCS bucket `operations-terraform-state-vault-2026` must exist for state management. + +## Post-Provisioning (CI/CD Handshake) +The integration between GCP and GitHub Actions requires a one-time "Bootstrap" extraction to populate Repository Secrets. This process completes the cryptographic trust relationship established by Workload Identity Federation (WIF). + +### 1. Secret Injection Matrix +| GitHub Secret | Source / Origin | Purpose | +| :--- | :--- | :--- | +| `WIF_PROVIDER` | `terraform output -raw GITHUB_WIF_PROVIDER_NAME` | Logical path for the WIF identity provider handshake. | +| `DEPLOYER_SA_EMAIL` | `github-actions-deployer@...` | Target identity for GitHub OIDC impersonation. | +| `GCP_PROJECT_ID` | `var.project_id` | Project scoping for GCP API and resource discovery. | + +### 2. Bootstrapping Constraint +The initial infrastructure provisioning must be executed by a maintainer with `Project IAM Admin` or `Owner` privileges. This "privileged apply" is required to establish the WIF provider and assign the administrative roles to the `github-actions-deployer` service account. Subsequent updates are autonomously managed by the CI/CD identity. + +## Infrastructure Components + +### 1. Compute & Jobs (`jobs.tf`) +| Resource Name | Type | Memory | Timeout | Purpose | +| :--- | :--- | :--- | :--- | :--- | +| `operations-pipeline` | Cloud Run Job | 8Gi | 30m | Main Polars-based processing engine. | +| `drive-extractor` | Cloud Run Job | 1Gi | 15m | Pulls source data from external APIs. | +| `ops-repo` | Artifact Registry | n/a | n/a | Docker repository for pipeline images. | + +### 2. Storage & Lifecycle (`storage.tf`) +| Bucket Name | Storage Class | Lifecycle Policy | +| :--- | :--- | :--- | +| `ops-archival-storage` | Standard -> Coldline | Move to Coldline after 400 days; Delete after 3 years. | +| `ops-pipeline-storage` | Standard | Delete files with prefix `raw/` after 7 days. | + +### 3. Orchestration (`orchestration.tf`) +* **Cloud Scheduler:** `0 0 * * *` (Daily 12AM PHT) triggers the Extractor. +* **Eventarc:** Monitors `object.v1.finalized` on the Archival bucket. +* **Workflows:** `pipeline-dispatcher` evaluates logic to trigger the main pipeline. + +## IAM & Security Matrix (`iam_bindings.tf`, `wif.tf`) + +This project implements **Zero Trust** via Workload Identity Federation and granular Service Account (SA) permissions. + +### 1. Identity Registry +| Identity Name | Role/Purpose | +| :--- | :--- | +| `github-actions-deployer` | CI/CD automation for infra and code updates. | +| `drive-extractor-sa` | I/O identity for data extraction and archival. | +| `ops-pipeline-sa` | Compute identity for the main processing pipeline. | +| `eventarc-invoker-sa` | Orchestration identity to receive events and trigger workflows. | +| `job-invoker-sa` | Scheduler identity to trigger Cloud Run jobs. | + +### 2. Permission Bindings +| Identity | Target | Roles | Rationale | +| :--- | :--- | :--- | :--- | +| **Github Deployer** | Project | `run.developer`, `workflows.editor`, `cloudscheduler.admin`, `artifactregistry.admin`, `eventarc.admin`, `storage.admin`, `resourcemanager.projectIamAdmin`, `iam.workloadIdentityPoolAdmin`, `monitoring.admin`, `iam.serviceAccountAdmin`, `iam.serviceAccountUser`, `iam.admin` | **Least Privilege:** Granular roles for managing the entire pipeline lifecycle, IAM bindings, and state management. | +| **Drive Extractor** | Archival/Pipeline Buckets | `roles/storage.objectAdmin` | Full CRUD for data landing and archival. | +| **Ops Pipeline** | Pipeline Bucket | `roles/storage.objectAdmin` | Read raw data and write processed artifacts. | +| **Event Invoker** | Project | `roles/eventarc.eventReceiver` | Receive GCS notifications. | +| | Project | `roles/workflows.invoker` | Permission to start workflow execution. | + +### 3. Workload Identity Federation +* **Pool:** `github-pool` +* **Trust Policy:** Restricted to `${var.github_repo}` to prevent unauthorized repository access. + +## Inputs & Variables (`variables.tf`) +| Name | Type | Sensitive | Description | +| :--- | :--- | :--- | :--- | +| `project_id` | `string` | No | Target Google Cloud Project ID. | +| `environment` | `string` | No | Deployment environment (dev, prod). | +| `github_repo` | `string` | No | Format: `owner/repository`. | +| `alert_email_map` | `map` | **Yes** | Monitoring notification recipients. | + +## State Management +State is managed remotely in GCS to ensure consistency and locking. +```hcl +terraform { + backend "gcs" { + bucket = "operations-terraform-state-vault-2026" + prefix = "terraform/state" + } +} +``` diff --git a/tests/test_assembly_stage.py b/tests/test_assembly_stage.py index 051b6d1..b690b29 100644 --- a/tests/test_assembly_stage.py +++ b/tests/test_assembly_stage.py @@ -250,7 +250,7 @@ def test_merge_data_aggregates_duplicates( def test_derived_fields_correctness(valid_derived_df): - result = derive_fields(valid_derived_df, "20230101T120000") + result = derive_fields(valid_derived_df) if isinstance(result, pl.LazyFrame): result = result.collect() @@ -258,7 +258,6 @@ def test_derived_fields_correctness(valid_derived_df): assert result["lead_time_days"].to_list() == [3, 5] assert result["approval_lag_days"].to_list() == [1, 1] assert result["delivery_delay_days"].to_list() == [1, 1] - assert result.select(pl.col("run_id").unique()).item() == "20230101T120000" assert "order_year_week" in result.columns @@ -358,22 +357,14 @@ def test_assemble_data_fails_on_missing_column( def test_dimension_references_uniqueness(): df = pl.DataFrame({"id": ["1", "1", "2"], "val": ["a", "a", "b"]}) - result = dimension_references(df.lazy(), "test", ["id"], ["id", "val"]) + result = dimension_references(df.lazy(), ["id"], ["id", "val"]) if isinstance(result, pl.LazyFrame): result = result.collect() assert result.height == 2 df_conflict = pl.DataFrame({"id": ["1", "1"], "val": ["a", "b"]}) - result = dimension_references(df_conflict.lazy(), "test", ["id"], ["id", "val"]) + result = dimension_references(df_conflict.lazy(), ["id"], ["id", "val"]) if isinstance(result, pl.LazyFrame): result = result.collect() assert result.height == 1 - - -def test_dimension_references_fails_if_cols_missing(): - df = pl.DataFrame({"id": ["1"]}) - from polars.exceptions import ColumnNotFoundError - - with pytest.raises((KeyError, ColumnNotFoundError)): - dimension_references(df.lazy(), "test", ["id"], ["id", "missing"]) diff --git a/tests/test_semantic_stage.py b/tests/test_semantic_stage.py index b27c571..9ba084c 100644 --- a/tests/test_semantic_stage.py +++ b/tests/test_semantic_stage.py @@ -166,21 +166,6 @@ def test_seller_semantic_model_grain_preserved_success(tmp_path, valid_assembled assert dim_df.height == expected_dim_len -def test_seller_semantic_fails_on_multiple_run_ids(tmp_path, valid_assembled_df): - run_context = RunContext.create(base=tmp_path, run_id="20230101T120000") - # Clone and modify run_id - broken_df = valid_assembled_df.clone() - broken_df = broken_df.with_columns( - pl.when(pl.Series([False, True])) - .then(pl.lit("another_run").cast(pl.Categorical)) - .otherwise(pl.col("run_id")) - .alias("run_id") - ) - - with pytest.raises(RuntimeError, match="Multiple run_ids detected"): - build_seller_semantic(broken_df.lazy(), run_context) - - # ============================================================================= # BUILD BI SEMANTIC # ============================================================================= @@ -220,34 +205,6 @@ def test_build_semantic_layer_success( assert outputs_path.exists() -def test_build_semantic_layer_fails_on_multiple_ids(tmp_path, valid_assembled_df): - run_id = "20230101T120000" - run_context = RunContext.create(base=tmp_path, run_id=run_id) - run_context.initialize_directories() - - # Clone and modify run_id for Polars - broken_assembled = valid_assembled_df.clone() - broken_assembled = broken_assembled.with_columns( - pl.when(pl.Series([False, True])) - .then(pl.lit("another_run").cast(pl.Categorical)) - .otherwise(pl.col("run_id")) - .alias("run_id") - ) - - broken_assembled.write_parquet( - run_context.assembled_path / "assembled_events_2023_01_01.parquet" - ) - - report = build_semantic_layer(run_context) - - assert report["status"] == "failed" - assert ( - report["modules"]["seller_semantic"]["seller_weekly_fact"]["build_stage"] - == False - ) - assert any("Multiple run_ids detected" in error for error in report["errors"]) - - def test_build_semantic_layer_fails_on_missing_columns(tmp_path, valid_assembled_df): run_id = "20230101T120000" run_context = RunContext.create(base=tmp_path, run_id=run_id)