From 384a157a2fc7483d3f744d7bc7150b6c482c67ed Mon Sep 17 00:00:00 2001 From: Abir Oumghar Date: Fri, 3 Apr 2026 20:01:27 +0200 Subject: [PATCH 1/5] feat: integrate Airflow Add Airflow package, DAG examples, OIDC configuration, and Spark operator integration. --- .gitignore | 73 ++++++ README.md | 81 +++++++ dags/.airflowignore | 10 + dags/hello_daily.py | 35 +++ dags/hello_world.py | 34 +++ dags/nyc_taxi_pipeline.py | 188 +++++++++++++++ dags/orders_etl_daily.py | 324 ++++++++++++++++++++++++++ dags/spark_jobs/orders_etl_job.py | 183 +++++++++++++++ dags/spark_pi_example.py | 121 ++++++++++ deploy_nyc_taxi.sh | 83 +++++++ manifests/nyc-taxi-etl-configmap.yaml | 150 ++++++++++++ tests/run_integration_tests.sh | 46 ++++ tests/test_dags.py | 41 ++++ 13 files changed, 1369 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 dags/.airflowignore create mode 100644 dags/hello_daily.py create mode 100644 dags/hello_world.py create mode 100644 dags/nyc_taxi_pipeline.py create mode 100644 dags/orders_etl_daily.py create mode 100644 dags/spark_jobs/orders_etl_job.py create mode 100644 dags/spark_pi_example.py create mode 100755 deploy_nyc_taxi.sh create mode 100644 manifests/nyc-taxi-etl-configmap.yaml create mode 100755 tests/run_integration_tests.sh create mode 100644 tests/test_dags.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c7c78ae --- /dev/null +++ b/.gitignore @@ -0,0 +1,73 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# Virtual environments +.venv/ +venv/ +ENV/ +env/ +.virtualenv/ + +# Environment variables +.env +.env.local +.env.*.local + +# Airflow +logs/ +*.log +airflow.db +airflow.cfg +airflow-webserver.pid +standalone_admin_password.txt + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ +.DS_Store + +# Testing +.pytest_cache/ +.coverage +htmlcov/ +.tox/ +.hypothesis/ + +# Jupyter +.ipynb_checkpoints/ +*.ipynb + +# Spark +spark-warehouse/ +derby.log +metastore_db/ + +# Helm +*.tgz +.helmignore +charts/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..fd411ec --- /dev/null +++ b/README.md @@ -0,0 +1,81 @@ +# NYC Taxi Pipeline - Airflow + Spark + +Pipeline ETL utilisant Airflow pour orchestrer un job Spark sur des données NYC Taxi (11M+ lignes) stockées dans SeaweedFS S3. + +## Démarrage rapide + +```bash +# 1. Déployer le ConfigMap Spark ETL + le DAG +./examples/airflow/deploy_nyc_taxi.sh + +# 2. Ouvrir Airflow et lancer le DAG "nyc_taxi_spark_pipeline" +open https://airflow.okdp.sandbox + +# 3. 
Vérifier les résultats dans SeaweedFS S3 +kubectl run --rm -it s3-check --image=amazon/aws-cli:latest --restart=Never \ + --command -- aws --endpoint-url http://seaweedfs-pmj3xs-s3.default.svc.cluster.local:8333 \ + --no-verify-ssl s3 ls s3://okdp/examples/data/processed/nyc_taxi/ --recursive +``` + +## Architecture + +``` +Airflow DAG (PythonOperator) + → SparkApplication (Spark Operator) + → Spark Driver + Executor + → Lecture: s3a://okdp/examples/data/raw/tripdata/yellow/ (11M lignes) + → Nettoyage + Agrégation (168 lignes: 24h × 7 jours) + → Écriture: s3a://okdp/examples/data/processed/nyc_taxi/yellow/run_id=.../nyc_taxi_aggregated.csv +``` + +## Données + +Données NYC Yellow Taxi déjà présentes dans SeaweedFS (package `okdp-examples`) : + +``` +s3://okdp/examples/data/raw/tripdata/yellow/ +├── month=2025-01/yellow_tripdata_2025-01.parquet (59 MB) +├── month=2025-02/yellow_tripdata_2025-02.parquet (60 MB) +└── month=2025-03/yellow_tripdata_2025-03.parquet (70 MB) +``` + +Aucun téléchargement requis. + +## Le pipeline + +1. **Lecture** — Lit les 3 mois de données Parquet depuis S3 (11M+ lignes) +2. **Nettoyage** — Filtre les courses invalides (fare ≤ 0, distance ≤ 0, etc.) +3. **Agrégation** — Regroupe par heure et jour de la semaine (168 lignes) +4. **Écriture** — Upload le CSV agrégé dans SeaweedFS via le SDK AWS Java + +> **Note** : L'écriture utilise le JVM S3 SDK (pas le Hadoop FileOutputCommitter) pour contourner un bug `copyObject` de SeaweedFS. + +## Commandes utiles + +```bash +# Statut du SparkApplication +kubectl get sparkapplications -n default + +# Logs du driver Spark +kubectl logs -n default -l spark-role=driver --tail=50 + +# Lister les DAG runs Airflow +kubectl exec -n default deploy/airflow-main-scheduler -c scheduler -- \ + airflow dags list-runs -d nyc_taxi_spark_pipeline -o plain +``` + +## Structure + +``` +examples/airflow/ +├── README.md +├── deploy_nyc_taxi.sh # Script de déploiement +├── dags/ +│ └── nyc_taxi_pipeline.py # DAG Airflow (PythonOperator + K8s API) +└── manifests/ + └── nyc-taxi-etl-configmap.yaml # Code PySpark ETL +``` + +## License + +Apache 2.0 diff --git a/dags/.airflowignore b/dags/.airflowignore new file mode 100644 index 0000000..34c5e49 --- /dev/null +++ b/dags/.airflowignore @@ -0,0 +1,10 @@ +# Ignore local artifacts and non-DAG files (Airflow expects regex patterns) +^__pycache__/.*$ +.*\.pyc$ +.*\.pyo$ +.*\.md$ +^test_.*\.py$ +.*_test\.py$ +^spark_jobs/.*$ +^hello_world\.py$ +^spark_pi_example\.py$ diff --git a/dags/hello_daily.py b/dags/hello_daily.py new file mode 100644 index 0000000..fbe4a26 --- /dev/null +++ b/dags/hello_daily.py @@ -0,0 +1,35 @@ +"""Minimal daily smoke-test DAG.""" + +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.operators.python import PythonOperator + + +default_args = { + "owner": "airflow", + "start_date": datetime(2026, 1, 1), + "retries": 1, + "retry_delay": timedelta(minutes=2), +} + + +def log_hello() -> str: + message = "Hello from Airflow daily smoke-test" + print(message) + return message + + +with DAG( + dag_id="hello_daily", + default_args=default_args, + description="Simple DAG that validates scheduler/task execution every day", + schedule="0 0 * * *", + catchup=False, + tags=["example", "smoke", "daily"], +) as dag: + hello_task = PythonOperator( + task_id="log_hello", + python_callable=log_hello, + ) + diff --git a/dags/hello_world.py b/dags/hello_world.py new file mode 100644 index 0000000..5798390 --- /dev/null +++ b/dags/hello_world.py @@ -0,0 +1,34 @@ 
+"""Simple DAG scheduled every day at midnight (UTC).""" + +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.operators.python import PythonOperator + + +default_args = { + "owner": "airflow", + "start_date": datetime(2026, 1, 1), + "retries": 1, + "retry_delay": timedelta(minutes=2), +} + + +def print_hello() -> str: + message = "Hello World from Airflow on OKDP" + print(message) + return message + + +with DAG( + dag_id="hello_world_midnight", + default_args=default_args, + description="Hello World DAG that runs daily at 00:00 UTC", + schedule="0 0 * * *", + catchup=False, + tags=["example", "python", "okdp"], +) as dag: + hello_task = PythonOperator( + task_id="hello_world_task", + python_callable=print_hello, + ) diff --git a/dags/nyc_taxi_pipeline.py b/dags/nyc_taxi_pipeline.py new file mode 100644 index 0000000..21effd9 --- /dev/null +++ b/dags/nyc_taxi_pipeline.py @@ -0,0 +1,188 @@ +""" +NYC Taxi Pipeline - Airflow + Spark Operator +Utilise l'API Kubernetes Python pour soumettre un SparkApplication. +""" +import os +import re +import time +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.operators.python import PythonOperator +from kubernetes import client, config +from kubernetes.client.exceptions import ApiException + +NAMESPACE = os.getenv("AIRFLOW_NAMESPACE", "default") +SPARK_APP_GROUP = "sparkoperator.k8s.io" +SPARK_APP_VERSION = "v1beta2" +SPARK_APP_PLURAL = "sparkapplications" +SPARK_IMAGE = "quay.io/okdp/spark-py:spark-3.5.6-python-3.11-scala-2.12-java-17" +SPARK_SERVICE_ACCOUNT = "spark" +S3_CREDENTIALS_SECRET = "creds-examples-s3" +S3_ACCESS_KEY_FIELD = "S3_ACCESS_KEY" +S3_SECRET_KEY_FIELD = "S3_SECRET_KEY" +CONFIGMAP_NAME = "nyc-taxi-etl-code" +SCRIPT_MOUNT_DIR = "/opt/spark/app" +SCRIPT_FILE_NAME = "nyc_taxi_etl.py" + +# Input/Output S3 +S3_INPUT = "s3a://okdp/examples/data/raw/tripdata/yellow/" +S3_OUTPUT_BASE = "s3a://okdp/examples/data/processed/nyc_taxi/yellow" + +default_args = { + "owner": "data-team", + "depends_on_past": False, + "start_date": datetime(2024, 1, 1), + "retries": 0, + "retry_delay": timedelta(minutes=2), +} + + +def _safe_name(prefix, suffix, max_len=63): + raw = f"{prefix}-{suffix}" + normalized = re.sub(r"[^a-z0-9-]", "-", raw.lower()).strip("-") + return normalized[:max_len].rstrip("-") + + +def _discover_s3_endpoint(core_api): + """Discover SeaweedFS S3 endpoint in-cluster.""" + try: + services = core_api.list_namespaced_service(namespace=NAMESPACE).items + for svc in services: + name = (svc.metadata.name or "").strip() + if re.match(r"^seaweedfs-[a-z0-9-]+-s3$", name): + return f"http://{name}.{NAMESPACE}.svc.cluster.local:8333" + except ApiException: + pass + return f"https://seaweedfs-seaweedfs-{NAMESPACE}.okdp.sandbox" + + +def _delete_if_exists(custom_api, app_name): + try: + custom_api.delete_namespaced_custom_object( + group=SPARK_APP_GROUP, + version=SPARK_APP_VERSION, + namespace=NAMESPACE, + plural=SPARK_APP_PLURAL, + name=app_name, + ) + time.sleep(2) + except ApiException as exc: + if exc.status != 404: + raise + + +def submit_and_wait_nyc_taxi_etl(run_suffix, timeout_seconds=1200): + """Submit SparkApplication and wait for completion.""" + config.load_incluster_config() + core_api = client.CoreV1Api() + custom_api = client.CustomObjectsApi() + + app_name = _safe_name("nyc-taxi-etl", run_suffix) + s3_endpoint = _discover_s3_endpoint(core_api) + ssl_enabled = str(s3_endpoint.lower().startswith("https://")).lower() + output_uri = f"{S3_OUTPUT_BASE}/run_id={run_suffix}" + 
+ _delete_if_exists(custom_api, app_name) + + body = { + "apiVersion": f"{SPARK_APP_GROUP}/{SPARK_APP_VERSION}", + "kind": "SparkApplication", + "metadata": {"name": app_name, "namespace": NAMESPACE}, + "spec": { + "type": "Python", + "mode": "cluster", + "image": SPARK_IMAGE, + "imagePullPolicy": "IfNotPresent", + "mainApplicationFile": f"local://{SCRIPT_MOUNT_DIR}/{SCRIPT_FILE_NAME}", + "arguments": [ + "--input", S3_INPUT, + "--output", output_uri, + "--date", run_suffix, + ], + "sparkVersion": "3.5.6", + "restartPolicy": {"type": "Never"}, + "timeToLiveSeconds": 3600, + "sparkConf": { + "spark.hadoop.fs.s3a.endpoint": s3_endpoint, + "spark.hadoop.fs.s3a.path.style.access": "true", + "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", + "spark.hadoop.fs.s3a.connection.ssl.enabled": ssl_enabled, + # Fix for SeaweedFS: write to local then upload via script + "spark.hadoop.fs.s3a.fast.upload": "true", + "spark.hadoop.fs.s3a.fast.upload.buffer": "bytebuffer", + }, + "volumes": [ + {"name": "etl-script", "configMap": {"name": CONFIGMAP_NAME}}, + ], + "driver": { + "cores": 1, + "memory": "1g", + "serviceAccount": SPARK_SERVICE_ACCOUNT, + "labels": {"workload": "nyc-taxi-etl"}, + "volumeMounts": [{"name": "etl-script", "mountPath": SCRIPT_MOUNT_DIR}], + "envSecretKeyRefs": { + "AWS_ACCESS_KEY_ID": {"name": S3_CREDENTIALS_SECRET, "key": S3_ACCESS_KEY_FIELD}, + "AWS_SECRET_ACCESS_KEY": {"name": S3_CREDENTIALS_SECRET, "key": S3_SECRET_KEY_FIELD}, + }, + "env": [{"name": "S3_ENDPOINT", "value": s3_endpoint}], + }, + "executor": { + "instances": 1, + "cores": 1, + "memory": "1g", + "labels": {"workload": "nyc-taxi-etl"}, + "envSecretKeyRefs": { + "AWS_ACCESS_KEY_ID": {"name": S3_CREDENTIALS_SECRET, "key": S3_ACCESS_KEY_FIELD}, + "AWS_SECRET_ACCESS_KEY": {"name": S3_CREDENTIALS_SECRET, "key": S3_SECRET_KEY_FIELD}, + }, + "env": [{"name": "S3_ENDPOINT", "value": s3_endpoint}], + }, + }, + } + + custom_api.create_namespaced_custom_object( + group=SPARK_APP_GROUP, + version=SPARK_APP_VERSION, + namespace=NAMESPACE, + plural=SPARK_APP_PLURAL, + body=body, + ) + + deadline = time.time() + timeout_seconds + last_state = "SUBMITTED" + while time.time() < deadline: + app = custom_api.get_namespaced_custom_object( + group=SPARK_APP_GROUP, + version=SPARK_APP_VERSION, + namespace=NAMESPACE, + plural=SPARK_APP_PLURAL, + name=app_name, + ) + last_state = ( + app.get("status", {}) + .get("applicationState", {}) + .get("state", "SUBMITTED") + ) + if last_state == "COMPLETED": + return f"Spark ETL terminé: {app_name}" + if last_state in {"FAILED", "SUBMISSION_FAILED", "UNKNOWN"}: + raise RuntimeError(f"Spark ETL échoué: {app_name} state={last_state}") + time.sleep(10) + + raise TimeoutError(f"Spark ETL timeout après {timeout_seconds}s: {app_name} state={last_state}") + + +with DAG( + dag_id="nyc_taxi_spark_pipeline", + default_args=default_args, + description="NYC Taxi Spark ETL pipeline", + schedule=None, + catchup=False, + tags=["nyc-taxi", "spark", "etl"], +) as dag: + run_etl = PythonOperator( + task_id="submit_and_wait_nyc_taxi_etl", + python_callable=submit_and_wait_nyc_taxi_etl, + op_kwargs={"run_suffix": "{{ ts_nodash | lower }}"}, + ) diff --git a/dags/orders_etl_daily.py b/dags/orders_etl_daily.py new file mode 100644 index 0000000..3b439fb --- /dev/null +++ b/dags/orders_etl_daily.py @@ -0,0 +1,324 @@ +"""Submit and monitor a daily Spark ETL job through Spark Operator.""" + +from __future__ import annotations + +import base64 +import os +import re +import time +from datetime import 
datetime, timedelta +from pathlib import Path + +from airflow import DAG +from airflow.operators.python import PythonOperator +from kubernetes import client, config +from kubernetes.client.exceptions import ApiException + + +NAMESPACE = os.getenv("AIRFLOW_NAMESPACE", "default") +SPARK_APP_GROUP = "sparkoperator.k8s.io" +SPARK_APP_VERSION = "v1beta2" +SPARK_APP_PLURAL = "sparkapplications" +SPARK_IMAGE = "quay.io/okdp/spark-py:spark-3.5.6-python-3.11-scala-2.12-java-17" +SCRIPT_FILE_NAME = "orders_etl_job.py" +SCRIPT_FILE_PATH = Path(__file__).parent / "spark_jobs" / SCRIPT_FILE_NAME +SCRIPT_MOUNT_DIR = "/opt/spark/app" +SCRIPT_MOUNT_PATH = f"{SCRIPT_MOUNT_DIR}/{SCRIPT_FILE_NAME}" +SPARK_SERVICE_ACCOUNT = "spark" +S3_CREDENTIALS_SECRET = "creds-airflow-s3" +S3_ACCESS_KEY_FIELD = "accessKey" +S3_SECRET_KEY_FIELD = "secretKey" +DEFAULT_INGRESS_SUFFIX = "okdp.sandbox" +DEFAULT_S3_BUCKET = "airflow-logs" +DEFAULT_S3_INPUT_PREFIX = "orders/raw" +DEFAULT_S3_OUTPUT_PREFIX = "orders/curated" +S3_ENDPOINT_ENV_VAR = "AIRFLOW_ETL_S3_ENDPOINT" +S3_BUCKET_ENV_VAR = "AIRFLOW_ETL_S3_BUCKET" +S3_INPUT_PREFIX_ENV_VAR = "AIRFLOW_ETL_S3_INPUT_PREFIX" +S3_OUTPUT_PREFIX_ENV_VAR = "AIRFLOW_ETL_S3_OUTPUT_PREFIX" +INGRESS_SUFFIX_ENV_VAR = "AIRFLOW_INGRESS_SUFFIX" +S3_VERIFY_SSL_ENV_VAR = "AIRFLOW_ETL_S3_VERIFY_SSL" +S3_REGION_ENV_VAR = "AIRFLOW_ETL_S3_REGION" + + +default_args = { + "owner": "airflow", + "depends_on_past": False, + "start_date": datetime(2026, 1, 1), + "retries": 0, + "retry_delay": timedelta(minutes=2), +} + + +def _safe_k8s_name(prefix: str, suffix: str, max_len: int = 63) -> str: + raw = f"{prefix}-{suffix}" + normalized = re.sub(r"[^a-z0-9-]", "-", raw.lower()).strip("-") + if len(normalized) <= max_len: + return normalized + return normalized[:max_len].rstrip("-") + + +def _load_spark_script() -> str: + if not SCRIPT_FILE_PATH.is_file(): + raise FileNotFoundError(f"Spark ETL script not found: {SCRIPT_FILE_PATH}") + return SCRIPT_FILE_PATH.read_text(encoding="utf-8") + + +def _clean_prefix(value: str, default_value: str) -> str: + normalized = (value or default_value).strip().strip("/") + return normalized or default_value + + +def _discover_seaweedfs_s3_endpoint(core_api: client.CoreV1Api) -> str: + env_endpoint = os.getenv(S3_ENDPOINT_ENV_VAR, "").strip().rstrip("/") + if env_endpoint: + return env_endpoint + + # Prefer in-cluster SeaweedFS S3 service when available. 
+    try:
+        services = core_api.list_namespaced_service(namespace=NAMESPACE).items
+        candidates = []
+        for svc in services:
+            service_name = (svc.metadata.name or "").strip()
+            if re.match(r"^seaweedfs-[a-z0-9-]+-s3$", service_name):
+                candidates.append(service_name)
+        if candidates:
+            chosen = sorted(candidates)[0]
+            return f"http://{chosen}.{NAMESPACE}.svc.cluster.local:8333"
+    except ApiException:
+        pass
+
+    ingress_suffix = os.getenv(INGRESS_SUFFIX_ENV_VAR, DEFAULT_INGRESS_SUFFIX).strip()
+    if not ingress_suffix:
+        ingress_suffix = DEFAULT_INGRESS_SUFFIX
+    return f"https://seaweedfs-seaweedfs-{NAMESPACE}.{ingress_suffix}"
+
+
+def _resolve_s3_locations(core_api: client.CoreV1Api) -> tuple[str, str, str, str]:
+    bucket = (
+        os.getenv(S3_BUCKET_ENV_VAR, "").strip()
+        or os.getenv("AIRFLOW_DAGS_S3_BUCKET", "").strip()
+        or DEFAULT_S3_BUCKET
+    )
+    input_prefix = _clean_prefix(os.getenv(S3_INPUT_PREFIX_ENV_VAR, ""), DEFAULT_S3_INPUT_PREFIX)
+    output_prefix = _clean_prefix(os.getenv(S3_OUTPUT_PREFIX_ENV_VAR, ""), DEFAULT_S3_OUTPUT_PREFIX)
+
+    s3_endpoint = _discover_seaweedfs_s3_endpoint(core_api=core_api)
+    s3_input_uri = f"s3a://{bucket}/{input_prefix}"
+    s3_output_uri_base = f"s3a://{bucket}/{output_prefix}"
+    return bucket, s3_endpoint, s3_input_uri, s3_output_uri_base
+
+
+def _bool_env(env_name: str, default_value: bool) -> bool:
+    raw_value = os.getenv(env_name)
+    if raw_value is None:
+        return default_value
+    return raw_value.strip().lower() in {"1", "true", "yes", "on"}
+
+
+def _ensure_s3_bucket_exists(core_api: client.CoreV1Api, s3_endpoint: str, bucket: str) -> None:
+    try:
+        secret = core_api.read_namespaced_secret(name=S3_CREDENTIALS_SECRET, namespace=NAMESPACE)
+    except ApiException as exc:
+        raise RuntimeError(
+            f"Unable to read S3 credentials secret {S3_CREDENTIALS_SECRET} in namespace {NAMESPACE}"
+        ) from exc
+
+    secret_data = secret.data or {}
+    access_key_b64 = secret_data.get(S3_ACCESS_KEY_FIELD, "")
+    secret_key_b64 = secret_data.get(S3_SECRET_KEY_FIELD, "")
+    if not access_key_b64 or not secret_key_b64:
+        raise RuntimeError(
+            f"S3 credentials secret {S3_CREDENTIALS_SECRET} is missing keys "
+            f"{S3_ACCESS_KEY_FIELD}/{S3_SECRET_KEY_FIELD}"
+        )
+
+    access_key = base64.b64decode(access_key_b64).decode("utf-8")
+    secret_key = base64.b64decode(secret_key_b64).decode("utf-8")
+
+    verify_ssl = _bool_env(
+        S3_VERIFY_SSL_ENV_VAR,
+        default_value=s3_endpoint.lower().startswith("https://"),
+    )
+    s3_region = os.getenv(S3_REGION_ENV_VAR, "us-east-1")
+
+    import boto3
+    from botocore.exceptions import ClientError
+
+    s3_client = boto3.client(
+        "s3",
+        endpoint_url=s3_endpoint,
+        aws_access_key_id=access_key,
+        aws_secret_access_key=secret_key,
+        region_name=s3_region,
+        verify=verify_ssl,
+    )
+
+    try:
+        s3_client.head_bucket(Bucket=bucket)
+        return
+    except ClientError as exc:
+        error_code = str(exc.response.get("Error", {}).get("Code", ""))
+        if error_code not in {"404", "NoSuchBucket", "NotFound"}:
+            raise RuntimeError(
+                f"Unable to access bucket {bucket} on endpoint {s3_endpoint}: {exc}"
+            ) from exc
+
+    s3_client.create_bucket(Bucket=bucket)
+
+
+def _upsert_config_map(core_api: client.CoreV1Api, name: str, script_content: str) -> None:
+    body = client.V1ConfigMap(
+        metadata=client.V1ObjectMeta(
+            name=name,
+            namespace=NAMESPACE,
+            labels={"app": "orders-etl", "managed-by": "airflow"},
+        ),
+        data={SCRIPT_FILE_NAME: script_content},
+    )
+    try:
+        core_api.patch_namespaced_config_map(name=name, namespace=NAMESPACE, body=body)
+    except ApiException as exc:
+        if exc.status != 404:
+ raise + core_api.create_namespaced_config_map(namespace=NAMESPACE, body=body) + + +def _delete_if_exists(custom_api: client.CustomObjectsApi, app_name: str) -> None: + try: + custom_api.delete_namespaced_custom_object( + group=SPARK_APP_GROUP, + version=SPARK_APP_VERSION, + namespace=NAMESPACE, + plural=SPARK_APP_PLURAL, + name=app_name, + ) + time.sleep(2) + except ApiException as exc: + if exc.status != 404: + raise + + +def submit_and_wait_orders_etl(run_suffix: str, timeout_seconds: int = 1200) -> str: + config.load_incluster_config() + core_api = client.CoreV1Api() + custom_api = client.CustomObjectsApi() + + spark_app_name = _safe_k8s_name("orders-etl", run_suffix) + script_cm_name = _safe_k8s_name("orders-etl-script", run_suffix) + bucket, s3_endpoint, s3_input_uri, s3_output_uri_base = _resolve_s3_locations(core_api=core_api) + output_uri = f"{s3_output_uri_base}/run_id={run_suffix}" + ssl_enabled = str(s3_endpoint.lower().startswith("https://")).lower() + + _ensure_s3_bucket_exists(core_api=core_api, s3_endpoint=s3_endpoint, bucket=bucket) + script_content = _load_spark_script() + _upsert_config_map(core_api=core_api, name=script_cm_name, script_content=script_content) + _delete_if_exists(custom_api=custom_api, app_name=spark_app_name) + + body = { + "apiVersion": f"{SPARK_APP_GROUP}/{SPARK_APP_VERSION}", + "kind": "SparkApplication", + "metadata": {"name": spark_app_name, "namespace": NAMESPACE}, + "spec": { + "type": "Python", + "mode": "cluster", + "image": SPARK_IMAGE, + "imagePullPolicy": "IfNotPresent", + "mainApplicationFile": f"local://{SCRIPT_MOUNT_PATH}", + "arguments": [ + "--input-uri", + s3_input_uri, + "--output-uri", + output_uri, + "--run-id", + run_suffix, + ], + "sparkVersion": "3.5.6", + "restartPolicy": {"type": "Never"}, + "timeToLiveSeconds": 600, + "sparkConf": { + "spark.hadoop.fs.s3a.endpoint": s3_endpoint, + "spark.hadoop.fs.s3a.path.style.access": "true", + "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", + "spark.hadoop.fs.s3a.connection.ssl.enabled": ssl_enabled, + }, + "volumes": [ + { + "name": "etl-script", + "configMap": {"name": script_cm_name}, + } + ], + "driver": { + "cores": 1, + "memory": "1g", + "serviceAccount": SPARK_SERVICE_ACCOUNT, + "labels": {"workload": "orders-etl", "version": "3.5.6"}, + "volumeMounts": [{"name": "etl-script", "mountPath": SCRIPT_MOUNT_DIR}], + "envSecretKeyRefs": { + "S3_ACCESS_KEY": {"name": S3_CREDENTIALS_SECRET, "key": S3_ACCESS_KEY_FIELD}, + "S3_SECRET_KEY": {"name": S3_CREDENTIALS_SECRET, "key": S3_SECRET_KEY_FIELD}, + }, + "env": [{"name": "S3_ENDPOINT", "value": s3_endpoint}], + }, + "executor": { + "instances": 1, + "cores": 1, + "memory": "1g", + "labels": {"workload": "orders-etl", "version": "3.5.6"}, + "envSecretKeyRefs": { + "S3_ACCESS_KEY": {"name": S3_CREDENTIALS_SECRET, "key": S3_ACCESS_KEY_FIELD}, + "S3_SECRET_KEY": {"name": S3_CREDENTIALS_SECRET, "key": S3_SECRET_KEY_FIELD}, + }, + "env": [{"name": "S3_ENDPOINT", "value": s3_endpoint}], + }, + }, + } + + custom_api.create_namespaced_custom_object( + group=SPARK_APP_GROUP, + version=SPARK_APP_VERSION, + namespace=NAMESPACE, + plural=SPARK_APP_PLURAL, + body=body, + ) + + deadline = time.time() + timeout_seconds + last_state = "SUBMITTED" + while time.time() < deadline: + app = custom_api.get_namespaced_custom_object( + group=SPARK_APP_GROUP, + version=SPARK_APP_VERSION, + namespace=NAMESPACE, + plural=SPARK_APP_PLURAL, + name=spark_app_name, + ) + last_state = ( + app.get("status", {}) + .get("applicationState", {}) + 
.get("state", "SUBMITTED") + ) + + if last_state == "COMPLETED": + return f"Spark ETL finished successfully: {spark_app_name}" + if last_state in {"FAILED", "SUBMISSION_FAILED", "UNKNOWN"}: + raise RuntimeError(f"Spark ETL failed for {spark_app_name} with state={last_state}") + time.sleep(10) + + raise TimeoutError( + f"Spark ETL timeout after {timeout_seconds}s for {spark_app_name} (last_state={last_state})" + ) + + +with DAG( + dag_id="orders_etl_daily", + default_args=default_args, + description="Daily Spark ETL workflow orchestrated by Airflow and Spark Operator", + schedule="0 0 * * *", + catchup=False, + tags=["etl", "spark", "daily", "orders"], +) as dag: + run_orders_etl = PythonOperator( + task_id="submit_and_wait_orders_etl", + python_callable=submit_and_wait_orders_etl, + op_kwargs={"run_suffix": "{{ ts_nodash | lower }}"}, + ) diff --git a/dags/spark_jobs/orders_etl_job.py b/dags/spark_jobs/orders_etl_job.py new file mode 100644 index 0000000..4f45d81 --- /dev/null +++ b/dags/spark_jobs/orders_etl_job.py @@ -0,0 +1,183 @@ +"""Spark ETL job for daily orders processing.""" + +from __future__ import annotations + +import argparse +import csv +import io +import os +from datetime import datetime +from typing import Any, Iterable +from urllib.parse import urlparse + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Run orders ETL with Spark") + parser.add_argument("--input-uri", required=True, help="Input URI (for example: s3a://bucket/path)") + parser.add_argument("--output-uri", required=True, help="Output URI for curated dataset") + parser.add_argument("--run-id", required=True, help="Unique ETL run identifier") + return parser.parse_args() + + +def _bootstrap_input_if_missing(spark: Any) -> Any: + from pyspark.sql.types import DoubleType, IntegerType, StringType, StructField, StructType + + sample_rows: Iterable[tuple[str, str, str, int, float]] = [ + ("o-1001", "c-001", "2026-02-01 08:10:00", 2, 19.90), + ("o-1002", "c-001", "2026-02-01 08:45:00", 1, 5.50), + ("o-1003", "c-002", "2026-02-01 09:20:00", 3, 12.00), + ("o-1004", "c-003", "2026-02-01 10:00:00", 1, 42.30), + ("o-1005", "c-002", "2026-02-01 11:15:00", 2, 7.25), + ] + schema = StructType( + [ + StructField("order_id", StringType(), False), + StructField("customer_id", StringType(), False), + StructField("order_ts", StringType(), False), + StructField("quantity", IntegerType(), False), + StructField("unit_price", DoubleType(), False), + ] + ) + + df = spark.createDataFrame(sample_rows, schema=schema) + return df + + +def _s3a_to_bucket_key(s3a_uri: str) -> tuple[str, str]: + if not s3a_uri.startswith("s3a://"): + raise ValueError(f"Unsupported URI scheme for output: {s3a_uri}") + s3_uri = s3a_uri.replace("s3a://", "s3://", 1) + parsed = urlparse(s3_uri) + bucket = parsed.netloc.strip() + key_prefix = parsed.path.lstrip("/") + if not bucket: + raise ValueError(f"Missing S3 bucket in URI: {s3a_uri}") + return bucket, key_prefix + + +def _upload_curated_csv_with_jvm_s3(curated_df: Any, output_uri: str, spark: Any) -> str: + endpoint = os.getenv("S3_ENDPOINT", "") + access_key = os.getenv("S3_ACCESS_KEY", "") + secret_key = os.getenv("S3_SECRET_KEY", "") + if not endpoint or not access_key or not secret_key: + raise RuntimeError("Missing S3 runtime credentials or endpoint for output upload") + + bucket, key_prefix = _s3a_to_bucket_key(output_uri) + object_key = f"{key_prefix.rstrip('/')}/curated.csv" if key_prefix else "curated.csv" + + rows = curated_df.orderBy("order_date", 
"customer_id").collect() + csv_buffer = io.StringIO() + writer = csv.writer(csv_buffer) + writer.writerow( + [ + "order_date", + "customer_id", + "orders_count", + "gross_amount", + "etl_run_id", + "processed_at_utc", + ] + ) + for row in rows: + writer.writerow( + [ + row["order_date"], + row["customer_id"], + row["orders_count"], + row["gross_amount"], + row["etl_run_id"], + row["processed_at_utc"], + ] + ) + + jvm = spark.sparkContext._jvm + gateway = spark.sparkContext._gateway + region = os.getenv("AWS_REGION", "us-east-1") + payload = csv_buffer.getvalue() + + credentials = jvm.com.amazonaws.auth.BasicAWSCredentials(access_key, secret_key) + credentials_provider = jvm.com.amazonaws.auth.AWSStaticCredentialsProvider(credentials) + endpoint_config = jvm.com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration( + endpoint, + region, + ) + s3_client = ( + jvm.com.amazonaws.services.s3.AmazonS3ClientBuilder.standard() + .withPathStyleAccessEnabled(True) + .withEndpointConfiguration(endpoint_config) + .withCredentials(credentials_provider) + .build() + ) + s3_client.putObject(bucket, object_key, payload) + + return f"s3a://{bucket}/{object_key}" + + +def main() -> None: + args = parse_args() + + from pyspark.sql import SparkSession + from pyspark.sql import functions as F + + spark = SparkSession.builder.appName(f"orders-etl-{args.run_id}").getOrCreate() + spark.sparkContext.setLogLevel("WARN") + + # Ensure S3A is configured from runtime env injected by SparkApplication. + access_key = os.getenv("S3_ACCESS_KEY", "") + secret_key = os.getenv("S3_SECRET_KEY", "") + endpoint = os.getenv("S3_ENDPOINT", "") + if access_key and secret_key: + hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration() + hadoop_conf.set("fs.s3a.access.key", access_key) + hadoop_conf.set("fs.s3a.secret.key", secret_key) + if endpoint: + hadoop_conf.set("fs.s3a.endpoint", endpoint) + hadoop_conf.set("fs.s3a.connection.ssl.enabled", str(endpoint.startswith("https://")).lower()) + hadoop_conf.set("fs.s3a.path.style.access", "true") + hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") + + try: + raw_df = spark.read.option("header", "true").csv(args.input_uri) + if raw_df.rdd.isEmpty(): + raw_df = _bootstrap_input_if_missing(spark=spark) + except Exception: + raw_df = _bootstrap_input_if_missing(spark=spark) + + normalized_df = ( + raw_df.withColumn("order_id", F.col("order_id").cast("string")) + .withColumn("customer_id", F.col("customer_id").cast("string")) + .withColumn("order_ts", F.to_timestamp("order_ts")) + .withColumn("quantity", F.col("quantity").cast("int")) + .withColumn("unit_price", F.col("unit_price").cast("double")) + .fillna({"quantity": 1, "unit_price": 0.0}) + .withColumn("order_date", F.to_date("order_ts")) + .withColumn("line_amount", F.round(F.col("quantity") * F.col("unit_price"), 2)) + ) + + curated_df = ( + normalized_df.groupBy("order_date", "customer_id") + .agg( + F.countDistinct("order_id").alias("orders_count"), + F.round(F.sum("line_amount"), 2).alias("gross_amount"), + ) + .withColumn("etl_run_id", F.lit(args.run_id)) + .withColumn("processed_at_utc", F.lit(datetime.utcnow().isoformat())) + ) + + written_uri = _upload_curated_csv_with_jvm_s3( + curated_df=curated_df, + output_uri=args.output_uri, + spark=spark, + ) + + print(f"Orders ETL completed for run_id={args.run_id}") + print(f"Input URI: {args.input_uri}") + print(f"Output URI: {written_uri}") + print(f"Input rows: {normalized_df.count()}") + print(f"Curated rows: {curated_df.count()}") + + 
spark.stop() + + +if __name__ == "__main__": + main() diff --git a/dags/spark_pi_example.py b/dags/spark_pi_example.py new file mode 100644 index 0000000..c899bee --- /dev/null +++ b/dags/spark_pi_example.py @@ -0,0 +1,121 @@ +"""Submit and monitor a Spark Pi SparkApplication from Airflow.""" + +from __future__ import annotations + +import time +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.operators.python import PythonOperator +from kubernetes import client, config +from kubernetes.client.exceptions import ApiException + + +SPARK_IMAGE = "quay.io/okdp/spark-py:spark-3.5.6-python-3.11-scala-2.12-java-17" +SPARK_MAIN_APP = "local:///opt/spark/examples/jars/spark-examples_2.12-3.5.6.jar" +SPARK_MAIN_CLASS = "org.apache.spark.examples.SparkPi" +NAMESPACE = "default" + + +default_args = { + "owner": "airflow", + "depends_on_past": False, + "start_date": datetime(2026, 1, 1), + "retries": 0, + "retry_delay": timedelta(minutes=2), +} + + +def submit_and_wait_spark_pi(app_name: str, timeout_seconds: int = 900) -> str: + config.load_incluster_config() + api = client.CustomObjectsApi() + + body = { + "apiVersion": "sparkoperator.k8s.io/v1beta2", + "kind": "SparkApplication", + "metadata": {"name": app_name, "namespace": NAMESPACE}, + "spec": { + "type": "Scala", + "mode": "cluster", + "image": SPARK_IMAGE, + "imagePullPolicy": "IfNotPresent", + "mainClass": SPARK_MAIN_CLASS, + "mainApplicationFile": SPARK_MAIN_APP, + "sparkVersion": "3.5.6", + "restartPolicy": {"type": "Never"}, + "timeToLiveSeconds": 600, + "driver": { + "cores": 1, + "memory": "512m", + "serviceAccount": "spark", + "labels": {"version": "3.5.6"}, + }, + "executor": { + "cores": 1, + "instances": 1, + "memory": "512m", + "labels": {"version": "3.5.6"}, + }, + }, + } + + try: + api.delete_namespaced_custom_object( + group="sparkoperator.k8s.io", + version="v1beta2", + namespace=NAMESPACE, + plural="sparkapplications", + name=app_name, + ) + time.sleep(3) + except ApiException as exc: + if exc.status != 404: + raise + + api.create_namespaced_custom_object( + group="sparkoperator.k8s.io", + version="v1beta2", + namespace=NAMESPACE, + plural="sparkapplications", + body=body, + ) + + deadline = time.time() + timeout_seconds + last_state = "PENDING" + while time.time() < deadline: + app = api.get_namespaced_custom_object( + group="sparkoperator.k8s.io", + version="v1beta2", + namespace=NAMESPACE, + plural="sparkapplications", + name=app_name, + ) + last_state = ( + app.get("status", {}) + .get("applicationState", {}) + .get("state", "PENDING") + ) + if last_state == "COMPLETED": + return f"SparkApplication {app_name} completed successfully" + if last_state in {"FAILED", "SUBMISSION_FAILED", "UNKNOWN"}: + raise RuntimeError(f"SparkApplication {app_name} failed with state={last_state}") + time.sleep(10) + + raise TimeoutError( + f"SparkApplication {app_name} did not complete within {timeout_seconds}s (last_state={last_state})" + ) + + +with DAG( + dag_id="spark_pi_midnight", + default_args=default_args, + description="Run Spark Pi via SparkApplication once a day at midnight UTC", + schedule="0 0 * * *", + catchup=False, + tags=["spark", "example", "okdp"], +) as dag: + run_spark_pi = PythonOperator( + task_id="submit_and_wait_spark_pi", + python_callable=submit_and_wait_spark_pi, + op_kwargs={"app_name": "spark-pi-{{ ts_nodash | lower }}"}, + ) diff --git a/deploy_nyc_taxi.sh b/deploy_nyc_taxi.sh new file mode 100755 index 0000000..0c1ecff --- /dev/null +++ b/deploy_nyc_taxi.sh @@ -0,0 +1,83 @@ 
+#!/bin/bash +# +# Déploiement du pipeline NYC Taxi (Airflow + Spark Operator) +# Usage: ./examples/airflow/deploy_nyc_taxi.sh +# + +set -e + +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +RED='\033[0;31m' +NC='\033[0m' + +NAMESPACE="default" + +echo "=======================================================================" +echo "NYC Taxi Pipeline - Déploiement" +echo "=======================================================================" + +# --- Prérequis --- +echo "" +echo "1. Vérification des prérequis" +echo "-----------------------------------------------------------------------" + +for check in \ + "kubectl:command -v kubectl" \ + "Cluster:kubectl cluster-info" \ + "Spark Operator:kubectl get crd sparkapplications.sparkoperator.k8s.io" \ + "ServiceAccount spark:kubectl get sa spark -n $NAMESPACE" \ + "Secret S3:kubectl get secret creds-examples-s3 -n $NAMESPACE"; do + label="${check%%:*}" + cmd="${check#*:}" + if eval "$cmd" &>/dev/null; then + echo -e "${GREEN}✓ $label${NC}" + else + echo -e "${RED}✗ $label${NC}" + exit 1 + fi +done + +# --- ConfigMap --- +echo "" +echo "2. Déploiement du ConfigMap Spark ETL" +echo "-----------------------------------------------------------------------" + +if kubectl apply -f examples/airflow/manifests/nyc-taxi-etl-configmap.yaml; then + echo -e "${GREEN}✓ ConfigMap déployé${NC}" +else + echo -e "${RED}✗ Échec déploiement ConfigMap${NC}" + exit 1 +fi + +# --- DAG --- +echo "" +echo "3. Déploiement du DAG Airflow" +echo "-----------------------------------------------------------------------" + +SCHEDULER_POD=$(kubectl get pod -n $NAMESPACE -l component=scheduler -o jsonpath='{.items[0].metadata.name}' 2>/dev/null) + +if [ -z "$SCHEDULER_POD" ]; then + echo -e "${YELLOW}⚠ Airflow scheduler non trouvé dans namespace $NAMESPACE${NC}" + echo " Copiez manuellement le DAG:" + echo " kubectl cp examples/airflow/dags/nyc_taxi_pipeline.py $NAMESPACE/:/opt/airflow/dags/" +else + if kubectl cp examples/airflow/dags/nyc_taxi_pipeline.py \ + "$NAMESPACE/$SCHEDULER_POD:/opt/airflow/dags/" 2>/dev/null; then + echo -e "${GREEN}✓ DAG copié dans $SCHEDULER_POD${NC}" + else + echo -e "${YELLOW}⚠ Échec copie du DAG${NC}" + fi +fi + +# --- Done --- +echo "" +echo "=======================================================================" +echo -e "${GREEN}Déploiement terminé${NC}" +echo "=======================================================================" +echo "" +echo "Prochaines étapes:" +echo " 1. Ouvrir Airflow UI : https://airflow.okdp.sandbox" +echo " 2. Trigger le DAG 'nyc_taxi_spark_pipeline'" +echo " 3. Surveiller : kubectl get sparkapplication -n $NAMESPACE -w" +echo "" diff --git a/manifests/nyc-taxi-etl-configmap.yaml b/manifests/nyc-taxi-etl-configmap.yaml new file mode 100644 index 0000000..f37e823 --- /dev/null +++ b/manifests/nyc-taxi-etl-configmap.yaml @@ -0,0 +1,150 @@ +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: nyc-taxi-etl-code + namespace: default + labels: + app: nyc-taxi-etl +data: + nyc_taxi_etl.py: | + """ + NYC Taxi ETL - PySpark job + Reads from S3, transforms, writes aggregated results back to S3. + Uses JVM AWS SDK for output (avoids Hadoop rename issues with SeaweedFS). 
+ """ + import argparse + import os + from pyspark.sql import SparkSession + from pyspark.sql.functions import col, sum as _sum, count, avg, round as _round, hour, dayofweek + + def parse_args(): + parser = argparse.ArgumentParser() + parser.add_argument("--input", required=True) + parser.add_argument("--output", required=True) + parser.add_argument("--date", required=True) + return parser.parse_args() + + def upload_to_s3_via_jvm(spark, df, output_uri): + """Collect DataFrame to driver and upload as CSV to S3 via JVM SDK.""" + import csv + import io + from urllib.parse import urlparse + + # Parse output URI + parsed = urlparse(output_uri.replace("s3a://", "s3://", 1)) + bucket = parsed.netloc + key_prefix = parsed.path.lstrip("/") + + # Get credentials from environment + endpoint = os.getenv("S3_ENDPOINT", "") + access_key = os.getenv("AWS_ACCESS_KEY_ID", "") + secret_key = os.getenv("AWS_SECRET_ACCESS_KEY", "") + region = os.getenv("AWS_REGION", "us-east-1") + + if not endpoint or not access_key or not secret_key: + raise RuntimeError("Missing S3 credentials or endpoint") + + # Collect to driver as CSV + rows = df.collect() + columns = df.columns + buf = io.StringIO() + writer = csv.writer(buf) + writer.writerow(columns) + for row in rows: + writer.writerow([row[c] for c in columns]) + csv_content = buf.getvalue() + print(f"Collected {len(rows)} rows, CSV size: {len(csv_content)} bytes") + + # Upload using JVM AWS SDK + jvm = spark.sparkContext._jvm + credentials = jvm.com.amazonaws.auth.BasicAWSCredentials(access_key, secret_key) + credentials_provider = jvm.com.amazonaws.auth.AWSStaticCredentialsProvider(credentials) + endpoint_config = jvm.com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration( + endpoint, region + ) + s3_client = ( + jvm.com.amazonaws.services.s3.AmazonS3ClientBuilder.standard() + .withCredentials(credentials_provider) + .withEndpointConfiguration(endpoint_config) + .withPathStyleAccessEnabled(True) + .build() + ) + + s3_key = f"{key_prefix}/nyc_taxi_aggregated.csv" + print(f"Uploading -> s3://{bucket}/{s3_key}") + s3_client.putObject(bucket, s3_key, csv_content) + + # Upload _SUCCESS marker + s3_client.putObject(bucket, f"{key_prefix}/_SUCCESS", "") + print(f"Output uploaded to s3://{bucket}/{key_prefix}/") + + def main(): + args = parse_args() + + print("=" * 70) + print("NYC Taxi ETL Job") + print("=" * 70) + print(f"Input: {args.input}") + print(f"Output: {args.output}") + print(f"Date: {args.date}") + print("=" * 70) + + spark = SparkSession.builder \ + .appName(f"NYC-Taxi-ETL-{args.date}") \ + .getOrCreate() + + spark.sparkContext.setLogLevel("WARN") + + # Read + print("\nReading data...") + df = spark.read.parquet(args.input) + total_rows = df.count() + print(f"Rows read: {total_rows:,}") + + # Schema + print("\nSchema:") + df.printSchema() + + # Clean + print("\nCleaning...") + df_clean = df.filter( + (col("fare_amount") > 0) & + (col("trip_distance") > 0) & + (col("passenger_count") > 0) + ) + clean_rows = df_clean.count() + print(f"Rows after cleaning: {clean_rows:,}") + print(f"Rows removed: {total_rows - clean_rows:,}") + + # Transform + print("\nTransforming...") + df_transformed = df_clean \ + .withColumn("pickup_hour", hour(col("tpep_pickup_datetime"))) \ + .withColumn("pickup_dayofweek", dayofweek(col("tpep_pickup_datetime"))) + + # Aggregate by hour + day of week + print("\nAggregating...") + df_agg = df_transformed.groupBy("pickup_hour", "pickup_dayofweek").agg( + count("*").alias("total_trips"), + _round(avg("fare_amount"), 
2).alias("avg_fare"), + _round(avg("trip_distance"), 2).alias("avg_distance"), + _round(_sum("fare_amount"), 2).alias("total_revenue") + ).orderBy("pickup_hour") + + # Show summary + print("\nResults summary:") + df_agg.show(24, truncate=False) + + # Write output using JVM S3 client (avoids SeaweedFS rename issues) + print(f"\nWriting results to: {args.output}") + upload_to_s3_via_jvm(spark, df_agg, args.output) + + print("\n" + "=" * 70) + print("ETL completed successfully!") + print("=" * 70) + + spark.stop() + + if __name__ == "__main__": + main() diff --git a/tests/run_integration_tests.sh b/tests/run_integration_tests.sh new file mode 100755 index 0000000..bf718c4 --- /dev/null +++ b/tests/run_integration_tests.sh @@ -0,0 +1,46 @@ +#!/usr/bin/env bash +set -euo pipefail + +NAMESPACE="${1:-default}" +RELEASE="${2:-airflow-main}" +EXEC_DATE="${3:-2026-01-01}" +DAGS_SRC_DIR="${4:-examples/airflow/dags}" + +find_scheduler_pod() { + kubectl get pods -n "${NAMESPACE}" --no-headers \ + | awk '/airflow-main-scheduler/ && $2 ~ /^1\/1$/ && $3 == "Running" {print $1; exit}' +} + +find_webserver_pod() { + kubectl get pods -n "${NAMESPACE}" --no-headers \ + | awk '/airflow-main-webserver/ && $2 ~ /^1\/1$/ && $3 == "Running" {print $1; exit}' +} + +SCHEDULER_POD="$(find_scheduler_pod)" +WEBSERVER_POD="$(find_webserver_pod)" + +if [[ -z "${SCHEDULER_POD}" || -z "${WEBSERVER_POD}" ]]; then + echo "Pods Airflow introuvables dans namespace ${NAMESPACE}" + exit 1 +fi + +echo "Using scheduler pod: ${SCHEDULER_POD}" +echo "Using webserver pod: ${WEBSERVER_POD}" +echo "Syncing DAGs from ${DAGS_SRC_DIR}..." +kubectl cp "${DAGS_SRC_DIR}/." "${NAMESPACE}/${SCHEDULER_POD}:/opt/airflow/dags" -c scheduler +kubectl cp "${DAGS_SRC_DIR}/." "${NAMESPACE}/${WEBSERVER_POD}:/opt/airflow/dags" -c webserver +sleep 8 + +echo "Reserializing DAGs..." +kubectl exec -n "${NAMESPACE}" "${SCHEDULER_POD}" -c scheduler -- airflow dags reserialize + +echo "Listing DAGs..." +kubectl exec -n "${NAMESPACE}" "${SCHEDULER_POD}" -c scheduler -- airflow dags list | grep -E "hello_daily|orders_etl_daily" + +echo "Testing hello_daily..." +kubectl exec -n "${NAMESPACE}" "${SCHEDULER_POD}" -c scheduler -- airflow tasks test hello_daily log_hello "${EXEC_DATE}" + +echo "Testing orders_etl_daily..." +kubectl exec -n "${NAMESPACE}" "${SCHEDULER_POD}" -c scheduler -- airflow dags test orders_etl_daily "${EXEC_DATE}" + +echo "All integration DAG tests passed." 
diff --git a/tests/test_dags.py b/tests/test_dags.py new file mode 100644 index 0000000..5c525ba --- /dev/null +++ b/tests/test_dags.py @@ -0,0 +1,41 @@ +from pathlib import Path + +import pytest + + +airflow = pytest.importorskip("airflow") +from airflow.models import DagBag +from airflow.operators.python import PythonOperator + + +DAGS_DIR = Path(__file__).resolve().parents[1] / "dags" + + +@pytest.fixture(scope="module") +def dag_bag() -> DagBag: + bag = DagBag(dag_folder=str(DAGS_DIR), include_examples=False) + assert not bag.import_errors, f"DAG import errors: {bag.import_errors}" + return bag + + +def test_hello_daily_dag_loaded(dag_bag: DagBag) -> None: + dag = dag_bag.get_dag("hello_daily") + assert dag is not None + assert "example" in dag.tags + assert "daily" in dag.tags + + task_ids = {task.task_id for task in dag.tasks} + assert task_ids == {"log_hello"} + hello_task = dag.get_task("log_hello") + assert isinstance(hello_task, PythonOperator) + + +def test_orders_etl_dag_loaded(dag_bag: DagBag) -> None: + dag = dag_bag.get_dag("orders_etl_daily") + assert dag is not None + assert "etl" in dag.tags + assert "spark" in dag.tags + + task = dag.get_task("submit_and_wait_orders_etl") + assert isinstance(task, PythonOperator) + assert task.op_kwargs["run_suffix"] == "{{ ts_nodash | lower }}" From df23c0f55defc0a10fc446d9dfd2adb0c408e266 Mon Sep 17 00:00:00 2001 From: abir-oumghar Date: Sun, 10 May 2026 05:06:48 +0200 Subject: [PATCH 2/5] fix(airflow): make deploy_nyc_taxi.sh location-independent Use SCRIPT_DIR resolved from BASH_SOURCE so the script can be invoked from any working directory. Drop the obsolete kubectl cp section that duplicated work now done by the Airflow gitSync sidecar. --- airflow/deploy_nyc_taxi.sh | 55 +++++++++++++------------------------- 1 file changed, 19 insertions(+), 36 deletions(-) diff --git a/airflow/deploy_nyc_taxi.sh b/airflow/deploy_nyc_taxi.sh index 0c1ecff..7db1281 100755 --- a/airflow/deploy_nyc_taxi.sh +++ b/airflow/deploy_nyc_taxi.sh @@ -1,25 +1,26 @@ #!/bin/bash # -# Déploiement du pipeline NYC Taxi (Airflow + Spark Operator) -# Usage: ./examples/airflow/deploy_nyc_taxi.sh +# NYC Taxi pipeline deployment (Airflow + Spark Operator). +# Usage: ./airflow/deploy_nyc_taxi.sh # set -e +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + GREEN='\033[0;32m' -YELLOW='\033[1;33m' RED='\033[0;31m' NC='\033[0m' NAMESPACE="default" echo "=======================================================================" -echo "NYC Taxi Pipeline - Déploiement" +echo "NYC Taxi Pipeline - Deployment" echo "=======================================================================" -# --- Prérequis --- +# --- Prerequisites --- echo "" -echo "1. Vérification des prérequis" +echo "1. Checking prerequisites" echo "-----------------------------------------------------------------------" for check in \ @@ -27,7 +28,7 @@ for check in \ "Cluster:kubectl cluster-info" \ "Spark Operator:kubectl get crd sparkapplications.sparkoperator.k8s.io" \ "ServiceAccount spark:kubectl get sa spark -n $NAMESPACE" \ - "Secret S3:kubectl get secret creds-examples-s3 -n $NAMESPACE"; do + "S3 Secret:kubectl get secret creds-examples-s3 -n $NAMESPACE"; do label="${check%%:*}" cmd="${check#*:}" if eval "$cmd" &>/dev/null; then @@ -40,44 +41,26 @@ done # --- ConfigMap --- echo "" -echo "2. Déploiement du ConfigMap Spark ETL" +echo "2. 
Deploying the Spark ETL ConfigMap" echo "-----------------------------------------------------------------------" -if kubectl apply -f examples/airflow/manifests/nyc-taxi-etl-configmap.yaml; then - echo -e "${GREEN}✓ ConfigMap déployé${NC}" +if kubectl apply -f "$SCRIPT_DIR/manifests/nyc-taxi-etl-configmap.yaml"; then + echo -e "${GREEN}✓ ConfigMap deployed${NC}" else - echo -e "${RED}✗ Échec déploiement ConfigMap${NC}" + echo -e "${RED}✗ ConfigMap deployment failed${NC}" exit 1 fi -# --- DAG --- -echo "" -echo "3. Déploiement du DAG Airflow" -echo "-----------------------------------------------------------------------" - -SCHEDULER_POD=$(kubectl get pod -n $NAMESPACE -l component=scheduler -o jsonpath='{.items[0].metadata.name}' 2>/dev/null) - -if [ -z "$SCHEDULER_POD" ]; then - echo -e "${YELLOW}⚠ Airflow scheduler non trouvé dans namespace $NAMESPACE${NC}" - echo " Copiez manuellement le DAG:" - echo " kubectl cp examples/airflow/dags/nyc_taxi_pipeline.py $NAMESPACE/:/opt/airflow/dags/" -else - if kubectl cp examples/airflow/dags/nyc_taxi_pipeline.py \ - "$NAMESPACE/$SCHEDULER_POD:/opt/airflow/dags/" 2>/dev/null; then - echo -e "${GREEN}✓ DAG copié dans $SCHEDULER_POD${NC}" - else - echo -e "${YELLOW}⚠ Échec copie du DAG${NC}" - fi -fi - # --- Done --- echo "" echo "=======================================================================" -echo -e "${GREEN}Déploiement terminé${NC}" +echo -e "${GREEN}Deployment complete${NC}" echo "=======================================================================" echo "" -echo "Prochaines étapes:" -echo " 1. Ouvrir Airflow UI : https://airflow.okdp.sandbox" -echo " 2. Trigger le DAG 'nyc_taxi_spark_pipeline'" -echo " 3. Surveiller : kubectl get sparkapplication -n $NAMESPACE -w" +echo "DAGs are pulled into Airflow automatically by the gitSync sidecar." +echo "" +echo "Next steps:" +echo " 1. Open the Airflow UI: https://airflow.okdp.sandbox" +echo " 2. Trigger the DAG 'nyc_taxi_pipeline'" +echo " 3. Monitor: kubectl get sparkapplication -n $NAMESPACE -w" echo "" From 4592321d6ce248ac3ae67eb13105aca53494b63f Mon Sep 17 00:00:00 2001 From: abir-oumghar Date: Sun, 10 May 2026 05:08:44 +0200 Subject: [PATCH 3/5] docs(airflow): rewrite README for okdp-examples context Broaden the scope from the single NYC Taxi pipeline to all Airflow DAGs shipped here. Translate to English to match the rest of the repo, list the available DAGs, and document the gitSync mechanism used by the sandbox to pick them up. --- airflow/README.md | 94 +++++++++++++++++++++++++++++++---------------- 1 file changed, 62 insertions(+), 32 deletions(-) diff --git a/airflow/README.md b/airflow/README.md index fd411ec..f215344 100644 --- a/airflow/README.md +++ b/airflow/README.md @@ -1,36 +1,56 @@ -# NYC Taxi Pipeline - Airflow + Spark +# Airflow Examples -Pipeline ETL utilisant Airflow pour orchestrer un job Spark sur des données NYC Taxi (11M+ lignes) stockées dans SeaweedFS S3. +Apache Airflow DAGs and helpers showcasing how to orchestrate Spark jobs and +data workflows on the OKDP platform. -## Démarrage rapide +These DAGs are automatically pulled into the Airflow scheduler by the +`gitSync` sidecar configured in the +[okdp-sandbox Airflow package](https://github.com/OKDP/okdp-sandbox/blob/main/packages/okdp-packages/airflow/airflow.yaml) +(see `dagGitRepo` / `dagGitSubPath`). Any change pushed to `main` is reflected +in the scheduler within ~60 seconds. 
+ +## Available DAGs + +| DAG | Description | +|---|---| +| `hello_world` | Minimal DAG, validates scheduler/worker connectivity | +| `hello_daily` | Same as above, scheduled daily | +| `spark_pi_example` | Submits the canonical Spark Pi job via `SparkApplication` | +| `orders_etl_daily` | Daily Spark ETL with dynamic ConfigMap-based script injection | +| `nyc_taxi_pipeline` | Reads NYC taxi data from S3, transforms with Spark, writes back | + +## Quick start — NYC Taxi pipeline + +The `nyc_taxi_pipeline` DAG requires a one-time setup (ConfigMap + S3 dataset): ```bash -# 1. Déployer le ConfigMap Spark ETL + le DAG -./examples/airflow/deploy_nyc_taxi.sh +# 1. Deploy the Spark ETL ConfigMap +./airflow/deploy_nyc_taxi.sh -# 2. Ouvrir Airflow et lancer le DAG "nyc_taxi_spark_pipeline" +# 2. Open the Airflow UI and trigger the DAG `nyc_taxi_pipeline` open https://airflow.okdp.sandbox -# 3. Vérifier les résultats dans SeaweedFS S3 +# 3. Verify the results in SeaweedFS S3 kubectl run --rm -it s3-check --image=amazon/aws-cli:latest --restart=Never \ --command -- aws --endpoint-url http://seaweedfs-pmj3xs-s3.default.svc.cluster.local:8333 \ --no-verify-ssl s3 ls s3://okdp/examples/data/processed/nyc_taxi/ --recursive ``` -## Architecture +## Architecture (NYC Taxi pipeline) ``` Airflow DAG (PythonOperator) → SparkApplication (Spark Operator) - → Spark Driver + Executor - → Lecture: s3a://okdp/examples/data/raw/tripdata/yellow/ (11M lignes) - → Nettoyage + Agrégation (168 lignes: 24h × 7 jours) - → Écriture: s3a://okdp/examples/data/processed/nyc_taxi/yellow/run_id=.../nyc_taxi_aggregated.csv + → Spark Driver + Executors + → Read: s3a://okdp/examples/data/raw/tripdata/yellow/ (11M+ rows) + → Clean + Aggregate (168 rows: 24h × 7 days) + → Write: s3a://okdp/examples/data/processed/nyc_taxi/yellow/run_id=.../nyc_taxi_aggregated.csv ``` -## Données +## Datasets -Données NYC Yellow Taxi déjà présentes dans SeaweedFS (package `okdp-examples`) : +NYC Yellow Taxi data is already provisioned in SeaweedFS by the +`okdp-examples` Helm chart at deployment time: ``` s3://okdp/examples/data/raw/tripdata/yellow/ @@ -39,41 +59,51 @@ s3://okdp/examples/data/raw/tripdata/yellow/ └── month=2025-03/yellow_tripdata_2025-03.parquet (70 MB) ``` -Aucun téléchargement requis. +No manual download required. -## Le pipeline +## Pipeline steps (NYC Taxi) -1. **Lecture** — Lit les 3 mois de données Parquet depuis S3 (11M+ lignes) -2. **Nettoyage** — Filtre les courses invalides (fare ≤ 0, distance ≤ 0, etc.) -3. **Agrégation** — Regroupe par heure et jour de la semaine (168 lignes) -4. **Écriture** — Upload le CSV agrégé dans SeaweedFS via le SDK AWS Java +1. **Read** — 3 months of Parquet data from S3 (11M+ rows) +2. **Clean** — Filter invalid trips (fare ≤ 0, distance ≤ 0, etc.) +3. **Aggregate** — Group by hour and day-of-week (168 rows) +4. **Write** — Upload aggregated CSV to SeaweedFS via the JVM AWS SDK -> **Note** : L'écriture utilise le JVM S3 SDK (pas le Hadoop FileOutputCommitter) pour contourner un bug `copyObject` de SeaweedFS. +> **Note**: writes use the JVM S3 SDK (not the Hadoop FileOutputCommitter) +> to work around a SeaweedFS `copyObject` quirk. 
-## Commandes utiles +## Useful commands ```bash -# Statut du SparkApplication +# SparkApplication status kubectl get sparkapplications -n default -# Logs du driver Spark +# Spark driver logs kubectl logs -n default -l spark-role=driver --tail=50 -# Lister les DAG runs Airflow +# List Airflow DAG runs kubectl exec -n default deploy/airflow-main-scheduler -c scheduler -- \ - airflow dags list-runs -d nyc_taxi_spark_pipeline -o plain + airflow dags list-runs -d nyc_taxi_pipeline -o plain ``` -## Structure +## Directory layout ``` -examples/airflow/ +airflow/ ├── README.md -├── deploy_nyc_taxi.sh # Script de déploiement +├── deploy_nyc_taxi.sh ├── dags/ -│ └── nyc_taxi_pipeline.py # DAG Airflow (PythonOperator + K8s API) -└── manifests/ - └── nyc-taxi-etl-configmap.yaml # Code PySpark ETL +│ ├── hello_world.py +│ ├── hello_daily.py +│ ├── spark_pi_example.py +│ ├── orders_etl_daily.py +│ ├── nyc_taxi_pipeline.py +│ └── spark_jobs/ +│ └── orders_etl_job.py +├── manifests/ +│ └── nyc-taxi-etl-configmap.yaml +└── tests/ + ├── test_dags.py + └── run_integration_tests.sh ``` ## License From 0f751e9db2c6bd5e0e7555c6731aa585a608af39 Mon Sep 17 00:00:00 2001 From: abir-oumghar Date: Sun, 10 May 2026 05:11:31 +0200 Subject: [PATCH 4/5] docs: integrate airflow examples in root README Add Airflow DAGs to the intro paragraph, a dedicated section pointing to the new airflow/ directory, and Apache Airflow to the components list under 'Running the examples'. --- README.md | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 88daaa3..00fcdef 100644 --- a/README.md +++ b/README.md @@ -5,8 +5,8 @@ -A collection of hands-on examples, helper utilities, Jupyter notebooks, and data workflows showcasing how to work with the [OKDP Platform](https://okdp.io/). -This repository is meant to help you explore OKDP capabilities around compute, object storage, data catalog, SQL engines, Spark, and analytics. +A collection of hands-on examples, helper utilities, Jupyter notebooks, Airflow DAGs, and data workflows showcasing how to work with the [OKDP Platform](https://okdp.io/). +This repository is meant to help you explore OKDP capabilities around compute, object storage, data catalog, SQL engines, Spark, workflow orchestration, and analytics. Over time, these examples will be extended with lakehouse-oriented features, such as: @@ -39,6 +39,16 @@ A PySpark notebook is included to showcase Spark-native exploratory data analysi Use Apache Superset (SQL Lab) to query Trino and build visualizations/dashboards on top of the same datasets. +# Airflow + +The [airflow/](./airflow/) directory contains example DAGs orchestrated by Apache Airflow on the OKDP platform. They demonstrate how to: + +- Submit Spark jobs to **Spark Operator** via `SparkApplication` custom resources from a DAG. +- Build daily ETL pipelines reading from and writing to S3-compatible storage (SeaweedFS). +- Use Airflow `gitSync` to pull DAGs directly from this repository at runtime. + +See [`airflow/README.md`](./airflow/README.md) for the full list of DAGs and quick-start instructions. 
+ # Running the examples: Using [okdp-ui](https://github.com/OKDP/okdp-sandbox), deploy the following components: @@ -48,6 +58,7 @@ Using [okdp-ui](https://github.com/OKDP/okdp-sandbox), deploy the following comp - Interactive Query: [Trino](https://trino.io/) - Notebooks: [Jupyter](https://jupyter.org/) - DataViz: [Apache Superset](https://superset.apache.org/) +- Workflow orchestration: [Apache Airflow](https://airflow.apache.org/) - Applications: [okdp-examples](https://okdp.io) # About the datasets From a29b878e8516d4ae83cd10821e7dc32147d8bfe8 Mon Sep 17 00:00:00 2001 From: abir-oumghar Date: Sun, 10 May 2026 05:19:47 +0200 Subject: [PATCH 5/5] docs(airflow): polish section titles and add OKDP signature --- airflow/README.md | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/airflow/README.md b/airflow/README.md index f215344..137fe7a 100644 --- a/airflow/README.md +++ b/airflow/README.md @@ -19,7 +19,7 @@ in the scheduler within ~60 seconds. | `orders_etl_daily` | Daily Spark ETL with dynamic ConfigMap-based script injection | | `nyc_taxi_pipeline` | Reads NYC taxi data from S3, transforms with Spark, writes back | -## Quick start — NYC Taxi pipeline +## Running the NYC Taxi pipeline The `nyc_taxi_pipeline` DAG requires a one-time setup (ConfigMap + S3 dataset): @@ -85,7 +85,7 @@ kubectl exec -n default deploy/airflow-main-scheduler -c scheduler -- \ airflow dags list-runs -d nyc_taxi_pipeline -o plain ``` -## Directory layout +## Repository structure ``` airflow/ @@ -109,3 +109,10 @@ airflow/ ## License Apache 2.0 + +--- + +**Built 🚀 for the OKDP Community** + + +