Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions indexer/migrations/20260330000001_arbitrator_notifications.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-- Add arbitrator assignment notification preference
ALTER TABLE notification_preferences
ADD COLUMN IF NOT EXISTS on_arbitrator_assigned BOOLEAN NOT NULL DEFAULT TRUE;

-- In-app notifications inbox (separate from the outbound log)
CREATE TABLE IF NOT EXISTS in_app_notifications (
id BIGSERIAL PRIMARY KEY,
address VARCHAR(100) NOT NULL,
template_id VARCHAR(64) NOT NULL,
title TEXT NOT NULL,
body TEXT NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}',
is_read BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_in_app_notif_address ON in_app_notifications (address, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_in_app_notif_unread ON in_app_notifications (address) WHERE is_read = FALSE;
80 changes: 80 additions & 0 deletions indexer/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1942,6 +1942,86 @@ impl Database {
Ok(())
}

// =========================================================================
// In-App Notifications (Issue #126)
// =========================================================================

pub async fn create_in_app_notification(
&self,
address: &str,
template_id: &str,
title: &str,
body: &str,
metadata: serde_json::Value,
) -> Result<crate::models::InAppNotification, crate::error::AppError> {
let record = sqlx::query_as::<_, crate::models::InAppNotification>(
r#"
INSERT INTO in_app_notifications (address, template_id, title, body, metadata)
VALUES ($1, $2, $3, $4, $5)
RETURNING *
"#,
)
.bind(address)
.bind(template_id)
.bind(title)
.bind(body)
.bind(metadata)
.fetch_one(&self.pool)
.await?;
Ok(record)
}

pub async fn list_in_app_notifications(
&self,
address: &str,
unread_only: bool,
) -> Result<Vec<crate::models::InAppNotification>, crate::error::AppError> {
let records = if unread_only {
sqlx::query_as::<_, crate::models::InAppNotification>(
"SELECT * FROM in_app_notifications WHERE address = $1 AND is_read = FALSE ORDER BY created_at DESC",
)
.bind(address)
.fetch_all(&self.pool)
.await?
} else {
sqlx::query_as::<_, crate::models::InAppNotification>(
"SELECT * FROM in_app_notifications WHERE address = $1 ORDER BY created_at DESC LIMIT 50",
)
.bind(address)
.fetch_all(&self.pool)
.await?
};
Ok(records)
}

pub async fn mark_in_app_notification_read(
&self,
id: i64,
address: &str,
) -> Result<bool, crate::error::AppError> {
let result = sqlx::query(
"UPDATE in_app_notifications SET is_read = TRUE WHERE id = $1 AND address = $2",
)
.bind(id)
.bind(address)
.execute(&self.pool)
.await?;
Ok(result.rows_affected() > 0)
}

pub async fn mark_all_in_app_notifications_read(
&self,
address: &str,
) -> Result<u64, crate::error::AppError> {
let result = sqlx::query(
"UPDATE in_app_notifications SET is_read = TRUE WHERE address = $1 AND is_read = FALSE",
)
.bind(address)
.execute(&self.pool)
.await?;
Ok(result.rows_affected())
}

fn row_to_compliance_check(
&self,
row: &sqlx::postgres::PgRow,
Expand Down
50 changes: 50 additions & 0 deletions indexer/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ pub async fn register_push_token(
on_dispute_raised: None,
on_dispute_resolved: None,
on_trade_cancelled: None,
on_arbitrator_assigned: None,
},
)
.await?;
Expand All @@ -611,6 +612,55 @@ pub async fn unregister_push_token(
})))
}

// =============================================================================
// In-App Notification Handlers (Issue #126)
// =============================================================================

#[derive(Deserialize)]
pub struct InAppNotifQuery {
pub unread_only: Option<bool>,
}

/// GET /notifications/inbox/:address — list in-app notifications.
pub async fn get_in_app_notifications(
Path(address): Path<String>,
Query(params): Query<InAppNotifQuery>,
State(state): State<AppState>,
) -> Result<Json<serde_json::Value>, AppError> {
let items = state
.database
.list_in_app_notifications(&address, params.unread_only.unwrap_or(false))
.await?;
Ok(Json(json!({ "notifications": items })))
}

/// PATCH /notifications/inbox/:address/:id/read — mark a single notification as read.
pub async fn mark_in_app_notification_read(
Path((address, id)): Path<(String, i64)>,
State(state): State<AppState>,
) -> Result<Json<serde_json::Value>, AppError> {
let updated = state
.database
.mark_in_app_notification_read(id, &address)
.await?;
if !updated {
return Err(AppError::NotFound("Notification not found".into()));
}
Ok(Json(json!({ "status": "ok" })))
}

/// POST /notifications/inbox/:address/read-all — mark all notifications as read.
pub async fn mark_all_in_app_notifications_read(
Path(address): Path<String>,
State(state): State<AppState>,
) -> Result<Json<serde_json::Value>, AppError> {
let count = state
.database
.mark_all_in_app_notifications_read(&address)
.await?;
Ok(Json(json!({ "status": "ok", "marked_read": count })))
}

// =============================================================================
// Gateway Handlers
// =============================================================================
Expand Down
3 changes: 3 additions & 0 deletions indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
get(get_notification_preferences).put(upsert_notification_preferences),
)
.route("/notifications/log/:address", get(get_notification_log))
.route("/notifications/inbox/:address", get(get_in_app_notifications))
.route("/notifications/inbox/:address/:id/read", axum::routing::patch(mark_in_app_notification_read))
.route("/notifications/inbox/:address/read-all", post(mark_all_in_app_notifications_read))
.route("/push/register", post(register_push_token))
.route("/push/unregister/:device_token", delete(unregister_push_token))
// Performance monitoring
Expand Down
20 changes: 20 additions & 0 deletions indexer/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,8 @@ pub struct NotificationPreferences {
pub on_dispute_raised: bool,
pub on_dispute_resolved: bool,
pub on_trade_cancelled: bool,
#[sqlx(default)]
pub on_arbitrator_assigned: bool,
pub updated_at: DateTime<Utc>,
}

Expand All @@ -535,6 +537,7 @@ impl NotificationPreferences {
on_dispute_raised: true,
on_dispute_resolved: true,
on_trade_cancelled: true,
on_arbitrator_assigned: true,
updated_at: Utc::now(),
}
}
Expand All @@ -556,6 +559,7 @@ pub struct UpdateNotificationPreferences {
pub on_dispute_raised: Option<bool>,
pub on_dispute_resolved: Option<bool>,
pub on_trade_cancelled: Option<bool>,
pub on_arbitrator_assigned: Option<bool>,
}

#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
Expand Down Expand Up @@ -644,3 +648,19 @@ pub struct PushRegistrationRequest {
pub platform: String,
pub address: String,
}

// =============================================================================
// In-App Notifications (Issue #126)
// =============================================================================

#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct InAppNotification {
pub id: i64,
pub address: String,
pub template_id: String,
pub title: String,
pub body: String,
pub metadata: serde_json::Value,
pub is_read: bool,
pub created_at: DateTime<Utc>,
}
70 changes: 70 additions & 0 deletions indexer/src/notification_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ impl NotificationService {

/// Called by the event monitor for every new contract event.
pub async fn process_event(&self, event: &Event) {
// Handle arbitrator assignment notification separately (Issue #126)
if event.event_type == "dispute_raised" {
if let Some(arbitrator) = event.data.get("arbitrator").and_then(|v| v.as_str()) {
if !arbitrator.trim().is_empty() {
self.notify_arbitrator_assigned(arbitrator, &event.data).await;
}
}
}

let Some(template_id) = TemplateId::from_event_type(&event.event_type) else {
return;
};
Expand Down Expand Up @@ -101,6 +110,66 @@ impl NotificationService {
}
}
}

/// Notify an arbitrator when they are assigned to a dispute (Issue #126).
/// Sends email/SMS/push based on their preferences and creates an in-app record.
pub async fn notify_arbitrator_assigned(&self, arbitrator: &str, data: &serde_json::Value) {
let template_id = TemplateId::ArbitratorAssigned;
let mut vars = event_vars(data);

// Build a dispute link using trade_id
let trade_id = vars.get("trade_id").cloned().unwrap_or_default();
vars.insert("dispute_link", format!("/trades/{}/dispute", trade_id));

let prefs = match self.db.get_notification_preferences(arbitrator).await {
Ok(Some(p)) => p,
Ok(None) => NotificationPreferences::default_for_address(arbitrator),
Err(_) => return,
};

if !prefs.on_arbitrator_assigned {
// Arbitrator has opted out — still create in-app record
} else {
let template = templates::get(&template_id);

if prefs.email_enabled {
if let Some(email) = non_empty(prefs.email_address.as_deref()) {
let rendered = templates::render(&template, NotificationChannel::Email, &vars);
let result = channels::send_email(&self.cfg, email, &rendered.subject, &rendered.body).await;
self.db
.log_notification(arbitrator, "email", template_id.as_str(), Some(&rendered.subject), &rendered.body, result)
.await;
}
}

if prefs.sms_enabled {
if let Some(phone) = non_empty(prefs.phone_number.as_deref()) {
let rendered = templates::render(&template, NotificationChannel::Sms, &vars);
let result = channels::send_sms(&self.cfg, phone, &rendered.body).await;
self.db
.log_notification(arbitrator, "sms", template_id.as_str(), None, &rendered.body, result)
.await;
}
}

if prefs.push_enabled {
if let Some(token) = non_empty(prefs.push_token.as_deref()) {
let rendered = templates::render(&template, NotificationChannel::Push, &vars);
let result = channels::send_push(&self.cfg, token, &rendered.subject, &rendered.body).await;
self.db
.log_notification(arbitrator, "push", template_id.as_str(), Some(&rendered.subject), &rendered.body, result)
.await;
}
}
}

// Always create an in-app notification record
let template = templates::get(&template_id);
let rendered = templates::render(&template, NotificationChannel::Push, &vars);
let _ = self.db
.create_in_app_notification(arbitrator, template_id.as_str(), &rendered.subject, &rendered.body, data.clone())
.await;
}
}

fn non_empty(value: Option<&str>) -> Option<&str> {
Expand Down Expand Up @@ -166,6 +235,7 @@ fn prefs_allow(prefs: &NotificationPreferences, id: &TemplateId) -> bool {
TemplateId::DisputeRaised => prefs.on_dispute_raised,
TemplateId::DisputeResolved => prefs.on_dispute_resolved,
TemplateId::TradeCancelled => prefs.on_trade_cancelled,
TemplateId::ArbitratorAssigned => prefs.on_arbitrator_assigned,
}
}

Expand Down
9 changes: 9 additions & 0 deletions indexer/src/notification_service/templates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub enum TemplateId {
DisputeRaised,
DisputeResolved,
TradeCancelled,
ArbitratorAssigned,
}

impl TemplateId {
Expand All @@ -21,6 +22,7 @@ impl TemplateId {
TemplateId::DisputeRaised => "dispute_raised",
TemplateId::DisputeResolved => "dispute_resolved",
TemplateId::TradeCancelled => "trade_cancelled",
TemplateId::ArbitratorAssigned => "arbitrator_assigned",
}
}

Expand Down Expand Up @@ -146,6 +148,13 @@ pub fn get(id: &TemplateId) -> Template {
push_title: "Trade cancelled",
push_body: "Trade #{{trade_id}} was cancelled.",
},
TemplateId::ArbitratorAssigned => Template {
email_subject: "You have been assigned to dispute on trade #{{trade_id}}",
email_body: "You have been assigned as arbitrator for the dispute on trade #{{trade_id}}.\n\nTrade summary:\n Seller: {{seller}}\n Buyer: {{buyer}}\n Amount: {{amount}} USDC\n\nPlease review the case and submit your resolution.\n\nView dispute: {{dispute_link}}",
sms_body: "You are assigned as arbitrator for trade #{{trade_id}} dispute. Review at {{dispute_link}}",
push_title: "Arbitrator assignment: trade #{{trade_id}}",
push_body: "You have been assigned to arbitrate the dispute on trade #{{trade_id}}.",
},
}
}

Expand Down