Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions apps/worker/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,27 +118,48 @@ def run_dbt(pg: PGInfo):
dbt_dir = os.path.join(os.getcwd(), '..', '..', 'dbt', 'relayboard')
return subprocess.run(["dbt", "run"], cwd=dbt_dir, capture_output=True, text=True)

def dispatch_to_slack(slack: SlackInfo, pg: PGInfo, dataset: str, limit:int=5):
def dispatch_to_slack(slack: SlackInfo, pg: PGInfo, dataset: str, max_rows_per_message: int = 20):
conn_str = f"host={pg.host} port={pg.port} user={pg.user} password={pg.password} dbname={pg.database}"
with psycopg.connect(conn_str) as conn:
with conn.cursor() as cur:
# simple: read from warehouse.<dataset>_clean if exists, else from staging
table = f'warehouse."{dataset}_clean"'
try:
cur.execute(f"select * from {table} limit {limit}")
# First get total count
cur.execute(f"select count(*) from {table}")
total_rows = cur.fetchone()[0]

# Then get the data with ordering for consistency
cur.execute(f"select * from {table} order by 1 limit {max_rows_per_message}")
except Exception:
table = f'staging."{dataset}"'
cur.execute(f"select * from {table} limit {limit}")
# First get total count
cur.execute(f"select count(*) from {table}")
total_rows = cur.fetchone()[0]

# Then get the data with ordering for consistency
cur.execute(f"select * from {table} order by 1 limit {max_rows_per_message}")

rows = cur.fetchall()
cols = [d[0] for d in cur.description]
text_lines = ["*Relayboard Dispatch*",
f"Table: `{table}`",
f"Rows: {len(rows)}",
"```"]

text_lines = [
"*Relayboard Dispatch*",
f"Table: `{table}`",
f"Total Rows: {total_rows}",
f"Showing: {len(rows)} rows" + (f" (first {max_rows_per_message})" if total_rows > max_rows_per_message else ""),
"```"
]

for r in rows:
line = ', '.join([f"{c}={str(v)[:32]}" for c,v in zip(cols, r)])
text_lines.append(line)

text_lines.append("```")

if total_rows > max_rows_per_message:
text_lines.append(f"_Note: Showing first {max_rows_per_message} of {total_rows} total rows_")

payload = {"text": "\n".join(text_lines)}
requests.post(slack.webhookUrl, json=payload)

Expand Down Expand Up @@ -170,7 +191,7 @@ def run_full(payload: RunFullPayload):
# 4) dispatch to Slack
if payload.slack and payload.slack.webhookUrl:
console.print("[yellow]Step 5: Dispatching to Slack...")
dispatch_to_slack(payload.slack, payload.pg, payload.datasetName, limit=5)
dispatch_to_slack(payload.slack, payload.pg, payload.datasetName, max_rows_per_message=20)
console.print("[green]✓ Slack dispatch completed")

return {"ok": True, "dbt_stdout": r.stdout[-500:], "dbt_stderr": r.stderr[-500:]}
Expand Down