From ebeb00d2582b71760f92b75d34ee829e6a98a5e0 Mon Sep 17 00:00:00 2001 From: abir-oumghar Date: Sun, 10 May 2026 05:36:21 +0200 Subject: [PATCH 1/3] feat(airflow): point gitSync to okdp-examples repository DAGs now live in https://github.com/OKDP/okdp-examples under airflow/dags. Update both the schema defaults and the helm template inline defaults, and bump the package tag to 2.9.3-p11. --- packages/okdp-packages/airflow/airflow.yaml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/okdp-packages/airflow/airflow.yaml b/packages/okdp-packages/airflow/airflow.yaml index 2c3b986..1d6f8b5 100644 --- a/packages/okdp-packages/airflow/airflow.yaml +++ b/packages/okdp-packages/airflow/airflow.yaml @@ -1,6 +1,6 @@ apiVersion: v1alpha1 name: airflow -tag: 2.9.3-p10 +tag: 2.9.3-p11 protected: false description: | Apache Airflow 2.9.3 - Workflow orchestration platform with Spark Operator integration @@ -33,7 +33,7 @@ schema: description: "Sync interval for gitSync or S3 sidecar" dagGitRepo: type: string - default: https://github.com/OKDP/okdp-sandbox.git + default: https://github.com/OKDP/okdp-examples.git description: "Git repository URL used when dagsSource=git" dagGitBranch: type: string @@ -49,7 +49,7 @@ schema: description: "Git clone depth when dagsSource=git" dagGitSubPath: type: string - default: examples/airflow/dags + default: airflow/dags description: "Sub-path inside the Git repo where DAGs are stored" dagGitCredentialsSecret: type: string @@ -379,11 +379,11 @@ modules: enabled: false gitSync: enabled: {{ eq $dagsSource "git" }} - repo: {{ default "https://github.com/OKDP/okdp-sandbox.git" .Parameters.dagGitRepo | quote }} + repo: {{ default "https://github.com/OKDP/okdp-examples.git" .Parameters.dagGitRepo | quote }} branch: {{ default "main" .Parameters.dagGitBranch | quote }} ref: {{ default "HEAD" .Parameters.dagGitRef | quote }} depth: {{ default 1 .Parameters.dagGitDepth }} - subPath: {{ default "examples/airflow/dags" .Parameters.dagGitSubPath | quote }} + subPath: {{ default "airflow/dags" .Parameters.dagGitSubPath | quote }} period: "{{ $dagSyncIntervalSeconds }}s" {{- if .Parameters.dagGitCredentialsSecret }} credentialsSecret: {{ .Parameters.dagGitCredentialsSecret | quote }} From 741bf3420fff678debf6cf8ccad527d9d5c22fca Mon Sep 17 00:00:00 2001 From: abir-oumghar Date: Sun, 10 May 2026 05:36:52 +0200 Subject: [PATCH 2/3] chore: remove relocated airflow examples Examples moved to https://github.com/OKDP/okdp-examples/tree/main/airflow. The Airflow package now pulls them from there via gitSync. --- examples/airflow/.gitignore | 73 ---- examples/airflow/README.md | 81 ----- examples/airflow/dags/.airflowignore | 10 - examples/airflow/dags/hello_daily.py | 35 -- examples/airflow/dags/hello_world.py | 34 -- examples/airflow/dags/nyc_taxi_pipeline.py | 188 ---------- examples/airflow/dags/orders_etl_daily.py | 324 ------------------ .../airflow/dags/spark_jobs/orders_etl_job.py | 183 ---------- examples/airflow/dags/spark_pi_example.py | 121 ------- examples/airflow/deploy_nyc_taxi.sh | 83 ----- .../manifests/nyc-taxi-etl-configmap.yaml | 150 -------- .../airflow/tests/run_integration_tests.sh | 46 --- examples/airflow/tests/test_dags.py | 41 --- 13 files changed, 1369 deletions(-) delete mode 100644 examples/airflow/.gitignore delete mode 100644 examples/airflow/README.md delete mode 100644 examples/airflow/dags/.airflowignore delete mode 100644 examples/airflow/dags/hello_daily.py delete mode 100644 examples/airflow/dags/hello_world.py delete mode 100644 examples/airflow/dags/nyc_taxi_pipeline.py delete mode 100644 examples/airflow/dags/orders_etl_daily.py delete mode 100644 examples/airflow/dags/spark_jobs/orders_etl_job.py delete mode 100644 examples/airflow/dags/spark_pi_example.py delete mode 100755 examples/airflow/deploy_nyc_taxi.sh delete mode 100644 examples/airflow/manifests/nyc-taxi-etl-configmap.yaml delete mode 100755 examples/airflow/tests/run_integration_tests.sh delete mode 100644 examples/airflow/tests/test_dags.py diff --git a/examples/airflow/.gitignore b/examples/airflow/.gitignore deleted file mode 100644 index c7c78ae..0000000 --- a/examples/airflow/.gitignore +++ /dev/null @@ -1,73 +0,0 @@ -# Python -__pycache__/ -*.py[cod] -*$py.class -*.so -.Python -build/ -develop-eggs/ -dist/ -downloads/ -eggs/ -.eggs/ -lib/ -lib64/ -parts/ -sdist/ -var/ -wheels/ -pip-wheel-metadata/ -share/python-wheels/ -*.egg-info/ -.installed.cfg -*.egg -MANIFEST - -# Virtual environments -.venv/ -venv/ -ENV/ -env/ -.virtualenv/ - -# Environment variables -.env -.env.local -.env.*.local - -# Airflow -logs/ -*.log -airflow.db -airflow.cfg -airflow-webserver.pid -standalone_admin_password.txt - -# IDE -.vscode/ -.idea/ -*.swp -*.swo -*~ -.DS_Store - -# Testing -.pytest_cache/ -.coverage -htmlcov/ -.tox/ -.hypothesis/ - -# Jupyter -.ipynb_checkpoints/ -*.ipynb - -# Spark -spark-warehouse/ -derby.log -metastore_db/ - -# Helm -*.tgz -.helmignore -charts/ diff --git a/examples/airflow/README.md b/examples/airflow/README.md deleted file mode 100644 index fd411ec..0000000 --- a/examples/airflow/README.md +++ /dev/null @@ -1,81 +0,0 @@ -# NYC Taxi Pipeline - Airflow + Spark - -Pipeline ETL utilisant Airflow pour orchestrer un job Spark sur des données NYC Taxi (11M+ lignes) stockées dans SeaweedFS S3. - -## Démarrage rapide - -```bash -# 1. Déployer le ConfigMap Spark ETL + le DAG -./examples/airflow/deploy_nyc_taxi.sh - -# 2. Ouvrir Airflow et lancer le DAG "nyc_taxi_spark_pipeline" -open https://airflow.okdp.sandbox - -# 3. Vérifier les résultats dans SeaweedFS S3 -kubectl run --rm -it s3-check --image=amazon/aws-cli:latest --restart=Never \ - --command -- aws --endpoint-url http://seaweedfs-pmj3xs-s3.default.svc.cluster.local:8333 \ - --no-verify-ssl s3 ls s3://okdp/examples/data/processed/nyc_taxi/ --recursive -``` - -## Architecture - -``` -Airflow DAG (PythonOperator) - → SparkApplication (Spark Operator) - → Spark Driver + Executor - → Lecture: s3a://okdp/examples/data/raw/tripdata/yellow/ (11M lignes) - → Nettoyage + Agrégation (168 lignes: 24h × 7 jours) - → Écriture: s3a://okdp/examples/data/processed/nyc_taxi/yellow/run_id=.../nyc_taxi_aggregated.csv -``` - -## Données - -Données NYC Yellow Taxi déjà présentes dans SeaweedFS (package `okdp-examples`) : - -``` -s3://okdp/examples/data/raw/tripdata/yellow/ -├── month=2025-01/yellow_tripdata_2025-01.parquet (59 MB) -├── month=2025-02/yellow_tripdata_2025-02.parquet (60 MB) -└── month=2025-03/yellow_tripdata_2025-03.parquet (70 MB) -``` - -Aucun téléchargement requis. - -## Le pipeline - -1. **Lecture** — Lit les 3 mois de données Parquet depuis S3 (11M+ lignes) -2. **Nettoyage** — Filtre les courses invalides (fare ≤ 0, distance ≤ 0, etc.) -3. **Agrégation** — Regroupe par heure et jour de la semaine (168 lignes) -4. **Écriture** — Upload le CSV agrégé dans SeaweedFS via le SDK AWS Java - -> **Note** : L'écriture utilise le JVM S3 SDK (pas le Hadoop FileOutputCommitter) pour contourner un bug `copyObject` de SeaweedFS. - -## Commandes utiles - -```bash -# Statut du SparkApplication -kubectl get sparkapplications -n default - -# Logs du driver Spark -kubectl logs -n default -l spark-role=driver --tail=50 - -# Lister les DAG runs Airflow -kubectl exec -n default deploy/airflow-main-scheduler -c scheduler -- \ - airflow dags list-runs -d nyc_taxi_spark_pipeline -o plain -``` - -## Structure - -``` -examples/airflow/ -├── README.md -├── deploy_nyc_taxi.sh # Script de déploiement -├── dags/ -│ └── nyc_taxi_pipeline.py # DAG Airflow (PythonOperator + K8s API) -└── manifests/ - └── nyc-taxi-etl-configmap.yaml # Code PySpark ETL -``` - -## License - -Apache 2.0 diff --git a/examples/airflow/dags/.airflowignore b/examples/airflow/dags/.airflowignore deleted file mode 100644 index 34c5e49..0000000 --- a/examples/airflow/dags/.airflowignore +++ /dev/null @@ -1,10 +0,0 @@ -# Ignore local artifacts and non-DAG files (Airflow expects regex patterns) -^__pycache__/.*$ -.*\.pyc$ -.*\.pyo$ -.*\.md$ -^test_.*\.py$ -.*_test\.py$ -^spark_jobs/.*$ -^hello_world\.py$ -^spark_pi_example\.py$ diff --git a/examples/airflow/dags/hello_daily.py b/examples/airflow/dags/hello_daily.py deleted file mode 100644 index fbe4a26..0000000 --- a/examples/airflow/dags/hello_daily.py +++ /dev/null @@ -1,35 +0,0 @@ -"""Minimal daily smoke-test DAG.""" - -from datetime import datetime, timedelta - -from airflow import DAG -from airflow.operators.python import PythonOperator - - -default_args = { - "owner": "airflow", - "start_date": datetime(2026, 1, 1), - "retries": 1, - "retry_delay": timedelta(minutes=2), -} - - -def log_hello() -> str: - message = "Hello from Airflow daily smoke-test" - print(message) - return message - - -with DAG( - dag_id="hello_daily", - default_args=default_args, - description="Simple DAG that validates scheduler/task execution every day", - schedule="0 0 * * *", - catchup=False, - tags=["example", "smoke", "daily"], -) as dag: - hello_task = PythonOperator( - task_id="log_hello", - python_callable=log_hello, - ) - diff --git a/examples/airflow/dags/hello_world.py b/examples/airflow/dags/hello_world.py deleted file mode 100644 index 5798390..0000000 --- a/examples/airflow/dags/hello_world.py +++ /dev/null @@ -1,34 +0,0 @@ -"""Simple DAG scheduled every day at midnight (UTC).""" - -from datetime import datetime, timedelta - -from airflow import DAG -from airflow.operators.python import PythonOperator - - -default_args = { - "owner": "airflow", - "start_date": datetime(2026, 1, 1), - "retries": 1, - "retry_delay": timedelta(minutes=2), -} - - -def print_hello() -> str: - message = "Hello World from Airflow on OKDP" - print(message) - return message - - -with DAG( - dag_id="hello_world_midnight", - default_args=default_args, - description="Hello World DAG that runs daily at 00:00 UTC", - schedule="0 0 * * *", - catchup=False, - tags=["example", "python", "okdp"], -) as dag: - hello_task = PythonOperator( - task_id="hello_world_task", - python_callable=print_hello, - ) diff --git a/examples/airflow/dags/nyc_taxi_pipeline.py b/examples/airflow/dags/nyc_taxi_pipeline.py deleted file mode 100644 index 21effd9..0000000 --- a/examples/airflow/dags/nyc_taxi_pipeline.py +++ /dev/null @@ -1,188 +0,0 @@ -""" -NYC Taxi Pipeline - Airflow + Spark Operator -Utilise l'API Kubernetes Python pour soumettre un SparkApplication. -""" -import os -import re -import time -from datetime import datetime, timedelta - -from airflow import DAG -from airflow.operators.python import PythonOperator -from kubernetes import client, config -from kubernetes.client.exceptions import ApiException - -NAMESPACE = os.getenv("AIRFLOW_NAMESPACE", "default") -SPARK_APP_GROUP = "sparkoperator.k8s.io" -SPARK_APP_VERSION = "v1beta2" -SPARK_APP_PLURAL = "sparkapplications" -SPARK_IMAGE = "quay.io/okdp/spark-py:spark-3.5.6-python-3.11-scala-2.12-java-17" -SPARK_SERVICE_ACCOUNT = "spark" -S3_CREDENTIALS_SECRET = "creds-examples-s3" -S3_ACCESS_KEY_FIELD = "S3_ACCESS_KEY" -S3_SECRET_KEY_FIELD = "S3_SECRET_KEY" -CONFIGMAP_NAME = "nyc-taxi-etl-code" -SCRIPT_MOUNT_DIR = "/opt/spark/app" -SCRIPT_FILE_NAME = "nyc_taxi_etl.py" - -# Input/Output S3 -S3_INPUT = "s3a://okdp/examples/data/raw/tripdata/yellow/" -S3_OUTPUT_BASE = "s3a://okdp/examples/data/processed/nyc_taxi/yellow" - -default_args = { - "owner": "data-team", - "depends_on_past": False, - "start_date": datetime(2024, 1, 1), - "retries": 0, - "retry_delay": timedelta(minutes=2), -} - - -def _safe_name(prefix, suffix, max_len=63): - raw = f"{prefix}-{suffix}" - normalized = re.sub(r"[^a-z0-9-]", "-", raw.lower()).strip("-") - return normalized[:max_len].rstrip("-") - - -def _discover_s3_endpoint(core_api): - """Discover SeaweedFS S3 endpoint in-cluster.""" - try: - services = core_api.list_namespaced_service(namespace=NAMESPACE).items - for svc in services: - name = (svc.metadata.name or "").strip() - if re.match(r"^seaweedfs-[a-z0-9-]+-s3$", name): - return f"http://{name}.{NAMESPACE}.svc.cluster.local:8333" - except ApiException: - pass - return f"https://seaweedfs-seaweedfs-{NAMESPACE}.okdp.sandbox" - - -def _delete_if_exists(custom_api, app_name): - try: - custom_api.delete_namespaced_custom_object( - group=SPARK_APP_GROUP, - version=SPARK_APP_VERSION, - namespace=NAMESPACE, - plural=SPARK_APP_PLURAL, - name=app_name, - ) - time.sleep(2) - except ApiException as exc: - if exc.status != 404: - raise - - -def submit_and_wait_nyc_taxi_etl(run_suffix, timeout_seconds=1200): - """Submit SparkApplication and wait for completion.""" - config.load_incluster_config() - core_api = client.CoreV1Api() - custom_api = client.CustomObjectsApi() - - app_name = _safe_name("nyc-taxi-etl", run_suffix) - s3_endpoint = _discover_s3_endpoint(core_api) - ssl_enabled = str(s3_endpoint.lower().startswith("https://")).lower() - output_uri = f"{S3_OUTPUT_BASE}/run_id={run_suffix}" - - _delete_if_exists(custom_api, app_name) - - body = { - "apiVersion": f"{SPARK_APP_GROUP}/{SPARK_APP_VERSION}", - "kind": "SparkApplication", - "metadata": {"name": app_name, "namespace": NAMESPACE}, - "spec": { - "type": "Python", - "mode": "cluster", - "image": SPARK_IMAGE, - "imagePullPolicy": "IfNotPresent", - "mainApplicationFile": f"local://{SCRIPT_MOUNT_DIR}/{SCRIPT_FILE_NAME}", - "arguments": [ - "--input", S3_INPUT, - "--output", output_uri, - "--date", run_suffix, - ], - "sparkVersion": "3.5.6", - "restartPolicy": {"type": "Never"}, - "timeToLiveSeconds": 3600, - "sparkConf": { - "spark.hadoop.fs.s3a.endpoint": s3_endpoint, - "spark.hadoop.fs.s3a.path.style.access": "true", - "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", - "spark.hadoop.fs.s3a.connection.ssl.enabled": ssl_enabled, - # Fix for SeaweedFS: write to local then upload via script - "spark.hadoop.fs.s3a.fast.upload": "true", - "spark.hadoop.fs.s3a.fast.upload.buffer": "bytebuffer", - }, - "volumes": [ - {"name": "etl-script", "configMap": {"name": CONFIGMAP_NAME}}, - ], - "driver": { - "cores": 1, - "memory": "1g", - "serviceAccount": SPARK_SERVICE_ACCOUNT, - "labels": {"workload": "nyc-taxi-etl"}, - "volumeMounts": [{"name": "etl-script", "mountPath": SCRIPT_MOUNT_DIR}], - "envSecretKeyRefs": { - "AWS_ACCESS_KEY_ID": {"name": S3_CREDENTIALS_SECRET, "key": S3_ACCESS_KEY_FIELD}, - "AWS_SECRET_ACCESS_KEY": {"name": S3_CREDENTIALS_SECRET, "key": S3_SECRET_KEY_FIELD}, - }, - "env": [{"name": "S3_ENDPOINT", "value": s3_endpoint}], - }, - "executor": { - "instances": 1, - "cores": 1, - "memory": "1g", - "labels": {"workload": "nyc-taxi-etl"}, - "envSecretKeyRefs": { - "AWS_ACCESS_KEY_ID": {"name": S3_CREDENTIALS_SECRET, "key": S3_ACCESS_KEY_FIELD}, - "AWS_SECRET_ACCESS_KEY": {"name": S3_CREDENTIALS_SECRET, "key": S3_SECRET_KEY_FIELD}, - }, - "env": [{"name": "S3_ENDPOINT", "value": s3_endpoint}], - }, - }, - } - - custom_api.create_namespaced_custom_object( - group=SPARK_APP_GROUP, - version=SPARK_APP_VERSION, - namespace=NAMESPACE, - plural=SPARK_APP_PLURAL, - body=body, - ) - - deadline = time.time() + timeout_seconds - last_state = "SUBMITTED" - while time.time() < deadline: - app = custom_api.get_namespaced_custom_object( - group=SPARK_APP_GROUP, - version=SPARK_APP_VERSION, - namespace=NAMESPACE, - plural=SPARK_APP_PLURAL, - name=app_name, - ) - last_state = ( - app.get("status", {}) - .get("applicationState", {}) - .get("state", "SUBMITTED") - ) - if last_state == "COMPLETED": - return f"Spark ETL terminé: {app_name}" - if last_state in {"FAILED", "SUBMISSION_FAILED", "UNKNOWN"}: - raise RuntimeError(f"Spark ETL échoué: {app_name} state={last_state}") - time.sleep(10) - - raise TimeoutError(f"Spark ETL timeout après {timeout_seconds}s: {app_name} state={last_state}") - - -with DAG( - dag_id="nyc_taxi_spark_pipeline", - default_args=default_args, - description="NYC Taxi Spark ETL pipeline", - schedule=None, - catchup=False, - tags=["nyc-taxi", "spark", "etl"], -) as dag: - run_etl = PythonOperator( - task_id="submit_and_wait_nyc_taxi_etl", - python_callable=submit_and_wait_nyc_taxi_etl, - op_kwargs={"run_suffix": "{{ ts_nodash | lower }}"}, - ) diff --git a/examples/airflow/dags/orders_etl_daily.py b/examples/airflow/dags/orders_etl_daily.py deleted file mode 100644 index 3b439fb..0000000 --- a/examples/airflow/dags/orders_etl_daily.py +++ /dev/null @@ -1,324 +0,0 @@ -"""Submit and monitor a daily Spark ETL job through Spark Operator.""" - -from __future__ import annotations - -import base64 -import os -import re -import time -from datetime import datetime, timedelta -from pathlib import Path - -from airflow import DAG -from airflow.operators.python import PythonOperator -from kubernetes import client, config -from kubernetes.client.exceptions import ApiException - - -NAMESPACE = os.getenv("AIRFLOW_NAMESPACE", "default") -SPARK_APP_GROUP = "sparkoperator.k8s.io" -SPARK_APP_VERSION = "v1beta2" -SPARK_APP_PLURAL = "sparkapplications" -SPARK_IMAGE = "quay.io/okdp/spark-py:spark-3.5.6-python-3.11-scala-2.12-java-17" -SCRIPT_FILE_NAME = "orders_etl_job.py" -SCRIPT_FILE_PATH = Path(__file__).parent / "spark_jobs" / SCRIPT_FILE_NAME -SCRIPT_MOUNT_DIR = "/opt/spark/app" -SCRIPT_MOUNT_PATH = f"{SCRIPT_MOUNT_DIR}/{SCRIPT_FILE_NAME}" -SPARK_SERVICE_ACCOUNT = "spark" -S3_CREDENTIALS_SECRET = "creds-airflow-s3" -S3_ACCESS_KEY_FIELD = "accessKey" -S3_SECRET_KEY_FIELD = "secretKey" -DEFAULT_INGRESS_SUFFIX = "okdp.sandbox" -DEFAULT_S3_BUCKET = "airflow-logs" -DEFAULT_S3_INPUT_PREFIX = "orders/raw" -DEFAULT_S3_OUTPUT_PREFIX = "orders/curated" -S3_ENDPOINT_ENV_VAR = "AIRFLOW_ETL_S3_ENDPOINT" -S3_BUCKET_ENV_VAR = "AIRFLOW_ETL_S3_BUCKET" -S3_INPUT_PREFIX_ENV_VAR = "AIRFLOW_ETL_S3_INPUT_PREFIX" -S3_OUTPUT_PREFIX_ENV_VAR = "AIRFLOW_ETL_S3_OUTPUT_PREFIX" -INGRESS_SUFFIX_ENV_VAR = "AIRFLOW_INGRESS_SUFFIX" -S3_VERIFY_SSL_ENV_VAR = "AIRFLOW_ETL_S3_VERIFY_SSL" -S3_REGION_ENV_VAR = "AIRFLOW_ETL_S3_REGION" - - -default_args = { - "owner": "airflow", - "depends_on_past": False, - "start_date": datetime(2026, 1, 1), - "retries": 0, - "retry_delay": timedelta(minutes=2), -} - - -def _safe_k8s_name(prefix: str, suffix: str, max_len: int = 63) -> str: - raw = f"{prefix}-{suffix}" - normalized = re.sub(r"[^a-z0-9-]", "-", raw.lower()).strip("-") - if len(normalized) <= max_len: - return normalized - return normalized[:max_len].rstrip("-") - - -def _load_spark_script() -> str: - if not SCRIPT_FILE_PATH.is_file(): - raise FileNotFoundError(f"Spark ETL script not found: {SCRIPT_FILE_PATH}") - return SCRIPT_FILE_PATH.read_text(encoding="utf-8") - - -def _clean_prefix(value: str, default_value: str) -> str: - normalized = (value or default_value).strip().strip("/") - return normalized or default_value - - -def _discover_seaweedfs_s3_endpoint(core_api: client.CoreV1Api) -> str: - env_endpoint = os.getenv(S3_ENDPOINT_ENV_VAR, "").strip().rstrip("/") - if env_endpoint: - return env_endpoint - - # Prefer in-cluster SeaweedFS S3 service when available. - try: - services = core_api.list_namespaced_service(namespace=NAMESPACE).items - candidates = [] - for svc in services: - service_name = (svc.metadata.name or "").strip() - if re.match(r"^seaweedfs-[a-z0-9-]+-s3$", service_name): - candidates.append(service_name) - if candidates: - chosen = sorted(candidates)[0] - return f"http://{chosen}.{NAMESPACE}.svc.cluster.local:8333" - except ApiException: - pass - - ingress_suffix = os.getenv(INGRESS_SUFFIX_ENV_VAR, DEFAULT_INGRESS_SUFFIX).strip() - if not ingress_suffix: - ingress_suffix = DEFAULT_INGRESS_SUFFIX - return f"https://seaweedfs-seaweedfs-{NAMESPACE}.{ingress_suffix}" - - -def _resolve_s3_locations(core_api: client.CoreV1Api) -> tuple[str, str, str]: - bucket = ( - os.getenv(S3_BUCKET_ENV_VAR, "").strip() - or os.getenv("AIRFLOW_DAGS_S3_BUCKET", "").strip() - or DEFAULT_S3_BUCKET - ) - input_prefix = _clean_prefix(os.getenv(S3_INPUT_PREFIX_ENV_VAR, ""), DEFAULT_S3_INPUT_PREFIX) - output_prefix = _clean_prefix(os.getenv(S3_OUTPUT_PREFIX_ENV_VAR, ""), DEFAULT_S3_OUTPUT_PREFIX) - - s3_endpoint = _discover_seaweedfs_s3_endpoint(core_api=core_api) - s3_input_uri = f"s3a://{bucket}/{input_prefix}" - s3_output_uri_base = f"s3a://{bucket}/{output_prefix}" - return bucket, s3_endpoint, s3_input_uri, s3_output_uri_base - - -def _bool_env(env_name: str, default_value: bool) -> bool: - raw_value = os.getenv(env_name) - if raw_value is None: - return default_value - return raw_value.strip().lower() in {"1", "true", "yes", "on"} - - -def _ensure_s3_bucket_exists(core_api: client.CoreV1Api, s3_endpoint: str, bucket: str) -> None: - try: - secret = core_api.read_namespaced_secret(name=S3_CREDENTIALS_SECRET, namespace=NAMESPACE) - except ApiException as exc: - raise RuntimeError( - f"Unable to read S3 credentials secret {S3_CREDENTIALS_SECRET} in namespace {NAMESPACE}" - ) from exc - - secret_data = secret.data or {} - access_key_b64 = secret_data.get(S3_ACCESS_KEY_FIELD, "") - secret_key_b64 = secret_data.get(S3_SECRET_KEY_FIELD, "") - if not access_key_b64 or not secret_key_b64: - raise RuntimeError( - f"S3 credentials secret {S3_CREDENTIALS_SECRET} is missing keys " - f"{S3_ACCESS_KEY_FIELD}/{S3_SECRET_KEY_FIELD}" - ) - - access_key = base64.b64decode(access_key_b64).decode("utf-8") - secret_key = base64.b64decode(secret_key_b64).decode("utf-8") - - verify_ssl = _bool_env( - S3_VERIFY_SSL_ENV_VAR, - default_value=s3_endpoint.lower().startswith("https://"), - ) - s3_region = os.getenv(S3_REGION_ENV_VAR, "us-east-1") - - import boto3 - from botocore.exceptions import ClientError - - s3_client = boto3.client( - "s3", - endpoint_url=s3_endpoint, - aws_access_key_id=access_key, - aws_secret_access_key=secret_key, - region_name=s3_region, - verify=verify_ssl, - ) - - try: - s3_client.head_bucket(Bucket=bucket) - return - except ClientError as exc: - error_code = str(exc.response.get("Error", {}).get("Code", "")) - if error_code not in {"404", "NoSuchBucket", "NotFound"}: - raise RuntimeError( - f"Unable to access bucket {bucket} on endpoint {s3_endpoint}: {exc}" - ) from exc - - s3_client.create_bucket(Bucket=bucket) - - -def _upsert_config_map(core_api: client.CoreV1Api, name: str, script_content: str) -> None: - body = client.V1ConfigMap( - metadata=client.V1ObjectMeta( - name=name, - namespace=NAMESPACE, - labels={"app": "orders-etl", "managed-by": "airflow"}, - ), - data={SCRIPT_FILE_NAME: script_content}, - ) - try: - core_api.patch_namespaced_config_map(name=name, namespace=NAMESPACE, body=body) - except ApiException as exc: - if exc.status != 404: - raise - core_api.create_namespaced_config_map(namespace=NAMESPACE, body=body) - - -def _delete_if_exists(custom_api: client.CustomObjectsApi, app_name: str) -> None: - try: - custom_api.delete_namespaced_custom_object( - group=SPARK_APP_GROUP, - version=SPARK_APP_VERSION, - namespace=NAMESPACE, - plural=SPARK_APP_PLURAL, - name=app_name, - ) - time.sleep(2) - except ApiException as exc: - if exc.status != 404: - raise - - -def submit_and_wait_orders_etl(run_suffix: str, timeout_seconds: int = 1200) -> str: - config.load_incluster_config() - core_api = client.CoreV1Api() - custom_api = client.CustomObjectsApi() - - spark_app_name = _safe_k8s_name("orders-etl", run_suffix) - script_cm_name = _safe_k8s_name("orders-etl-script", run_suffix) - bucket, s3_endpoint, s3_input_uri, s3_output_uri_base = _resolve_s3_locations(core_api=core_api) - output_uri = f"{s3_output_uri_base}/run_id={run_suffix}" - ssl_enabled = str(s3_endpoint.lower().startswith("https://")).lower() - - _ensure_s3_bucket_exists(core_api=core_api, s3_endpoint=s3_endpoint, bucket=bucket) - script_content = _load_spark_script() - _upsert_config_map(core_api=core_api, name=script_cm_name, script_content=script_content) - _delete_if_exists(custom_api=custom_api, app_name=spark_app_name) - - body = { - "apiVersion": f"{SPARK_APP_GROUP}/{SPARK_APP_VERSION}", - "kind": "SparkApplication", - "metadata": {"name": spark_app_name, "namespace": NAMESPACE}, - "spec": { - "type": "Python", - "mode": "cluster", - "image": SPARK_IMAGE, - "imagePullPolicy": "IfNotPresent", - "mainApplicationFile": f"local://{SCRIPT_MOUNT_PATH}", - "arguments": [ - "--input-uri", - s3_input_uri, - "--output-uri", - output_uri, - "--run-id", - run_suffix, - ], - "sparkVersion": "3.5.6", - "restartPolicy": {"type": "Never"}, - "timeToLiveSeconds": 600, - "sparkConf": { - "spark.hadoop.fs.s3a.endpoint": s3_endpoint, - "spark.hadoop.fs.s3a.path.style.access": "true", - "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", - "spark.hadoop.fs.s3a.connection.ssl.enabled": ssl_enabled, - }, - "volumes": [ - { - "name": "etl-script", - "configMap": {"name": script_cm_name}, - } - ], - "driver": { - "cores": 1, - "memory": "1g", - "serviceAccount": SPARK_SERVICE_ACCOUNT, - "labels": {"workload": "orders-etl", "version": "3.5.6"}, - "volumeMounts": [{"name": "etl-script", "mountPath": SCRIPT_MOUNT_DIR}], - "envSecretKeyRefs": { - "S3_ACCESS_KEY": {"name": S3_CREDENTIALS_SECRET, "key": S3_ACCESS_KEY_FIELD}, - "S3_SECRET_KEY": {"name": S3_CREDENTIALS_SECRET, "key": S3_SECRET_KEY_FIELD}, - }, - "env": [{"name": "S3_ENDPOINT", "value": s3_endpoint}], - }, - "executor": { - "instances": 1, - "cores": 1, - "memory": "1g", - "labels": {"workload": "orders-etl", "version": "3.5.6"}, - "envSecretKeyRefs": { - "S3_ACCESS_KEY": {"name": S3_CREDENTIALS_SECRET, "key": S3_ACCESS_KEY_FIELD}, - "S3_SECRET_KEY": {"name": S3_CREDENTIALS_SECRET, "key": S3_SECRET_KEY_FIELD}, - }, - "env": [{"name": "S3_ENDPOINT", "value": s3_endpoint}], - }, - }, - } - - custom_api.create_namespaced_custom_object( - group=SPARK_APP_GROUP, - version=SPARK_APP_VERSION, - namespace=NAMESPACE, - plural=SPARK_APP_PLURAL, - body=body, - ) - - deadline = time.time() + timeout_seconds - last_state = "SUBMITTED" - while time.time() < deadline: - app = custom_api.get_namespaced_custom_object( - group=SPARK_APP_GROUP, - version=SPARK_APP_VERSION, - namespace=NAMESPACE, - plural=SPARK_APP_PLURAL, - name=spark_app_name, - ) - last_state = ( - app.get("status", {}) - .get("applicationState", {}) - .get("state", "SUBMITTED") - ) - - if last_state == "COMPLETED": - return f"Spark ETL finished successfully: {spark_app_name}" - if last_state in {"FAILED", "SUBMISSION_FAILED", "UNKNOWN"}: - raise RuntimeError(f"Spark ETL failed for {spark_app_name} with state={last_state}") - time.sleep(10) - - raise TimeoutError( - f"Spark ETL timeout after {timeout_seconds}s for {spark_app_name} (last_state={last_state})" - ) - - -with DAG( - dag_id="orders_etl_daily", - default_args=default_args, - description="Daily Spark ETL workflow orchestrated by Airflow and Spark Operator", - schedule="0 0 * * *", - catchup=False, - tags=["etl", "spark", "daily", "orders"], -) as dag: - run_orders_etl = PythonOperator( - task_id="submit_and_wait_orders_etl", - python_callable=submit_and_wait_orders_etl, - op_kwargs={"run_suffix": "{{ ts_nodash | lower }}"}, - ) diff --git a/examples/airflow/dags/spark_jobs/orders_etl_job.py b/examples/airflow/dags/spark_jobs/orders_etl_job.py deleted file mode 100644 index 4f45d81..0000000 --- a/examples/airflow/dags/spark_jobs/orders_etl_job.py +++ /dev/null @@ -1,183 +0,0 @@ -"""Spark ETL job for daily orders processing.""" - -from __future__ import annotations - -import argparse -import csv -import io -import os -from datetime import datetime -from typing import Any, Iterable -from urllib.parse import urlparse - - -def parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser(description="Run orders ETL with Spark") - parser.add_argument("--input-uri", required=True, help="Input URI (for example: s3a://bucket/path)") - parser.add_argument("--output-uri", required=True, help="Output URI for curated dataset") - parser.add_argument("--run-id", required=True, help="Unique ETL run identifier") - return parser.parse_args() - - -def _bootstrap_input_if_missing(spark: Any) -> Any: - from pyspark.sql.types import DoubleType, IntegerType, StringType, StructField, StructType - - sample_rows: Iterable[tuple[str, str, str, int, float]] = [ - ("o-1001", "c-001", "2026-02-01 08:10:00", 2, 19.90), - ("o-1002", "c-001", "2026-02-01 08:45:00", 1, 5.50), - ("o-1003", "c-002", "2026-02-01 09:20:00", 3, 12.00), - ("o-1004", "c-003", "2026-02-01 10:00:00", 1, 42.30), - ("o-1005", "c-002", "2026-02-01 11:15:00", 2, 7.25), - ] - schema = StructType( - [ - StructField("order_id", StringType(), False), - StructField("customer_id", StringType(), False), - StructField("order_ts", StringType(), False), - StructField("quantity", IntegerType(), False), - StructField("unit_price", DoubleType(), False), - ] - ) - - df = spark.createDataFrame(sample_rows, schema=schema) - return df - - -def _s3a_to_bucket_key(s3a_uri: str) -> tuple[str, str]: - if not s3a_uri.startswith("s3a://"): - raise ValueError(f"Unsupported URI scheme for output: {s3a_uri}") - s3_uri = s3a_uri.replace("s3a://", "s3://", 1) - parsed = urlparse(s3_uri) - bucket = parsed.netloc.strip() - key_prefix = parsed.path.lstrip("/") - if not bucket: - raise ValueError(f"Missing S3 bucket in URI: {s3a_uri}") - return bucket, key_prefix - - -def _upload_curated_csv_with_jvm_s3(curated_df: Any, output_uri: str, spark: Any) -> str: - endpoint = os.getenv("S3_ENDPOINT", "") - access_key = os.getenv("S3_ACCESS_KEY", "") - secret_key = os.getenv("S3_SECRET_KEY", "") - if not endpoint or not access_key or not secret_key: - raise RuntimeError("Missing S3 runtime credentials or endpoint for output upload") - - bucket, key_prefix = _s3a_to_bucket_key(output_uri) - object_key = f"{key_prefix.rstrip('/')}/curated.csv" if key_prefix else "curated.csv" - - rows = curated_df.orderBy("order_date", "customer_id").collect() - csv_buffer = io.StringIO() - writer = csv.writer(csv_buffer) - writer.writerow( - [ - "order_date", - "customer_id", - "orders_count", - "gross_amount", - "etl_run_id", - "processed_at_utc", - ] - ) - for row in rows: - writer.writerow( - [ - row["order_date"], - row["customer_id"], - row["orders_count"], - row["gross_amount"], - row["etl_run_id"], - row["processed_at_utc"], - ] - ) - - jvm = spark.sparkContext._jvm - gateway = spark.sparkContext._gateway - region = os.getenv("AWS_REGION", "us-east-1") - payload = csv_buffer.getvalue() - - credentials = jvm.com.amazonaws.auth.BasicAWSCredentials(access_key, secret_key) - credentials_provider = jvm.com.amazonaws.auth.AWSStaticCredentialsProvider(credentials) - endpoint_config = jvm.com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration( - endpoint, - region, - ) - s3_client = ( - jvm.com.amazonaws.services.s3.AmazonS3ClientBuilder.standard() - .withPathStyleAccessEnabled(True) - .withEndpointConfiguration(endpoint_config) - .withCredentials(credentials_provider) - .build() - ) - s3_client.putObject(bucket, object_key, payload) - - return f"s3a://{bucket}/{object_key}" - - -def main() -> None: - args = parse_args() - - from pyspark.sql import SparkSession - from pyspark.sql import functions as F - - spark = SparkSession.builder.appName(f"orders-etl-{args.run_id}").getOrCreate() - spark.sparkContext.setLogLevel("WARN") - - # Ensure S3A is configured from runtime env injected by SparkApplication. - access_key = os.getenv("S3_ACCESS_KEY", "") - secret_key = os.getenv("S3_SECRET_KEY", "") - endpoint = os.getenv("S3_ENDPOINT", "") - if access_key and secret_key: - hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration() - hadoop_conf.set("fs.s3a.access.key", access_key) - hadoop_conf.set("fs.s3a.secret.key", secret_key) - if endpoint: - hadoop_conf.set("fs.s3a.endpoint", endpoint) - hadoop_conf.set("fs.s3a.connection.ssl.enabled", str(endpoint.startswith("https://")).lower()) - hadoop_conf.set("fs.s3a.path.style.access", "true") - hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") - - try: - raw_df = spark.read.option("header", "true").csv(args.input_uri) - if raw_df.rdd.isEmpty(): - raw_df = _bootstrap_input_if_missing(spark=spark) - except Exception: - raw_df = _bootstrap_input_if_missing(spark=spark) - - normalized_df = ( - raw_df.withColumn("order_id", F.col("order_id").cast("string")) - .withColumn("customer_id", F.col("customer_id").cast("string")) - .withColumn("order_ts", F.to_timestamp("order_ts")) - .withColumn("quantity", F.col("quantity").cast("int")) - .withColumn("unit_price", F.col("unit_price").cast("double")) - .fillna({"quantity": 1, "unit_price": 0.0}) - .withColumn("order_date", F.to_date("order_ts")) - .withColumn("line_amount", F.round(F.col("quantity") * F.col("unit_price"), 2)) - ) - - curated_df = ( - normalized_df.groupBy("order_date", "customer_id") - .agg( - F.countDistinct("order_id").alias("orders_count"), - F.round(F.sum("line_amount"), 2).alias("gross_amount"), - ) - .withColumn("etl_run_id", F.lit(args.run_id)) - .withColumn("processed_at_utc", F.lit(datetime.utcnow().isoformat())) - ) - - written_uri = _upload_curated_csv_with_jvm_s3( - curated_df=curated_df, - output_uri=args.output_uri, - spark=spark, - ) - - print(f"Orders ETL completed for run_id={args.run_id}") - print(f"Input URI: {args.input_uri}") - print(f"Output URI: {written_uri}") - print(f"Input rows: {normalized_df.count()}") - print(f"Curated rows: {curated_df.count()}") - - spark.stop() - - -if __name__ == "__main__": - main() diff --git a/examples/airflow/dags/spark_pi_example.py b/examples/airflow/dags/spark_pi_example.py deleted file mode 100644 index c899bee..0000000 --- a/examples/airflow/dags/spark_pi_example.py +++ /dev/null @@ -1,121 +0,0 @@ -"""Submit and monitor a Spark Pi SparkApplication from Airflow.""" - -from __future__ import annotations - -import time -from datetime import datetime, timedelta - -from airflow import DAG -from airflow.operators.python import PythonOperator -from kubernetes import client, config -from kubernetes.client.exceptions import ApiException - - -SPARK_IMAGE = "quay.io/okdp/spark-py:spark-3.5.6-python-3.11-scala-2.12-java-17" -SPARK_MAIN_APP = "local:///opt/spark/examples/jars/spark-examples_2.12-3.5.6.jar" -SPARK_MAIN_CLASS = "org.apache.spark.examples.SparkPi" -NAMESPACE = "default" - - -default_args = { - "owner": "airflow", - "depends_on_past": False, - "start_date": datetime(2026, 1, 1), - "retries": 0, - "retry_delay": timedelta(minutes=2), -} - - -def submit_and_wait_spark_pi(app_name: str, timeout_seconds: int = 900) -> str: - config.load_incluster_config() - api = client.CustomObjectsApi() - - body = { - "apiVersion": "sparkoperator.k8s.io/v1beta2", - "kind": "SparkApplication", - "metadata": {"name": app_name, "namespace": NAMESPACE}, - "spec": { - "type": "Scala", - "mode": "cluster", - "image": SPARK_IMAGE, - "imagePullPolicy": "IfNotPresent", - "mainClass": SPARK_MAIN_CLASS, - "mainApplicationFile": SPARK_MAIN_APP, - "sparkVersion": "3.5.6", - "restartPolicy": {"type": "Never"}, - "timeToLiveSeconds": 600, - "driver": { - "cores": 1, - "memory": "512m", - "serviceAccount": "spark", - "labels": {"version": "3.5.6"}, - }, - "executor": { - "cores": 1, - "instances": 1, - "memory": "512m", - "labels": {"version": "3.5.6"}, - }, - }, - } - - try: - api.delete_namespaced_custom_object( - group="sparkoperator.k8s.io", - version="v1beta2", - namespace=NAMESPACE, - plural="sparkapplications", - name=app_name, - ) - time.sleep(3) - except ApiException as exc: - if exc.status != 404: - raise - - api.create_namespaced_custom_object( - group="sparkoperator.k8s.io", - version="v1beta2", - namespace=NAMESPACE, - plural="sparkapplications", - body=body, - ) - - deadline = time.time() + timeout_seconds - last_state = "PENDING" - while time.time() < deadline: - app = api.get_namespaced_custom_object( - group="sparkoperator.k8s.io", - version="v1beta2", - namespace=NAMESPACE, - plural="sparkapplications", - name=app_name, - ) - last_state = ( - app.get("status", {}) - .get("applicationState", {}) - .get("state", "PENDING") - ) - if last_state == "COMPLETED": - return f"SparkApplication {app_name} completed successfully" - if last_state in {"FAILED", "SUBMISSION_FAILED", "UNKNOWN"}: - raise RuntimeError(f"SparkApplication {app_name} failed with state={last_state}") - time.sleep(10) - - raise TimeoutError( - f"SparkApplication {app_name} did not complete within {timeout_seconds}s (last_state={last_state})" - ) - - -with DAG( - dag_id="spark_pi_midnight", - default_args=default_args, - description="Run Spark Pi via SparkApplication once a day at midnight UTC", - schedule="0 0 * * *", - catchup=False, - tags=["spark", "example", "okdp"], -) as dag: - run_spark_pi = PythonOperator( - task_id="submit_and_wait_spark_pi", - python_callable=submit_and_wait_spark_pi, - op_kwargs={"app_name": "spark-pi-{{ ts_nodash | lower }}"}, - ) diff --git a/examples/airflow/deploy_nyc_taxi.sh b/examples/airflow/deploy_nyc_taxi.sh deleted file mode 100755 index 0c1ecff..0000000 --- a/examples/airflow/deploy_nyc_taxi.sh +++ /dev/null @@ -1,83 +0,0 @@ -#!/bin/bash -# -# Déploiement du pipeline NYC Taxi (Airflow + Spark Operator) -# Usage: ./examples/airflow/deploy_nyc_taxi.sh -# - -set -e - -GREEN='\033[0;32m' -YELLOW='\033[1;33m' -RED='\033[0;31m' -NC='\033[0m' - -NAMESPACE="default" - -echo "=======================================================================" -echo "NYC Taxi Pipeline - Déploiement" -echo "=======================================================================" - -# --- Prérequis --- -echo "" -echo "1. Vérification des prérequis" -echo "-----------------------------------------------------------------------" - -for check in \ - "kubectl:command -v kubectl" \ - "Cluster:kubectl cluster-info" \ - "Spark Operator:kubectl get crd sparkapplications.sparkoperator.k8s.io" \ - "ServiceAccount spark:kubectl get sa spark -n $NAMESPACE" \ - "Secret S3:kubectl get secret creds-examples-s3 -n $NAMESPACE"; do - label="${check%%:*}" - cmd="${check#*:}" - if eval "$cmd" &>/dev/null; then - echo -e "${GREEN}✓ $label${NC}" - else - echo -e "${RED}✗ $label${NC}" - exit 1 - fi -done - -# --- ConfigMap --- -echo "" -echo "2. Déploiement du ConfigMap Spark ETL" -echo "-----------------------------------------------------------------------" - -if kubectl apply -f examples/airflow/manifests/nyc-taxi-etl-configmap.yaml; then - echo -e "${GREEN}✓ ConfigMap déployé${NC}" -else - echo -e "${RED}✗ Échec déploiement ConfigMap${NC}" - exit 1 -fi - -# --- DAG --- -echo "" -echo "3. Déploiement du DAG Airflow" -echo "-----------------------------------------------------------------------" - -SCHEDULER_POD=$(kubectl get pod -n $NAMESPACE -l component=scheduler -o jsonpath='{.items[0].metadata.name}' 2>/dev/null) - -if [ -z "$SCHEDULER_POD" ]; then - echo -e "${YELLOW}⚠ Airflow scheduler non trouvé dans namespace $NAMESPACE${NC}" - echo " Copiez manuellement le DAG:" - echo " kubectl cp examples/airflow/dags/nyc_taxi_pipeline.py $NAMESPACE/:/opt/airflow/dags/" -else - if kubectl cp examples/airflow/dags/nyc_taxi_pipeline.py \ - "$NAMESPACE/$SCHEDULER_POD:/opt/airflow/dags/" 2>/dev/null; then - echo -e "${GREEN}✓ DAG copié dans $SCHEDULER_POD${NC}" - else - echo -e "${YELLOW}⚠ Échec copie du DAG${NC}" - fi -fi - -# --- Done --- -echo "" -echo "=======================================================================" -echo -e "${GREEN}Déploiement terminé${NC}" -echo "=======================================================================" -echo "" -echo "Prochaines étapes:" -echo " 1. Ouvrir Airflow UI : https://airflow.okdp.sandbox" -echo " 2. Trigger le DAG 'nyc_taxi_spark_pipeline'" -echo " 3. Surveiller : kubectl get sparkapplication -n $NAMESPACE -w" -echo "" diff --git a/examples/airflow/manifests/nyc-taxi-etl-configmap.yaml b/examples/airflow/manifests/nyc-taxi-etl-configmap.yaml deleted file mode 100644 index f37e823..0000000 --- a/examples/airflow/manifests/nyc-taxi-etl-configmap.yaml +++ /dev/null @@ -1,150 +0,0 @@ ---- -apiVersion: v1 -kind: ConfigMap -metadata: - name: nyc-taxi-etl-code - namespace: default - labels: - app: nyc-taxi-etl -data: - nyc_taxi_etl.py: | - """ - NYC Taxi ETL - PySpark job - Reads from S3, transforms, writes aggregated results back to S3. - Uses JVM AWS SDK for output (avoids Hadoop rename issues with SeaweedFS). - """ - import argparse - import os - from pyspark.sql import SparkSession - from pyspark.sql.functions import col, sum as _sum, count, avg, round as _round, hour, dayofweek - - def parse_args(): - parser = argparse.ArgumentParser() - parser.add_argument("--input", required=True) - parser.add_argument("--output", required=True) - parser.add_argument("--date", required=True) - return parser.parse_args() - - def upload_to_s3_via_jvm(spark, df, output_uri): - """Collect DataFrame to driver and upload as CSV to S3 via JVM SDK.""" - import csv - import io - from urllib.parse import urlparse - - # Parse output URI - parsed = urlparse(output_uri.replace("s3a://", "s3://", 1)) - bucket = parsed.netloc - key_prefix = parsed.path.lstrip("/") - - # Get credentials from environment - endpoint = os.getenv("S3_ENDPOINT", "") - access_key = os.getenv("AWS_ACCESS_KEY_ID", "") - secret_key = os.getenv("AWS_SECRET_ACCESS_KEY", "") - region = os.getenv("AWS_REGION", "us-east-1") - - if not endpoint or not access_key or not secret_key: - raise RuntimeError("Missing S3 credentials or endpoint") - - # Collect to driver as CSV - rows = df.collect() - columns = df.columns - buf = io.StringIO() - writer = csv.writer(buf) - writer.writerow(columns) - for row in rows: - writer.writerow([row[c] for c in columns]) - csv_content = buf.getvalue() - print(f"Collected {len(rows)} rows, CSV size: {len(csv_content)} bytes") - - # Upload using JVM AWS SDK - jvm = spark.sparkContext._jvm - credentials = jvm.com.amazonaws.auth.BasicAWSCredentials(access_key, secret_key) - credentials_provider = jvm.com.amazonaws.auth.AWSStaticCredentialsProvider(credentials) - endpoint_config = jvm.com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration( - endpoint, region - ) - s3_client = ( - jvm.com.amazonaws.services.s3.AmazonS3ClientBuilder.standard() - .withCredentials(credentials_provider) - .withEndpointConfiguration(endpoint_config) - .withPathStyleAccessEnabled(True) - .build() - ) - - s3_key = f"{key_prefix}/nyc_taxi_aggregated.csv" - print(f"Uploading -> s3://{bucket}/{s3_key}") - s3_client.putObject(bucket, s3_key, csv_content) - - # Upload _SUCCESS marker - s3_client.putObject(bucket, f"{key_prefix}/_SUCCESS", "") - print(f"Output uploaded to s3://{bucket}/{key_prefix}/") - - def main(): - args = parse_args() - - print("=" * 70) - print("NYC Taxi ETL Job") - print("=" * 70) - print(f"Input: {args.input}") - print(f"Output: {args.output}") - print(f"Date: {args.date}") - print("=" * 70) - - spark = SparkSession.builder \ - .appName(f"NYC-Taxi-ETL-{args.date}") \ - .getOrCreate() - - spark.sparkContext.setLogLevel("WARN") - - # Read - print("\nReading data...") - df = spark.read.parquet(args.input) - total_rows = df.count() - print(f"Rows read: {total_rows:,}") - - # Schema - print("\nSchema:") - df.printSchema() - - # Clean - print("\nCleaning...") - df_clean = df.filter( - (col("fare_amount") > 0) & - (col("trip_distance") > 0) & - (col("passenger_count") > 0) - ) - clean_rows = df_clean.count() - print(f"Rows after cleaning: {clean_rows:,}") - print(f"Rows removed: {total_rows - clean_rows:,}") - - # Transform - print("\nTransforming...") - df_transformed = df_clean \ - .withColumn("pickup_hour", hour(col("tpep_pickup_datetime"))) \ - .withColumn("pickup_dayofweek", dayofweek(col("tpep_pickup_datetime"))) - - # Aggregate by hour + day of week - print("\nAggregating...") - df_agg = df_transformed.groupBy("pickup_hour", "pickup_dayofweek").agg( - count("*").alias("total_trips"), - _round(avg("fare_amount"), 2).alias("avg_fare"), - _round(avg("trip_distance"), 2).alias("avg_distance"), - _round(_sum("fare_amount"), 2).alias("total_revenue") - ).orderBy("pickup_hour") - - # Show summary - print("\nResults summary:") - df_agg.show(24, truncate=False) - - # Write output using JVM S3 client (avoids SeaweedFS rename issues) - print(f"\nWriting results to: {args.output}") - upload_to_s3_via_jvm(spark, df_agg, args.output) - - print("\n" + "=" * 70) - print("ETL completed successfully!") - print("=" * 70) - - spark.stop() - - if __name__ == "__main__": - main() diff --git a/examples/airflow/tests/run_integration_tests.sh b/examples/airflow/tests/run_integration_tests.sh deleted file mode 100755 index bf718c4..0000000 --- a/examples/airflow/tests/run_integration_tests.sh +++ /dev/null @@ -1,46 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -NAMESPACE="${1:-default}" -RELEASE="${2:-airflow-main}" -EXEC_DATE="${3:-2026-01-01}" -DAGS_SRC_DIR="${4:-examples/airflow/dags}" - -find_scheduler_pod() { - kubectl get pods -n "${NAMESPACE}" --no-headers \ - | awk '/airflow-main-scheduler/ && $2 ~ /^1\/1$/ && $3 == "Running" {print $1; exit}' -} - -find_webserver_pod() { - kubectl get pods -n "${NAMESPACE}" --no-headers \ - | awk '/airflow-main-webserver/ && $2 ~ /^1\/1$/ && $3 == "Running" {print $1; exit}' -} - -SCHEDULER_POD="$(find_scheduler_pod)" -WEBSERVER_POD="$(find_webserver_pod)" - -if [[ -z "${SCHEDULER_POD}" || -z "${WEBSERVER_POD}" ]]; then - echo "Pods Airflow introuvables dans namespace ${NAMESPACE}" - exit 1 -fi - -echo "Using scheduler pod: ${SCHEDULER_POD}" -echo "Using webserver pod: ${WEBSERVER_POD}" -echo "Syncing DAGs from ${DAGS_SRC_DIR}..." -kubectl cp "${DAGS_SRC_DIR}/." "${NAMESPACE}/${SCHEDULER_POD}:/opt/airflow/dags" -c scheduler -kubectl cp "${DAGS_SRC_DIR}/." "${NAMESPACE}/${WEBSERVER_POD}:/opt/airflow/dags" -c webserver -sleep 8 - -echo "Reserializing DAGs..." -kubectl exec -n "${NAMESPACE}" "${SCHEDULER_POD}" -c scheduler -- airflow dags reserialize - -echo "Listing DAGs..." -kubectl exec -n "${NAMESPACE}" "${SCHEDULER_POD}" -c scheduler -- airflow dags list | grep -E "hello_daily|orders_etl_daily" - -echo "Testing hello_daily..." -kubectl exec -n "${NAMESPACE}" "${SCHEDULER_POD}" -c scheduler -- airflow tasks test hello_daily log_hello "${EXEC_DATE}" - -echo "Testing orders_etl_daily..." -kubectl exec -n "${NAMESPACE}" "${SCHEDULER_POD}" -c scheduler -- airflow dags test orders_etl_daily "${EXEC_DATE}" - -echo "All integration DAG tests passed." diff --git a/examples/airflow/tests/test_dags.py b/examples/airflow/tests/test_dags.py deleted file mode 100644 index 5c525ba..0000000 --- a/examples/airflow/tests/test_dags.py +++ /dev/null @@ -1,41 +0,0 @@ -from pathlib import Path - -import pytest - - -airflow = pytest.importorskip("airflow") -from airflow.models import DagBag -from airflow.operators.python import PythonOperator - - -DAGS_DIR = Path(__file__).resolve().parents[1] / "dags" - - -@pytest.fixture(scope="module") -def dag_bag() -> DagBag: - bag = DagBag(dag_folder=str(DAGS_DIR), include_examples=False) - assert not bag.import_errors, f"DAG import errors: {bag.import_errors}" - return bag - - -def test_hello_daily_dag_loaded(dag_bag: DagBag) -> None: - dag = dag_bag.get_dag("hello_daily") - assert dag is not None - assert "example" in dag.tags - assert "daily" in dag.tags - - task_ids = {task.task_id for task in dag.tasks} - assert task_ids == {"log_hello"} - hello_task = dag.get_task("log_hello") - assert isinstance(hello_task, PythonOperator) - - -def test_orders_etl_dag_loaded(dag_bag: DagBag) -> None: - dag = dag_bag.get_dag("orders_etl_daily") - assert dag is not None - assert "etl" in dag.tags - assert "spark" in dag.tags - - task = dag.get_task("submit_and_wait_orders_etl") - assert isinstance(task, PythonOperator) - assert task.op_kwargs["run_suffix"] == "{{ ts_nodash | lower }}" From f12b3bb0635f0955976935528f2d58fb0e0c4fbd Mon Sep 17 00:00:00 2001 From: abir-oumghar Date: Mon, 11 May 2026 11:17:40 +0200 Subject: [PATCH 3/3] fix(airflow): drop parasitic ACME annotation on web ingress The sandbox issues certificates via a local CA ClusterIssuer (no ACME challenge), so 'acme.cert-manager.io/http01-edit-in-place' prevents cert-manager's ingress-shim from creating the airflow-tls Secret. As a result the ingress falls back to nginx's default cert, which the browser rejects. Other services (Jupyter, Superset, Trino...) don't carry this annotation and work as expected. --- packages/okdp-packages/airflow/airflow.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/okdp-packages/airflow/airflow.yaml b/packages/okdp-packages/airflow/airflow.yaml index 1d6f8b5..a2c927d 100644 --- a/packages/okdp-packages/airflow/airflow.yaml +++ b/packages/okdp-packages/airflow/airflow.yaml @@ -341,7 +341,6 @@ modules: annotations: kubernetes.io/ingress.class: nginx cert-manager.io/cluster-issuer: {{ .Context.certificateIssuers.selfSigned.name }} - acme.cert-manager.io/http01-edit-in-place: "true" nginx.ingress.kubernetes.io/proxy-body-size: "0" hosts: