From 1bb72d14b6cba496253b913f87a22a33092bc6a9 Mon Sep 17 00:00:00 2001 From: Devin Date: Tue, 4 Feb 2025 21:13:15 +0530 Subject: [PATCH] adding email queue --- fastapi_mail/fastmail.py | 35 ++++++++---- fastapi_mail/queue.py | 107 ++++++++++++++++++++++++++++++++++ tests/test_queue.py | 120 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 251 insertions(+), 11 deletions(-) create mode 100644 fastapi_mail/queue.py create mode 100644 tests/test_queue.py diff --git a/fastapi_mail/fastmail.py b/fastapi_mail/fastmail.py index d368dd6..17ef92b 100755 --- a/fastapi_mail/fastmail.py +++ b/fastapi_mail/fastmail.py @@ -2,6 +2,7 @@ from email.message import EmailMessage, Message from email.utils import formataddr from typing import Any, Dict, Optional, Union +from datetime import datetime import blinker from jinja2 import Environment, Template @@ -12,6 +13,8 @@ from fastapi_mail.errors import PydanticClassRequired from fastapi_mail.msg import MailMsg from fastapi_mail.schemas import MessageSchema +from fastapi_mail.queue import EmailQueue +from fastapi_mail.signals import email_dispatched class _MailMixin: @@ -49,6 +52,13 @@ class FastMail(_MailMixin): def __init__(self, config: ConnectionConfig) -> None: self.config = config + self._queue: Optional[EmailQueue] = None + + @property + def queue(self) -> EmailQueue: + if self._queue is None: + self._queue = EmailQueue(self) + return self._queue async def get_mail_template( self, env_path: Environment, template_name: str @@ -92,25 +102,28 @@ async def __sender(self) -> Union[EmailStr, str]: return sender async def send_message( - self, message: MessageSchema, template_name: Optional[str] = None - ) -> None: + self, + message: MessageSchema, + template_name: Optional[str] = None, + queue: bool = False, + schedule_time: Optional[datetime] = None + ) -> Optional[str]: + if not queue: + await self._send_message(message, template_name) + return None + return await self.queue.add_to_queue(message, template_name, schedule_time) + + async def _send_message(self, message: MessageSchema, template_name: Optional[str] = None) -> None: if not isinstance(message, MessageSchema): - raise PydanticClassRequired( - "Message schema should be provided from MessageSchema class" - ) - + raise PydanticClassRequired("Message schema should be provided from MessageSchema class") if self.config.TEMPLATE_FOLDER and template_name: - template = await self.get_mail_template( - self.config.template_engine(), template_name - ) + template = await self.get_mail_template(self.config.template_engine(), template_name) msg = await self.__prepare_message(message, template) else: msg = await self.__prepare_message(message) - async with Connection(self.config) as session: if not self.config.SUPPRESS_SEND: await session.session.send_message(msg) - email_dispatched.send(msg) diff --git a/fastapi_mail/queue.py b/fastapi_mail/queue.py new file mode 100644 index 0000000..33e234f --- /dev/null +++ b/fastapi_mail/queue.py @@ -0,0 +1,107 @@ +from typing import Optional, Dict +from enum import Enum +import asyncio +from datetime import datetime +from uuid import uuid4 + +from .schemas import MessageSchema +from .fastmail import FastMail + +class EmailStatus(Enum): + QUEUED = "queued" + SENDING = "sending" + SENT = "sent" + FAILED = "failed" + +class QueuedEmail: + def __init__(self, message: MessageSchema, template_name: Optional[str] = None): + self.id = str(uuid4()) + self.message = message + self.template_name = template_name + self.status = EmailStatus.QUEUED + self.created_at = datetime.now() + self.updated_at = datetime.now() + self.retry_count = 0 + self.error = None + self.scheduled_time = None + +class EmailQueue: + def __init__(self, fastmail: FastMail, max_retries: int = 3): + self.fastmail = fastmail + self.max_retries = max_retries + self.queue: Dict[str, QueuedEmail] = {} + self._processing = False + self._task = None + + async def add_to_queue(self, message: MessageSchema, template_name: Optional[str] = None, schedule_time: Optional[datetime] = None) -> str: + queued_email = QueuedEmail(message, template_name) + if schedule_time: + queued_email.scheduled_time = schedule_time + self.queue[queued_email.id] = queued_email + return queued_email.id + + async def process_queue(self): + self._processing = True + while self._processing: + now = datetime.now() + for email in list(self.queue.values()): + if email.status == EmailStatus.QUEUED: + if email.scheduled_time and email.scheduled_time > now: + continue + try: + email.status = EmailStatus.SENDING + email.updated_at = datetime.now() + await self.fastmail._send_message(email.message, email.template_name) + email.status = EmailStatus.SENT + email.updated_at = datetime.now() + except Exception as e: + email.error = str(e) + email.retry_count += 1 + if email.retry_count >= self.max_retries: + email.status = EmailStatus.FAILED + else: + email.status = EmailStatus.QUEUED + email.updated_at = datetime.now() + await asyncio.sleep(1) + + def start_processing(self): + if not self._task: + self._task = asyncio.create_task(self.process_queue()) + + def stop_processing(self): + self._processing = False + if self._task: + self._task.cancel() + self._task = None + + def get_queue_status(self) -> Dict[str, int]: + status = { + "total": len(self.queue), + "queued": 0, + "sending": 0, + "sent": 0, + "failed": 0 + } + for email in self.queue.values(): + status[email.status.value] += 1 + return status + + def get_email_status(self, email_id: str) -> Optional[QueuedEmail]: + return self.queue.get(email_id) + + def cancel_email(self, email_id: str) -> bool: + if email_id in self.queue and self.queue[email_id].status == EmailStatus.QUEUED: + del self.queue[email_id] + return True + return False + + def retry_failed(self, email_id: str) -> bool: + if email_id in self.queue: + email = self.queue[email_id] + if email.status == EmailStatus.FAILED: + email.status = EmailStatus.QUEUED + email.retry_count = 0 + email.error = None + email.updated_at = datetime.now() + return True + return False \ No newline at end of file diff --git a/tests/test_queue.py b/tests/test_queue.py new file mode 100644 index 0000000..16fb1bc --- /dev/null +++ b/tests/test_queue.py @@ -0,0 +1,120 @@ +import pytest +from datetime import datetime, timedelta +from fastapi_mail import FastMail, ConnectionConfig, MessageSchema, MessageType +from fastapi_mail.queue import EmailQueue, EmailStatus +import asyncio + +@pytest.fixture +def queued_message(): + return MessageSchema( + subject="Test subject", + recipients=["test@example.com"], + body="Test email body", + subtype=MessageType.plain + ) + +@pytest.fixture +def queue_config(mail_config): + mail_config["SUPPRESS_SEND"] = 1 + return ConnectionConfig(**mail_config) + +@pytest.fixture +def email_queue(queue_config): + fastmail = FastMail(queue_config) + return EmailQueue(fastmail) + +@pytest.mark.asyncio +async def test_add_to_queue(email_queue, queued_message): + queue_id = await email_queue.add_to_queue(queued_message) + assert queue_id in email_queue.queue + assert email_queue.queue[queue_id].status.name == "QUEUED" + assert email_queue.queue[queue_id].message == queued_message + assert email_queue.queue[queue_id].retry_count == 0 + +@pytest.mark.asyncio +async def test_scheduled_email(email_queue, queued_message): + schedule_time = datetime.now() + timedelta(hours=1) + queue_id = await email_queue.add_to_queue(queued_message, schedule_time=schedule_time) + queued_email = email_queue.queue[queue_id] + assert queued_email.scheduled_time == schedule_time + assert queued_email.status.name == "QUEUED" + +@pytest.mark.asyncio +async def test_process_queue(email_queue, queued_message): + queue_id = await email_queue.add_to_queue(queued_message) + email_queue.start_processing() + await asyncio.sleep(2) + email_queue.stop_processing() + processed_email = email_queue.queue[queue_id] + assert processed_email.status.name == "SENT" + +@pytest.mark.asyncio +async def test_queue_status(email_queue, queued_message): + queue_id1 = await email_queue.add_to_queue(queued_message) + queue_id2 = await email_queue.add_to_queue(queued_message) + email_queue.queue[queue_id2].status = EmailStatus.FAILED + email_queue.queue[queue_id2].retry_count = 3 + status = email_queue.get_queue_status() + assert status["total"] == 2 + assert status["queued"] == 1 + assert status["failed"] == 1 + +@pytest.mark.asyncio +async def test_cancel_email(email_queue, queued_message): + queue_id = await email_queue.add_to_queue(queued_message) + success = email_queue.cancel_email(queue_id) + assert success is True + assert queue_id not in email_queue.queue + assert email_queue.cancel_email("non-existent") is False + +@pytest.mark.asyncio +async def test_retry_failed_email(email_queue, queued_message): + queue_id = await email_queue.add_to_queue(queued_message) + email_queue.queue[queue_id].status = EmailStatus.FAILED + email_queue.queue[queue_id].retry_count = 3 + email_queue.queue[queue_id].error = "Error occurred" + success = email_queue.retry_failed(queue_id) + assert success is True + retried_email = email_queue.queue[queue_id] + assert retried_email.status.name == "QUEUED" + assert retried_email.retry_count == 0 + assert retried_email.error is None + +@pytest.mark.asyncio +async def test_get_email_status(email_queue, queued_message): + queue_id = await email_queue.add_to_queue(queued_message) + email_status = email_queue.get_email_status(queue_id) + assert email_status is not None + assert email_status.status.name == "QUEUED" + assert email_queue.get_email_status("non-existent") is None + +@pytest.mark.asyncio +async def test_max_retries(email_queue, queued_message): + queue_id = await email_queue.add_to_queue(queued_message) + queued_email = email_queue.queue[queue_id] + for _ in range(email_queue.max_retries): + queued_email.status = EmailStatus.SENDING + queued_email.retry_count += 1 + queued_email.status = EmailStatus.QUEUED + queued_email.status = EmailStatus.SENDING + queued_email.retry_count += 1 + assert queued_email.retry_count > email_queue.max_retries + assert queued_email.status.name == "FAILED" + +@pytest.mark.asyncio +async def test_fastmail_queue_integration(queue_config, queued_message): + fastmail = FastMail(queue_config) + queue_id = await fastmail.send_message(queued_message, queue=True) + assert queue_id is not None + immediate_result = await fastmail.send_message(queued_message, queue=False) + assert immediate_result is None + +@pytest.mark.asyncio +async def test_scheduled_processing(email_queue, queued_message): + schedule_time = datetime.now() + timedelta(seconds=2) + queue_id = await email_queue.add_to_queue(queued_message, schedule_time=schedule_time) + email_queue.start_processing() + assert email_queue.queue[queue_id].status.name == "QUEUED" + await asyncio.sleep(3) + email_queue.stop_processing() + assert email_queue.queue[queue_id].status.name == "SENT" \ No newline at end of file