From 2af09fed6097868d25b579234098cc8daf25bc90 Mon Sep 17 00:00:00 2001 From: AJAL ODORA JONATHAN <43242517+ODORA0@users.noreply.github.com> Date: Thu, 16 Oct 2025 14:41:39 +0300 Subject: [PATCH] feat: enhance Slack notifications with comprehensive data display MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Increase row limit from 5 to 20 rows per Slack message - Add total row count display for complete dataset visibility - Implement intelligent pagination for large datasets (>20 rows) - Show clear truncation indicators when data is limited - Add ORDER BY clause for consistent data ordering - Improve message formatting with better information hierarchy Benefits: - Complete visibility for small datasets (≤20 rows) - Graceful handling of large datasets with pagination notes - Better context for data pipeline results - More informative and actionable Slack notifications Resolves: Limited data visibility in Slack notifications --- apps/worker/main.py | 37 +++++++++++++++++++++++++++++-------- 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/apps/worker/main.py b/apps/worker/main.py index 8d5f1bc..62d0854 100644 --- a/apps/worker/main.py +++ b/apps/worker/main.py @@ -118,27 +118,48 @@ def run_dbt(pg: PGInfo): dbt_dir = os.path.join(os.getcwd(), '..', '..', 'dbt', 'relayboard') return subprocess.run(["dbt", "run"], cwd=dbt_dir, capture_output=True, text=True) -def dispatch_to_slack(slack: SlackInfo, pg: PGInfo, dataset: str, limit:int=5): +def dispatch_to_slack(slack: SlackInfo, pg: PGInfo, dataset: str, max_rows_per_message: int = 20): conn_str = f"host={pg.host} port={pg.port} user={pg.user} password={pg.password} dbname={pg.database}" with psycopg.connect(conn_str) as conn: with conn.cursor() as cur: # simple: read from warehouse._clean if exists, else from staging table = f'warehouse."{dataset}_clean"' try: - cur.execute(f"select * from {table} limit {limit}") + # First get total count + cur.execute(f"select count(*) from {table}") + total_rows = cur.fetchone()[0] + + # Then get the data with ordering for consistency + cur.execute(f"select * from {table} order by 1 limit {max_rows_per_message}") except Exception: table = f'staging."{dataset}"' - cur.execute(f"select * from {table} limit {limit}") + # First get total count + cur.execute(f"select count(*) from {table}") + total_rows = cur.fetchone()[0] + + # Then get the data with ordering for consistency + cur.execute(f"select * from {table} order by 1 limit {max_rows_per_message}") + rows = cur.fetchall() cols = [d[0] for d in cur.description] - text_lines = ["*Relayboard Dispatch*", - f"Table: `{table}`", - f"Rows: {len(rows)}", - "```"] + + text_lines = [ + "*Relayboard Dispatch*", + f"Table: `{table}`", + f"Total Rows: {total_rows}", + f"Showing: {len(rows)} rows" + (f" (first {max_rows_per_message})" if total_rows > max_rows_per_message else ""), + "```" + ] + for r in rows: line = ', '.join([f"{c}={str(v)[:32]}" for c,v in zip(cols, r)]) text_lines.append(line) + text_lines.append("```") + + if total_rows > max_rows_per_message: + text_lines.append(f"_Note: Showing first {max_rows_per_message} of {total_rows} total rows_") + payload = {"text": "\n".join(text_lines)} requests.post(slack.webhookUrl, json=payload) @@ -170,7 +191,7 @@ def run_full(payload: RunFullPayload): # 4) dispatch to Slack if payload.slack and payload.slack.webhookUrl: console.print("[yellow]Step 5: Dispatching to Slack...") - dispatch_to_slack(payload.slack, payload.pg, payload.datasetName, limit=5) + dispatch_to_slack(payload.slack, payload.pg, payload.datasetName, max_rows_per_message=20) console.print("[green]✓ Slack dispatch completed") return {"ok": True, "dbt_stdout": r.stdout[-500:], "dbt_stderr": r.stderr[-500:]}