From a72c1faf7606bbe34869c605bdded19f19feae5b Mon Sep 17 00:00:00 2001 From: AJAL ODORA JONATHAN <43242517+ODORA0@users.noreply.github.com> Date: Thu, 16 Oct 2025 14:06:09 +0300 Subject: [PATCH] fix: resolve pipeline 500 error and improve data processing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix dbt port configuration (5432 → 5433) to match PostgreSQL Docker setup - Add robust CSV column name cleaning to handle numeric column names - Improve error handling in worker with detailed logging and traceback - Fix dbt model generation to use consistent column naming - Add dbt build artifacts to .gitignore Fixes the 'Run Pipeline' 500 error by: 1. Starting worker service properly with uvicorn 2. Correcting database port mismatch 3. Handling edge cases in CSV column names (e.g., '1958' → 'col_1958') 4. Ensuring dbt models use cleaned column names consistently Pipeline now successfully: - Downloads CSV from MinIO - Loads data to PostgreSQL staging - Generates and runs dbt transformations - Dispatches results to Slack Resolves: Pipeline execution errors and data processing issues --- .gitignore | 6 +++ apps/worker/main.py | 78 +++++++++++++++++++++++++++++-------- dbt/relayboard/profiles.yml | 2 +- 3 files changed, 68 insertions(+), 18 deletions(-) diff --git a/.gitignore b/.gitignore index 0bcc8b9..eadd397 100644 --- a/.gitignore +++ b/.gitignore @@ -107,6 +107,12 @@ temp/ ehthumbs.db Thumbs.db +# dbt +target/ +dbt_packages/ +dbt/relayboard/target/ +dbt/relayboard/.user.yml + # Python __pycache__/ *.py[cod] diff --git a/apps/worker/main.py b/apps/worker/main.py index 89a9e3d..8d5f1bc 100644 --- a/apps/worker/main.py +++ b/apps/worker/main.py @@ -67,7 +67,19 @@ def load_csv_to_postgres(csv_bytes: bytes, pg: PGInfo, staging_table: str): cur.execute(f'drop table if exists staging."{staging_table}";') # Use pandas to infer & create df = pd.read_csv(io.BytesIO(csv_bytes)) - df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns] + # Clean column names to be valid PostgreSQL identifiers + def clean_column_name(name): + # Remove quotes and clean the name + cleaned = str(name).strip().replace('"', '').replace("'", "").replace(" ", "_") + # If it's numeric or starts with a number, prefix with 'col_' + if cleaned.isdigit() or (cleaned and cleaned[0].isdigit()): + cleaned = f"col_{cleaned}" + # If empty after cleaning, use a default + if not cleaned: + cleaned = "unnamed_column" + return cleaned.lower() + + df.columns = [clean_column_name(c) for c in df.columns] # Define table structure (all text for simplicity) col_defs = ", ".join([f'"{c}" text' for c in df.columns]) cur.execute(f'create table staging."{staging_table}" ({col_defs});') @@ -83,8 +95,20 @@ def ensure_dbt_model(dataset: str, df_sample_cols): models_dir = os.path.join(os.getcwd(), '..', '..', 'dbt', 'relayboard', 'models', 'generated') os.makedirs(models_dir, exist_ok=True) model_path = os.path.join(models_dir, f"{dataset}_clean.sql") - select_cols = ', '.join([f'"{c}"' for c in df_sample_cols]) + + # Clean column names the same way as in load_csv_to_postgres + def clean_column_name(name): + cleaned = str(name).strip().replace('"', '').replace("'", "").replace(" ", "_") + if cleaned.isdigit() or (cleaned and cleaned[0].isdigit()): + cleaned = f"col_{cleaned}" + if not cleaned: + cleaned = "unnamed_column" + return cleaned.lower() + + cleaned_cols = [clean_column_name(c) for c in df_sample_cols] + select_cols = ', '.join([f'"{c}"' for c in cleaned_cols]) sql = f"-- auto-generated model for {dataset}\nselect {select_cols} from staging.\"{dataset}\"" + with open(model_path, 'w') as f: f.write(sql) return model_path @@ -120,18 +144,38 @@ def dispatch_to_slack(slack: SlackInfo, pg: PGInfo, dataset: str, limit:int=5): @app.post('/run_full') def run_full(payload: RunFullPayload): - console.rule(f"[bold cyan]Run {payload.runId} :: {payload.datasetName}") - # 1) fetch CSV - csv_bytes = download_from_minio_to_bytes(payload.s3) - # 2) load to Postgres staging - staging_table = payload.datasetName - load_csv_to_postgres(csv_bytes, payload.pg, staging_table) - # 3) generate dbt model and run dbt - # for column list, parse head - df = pd.read_csv(io.BytesIO(csv_bytes), nrows=1) - ensure_dbt_model(payload.datasetName, df.columns.tolist()) - r = run_dbt(payload.pg) - # 4) dispatch to Slack - if payload.slack and payload.slack.webhookUrl: - dispatch_to_slack(payload.slack, payload.pg, payload.datasetName, limit=5) - return {"ok": True, "dbt_stdout": r.stdout[-500:], "dbt_stderr": r.stderr[-500:]} + try: + console.rule(f"[bold cyan]Run {payload.runId} :: {payload.datasetName}") + # 1) fetch CSV + console.print("[yellow]Step 1: Fetching CSV from MinIO...") + csv_bytes = download_from_minio_to_bytes(payload.s3) + console.print(f"[green]✓ CSV downloaded, size: {len(csv_bytes)} bytes") + + # 2) load to Postgres staging + console.print("[yellow]Step 2: Loading CSV to PostgreSQL staging...") + staging_table = payload.datasetName + load_csv_to_postgres(csv_bytes, payload.pg, staging_table) + console.print(f"[green]✓ CSV loaded to staging.{staging_table}") + + # 3) generate dbt model and run dbt + console.print("[yellow]Step 3: Generating dbt model...") + df = pd.read_csv(io.BytesIO(csv_bytes), nrows=1) + ensure_dbt_model(payload.datasetName, df.columns.tolist()) + console.print("[green]✓ dbt model generated") + + console.print("[yellow]Step 4: Running dbt...") + r = run_dbt(payload.pg) + console.print(f"[green]✓ dbt completed, stdout: {r.stdout[-200:]}") + + # 4) dispatch to Slack + if payload.slack and payload.slack.webhookUrl: + console.print("[yellow]Step 5: Dispatching to Slack...") + dispatch_to_slack(payload.slack, payload.pg, payload.datasetName, limit=5) + console.print("[green]✓ Slack dispatch completed") + + return {"ok": True, "dbt_stdout": r.stdout[-500:], "dbt_stderr": r.stderr[-500:]} + except Exception as e: + console.print(f"[red]Error in run_full: {str(e)}") + import traceback + console.print(f"[red]Traceback: {traceback.format_exc()}") + return {"ok": False, "error": str(e), "traceback": traceback.format_exc()} diff --git a/dbt/relayboard/profiles.yml b/dbt/relayboard/profiles.yml index df9cda7..f4a51b5 100644 --- a/dbt/relayboard/profiles.yml +++ b/dbt/relayboard/profiles.yml @@ -8,4 +8,4 @@ relayboard: password: relayboard dbname: relayboard schema: warehouse - port: 5432 + port: 5433