15 changes: 13 additions & 2 deletions README.md
@@ -5,8 +5,8 @@
<img src="https://okdp.io/logos/okdp-notext.svg" height="20px" style="margin: 0 2px;" />
</a>

A collection of hands-on examples, helper utilities, Jupyter notebooks, and data workflows showcasing how to work with the [OKDP Platform](https://okdp.io/).
This repository is meant to help you explore OKDP capabilities around compute, object storage, data catalog, SQL engines, Spark, and analytics.
A collection of hands-on examples, helper utilities, Jupyter notebooks, Airflow DAGs, and data workflows showcasing how to work with the [OKDP Platform](https://okdp.io/).
This repository is meant to help you explore OKDP capabilities around compute, object storage, data catalog, SQL engines, Spark, workflow orchestration, and analytics.

Over time, these examples will be extended with lakehouse-oriented features, such as:

@@ -39,6 +39,16 @@ A PySpark notebook is included to showcase Spark-native exploratory data analysis

Use Apache Superset (SQL Lab) to query Trino and build visualizations/dashboards on top of the same datasets.

# Airflow

The [airflow/](./airflow/) directory contains example DAGs orchestrated by Apache Airflow on the OKDP platform. They demonstrate how to:

- Submit Spark jobs to **Spark Operator** via `SparkApplication` custom resources from a DAG (see the sketch below).
- Build daily ETL pipelines reading from and writing to S3-compatible storage (SeaweedFS).
- Use Airflow `gitSync` to pull DAGs directly from this repository at runtime.

See [`airflow/README.md`](./airflow/README.md) for the full list of DAGs and quick-start instructions.
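
As a rough sketch of the first point above, an Airflow task can create the `SparkApplication` custom resource with the Kubernetes Python client. Everything in this snippet (image, namespace, resource names, Spark version) is an illustrative assumption, not necessarily what the example DAGs use:

```python
# Hypothetical sketch: submit a Spark Pi SparkApplication from an Airflow task.
# Image, namespace, and names are assumptions for illustration only.
from kubernetes import client, config


def submit_spark_pi() -> None:
    config.load_incluster_config()  # credentials from the pod's service account
    spark_app = {
        "apiVersion": "sparkoperator.k8s.io/v1beta2",
        "kind": "SparkApplication",
        "metadata": {"name": "spark-pi", "namespace": "default"},
        "spec": {
            "type": "Scala",
            "mode": "cluster",
            "image": "apache/spark:3.5.0",  # assumed image
            "mainClass": "org.apache.spark.examples.SparkPi",
            "mainApplicationFile": "local:///opt/spark/examples/jars/spark-examples_2.12-3.5.0.jar",
            "sparkVersion": "3.5.0",
            "driver": {"cores": 1, "memory": "512m", "serviceAccount": "spark"},
            "executor": {"cores": 1, "instances": 1, "memory": "512m"},
        },
    }
    client.CustomObjectsApi().create_namespaced_custom_object(
        group="sparkoperator.k8s.io",
        version="v1beta2",
        namespace="default",
        plural="sparkapplications",
        body=spark_app,
    )
```

Wired to a `PythonOperator`, this callable becomes a regular task in the DAG graph.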

# Running the examples

Using [okdp-ui](https://github.com/OKDP/okdp-sandbox), deploy the following components:
@@ -48,6 +58,7 @@ Using [okdp-ui](https://github.com/OKDP/okdp-sandbox), deploy the following components:
- Interactive Query: [Trino](https://trino.io/)
- Notebooks: [Jupyter](https://jupyter.org/)
- DataViz: [Apache Superset](https://superset.apache.org/)
- Workflow orchestration: [Apache Airflow](https://airflow.apache.org/)
- Applications: [okdp-examples](https://okdp.io)

# About the datasets
73 changes: 73 additions & 0 deletions airflow/.gitignore
@@ -0,0 +1,73 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# Virtual environments
.venv/
venv/
ENV/
env/
.virtualenv/

# Environment variables
.env
.env.local
.env.*.local

# Airflow
logs/
*.log
airflow.db
airflow.cfg
airflow-webserver.pid
standalone_admin_password.txt

# IDE
.vscode/
.idea/
*.swp
*.swo
*~
.DS_Store

# Testing
.pytest_cache/
.coverage
htmlcov/
.tox/
.hypothesis/

# Jupyter
.ipynb_checkpoints/
*.ipynb

# Spark
spark-warehouse/
derby.log
metastore_db/

# Helm
*.tgz
.helmignore
charts/
118 changes: 118 additions & 0 deletions airflow/README.md
@@ -0,0 +1,118 @@
# Airflow Examples

Apache Airflow DAGs and helpers showcasing how to orchestrate Spark jobs and
data workflows on the OKDP platform.

These DAGs are automatically pulled into the Airflow scheduler by the
`gitSync` sidecar configured in the
[okdp-sandbox Airflow package](https://github.com/OKDP/okdp-sandbox/blob/main/packages/okdp-packages/airflow/airflow.yaml)
(see `dagGitRepo` / `dagGitSubPath`). Any change pushed to `main` is reflected
in the scheduler within ~60 seconds.

## Available DAGs

| DAG | Description |
|---|---|
| `hello_world_midnight` | Minimal DAG, validates scheduler/worker connectivity |
| `hello_daily` | Same as above, scheduled daily |
| `spark_pi_example` | Submits the canonical Spark Pi job via `SparkApplication` |
| `orders_etl_daily` | Daily Spark ETL with dynamic ConfigMap-based script injection |
| `nyc_taxi_pipeline` | Reads NYC taxi data from S3, transforms with Spark, writes back |

## Running the NYC Taxi pipeline

The `nyc_taxi_pipeline` DAG requires a one-time setup (ConfigMap + S3 dataset):

```bash
# 1. Deploy the Spark ETL ConfigMap
./airflow/deploy_nyc_taxi.sh

# 2. Open the Airflow UI and trigger the DAG `nyc_taxi_pipeline`
open https://airflow.okdp.sandbox

# 3. Verify the results in SeaweedFS S3
kubectl run --rm -it s3-check --image=amazon/aws-cli:latest --restart=Never \
  --command -- aws --endpoint-url http://seaweedfs-pmj3xs-s3.default.svc.cluster.local:8333 \
  --no-verify-ssl s3 ls s3://okdp/examples/data/processed/nyc_taxi/ --recursive
```

## Architecture (NYC Taxi pipeline)

```
Airflow DAG (PythonOperator)
  → SparkApplication (Spark Operator)
    → Spark Driver + Executors
      → Read: s3a://okdp/examples/data/raw/tripdata/yellow/ (11M+ rows)
      → Clean + Aggregate (168 rows: 24h × 7 days)
      → Write: s3a://okdp/examples/data/processed/nyc_taxi/yellow/run_id=.../nyc_taxi_aggregated.csv
```

## Datasets

NYC Yellow Taxi data is already provisioned in SeaweedFS by the
`okdp-examples` Helm chart at deployment time:

```
s3://okdp/examples/data/raw/tripdata/yellow/
├── month=2025-01/yellow_tripdata_2025-01.parquet (59 MB)
├── month=2025-02/yellow_tripdata_2025-02.parquet (60 MB)
└── month=2025-03/yellow_tripdata_2025-03.parquet (70 MB)
```

No manual download required.

## Pipeline steps (NYC Taxi)

1. **Read** — 3 months of Parquet data from S3 (11M+ rows)
2. **Clean** — Filter invalid trips (fare ≤ 0, distance ≤ 0, etc.)
3. **Aggregate** — Group by hour and day-of-week (168 rows)
4. **Write** — Upload aggregated CSV to SeaweedFS via the JVM AWS SDK

> **Note**: writes use the JVM S3 SDK (not the Hadoop FileOutputCommitter)
> to work around a SeaweedFS `copyObject` quirk.
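
A minimal PySpark sketch of these steps follows. The column names (`tpep_pickup_datetime`, `fare_amount`, `trip_distance`) are taken from the public NYC Yellow Taxi schema and are assumptions here, not necessarily what the ConfigMap-injected job uses:

```python
# Hypothetical sketch of the read/clean/aggregate steps; column names are
# assumed from the public NYC Yellow Taxi schema.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("nyc-taxi-etl-sketch").getOrCreate()

# 1. Read all monthly Parquet partitions
raw = spark.read.parquet("s3a://okdp/examples/data/raw/tripdata/yellow/")

# 2. Clean: drop trips with non-positive fares or distances
cleaned = raw.filter((F.col("fare_amount") > 0) & (F.col("trip_distance") > 0))

# 3. Aggregate: 24 hours x 7 days of week = 168 rows
aggregated = (
    cleaned.withColumn("hour", F.hour("tpep_pickup_datetime"))
    .withColumn("day_of_week", F.dayofweek("tpep_pickup_datetime"))
    .groupBy("hour", "day_of_week")
    .agg(F.count("*").alias("trip_count"), F.avg("fare_amount").alias("avg_fare"))
)

# 4. The real job uploads the CSV through the JVM AWS SDK (see the note
#    above); Spark's native writer is shown here only as a stand-in.
aggregated.coalesce(1).write.mode("overwrite").csv(
    "s3a://okdp/examples/data/processed/nyc_taxi/yellow/", header=True
)
```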

## Useful commands

```bash
# SparkApplication status
kubectl get sparkapplications -n default

# Spark driver logs
kubectl logs -n default -l spark-role=driver --tail=50

# List Airflow DAG runs
kubectl exec -n default deploy/airflow-main-scheduler -c scheduler -- \
  airflow dags list-runs -d nyc_taxi_pipeline -o plain
```

## Repository structure

```
airflow/
├── README.md
├── deploy_nyc_taxi.sh
├── dags/
│   ├── hello_world.py
│   ├── hello_daily.py
│   ├── spark_pi_example.py
│   ├── orders_etl_daily.py
│   ├── nyc_taxi_pipeline.py
│   └── spark_jobs/
│       └── orders_etl_job.py
├── manifests/
│   └── nyc-taxi-etl-configmap.yaml
└── tests/
    ├── test_dags.py
    └── run_integration_tests.sh
```
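
For reference, an import-integrity check of the kind `tests/test_dags.py` suggests could be as small as the sketch below (the actual test file may be more thorough):

```python
# Hypothetical sketch of a DAG import check; the real tests/test_dags.py
# may differ. Fails if any DAG in airflow/dags/ errors on import.
from airflow.models import DagBag


def test_no_import_errors() -> None:
    dag_bag = DagBag(dag_folder="airflow/dags", include_examples=False)
    assert not dag_bag.import_errors, f"DAG import errors: {dag_bag.import_errors}"
```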

## License

Apache 2.0

---

**Built 🚀 for the OKDP Community**
<a href="https://okdp.io">
<img src="https://okdp.io/logos/okdp-notext.svg" height="20px" style="margin: 0 2px;" />
</a>
10 changes: 10 additions & 0 deletions airflow/dags/.airflowignore
@@ -0,0 +1,10 @@
# Ignore local artifacts and non-DAG files (Airflow expects regex patterns)
^__pycache__/.*$
.*\.pyc$
.*\.pyo$
.*\.md$
^test_.*\.py$
.*_test\.py$
^spark_jobs/.*$
^hello_world\.py$
^spark_pi_example\.py$
35 changes: 35 additions & 0 deletions airflow/dags/hello_daily.py
@@ -0,0 +1,35 @@
"""Minimal daily smoke-test DAG."""

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator


default_args = {
    "owner": "airflow",
    "start_date": datetime(2026, 1, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=2),
}


def log_hello() -> str:
    message = "Hello from Airflow daily smoke-test"
    print(message)
    return message


with DAG(
    dag_id="hello_daily",
    default_args=default_args,
    description="Simple DAG that validates scheduler/task execution every day",
    schedule="0 0 * * *",
    catchup=False,
    tags=["example", "smoke", "daily"],
) as dag:
    hello_task = PythonOperator(
        task_id="log_hello",
        python_callable=log_hello,
    )

34 changes: 34 additions & 0 deletions airflow/dags/hello_world.py
@@ -0,0 +1,34 @@
"""Simple DAG scheduled every day at midnight (UTC)."""

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator


default_args = {
    "owner": "airflow",
    "start_date": datetime(2026, 1, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=2),
}


def print_hello() -> str:
    message = "Hello World from Airflow on OKDP"
    print(message)
    return message


with DAG(
    dag_id="hello_world_midnight",
    default_args=default_args,
    description="Hello World DAG that runs daily at 00:00 UTC",
    schedule="0 0 * * *",
    catchup=False,
    tags=["example", "python", "okdp"],
) as dag:
    hello_task = PythonOperator(
        task_id="hello_world_task",
        python_callable=print_hello,
    )