From b40ae10e1c34dd9cd2965152b4334d041643872b Mon Sep 17 00:00:00 2001 From: FaithOnuh Date: Mon, 30 Mar 2026 13:03:15 +0000 Subject: [PATCH] feat(notifications): arbitrator assignment notification (Issue #126) - Migration: on_arbitrator_assigned pref column + in_app_notifications table - TemplateId::ArbitratorAssigned with email/SMS/push templates including dispute summary and link - NotificationService.notify_arbitrator_assigned: triggered on dispute_raised events that carry an arbitrator field; respects email opt-out preference - Always creates in-app notification record regardless of channel prefs - DB: create/list/mark-read/mark-all-read for in_app_notifications - Routes: GET /notifications/inbox/:address PATCH /notifications/inbox/:address/:id/read POST /notifications/inbox/:address/read-all Closes #126 --- ...0260330000001_arbitrator_notifications.sql | 18 +++++ indexer/src/database.rs | 80 +++++++++++++++++++ indexer/src/handlers.rs | 50 ++++++++++++ indexer/src/main.rs | 3 + indexer/src/models.rs | 20 +++++ indexer/src/notification_service/mod.rs | 70 ++++++++++++++++ indexer/src/notification_service/templates.rs | 9 +++ 7 files changed, 250 insertions(+) create mode 100644 indexer/migrations/20260330000001_arbitrator_notifications.sql diff --git a/indexer/migrations/20260330000001_arbitrator_notifications.sql b/indexer/migrations/20260330000001_arbitrator_notifications.sql new file mode 100644 index 00000000..9bf7e1e6 --- /dev/null +++ b/indexer/migrations/20260330000001_arbitrator_notifications.sql @@ -0,0 +1,18 @@ +-- Add arbitrator assignment notification preference +ALTER TABLE notification_preferences + ADD COLUMN IF NOT EXISTS on_arbitrator_assigned BOOLEAN NOT NULL DEFAULT TRUE; + +-- In-app notifications inbox (separate from the outbound log) +CREATE TABLE IF NOT EXISTS in_app_notifications ( + id BIGSERIAL PRIMARY KEY, + address VARCHAR(100) NOT NULL, + template_id VARCHAR(64) NOT NULL, + title TEXT NOT NULL, + body TEXT NOT NULL, + metadata JSONB NOT NULL DEFAULT '{}', + is_read BOOLEAN NOT NULL DEFAULT FALSE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_in_app_notif_address ON in_app_notifications (address, created_at DESC); +CREATE INDEX IF NOT EXISTS idx_in_app_notif_unread ON in_app_notifications (address) WHERE is_read = FALSE; diff --git a/indexer/src/database.rs b/indexer/src/database.rs index d0aea895..f77789e9 100644 --- a/indexer/src/database.rs +++ b/indexer/src/database.rs @@ -1942,6 +1942,86 @@ impl Database { Ok(()) } + // ========================================================================= + // In-App Notifications (Issue #126) + // ========================================================================= + + pub async fn create_in_app_notification( + &self, + address: &str, + template_id: &str, + title: &str, + body: &str, + metadata: serde_json::Value, + ) -> Result { + let record = sqlx::query_as::<_, crate::models::InAppNotification>( + r#" + INSERT INTO in_app_notifications (address, template_id, title, body, metadata) + VALUES ($1, $2, $3, $4, $5) + RETURNING * + "#, + ) + .bind(address) + .bind(template_id) + .bind(title) + .bind(body) + .bind(metadata) + .fetch_one(&self.pool) + .await?; + Ok(record) + } + + pub async fn list_in_app_notifications( + &self, + address: &str, + unread_only: bool, + ) -> Result, crate::error::AppError> { + let records = if unread_only { + sqlx::query_as::<_, crate::models::InAppNotification>( + "SELECT * FROM in_app_notifications WHERE address = $1 AND is_read = FALSE ORDER BY created_at DESC", + ) + .bind(address) + .fetch_all(&self.pool) + .await? + } else { + sqlx::query_as::<_, crate::models::InAppNotification>( + "SELECT * FROM in_app_notifications WHERE address = $1 ORDER BY created_at DESC LIMIT 50", + ) + .bind(address) + .fetch_all(&self.pool) + .await? + }; + Ok(records) + } + + pub async fn mark_in_app_notification_read( + &self, + id: i64, + address: &str, + ) -> Result { + let result = sqlx::query( + "UPDATE in_app_notifications SET is_read = TRUE WHERE id = $1 AND address = $2", + ) + .bind(id) + .bind(address) + .execute(&self.pool) + .await?; + Ok(result.rows_affected() > 0) + } + + pub async fn mark_all_in_app_notifications_read( + &self, + address: &str, + ) -> Result { + let result = sqlx::query( + "UPDATE in_app_notifications SET is_read = TRUE WHERE address = $1 AND is_read = FALSE", + ) + .bind(address) + .execute(&self.pool) + .await?; + Ok(result.rows_affected()) + } + fn row_to_compliance_check( &self, row: &sqlx::postgres::PgRow, diff --git a/indexer/src/handlers.rs b/indexer/src/handlers.rs index e6bd5664..0e58bcf3 100644 --- a/indexer/src/handlers.rs +++ b/indexer/src/handlers.rs @@ -588,6 +588,7 @@ pub async fn register_push_token( on_dispute_raised: None, on_dispute_resolved: None, on_trade_cancelled: None, + on_arbitrator_assigned: None, }, ) .await?; @@ -611,6 +612,55 @@ pub async fn unregister_push_token( }))) } +// ============================================================================= +// In-App Notification Handlers (Issue #126) +// ============================================================================= + +#[derive(Deserialize)] +pub struct InAppNotifQuery { + pub unread_only: Option, +} + +/// GET /notifications/inbox/:address — list in-app notifications. +pub async fn get_in_app_notifications( + Path(address): Path, + Query(params): Query, + State(state): State, +) -> Result, AppError> { + let items = state + .database + .list_in_app_notifications(&address, params.unread_only.unwrap_or(false)) + .await?; + Ok(Json(json!({ "notifications": items }))) +} + +/// PATCH /notifications/inbox/:address/:id/read — mark a single notification as read. +pub async fn mark_in_app_notification_read( + Path((address, id)): Path<(String, i64)>, + State(state): State, +) -> Result, AppError> { + let updated = state + .database + .mark_in_app_notification_read(id, &address) + .await?; + if !updated { + return Err(AppError::NotFound("Notification not found".into())); + } + Ok(Json(json!({ "status": "ok" }))) +} + +/// POST /notifications/inbox/:address/read-all — mark all notifications as read. +pub async fn mark_all_in_app_notifications_read( + Path(address): Path, + State(state): State, +) -> Result, AppError> { + let count = state + .database + .mark_all_in_app_notifications_read(&address) + .await?; + Ok(Json(json!({ "status": "ok", "marked_read": count }))) +} + // ============================================================================= // Gateway Handlers // ============================================================================= diff --git a/indexer/src/main.rs b/indexer/src/main.rs index 1f29497f..1c032575 100644 --- a/indexer/src/main.rs +++ b/indexer/src/main.rs @@ -354,6 +354,9 @@ async fn main() -> Result<(), Box> { get(get_notification_preferences).put(upsert_notification_preferences), ) .route("/notifications/log/:address", get(get_notification_log)) + .route("/notifications/inbox/:address", get(get_in_app_notifications)) + .route("/notifications/inbox/:address/:id/read", axum::routing::patch(mark_in_app_notification_read)) + .route("/notifications/inbox/:address/read-all", post(mark_all_in_app_notifications_read)) .route("/push/register", post(register_push_token)) .route("/push/unregister/:device_token", delete(unregister_push_token)) // Performance monitoring diff --git a/indexer/src/models.rs b/indexer/src/models.rs index cb3842ea..221b6bb6 100644 --- a/indexer/src/models.rs +++ b/indexer/src/models.rs @@ -515,6 +515,8 @@ pub struct NotificationPreferences { pub on_dispute_raised: bool, pub on_dispute_resolved: bool, pub on_trade_cancelled: bool, + #[sqlx(default)] + pub on_arbitrator_assigned: bool, pub updated_at: DateTime, } @@ -535,6 +537,7 @@ impl NotificationPreferences { on_dispute_raised: true, on_dispute_resolved: true, on_trade_cancelled: true, + on_arbitrator_assigned: true, updated_at: Utc::now(), } } @@ -556,6 +559,7 @@ pub struct UpdateNotificationPreferences { pub on_dispute_raised: Option, pub on_dispute_resolved: Option, pub on_trade_cancelled: Option, + pub on_arbitrator_assigned: Option, } #[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] @@ -644,3 +648,19 @@ pub struct PushRegistrationRequest { pub platform: String, pub address: String, } + +// ============================================================================= +// In-App Notifications (Issue #126) +// ============================================================================= + +#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] +pub struct InAppNotification { + pub id: i64, + pub address: String, + pub template_id: String, + pub title: String, + pub body: String, + pub metadata: serde_json::Value, + pub is_read: bool, + pub created_at: DateTime, +} diff --git a/indexer/src/notification_service/mod.rs b/indexer/src/notification_service/mod.rs index 353cb628..490c83a4 100644 --- a/indexer/src/notification_service/mod.rs +++ b/indexer/src/notification_service/mod.rs @@ -22,6 +22,15 @@ impl NotificationService { /// Called by the event monitor for every new contract event. pub async fn process_event(&self, event: &Event) { + // Handle arbitrator assignment notification separately (Issue #126) + if event.event_type == "dispute_raised" { + if let Some(arbitrator) = event.data.get("arbitrator").and_then(|v| v.as_str()) { + if !arbitrator.trim().is_empty() { + self.notify_arbitrator_assigned(arbitrator, &event.data).await; + } + } + } + let Some(template_id) = TemplateId::from_event_type(&event.event_type) else { return; }; @@ -101,6 +110,66 @@ impl NotificationService { } } } + + /// Notify an arbitrator when they are assigned to a dispute (Issue #126). + /// Sends email/SMS/push based on their preferences and creates an in-app record. + pub async fn notify_arbitrator_assigned(&self, arbitrator: &str, data: &serde_json::Value) { + let template_id = TemplateId::ArbitratorAssigned; + let mut vars = event_vars(data); + + // Build a dispute link using trade_id + let trade_id = vars.get("trade_id").cloned().unwrap_or_default(); + vars.insert("dispute_link", format!("/trades/{}/dispute", trade_id)); + + let prefs = match self.db.get_notification_preferences(arbitrator).await { + Ok(Some(p)) => p, + Ok(None) => NotificationPreferences::default_for_address(arbitrator), + Err(_) => return, + }; + + if !prefs.on_arbitrator_assigned { + // Arbitrator has opted out — still create in-app record + } else { + let template = templates::get(&template_id); + + if prefs.email_enabled { + if let Some(email) = non_empty(prefs.email_address.as_deref()) { + let rendered = templates::render(&template, NotificationChannel::Email, &vars); + let result = channels::send_email(&self.cfg, email, &rendered.subject, &rendered.body).await; + self.db + .log_notification(arbitrator, "email", template_id.as_str(), Some(&rendered.subject), &rendered.body, result) + .await; + } + } + + if prefs.sms_enabled { + if let Some(phone) = non_empty(prefs.phone_number.as_deref()) { + let rendered = templates::render(&template, NotificationChannel::Sms, &vars); + let result = channels::send_sms(&self.cfg, phone, &rendered.body).await; + self.db + .log_notification(arbitrator, "sms", template_id.as_str(), None, &rendered.body, result) + .await; + } + } + + if prefs.push_enabled { + if let Some(token) = non_empty(prefs.push_token.as_deref()) { + let rendered = templates::render(&template, NotificationChannel::Push, &vars); + let result = channels::send_push(&self.cfg, token, &rendered.subject, &rendered.body).await; + self.db + .log_notification(arbitrator, "push", template_id.as_str(), Some(&rendered.subject), &rendered.body, result) + .await; + } + } + } + + // Always create an in-app notification record + let template = templates::get(&template_id); + let rendered = templates::render(&template, NotificationChannel::Push, &vars); + let _ = self.db + .create_in_app_notification(arbitrator, template_id.as_str(), &rendered.subject, &rendered.body, data.clone()) + .await; + } } fn non_empty(value: Option<&str>) -> Option<&str> { @@ -166,6 +235,7 @@ fn prefs_allow(prefs: &NotificationPreferences, id: &TemplateId) -> bool { TemplateId::DisputeRaised => prefs.on_dispute_raised, TemplateId::DisputeResolved => prefs.on_dispute_resolved, TemplateId::TradeCancelled => prefs.on_trade_cancelled, + TemplateId::ArbitratorAssigned => prefs.on_arbitrator_assigned, } } diff --git a/indexer/src/notification_service/templates.rs b/indexer/src/notification_service/templates.rs index cbfa1f2d..c4653a95 100644 --- a/indexer/src/notification_service/templates.rs +++ b/indexer/src/notification_service/templates.rs @@ -9,6 +9,7 @@ pub enum TemplateId { DisputeRaised, DisputeResolved, TradeCancelled, + ArbitratorAssigned, } impl TemplateId { @@ -21,6 +22,7 @@ impl TemplateId { TemplateId::DisputeRaised => "dispute_raised", TemplateId::DisputeResolved => "dispute_resolved", TemplateId::TradeCancelled => "trade_cancelled", + TemplateId::ArbitratorAssigned => "arbitrator_assigned", } } @@ -146,6 +148,13 @@ pub fn get(id: &TemplateId) -> Template { push_title: "Trade cancelled", push_body: "Trade #{{trade_id}} was cancelled.", }, + TemplateId::ArbitratorAssigned => Template { + email_subject: "You have been assigned to dispute on trade #{{trade_id}}", + email_body: "You have been assigned as arbitrator for the dispute on trade #{{trade_id}}.\n\nTrade summary:\n Seller: {{seller}}\n Buyer: {{buyer}}\n Amount: {{amount}} USDC\n\nPlease review the case and submit your resolution.\n\nView dispute: {{dispute_link}}", + sms_body: "You are assigned as arbitrator for trade #{{trade_id}} dispute. Review at {{dispute_link}}", + push_title: "Arbitrator assignment: trade #{{trade_id}}", + push_body: "You have been assigned to arbitrate the dispute on trade #{{trade_id}}.", + }, } }