Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
297 changes: 297 additions & 0 deletions src/backend/core/api/viewsets/inbound/brevo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,297 @@
"""Brevo inbound channel implementation for handling email from Brevo parse webhooks."""

import hashlib
import logging
import secrets
from email.utils import parsedate_to_datetime
from typing import Any, Dict, List, Optional

from django.conf import settings
from django.core.exceptions import ValidationError
from django.core.validators import validate_email
from django.utils import timezone

from drf_spectacular.utils import extend_schema
from rest_framework import status, viewsets
from rest_framework.authentication import BaseAuthentication
from rest_framework.decorators import action
from rest_framework.exceptions import AuthenticationFailed
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response

from core import models
from core.mda.inbound import check_local_recipients, deliver_inbound_message
from core.mda.rfc5322 import compose_email

logger = logging.getLogger(__name__)


class BrevoAuthentication(BaseAuthentication):
"""
Custom authentication for Brevo webhook endpoints.

Supports two authentication methods:
1. Channel ID authentication (X-Channel-ID header) - for channel-specific webhooks
2. HMAC signature validation (X-Brevo-Signature header) - for global webhooks

Returns None or (user, auth)
"""

def authenticate(self, request):
channel_id = request.headers.get("X-Channel-ID")
if channel_id:
return self._authenticate_by_channel_id(channel_id)

signature = request.headers.get("X-Brevo-Signature")
if signature:
return self._authenticate_by_signature(request, signature)

raise AuthenticationFailed("Missing authentication credentials")

def _authenticate_by_channel_id(self, channel_id: str):
try:
channel = models.Channel.objects.get(id=channel_id, type="brevo")
except (models.Channel.DoesNotExist, ValidationError) as e:
raise AuthenticationFailed("Invalid channel_id") from e
return (None, {"channel": channel, "auth_type": "channel_id"})

def _authenticate_by_signature(self, request, signature: str):
secret = getattr(settings, "BREVO_WEBHOOK_SECRET", None)
if not secret:
raise AuthenticationFailed("Brevo webhook secret not configured")

body = request.body
expected_signature = hashlib.sha256(secret.encode() + body).hexdigest()
if not secrets.compare_digest(signature, expected_signature):
raise AuthenticationFailed("Invalid signature")

return (None, {"auth_type": "hmac"})

def authenticate_header(self, request):
return 'Bearer realm="Brevo"'


def convert_brevo_payload_to_parsed_email(item: Dict[str, Any]) -> Dict[str, Any]:
"""Convert Brevo parsed email payload to internal email format."""
from_email = item.get("From", {})
to_emails = item.get("To", [])
cc_emails = item.get("Cc", [])
reply_to = item.get("ReplyTo")

subject = item.get("Subject", "(no subject)")
extracted_message = item.get("ExtractedMarkdownMessage", "")
raw_html_body = item.get("RawHtmlBody")
raw_text_body = item.get("RawTextBody")
in_reply_to = item.get("InReplyTo")
sent_at = item.get("SentAtDate")

html_body = ""
if raw_html_body:
html_body = raw_html_body
elif extracted_message:
html_body = extracted_message.replace("\n", "<br/>")

text_body = raw_text_body or extracted_message or ""

parsed_email: Dict[str, Any] = {
"subject": subject,
"from": {
"email": from_email.get("Address", ""),
"name": from_email.get("Name"),
},
"to": [
{
"email": to.get("Address", ""),
"name": to.get("Name"),
}
for to in to_emails
],
"cc": [
{
"email": cc.get("Address", ""),
"name": cc.get("Name"),
}
for cc in cc_emails
],
"headers": {},
}

if reply_to:
parsed_email["reply_to"] = {
"email": reply_to.get("Address", ""),
"name": reply_to.get("Name"),
}

if in_reply_to:
parsed_email["in_reply_to"] = in_reply_to
parsed_email["references"] = in_reply_to

if sent_at:
try:
parsed_email["date"] = parsedate_to_datetime(sent_at)
except (ValueError, TypeError):
parsed_email["date"] = timezone.now()
else:
parsed_email["date"] = timezone.now()

if html_body:
parsed_email["htmlBody"] = [{"content": html_body}]
if text_body:
parsed_email["textBody"] = [{"content": text_body}]

return parsed_email


class InboundBrevoViewSet(viewsets.GenericViewSet):
"""Handles incoming email messages from Brevo inbound parse webhooks."""

CHANNEL_TYPE = "brevo"
CHANNEL_DESCRIPTION = "Brevo inbound email parsing"

permission_classes = [IsAuthenticated]
authentication_classes = [BrevoAuthentication]
Comment on lines +145 to +152
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

IsAuthenticated permission incompatible with user=None from BrevoAuthentication.

BrevoAuthentication.authenticate() returns (None, auth_data) with user=None. DRF's IsAuthenticated permission checks request.user.is_authenticated, which will fail when user is None, causing all requests to return 403 Forbidden. This explains the pipeline failures where tests receive 403 instead of expected status codes.

Either return an anonymous user object that passes is_authenticated, or use AllowAny permissions with manual auth validation in the viewset.

🐛 Proposed fix using AllowAny with manual validation
-from rest_framework.permissions import IsAuthenticated
+from rest_framework.permissions import AllowAny

 class InboundBrevoViewSet(viewsets.GenericViewSet):
     """Handles incoming email messages from Brevo inbound parse webhooks."""

     CHANNEL_TYPE = "brevo"
     CHANNEL_DESCRIPTION = "Brevo inbound email parsing"

-    permission_classes = [IsAuthenticated]
+    permission_classes = [AllowAny]
     authentication_classes = [BrevoAuthentication]

Alternatively, create an anonymous user wrapper:

🔧 Alternative: Return authenticated anonymous user
+from django.contrib.auth.models import AnonymousUser
+
+class BrevoWebhookUser(AnonymousUser):
+    """Anonymous user that passes is_authenticated for webhook auth."""
+    `@property`
+    def is_authenticated(self):
+        return True

 class BrevoAuthentication(BaseAuthentication):
     # ...
     def _authenticate_by_channel_id(self, channel_id: str):
         try:
             channel = models.Channel.objects.get(id=channel_id, type="brevo")
         except (models.Channel.DoesNotExist, ValidationError) as e:
             raise AuthenticationFailed("Invalid channel_id") from e
-        return (None, {"channel": channel, "auth_type": "channel_id"})
+        return (BrevoWebhookUser(), {"channel": channel, "auth_type": "channel_id"})

     def _authenticate_by_signature(self, request, signature: str):
         # ...
-        return (None, {"auth_type": "hmac"})
+        return (BrevoWebhookUser(), {"auth_type": "hmac"})
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/api/viewsets/inbound/brevo.py` around lines 145 - 152,
InboundBrevoViewSet is using IsAuthenticated while
BrevoAuthentication.authenticate() returns (None, auth_data), causing
request.user to be None and IsAuthenticated to reject requests; change
permission_classes to [AllowAny] and enforce BrevoAuthentication manually in the
viewset (e.g., override initial or a before-action method on InboundBrevoViewSet
to call BrevoAuthentication().authenticate(request) and raise PermissionDenied
when auth fails), so authentication logic lives in
BrevoAuthentication.authenticate and permission gating is handled explicitly in
the viewset.


@extend_schema(exclude=True)
@action(
detail=False,
methods=["post"],
url_path="webhook",
url_name="inbound-brevo-webhook",
)
def webhook(self, request):
"""Handle incoming Brevo webhook with parsed email(s)."""

data = request.data
items = data.get("items", [])

if not items:
return Response(
{"detail": "No items in payload"},
status=status.HTTP_400_BAD_REQUEST,
)

if not isinstance(items, list):
return Response(
{"detail": "Items must be a list"},
status=status.HTTP_400_BAD_REQUEST,
)

auth_data = request.auth
channel = auth_data.get("channel")

results = []
success_count = 0
failure_count = 0

for item in items:
result = self._process_brevo_item(item, channel)
if result["success"]:
success_count += 1
else:
failure_count += 1
results.append(result)

if failure_count > 0 and success_count == 0:
return Response(
{
"status": "error",
"detail": "Failed to process all messages",
"results": results,
},
Comment on lines +196 to +200

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.

Copilot Autofix

AI 2 months ago

In general, the fix is to avoid returning raw exception messages to the client while still logging full details on the server. The server should send a generic error description (for example, "Internal processing error") and keep logger.exception(...) so that developers can diagnose issues from logs.

For this specific code, the best fix with minimal behavior change is:

  • Keep the logger.exception("Error processing Brevo item: %s", e) call as-is so stack traces are logged server-side.
  • Change the returned dict in the except block of _process_brevo_item to use a generic message instead of str(e). For example:
    • return {"success": False, "error": "Failed to process Brevo item"}.
  • No changes are needed in the webhook method; it will still aggregate results, but those results will now contain only non-sensitive error strings.
  • No new imports or helper methods are strictly necessary.

All changes are confined to src/backend/core/api/viewsets/inbound/brevo.py, within the shown _process_brevo_item method, specifically lines 274–276.

Suggested changeset 1
src/backend/core/api/viewsets/inbound/brevo.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/src/backend/core/api/viewsets/inbound/brevo.py b/src/backend/core/api/viewsets/inbound/brevo.py
--- a/src/backend/core/api/viewsets/inbound/brevo.py
+++ b/src/backend/core/api/viewsets/inbound/brevo.py
@@ -273,7 +273,7 @@
 
         except Exception as e:
             logger.exception("Error processing Brevo item: %s", e)
-            return {"success": False, "error": str(e)}
+            return {"success": False, "error": "Failed to process Brevo item"}
 
     @extend_schema(exclude=True)
     @action(
EOF
@@ -273,7 +273,7 @@

except Exception as e:
logger.exception("Error processing Brevo item: %s", e)
return {"success": False, "error": str(e)}
return {"success": False, "error": "Failed to process Brevo item"}

@extend_schema(exclude=True)
@action(
Copilot is powered by AI and may make mistakes. Always verify output.
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)

if failure_count > 0:
return Response(
{
"status": "partial_success",
"processed": success_count,
"failed": failure_count,
"results": results,
},
Comment on lines +206 to +211

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.

Copilot Autofix

AI 2 months ago

In general, to fix information exposure through exceptions, stop returning raw exception objects or messages in API responses. Instead, log detailed errors on the server and send a generic, user‑safe message (optionally with a non‑sensitive error code) in the response.

Here, the best fix is to change _process_brevo_item so that the except Exception as e block no longer includes str(e) in the returned dictionary. We already call logger.exception("Error processing Brevo item: %s", e), which captures the stack trace and message server‑side. The webhook response should instead return a generic error description, e.g. "Internal error processing message" and perhaps a stable error code like "internal_error". This keeps the public API behavior (indicating which items failed and that they failed) while avoiding leaking implementation details.

Concretely, in src/backend/core/api/viewsets/inbound/brevo.py, around lines 274–276, replace:

        except Exception as e:
            logger.exception("Error processing Brevo item: %s", e)
            return {"success": False, "error": str(e)}

with something like:

        except Exception as e:
            logger.exception("Error processing Brevo item: %s", e)
            return {
                "success": False,
                "error": "Internal error processing message",
                "error_code": "internal_error",
            }

No new imports or helper methods are needed.

Suggested changeset 1
src/backend/core/api/viewsets/inbound/brevo.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/src/backend/core/api/viewsets/inbound/brevo.py b/src/backend/core/api/viewsets/inbound/brevo.py
--- a/src/backend/core/api/viewsets/inbound/brevo.py
+++ b/src/backend/core/api/viewsets/inbound/brevo.py
@@ -273,7 +273,11 @@
 
         except Exception as e:
             logger.exception("Error processing Brevo item: %s", e)
-            return {"success": False, "error": str(e)}
+            return {
+                "success": False,
+                "error": "Internal error processing message",
+                "error_code": "internal_error",
+            }
 
     @extend_schema(exclude=True)
     @action(
EOF
@@ -273,7 +273,11 @@

except Exception as e:
logger.exception("Error processing Brevo item: %s", e)
return {"success": False, "error": str(e)}
return {
"success": False,
"error": "Internal error processing message",
"error_code": "internal_error",
}

@extend_schema(exclude=True)
@action(
Copilot is powered by AI and may make mistakes. Always verify output.
status=status.HTTP_207_MULTI_STATUS,
)

logger.info(
"Successfully processed %d Brevo inbound messages",
success_count,
)
return Response(
{
"status": "ok",
"processed": success_count,
"results": results,
}
Comment on lines +220 to +224

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.

Copilot Autofix

AI 2 months ago

In general, to fix information exposure through exceptions, you should avoid including raw exception messages or stack traces in HTTP responses. Instead, log the full details on the server (for operators and developers) and return a generic, non-sensitive error description to the client, optionally with a stable error code.

In this file, the minimal, non‑breaking fix is to change _process_brevo_item so that it no longer returns str(e) as the error value. We should keep the logger.exception(...) call so the full stack trace and exception details are available in logs, but the dictionary returned to the caller should contain only a generic error message, such as "Internal error processing message" and possibly a stable error code like "processing_error". The rest of the logic in webhook that aggregates results and counts successes/failures can remain unchanged; it will still indicate which items failed without leaking internal details.

Concretely:

  • Edit src/backend/core/api/viewsets/inbound/brevo.py in _process_brevo_item, lines 274–276.
  • Keep the logger.exception("Error processing Brevo item: %s", e) line.
  • Replace return {"success": False, "error": str(e)} with a generic structure like:
    • {"success": False, "error": "Internal error processing message", "error_code": "processing_error"} (or simply a generic error string if you prefer).
      No new imports or helper methods are required; we just alter the returned payload.

Suggested changeset 1
src/backend/core/api/viewsets/inbound/brevo.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/src/backend/core/api/viewsets/inbound/brevo.py b/src/backend/core/api/viewsets/inbound/brevo.py
--- a/src/backend/core/api/viewsets/inbound/brevo.py
+++ b/src/backend/core/api/viewsets/inbound/brevo.py
@@ -273,7 +273,11 @@
 
         except Exception as e:
             logger.exception("Error processing Brevo item: %s", e)
-            return {"success": False, "error": str(e)}
+            return {
+                "success": False,
+                "error": "Internal error processing message",
+                "error_code": "processing_error",
+            }
 
     @extend_schema(exclude=True)
     @action(
EOF
@@ -273,7 +273,11 @@

except Exception as e:
logger.exception("Error processing Brevo item: %s", e)
return {"success": False, "error": str(e)}
return {
"success": False,
"error": "Internal error processing message",
"error_code": "processing_error",
}

@extend_schema(exclude=True)
@action(
Copilot is powered by AI and may make mistakes. Always verify output.
Comment on lines +194 to +224
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid exposing internal error details in response results.

The results list returned in error/partial responses includes str(e) from caught exceptions (line 276), which can leak stack traces or internal implementation details to external callers. This was flagged by static analysis.

Sanitize error messages before including them in responses.

🛡️ Proposed fix to sanitize error messages
         except Exception as e:
             logger.exception("Error processing Brevo item: %s", e)
-            return {"success": False, "error": str(e)}
+            return {"success": False, "error": "Internal processing error"}
🧰 Tools
🪛 GitHub Check: CodeQL

[warning] 196-200: Information exposure through an exception
Stack trace information flows to this location and may be exposed to an external user.


[warning] 206-211: Information exposure through an exception
Stack trace information flows to this location and may be exposed to an external user.


[warning] 220-224: Information exposure through an exception
Stack trace information flows to this location and may be exposed to an external user.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/api/viewsets/inbound/brevo.py` around lines 194 - 224, The
response `results` currently includes raw exception strings (str(e)) which may
leak internal details; update the exception handlers that append to the
`results` list (where you currently append str(e)) to instead append a
sanitized, generic error object or message (e.g., {"error": "internal processing
error"} or {"error": "failed to process message"}) and move the detailed
exception into server-side logs using logger.exception(...) or logger.error(...,
exc_info=True); keep `failure_count`/`success_count` logic intact and ensure
only the sanitized messages are returned in the error/partial_success Responses
while full exception details remain in the logs.

)

def _process_brevo_item(self, item: Dict[str, Any], channel: Optional[models.Channel]) -> Dict[str, Any]:
"""Process a single Brevo email item."""
try:
from_email = item.get("From", {}).get("Address", "")
if not from_email:
return {"success": False, "error": "Missing From address"}

recipients = item.get("Recipients", [])
if not recipients:
recipients = [r.get("Address") for r in item.get("To", []) if r.get("Address")]
if not recipients:
return {"success": False, "error": "No recipients found"}

recipient_emails = [r for r in recipients if r]
if not recipient_emails:
return {"success": False, "error": "No valid recipient addresses"}

local_recipients = check_local_recipients(recipient_emails)

parsed_email = convert_brevo_payload_to_parsed_email(item)
prepend_headers = [
("X-Brevo-Webhook", "inbound"),
("Received", "from brevo-inbound"),
]

raw_email = compose_email(parsed_email, prepend_headers=prepend_headers)

delivered_count = 0
for recipient in recipient_emails:
if recipient in local_recipients:
delivered = deliver_inbound_message(
recipient,
parsed_email,
raw_email,
channel=channel,
skip_inbound_queue=True,
)
if delivered:
delivered_count += 1

return {
"success": delivered_count > 0,
"message_id": item.get("MessageId"),
"delivered": delivered_count,
"recipients": len(recipient_emails),
}

except Exception as e:
logger.exception("Error processing Brevo item: %s", e)
return {"success": False, "error": str(e)}

@extend_schema(exclude=True)
@action(
detail=False,
methods=["post"],
url_path="check",
url_name="inbound-brevo-check",
)
def check(self, request):
"""Check which recipients are locally deliverable."""
data = request.data
addresses = data.get("addresses", [])
if not addresses or not isinstance(addresses, list):
return Response(
{"detail": "Missing addresses"},
status=status.HTTP_400_BAD_REQUEST,
)

local_addresses = check_local_recipients(addresses)
results = {address: address in local_addresses for address in addresses}
return Response(results)
Loading
Loading