Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions fastapi_mail/fastmail.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from email.message import EmailMessage, Message
from email.utils import formataddr
from typing import Any, Dict, Optional, Union
from datetime import datetime

import blinker
from jinja2 import Environment, Template
Expand All @@ -12,6 +13,8 @@
from fastapi_mail.errors import PydanticClassRequired
from fastapi_mail.msg import MailMsg
from fastapi_mail.schemas import MessageSchema
from fastapi_mail.queue import EmailQueue
from fastapi_mail.signals import email_dispatched


class _MailMixin:
Expand Down Expand Up @@ -49,6 +52,13 @@ class FastMail(_MailMixin):

def __init__(self, config: ConnectionConfig) -> None:
self.config = config
self._queue: Optional[EmailQueue] = None

@property
def queue(self) -> EmailQueue:
if self._queue is None:
self._queue = EmailQueue(self)
return self._queue

async def get_mail_template(
self, env_path: Environment, template_name: str
Expand Down Expand Up @@ -92,25 +102,28 @@ async def __sender(self) -> Union[EmailStr, str]:
return sender

async def send_message(
self, message: MessageSchema, template_name: Optional[str] = None
) -> None:
self,
message: MessageSchema,
template_name: Optional[str] = None,
queue: bool = False,
schedule_time: Optional[datetime] = None
) -> Optional[str]:
if not queue:
await self._send_message(message, template_name)
return None
return await self.queue.add_to_queue(message, template_name, schedule_time)

async def _send_message(self, message: MessageSchema, template_name: Optional[str] = None) -> None:
if not isinstance(message, MessageSchema):
raise PydanticClassRequired(
"Message schema should be provided from MessageSchema class"
)

raise PydanticClassRequired("Message schema should be provided from MessageSchema class")
if self.config.TEMPLATE_FOLDER and template_name:
template = await self.get_mail_template(
self.config.template_engine(), template_name
)
template = await self.get_mail_template(self.config.template_engine(), template_name)
msg = await self.__prepare_message(message, template)
else:
msg = await self.__prepare_message(message)

async with Connection(self.config) as session:
if not self.config.SUPPRESS_SEND:
await session.session.send_message(msg)

email_dispatched.send(msg)


Expand Down
107 changes: 107 additions & 0 deletions fastapi_mail/queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
from typing import Optional, Dict
from enum import Enum
import asyncio
from datetime import datetime
from uuid import uuid4

from .schemas import MessageSchema
from .fastmail import FastMail

class EmailStatus(Enum):
QUEUED = "queued"
SENDING = "sending"
SENT = "sent"
FAILED = "failed"

class QueuedEmail:
def __init__(self, message: MessageSchema, template_name: Optional[str] = None):
self.id = str(uuid4())
self.message = message
self.template_name = template_name
self.status = EmailStatus.QUEUED
self.created_at = datetime.now()
self.updated_at = datetime.now()
self.retry_count = 0
self.error = None
self.scheduled_time = None

class EmailQueue:
def __init__(self, fastmail: FastMail, max_retries: int = 3):
self.fastmail = fastmail
self.max_retries = max_retries
self.queue: Dict[str, QueuedEmail] = {}
self._processing = False
self._task = None

async def add_to_queue(self, message: MessageSchema, template_name: Optional[str] = None, schedule_time: Optional[datetime] = None) -> str:
queued_email = QueuedEmail(message, template_name)
if schedule_time:
queued_email.scheduled_time = schedule_time
self.queue[queued_email.id] = queued_email
return queued_email.id

async def process_queue(self):
self._processing = True
while self._processing:
now = datetime.now()
for email in list(self.queue.values()):
if email.status == EmailStatus.QUEUED:
if email.scheduled_time and email.scheduled_time > now:
continue
try:
email.status = EmailStatus.SENDING
email.updated_at = datetime.now()
await self.fastmail._send_message(email.message, email.template_name)
email.status = EmailStatus.SENT
email.updated_at = datetime.now()
except Exception as e:
email.error = str(e)
email.retry_count += 1
if email.retry_count >= self.max_retries:
email.status = EmailStatus.FAILED
else:
email.status = EmailStatus.QUEUED
email.updated_at = datetime.now()
await asyncio.sleep(1)

def start_processing(self):
if not self._task:
self._task = asyncio.create_task(self.process_queue())

def stop_processing(self):
self._processing = False
if self._task:
self._task.cancel()
self._task = None

def get_queue_status(self) -> Dict[str, int]:
status = {
"total": len(self.queue),
"queued": 0,
"sending": 0,
"sent": 0,
"failed": 0
}
for email in self.queue.values():
status[email.status.value] += 1
return status

def get_email_status(self, email_id: str) -> Optional[QueuedEmail]:
return self.queue.get(email_id)

def cancel_email(self, email_id: str) -> bool:
if email_id in self.queue and self.queue[email_id].status == EmailStatus.QUEUED:
del self.queue[email_id]
return True
return False

def retry_failed(self, email_id: str) -> bool:
if email_id in self.queue:
email = self.queue[email_id]
if email.status == EmailStatus.FAILED:
email.status = EmailStatus.QUEUED
email.retry_count = 0
email.error = None
email.updated_at = datetime.now()
return True
return False
120 changes: 120 additions & 0 deletions tests/test_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import pytest
from datetime import datetime, timedelta
from fastapi_mail import FastMail, ConnectionConfig, MessageSchema, MessageType
from fastapi_mail.queue import EmailQueue, EmailStatus
import asyncio

@pytest.fixture
def queued_message():
return MessageSchema(
subject="Test subject",
recipients=["test@example.com"],
body="Test email body",
subtype=MessageType.plain
)

@pytest.fixture
def queue_config(mail_config):
mail_config["SUPPRESS_SEND"] = 1
return ConnectionConfig(**mail_config)

@pytest.fixture
def email_queue(queue_config):
fastmail = FastMail(queue_config)
return EmailQueue(fastmail)

@pytest.mark.asyncio
async def test_add_to_queue(email_queue, queued_message):
queue_id = await email_queue.add_to_queue(queued_message)
assert queue_id in email_queue.queue
assert email_queue.queue[queue_id].status.name == "QUEUED"
assert email_queue.queue[queue_id].message == queued_message
assert email_queue.queue[queue_id].retry_count == 0

@pytest.mark.asyncio
async def test_scheduled_email(email_queue, queued_message):
schedule_time = datetime.now() + timedelta(hours=1)
queue_id = await email_queue.add_to_queue(queued_message, schedule_time=schedule_time)
queued_email = email_queue.queue[queue_id]
assert queued_email.scheduled_time == schedule_time
assert queued_email.status.name == "QUEUED"

@pytest.mark.asyncio
async def test_process_queue(email_queue, queued_message):
queue_id = await email_queue.add_to_queue(queued_message)
email_queue.start_processing()
await asyncio.sleep(2)
email_queue.stop_processing()
processed_email = email_queue.queue[queue_id]
assert processed_email.status.name == "SENT"

@pytest.mark.asyncio
async def test_queue_status(email_queue, queued_message):
queue_id1 = await email_queue.add_to_queue(queued_message)
queue_id2 = await email_queue.add_to_queue(queued_message)
email_queue.queue[queue_id2].status = EmailStatus.FAILED
email_queue.queue[queue_id2].retry_count = 3
status = email_queue.get_queue_status()
assert status["total"] == 2
assert status["queued"] == 1
assert status["failed"] == 1

@pytest.mark.asyncio
async def test_cancel_email(email_queue, queued_message):
queue_id = await email_queue.add_to_queue(queued_message)
success = email_queue.cancel_email(queue_id)
assert success is True
assert queue_id not in email_queue.queue
assert email_queue.cancel_email("non-existent") is False

@pytest.mark.asyncio
async def test_retry_failed_email(email_queue, queued_message):
queue_id = await email_queue.add_to_queue(queued_message)
email_queue.queue[queue_id].status = EmailStatus.FAILED
email_queue.queue[queue_id].retry_count = 3
email_queue.queue[queue_id].error = "Error occurred"
success = email_queue.retry_failed(queue_id)
assert success is True
retried_email = email_queue.queue[queue_id]
assert retried_email.status.name == "QUEUED"
assert retried_email.retry_count == 0
assert retried_email.error is None

@pytest.mark.asyncio
async def test_get_email_status(email_queue, queued_message):
queue_id = await email_queue.add_to_queue(queued_message)
email_status = email_queue.get_email_status(queue_id)
assert email_status is not None
assert email_status.status.name == "QUEUED"
assert email_queue.get_email_status("non-existent") is None

@pytest.mark.asyncio
async def test_max_retries(email_queue, queued_message):
queue_id = await email_queue.add_to_queue(queued_message)
queued_email = email_queue.queue[queue_id]
for _ in range(email_queue.max_retries):
queued_email.status = EmailStatus.SENDING
queued_email.retry_count += 1
queued_email.status = EmailStatus.QUEUED
queued_email.status = EmailStatus.SENDING
queued_email.retry_count += 1
assert queued_email.retry_count > email_queue.max_retries
assert queued_email.status.name == "FAILED"

@pytest.mark.asyncio
async def test_fastmail_queue_integration(queue_config, queued_message):
fastmail = FastMail(queue_config)
queue_id = await fastmail.send_message(queued_message, queue=True)
assert queue_id is not None
immediate_result = await fastmail.send_message(queued_message, queue=False)
assert immediate_result is None

@pytest.mark.asyncio
async def test_scheduled_processing(email_queue, queued_message):
schedule_time = datetime.now() + timedelta(seconds=2)
queue_id = await email_queue.add_to_queue(queued_message, schedule_time=schedule_time)
email_queue.start_processing()
assert email_queue.queue[queue_id].status.name == "QUEUED"
await asyncio.sleep(3)
email_queue.stop_processing()
assert email_queue.queue[queue_id].status.name == "SENT"