diff --git a/backend/Cargo.lock b/backend/Cargo.lock index d62a184..697b2de 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -627,6 +627,7 @@ dependencies = [ "diesel_migrations", "dotenv", "email_address", + "governor", "hyper 1.6.0", "lettre", "mockall", @@ -638,6 +639,7 @@ dependencies = [ "serde_json", "tera", "thiserror 2.0.11", + "tokenbucket", "tokio", "tower 0.5.2", "tower-http", @@ -1006,6 +1008,20 @@ dependencies = [ "syn", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.7.0" @@ -1292,6 +1308,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + [[package]] name = "foreign-types" version = "0.3.2" @@ -1373,6 +1395,12 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.31" @@ -1418,8 +1446,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43a49c392881ce6d5c3b8cb70f98717b7c07aabbdff06687b9030dbfbe2725f8" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi 0.13.3+wasi-0.2.2", + "wasm-bindgen", "windows-targets 0.52.6", ] @@ -1453,6 +1483,29 @@ dependencies = [ "walkdir", ] +[[package]] +name = "governor" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cbe789d04bf14543f03c4b60cd494148aa79438c8440ae7d81a7778147745c3" +dependencies = [ + "cfg-if", + "dashmap", + "futures-sink", + "futures-timer", + "futures-util", + "getrandom 0.3.1", + "hashbrown 0.15.2", + "nonzero_ext", + "parking_lot", + "portable-atomic", + "quanta", + "rand 0.9.1", + "smallvec", + "spinning_top", + "web-time", +] + [[package]] name = "group" version = "0.12.1" @@ -1523,6 +1576,11 @@ name = "hashbrown" version = "0.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash", +] [[package]] name = "headers" @@ -2470,6 +2528,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -2819,6 +2883,12 @@ dependencies = [ "typemap", ] +[[package]] +name = "portable-atomic" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "350e9b48cbc6b0e028b0473b114454c6316e57336ee184ceab6e53f72c178b3e" + [[package]] name = "postgres-protocol" version = "0.6.7" @@ -2916,6 +2986,21 @@ dependencies = [ "cc", ] +[[package]] +name = "quanta" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bd1fe6824cea6538803de3ff1bc0cf3949024db3d43c9643024bfb33a807c0e" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -2978,6 +3063,16 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "rand" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97" +dependencies = [ + "rand_chacha 0.9.0", + "rand_core 0.9.3", +] + [[package]] name = "rand_chacha" version = "0.1.1" @@ -2998,6 +3093,16 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core 0.9.3", +] + [[package]] name = "rand_core" version = "0.3.1" @@ -3022,6 +3127,15 @@ dependencies = [ "getrandom 0.2.15", ] +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +dependencies = [ + "getrandom 0.3.1", +] + [[package]] name = "rand_hc" version = "0.1.0" @@ -3084,6 +3198,15 @@ dependencies = [ "rand_core 0.3.1", ] +[[package]] +name = "raw-cpuid" +version = "11.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6df7ab838ed27997ba19a4664507e6f82b41fe6e20be42929332156e5e85146" +dependencies = [ + "bitflags 2.8.0", +] + [[package]] name = "rdrand" version = "0.4.0" @@ -3691,6 +3814,15 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "spinning_top" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300" +dependencies = [ + "lock_api", +] + [[package]] name = "spki" version = "0.6.0" @@ -3993,6 +4125,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" +[[package]] +name = "tokenbucket" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43b432f4112b803f1ced6a45a6639965e6ac7b43e9f39cf758121848300ed525" + [[package]] name = "tokio" version = "1.43.0" @@ -4724,6 +4862,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "whoami" version = "1.5.2" diff --git a/backend/Cargo.toml b/backend/Cargo.toml index bbf9e44..52e14ab 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -45,7 +45,8 @@ axum-extra = { version = "0.10.0", features = ["multipart"] } csv = "1.3.1" lettre = "0.11.14" - +tokenbucket = "0.1.6" +governor = "0.10.0" [dev-dependencies] diff --git a/backend/migrations/2025-04-22-082248_add_additional_fields_to_mail_for_queueing/down.sql b/backend/migrations/2025-04-22-082248_add_additional_fields_to_mail_for_queueing/down.sql new file mode 100644 index 0000000..8866867 --- /dev/null +++ b/backend/migrations/2025-04-22-082248_add_additional_fields_to_mail_for_queueing/down.sql @@ -0,0 +1,6 @@ +-- Rollback: Remove queuing and rate-limiting fields from mails table + +ALTER TABLE mails +DROP COLUMN scheduled_at, +DROP COLUMN attempts, +DROP COLUMN last_error; \ No newline at end of file diff --git a/backend/migrations/2025-04-22-082248_add_additional_fields_to_mail_for_queueing/up.sql b/backend/migrations/2025-04-22-082248_add_additional_fields_to_mail_for_queueing/up.sql new file mode 100644 index 0000000..3f865be --- /dev/null +++ b/backend/migrations/2025-04-22-082248_add_additional_fields_to_mail_for_queueing/up.sql @@ -0,0 +1,6 @@ +-- Migration: Add queuing and rate-limiting fields to mails table + +ALTER TABLE mails +ADD COLUMN "scheduled_at" TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP NOT NULL, +ADD COLUMN "attempts" INTEGER DEFAULT 0 NOT NULL, +ADD COLUMN "last_error" TEXT; \ No newline at end of file diff --git a/backend/migrations/2025-04-24-050126_add__rate_limit__column/down.sql b/backend/migrations/2025-04-24-050126_add__rate_limit__column/down.sql new file mode 100644 index 0000000..be273eb --- /dev/null +++ b/backend/migrations/2025-04-24-050126_add__rate_limit__column/down.sql @@ -0,0 +1,3 @@ +-- Down Migration: remove the rate_limit column +ALTER TABLE "servers" +DROP COLUMN IF EXISTS "rate_limit"; \ No newline at end of file diff --git a/backend/migrations/2025-04-24-050126_add__rate_limit__column/up.sql b/backend/migrations/2025-04-24-050126_add__rate_limit__column/up.sql new file mode 100644 index 0000000..e40cbe6 --- /dev/null +++ b/backend/migrations/2025-04-24-050126_add__rate_limit__column/up.sql @@ -0,0 +1,3 @@ +-- Up Migration: add the rate_limit column with a default value +ALTER TABLE "servers" +ADD COLUMN "rate_limit" INTEGER NOT NULL DEFAULT 60; \ No newline at end of file diff --git a/backend/src/handlers/bounce_logs_handler.rs b/backend/src/handlers/bounce_logs_handler.rs index 08a4e70..1bfda59 100644 --- a/backend/src/handlers/bounce_logs_handler.rs +++ b/backend/src/handlers/bounce_logs_handler.rs @@ -4,19 +4,24 @@ use std::sync::Arc; use crate::{error::AppError, models::{bounce_logs:: { - Message, CreateBounceLogRequest, CreateBounceLogResponse, GetBounceLogResponse, SnsNotification - }, mail::UpdateMailRequest}, repositories::{contact::ContactRepositoryImpl, mail_repository::MailRepositoryImpl}, services::{bounce_logs_service, contact_service::ContactService, mail_service::{self, MailService}} + CreateBounceLogRequest, CreateBounceLogResponse, GetBounceLogResponse, Message, SnsNotification + }, mail::UpdateMailRequest}, repositories::{contact::ContactRepositoryImpl, mail_repository::MailRepositoryImpl}, services::{bounce_logs_service, contact_service::ContactService, mail_service::{self, MailService, MailServiceTrait}} }; use axum::{ - extract:: Path, Json, http::StatusCode + extract:: Path, Json, http::StatusCode, Extension }; use uuid::Uuid; +use crate::utils::bounce_logs::search_header_from_header_list; enum MailStatus { Draft, - Pending, - Sent, + Scheduled, + Queued, + Processing, // this status might not be necessary... + Submitted, + Delivered, + Failed, Bounced, } @@ -24,8 +29,12 @@ impl MailStatus { pub fn as_str(&self) -> &'static str { match self { MailStatus::Draft => "draft", - MailStatus::Pending => "pending", - MailStatus::Sent => "sent", + MailStatus::Scheduled => "scheduled", + MailStatus::Queued => "queued", + MailStatus::Processing => "processing", + MailStatus::Submitted => "submitted", + MailStatus::Delivered => "delivered", + MailStatus::Failed => "failed", MailStatus::Bounced => "bounced", } } @@ -33,8 +42,12 @@ impl MailStatus { pub fn from_str(s: &str) -> Option { match s { "draft" => Some(MailStatus::Draft), - "pending" => Some(MailStatus::Pending), - "sent" => Some(MailStatus::Sent), + "scheduled" => Some(MailStatus::Scheduled), + "queued" => Some(MailStatus::Queued), + "processing" => Some(MailStatus::Processing), + "submitted" => Some(MailStatus::Submitted), + "delivered" => Some(MailStatus::Delivered), + "failed" => Some(MailStatus::Failed), "bounced" => Some(MailStatus::Bounced), _ => None, } @@ -50,11 +63,9 @@ impl MailStatus { ) )] pub async fn handle_sns_notification ( + Extension(mail_service): Extension>, payload: Json, ) -> Result<(), AppError> { - let mail_repository = Arc::new(MailRepositoryImpl); - let mail_service = MailService::new(mail_repository); - // automate the subscription confirmation... if payload.notification_type == "SubscriptionConfirmation" { // subscribe to the public api endpoint... @@ -74,12 +85,13 @@ pub async fn handle_sns_notification ( .map_err(|err| AppError::NotFoundError(Some(err.to_string())))?; if sns_event.notification_type.is_none() { - if let Some(event_type) = sns_event.event_type { - let mail_id = sns_event.mail.mail_id; + if let Some(event_type) = sns_event.event_type.as_ref() { + let mail_id = search_header_from_header_list(&sns_event, "mailId") + .ok_or_else(|| AppError::NotFoundError(Some("mailId not found in SNS event".to_string())))?; match event_type.as_str() { "Open" => { - mail_service.update_mail(mail_id, UpdateMailRequest { + mail_service.update_mail(mail_id,UpdateMailRequest { open: Some(chrono::Utc::now()), ..Default::default() }).await?; @@ -100,17 +112,19 @@ pub async fn handle_sns_notification ( match sns_event.notification_type.as_ref().unwrap().as_str() { "Bounce" => { - if let Some(bounce) = sns_event.bounce { - let recipients = bounce.bounced_recipients; + if let Some(bounce) = sns_event.bounce.as_ref() { + let recipients = &bounce.bounced_recipients; let status = MailStatus::Bounced; - let mail_id = sns_event.mail.mail_id.clone(); - let _ = mail_service.update_mail_status(mail_id, status.as_str()).await; + let mail_id = search_header_from_header_list(&sns_event, "mailId") + .ok_or_else(|| AppError::NotFoundError(Some("mailId not found in SNS event".to_string())))?; + + let _ = mail_service.update_mail_status(mail_id.clone(), status.as_str()).await; for recp in recipients { let contact_repository = Arc::new(ContactRepositoryImpl); let contact_service = ContactService::new(contact_repository); - let contact = contact_service.get_contact_by_email(recp.email_address).await; + let contact = contact_service.get_contact_by_email(recp.email_address.clone()).await; let new_bounce = CreateBounceLogRequest { contact_id: contact.unwrap().id, @@ -119,7 +133,7 @@ pub async fn handle_sns_notification ( kind: bounce.bounce_type.clone(), campaign_id: None, reason: bounce.bounce_sub_type.clone(), - mail_id: sns_event.mail.mail_id.clone(), + mail_id: mail_id.clone(), }; // Add the bounce to the DB @@ -128,10 +142,13 @@ pub async fn handle_sns_notification ( } } "Delivery" => { - if let Some(delivery) = sns_event.delivery { - let status = MailStatus::Sent; + if let Some(delivery) = sns_event.delivery.as_ref() { + let status = MailStatus::Delivered; - let _ = mail_service.update_mail_status(sns_event.mail.mail_id, status.as_str()).await; + let mail_id = search_header_from_header_list(&sns_event, "mailId") + .ok_or_else(|| AppError::NotFoundError(Some("mailId not found in SNS event".to_string())))?; + + let _ = mail_service.update_mail_status(mail_id, status.as_str()).await; } } _ => { diff --git a/backend/src/handlers/campaign.rs b/backend/src/handlers/campaign.rs index 550ca4c..7158e07 100644 --- a/backend/src/handlers/campaign.rs +++ b/backend/src/handlers/campaign.rs @@ -1,5 +1,6 @@ use uuid::Uuid; use crate::error::AppError; +use crate::models::campaign::AddMailToQueueResponse; use crate::{models::{campaign:: { CampaignSendResponse, CreateCampaignRequest, CreateCampaignResponse, DeleteCampaignResponse, ExtendedCreateCampaignRequest, GetCampaignResponse, UpdateCampaignRequest, UpdateCampaignResponse @@ -122,7 +123,7 @@ pub async fn delete_campaign( )] pub async fn send_campaign_email( Path(campaign_id): Path -) -> Result, AppError> { +) -> Result, AppError> { let uuid_id = Uuid::parse_str(&campaign_id)?; let result = campaign_service::send_campaign_email(uuid_id).await?; diff --git a/backend/src/handlers/contact.rs b/backend/src/handlers/contact.rs index dad2f00..6e5ec9f 100644 --- a/backend/src/handlers/contact.rs +++ b/backend/src/handlers/contact.rs @@ -259,5 +259,8 @@ pub async fn get_mails_by_contact_id( clicks: mail.clicks, status: mail.status, status_reason: mail.reason, + scheduled_at: mail.scheduled_at, + attempts: mail.attempts, + last_error: mail.last_error, }).collect())) } \ No newline at end of file diff --git a/backend/src/handlers/mail_handler.rs b/backend/src/handlers/mail_handler.rs index 015e4fb..c343818 100644 --- a/backend/src/handlers/mail_handler.rs +++ b/backend/src/handlers/mail_handler.rs @@ -1,10 +1,11 @@ use crate::models::mail::{ CreateMailRequest, CreateMailResponse, DeleteMailResponse, GetMailResponse, MailQuery, UpdateMailRequest, UpdateMailResponse }; -use crate::services::mail_service as mail_service; +use crate::services::mail_service::{MailService, MailServiceTrait}; +use std::sync::Arc; use axum::{ - extract:: { Path, Query }, Json + extract:: { Extension, Path, Query }, Json }; use crate::error::AppError; @@ -17,9 +18,10 @@ use crate::error::AppError; ) )] pub async fn add_mail( + Extension(mail_service): Extension>, Json(payload): Json, ) -> Result>, AppError> { - let created_mail = mail_service::create_mail(payload).await?; + let created_mail = mail_service.create_mail(payload).await?; let mut responses = Vec::new(); @@ -48,9 +50,10 @@ pub async fn add_mail( ) )] pub async fn get_all_mails( + Extension(mail_service): Extension>, query: Query ) -> Result>, AppError> { - let all_mails = mail_service::get_all_mails( + let all_mails = mail_service.get_all_mails( query.campaign_ids, query.from, query.to @@ -74,6 +77,9 @@ pub async fn get_all_mails( status: mail.status.clone(), status_reason: mail.reason.clone(), server_id: mail.server_id.clone(), + scheduled_at: mail.scheduled_at, + attempts: mail.attempts, + last_error: mail.last_error.clone(), }); }); Ok(Json(responses)) @@ -89,10 +95,11 @@ pub async fn get_all_mails( ) )] pub async fn update_mail( + Extension(mail_service): Extension>, Path(mail_id): Path, Json(payload): Json, ) -> Result, AppError> { - let updated_mail = mail_service::update_mail(mail_id, payload).await?; + let updated_mail = mail_service.update_mail(mail_id, payload).await?; Ok(Json(updated_mail)) } @@ -106,9 +113,10 @@ pub async fn update_mail( ) )] pub async fn delete_mail( + Extension(mail_service): Extension>, Path(mail_id): Path, ) -> Result, AppError> { - let deleted_mail = mail_service::delete_mail(mail_id).await?; + let deleted_mail = mail_service.delete_mail(mail_id).await?; Ok(Json(deleted_mail)) } \ No newline at end of file diff --git a/backend/src/handlers/template.rs b/backend/src/handlers/template.rs index 3114310..e034266 100644 --- a/backend/src/handlers/template.rs +++ b/backend/src/handlers/template.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use crate::error::AppError; use crate::models::mail::CreateMailRequest; use crate::models::template::{ CreateTemplateRequest, CreateTemplateResponse, DeleteTemplateResponse, GetTemplateResponse, ParseMjml2HtmlRequest, ParseMjml2HtmlResponse, SendMailRequest, SendMailResponse, TemplateResponse, UpdateTemplateRequest, UpdateTemplateResponse }; @@ -5,8 +6,9 @@ use serde_json::Value; use uuid::Uuid; use axum::{ - extract:: Path, Json, http::StatusCode + extract:: Path, Json, http::StatusCode, Extension }; +use crate::services::mail_service::MailService; use crate::services::template_service; use crate::handlers::mail_handler as mail_handler; @@ -140,6 +142,7 @@ pub async fn delete_template( ) )] pub async fn send_templated_email( + Extension(mail_service): Extension>, Path(template_id): Path, Json(payload): Json ) -> Result, AppError> { @@ -161,11 +164,11 @@ pub async fn send_templated_email( template_id: Some(send_templated_email_response.id), campaign_id: None, sent_at: send_templated_email_response.sent_at, - status: "pending".to_string(), + status: "queued".to_string(), server_id: None, }; - let _ = mail_handler::add_mail(Json(payload)).await; + let _ = mail_handler::add_mail(Extension(mail_service), Json(payload)).await; Ok(Json(send_templated_email_response)) } diff --git a/backend/src/lib.rs b/backend/src/lib.rs index 5070616..5ea807a 100644 --- a/backend/src/lib.rs +++ b/backend/src/lib.rs @@ -78,6 +78,7 @@ pub mod utils { pub mod email_utils; pub mod mjml_parser; pub mod server_utils; + pub mod bounce_logs; } pub mod middleware; diff --git a/backend/src/main.rs b/backend/src/main.rs index 2d93754..5c9133b 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -4,7 +4,10 @@ use axum::middleware; use axum::response::Response; use axum::BoxError; use backend::error::AppError; +use backend::repositories::mail_repository; use backend::route::create_router; +use backend::services::mail_service::{self, MailServiceTrait}; +use backend::servers::{ servers_repo, servers_services }; use diesel::PgConnection; use diesel::Connection; // Import the Connection trait use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; @@ -16,7 +19,7 @@ use axum::http::{ }; use tower_http::cors::CorsLayer; use std::convert::Infallible; -use std::{env, net::SocketAddr}; +use std::{env, net::SocketAddr, sync::Arc}; use backend::middleware::error_handling_middleware; pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations"); @@ -52,6 +55,20 @@ async fn main() { .layer(cors) .layer(middleware::from_fn(error_handling_middleware)); + // Instantiate the server service and repository one time, and inject it to the process_mails background process... + let server_repo = Arc::new(servers_repo::ServerRepoImpl); + let server_service = servers_services::ServerService::new(server_repo); + + let mail_repo = Arc::new(mail_repository::MailRepositoryImpl); + let mail_service = mail_service::MailService::new(mail_repo); + + // Worker for processing mails... + tokio::spawn(async move { + if let Err(err) = mail_service.process_mails(server_service.into()).await.map_err(|err| AppError::InternalServerError(Some(format!("Mail worker error: {:?}", err.to_string())))) { + eprintln!("Error occurred in mail worker: {:?}", err); + } + }); + // Address configuration let addr = env::var("SERVER_ADDRESS").unwrap_or_else(|_| "0.0.0.0:8000".to_string()); let addr: SocketAddr = addr.parse().expect("Invalid server address"); diff --git a/backend/src/models/bounce_logs.rs b/backend/src/models/bounce_logs.rs index e5a714a..1413d4d 100644 --- a/backend/src/models/bounce_logs.rs +++ b/backend/src/models/bounce_logs.rs @@ -83,6 +83,12 @@ pub struct SnsNotification { } +#[derive(Debug, Serialize, Deserialize, Clone, ToSchema)] +pub struct Header { + pub name: String, + pub value: String, +} + #[derive(Debug, Deserialize, Serialize, Clone, ToSchema)] pub struct Message { #[serde(rename = "notificationType")] @@ -92,7 +98,7 @@ pub struct Message { pub bounce: Option, pub delivery: Option, pub open: Option, - pub mail: MailDetails, + pub mail: MailDetails } #[derive(Debug, Deserialize, Serialize, Clone, ToSchema)] @@ -119,6 +125,7 @@ pub struct MailDetails { pub destination: Vec, #[serde(rename = "messageId")] pub mail_id: String, + pub headers: Vec
} #[derive(Debug, Deserialize, Serialize, Clone, ToSchema)] diff --git a/backend/src/models/campaign.rs b/backend/src/models/campaign.rs index 5f963a4..ec9cd0d 100644 --- a/backend/src/models/campaign.rs +++ b/backend/src/models/campaign.rs @@ -1,8 +1,9 @@ +use axum::http::status::StatusCode; use chrono::{ DateTime, NaiveDateTime, Utc }; use serde_json::Value; use serde::{ Serialize, Deserialize }; use utoipa::{openapi::schema, ToSchema}; -use diesel::prelude::*; +use diesel::{prelude::*, sql_types::Integer}; use uuid::Uuid; use crate::models::list::ListResponse; @@ -144,3 +145,10 @@ pub struct CampaignSendResponse { pub total_recipients: usize, pub status: String, } + +#[derive(Debug, Default, Serialize, Deserialize, ToSchema)] +pub struct AddMailToQueueResponse { + #[schema(value_type = u16, example = 200)] + pub status: u16, + pub message: String, +} \ No newline at end of file diff --git a/backend/src/models/mail.rs b/backend/src/models/mail.rs index ac9b4f5..208cea5 100644 --- a/backend/src/models/mail.rs +++ b/backend/src/models/mail.rs @@ -4,6 +4,7 @@ use utoipa::ToSchema; use diesel::{pg::Pg, prelude::*}; use uuid::Uuid; + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] enum MailStatus { @@ -27,6 +28,9 @@ pub struct Mail { pub open: Option>, pub clicks: i32, pub server_id: Option, + pub scheduled_at: DateTime, + pub attempts: i32, + pub last_error: Option, } #[derive(Queryable, QueryableByName, ToSchema )] @@ -62,6 +66,15 @@ pub struct MailWithDetails { #[diesel(sql_type = diesel::sql_types::Integer)] pub clicks: i32, + #[schema(value_type = String, example = "2023-01-01T00:00:00Z")] + pub scheduled_at: DateTime, + + #[diesel(sql_type = diesel::sql_types::Integer)] + pub attempts: i32, + + #[diesel(sql_type = diesel::sql_types::Nullable)] + pub last_error: Option, + #[diesel(sql_type = diesel::sql_types::Text)] pub email: String, // This comes from contacts.email @@ -95,6 +108,11 @@ pub struct GetMailResponse { pub clicks: i32, pub status: String, pub status_reason: Option, + + #[schema(value_type = String, example = "2023-01-01T00:00:00Z")] + pub scheduled_at: DateTime, + pub attempts: i32, + pub last_error: Option, } #[derive(Debug, Default, Serialize, Deserialize, ToSchema, Clone, PartialEq, Insertable)] @@ -182,6 +200,11 @@ pub struct UpdateMailRequest { #[schema(value_type = String, example = "2023-01-01T00:00:00Z")] pub open: Option>, pub clicks: Option, + + #[schema(value_type = String, example = "2023-01-01T00:00:00Z")] + pub scheduled_at: DateTime, + pub attempts: i32, + pub last_error: Option, } #[derive(Debug, Serialize, Deserialize, ToSchema)] @@ -203,6 +226,29 @@ pub struct UpdateMailResponse { #[schema(value_type = String, example = "2023-01-01T00:00:00Z")] pub open: Option>, pub clicks: i32, + + #[schema(value_type = String, example = "2023-01-01T00:00:00Z")] + pub scheduled_at: DateTime, + pub attempts: i32, + pub last_error: Option, +} + +impl From for UpdateMailResponse { + fn from(mail: Mail) -> Self { + Self { + id: mail.id, + mail_message: mail.mail_message, + template_id: mail.template_id, + campaign_id: mail.campaign_id, + status: Some(mail.status), + updated_at: chrono::Utc::now(), + open: mail.open, + clicks: mail.clicks, + scheduled_at: mail.scheduled_at, + attempts: mail.attempts, + last_error: mail.last_error, + } + } } #[derive(Debug, Serialize, Deserialize, ToSchema)] diff --git a/backend/src/repositories/mail_repository.rs b/backend/src/repositories/mail_repository.rs index 692907f..06d8286 100644 --- a/backend/src/repositories/mail_repository.rs +++ b/backend/src/repositories/mail_repository.rs @@ -28,6 +28,7 @@ pub trait MailRepository { async fn delete_mail(&self, mail_id: String) -> Result; async fn increment_mail_clicks(&self, mail_id: String) -> Result; async fn get_mails_by_contact(&self, c_id: Uuid) -> Result, diesel::result::Error>; + async fn get_queued_mails(&self) -> Result, diesel::result::Error>; } pub struct MailRepositoryImpl; @@ -59,6 +60,9 @@ impl MailRepository for MailRepositoryImpl { status, open, clicks, + scheduled_at, + attempts, + last_error, contacts_dsl::email, bounce_logs_dsl::reason.nullable(), )) @@ -136,6 +140,9 @@ impl MailRepository for MailRepositoryImpl { status, open, clicks, + scheduled_at, + attempts, + last_error, contacts_dsl::email, bounce_logs_dsl::reason.nullable() )) @@ -143,4 +150,31 @@ impl MailRepository for MailRepositoryImpl { .order(sent_at.desc()) .load::(&mut conn) } + + async fn get_queued_mails(&self) -> Result, diesel::result::Error> { + let mut conn = get_connection_pool().await; + + mails + .inner_join(contacts_dsl::contacts.on(contact_id.eq(contacts_dsl::id))) + .left_outer_join(bounce_logs_dsl::bounce_logs.on(id.eq(bounce_logs_dsl::mail_id))) + .select(( + id, + mail_message, + template_id, + campaign_id, + server_id, + sent_at, + status, + open, + clicks, + scheduled_at, + attempts, + last_error, + contacts_dsl::email, + bounce_logs_dsl::reason.nullable(), + )) + .filter(status.eq("queued")) + .order(sent_at.asc()) // Order by sent_at ascending here... + .load::(&mut conn) + } } \ No newline at end of file diff --git a/backend/src/routes/bounce_logs_route.rs b/backend/src/routes/bounce_logs_route.rs index 71fba63..05bb148 100644 --- a/backend/src/routes/bounce_logs_route.rs +++ b/backend/src/routes/bounce_logs_route.rs @@ -1,18 +1,27 @@ +use std::sync::Arc; use axum::{ routing::{ get, post, delete - }, Router }; + }, Router, Extension }; use crate::handlers::bounce_logs_handler::{ delete_bounce, get_all_bounces, get_bounces_by_contact_id, handle_sns_notification }; +use crate::services::mail_service; +use crate::repositories::mail_repository; + + pub fn bounce_logs_routes() -> Router { + let mail_repo = Arc::new(mail_repository::MailRepositoryImpl); + let mail_service = mail_service::MailService::new(mail_repo); + Router::new() .route("/sns/bounce", post(handle_sns_notification)) .route("/", get(get_all_bounces)) .route("/contacts/{contactId}", get(get_bounces_by_contact_id)) .route("/{bounceId}", delete(delete_bounce)) + .layer(Extension(Arc::new(mail_service))) } \ No newline at end of file diff --git a/backend/src/routes/mail.rs b/backend/src/routes/mail.rs index 7c19d3f..2eeb0bf 100644 --- a/backend/src/routes/mail.rs +++ b/backend/src/routes/mail.rs @@ -1,22 +1,25 @@ +use std::sync::Arc; + use axum::{ routing::{ - get, - post, - patch, - delete - }, Router }; + delete, get, patch, post + }, Extension, Router }; use crate::handlers::mail_handler::{ - add_mail, - get_all_mails, - update_mail, - delete_mail + add_mail, delete_mail, get_all_mails, update_mail }; +use crate::services::mail_service as service; +use crate::repositories::mail_repository; + pub fn mail_routes() -> Router { + let mail_repo = Arc::new(mail_repository::MailRepositoryImpl); + let mail_service = service::MailService::new(mail_repo); + Router::new() .route("/", post(add_mail)) .route("/", get(get_all_mails)) .route("/{mailId}", patch(update_mail)) .route("/{mailId}", delete(delete_mail)) + .layer(Extension(Arc::new(mail_service))) } \ No newline at end of file diff --git a/backend/src/routes/template.rs b/backend/src/routes/template.rs index 29e8068..19a293a 100644 --- a/backend/src/routes/template.rs +++ b/backend/src/routes/template.rs @@ -1,17 +1,22 @@ +use std::sync::Arc; + use axum::{ routing::{ - get, - post, - patch, - delete - }, Router }; + delete, get, patch, post + }, Extension, Router }; use crate::handlers::template::{ delete_template, get_templates, get_templates_by_id, create_template, update_template, send_templated_email, parse_mjml_to_html }; +use crate::services::mail_service; +use crate::repositories::mail_repository; + pub fn template_routes() -> Router { + let mail_repo = Arc::new(mail_repository::MailRepositoryImpl); + let mail_service = mail_service::MailService::new(mail_repo); + Router::new() .route("/{templateId}", get(get_templates_by_id)) .route("/", get(get_templates)) @@ -20,4 +25,5 @@ pub fn template_routes() -> Router { .route("/{templateId}", delete(delete_template)) .route("/{templateId}/send", post(send_templated_email)) .route("/parse-mjml", post(parse_mjml_to_html)) + .layer(Extension(Arc::new(mail_service))) } \ No newline at end of file diff --git a/backend/src/schema.rs b/backend/src/schema.rs index 178a68e..c41f712 100644 --- a/backend/src/schema.rs +++ b/backend/src/schema.rs @@ -100,6 +100,9 @@ diesel::table! { open -> Nullable, clicks -> Int4, server_id -> Nullable, + scheduled_at -> Timestamptz, + attempts -> Int4, + last_error -> Nullable, } } @@ -131,6 +134,7 @@ diesel::table! { created_at -> Timestamptz, updated_at -> Timestamptz, default_from_email -> Varchar, + rate_limit -> Int4, } } diff --git a/backend/src/servers/servers_handler.rs b/backend/src/servers/servers_handler.rs index c730305..542e03d 100644 --- a/backend/src/servers/servers_handler.rs +++ b/backend/src/servers/servers_handler.rs @@ -9,6 +9,9 @@ use std::sync::Arc; use uuid::Uuid; use crate::servers::servers_services::{ServerServiceTrait, ServerService}; +use crate::services::mail_service::MailService; +use crate::utils::server_utils::secure_server_response; + use serde::Serialize; @@ -197,6 +200,9 @@ pub async fn get_mails_from_server( status_reason: mail.reason, mail_message: mail.mail_message, campaign_id: mail.campaign_id, + scheduled_at: mail.scheduled_at, + attempts: mail.attempts, + last_error: mail.last_error, }).collect(); Ok(Json(mails_response)) diff --git a/backend/src/servers/servers_model.rs b/backend/src/servers/servers_model.rs index 3d79afe..e8314bd 100644 --- a/backend/src/servers/servers_model.rs +++ b/backend/src/servers/servers_model.rs @@ -52,6 +52,7 @@ pub struct Server { pub created_at: DateTime, pub updated_at: DateTime, pub default_from_email: String, + pub rate_limit: i32 } // Create DTOs #[derive(Debug, Default, Serialize, Deserialize, ToSchema, Clone, PartialEq)] @@ -86,6 +87,9 @@ pub struct ServerRequest { #[schema(example = "from.email@test.io")] pub default_from_email: String, + + #[schema(example = 30)] + pub rate_limit: i32 } // Get DTO @@ -111,11 +115,13 @@ pub struct ServerResponse { pub updated_at: DateTime, #[schema(example = "from.email@test.io")] pub default_from_email: String, + #[schema(example = 30)] + pub rate_limit: i32 } impl From for ServerResponse { fn from(server: Server) -> Self { - let safe_aws = server.aws_credentials.map(|creds| secure_server_response(creds)); + // let safe_aws = server.aws_credentials.map(|creds| secure_server_response(creds)); Self { id: server.id, @@ -127,10 +133,11 @@ impl From for ServerResponse { tls_type: server.tls_type, port: server.port, server_type: server.server_type, - aws_credentials: safe_aws, + aws_credentials: server.aws_credentials, created_at: server.created_at, updated_at: server.updated_at, default_from_email: server.default_from_email, + rate_limit: server.rate_limit, } } } diff --git a/backend/src/servers/servers_repo.rs b/backend/src/servers/servers_repo.rs index 2b3f939..1d56ae4 100644 --- a/backend/src/servers/servers_repo.rs +++ b/backend/src/servers/servers_repo.rs @@ -55,6 +55,7 @@ impl ServerRepo for ServerRepoImpl { created_at, updated_at, default_from_email, + rate_limit, )) .load::(&mut conn) } @@ -78,6 +79,7 @@ impl ServerRepo for ServerRepoImpl { aws_credentials.eq(&payload.aws_credentials), port.eq(&payload.port), default_from_email.eq(&payload.default_from_email), + rate_limit.eq(&payload.rate_limit), )) .get_result(&mut conn) } @@ -118,6 +120,9 @@ impl ServerRepo for ServerRepoImpl { status, open, clicks, + scheduled_at, + attempts, + last_error, contacts_dsl::email, bounce_logs_dsl::reason.nullable(), )) diff --git a/backend/src/servers/servers_routes.rs b/backend/src/servers/servers_routes.rs index 90773e4..e2f8716 100644 --- a/backend/src/servers/servers_routes.rs +++ b/backend/src/servers/servers_routes.rs @@ -10,16 +10,9 @@ use::axum::{ }, Router }; -use crate::servers::servers_handler::{ - create_server, - get_servers, - get_server_by_id, - delete_server, - update_server, - check_credentials, - send_mail_from_server, - get_mails_from_server -}; +use crate::{repositories::mail_repository, servers::servers_handler::{ + check_credentials, create_server, delete_server, get_mails_from_server, get_server_by_id, get_servers, send_mail_from_server, update_server +}, services::mail_service}; use crate::servers::servers_services; use crate::servers::servers_repo; diff --git a/backend/src/servers/servers_services.rs b/backend/src/servers/servers_services.rs index 9243116..34e0aaa 100644 --- a/backend/src/servers/servers_services.rs +++ b/backend/src/servers/servers_services.rs @@ -1,4 +1,4 @@ -use crate::{error::AppError, schema::sql_types::TlsType, models::contact::Contact, servers::{servers_repo::{ServerRepo, ServerRepoImpl}, servers_services}}; +use crate::{error::AppError, models::contact::Contact, schema::sql_types::TlsType, servers::{servers_repo::{ServerRepo, ServerRepoImpl}, servers_services}, services::mail_service::MailServiceTrait}; use uuid::Uuid; use std::sync::Arc; @@ -6,6 +6,7 @@ use axum::extract::Extension; use axum::http::StatusCode; use crate::models::mail::{CreateMailRequest, MailWithDetails}; use crate::services::mail_service; +use crate::repositories::mail_repository::{ MailRepository, MailRepositoryImpl }; use crate::servers::servers_model::{ Server, ServerRequest, SendMailFromServerResponse, ServerTypeEnum }; use crate::services::template_service::{get_template_by_id, send_templated_email}; use crate::models::template::SendMailRequest; @@ -19,8 +20,10 @@ use super::servers_model::TlsTypeEnum; use lettre::{ message::{Message, MultiPart, SinglePart}, transport::smtp::{authentication::Credentials, client::{Tls, TlsParameters}}, SmtpTransport, Transport }; +use mockall::{ automock, predicate::* }; +#[automock] #[async_trait] pub trait ServerServiceTrait { async fn create_server(&self, payload: ServerRequest) -> Result; @@ -42,6 +45,7 @@ pub trait ServerServiceTrait { async fn send_mail_from_server(&self, server_id: &str, template_id: Uuid, receiver: String) -> Result; async fn get_mails_by_server_id(&self, server_id: Uuid) -> Result, AppError>; } + #[derive(Clone)] pub struct ServerService { repository: Arc @@ -228,6 +232,9 @@ impl ServerServiceTrait for ServerService { let receivers = receiver.split(",").map(|s| s.to_string()).collect::>(); let mut mail_send_ids: Vec = Vec::new(); + let mail_repo = Arc::new(MailRepositoryImpl); + let mail_service = mail_service::MailService::new(mail_repo); + match server.server_type { // send mail using the AWS credentials... ServerTypeEnum::AWS => { @@ -255,17 +262,18 @@ impl ServerServiceTrait for ServerService { sent_at: chrono::Utc::now(), status: "sent".to_string(), }; - - mail_service::create_mail(new_mail).await.map_err(|err| { + + mail_service.create_mail(new_mail).await.map_err(|err| { AppError::InternalServerError(Some(format!("Failed to create mail: {}", err))) })?; + Ok(SendMailFromServerResponse { id: mail_send_uuid, template_id, server_id: server.id, mail_send_ids: vec![mail_send_uuid], - status: "pending".to_string(), + status: "queued".to_string(), sent_at: chrono::Utc::now(), }) }, @@ -315,10 +323,10 @@ impl ServerServiceTrait for ServerService { campaign_id: None, server_id: Some(server.id), sent_at: chrono::Utc::now(), - status: "pending".to_string(), + status: "queued".to_string(), }; - mail_service::create_mail(new_mail).await.map_err(|err| { + mail_service.create_mail(new_mail).await.map_err(|err| { AppError::InternalServerError(Some(format!("Failed to create mail: {}", err))) })?; diff --git a/backend/src/services/aws_service.rs b/backend/src/services/aws_service.rs index 675351f..0a60014 100644 --- a/backend/src/services/aws_service.rs +++ b/backend/src/services/aws_service.rs @@ -1,4 +1,4 @@ -use aws_sdk_sesv2::{error::SdkError, operation::send_email::SendEmailOutput, types::{Body, Content, Destination, EmailContent, Message, SuppressionListReason }, Client }; +use aws_sdk_sesv2::{error::SdkError, operation::send_email::SendEmailOutput, types::{Body, Content, Destination, EmailContent, Message, MessageHeader, SuppressionListReason }, Client }; use aws_config::{BehaviorVersion, Region}; use std::{env, error::Error, sync::Arc}; @@ -46,7 +46,8 @@ pub async fn send_mail( cc: Option>, bcc: Option>, subject: &str, - html_data: &str + html_data: &str, + mail_id: Option<&str> ) -> Result> { let mut destination = Destination::builder().build(); destination.to_addresses = Some(to.clone()); @@ -92,16 +93,26 @@ pub async fn send_mail( let msg = Message::builder() .subject(subject_content) + .set_headers(Some(vec![ + MessageHeader::builder() + .name("mailId") + .value(mail_id.unwrap()) + .build()? + ])) .body(body) .build(); let email_content = EmailContent::builder().simple(msg).build(); + let configuration_set_name = env::var("AWS_SES_CONFIGURATION_SET_NAME") + .expect("You must have env of configuration set name for open and click rates tracking named: AWS_SES_CONFIGURATION_SET_NAME"); + let result = client .send_email() .from_email_address(from) .destination(destination) .content(email_content) + .set_configuration_set_name(Some(configuration_set_name)) .send() .await; diff --git a/backend/src/services/campaign_service.rs b/backend/src/services/campaign_service.rs index 66f3411..460231f 100644 --- a/backend/src/services/campaign_service.rs +++ b/backend/src/services/campaign_service.rs @@ -1,4 +1,4 @@ -use crate::{error::AppError, models::{campaign::{CampaignSendResponse, DeleteCampaignResponse, GetCampaignResponse, UpdateCampaignRequest, UpdateCampaignResponse}, campaign_lists::NewListInCampaign, mail::CreateMailRequest}, repositories::{campaign::{self, CampaginRepositoryImpl, CampaignRepository}, campaign_lists_repo::{CampaignListRepository, CampaignListRepositoryImpl}, list_contact_repo::ListContactRepositoryImpl, mail_repository::MailRepositoryImpl},servers::{servers_handler::get_server_by_id, servers_model::{Server, ServerTypeEnum}, servers_repo::{self, ServerRepoImpl}, servers_services::{self, ServerService, ServerServiceTrait}}, utils::contact_lists_functions::{get_unique_contacts_from_campaign, populate_contact_template}}; +use crate::{error::AppError, models::{campaign::{CampaignSendResponse, DeleteCampaignResponse, GetCampaignResponse, UpdateCampaignRequest, UpdateCampaignResponse, AddMailToQueueResponse}, campaign_lists::NewListInCampaign, mail::CreateMailRequest}, repositories::{campaign::{self, CampaginRepositoryImpl, CampaignRepository}, campaign_lists_repo::{CampaignListRepository, CampaignListRepositoryImpl}, list_contact_repo::ListContactRepositoryImpl, mail_repository::{ MailRepository, MailRepositoryImpl}}, servers::{servers_handler::get_server_by_id, servers_model::{Server, ServerTypeEnum}, servers_repo::{self, ServerRepoImpl}, servers_services::{self, ServerService, ServerServiceTrait}}, utils::contact_lists_functions::{get_unique_contacts_from_campaign, populate_contact_template}}; use uuid::Uuid; use std::{collections::HashSet, sync::Arc, env}; use axum::http::StatusCode; @@ -13,7 +13,7 @@ use aws_sdk_sesv2::types::{builders::{BodyBuilder, ContentBuilder}, Body, Conten use crate::services::{aws_service, list_service::ListContactService, template_service::get_template_by_id}; use anyhow::{anyhow, Result}; -use super::{aws_service::create_aws_client_db, campaign_sender_service::get_campaign_sender_by_id, mail_service::MailService }; +use super::{aws_service::create_aws_client_db, campaign_sender_service::get_campaign_sender_by_id, mail_service::{MailService, MailServiceTrait} }; use crate::services::mail_service as mail_service; @@ -241,10 +241,10 @@ pub async fn delete_campaign(campaign_id: Uuid)->Result Result { - +) -> Result { let campaign = get_campaign_by_id(campaign_id).await?; let campaign_sender_id = campaign .campaign_senders @@ -260,18 +260,12 @@ pub async fn send_campaign_email( let server = server_service.get_server_by_id(server_id.as_str()).await?; - - match server.server_type { - ServerTypeEnum::AWS => { - let result = send_campaign_email_aws(campaign_id).await?; - Ok(result) - }, - ServerTypeEnum::SMTP => { - let result = send_campaign_email_smtp(campaign_id, Uuid::parse_str(&server_id).unwrap()).await?; - Ok(result) - } - } - + // modified to add the mails to queue rather than directly sending them... + let _result = enqueue_email(campaign_id, server.id).await?; + return Ok(AddMailToQueueResponse { + status: StatusCode::OK.into(), + message: "Campaign email sent successfully".to_string(), + }); } pub async fn send_campaign_email_aws( @@ -308,6 +302,9 @@ pub async fn send_campaign_email_aws( let client = aws_service::create_aws_client_db(&server_id).await; let request = client.list_email_identities(); + let mail_repo = Arc::new(MailRepositoryImpl); + let mail_service = mail_service::MailService::new(mail_repo); + // Send the request and await the response let result = request.send().await.map_err(|err| AppError::InternalServerError(Some(err.to_string())))?; @@ -365,11 +362,11 @@ pub async fn send_campaign_email_aws( template_id: Some(Uuid::parse_str(&template.id)?), campaign_id: Some(campaign_id), sent_at: chrono::Utc::now(), - status: "pending".to_string(), + status: "queued".to_string(), server_id: Some(server_uuid), }; - mail_service::create_mail(new_mail).await?; + mail_service.create_mail(new_mail).await?; } Ok(CampaignSendResponse { campaign_id: campaign_id.to_string(), @@ -412,6 +409,9 @@ pub async fn send_campaign_email_smtp( let sender_email = campaign_sender_response.from_email; let server_service = ServerService::new(Arc::new(ServerRepoImpl)); + let mail_repo = Arc::new(MailRepositoryImpl); + let mail_service = mail_service::MailService::new(mail_repo); + for contact in contacts.clone() { let parsed_html = populate_contact_template(&template, &contact).await.map_err(|err| AppError::InternalServerError(Some(err.to_string())))?; @@ -432,12 +432,12 @@ pub async fn send_campaign_email_smtp( mail_message: parsed_html, email: vec![contact.email.clone()], template_id: Some(Uuid::parse_str(&template.id)?), - campaign_id: Some((campaign_id)), + campaign_id: Some(campaign_id), sent_at: chrono::Utc::now(), - status: "pending".to_string(), + status: "queued".to_string(), server_id: Some(server_id), }; - mail_service::create_mail(new_mail).await.map_err(|err| { + mail_service.create_mail(new_mail).await.map_err(|err| { AppError::InternalServerError(Some(format!("Failed to create mail: {}", err))) })?; }, @@ -454,3 +454,110 @@ pub async fn send_campaign_email_smtp( }) } +/// a function to send a single email to a contact with smtp server... +pub async fn send_single_email ( + server_type: ServerTypeEnum, + mail_id: String, + campaign_id: Uuid, + server_id: Uuid, + email: String, + message: String, + subject: String, +) -> Result { + let server_service = ServerService::new(Arc::new(ServerRepoImpl)); + + let campaign = get_campaign_by_id(campaign_id.clone()) + .await + .map_err(|err| AppError::NotFoundError(Some(err.to_string())))?; + + let sender_id_string = match campaign.campaign_senders { + Some(uuid) => uuid.to_string(), + None => return Err(AppError::NotFoundError(Some("Sender ID not found for campaign.".to_string()))), + }; + + let campaign_sender_response = get_campaign_sender_by_id(sender_id_string).await + .map_err(|err| AppError::InternalServerError(Some(err.to_string())))?; + + let sender_email = campaign_sender_response.from_email; + + let mail_repo = Arc::new(MailRepositoryImpl); + let mail_service = mail_service::MailService::new(mail_repo); + + let _result = match server_type { + ServerTypeEnum::AWS => { + let client = aws_service::create_aws_client_db(&server_id.to_string()).await; + + aws_service::send_mail( + client, + &sender_email, + vec![email], + None, + None, + &subject, + &message, + Some(&mail_id), + ).await.map_err(|err| AppError::InternalServerError(Some(format!("{:?}", err))))?; + }, + ServerTypeEnum::SMTP => { + server_service.send_mail_with_smtp( + server_id, + &sender_email, + vec![email], + None, + None, + &subject, + &message, + ).await.map_err(|err| AppError::InternalServerError(Some(format!("{:?}", err))))?; + } + }; + let mail_status = "submitted"; + + mail_service.update_mail_status(mail_id, mail_status).await.map_err(|err| { + AppError::InternalServerError(Some(format!("Failed to update mail status: {}", err))) + })?; + + Ok(CampaignSendResponse { + campaign_id: campaign_id.to_string(), + total_recipients: 1, + status: mail_status.to_string(), + }) +} + +/// a function to enqueue email for sending... +pub async fn enqueue_email( + campaign_id: Uuid, + server_id: Uuid +) -> Result<(), AppError> { + let campaign = get_campaign_by_id(campaign_id.clone()) + .await + .map_err(|err| AppError::NotFoundError(Some(err.to_string())))?; + + let contacts = get_unique_contacts_from_campaign(campaign_id).await?; + + let template = get_template_by_id(campaign.template_id.clone()).await + .map_err(|err| AppError::InternalServerError(Some(err.to_string())))?; + + let mail_repo = Arc::new(MailRepositoryImpl); + let mail_service = mail_service::MailService::new(mail_repo); + + for contact in contacts.clone() { + let parsed_html = populate_contact_template(&template, &contact).await.map_err(|err| AppError::InternalServerError(Some(err.to_string())))?; + + let new_mail = CreateMailRequest { + id: Uuid::new_v4().to_string(), + mail_message: parsed_html, + email: vec![contact.email.clone()], + template_id: Some(Uuid::parse_str(&template.id)?), + campaign_id: Some(campaign_id), + sent_at: chrono::Utc::now(), + status: "queued".to_string(), + server_id: Some(server_id), + }; + + mail_service.create_mail(new_mail).await.map_err(|err| { + AppError::InternalServerError(Some(format!("Failed to create mail: {}", err))) + })?; + } + + Ok(()) +} \ No newline at end of file diff --git a/backend/src/services/contact_service.rs b/backend/src/services/contact_service.rs index 9a2e6b9..c1968ba 100644 --- a/backend/src/services/contact_service.rs +++ b/backend/src/services/contact_service.rs @@ -1,7 +1,6 @@ use crate::{models::contact::{ContactList, GetContactResponsee}, repositories::contact::{self, ContactRepository, ContactRepositoryImpl}}; use uuid::Uuid; use std::{collections::HashMap, sync::Arc}; -use axum::{http::StatusCode, Json}; use crate::models::contact::{ Contact, CreateContactRequest, @@ -10,9 +9,10 @@ use crate::models::contact::{ }; use crate::models::mail::MailWithDetails; use crate::error::AppError; -use crate::services::mail_service::get_mails_by_contact; +use super::{list_service, mail_service::MailServiceTrait}; +use crate::repositories::mail_repository::{ MailRepository, MailRepositoryImpl }; +use crate::services::mail_service; -use super::list_service; pub struct ContactService { repository: Arc @@ -254,7 +254,10 @@ pub async fn import_contacts( } pub async fn get_mails_for_contact (contact_id: Uuid) -> Result, AppError> { - let mails = get_mails_by_contact(contact_id).await?; + let mail_repo = Arc::new(MailRepositoryImpl); + let mail_service = mail_service::MailService::new(mail_repo); + + let mails = mail_service.get_mails_by_contact(contact_id).await?; Ok(mails) } \ No newline at end of file diff --git a/backend/src/services/mail_service.rs b/backend/src/services/mail_service.rs index 727db23..5e88d04 100644 --- a/backend/src/services/mail_service.rs +++ b/backend/src/services/mail_service.rs @@ -1,8 +1,10 @@ -use crate::{models::mail::{DeleteMailResponse, MailWithDetails, NewMail}, repositories::mail_repository::{ MailRepository, MailRepositoryImpl }}; +use crate::{models::mail::{DeleteMailResponse, MailWithDetails, NewMail}, repositories::mail_repository::MailRepository, services::campaign_service::send_single_email}; +use crate::servers::servers_services::{ ServerService, ServerServiceTrait }; use crate::services::contact_service as contact_service; use chrono::{DateTime, Utc}; +use tokio::time::{interval, Duration}; use uuid::Uuid; -use std::sync::Arc; +use std::{collections::HashMap, num::NonZeroU32, sync::Arc, time::Instant}; use crate::models::mail::{ Mail, CreateMailRequest, @@ -10,161 +12,219 @@ use crate::models::mail::{ UpdateMailResponse }; use crate::error::AppError; +use governor::{Quota, RateLimiter, clock::DefaultClock, state::{ InMemoryState, NotKeyed }, middleware::NoOpMiddleware}; +use crate::servers::servers_model::ServerTypeEnum; +use async_trait::async_trait; +use mockall::{ automock, predicate::* }; + +/// a structure to hold the server state along with its rate limiter and the server_type to decide from what server (either AWS or SMTP) to send the email... +#[derive(Debug)] +pub struct ServerState { + pub limiter: RateLimiter, + pub server_type: ServerTypeEnum, +} + +#[automock] +#[async_trait] +pub trait MailServiceTrait { + async fn create_mail(&self, payload: CreateMailRequest) -> Result, AppError>; + async fn get_all_mails(&self, campaign_ids: Option, from: Option>, to: Option>) -> Result, AppError>; + async fn update_mail(&self, mail_id: String, payload: UpdateMailRequest) -> Result; + async fn update_mail_status(&self, mail_id: String, new_status: &str) -> Result; + async fn delete_mail(&self, mail_id: String) -> Result; + async fn increment_mail_clicks(&self, mail_id: String) -> Result; + async fn get_mails_by_contact(&self, contact_id: Uuid) -> Result, AppError>; + async fn fetch_queued_mails(&self) -> Result, AppError>; + async fn process_mails( + &self, + server_service: Arc, + ) -> Result<(), AppError>; +} + +#[derive(Clone)] pub struct MailService { repository: Arc } +#[automock] impl MailService { pub fn new(repository: Arc) -> Self { Self { repository } } +} - pub async fn create_mail(&self, payload: NewMail) -> Result { - self.repository.create_mail(payload).await +#[async_trait] +impl MailServiceTrait for MailService { + /// a function to add new mail into the record when the mail_send is triggered... + async fn create_mail(&self, payload: CreateMailRequest) -> Result, AppError> { + let mut responses = Vec::new(); // Vec; + for email in payload.email { + let contact = contact_service::get_contact_by_email(email).await?; + + let new_mail = NewMail { + id: payload.id.clone(), + mail_message: payload.mail_message.clone(), + contact_id: contact.id, + template_id: payload.template_id, + campaign_id: payload.campaign_id, + server_id: payload.server_id, + sent_at: payload.sent_at, + status: payload.status.clone(), + }; + let response = self.repository.create_mail(new_mail).await?; + + responses.push(response); + } + + Ok(responses) } - pub async fn get_all_mails(&self, campaign_ids: Option, from: Option>, to: Option>) -> Result, diesel::result::Error> { - self.repository.get_all_mails(campaign_ids, from, to).await - } + /// a function to get all mails from the record... + async fn get_all_mails( + &self, + campaign_ids: Option, + from: Option>, + to: Option> + ) -> Result, AppError> { + let response = self.repository.get_all_mails(campaign_ids, from, to).await?; - pub async fn update_mail(&self, mail_id: String, payload: UpdateMailRequest) -> Result { - self.repository.update_mail(mail_id, payload).await + Ok(response) } - pub async fn update_mail_status(&self, mail_id: String, new_status: &str) -> Result { - self.repository.update_mail_status(mail_id, new_status).await - } + /// a function to update mail in the record... + async fn update_mail(&self, mail_id: String, payload: UpdateMailRequest) -> Result { + let response = self.repository.update_mail(mail_id, payload).await?; - pub async fn delete_mail(&self, mail_id: String) -> Result { - self.repository.delete_mail(mail_id).await - } + let updated_mail_response: UpdateMailResponse = response.into(); - pub async fn increment_mail_clicks(&self, mail_id: String) -> Result { - self.repository.increment_mail_clicks(mail_id).await + Ok(updated_mail_response) } - pub async fn get_mails_by_contact(&self, contact_id: Uuid) -> Result, diesel::result::Error> { - self.repository.get_mails_by_contact(contact_id).await - } -} + /// a function to update mail status... + async fn update_mail_status(&self, mail_id: String, new_status: &str) -> Result { + let response = self.repository.update_mail_status(mail_id, new_status).await?; + + let updated_mail_response: UpdateMailResponse = response.into(); -/// a function to add new mail into the record when the mail_send is triggered... -pub async fn create_mail(payload: CreateMailRequest) -> Result, AppError> { - let mail_repository = Arc::new(MailRepositoryImpl); - let mail_service = MailService::new(mail_repository); - - let mut responses = Vec::new(); // Vec; - for email in payload.email { - let contact = contact_service::get_contact_by_email(email).await?; - - let new_mail = NewMail { - id: payload.id.clone(), - mail_message: payload.mail_message.clone(), - contact_id: contact.id, - template_id: payload.template_id, - campaign_id: payload.campaign_id, - server_id: payload.server_id, - sent_at: payload.sent_at, - status: payload.status.clone(), - }; - let response = mail_service.create_mail(new_mail).await?; - - responses.push(response); + Ok(updated_mail_response) } - Ok(responses) -} + /// a function to delete mail from the db relation... + async fn delete_mail(&self, mail_id: String) -> Result { + let response = self.repository.delete_mail(mail_id).await?; -/// a function to get all mails from the record... -pub async fn get_all_mails( - campaign_ids: Option, - from: Option>, - to: Option> -) -> Result, AppError> { - let mail_repository = Arc::new(MailRepositoryImpl); - let mail_service = MailService::new(mail_repository); - - let response = mail_service.get_all_mails(campaign_ids, from, to).await?; - - Ok(response) -} + Ok(DeleteMailResponse { + id: response.id, + status: Some(response.status), + contact_id: Some(response.contact_id), + }) + } -/// a function to update mail in the record... -pub async fn update_mail(mail_id: String, payload: UpdateMailRequest) -> Result { - let mail_repository = Arc::new(MailRepositoryImpl); - let mail_service = MailService::new(mail_repository); - - let response = mail_service.update_mail(mail_id, payload).await?; - - Ok(UpdateMailResponse { - id: response.id, - mail_message: response.mail_message, - template_id: response.template_id, - campaign_id: response.campaign_id, - status: Some(response.status), - updated_at: chrono::Utc::now(), - open: response.open, - clicks: response.clicks, - }) -} + async fn increment_mail_clicks(&self, mail_id: String) -> Result { + let response = self.repository.increment_mail_clicks(mail_id).await?; -/// a function to update mail status... -pub async fn update_mail_status(mail_id: String, new_status: String) -> Result { - let mail_repository = Arc::new(MailRepositoryImpl); - let mail_service = MailService::new(mail_repository); - - let response = mail_service.update_mail_status(mail_id, &new_status).await?; - - Ok(UpdateMailResponse { - id: response.id, - mail_message: response.mail_message, - template_id: response.template_id, - campaign_id: response.campaign_id, - status: Some(response.status), - updated_at: chrono::Utc::now(), - open: response.open, - clicks: response.clicks, - }) -} + let updated_mail_response: UpdateMailResponse = response.into(); -/// a function to delete mail from the db relation... -pub async fn delete_mail(mail_id: String) -> Result { - let mail_repository = Arc::new(MailRepositoryImpl); - let mail_service = MailService::new(mail_repository); + Ok(updated_mail_response) + } - let response = mail_service.delete_mail(mail_id).await?; + async fn get_mails_by_contact(&self, contact_id: Uuid) -> Result, AppError> { + let response = self.repository.get_mails_by_contact(contact_id).await?; - Ok(DeleteMailResponse { - id: response.id, - status: Some(response.status), - contact_id: Some(response.contact_id), - }) -} + Ok(response) + } -pub async fn increment_mail_clicks(mail_id: String) -> Result { - let mail_repository = Arc::new(MailRepositoryImpl); - let mail_service = MailService::new(mail_repository); - - let response = mail_service.increment_mail_clicks(mail_id).await?; - - Ok(UpdateMailResponse { - id: response.id, - mail_message: response.mail_message, - template_id: response.template_id, - campaign_id: response.campaign_id, - status: Some(response.status), - updated_at: chrono::Utc::now(), - open: response.open, - clicks: response.clicks, - }) -} + async fn fetch_queued_mails(&self) -> Result, AppError> { + let response = self.repository.get_queued_mails().await?; -pub async fn get_mails_by_contact(contact_id: Uuid) -> Result, AppError> { - let mail_repository = Arc::new(MailRepositoryImpl); - let mail_service = MailService::new(mail_repository); + Ok(response) + } - let response = mail_service.get_mails_by_contact(contact_id).await?; + /// Process queued mails with per-server rate limiting using the governor crate... + async fn process_mails( + &self, + server_service: Arc, + ) -> Result<(), AppError> { + // Create a rate limiter for each server... + let mut limiters: HashMap = HashMap::new(); + for server in server_service.get_all_servers().await? { + // rate_limit defines max tokens per second + let per_sec = NonZeroU32::new(server.rate_limit as u32) + .unwrap_or_else(|| NonZeroU32::new(1).unwrap()); + let quota = Quota::per_second(per_sec); + + let limiter = RateLimiter::direct(quota); + + let server_state = ServerState { + limiter, + server_type: server.server_type, + }; + + limiters.insert(server.id, server_state); + } + + // run the process in a loop each second... + let mut ticker = interval(Duration::from_secs(5)); + loop { + ticker.tick().await; + + + let mails = self.fetch_queued_mails().await?; + + if mails.is_empty() { + println!("No queued mails to process"); + continue; + } + for mail in mails { + let sid = match mail.server_id { + Some(id) => id, + None => continue, + }; + + if let Some(server_state) = limiters.get(&sid) { + println!("[Server {:?}] waiting for token to send mail {} to {}", sid, mail.id, mail.email); + let start = Instant::now(); + // try by adding the until_ready() to the limiter... + server_state.limiter.until_ready().await; + let waited = start.elapsed(); + println!("[Server {:?}] waited {:?} before sending mail {}", sid, waited, mail.id); + let email = mail.email.clone(); + + // create n background task to send the email bound by server rate limit... + tokio::spawn( + send_single_email( + server_state.server_type, + mail.id.clone(), + mail.campaign_id.unwrap(), + sid, + email, + mail.mail_message.clone(), + format!("Hello {}", mail.email), + ) + ); + + // Update status on success + self.repository.update_mail_status(mail.id, "submitted").await?; + } + } + } + } +} - Ok(response) +/// Process exactly one batch of queued mails (no loop or ticker). +/// the function for testing... +pub async fn process_one_batch( + servers: &HashMap, // server details... + fetch: impl Fn() -> Vec + Send + Sync, // fetch all the mailDetails... + send: impl Fn(&MailWithDetails) + Send + Sync, // send the emails... + update: impl Fn(String) + Send + Sync, // update the mail status... +) { + for mail in fetch() { + if let Some(state) = servers.get(&mail.server_id.unwrap()) { + state.limiter.until_ready().await; + send(&mail); + update(mail.id.clone()); + } + } } \ No newline at end of file diff --git a/backend/src/services/template_service.rs b/backend/src/services/template_service.rs index 866f83f..d1575e0 100644 --- a/backend/src/services/template_service.rs +++ b/backend/src/services/template_service.rs @@ -161,6 +161,7 @@ pub async fn send_templated_email( Some(bcc_list.clone()), &payload.subject, &parsed_html, + None, ) .await?; diff --git a/backend/src/utils/bounce_logs.rs b/backend/src/utils/bounce_logs.rs new file mode 100644 index 0000000..5188552 --- /dev/null +++ b/backend/src/utils/bounce_logs.rs @@ -0,0 +1,9 @@ +use crate::models::bounce_logs::Message; + +pub fn search_header_from_header_list(message: &Message, header_name:&str) -> Option { + let message_id = message.mail.headers.iter() + .find(|header| header.name.eq_ignore_ascii_case(header_name)) + .map(|header| header.value.clone()); + + message_id +} \ No newline at end of file diff --git a/backend/tests/rate_limiting.rs b/backend/tests/rate_limiting.rs new file mode 100644 index 0000000..e107aaf --- /dev/null +++ b/backend/tests/rate_limiting.rs @@ -0,0 +1,126 @@ +use std::{sync::{Arc, Mutex}, time::Duration, collections::HashMap}; +use tokio::time::Instant; +use uuid::Uuid; +use chrono::Utc; + +use backend::{models::mail::{Mail, MailWithDetails}, services::mail_service::MailServiceTrait}; +use backend::repositories::mail_repository::MockMailRepository; +use backend::servers::{ + servers_model::{Server, ServerTypeEnum, TlsTypeEnum}, + servers_repo::MockServerRepo, + servers_services::ServerService, +}; +use backend::services::mail_service::MailService; + + +#[tokio::test] +async fn test_process_mails_rate_limiting() { + // mock server with rate_limit of 1 mail/sec + let server_id = Uuid::new_v4(); + let fake_server = Server { + id: server_id, + active: true, + host: "smtp.test".into(), + smtp_username: "user1".into(), + smtp_password: "pass".into(), + namespace_id: Uuid::new_v4(), + tls_type: TlsTypeEnum::SSLTLS, + port: 587, + server_type: ServerTypeEnum::SMTP, + aws_credentials: None, + created_at: Utc::now(), + updated_at: Utc::now(), + default_from_email: "noreply@test".into(), + rate_limit: 1, + }; + let mut mock_srv_repo = MockServerRepo::new(); + mock_srv_repo + .expect_get_all_servers() + .returning(move || Ok(vec![fake_server.clone()])); + let server_service = Arc::new(ServerService::new(Arc::new(mock_srv_repo))); + + // mock mail repo that returns exactly one queued mail each tick... + let mail_id = "m1".to_string(); + let mut mock_mail_repo = MockMailRepository::new(); + // every call to get_queued_mails returns our single pending mail... + mock_mail_repo + .expect_get_queued_mails() + .returning(move || { + Ok(vec![ MailWithDetails { + id: mail_id.clone(), + mail_message: "hi".into(), + template_id: None, + campaign_id: Some(Uuid::new_v4()), + server_id: Some(server_id), + sent_at: Utc::now(), + status: "pending".into(), + open: None, + clicks: 0, + scheduled_at: Utc::now(), + attempts: 0, + last_error: None, + email: "abc@example.com".into(), + reason: None, + }]) + }); + + // record the instants when update_mail_status is called... + let times = Arc::new(Mutex::new(Vec::new())); + let times_clone = times.clone(); + mock_mail_repo + .expect_update_mail_status() + .returning(move |id, new_status| { + // record the instant... + times_clone.lock().unwrap().push(Instant::now()); + // return some dummy Mail back to caller... + Ok(Mail { + id: id.clone(), + mail_message: "".into(), + contact_id: Uuid::new_v4(), + template_id: None, + campaign_id: None, + sent_at: Utc::now(), + status: new_status.to_string(), + open: None, + clicks: 0, + server_id: None, + scheduled_at: Utc::now(), + attempts: 0, + last_error: None, + }) + }); + + let mail_service = Arc::new(MailService::new(Arc::new(mock_mail_repo))); + + // run the real worker (process) in the background... + let handle = tokio::spawn({ + let mail_service = mail_service.clone(); + let server_service = server_service.clone(); + async move { + mail_service.process_mails(server_service).await.unwrap(); + } + }); + tokio::time::sleep(Duration::from_secs(10)).await; + handle.abort(); + + // assert we got at least two updates, each ≥1s apart... + let sent = times.lock().unwrap(); + assert!( + sent.len() >= 2, + "sent.len()={}", + sent.len() + ); + assert!( + sent.len() <= 10, + "Expected at most 10 mails sent, but got {}", + sent.len() + ); + for window in sent.windows(2) { + let delta = window[1] - window[0]; + assert!( + delta >= Duration::from_secs(1), + "two sends too close: {:?}", + delta + ); + } +} diff --git a/backend/tests/servers.rs b/backend/tests/servers.rs index 6f2e199..32b9782 100644 --- a/backend/tests/servers.rs +++ b/backend/tests/servers.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use axum::http::StatusCode; use backend::error::AppError; -use backend::schema::sql_types::ServerType; use backend::servers::servers_model::{Server, ServerRequest, TlsTypeEnum, ServerTypeEnum}; use backend::servers::servers_repo::MockServerRepo; use backend::servers::servers_services::{ServerService, ServerServiceTrait}; @@ -23,6 +22,7 @@ async fn test_create_server() { aws_credentials: None, port: 587, default_from_email: "user@example.com".to_string(), + rate_limit: 30, }; let expected_server = Server { id: Uuid::new_v4(), @@ -38,6 +38,7 @@ async fn test_create_server() { created_at: Utc::now(), updated_at: Utc::now(), default_from_email: "user@example.com".to_string(), + rate_limit: 30, }; mock_repo .expect_create_server() @@ -69,6 +70,7 @@ async fn test_get_all_servers() { created_at: Utc::now(), updated_at: Utc::now(), default_from_email: "user@example.com".to_string(), + rate_limit: 30, } ]; mock_repo.expect_get_all_servers().returning(move || Ok(expected_servers.clone())); @@ -99,6 +101,7 @@ async fn test_get_server_by_id() { created_at: Utc::now(), updated_at: Utc::now(), default_from_email: "user@example.com".to_string(), + rate_limit: 30, }; mock_repo.expect_get_server_by_id().with(eq(server_id)).returning(move |_| Ok(expected_server.clone())); let server_service = ServerService::new(Arc::new(mock_repo)); @@ -145,6 +148,7 @@ async fn test_update_server() { aws_credentials: None, port: 465, default_from_email: "user@example.com".to_string(), + rate_limit: 30, }; let updated_server = Server { id: server_id, @@ -160,6 +164,7 @@ async fn test_update_server() { created_at: Utc::now(), updated_at: Utc::now(), default_from_email: "user@example.com".to_string(), + rate_limit: 30, }; mock_repo.expect_update_server().with(eq(server_id), eq(update_payload.clone())).returning(move |_, _| Ok(updated_server.clone())); let server_service = ServerService::new(Arc::new(mock_repo)); @@ -186,6 +191,7 @@ async fn test_delete_server() { created_at: Utc::now(), updated_at: Utc::now(), default_from_email: "user@example.com".to_string(), + rate_limit: 30, }; mock_repo.expect_delete_server().with(eq(server_id)).returning(move |_| Ok(expected_server.clone())); let server_service = ServerService::new(Arc::new(mock_repo)); diff --git a/frontend/app/dashboard/campaigns/[id]/analytics/_columns.tsx b/frontend/app/dashboard/campaigns/[id]/analytics/_columns.tsx index a99b999..7bae8d3 100644 --- a/frontend/app/dashboard/campaigns/[id]/analytics/_columns.tsx +++ b/frontend/app/dashboard/campaigns/[id]/analytics/_columns.tsx @@ -9,8 +9,9 @@ import ToolTip from '@/components/common/ToolTip'; const statusTagStyleMap = { draft: 'bg-gray-100 text-gray-800', - pending: 'bg-yellow-100 text-yellow-800', - sent: 'bg-blue-100 text-blue-800', + queued: 'bg-yellow-100 text-yellow-800', + submitted: 'bg-blue-100 text-blue-800', + delivered: 'bg-green-100 text-green-800', bounced: 'bg-red-100 text-red-800', } diff --git a/frontend/app/dashboard/contacts/[id]/mails/_columns.tsx b/frontend/app/dashboard/contacts/[id]/mails/_columns.tsx index 4a89923..2a3d51c 100644 --- a/frontend/app/dashboard/contacts/[id]/mails/_columns.tsx +++ b/frontend/app/dashboard/contacts/[id]/mails/_columns.tsx @@ -9,8 +9,8 @@ import TemplateName from './components/TemplateName'; const statusTagStyleMap = { draft: 'bg-gray-100 text-gray-800', - pending: 'bg-yellow-100 text-yellow-800', - sent: 'bg-blue-100 text-blue-800', + queued: 'bg-yellow-100 text-yellow-800', + delivered: 'bg-blue-100 text-blue-800', bounced: 'bg-red-100 text-red-800', } diff --git a/frontend/app/dashboard/servers/_components/AwsServerForm.tsx b/frontend/app/dashboard/servers/_components/AwsServerForm.tsx index 15df928..e422601 100644 --- a/frontend/app/dashboard/servers/_components/AwsServerForm.tsx +++ b/frontend/app/dashboard/servers/_components/AwsServerForm.tsx @@ -17,6 +17,7 @@ import { FieldErrors, } from "react-hook-form"; import { Server } from "@/lib/type"; +import serverRegions from "../_constants/server"; interface AwsServerFormProps { register: UseFormRegister; @@ -78,40 +79,13 @@ export default function AwsServerForm({ - - US East (N. Virginia) - - US East (Ohio) - - US West (N. California) - - US West (Oregon) - - Asia Pacific (Mumbai) - - - Asia Pacific (Seoul) - - - Asia Pacific (Singapore) - - - Asia Pacific (Sydney) - - - Asia Pacific (Tokyo) - - Canada (Central) - - Europe (Frankfurt) - - Europe (Ireland) - Europe (London) - Europe (Paris) - Europe (Stockholm) - - South America (São Paulo) - + { + serverRegions.map(region => ( + + { region.label } + + )) + } )} @@ -131,20 +105,37 @@ export default function AwsServerForm({ /> - -
- + +
+
+ + trigger("default_from_email"), + })} + placeholder="from.email@test.io" + /> + {errors.default_from_email && ( + + {errors.default_from_email.message} + + )} +
+
+ trigger("default_from_email"), + {...register("rate_limit", { + onChange: () => trigger("rate_limit"), + valueAsNumber: true, })} - placeholder="from.email@test.io" + placeholder="30" /> - {errors.default_from_email && ( + {errors.rate_limit && ( - {errors.default_from_email.message} + {errors.rate_limit.message} )} +
); diff --git a/frontend/app/dashboard/servers/_components/ServerCard.tsx b/frontend/app/dashboard/servers/_components/ServerCard.tsx index 2f4aa75..dd699ef 100644 --- a/frontend/app/dashboard/servers/_components/ServerCard.tsx +++ b/frontend/app/dashboard/servers/_components/ServerCard.tsx @@ -51,6 +51,7 @@ export default function ServerCard({ server, onCancel }: ServerCardProps) { const testAndSaveSmtpConnection = async () => { try { const formData = getValues(); + console.log("THE FORM VALUE FROM THE SERVER ===> ", formData); const payload = { ...formData }; const result = await checkCredentials(payload).unwrap(); diff --git a/frontend/app/dashboard/servers/_components/SmtpServerForm.tsx b/frontend/app/dashboard/servers/_components/SmtpServerForm.tsx index 45b0ec4..efd8688 100644 --- a/frontend/app/dashboard/servers/_components/SmtpServerForm.tsx +++ b/frontend/app/dashboard/servers/_components/SmtpServerForm.tsx @@ -130,7 +130,21 @@ export default function SmtpServerForm({
-
+
+
+ + trigger("default_from_email"), + })} + placeholder="from.email@test.io" + /> + {errors.default_from_email && ( + + {errors.default_from_email.message} + + )} +
- + trigger("default_from_email"), + {...register("rate_limit", { + onChange: () => trigger("rate_limit"), + valueAsNumber: true, })} - placeholder="from.email@test.io" + placeholder="30" /> - {errors.default_from_email && ( + {errors.rate_limit && ( - {errors.default_from_email.message} + {errors.rate_limit.message} )}
diff --git a/frontend/app/dashboard/servers/_constants/server.ts b/frontend/app/dashboard/servers/_constants/server.ts new file mode 100644 index 0000000..d510f30 --- /dev/null +++ b/frontend/app/dashboard/servers/_constants/server.ts @@ -0,0 +1,68 @@ +const serverRegions = [ + { + label: 'US East (N. Virginia)', + value: 'us-east-1', + }, + { + label: 'US East (Ohio)', + value: 'us-east-2', + }, + { + label: 'US West (N. California)', + value: 'us-west-1', + }, + { + label: 'US West (Oregon)', + value: 'us-west-2', + }, + { + label: 'Asia Pacific (Mumbai)', + value: 'ap-south-1', + }, + { + label: 'Asia Pacific (Seoul)', + value: 'ap-northeast-2', + }, + { + label: 'Asia Pacific (Singapore)', + value: 'ap-southeast-1', + }, + { + label: 'Asia Pacific (Sydney)', + value: 'ap-southeast-2', + }, + { + label: 'Asia Pacific (Tokyo)', + value: 'ap-northeast-1', + }, + { + label: 'Canada (Central)', + value: 'ca-central-1', + }, + { + label: 'Europe (Frankfurt)', + value: 'eu-central-1', + }, + { + label: 'Europe (Ireland)', + value: 'eu-west-1', + }, + { + label: 'Europe (London)', + value: 'eu-west-2', + }, + { + label: 'Europe (Paris)', + value: 'eu-west-3', + }, + { + label: 'Europe (Stockholm)', + value: 'eu-north-1', + }, + { + label: 'South America (São Paulo)', + value: 'sa-east-1', + }, +]; + +export default serverRegions; \ No newline at end of file diff --git a/frontend/app/layout.tsx b/frontend/app/layout.tsx index 084f6f1..e5f3d4a 100644 --- a/frontend/app/layout.tsx +++ b/frontend/app/layout.tsx @@ -14,8 +14,8 @@ const geistMono = Geist_Mono({ }); export const metadata: Metadata = { - title: "Create Next App", - description: "Generated by create next app", + title: "Mail Service", + description: "An email management platform", }; export default function RootLayout({ diff --git a/frontend/app/services/CampaignApi.ts b/frontend/app/services/CampaignApi.ts index d6053e3..46681ba 100644 --- a/frontend/app/services/CampaignApi.ts +++ b/frontend/app/services/CampaignApi.ts @@ -25,10 +25,7 @@ export const campaignApi = createApi({ // Query to fetch all lists... getCampaigns: builder.query({ query: () => "", - providesTags: (result) => - result - ? result.map((campaign) => ({ type: "Campaign", id: campaign.id })) - : [{ type: "Campaign" }], + providesTags: ["Campaign"], }), getCampaignById: builder.query({ query: (campaignId) => `/${campaignId}`, @@ -42,7 +39,7 @@ export const campaignApi = createApi({ method: "POST", body: newCampaign, }), - invalidatesTags: [{ type: "Campaign" }], + invalidatesTags: ["Campaign"], }), updateCampaign: builder.mutation< UpdateCampaignResponse, @@ -62,7 +59,7 @@ export const campaignApi = createApi({ console.error("Error updating campaign:", err); } }, - invalidatesTags: [{ type: "Campaign" }], + invalidatesTags: ["Campaign"], }), deleteCampaign: builder.mutation({ query: (campaignId) => ({ @@ -80,7 +77,7 @@ export const campaignApi = createApi({ method: "POST", body: { list_id: listId }, }), - invalidatesTags: [{ type: "Campaign" }], + invalidatesTags: ["Campaign"], }), }), }); diff --git a/frontend/components/Sidebar.tsx b/frontend/components/Sidebar.tsx index f089332..46ed164 100644 --- a/frontend/components/Sidebar.tsx +++ b/frontend/components/Sidebar.tsx @@ -74,15 +74,15 @@ const Sidebar = ({ onClose }: SidebarProps) => { )} -