From c0fb119b26b6462206f8df125ef7eaaf034e92af Mon Sep 17 00:00:00 2001 From: kanak1125 Date: Tue, 22 Apr 2025 15:41:55 +0545 Subject: [PATCH 01/10] feat: add mail processor that gets triggered every 30 secs --- .../down.sql | 6 +++ .../up.sql | 6 +++ backend/src/handlers/contact.rs | 3 ++ backend/src/handlers/mail_handler.rs | 3 ++ backend/src/main.rs | 8 +++ backend/src/models/mail.rs | 27 ++++++++++ backend/src/repositories/mail_repository.rs | 34 ++++++++++++ backend/src/schema.rs | 3 ++ backend/src/servers/servers_handler.rs | 3 ++ backend/src/servers/servers_repo.rs | 3 ++ backend/src/services/mail_service.rs | 52 +++++++++++++++++++ backend/tests/servers.rs | 1 - 12 files changed, 148 insertions(+), 1 deletion(-) create mode 100644 backend/migrations/2025-04-22-082248_add_additional_fields_to_mail_for_queueing/down.sql create mode 100644 backend/migrations/2025-04-22-082248_add_additional_fields_to_mail_for_queueing/up.sql diff --git a/backend/migrations/2025-04-22-082248_add_additional_fields_to_mail_for_queueing/down.sql b/backend/migrations/2025-04-22-082248_add_additional_fields_to_mail_for_queueing/down.sql new file mode 100644 index 0000000..8866867 --- /dev/null +++ b/backend/migrations/2025-04-22-082248_add_additional_fields_to_mail_for_queueing/down.sql @@ -0,0 +1,6 @@ +-- Rollback: Remove queuing and rate-limiting fields from mails table + +ALTER TABLE mails +DROP COLUMN scheduled_at, +DROP COLUMN attempts, +DROP COLUMN last_error; \ No newline at end of file diff --git a/backend/migrations/2025-04-22-082248_add_additional_fields_to_mail_for_queueing/up.sql b/backend/migrations/2025-04-22-082248_add_additional_fields_to_mail_for_queueing/up.sql new file mode 100644 index 0000000..3f865be --- /dev/null +++ b/backend/migrations/2025-04-22-082248_add_additional_fields_to_mail_for_queueing/up.sql @@ -0,0 +1,6 @@ +-- Migration: Add queuing and rate-limiting fields to mails table + +ALTER TABLE mails +ADD COLUMN "scheduled_at" TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP NOT NULL, +ADD COLUMN "attempts" INTEGER DEFAULT 0 NOT NULL, +ADD COLUMN "last_error" TEXT; \ No newline at end of file diff --git a/backend/src/handlers/contact.rs b/backend/src/handlers/contact.rs index dad2f00..6e5ec9f 100644 --- a/backend/src/handlers/contact.rs +++ b/backend/src/handlers/contact.rs @@ -259,5 +259,8 @@ pub async fn get_mails_by_contact_id( clicks: mail.clicks, status: mail.status, status_reason: mail.reason, + scheduled_at: mail.scheduled_at, + attempts: mail.attempts, + last_error: mail.last_error, }).collect())) } \ No newline at end of file diff --git a/backend/src/handlers/mail_handler.rs b/backend/src/handlers/mail_handler.rs index 015e4fb..f15b28d 100644 --- a/backend/src/handlers/mail_handler.rs +++ b/backend/src/handlers/mail_handler.rs @@ -74,6 +74,9 @@ pub async fn get_all_mails( status: mail.status.clone(), status_reason: mail.reason.clone(), server_id: mail.server_id.clone(), + scheduled_at: mail.scheduled_at, + attempts: mail.attempts, + last_error: mail.last_error.clone(), }); }); Ok(Json(responses)) diff --git a/backend/src/main.rs b/backend/src/main.rs index 2d93754..97d9c2a 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -5,6 +5,7 @@ use axum::response::Response; use axum::BoxError; use backend::error::AppError; use backend::route::create_router; +use backend::services::mail_service; use diesel::PgConnection; use diesel::Connection; // Import the Connection trait use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; @@ -52,6 +53,13 @@ async fn main() { .layer(cors) .layer(middleware::from_fn(error_handling_middleware)); + // Worker for processing mails... + tokio::spawn(async { + if let Err(err) = mail_service::process_mails(30).await { + eprintln!("Mail worker error: {:?}", err); + } + }); + // Address configuration let addr = env::var("SERVER_ADDRESS").unwrap_or_else(|_| "0.0.0.0:8000".to_string()); let addr: SocketAddr = addr.parse().expect("Invalid server address"); diff --git a/backend/src/models/mail.rs b/backend/src/models/mail.rs index ac9b4f5..6381a23 100644 --- a/backend/src/models/mail.rs +++ b/backend/src/models/mail.rs @@ -27,6 +27,9 @@ pub struct Mail { pub open: Option>, pub clicks: i32, pub server_id: Option, + pub scheduled_at: DateTime, + pub attempts: i32, + pub last_error: Option, } #[derive(Queryable, QueryableByName, ToSchema )] @@ -62,6 +65,15 @@ pub struct MailWithDetails { #[diesel(sql_type = diesel::sql_types::Integer)] pub clicks: i32, + #[schema(value_type = String, example = "2023-01-01T00:00:00Z")] + pub scheduled_at: DateTime, + + #[diesel(sql_type = diesel::sql_types::Integer)] + pub attempts: i32, + + #[diesel(sql_type = diesel::sql_types::Nullable)] + pub last_error: Option, + #[diesel(sql_type = diesel::sql_types::Text)] pub email: String, // This comes from contacts.email @@ -95,6 +107,11 @@ pub struct GetMailResponse { pub clicks: i32, pub status: String, pub status_reason: Option, + + #[schema(value_type = String, example = "2023-01-01T00:00:00Z")] + pub scheduled_at: DateTime, + pub attempts: i32, + pub last_error: Option, } #[derive(Debug, Default, Serialize, Deserialize, ToSchema, Clone, PartialEq, Insertable)] @@ -182,6 +199,11 @@ pub struct UpdateMailRequest { #[schema(value_type = String, example = "2023-01-01T00:00:00Z")] pub open: Option>, pub clicks: Option, + + #[schema(value_type = String, example = "2023-01-01T00:00:00Z")] + pub scheduled_at: DateTime, + pub attempts: i32, + pub last_error: Option, } #[derive(Debug, Serialize, Deserialize, ToSchema)] @@ -203,6 +225,11 @@ pub struct UpdateMailResponse { #[schema(value_type = String, example = "2023-01-01T00:00:00Z")] pub open: Option>, pub clicks: i32, + + #[schema(value_type = String, example = "2023-01-01T00:00:00Z")] + pub scheduled_at: DateTime, + pub attempts: i32, + pub last_error: Option, } #[derive(Debug, Serialize, Deserialize, ToSchema)] diff --git a/backend/src/repositories/mail_repository.rs b/backend/src/repositories/mail_repository.rs index 692907f..ba0244b 100644 --- a/backend/src/repositories/mail_repository.rs +++ b/backend/src/repositories/mail_repository.rs @@ -28,6 +28,7 @@ pub trait MailRepository { async fn delete_mail(&self, mail_id: String) -> Result; async fn increment_mail_clicks(&self, mail_id: String) -> Result; async fn get_mails_by_contact(&self, c_id: Uuid) -> Result, diesel::result::Error>; + async fn get_queued_mails(&self) -> Result, diesel::result::Error>; } pub struct MailRepositoryImpl; @@ -59,6 +60,9 @@ impl MailRepository for MailRepositoryImpl { status, open, clicks, + scheduled_at, + attempts, + last_error, contacts_dsl::email, bounce_logs_dsl::reason.nullable(), )) @@ -136,6 +140,9 @@ impl MailRepository for MailRepositoryImpl { status, open, clicks, + scheduled_at, + attempts, + last_error, contacts_dsl::email, bounce_logs_dsl::reason.nullable() )) @@ -143,4 +150,31 @@ impl MailRepository for MailRepositoryImpl { .order(sent_at.desc()) .load::(&mut conn) } + + async fn get_queued_mails(&self) -> Result, diesel::result::Error> { + let mut conn = get_connection_pool().await; + + mails + .inner_join(contacts_dsl::contacts.on(contact_id.eq(contacts_dsl::id))) + .left_outer_join(bounce_logs_dsl::bounce_logs.on(id.eq(bounce_logs_dsl::mail_id))) + .select(( + id, + mail_message, + template_id, + campaign_id, + server_id, + sent_at, + status, + open, + clicks, + scheduled_at, + attempts, + last_error, + contacts_dsl::email, + bounce_logs_dsl::reason.nullable(), + )) + .filter(status.eq("pending")) + .order(sent_at.asc()) // Order by sent_at ascending here... + .load::(&mut conn) + } } \ No newline at end of file diff --git a/backend/src/schema.rs b/backend/src/schema.rs index 178a68e..43eae54 100644 --- a/backend/src/schema.rs +++ b/backend/src/schema.rs @@ -100,6 +100,9 @@ diesel::table! { open -> Nullable, clicks -> Int4, server_id -> Nullable, + scheduled_at -> Timestamptz, + attempts -> Int4, + last_error -> Nullable, } } diff --git a/backend/src/servers/servers_handler.rs b/backend/src/servers/servers_handler.rs index c730305..eb2b7ad 100644 --- a/backend/src/servers/servers_handler.rs +++ b/backend/src/servers/servers_handler.rs @@ -197,6 +197,9 @@ pub async fn get_mails_from_server( status_reason: mail.reason, mail_message: mail.mail_message, campaign_id: mail.campaign_id, + scheduled_at: mail.scheduled_at, + attempts: mail.attempts, + last_error: mail.last_error, }).collect(); Ok(Json(mails_response)) diff --git a/backend/src/servers/servers_repo.rs b/backend/src/servers/servers_repo.rs index 2b3f939..85f13dc 100644 --- a/backend/src/servers/servers_repo.rs +++ b/backend/src/servers/servers_repo.rs @@ -118,6 +118,9 @@ impl ServerRepo for ServerRepoImpl { status, open, clicks, + scheduled_at, + attempts, + last_error, contacts_dsl::email, bounce_logs_dsl::reason.nullable(), )) diff --git a/backend/src/services/mail_service.rs b/backend/src/services/mail_service.rs index 727db23..3b4f1dc 100644 --- a/backend/src/services/mail_service.rs +++ b/backend/src/services/mail_service.rs @@ -1,6 +1,7 @@ use crate::{models::mail::{DeleteMailResponse, MailWithDetails, NewMail}, repositories::mail_repository::{ MailRepository, MailRepositoryImpl }}; use crate::services::contact_service as contact_service; use chrono::{DateTime, Utc}; +use tokio::time::{interval, Duration}; use uuid::Uuid; use std::sync::Arc; use crate::models::mail::{ @@ -47,6 +48,10 @@ impl MailService { pub async fn get_mails_by_contact(&self, contact_id: Uuid) -> Result, diesel::result::Error> { self.repository.get_mails_by_contact(contact_id).await } + + pub async fn get_queued_mails(&self) -> Result, diesel::result::Error> { + self.repository.get_queued_mails().await + } } /// a function to add new mail into the record when the mail_send is triggered... @@ -106,6 +111,9 @@ pub async fn update_mail(mail_id: String, payload: UpdateMailRequest) -> Result< updated_at: chrono::Utc::now(), open: response.open, clicks: response.clicks, + scheduled_at: response.scheduled_at, + attempts: response.attempts, + last_error: response.last_error, }) } @@ -125,6 +133,9 @@ pub async fn update_mail_status(mail_id: String, new_status: String) -> Result Result Result Result, AppError> { + let mail_repository = Arc::new(MailRepositoryImpl); + let mail_service = MailService::new(mail_repository); + + let response = mail_service.get_queued_mails().await?; + + Ok(response) +} + +/// a function to process mails every 30 seconds... +pub async fn process_mails( + intv: u64, +) -> Result<(), AppError> { + let mut interval = interval(Duration::from_secs(intv)); + + loop { + interval.tick().await; + + match fetch_queued_mails().await { + Ok(emails) if !emails.is_empty() => { + println!("Found {} queued emails.", emails.len()); + for mail in emails { + println!("Processing email to: {}", mail.email); + // Example: pretend to send email + // Then mark it as sent + println!("Email sent to: {}", mail.email); + } + } + Ok(_) => { + println!("No queued emails to process."); + } + Err(e) => { + eprintln!("Error fetching emails: {:?}", e); + } + } + } } \ No newline at end of file diff --git a/backend/tests/servers.rs b/backend/tests/servers.rs index 6f2e199..fc9094b 100644 --- a/backend/tests/servers.rs +++ b/backend/tests/servers.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use axum::http::StatusCode; use backend::error::AppError; -use backend::schema::sql_types::ServerType; use backend::servers::servers_model::{Server, ServerRequest, TlsTypeEnum, ServerTypeEnum}; use backend::servers::servers_repo::MockServerRepo; use backend::servers::servers_services::{ServerService, ServerServiceTrait}; From 24a2001dd3be2b2e6b555a6587ccb3abad13ba7a Mon Sep 17 00:00:00 2001 From: kanak1125 Date: Wed, 23 Apr 2025 12:31:16 +0545 Subject: [PATCH 02/10] feat: create background process for sending queued mails --- backend/src/handlers/bounce_logs_handler.rs | 26 ++++++++++++++----- backend/src/handlers/template.rs | 2 +- backend/src/repositories/mail_repository.rs | 2 +- backend/src/servers/servers_services.rs | 4 +-- backend/src/services/campaign_service.rs | 6 ++--- backend/src/services/mail_service.rs | 2 +- .../campaigns/[id]/analytics/_columns.tsx | 4 +-- .../contacts/[id]/mails/_columns.tsx | 4 +-- 8 files changed, 31 insertions(+), 19 deletions(-) diff --git a/backend/src/handlers/bounce_logs_handler.rs b/backend/src/handlers/bounce_logs_handler.rs index 08a4e70..8621b71 100644 --- a/backend/src/handlers/bounce_logs_handler.rs +++ b/backend/src/handlers/bounce_logs_handler.rs @@ -15,8 +15,12 @@ use uuid::Uuid; enum MailStatus { Draft, - Pending, - Sent, + Scheduled, + Queued, + Processing, // this status might not be necessary... + Submitted, + Delivered, + Failed, Bounced, } @@ -24,8 +28,12 @@ impl MailStatus { pub fn as_str(&self) -> &'static str { match self { MailStatus::Draft => "draft", - MailStatus::Pending => "pending", - MailStatus::Sent => "sent", + MailStatus::Scheduled => "scheduled", + MailStatus::Queued => "queued", + MailStatus::Processing => "processing", + MailStatus::Submitted => "submitted", + MailStatus::Delivered => "delivered", + MailStatus::Failed => "failed", MailStatus::Bounced => "bounced", } } @@ -33,8 +41,12 @@ impl MailStatus { pub fn from_str(s: &str) -> Option { match s { "draft" => Some(MailStatus::Draft), - "pending" => Some(MailStatus::Pending), - "sent" => Some(MailStatus::Sent), + "scheduled" => Some(MailStatus::Scheduled), + "queued" => Some(MailStatus::Queued), + "processing" => Some(MailStatus::Processing), + "submitted" => Some(MailStatus::Submitted), + "delivered" => Some(MailStatus::Delivered), + "failed" => Some(MailStatus::Failed), "bounced" => Some(MailStatus::Bounced), _ => None, } @@ -129,7 +141,7 @@ pub async fn handle_sns_notification ( } "Delivery" => { if let Some(delivery) = sns_event.delivery { - let status = MailStatus::Sent; + let status = MailStatus::Delivered; let _ = mail_service.update_mail_status(sns_event.mail.mail_id, status.as_str()).await; } diff --git a/backend/src/handlers/template.rs b/backend/src/handlers/template.rs index 3114310..566beff 100644 --- a/backend/src/handlers/template.rs +++ b/backend/src/handlers/template.rs @@ -161,7 +161,7 @@ pub async fn send_templated_email( template_id: Some(send_templated_email_response.id), campaign_id: None, sent_at: send_templated_email_response.sent_at, - status: "pending".to_string(), + status: "queued".to_string(), server_id: None, }; diff --git a/backend/src/repositories/mail_repository.rs b/backend/src/repositories/mail_repository.rs index ba0244b..06d8286 100644 --- a/backend/src/repositories/mail_repository.rs +++ b/backend/src/repositories/mail_repository.rs @@ -173,7 +173,7 @@ impl MailRepository for MailRepositoryImpl { contacts_dsl::email, bounce_logs_dsl::reason.nullable(), )) - .filter(status.eq("pending")) + .filter(status.eq("queued")) .order(sent_at.asc()) // Order by sent_at ascending here... .load::(&mut conn) } diff --git a/backend/src/servers/servers_services.rs b/backend/src/servers/servers_services.rs index 9243116..17eb969 100644 --- a/backend/src/servers/servers_services.rs +++ b/backend/src/servers/servers_services.rs @@ -265,7 +265,7 @@ impl ServerServiceTrait for ServerService { template_id, server_id: server.id, mail_send_ids: vec![mail_send_uuid], - status: "pending".to_string(), + status: "queued".to_string(), sent_at: chrono::Utc::now(), }) }, @@ -315,7 +315,7 @@ impl ServerServiceTrait for ServerService { campaign_id: None, server_id: Some(server.id), sent_at: chrono::Utc::now(), - status: "pending".to_string(), + status: "queued".to_string(), }; mail_service::create_mail(new_mail).await.map_err(|err| { diff --git a/backend/src/services/campaign_service.rs b/backend/src/services/campaign_service.rs index 66f3411..b7f3177 100644 --- a/backend/src/services/campaign_service.rs +++ b/backend/src/services/campaign_service.rs @@ -365,7 +365,7 @@ pub async fn send_campaign_email_aws( template_id: Some(Uuid::parse_str(&template.id)?), campaign_id: Some(campaign_id), sent_at: chrono::Utc::now(), - status: "pending".to_string(), + status: "queued".to_string(), server_id: Some(server_uuid), }; @@ -432,9 +432,9 @@ pub async fn send_campaign_email_smtp( mail_message: parsed_html, email: vec![contact.email.clone()], template_id: Some(Uuid::parse_str(&template.id)?), - campaign_id: Some((campaign_id)), + campaign_id: Some(campaign_id), sent_at: chrono::Utc::now(), - status: "pending".to_string(), + status: "queued".to_string(), server_id: Some(server_id), }; mail_service::create_mail(new_mail).await.map_err(|err| { diff --git a/backend/src/services/mail_service.rs b/backend/src/services/mail_service.rs index 3b4f1dc..9cc151a 100644 --- a/backend/src/services/mail_service.rs +++ b/backend/src/services/mail_service.rs @@ -192,7 +192,7 @@ pub async fn fetch_queued_mails() -> Result, AppError> { Ok(response) } -/// a function to process mails every 30 seconds... +/// a function to process mails every interval 'intv'... pub async fn process_mails( intv: u64, ) -> Result<(), AppError> { diff --git a/frontend/app/dashboard/campaigns/[id]/analytics/_columns.tsx b/frontend/app/dashboard/campaigns/[id]/analytics/_columns.tsx index a99b999..52c0adc 100644 --- a/frontend/app/dashboard/campaigns/[id]/analytics/_columns.tsx +++ b/frontend/app/dashboard/campaigns/[id]/analytics/_columns.tsx @@ -9,8 +9,8 @@ import ToolTip from '@/components/common/ToolTip'; const statusTagStyleMap = { draft: 'bg-gray-100 text-gray-800', - pending: 'bg-yellow-100 text-yellow-800', - sent: 'bg-blue-100 text-blue-800', + queued: 'bg-yellow-100 text-yellow-800', + delivered: 'bg-blue-100 text-blue-800', bounced: 'bg-red-100 text-red-800', } diff --git a/frontend/app/dashboard/contacts/[id]/mails/_columns.tsx b/frontend/app/dashboard/contacts/[id]/mails/_columns.tsx index 4a89923..2a3d51c 100644 --- a/frontend/app/dashboard/contacts/[id]/mails/_columns.tsx +++ b/frontend/app/dashboard/contacts/[id]/mails/_columns.tsx @@ -9,8 +9,8 @@ import TemplateName from './components/TemplateName'; const statusTagStyleMap = { draft: 'bg-gray-100 text-gray-800', - pending: 'bg-yellow-100 text-yellow-800', - sent: 'bg-blue-100 text-blue-800', + queued: 'bg-yellow-100 text-yellow-800', + delivered: 'bg-blue-100 text-blue-800', bounced: 'bg-red-100 text-red-800', } From 56c56567ecfdf9291b9fd1f400f75956184a6dcb Mon Sep 17 00:00:00 2001 From: kanak1125 Date: Wed, 23 Apr 2025 13:12:30 +0545 Subject: [PATCH 03/10] fix: resolve the problem with the auto-refetching of the campaign after campaign creation with no campaigns prior --- frontend/app/services/CampaignApi.ts | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/frontend/app/services/CampaignApi.ts b/frontend/app/services/CampaignApi.ts index d6053e3..46681ba 100644 --- a/frontend/app/services/CampaignApi.ts +++ b/frontend/app/services/CampaignApi.ts @@ -25,10 +25,7 @@ export const campaignApi = createApi({ // Query to fetch all lists... getCampaigns: builder.query({ query: () => "", - providesTags: (result) => - result - ? result.map((campaign) => ({ type: "Campaign", id: campaign.id })) - : [{ type: "Campaign" }], + providesTags: ["Campaign"], }), getCampaignById: builder.query({ query: (campaignId) => `/${campaignId}`, @@ -42,7 +39,7 @@ export const campaignApi = createApi({ method: "POST", body: newCampaign, }), - invalidatesTags: [{ type: "Campaign" }], + invalidatesTags: ["Campaign"], }), updateCampaign: builder.mutation< UpdateCampaignResponse, @@ -62,7 +59,7 @@ export const campaignApi = createApi({ console.error("Error updating campaign:", err); } }, - invalidatesTags: [{ type: "Campaign" }], + invalidatesTags: ["Campaign"], }), deleteCampaign: builder.mutation({ query: (campaignId) => ({ @@ -80,7 +77,7 @@ export const campaignApi = createApi({ method: "POST", body: { list_id: listId }, }), - invalidatesTags: [{ type: "Campaign" }], + invalidatesTags: ["Campaign"], }), }), }); From 19fae6ecbce0b60c6e0171875c9b231509dfcdd7 Mon Sep 17 00:00:00 2001 From: kanak1125 Date: Thu, 24 Apr 2025 10:39:44 +0545 Subject: [PATCH 04/10] feat: add enqueue for the emails sent with smtp server and minor UI fixes on sidebar --- backend/src/handlers/campaign.rs | 3 +- backend/src/models/campaign.rs | 10 ++- backend/src/services/campaign_service.rs | 102 +++++++++++++++++++++-- backend/src/services/mail_service.rs | 19 ++++- frontend/app/layout.tsx | 4 +- frontend/components/Sidebar.tsx | 10 +-- 6 files changed, 132 insertions(+), 16 deletions(-) diff --git a/backend/src/handlers/campaign.rs b/backend/src/handlers/campaign.rs index 550ca4c..7158e07 100644 --- a/backend/src/handlers/campaign.rs +++ b/backend/src/handlers/campaign.rs @@ -1,5 +1,6 @@ use uuid::Uuid; use crate::error::AppError; +use crate::models::campaign::AddMailToQueueResponse; use crate::{models::{campaign:: { CampaignSendResponse, CreateCampaignRequest, CreateCampaignResponse, DeleteCampaignResponse, ExtendedCreateCampaignRequest, GetCampaignResponse, UpdateCampaignRequest, UpdateCampaignResponse @@ -122,7 +123,7 @@ pub async fn delete_campaign( )] pub async fn send_campaign_email( Path(campaign_id): Path -) -> Result, AppError> { +) -> Result, AppError> { let uuid_id = Uuid::parse_str(&campaign_id)?; let result = campaign_service::send_campaign_email(uuid_id).await?; diff --git a/backend/src/models/campaign.rs b/backend/src/models/campaign.rs index 5f963a4..ec9cd0d 100644 --- a/backend/src/models/campaign.rs +++ b/backend/src/models/campaign.rs @@ -1,8 +1,9 @@ +use axum::http::status::StatusCode; use chrono::{ DateTime, NaiveDateTime, Utc }; use serde_json::Value; use serde::{ Serialize, Deserialize }; use utoipa::{openapi::schema, ToSchema}; -use diesel::prelude::*; +use diesel::{prelude::*, sql_types::Integer}; use uuid::Uuid; use crate::models::list::ListResponse; @@ -144,3 +145,10 @@ pub struct CampaignSendResponse { pub total_recipients: usize, pub status: String, } + +#[derive(Debug, Default, Serialize, Deserialize, ToSchema)] +pub struct AddMailToQueueResponse { + #[schema(value_type = u16, example = 200)] + pub status: u16, + pub message: String, +} \ No newline at end of file diff --git a/backend/src/services/campaign_service.rs b/backend/src/services/campaign_service.rs index b7f3177..4a5a688 100644 --- a/backend/src/services/campaign_service.rs +++ b/backend/src/services/campaign_service.rs @@ -1,4 +1,4 @@ -use crate::{error::AppError, models::{campaign::{CampaignSendResponse, DeleteCampaignResponse, GetCampaignResponse, UpdateCampaignRequest, UpdateCampaignResponse}, campaign_lists::NewListInCampaign, mail::CreateMailRequest}, repositories::{campaign::{self, CampaginRepositoryImpl, CampaignRepository}, campaign_lists_repo::{CampaignListRepository, CampaignListRepositoryImpl}, list_contact_repo::ListContactRepositoryImpl, mail_repository::MailRepositoryImpl},servers::{servers_handler::get_server_by_id, servers_model::{Server, ServerTypeEnum}, servers_repo::{self, ServerRepoImpl}, servers_services::{self, ServerService, ServerServiceTrait}}, utils::contact_lists_functions::{get_unique_contacts_from_campaign, populate_contact_template}}; +use crate::{error::AppError, models::{campaign::{CampaignSendResponse, DeleteCampaignResponse, GetCampaignResponse, UpdateCampaignRequest, UpdateCampaignResponse, AddMailToQueueResponse}, campaign_lists::NewListInCampaign, mail::CreateMailRequest}, repositories::{campaign::{self, CampaginRepositoryImpl, CampaignRepository}, campaign_lists_repo::{CampaignListRepository, CampaignListRepositoryImpl}, list_contact_repo::ListContactRepositoryImpl, mail_repository::MailRepositoryImpl}, servers::{servers_handler::get_server_by_id, servers_model::{Server, ServerTypeEnum}, servers_repo::{self, ServerRepoImpl}, servers_services::{self, ServerService, ServerServiceTrait}}, services::mail_service::update_mail_status, utils::contact_lists_functions::{get_unique_contacts_from_campaign, populate_contact_template}}; use uuid::Uuid; use std::{collections::HashSet, sync::Arc, env}; use axum::http::StatusCode; @@ -243,8 +243,7 @@ pub async fn delete_campaign(campaign_id: Uuid)->Result Result { - +) -> Result { let campaign = get_campaign_by_id(campaign_id).await?; let campaign_sender_id = campaign .campaign_senders @@ -264,14 +263,22 @@ pub async fn send_campaign_email( match server.server_type { ServerTypeEnum::AWS => { let result = send_campaign_email_aws(campaign_id).await?; - Ok(result) + return Ok(AddMailToQueueResponse { + status: StatusCode::OK.into(), + message: "Campaign email sent successfully".to_string(), + }); }, ServerTypeEnum::SMTP => { - let result = send_campaign_email_smtp(campaign_id, Uuid::parse_str(&server_id).unwrap()).await?; - Ok(result) + // let result = send_campaign_email_smtp(campaign_id, Uuid::parse_str(&server_id).unwrap()).await?; + let result = enqueue_email(campaign_id, server.id).await?; + return Ok(AddMailToQueueResponse { + status: StatusCode::OK.into(), + message: "Campaign email sent successfully".to_string(), + }); } } + // instead of sending the email, lets first enqueue the emails which will be sent by background process in interval... } pub async fn send_campaign_email_aws( @@ -454,3 +461,86 @@ pub async fn send_campaign_email_smtp( }) } +/// a function to send a single email to a contact with smtp server... +pub async fn send_single_email_smtp ( + mail_id: String, + campaign_id: Uuid, + server_id: Uuid, + email: &str, + message: String, + subject: String, +) -> Result { + let server_service = ServerService::new(Arc::new(ServerRepoImpl)); + + let campaign = get_campaign_by_id(campaign_id.clone()) + .await + .map_err(|err| AppError::NotFoundError(Some(err.to_string())))?; + + let sender_id_string = match campaign.campaign_senders { + Some(uuid) => uuid.to_string(), + None => return Err(AppError::NotFoundError(Some("Sender ID not found for campaign.".to_string()))), + }; + + let campaign_sender_response = get_campaign_sender_by_id(sender_id_string).await + .map_err(|err| AppError::InternalServerError(Some(err.to_string())))?; + + let sender_email = campaign_sender_response.from_email; + + let result = server_service.send_mail_with_smtp( + server_id, + &sender_email, + vec![email.to_string()], + None, + None, + &subject, + &message, + ).await.map_err(|err| AppError::InternalServerError(Some(format!("{:?}", err))))?; + + let mail_status = "submitted".to_string(); + + update_mail_status(mail_id, mail_status.clone()).await.map_err(|err| { + AppError::InternalServerError(Some(format!("Failed to update mail status: {}", err))) + })?; + + Ok(CampaignSendResponse { + campaign_id: campaign_id.to_string(), + total_recipients: 1, + status: mail_status, + }) +} + +/// a function to enqueue email for sending... +pub async fn enqueue_email( + campaign_id: Uuid, + server_id: Uuid +) -> Result<(), AppError> { + let campaign = get_campaign_by_id(campaign_id.clone()) + .await + .map_err(|err| AppError::NotFoundError(Some(err.to_string())))?; + + let contacts = get_unique_contacts_from_campaign(campaign_id).await?; + + let template = get_template_by_id(campaign.template_id.clone()).await + .map_err(|err| AppError::InternalServerError(Some(err.to_string())))?; + + for contact in contacts.clone() { + let parsed_html = populate_contact_template(&template, &contact).await.map_err(|err| AppError::InternalServerError(Some(err.to_string())))?; + + let new_mail = CreateMailRequest { + id: Uuid::new_v4().to_string(), + mail_message: parsed_html, + email: vec![contact.email.clone()], + template_id: Some(Uuid::parse_str(&template.id)?), + campaign_id: Some(campaign_id), + sent_at: chrono::Utc::now(), + status: "queued".to_string(), + server_id: Some(server_id), + }; + + mail_service::create_mail(new_mail).await.map_err(|err| { + AppError::InternalServerError(Some(format!("Failed to create mail: {}", err))) + })?; + } + + Ok(()) +} \ No newline at end of file diff --git a/backend/src/services/mail_service.rs b/backend/src/services/mail_service.rs index 9cc151a..bc52b2e 100644 --- a/backend/src/services/mail_service.rs +++ b/backend/src/services/mail_service.rs @@ -1,4 +1,4 @@ -use crate::{models::mail::{DeleteMailResponse, MailWithDetails, NewMail}, repositories::mail_repository::{ MailRepository, MailRepositoryImpl }}; +use crate::{models::mail::{DeleteMailResponse, MailWithDetails, NewMail}, repositories::mail_repository::{ MailRepository, MailRepositoryImpl }, services::campaign_service::{send_campaign_email, send_campaign_email_smtp, send_single_email_smtp}}; use crate::services::contact_service as contact_service; use chrono::{DateTime, Utc}; use tokio::time::{interval, Duration}; @@ -208,6 +208,23 @@ pub async fn process_mails( println!("Processing email to: {}", mail.email); // Example: pretend to send email // Then mark it as sent + let current_campaign_id = mail.campaign_id; + + let subject = format!("Hello {}", mail.email); + + // send the campaign email only if the email has campaign_id (or campaign is associated with the email)... + if let Some(campaign_id) = current_campaign_id { + let result_mail= send_single_email_smtp + ( + mail.id, + campaign_id, mail.server_id.unwrap(), + &mail.email, + mail.mail_message, + subject + ).await?; + } else { + println!("No valid campaign ID found for this email."); + } println!("Email sent to: {}", mail.email); } } diff --git a/frontend/app/layout.tsx b/frontend/app/layout.tsx index 084f6f1..e5f3d4a 100644 --- a/frontend/app/layout.tsx +++ b/frontend/app/layout.tsx @@ -14,8 +14,8 @@ const geistMono = Geist_Mono({ }); export const metadata: Metadata = { - title: "Create Next App", - description: "Generated by create next app", + title: "Mail Service", + description: "An email management platform", }; export default function RootLayout({ diff --git a/frontend/components/Sidebar.tsx b/frontend/components/Sidebar.tsx index f089332..46ed164 100644 --- a/frontend/components/Sidebar.tsx +++ b/frontend/components/Sidebar.tsx @@ -74,15 +74,15 @@ const Sidebar = ({ onClose }: SidebarProps) => { )} -