Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ temp/
ehthumbs.db
Thumbs.db

# dbt
target/
dbt_packages/
dbt/relayboard/target/
dbt/relayboard/.user.yml

# Python
__pycache__/
*.py[cod]
Expand Down
78 changes: 61 additions & 17 deletions apps/worker/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,19 @@ def load_csv_to_postgres(csv_bytes: bytes, pg: PGInfo, staging_table: str):
cur.execute(f'drop table if exists staging."{staging_table}";')
# Use pandas to infer & create
df = pd.read_csv(io.BytesIO(csv_bytes))
df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns]
# Clean column names to be valid PostgreSQL identifiers
def clean_column_name(name):
    """Turn a raw column label into a lowercase, quote-free column name.

    The label is converted to ``str`` and stripped of surrounding
    whitespace; then every double and single quote is removed and each
    space becomes an underscore. A result that begins with a digit is
    prefixed with ``col_``; an empty result becomes ``unnamed_column``.
    """
    text = str(name).strip()
    # Drop both quote characters so the name can be wrapped in "..." safely.
    for quote_char in ('"', "'"):
        text = text.replace(quote_char, "")
    text = text.replace(" ", "_")

    # Nothing usable left after cleaning: fall back to a fixed placeholder.
    if not text:
        return "unnamed_column"

    # A leading digit covers the all-digits case too, so one test is enough.
    if text[0].isdigit():
        text = "col_" + text
    return text.lower()

df.columns = [clean_column_name(c) for c in df.columns]
# Define table structure (all text for simplicity)
col_defs = ", ".join([f'"{c}" text' for c in df.columns])
cur.execute(f'create table staging."{staging_table}" ({col_defs});')
Expand All @@ -83,8 +95,20 @@ def ensure_dbt_model(dataset: str, df_sample_cols):
models_dir = os.path.join(os.getcwd(), '..', '..', 'dbt', 'relayboard', 'models', 'generated')
os.makedirs(models_dir, exist_ok=True)
model_path = os.path.join(models_dir, f"{dataset}_clean.sql")
select_cols = ', '.join([f'"{c}"' for c in df_sample_cols])

# Clean column names the same way as in load_csv_to_postgres
def clean_column_name(name):
    """Return the cleaned, lowercase form of a column label.

    Steps: convert to ``str``, strip surrounding whitespace, delete double
    and single quotes, turn spaces into underscores, prefix ``col_`` when
    the result starts with a digit, and substitute ``unnamed_column`` when
    nothing is left.
    """
    # One translation pass: space -> underscore, both quote characters deleted.
    translation = str.maketrans(" ", "_", "\"'")
    candidate = str(name).strip().translate(translation)

    # Slicing keeps the check safe for an empty string ("".isdigit() is False).
    if candidate[:1].isdigit():
        candidate = f"col_{candidate}"
    return (candidate or "unnamed_column").lower()

cleaned_cols = [clean_column_name(c) for c in df_sample_cols]
select_cols = ', '.join([f'"{c}"' for c in cleaned_cols])
sql = f"-- auto-generated model for {dataset}\nselect {select_cols} from staging.\"{dataset}\""

with open(model_path, 'w') as f:
f.write(sql)
return model_path
Expand Down Expand Up @@ -120,18 +144,38 @@ def dispatch_to_slack(slack: SlackInfo, pg: PGInfo, dataset: str, limit:int=5):

@app.post('/run_full')
def run_full(payload: RunFullPayload):
console.rule(f"[bold cyan]Run {payload.runId} :: {payload.datasetName}")
# 1) fetch CSV
csv_bytes = download_from_minio_to_bytes(payload.s3)
# 2) load to Postgres staging
staging_table = payload.datasetName
load_csv_to_postgres(csv_bytes, payload.pg, staging_table)
# 3) generate dbt model and run dbt
# for column list, parse head
df = pd.read_csv(io.BytesIO(csv_bytes), nrows=1)
ensure_dbt_model(payload.datasetName, df.columns.tolist())
r = run_dbt(payload.pg)
# 4) dispatch to Slack
if payload.slack and payload.slack.webhookUrl:
dispatch_to_slack(payload.slack, payload.pg, payload.datasetName, limit=5)
return {"ok": True, "dbt_stdout": r.stdout[-500:], "dbt_stderr": r.stderr[-500:]}
try:
console.rule(f"[bold cyan]Run {payload.runId} :: {payload.datasetName}")
# 1) fetch CSV
console.print("[yellow]Step 1: Fetching CSV from MinIO...")
csv_bytes = download_from_minio_to_bytes(payload.s3)
console.print(f"[green]✓ CSV downloaded, size: {len(csv_bytes)} bytes")

# 2) load to Postgres staging
console.print("[yellow]Step 2: Loading CSV to PostgreSQL staging...")
staging_table = payload.datasetName
load_csv_to_postgres(csv_bytes, payload.pg, staging_table)
console.print(f"[green]✓ CSV loaded to staging.{staging_table}")

# 3) generate dbt model and run dbt
console.print("[yellow]Step 3: Generating dbt model...")
df = pd.read_csv(io.BytesIO(csv_bytes), nrows=1)
ensure_dbt_model(payload.datasetName, df.columns.tolist())
console.print("[green]✓ dbt model generated")

console.print("[yellow]Step 4: Running dbt...")
r = run_dbt(payload.pg)
console.print(f"[green]✓ dbt completed, stdout: {r.stdout[-200:]}")

# 4) dispatch to Slack
if payload.slack and payload.slack.webhookUrl:
console.print("[yellow]Step 5: Dispatching to Slack...")
dispatch_to_slack(payload.slack, payload.pg, payload.datasetName, limit=5)
console.print("[green]✓ Slack dispatch completed")

return {"ok": True, "dbt_stdout": r.stdout[-500:], "dbt_stderr": r.stderr[-500:]}
except Exception as e:
console.print(f"[red]Error in run_full: {str(e)}")
import traceback
console.print(f"[red]Traceback: {traceback.format_exc()}")
return {"ok": False, "error": str(e), "traceback": traceback.format_exc()}
2 changes: 1 addition & 1 deletion dbt/relayboard/profiles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ relayboard:
password: relayboard
dbname: relayboard
schema: warehouse
port: 5432
port: 5433