Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
769 changes: 164 additions & 605 deletions backend/Cargo.lock

Large diffs are not rendered by default.

9 changes: 4 additions & 5 deletions backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ edition = "2021"

[dependencies]
axum = "0.8.1"
tokio = { version = "1.43.0", features = ["full"] }
tokio = { version = "1.43.1", features = ["full"] }
dotenv = "0.15.0"
tower-http = { version = "0.6", features = ["cors", "trace"] }
serde = { version = "1.0.217", features = ["derive"] }
Expand All @@ -27,7 +27,7 @@ diesel_migrations = "2.2.0"
uuid = { version = "1.12.1", features = ["serde", "v4" ] }

utoipa = { version = "5.3.1", features = ["axum_extras"] }
utoipa-swagger-ui = {version = "9.0.0", features=["axum"] }
utoipa-swagger-ui = {version = "9.0.1", features=["axum"] }
url = "2.5.4"

warp = "0.3.7"
Expand All @@ -36,15 +36,14 @@ thiserror = "2"
anyhow = "1.0.95"
mockall = "0.13.1"
async-trait = "0.1.85"
reqwest = { version = "0.12", features = ["json"] }
reqwest = { version = "0.12.15", features = ["json"] }

diesel-derive-enum = {version= "2.1.0", features=["postgres"] }

multipart = "0.18.0"
axum-extra = { version = "0.10.0", features = ["multipart"] }
csv = "1.3.1"

lettre = "0.11.14"
lettre = "0.11.15"
tokenbucket = "0.1.6"
governor = "0.10.0"

Expand Down
1 change: 1 addition & 0 deletions backend/src/handlers/contact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,5 +262,6 @@ pub async fn get_mails_by_contact_id(
scheduled_at: mail.scheduled_at,
attempts: mail.attempts,
last_error: mail.last_error,
from_name: mail.from_name,
}).collect()))
}
43 changes: 43 additions & 0 deletions backend/src/handlers/mail_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ pub async fn get_all_mails(
scheduled_at: mail.scheduled_at,
attempts: mail.attempts,
last_error: mail.last_error.clone(),
from_name: mail.from_name.clone(),
});
});
Ok(Json(responses))
Expand Down Expand Up @@ -119,4 +120,46 @@ pub async fn delete_mail(
let deleted_mail = mail_service.delete_mail(mail_id).await?;

Ok(Json(deleted_mail))
}


#[utoipa::path(
get,
path = "/api/mails/bounce",
responses(
(status = 200, description = "Get all bounced mails", body = Vec<GetMailResponse>),
(status = 404)
)
)]
pub async fn get_bounced_mails(
Extension(mail_service): Extension<Arc<MailService>>,
) -> Result<Json<Vec<GetMailResponse>>, AppError> {
let all_bounced_mails = mail_service.fetch_bounced_mails().await?;

let mut responses = Vec::new();

if all_bounced_mails.is_empty() {
return Ok(Json(vec![]));
}
all_bounced_mails.iter().for_each(|mail| {
responses.push(GetMailResponse {
id: mail.id.clone(),
mail_message: mail.mail_message.clone(),
email: mail.email.clone(),
template_id: mail.template_id.clone(),
campaign_id: mail.campaign_id,
sent_at: mail.sent_at,
open: mail.open,
clicks: mail.clicks,
status: mail.status.clone(),
status_reason: mail.reason.clone(),
server_id: mail.server_id.clone(),
scheduled_at: mail.scheduled_at,
attempts: mail.attempts,
last_error: mail.last_error.clone(),
from_name: mail.from_name.clone(),
});
});
Ok(Json(responses))

}
29 changes: 22 additions & 7 deletions backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,32 @@ async fn main() {

// Instantiate the server service and repository one time, and inject it to the process_mails background process...
let server_repo = Arc::new(servers_repo::ServerRepoImpl);
let server_service = servers_services::ServerService::new(server_repo);
let server_service = Arc::new(servers_services::ServerService::new(server_repo));

let mail_repo = Arc::new(mail_repository::MailRepositoryImpl);
let mail_service = mail_service::MailService::new(mail_repo);
let mail_service = Arc::new(mail_service::MailService::new(mail_repo));

// Worker for processing mails...
tokio::spawn(async move {
if let Err(err) = mail_service.process_mails(server_service.into()).await.map_err(|err| AppError::InternalServerError(Some(format!("Mail worker error: {:?}", err.to_string())))) {
eprintln!("Error occurred in mail worker: {:?}", err);
}
});
{
let mail_service = Arc::clone(&mail_service);
let server_service = Arc::clone(&server_service);
tokio::spawn(async move {
if let Err(err) = mail_service.process_mails(server_service.into()).await.map_err(|err| AppError::InternalServerError(Some(format!("Mail worker error: {:?}", err.to_string())))) {
eprintln!("Error occurred in mail worker: {:?}", err);
}
});
}

// Worker for retrying submitted but not delivered mails...
{
let mail_service = Arc::clone(&mail_service);
let server_service = Arc::clone(&server_service);
tokio::spawn(async move {
if let Err(err) = mail_service.process_submitted_mails(server_service.into()).await.map_err(|err| AppError::InternalServerError(Some(format!("Mail retry worker error: {:?}", err.to_string())))) {
eprintln!("Error occurred in mail retry worker: {:?}", err);
}
});
}

// Address configuration
let addr = env::var("SERVER_ADDRESS").unwrap_or_else(|_| "0.0.0.0:8000".to_string());
Expand Down
4 changes: 4 additions & 0 deletions backend/src/models/mail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ pub struct MailWithDetails {

#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
pub reason: Option<String>, // This comes from bounce_logs.reason

#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
pub from_name: Option<String>,
}


Expand Down Expand Up @@ -108,6 +111,7 @@ pub struct GetMailResponse {
pub clicks: i32,
pub status: String,
pub status_reason: Option<String>,
pub from_name: Option<String>,

#[schema(value_type = String, example = "2023-01-01T00:00:00Z")]
pub scheduled_at: DateTime<Utc>,
Expand Down
4 changes: 3 additions & 1 deletion backend/src/repositories/campaign.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ impl CampaignRepository for CampaginRepositoryImpl {
created_at,
updated_at
))
.order(updated_at.desc())
.load::<Campaign>(&mut conn)
}
async fn update_campaign(&self, campaign_id: Uuid, payload: UpdateCampaignRequest)->Result<Campaign, diesel::result::Error> {
Expand All @@ -63,7 +64,8 @@ impl CampaignRepository for CampaginRepositoryImpl {
template_id.eq(&payload.template_id),
status.eq(&payload.status),
campaign_senders.eq(&payload.campaign_senders),
scheduled_at.eq(&payload.scheduled_at)
scheduled_at.eq(&payload.scheduled_at),
updated_at.eq(diesel::dsl::now),
))
.get_result(&mut conn)
}
Expand Down
28 changes: 15 additions & 13 deletions backend/src/repositories/list_repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ pub async fn get_connection_pool() -> DbPooledConnection {
#[async_trait]
pub trait ListRepository {
async fn create_list(&self, payload: CreateListRequest) -> Result<List, diesel::result::Error>;
async fn get_all_lists(&self, namespaceId: Uuid) -> Result<Vec<List>, diesel::result::Error>;
async fn get_list_by_id(&self, namespaceId: Uuid, list_id: Uuid) -> Result<List, diesel::result::Error>;
async fn update_list(&self, namespaceId: Uuid, list_id: Uuid, payload: UpdateListRequest) -> Result<List, diesel::result::Error>;
async fn delete_list(&self, namespaceId: Uuid, list_id: Uuid)->Result<List, diesel::result::Error>;
async fn get_all_lists(&self, namespace_uuid: Uuid) -> Result<Vec<List>, diesel::result::Error>;
async fn get_list_by_id(&self, namespace_uuid: Uuid, list_id: Uuid) -> Result<List, diesel::result::Error>;
async fn update_list(&self, namespace_uuid: Uuid, list_id: Uuid, payload: UpdateListRequest) -> Result<List, diesel::result::Error>;
async fn delete_list(&self, namespace_uuid: Uuid, list_id: Uuid)->Result<List, diesel::result::Error>;

}

Expand All @@ -46,7 +46,7 @@ async fn create_list (
.get_result::<List>(&mut conn)
}

async fn get_all_lists(&self, namespaceId: Uuid) -> Result<Vec<List>, diesel::result::Error> {
async fn get_all_lists(&self, namespace_uuid: Uuid) -> Result<Vec<List>, diesel::result::Error> {
let mut conn = get_connection_pool().await;

lists
Expand All @@ -58,23 +58,24 @@ async fn get_all_lists(&self, namespaceId: Uuid) -> Result<Vec<List>, diesel::re
created_at,
updated_at
))
.filter(namespace_id.eq(namespaceId))
.filter(namespace_id.eq(namespace_uuid))
.order(updated_at.desc())
.load::<List>(&mut conn)
}

async fn get_list_by_id(&self, namespaceId: Uuid, list_id: Uuid) -> Result<List, diesel::result::Error> {
async fn get_list_by_id(&self, namespace_uuid: Uuid, list_id: Uuid) -> Result<List, diesel::result::Error> {
let mut conn = get_connection_pool().await;

// Ensure you're querying with both Uuids
lists
.filter(namespace_id.eq(namespaceId)) // Make sure this is referencing the correct variable
.filter(namespace_id.eq(namespace_uuid)) // Make sure this is referencing the correct variable
.filter(id.eq(list_id)) // Filter by list_id as well
.first(&mut conn)
}

async fn update_list (
&self,
namespaceId: Uuid,
namespace_uuid: Uuid,
list_id: Uuid,
payload: UpdateListRequest
) -> Result<List, diesel::result::Error> {
Expand All @@ -83,18 +84,19 @@ async fn update_list (

diesel::update(lists)
.filter(id.eq(list_id))
.filter(namespace_id.eq(namespaceId))
.filter(namespace_id.eq(namespace_uuid))
.set((
name.eq(payload.name),
description.eq(payload.description)
description.eq(payload.description),
updated_at.eq(diesel::dsl::now),
))
.get_result(&mut conn)
}

async fn delete_list(&self, namespaceId: Uuid, list_id: Uuid) -> Result<List, diesel::result::Error> {
async fn delete_list(&self, namespace_uuid: Uuid, list_id: Uuid) -> Result<List, diesel::result::Error> {
let mut conn = get_connection_pool().await;

diesel::delete(lists.filter(namespace_id.eq(namespaceId)))
diesel::delete(lists.filter(namespace_id.eq(namespace_uuid)))
.filter(id.eq(list_id))
.get_result(&mut conn)
}
Expand Down
81 changes: 75 additions & 6 deletions backend/src/repositories/mail_repository.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
use crate::servers::servers_model::ServerTypeEnum;
use crate::{ appState::DbPooledConnection, GLOBAL_APP_STATE };
use crate::schema::mails::dsl::*;
use crate::schema::contacts::dsl as contacts_dsl;
use crate::schema::bounce_logs::dsl as bounce_logs_dsl;
use crate::schema::campaign_senders::dsl as campaign_senders_dsl;
use crate::schema::campaigns::dsl as campaigns_dsl;
use crate::schema::servers::{dsl as servers_dsl, server_type};
use diesel::prelude::*;
use chrono::{ Utc, DateTime };
use chrono::{ Utc, DateTime, Duration };
use crate::models::mail::{
Mail, MailWithDetails, NewMail, UpdateMailRequest
};
use uuid::Uuid;
use diesel::dsl::sql;
use diesel::sql_types::{ Nullable, Text };
use mockall::{ automock, predicate::* };
use async_trait::async_trait;

Expand All @@ -28,7 +34,10 @@ pub trait MailRepository {
async fn delete_mail(&self, mail_id: String) -> Result<Mail, diesel::result::Error>;
async fn increment_mail_clicks(&self, mail_id: String) -> Result<Mail, diesel::result::Error>;
async fn get_mails_by_contact(&self, c_id: Uuid) -> Result<Vec<MailWithDetails>, diesel::result::Error>;
async fn get_queued_mails(&self) -> Result<Vec<MailWithDetails>, diesel::result::Error>;
async fn get_mails_by_status(&self, mail_status: &str, is_ascending: bool) -> Result<Vec<MailWithDetails>, diesel::result::Error>;
async fn get_stale_submitted_mails(&self) -> Result<Vec<MailWithDetails>, diesel::result::Error>;
async fn update_mail_attempts(&self, mail_id: String) -> Result<Mail, diesel::result::Error>;
async fn udpate_mail_last_try_error(&self, mail_id: String, error: &str) -> Result<Mail, diesel::result::Error>;
}

pub struct MailRepositoryImpl;
Expand Down Expand Up @@ -65,6 +74,7 @@ impl MailRepository for MailRepositoryImpl {
last_error,
contacts_dsl::email,
bounce_logs_dsl::reason.nullable(),
sql::<Nullable<Text>>("NULL")
))
.into_boxed();

Expand Down Expand Up @@ -144,19 +154,58 @@ impl MailRepository for MailRepositoryImpl {
attempts,
last_error,
contacts_dsl::email,
bounce_logs_dsl::reason.nullable()
bounce_logs_dsl::reason.nullable(),
sql::<Nullable<Text>>("NULL")
))
.filter(contact_id.eq(c_id))
.order(sent_at.desc())
.load::<MailWithDetails>(&mut conn)
}

async fn get_queued_mails(&self) -> Result<Vec<MailWithDetails>, diesel::result::Error> {
async fn get_mails_by_status(&self, mail_status: &str, is_ascending: bool) -> Result<Vec<MailWithDetails>, diesel::result::Error> {
let mut conn = get_connection_pool().await;

let mut query_results = mails
.inner_join(contacts_dsl::contacts.on(contact_id.eq(contacts_dsl::id)))
.inner_join(campaigns_dsl::campaigns.on(campaign_id.eq(campaigns_dsl::id.nullable())))
.inner_join(campaign_senders_dsl::campaign_senders.on(campaigns_dsl::campaign_senders.eq(campaign_senders_dsl::id.nullable())))
.left_outer_join(bounce_logs_dsl::bounce_logs.on(id.eq(bounce_logs_dsl::mail_id)))
.select((
id,
mail_message,
template_id,
campaign_id,
server_id,
sent_at,
status,
open,
clicks,
scheduled_at,
attempts,
last_error,
contacts_dsl::email,
bounce_logs_dsl::reason.nullable(),
campaign_senders_dsl::from_name.nullable(),
))
.filter(status.eq(mail_status))
.into_boxed();

if is_ascending {
query_results = query_results.order(sent_at.asc());
} else {
query_results = query_results.order(sent_at.desc());
}
query_results.load::<MailWithDetails>(&mut conn)
}

async fn get_stale_submitted_mails(&self) -> Result<Vec<MailWithDetails>, diesel::result::Error> {
let mut conn = get_connection_pool().await;
let stale_duration_minutes = 30; // only send retry those mails that are older than 30 minutes...

mails
.inner_join(contacts_dsl::contacts.on(contact_id.eq(contacts_dsl::id)))
.left_outer_join(bounce_logs_dsl::bounce_logs.on(id.eq(bounce_logs_dsl::mail_id)))
.left_outer_join(servers_dsl::servers.on(server_id.eq(servers_dsl::id.nullable())))
.select((
id,
mail_message,
Expand All @@ -172,9 +221,29 @@ impl MailRepository for MailRepositoryImpl {
last_error,
contacts_dsl::email,
bounce_logs_dsl::reason.nullable(),
sql::<Nullable<Text>>("NULL")
))
.filter(status.eq("queued"))
.order(sent_at.asc()) // Order by sent_at ascending here...
.filter(status.eq("submitted"))
.filter(server_type.eq(ServerTypeEnum::AWS))
.filter(sent_at.lt(Utc::now().naive_utc() - Duration::minutes(stale_duration_minutes)))
.filter(attempts.le(3))
.order(sent_at.asc())
.load::<MailWithDetails>(&mut conn)
}

async fn update_mail_attempts(&self, mail_id: String) -> Result<Mail, diesel::result::Error> {
let mut conn = get_connection_pool().await;

diesel::update(mails.find(mail_id))
.set(attempts.eq(attempts + 1))
.get_result(&mut conn)
}

async fn udpate_mail_last_try_error(&self, mail_id: String, error: &str) -> Result<Mail, diesel::result::Error> {
let mut conn = get_connection_pool().await;

diesel::update(mails.find(mail_id))
.set(last_error.eq(error))
.get_result(&mut conn)
}
}
1 change: 1 addition & 0 deletions backend/src/repositories/template_repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ impl TemplateRepository for TemplateRespositoryImpl {
created_at,
updated_at,
)) // Select columns explicitly
.order(updated_at.desc())
.load::<Template>(&mut conn)
}

Expand Down
Loading