Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .jules/sentinel.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,8 @@
**Vulnerability:** The application was catching all exceptions and returning their string representation (`str(e)`) directly to the client in the HTTP 500 response. This could expose sensitive internal details (stack traces, database info, file paths).
**Learning:** Developers often pass `str(e)` to `HTTPException` for convenience during debugging, but this practice frequently makes it into production code, leading to information leakage.
**Prevention:** In production, catch `Exception` and raise `HTTPException` with a generic message (e.g., "Internal Server Error"). Ensure full exception details are logged using `logger.exception()` for server-side debugging.

## 2026-02-05 - Information Leakage in Kafka Consumer
**Vulnerability:** The Kafka consumer and prediction callback were catching exceptions and embedding the raw exception message `str(e)` into the response payload sent to the output topic.
**Learning:** Even asynchronous background workers (like Kafka consumers) can leak information if they reflect input processing errors back to an output channel without sanitization.
**Prevention:** Use a dedicated `PredictionService` wrapper that catches exceptions, logs the full stack trace server-side with `logger.exception()`, and returns only a generic "Internal Processing Error" message to the output topic.
50 changes: 30 additions & 20 deletions src/regression_model_template/controller/kafka_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,12 @@ def _process_message(self, msg: Message) -> None:
logger.info(f"kafka Received input {kafka_msg}")
prediction_result = self.prediction_callback(input_obj).result
except json.JSONDecodeError as e:
error = f"Failed to decode JSON message: {e}. Raw message: {msg.value()}"
predictionresponse.result["error"] = error
logger.error(error)
logger.error(f"Failed to decode JSON message: {e}. Raw message: {msg.value()}")
predictionresponse.result["error"] = "Invalid JSON format"
prediction_result = predictionresponse.result
except Exception as e:
error = f"Error during prediction processing: {e}"
logger.exception(error)
predictionresponse.result["error"] = error
logger.exception(f"Error during prediction processing: {e}")
predictionresponse.result["error"] = "Internal Processing Error"
prediction_result = predictionresponse.result

try:
Expand Down Expand Up @@ -256,6 +254,29 @@ async def health_check() -> dict[str, str]:
return {"status": "healthy"}


class PredictionService:
    """Wrap a model so that prediction failures never expose exception text."""

    def __init__(self, model: Any):
        """Keep a reference to the model whose ``predict`` method is called."""
        self.model = model

    def predict(self, input_data: PredictionRequest) -> PredictionResponse:
        """Run the model on ``input_data`` and package the outcome.

        On success the response ``result`` holds the model outputs (converted
        with ``to_numpy().tolist()``) under ``"inference"``, ``"quality"`` set
        to 1 and ``"error"`` set to None. If anything raises, the exception is
        logged and ``result`` carries ``"inference"`` 0, ``"quality"`` 0 and a
        generic error message in place of the exception text.
        """
        response: PredictionResponse = PredictionResponse()
        try:
            # Build and validate the input frame step by step, then invoke the model.
            run_model = self.model.predict
            frame = pd.DataFrame(input_data.input_data)
            validated = InputsSchema.check(frame)
            outputs: Outputs = run_model(inputs=validated)
            response.result["inference"] = outputs.to_numpy().tolist()
            response.result["quality"] = 1
            response.result["error"] = None
        except Exception as e:
            # Full details go to the log only; the caller sees a fixed message.
            logger.exception(f"Prediction failed: {e}")
            response.result["inference"] = 0
            response.result["quality"] = 0
            response.result["error"] = "Internal Processing Error"
        return response


def main() -> None:
global fastapi_kafka_service
# Configuration
Expand All @@ -270,19 +291,8 @@ def main() -> None:
loader = CustomLoader()
model = loader.load(uri=model_uri)

# Prediction Callback Function
def my_prediction_function(input_data: PredictionRequest) -> PredictionResponse:
predictionresponse: PredictionResponse = PredictionResponse()
try:
outputs: Outputs = model.predict(inputs=InputsSchema.check(pd.DataFrame(input_data.input_data)))
predictionresponse.result["inference"] = outputs.to_numpy().tolist()
predictionresponse.result["quality"] = 1
predictionresponse.result["error"] = None
except Exception as e:
predictionresponse.result["inference"] = 0
predictionresponse.result["quality"] = 0
predictionresponse.result["error"] = str(e)
return predictionresponse
# Initialize Prediction Service
prediction_service = PredictionService(model)

# Kafka Configuration
kafka_config = {
Expand All @@ -292,7 +302,7 @@ def my_prediction_function(input_data: PredictionRequest) -> PredictionResponse:
}
# Initialize and Start Service
fastapi_kafka_service = FastAPIKafkaService(
prediction_callback=my_prediction_function,
prediction_callback=prediction_service.predict,
kafka_config=kafka_config,
input_topic=DEFAULT_INPUT_TOPIC,
output_topic=DEFAULT_OUTPUT_TOPIC,
Expand Down
32 changes: 25 additions & 7 deletions tests/controller/test_kafka_app_security.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,44 @@
import pytest
from unittest.mock import patch, MagicMock
from unittest.mock import MagicMock, patch
from fastapi import HTTPException
from regression_model_template.controller.kafka_app import (
PredictionRequest,
PredictionService,
predict,
PredictionResponse,
)
import asyncio
import regression_model_template.controller.kafka_app as kafka_app


def test_prediction_service_sanitization():
    """PredictionService must replace exception text with a generic error."""
    # Internal detail that must never show up in the response payload.
    secret_detail = "Connection failed to database at 192.168.1.5 with user admin"

    # A model whose predict call always fails with that detail.
    failing_model = MagicMock()
    failing_model.predict.side_effect = Exception(secret_detail)

    # A default-constructed request is enough to drive the call.
    service = PredictionService(model=failing_model)
    payload = service.predict(PredictionRequest()).result

    # The payload carries the generic failure values and none of the detail.
    assert payload["error"] == "Internal Processing Error"
    assert secret_detail not in payload["error"]
    assert payload["quality"] == 0
    assert payload["inference"] == 0


def test_predict_endpoint_exception_leak():
"""Test that the predict endpoint does NOT leak exception details."""

async def run_async_test():
# Initialize the global variable if it's missing, or patch it.
# Since it might not be initialized, patch with create=True might work,
# or we can manually set it.

with patch(
"regression_model_template.controller.kafka_app.fastapi_kafka_service", create=True
) as mock_fastapi_kafka_service:
# Simulate a sensitive internal error
# Simulate a sensitive internal error raised by callback (unlikely now with PredictionService, but possible if something else fails)
sensitive_error_message = "Database connection failed at 192.168.1.100:5432"
mock_fastapi_kafka_service.prediction_callback.side_effect = Exception(sensitive_error_message)

Expand Down