Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,19 @@ logs: ## display all services logs (follow mode)
@$(COMPOSE) logs -f
.PHONY: logs

logs-worker: ## display worker logs (follow mode)
@$(COMPOSE) logs worker-dev -f
.PHONY: logs-worker

logs-back: ## display backend logs (follow mode)
@$(COMPOSE) logs backend-dev -f
.PHONY: logs-back

logs-front: ## display frontend logs (follow mode)
@$(COMPOSE) logs frontend-dev -f
.PHONY: logs-front


start: ## start all development services
@$(COMPOSE) up --force-recreate --build -d frontend-dev backend-dev worker-dev mta-in --wait
.PHONY: start
Expand Down
2 changes: 1 addition & 1 deletion compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ services:
args:
DOCKER_USER: ${DOCKER_USER:-1000}
user: ${DOCKER_USER:-1000}
command: ["python", "worker.py", "--loglevel=DEBUG"]
command: ["python", "worker.py", "-v", "2"]
environment:
- DJANGO_CONFIGURATION=Development
env_file:
Expand Down
3 changes: 1 addition & 2 deletions src/backend/core/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from sentry_sdk import capture_exception

from core.api.utils import get_file_key
from core.api.viewsets.task import register_task_owner
from core.mda.outbound_tasks import retry_messages_task
from core.services.dns.provisioning import provision_domain_dns
from core.services.exporter.tasks import export_mailbox_task
Expand Down Expand Up @@ -327,7 +326,7 @@ def export_messages_view(self, request, object_id):
# Start the export task
try:
task = export_mailbox_task.delay(str(mailbox_obj.id), str(request.user.id))
register_task_owner(task.id, request.user.id)
task.track_owner(request.user.id)
except Exception: # pylint: disable=broad-exception-caught
logging.exception(
"Failed to queue export task for mailbox %s", mailbox_obj.id
Expand Down
3 changes: 1 addition & 2 deletions src/backend/core/api/viewsets/send.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from rest_framework.views import APIView

from core import models
from core.api.viewsets.task import register_task_owner
from core.mda.outbound import prepare_outbound_message
from core.mda.outbound_tasks import send_message_task

Expand Down Expand Up @@ -122,7 +121,7 @@ def post(self, request):

# Launch async task for sending the message
task = send_message_task.delay(str(message.id), must_archive=must_archive)
register_task_owner(task.id, request.user.id)
task.track_owner(request.user.id)

# --- Finalize ---
# Message state should be updated by prepare_outbound_message/send_message
Expand Down
115 changes: 69 additions & 46 deletions src/backend/core/api/viewsets/task.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
"""API ViewSet for Celery task status."""
"""API ViewSet for asynchronous task statuses."""

import logging

from django.core.cache import cache
import dramatiq
from dramatiq.results import ResultFailure, ResultMissing, Results

from celery import states as celery_states
from celery.result import AsyncResult
from drf_spectacular.utils import (
OpenApiExample,
extend_schema,
Expand All @@ -17,16 +16,12 @@
from rest_framework.response import Response
from rest_framework.views import APIView

from messages.celery_app import app as celery_app
from core.utils import get_task_progress, get_task_tracking

logger = logging.getLogger(__name__)

TASK_OWNER_CACHE_TTL = 86400 # 24 hours


def register_task_owner(task_id, user_id):
"""Register the owner of a task for permission checks."""
cache.set(f"task_owner:{task_id}", str(user_id), timeout=TASK_OWNER_CACHE_TTL)
TASK_STATES = ["PENDING", "SUCCESS", "FAILURE", "PROGRESS"]


@extend_schema(
Expand All @@ -44,11 +39,13 @@
200: inline_serializer(
name="TaskStatusResponse",
fields={
"status": drf_serializers.ChoiceField(
choices=sorted({*celery_states.ALL_STATES, "PROGRESS"})
),
"status": drf_serializers.ChoiceField(choices=sorted(TASK_STATES)),
"result": drf_serializers.JSONField(allow_null=True),
"error": drf_serializers.CharField(allow_null=True),
# Present when status == "PROGRESS"
"progress": drf_serializers.IntegerField(required=False),
"message": drf_serializers.CharField(required=False, allow_blank=True),
"timestamp": drf_serializers.FloatField(required=False),
},
)
},
Expand All @@ -69,43 +66,69 @@
],
)
class TaskDetailView(APIView):
"""View to retrieve the status of a Celery task."""
"""View to retrieve the status of a task."""

permission_classes = [permissions.IsAuthenticated]

def get(self, request, task_id):
"""Get the status of a Celery task."""
owner_id = cache.get(f"task_owner:{task_id}")
if owner_id is None:
"""Get the status of a task."""
tracking = get_task_tracking(task_id)
if tracking is None:
raise PermissionDenied("Task not found or access expired.")
if str(request.user.id) != owner_id:
if str(request.user.id) != tracking["owner"]:
raise PermissionDenied("You do not have access to this task.")

task_result = AsyncResult(task_id, app=celery_app)

# By default unknown tasks will be in PENDING. There is no reliable
# way to check if a task exists or not with Celery.
# https://github.com/celery/celery/issues/3596#issuecomment-262102185

# Prepare the response data
result_data = {
"status": task_result.status,
"result": None,
"error": None,
}

# If the result is a dict with status/result/error, unpack and propagate status
if isinstance(task_result.result, dict) and set(task_result.result.keys()) >= {
"status",
"result",
"error",
}:
result_data["status"] = task_result.result["status"]
result_data["result"] = task_result.result["result"]
result_data["error"] = task_result.result["error"]
else:
result_data["result"] = task_result.result
if task_result.state == "PROGRESS" and task_result.info:
result_data.update(task_result.info)

return Response(result_data)
# Try to fetch the result from dramatiq's native result backend
message = dramatiq.Message(
queue_name=tracking["queue_name"],
actor_name=tracking["actor_name"],
args=(),
kwargs={},
options={},
message_id=task_id,
)
try:
result_data = message.get_result(block=False)
except ResultMissing:
result_data = None
except ResultFailure as exc:
return Response({
"status": "FAILURE",
"result": None,
"error": str(exc),
})
Comment on lines +95 to +99

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.

Copilot Autofix

AI 3 months ago

In general, the fix is to avoid returning raw exception messages (or stack traces) directly to the client. Instead, log the detailed exception server-side and send the client a generic, non-sensitive error message or code while preserving the existing API contract where possible.

For this specific code, the best minimal fix is: in the except ResultFailure as exc: block, log the exception (including the stack trace) using the existing logger, and change the "error" field in the response to a generic message such as "Task failed" or "An internal error has occurred.". This keeps the response structure (status, result, error) and HTTP status code unchanged, so existing clients still receive "status": "FAILURE" and an error string, but without leaking internal exception details.

Concretely, in src/backend/core/api/viewsets/task.py, in the TaskDetailView.get method, update the except ResultFailure as exc: clause starting at line 94. Add a logger.exception(...) call before returning the response, and change "error": str(exc) to a generic error string. No new imports are needed because logging and logger are already defined at the top of the file.

Suggested changeset 1
src/backend/core/api/viewsets/task.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/src/backend/core/api/viewsets/task.py b/src/backend/core/api/viewsets/task.py
--- a/src/backend/core/api/viewsets/task.py
+++ b/src/backend/core/api/viewsets/task.py
@@ -92,10 +92,11 @@
         except ResultMissing:
             result_data = None
         except ResultFailure as exc:
+            logger.exception("Task %s failed when fetching result.", task_id)
             return Response({
                 "status": "FAILURE",
                 "result": None,
-                "error": str(exc),
+                "error": "Task failed.",
             })
 
         if result_data is not None:
EOF
@@ -92,10 +92,11 @@
except ResultMissing:
result_data = None
except ResultFailure as exc:
logger.exception("Task %s failed when fetching result.", task_id)
return Response({
"status": "FAILURE",
"result": None,
"error": str(exc),
"error": "Task failed.",
})

if result_data is not None:
Copilot is powered by AI and may make mistakes. Always verify output.
Comment on lines +90 to +99
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Find the project's dependency file to check Dramatiq version
fd -e "requirements.txt" -e "pyproject.toml" -e "poetry.lock" -e "Pipfile" | head -20

Repository: suitenumerique/messages

Length of output: 49


🏁 Script executed:

# Check the actual code at the specified location
cat -n src/backend/core/api/viewsets/task.py | sed -n '85,105p'

Repository: suitenumerique/messages

Length of output: 910


🏁 Script executed:

# Search for any existing Sentry imports in the backend
rg "from sentry_sdk import|import sentry_sdk" src/backend/ -A 2

Repository: suitenumerique/messages

Length of output: 1588


🏁 Script executed:

# Check if there's a logger already configured in this file
rg "import logging|logger = " src/backend/core/api/viewsets/task.py -A 2

Repository: suitenumerique/messages

Length of output: 140


🏁 Script executed:

# Find dependency files
find . -name "requirements*.txt" -o -name "pyproject.toml" -o -name "setup.py" | grep -v __pycache__ | head -10

Repository: suitenumerique/messages

Length of output: 226


🏁 Script executed:

# Check git history or setup files for Dramatiq version
rg "dramatiq" . --max-count=5 -A 2 | head -30

Repository: suitenumerique/messages

Length of output: 2047


🌐 Web query:

Dramatiq 2.0.1 ResultFailure exception __str__ what does it return

💡 Result:

In Dramatiq 2.0.1, ResultFailure does not override __str__; it inherits it from dramatiq.errors.DramatiqError. That __str__ returns:

  • str(self.message) if it’s truthy, otherwise
  • repr(self.message) (fallback for falsy / empty messages). [1]

ResultFailure.__init__ just stores the message plus orig_exc_type / orig_exc_msg; those extra fields do not affect __str__ directly. [2]

So str(e) where e is a ResultFailure is essentially whatever message Dramatiq constructed when raising it (commonly shown as "actor raised <ExceptionType>: <ExceptionMessage>"). [3]

Sources:
[1] DramatiqError __str__ implementation https://dramatiq.io/_modules/dramatiq/errors.html
[2] ResultFailure definition https://dramatiq.io/_modules/dramatiq/results/errors.html
[3] Docs example showing the message format https://dramatiq.io/reference.html


🏁 Script executed:

# Check for similar exception handling patterns in the codebase
rg "ResultFailure|ResultMissing" src/backend/ -B 3 -A 5

Repository: suitenumerique/messages

Length of output: 1355


🏁 Script executed:

# Look for other places where exceptions are returned to clients in viewsets
rg "return Response.*str\(exc\)" src/backend/ -B 2 -A 2

Repository: suitenumerique/messages

Length of output: 49


🏁 Script executed:

# Check the imports in the task viewset file
cat -n src/backend/core/api/viewsets/task.py | head -30

Repository: suitenumerique/messages

Length of output: 991


Capture exceptions internally and return a generic error message to clients.

str(exc) exposes the Dramatiq task failure message (formatted as "actor raised <ExceptionType>: <ExceptionMessage>") which can leak internal exception details to clients. Log and capture the exception, then return a generic error response.

Add the import and update the exception handler:

Fix
import dramatiq
+from sentry_sdk import capture_exception
        except ResultFailure as exc:
+            logger.exception("Task %s failed", task_id)
+            capture_exception(exc)
             return Response({
                 "status": "FAILURE",
                 "result": None,
-                "error": str(exc),
+                "error": "Task execution failed.",
             })
🧰 Tools
🪛 GitHub Check: CodeQL

[warning] 95-99: Information exposure through an exception
Stack trace information flows to this location and may be exposed to an external user.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/api/viewsets/task.py` around lines 90 - 99, The
ResultFailure handler currently returns str(exc) which can leak internal
exception details; update the exception handling around message.get_result (the
except ResultFailure as exc block) to log the full exception internally (e.g.,
logger.exception or process_logger.exception after importing the logger) and
return a generic client-safe Response payload (e.g., {"status": "FAILURE",
"result": None, "error": "Task failed"}). Add the necessary logger import and
replace the returned error string with the generic message while ensuring the
original exception is captured in the internal log call.


if result_data is not None:
response = {"status": "SUCCESS", "result": result_data, "error": None}
# If the result follows the {status, result, error} convention, unpack it
if (
isinstance(result_data, dict)
and {"status", "result", "error"} <= result_data.keys()
):
response["status"] = result_data["status"]
response["result"] = result_data["result"]
response["error"] = result_data["error"]
return Response(response)

# Check if we have progress data for this task
progress_data = get_task_progress(task_id)
if progress_data:
return Response(
{
"status": "PROGRESS",
"result": None,
"error": None,
"progress": progress_data.get("progress"),
"message": progress_data.get("metadata", {}).get("message"),
"timestamp": progress_data.get("timestamp"),
}
Comment on lines +113 to +124
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Expose progress metadata in PROGRESS responses.

set_task_progress stores metadata (e.g., mbox tasks put counts under metadata["result"]), but the API currently drops it and returns message=None. This hides meaningful progress details from clients.

🔧 Proposed fix
         progress_data = get_task_progress(task_id)
         if progress_data:
+            metadata = progress_data.get("metadata", {}) or {}
             return Response(
                 {
                     "status": "PROGRESS",
-                    "result": None,
-                    "error": None,
+                    "result": metadata or None,
+                    "error": metadata.get("error"),
                     "progress": progress_data.get("progress"),
-                    "message": progress_data.get("metadata", {}).get("message"),
+                    "message": metadata.get("message")
+                    or metadata.get("result", {}).get("message_status"),
                     "timestamp": progress_data.get("timestamp"),
                 }
             )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/api/viewsets/task.py` around lines 113 - 124, The PROGRESS
response drops useful metadata set by set_task_progress; update the block that
uses get_task_progress (in the viewset handling task status) to include the
metadata payload in the Response: return the metadata object (e.g.,
progress_data.get("metadata")) and surface any nested fields like
metadata.get("result") and metadata.get("message") instead of returning
message=None; keep existing keys (status, result, error, progress, timestamp)
but populate result/message from progress_data["metadata"] when present so
clients receive the stored progress details.

)

# Default to pending when no result and no progress
return Response(
{
"status": "PENDING",
"result": None,
"error": None,
}
)
2 changes: 1 addition & 1 deletion src/backend/core/management/commands/run_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def handle(self, *args, **options):

try:
# Execute task synchronously
result = task_func.apply(args=task_args, kwargs=kwargs)
result = task_func(*task_args, **kwargs)

# Output result
if options["json"]:
Expand Down
20 changes: 10 additions & 10 deletions src/backend/core/mda/inbound_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,15 @@
from django.core.cache import cache
from django.utils import timezone

import requests
from celery.utils.log import get_task_logger
import logging

from core import models
from core.mda.inbound_create import _create_message_from_inbound
from core.mda.rfc5322 import parse_email_message
import requests
from core.utils import cron_task, register_task

from messages.celery_app import app as celery_app

logger = get_task_logger(__name__)
logger = logging.getLogger(__name__)


def _check_spam_with_hardcoded_rules(
Expand Down Expand Up @@ -167,8 +166,8 @@ def _check_spam_with_rspamd(
return False, str(e)


@celery_app.task(bind=True)
def process_inbound_message_task(self, inbound_message_id: str):
@register_task(queue="inbound")
def process_inbound_message_task(inbound_message_id: str):
"""Process an inbound message from the queue: check spam and create message.

Args:
Expand Down Expand Up @@ -285,12 +284,13 @@ def process_inbound_message_task(self, inbound_message_id: str):
cache.delete(lock_key)


@celery_app.task(bind=True)
def process_inbound_messages_queue_task(self, batch_size: int = 10):
@cron_task(interval=300)
@register_task(queue="inbound")
def process_inbound_messages_queue_task(batch_size: int = 10):
"""Retry processing of inbound messages that are older than 5 minutes.

This task only handles retries for messages that may have failed or gotten stuck.
Regular messages are processed immediately when created via process_inbound_message_task.delay().
Regular messages are processed immediately when created via process_inbound_message_task.send().

Args:
batch_size: Number of messages to process in this batch
Expand Down
2 changes: 1 addition & 1 deletion src/backend/core/mda/outbound.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ def _validate_attachments_size(total_size: int) -> None:
def send_message(message: models.Message, force_mta_out: bool = False):
"""Send an existing Message, internally or externally.

This part is called asynchronously from the celery worker.
This part is called asynchronously from the background worker.
"""

# Refuse to send messages that are draft or not senders
Expand Down
Loading
Loading