Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
From c2324c74ceb8da6ae9cde5355d981cf872650bee Mon Sep 17 00:00:00 2001
From: Martin <martinngutswen02@gmail.com>
Date: Fri, 29 May 2026 16:09:30 +0100
Subject: [PATCH 1/4] feat: implement Redis pub/sub for real-time notifications

---
src/jobs/scheduler.ts | 8 +++
src/models/transaction.ts | 2 +-
src/workers/notificationWorker.ts | 113 ++++++++++++++++++++++++++++++
3 files changed, 122 insertions(+), 1 deletion(-)
create mode 100644 src/workers/notificationWorker.ts

diff --git a/src/jobs/scheduler.ts b/src/jobs/scheduler.ts
index 3fff24b..6186211 100644
--- a/src/jobs/scheduler.ts
+++ b/src/jobs/scheduler.ts
@@ -17,6 +17,7 @@ import { runLiquidityRebalanceJob } from "./liquidityRebalanceJob";
import { runCrossChainMonitorJob } from "./crossChainMonitorJob";
import { runDailyProviderReconciliation } from "./providerReconciliationJob";
import { runReconciliationJob } from "./reconciliationJob";
+import { startNotificationWorker } from "../workers/notificationWorker";


interface JobConfig {
@@ -128,4 +129,11 @@ export function startJobs(): void {
cron.schedule(job.schedule, () => runJob(job));
console.log(`[scheduler] "${job.name}" scheduled - ${job.schedule}`);
}
+
+ // Start the notification worker which listens for Redis pub/sub events
+ // and drives user-facing notifications in real-time. This replaces any
+ // DB-polling notification mechanisms.
+ startNotificationWorker().catch((err) => {
+ console.warn("Failed to start NotificationWorker:", err);
+ });
}
diff --git a/src/models/transaction.ts b/src/models/transaction.ts
index bfbd273..00d5e31 100644
--- a/src/models/transaction.ts
+++ b/src/models/transaction.ts
@@ -195,7 +195,7 @@ export class TransactionModel {
const res = await queryWrite(q, params);
if (!res.rowCount) return;

- const row = result.rows[0];
+ const row = res.rows[0];

// ── Invalidate caches on transaction status update ────────────────────
if (row.user_id) {
diff --git a/src/workers/notificationWorker.ts b/src/workers/notificationWorker.ts
new file mode 100644
index 0000000..59050b4
--- /dev/null
+++ b/src/workers/notificationWorker.ts
@@ -0,0 +1,113 @@
+import IORedis from "ioredis";
+import { SubscriptionChannels } from "../graphql/subscriptions";
+import { notificationRouter } from "../services/notificationRouter";
+import { TransactionModel } from "../models/transaction";
+
+const REDIS_URL = process.env.REDIS_URL || "redis://localhost:6379";
+
+const redisOptions: any = {
+ retryStrategy: (times: number) => Math.min(100 + times * 200, 3000),
+ enableOfflineQueue: false,
+ maxRetriesPerRequest: 1,
+ lazyConnect: false,
+};
+
+let subscriber: IORedis | null = null;
+
+/**
+ * Notification worker — subscribes to transaction update channels in Redis
+ * and routes user-facing notifications (email/sms/push/etc.) via
+ * `NotificationRouter`. This replaces DB polling for notification triggers.
+ */
+export async function startNotificationWorker(): Promise<void> {
+ if (!process.env.REDIS_URL) {
+ console.warn(
+ "NotificationWorker: REDIS_URL not set — running without Redis subscription",
+ );
+ return;
+ }
+
+ subscriber = new IORedis(REDIS_URL, redisOptions);
+
+ subscriber.on("connect", () => console.log("NotificationWorker: Redis connected"));
+ subscriber.on("error", (err) =>
+ console.error("NotificationWorker: Redis error:", err),
+ );
+
+ await subscriber.connect();
+
+ // Subscribe to broadcast updates and per-transaction channels (pattern)
+ await subscriber.subscribe(SubscriptionChannels.TRANSACTION_UPDATED);
+ await subscriber.psubscribe("TRANSACTION_UPDATED:*");
+
+ subscriber.on("message", async (_channel: string, rawMessage: string) => {
+ try {
+ const payload = JSON.parse(rawMessage) as {
+ id?: string;
+ status?: string;
+ [key: string]: any;
+ };
+
+ const txId = payload.id;
+ const status = payload.status;
+ if (!txId || !status) return;
+
+ const txModel = new TransactionModel();
+ const tx = await txModel.findById(txId);
+ if (!tx) return;
+
+ if (status === "completed") {
+ await notificationRouter.routeTransactionNotification(tx, "completed");
+ } else if (status === "failed") {
+ await notificationRouter.routeTransactionNotification(tx, "failed", payload.error);
+ }
+ } catch (err) {
+ console.error("NotificationWorker: failed to handle message:", err);
+ }
+ });
+
+ // pmessage handles pattern subscriptions (TRANSACTION_UPDATED:<id>)
+ subscriber.on(
+ "pmessage",
+ async (_pattern: string, _channel: string, rawMessage: string) => {
+ try {
+ const payload = JSON.parse(rawMessage) as {
+ id?: string;
+ status?: string;
+ [key: string]: any;
+ };
+
+ const txId = payload.id;
+ const status = payload.status;
+ if (!txId || !status) return;
+
+ const txModel = new TransactionModel();
+ const tx = await txModel.findById(txId);
+ if (!tx) return;
+
+ if (status === "completed") {
+ await notificationRouter.routeTransactionNotification(tx, "completed");
+ } else if (status === "failed") {
+ await notificationRouter.routeTransactionNotification(tx, "failed", payload.error);
+ }
+ } catch (err) {
+ console.error("NotificationWorker: failed to handle pmessage:", err);
+ }
+ },
+ );
+
+ console.log("NotificationWorker: subscribed to transaction update channels");
+}
+
+export async function stopNotificationWorker(): Promise<void> {
+ try {
+ if (!subscriber) return;
+ await subscriber.unsubscribe(SubscriptionChannels.TRANSACTION_UPDATED);
+ await subscriber.punsubscribe("TRANSACTION_UPDATED:*");
+ await subscriber.quit();
+ subscriber = null;
+ console.log("NotificationWorker: stopped");
+ } catch (err) {
+ console.warn("NotificationWorker: stop error:", err);
+ }
+}
--
2.45.1.windows.1

103 changes: 103 additions & 0 deletions .pull_request/0002-chore-pr-add-PR-bundle-patch-description.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
From 44ee883a769411489145c35d82c0da042411f2bf Mon Sep 17 00:00:00 2001
From: Martin <martinngutswen02@gmail.com>
Date: Fri, 29 May 2026 16:15:22 +0100
Subject: [PATCH 2/4] chore(pr): add PR bundle (patch + description)

---
.pull_request/PR.md | 21 +++++++++++++++++++++
.pull_request/changes.patch | Bin 0 -> 11444 bytes
2 files changed, 21 insertions(+)
create mode 100644 .pull_request/PR.md
create mode 100644 .pull_request/changes.patch

diff --git a/.pull_request/PR.md b/.pull_request/PR.md
new file mode 100644
index 0000000..bc020e1
--- /dev/null
+++ b/.pull_request/PR.md
@@ -0,0 +1,21 @@
+# Implement Redis Pub/Sub real-time notification worker
+
+Adds a Redis-backed notification worker that subscribes to transaction update channels and routes user notifications via the existing NotificationRouter. Wires the worker into the job scheduler and fixes a bug in TransactionModel.updateStatus.
+
+## Files changed
+- src/workers/notificationWorker.ts (new)
+- src/jobs/scheduler.ts (start worker)
+- src/models/transaction.ts (bugfix)
+
+## Testing notes
+1. Ensure `REDIS_URL` points to a running Redis instance.
+2. Start the app: `npm run dev`
+3. Publish test messages to Redis, e.g.:
+ `redis-cli PUBLISH transaction.updated '{"id":"<txId>","status":"completed"}'`
+
+## Acceptance criteria
+- Worker subscribes to transaction channels and routes notifications via `NotificationRouter`.
+- Replaces DB polling for notifications, lowers DB load, and enables sub-second notifications.
+
+## Patch apply (git)
+Apply the patch locally: `git apply .pull_request/changes.patch`
diff --git a/.pull_request/changes.patch b/.pull_request/changes.patch
new file mode 100644
index 0000000000000000000000000000000000000000..49e17b48233105c6672a61ca043d2781196774ea
GIT binary patch
literal 11444
zcmd6t+j1Mn5r(JAcd5!dtXR2#B?_cSTZ$Dsu|p+Qa#@x`Dy0jTD*{3A;E*5;P?Q;)
zPvM918~HNH|4m~!GrP0Ef{q;57O|(9K27)E-Lw49zYpArJJeCR`#O$vccJTn+i){C
zbwi!McIR$icRU%oV>faK?zKDBdFuKaJ=6I8mB#<p{YHGoqBC;eYWB95;LvTk_e80$
z<Dq-tey`sxogZk#1GlN;q29men2oIWSC*5R_xy1ii9dE9i{pLC^2n`AmP(ZD`jM{f
z?TI^c6WMF(t@xduPi3pI`%L<NqpK6?jD@e<u5{m*olo^QmR(2gQ*Y<WU2E2%=ADaH
z<<?}44UOtLf6e{DTd2~EvMf8E_Dt)xqtzT}+}J%6x3QND*%wJN>^OV598Wy&Q`zDR
zPxDgz@rbFn&Cp9U)H8G+$Yv-0N!dP=6)2c?%jIK*+N4-yq3P0jf%iS>R9~Z+G#yBr
z3-`J7Me9TT4HFBz)bpwPr?*te8u-hbWDJoz?m#O@Rn7f}Xu*rdl7dw^lnnT3HTRDl
z(Hwi}j`Zw1KP$$_mg09^(P~k$<v!4z#j=ed@?)eN`Y0KCeW6;(JA)m})-%_0@4Cl&
zb1A-fO5j=P$`13*u_C69RJ}QhHQ%gjE|e?xk^7^5EnbahnDwsShp39YkaH$kK*GL$
z&xM{n_sV_Q;68O<iSnBJovie=#({BTS@TRYuk{>lb7Vz3@GYHDA9F2Uxm?$N4erf<
zk#s{x<@RoDS?LM{uf)IXT~+2g+#P%WJJuW`k#!k)%|R(VvGVZ>u8|2QT1m?WoGXpJ
z^yA4BhQCoRkPl4{MCHVfgTiZ##NY1EJq{1V!`dWf4r5rmk*s>EnZym&?d$0auK}ZY
zKhb;hW1b+vRYR^P?$4rucb<Bk7xS+?C)N*YSm8{6{e)`t<t{OeH}>Zd57nLy>@iKe
z)S~U4G_b>PDthA^$ugBRJ$ct(bq$J=w^{#l{YHtZPB`;ebcmE7hed<&zfQc+JaBnU
zaV9Ci`cp}=A(^J$b~F7(3@FtW-RI$Jctxl&_T^3cy4zCr-j;p)zM?pA?@NkpAyc;+
zV>SMzT4OZ|5l=smr0bI1@HF0)&~zxf4rEz8{6*2W&u;8rLQz9D>Odj}Dj(@@AS}R>
zcup;YFI>5o`sLY3zV?mxk%>mJo<u>8PxvbN48*-Nr`^~2T$ITsR7w}#%i(*Wdwe(`
zFDnWr<{ocSLcydoiK$O7w8lnzCQRNG2B%&Nl@K*$8s*s`I|exnVSjk%C-<YX-^x9e
z)R(dVczK{R)`1(|4f<kH%W`GU5Alfw82egS6zQbAe#vXA2ZG*V9nHdhFDV}J53e)%
z)G%r}7Tl&*DPhCy>7dzI>w*taZGn}bx?xlk8B$!KFNp`Z^1f=FP5o{PZ}M(s!=urq
zugCDv(|L{nC-RQu=jjqqV^i_CDa!OIR();jj<;xBuRfo;=UFCCdk|`GBFnOTs1)lI
z=y&$y33Q9pVkh$3P`B^Q^CNZ!_1h}fBh4FV-m%c+RF;~x(3>eEg?MgO(QUQDu16WG
zXWJyO>Vs~CYIjlfoKjmP&2kKG_Iy<F!2=g%$;j8%G{v-C_ZRm>XtbxO{JYlnGg%!Z
zw{FR_qf()syVSWf{(JYmW~?Qv|5%dIk)C@*JJm{^%Ssoj%^xb0K9ct@k|}!8^Ta!5
zzOuJ!%w#n5unNPn0oi6xeDDue@JRRc13m9!RC`8cERQ*p4}wOqu7Bod9ZSnItt^$1
z?I{o)R7F-V;+J>zx8-A+if7wjZHm%*Qqf_tfp~xv&xIL>x`OJ+{Y~6Q{twycnOMD1
zHv3FZzxA<(HYa*JQxA;1yZW`<Y8C~1QoiK%e@Sn$<=FG1UMh3CzKIX>3}hwK<q>6e
zf<it1S$Av+PT87-yAXFqcS9Dtdt)1{*H-1$dy605`O*EP@lA<X2dh-uGGv|;L-ife
zh1Ezq6O+|eH}oCkuGivY8TmGS2bDRiNzMSPsChQz*U&oF6&4@L=kP-iWsuqFRjZ{^
zWO=3ei@O;56{0%tS3-BzhJ{!d?>o|g@1;G<UCC}1%lmYy$}3rRD1M82_BIcADH+4w
zMd;^k_lKw?PCk^KtiPulPf;LL9Cud0uULl|lB88%=-gwk6nrGqIN)9vO^BWncADk!
zZ;QO(@3zN4uV|g+8(q<tn>VIjTaS&HPwB^Uc|)xoH8tE!t7v=I+$U&8AbmZ#f{3Cn
z%F&@$&=<69L-#0Kl)1#SFB@3bYIzLqAe?od={`-!b72+3{>v&W7V4SM2?TCDUsKFR
zsqu)#@)AMd$@E56dlE8G)Vt(^aW)-$cb|(IL<;p@J@u2QQIsd`>&iOjYr!hy832vK
zPE%U_ky`?e?|bVEWLf$kvRmHQfcOEICz{9pPaF0((Y_~4penG+ZY&8`(XXj>uX7#S
zSBJgpKGB-(>32tWM9SyB;`&0>*C%>H6&7uVwU*m_Qt&r?qZ_x!%E)+dbDf&H@)ajU
zomEln!;lj!>Qc<_Nw3`cU>~bVCIQV11<>!AD6j`%wCdwRT;v#!ZJ>3fbBg^u=Lk`c
z_pstu%Dd+}v$F+SeX3Q!`hEYFoW{Nc6=Pi<_@h~w54`Bz|Kw|{RKH-+AW=9o1`j0j
zcc1m8-^aH)Hs$hFQV*9n)@!>U1?SX87pgO#-dHEMSl??{qMlMn^)egRsiY%H)MDO_
z(pXn<N4APqV?|0CD@JnAGL;XS>26k<vtIa>XO}+RSB#@mBnz}l7Bs!nn9aHNRSV($
z2dcxayuIm3%<?e?Qx3UJx@#WhrFKI)4yqL_!xts1^+O(O_wiIGt5)EUe{V`_QQH%<
znz?M<w%lbF0Ug?;Ecwp0#@l`YnPi#vbX#Gk8}$3N>7C1mE$Xopvv}EWLgf~G3H#W$
zK~R}q*eJB=9wH}HO_AR!bw{d4$nW;`<G_2KeGy5qzy0e0nxtCAuL;fXSQGhnno}z2
zY&8S^4whrjSOwH;wpcwgw%0kAekiXx(AUAVs#ugqbHrMcRJq*E+P$7*b3N+06VG}3
zd6<=1l#6*E{OOm-$M9P%Co>~O%I<lYnYYc&%w3e9d7s)XWQuZjMdK1O)~Td=mMNtp
z+9(Euy>pYlD((NV6Li~LU6(V@oF-}LG0PccdA=_3Fv@9u%Q>%}v*D84r+eN`=cem`
zl()^@&`kCFHRf+*d|NsE|EGVN|17hun#av9d7cQiS|y(|Ct%6j&gu0mR^MkRVM?0W
zO}}Nk5p``=-`8no=J<aP<TEOUyd%hWBCx2{72p!P_h}~`G_ti@b$9e!wLn{i{B!n3
z+wye8tEN@#>qx2NVx>4zbScf}t1q^!@A<s#-IH#zu?t_8?zX!q-R)xk>v^FXvsuSC
z?G^W0?Q2UtlKvMl!Pk_Dj?fdPJXYx<>axV$^X~hWWI&_||4+oO=^``<F=yxh0?*Ad
AJpcdz

literal 0
HcmV?d00001

--
2.45.1.windows.1

Loading
Loading