Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .jules/sentinel.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
**Learning:** Developers often pass `str(e)` to `HTTPException` for convenience during debugging, but this practice frequently makes it into production code, leading to information leakage.
**Prevention:** In production, catch `Exception` and raise `HTTPException` with a generic message (e.g., "Internal Server Error"). Ensure full exception details are logged using `logger.exception()` for server-side debugging.

## 2026-02-05 - Information Leakage in Kafka Consumer
**Vulnerability:** The Kafka consumer and prediction callback were catching exceptions and embedding the raw exception message `str(e)` into the response payload sent to the output topic.
**Learning:** Even asynchronous background workers (like Kafka consumers) can leak information if they reflect input processing errors back to an output channel without sanitization.
**Prevention:** Use a dedicated `PredictionService` wrapper that catches exceptions, logs the full stack trace server-side, and returns only a generic message (e.g., "An error occurred during prediction processing.") to the output topic.

## 2026-03-04 - Information Leakage in Application Error Fields
**Vulnerability:** The application was catching exceptions in logic callbacks and Kafka consumers, then assigning the raw exception string to a JSON `error` field in the successful response object. This leaked internal details even when the HTTP status code was 200 OK or when processing asynchronously via Kafka.
**Learning:** Checking for HTTP 500 handlers is not enough. Review application-level error handling where business logic manually constructs error objects.
Expand Down
168 changes: 168 additions & 0 deletions app.log

Large diffs are not rendered by default.

28 changes: 28 additions & 0 deletions check_env.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import mlflow
import os
import sys
from confluent_kafka import Producer

def check_env():
    """Print a quick diagnostic of the MLflow registry and Kafka connectivity.

    Lists every registered model with its latest versions and aliases, then
    produces one test message to ``test_topic`` on the broker named by the
    ``DEFAULT_KAFKA_SERVER`` environment variable (default ``localhost:9092``).

    Any exception raised along the way is caught and printed; nothing is
    re-raised, so this function always returns ``None``.
    """
    print(f"Tracking URI: {mlflow.get_tracking_uri()}")
    try:
        models = mlflow.search_registered_models()
        print("\nRegistered Models:")
        for m in models:
            print(f"- {m.name}")
            for v in m.latest_versions:
                print(f" Version {v.version}: Aliases={v.aliases}")

        # Check Kafka
        kafka_server = os.getenv("DEFAULT_KAFKA_SERVER", "localhost:9092")
        print(f"\nTesting Kafka Producer to: {kafka_server}")
        p = Producer({'bootstrap.servers': kafka_server, 'socket.timeout.ms': 2000})

        # Collect delivery failures reported by the broker/client.
        delivery_errors = []

        def _on_delivery(err, _msg):
            if err is not None:
                delivery_errors.append(err)

        p.produce('test_topic', b'test', on_delivery=_on_delivery)

        # flush() returns the number of messages still waiting in the queue.
        # A non-zero value means the test message was NOT delivered within
        # the timeout, so the broker must not be reported as reachable.
        remaining = p.flush(timeout=2.0)
        if delivery_errors:
            print(f"Kafka delivery failed: {delivery_errors[0]}")
        elif remaining:
            print(f"Kafka NOT reachable: {remaining} message(s) still undelivered after timeout.")
        else:
            print("Kafka reachable!")

    except Exception as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    check_env()
539 changes: 443 additions & 96 deletions poetry.lock

Large diffs are not rendered by default.

23 changes: 23 additions & 0 deletions promote_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import mlflow
from mlflow.tracking import MlflowClient
import os

def promote_model(version: str = "1", alias: str = "Champion"):
    """Point a registered-model alias at a specific model version.

    Configuration is read from the environment:

    * ``MLFLOW_TRACKING_URI`` - tracking server (defaults to the in-cluster URL).
    * ``MLFLOW_REGISTRY_URI`` - model registry (defaults to the tracking URI).
    * ``MLFLOW_REGISTERED_MODEL_NAME`` - model to promote
      (defaults to ``regression_model_template``).

    Args:
        version: Model version to promote. Defaults to ``"1"``, which is the
            value that used to be hard-coded.
        alias: Alias to assign. Defaults to ``"Champion"``.

    Calling ``promote_model()`` with no arguments behaves exactly as before.
    """
    tracking_uri = os.getenv("MLFLOW_TRACKING_URI", "http://mlflow.llm-apps.svc.cluster.local:5000")
    registry_uri = os.getenv("MLFLOW_REGISTRY_URI", tracking_uri)
    model_name = os.getenv("MLFLOW_REGISTERED_MODEL_NAME", "regression_model_template")

    mlflow.set_tracking_uri(tracking_uri)
    mlflow.set_registry_uri(registry_uri)

    client = MlflowClient()

    print(f"Promoting {model_name} version {version} to {alias}...")
    client.set_registered_model_alias(name=model_name, alias=alias, version=str(version))

    # Verify by reading the alias back from the registry.
    model = client.get_model_version_by_alias(name=model_name, alias=alias)
    print(f"Success! Model {model.name} Version {model.version} is now '{alias}'.")


if __name__ == "__main__":
    promote_model()
36 changes: 25 additions & 11 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,30 @@

# PROJECT

[tool.poetry]
[project]
name = "regression_model_template"
version = "2.0.0"
description = "Predict the number of regression_model_template available."
repository = "https://github.com/lgcorzo/mlops-python-package"
documentation = "https://lgcorzo.github.io/mlops-python-package/"
authors = []
authors = [{ name = "lgcorzo" }]
readme = "README.md"
license = "MIT"
keywords = ["mlops", "python", "package"]
packages = [{ include = "regression_model_template", from = "src" }]

# SCRIPTS
requires-python = ">=3.12"
classifiers = [
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.12",
]
dynamic = ["dependencies"]

[project.urls]
repository = "https://github.com/lgcorzo/mlops-python-package"
documentation = "https://lgcorzo.github.io/mlops-python-package/"

[tool.poetry.scripts]
regression_model_template = 'regression_model_template.scripts:main'
[project.scripts]
regression_model_template = "regression_model_template.scripts:main"

# DEPENDENCIES
[tool.poetry]
packages = [{ include = "regression_model_template", from = "src" }]

[tool.poetry.dependencies]
python = "^3.12"
Expand Down Expand Up @@ -49,7 +55,6 @@ opentelemetry-exporter-otlp = "^1.30.0"
confluent-kafka = "^2.8.2"
fastapi = "^0.115.11"


[tool.poetry.group.checks.dependencies]
bandit = "^1.7.9"
coverage = "^7.5.4"
Expand All @@ -60,6 +65,7 @@ pytest-xdist = "^3.6.1"
pandera = { extras = ["mypy"], version = "^0.20.1" }
ruff = "^0.5.0"
pytest-mock = "^3.14.0"
pytest-asyncio = "^0.23.7"

[tool.poetry.group.commits.dependencies]
commitizen = "^3.27.0"
Expand Down Expand Up @@ -105,6 +111,14 @@ exclude = []
[tool.pytest.ini_options]
addopts = "--verbosity=2"
pythonpath = ["src"]
asyncio_mode = "auto"
filterwarnings = [
"ignore::DeprecationWarning",
"ignore:.*predict.*not supported:UserWarning",
"ignore:.*Add type hints to the .predict. method:UserWarning",
"ignore:.*min_items.*is deprecated:DeprecationWarning",
"ignore:.*np.inexact.*to a dtype is deprecated:DeprecationWarning",
]

[tool.ruff]
fix = true
Expand Down
58 changes: 32 additions & 26 deletions src/regression_model_template/controller/kafka_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
import time
import json
import typing as T
from typing import Any, Dict, Callable

import uvicorn
Expand Down Expand Up @@ -181,15 +182,9 @@ def _process_message(self, msg: Message) -> None:
input_obj.input_data = kafka_msg["input_data"]
logger.info(f"kafka Received input {kafka_msg}")
prediction_result = self.prediction_callback(input_obj).result
except json.JSONDecodeError as e:
error = f"Failed to decode JSON message: {e}. Raw message: {msg.value()}"
predictionresponse.result["error"] = error
logger.error(error)
prediction_result = predictionresponse.result
except Exception:
error = "An error occurred during prediction processing."
logger.exception("Error during prediction processing")
predictionresponse.result["error"] = error
except Exception as e:
logger.exception(f"Error during prediction processing: {e}")
predictionresponse.result["error"] = "An error occurred during prediction processing."
prediction_result = predictionresponse.result

try:
Expand Down Expand Up @@ -226,7 +221,7 @@ def stop(self) -> None:


# Global Service Instance
fastapi_kafka_service: FastAPIKafkaService
fastapi_kafka_service: "FastAPIKafkaService" = T.cast("FastAPIKafkaService", None)


# FastAPI Endpoints
Expand All @@ -245,7 +240,7 @@ async def predict(request: PredictionRequest) -> PredictionResponse: # Use glob
prediction_result = fastapi_kafka_service.prediction_callback(request)
logger.info(f"HTTP prediction result: {prediction_result}")
return prediction_result # Use the global class
except Exception as e:
except Exception:
logger.exception("Error processing HTTP prediction request:")
raise HTTPException(status_code=500, detail="Internal Server Error")

Expand All @@ -256,6 +251,29 @@ async def health_check() -> dict[str, str]:
return {"status": "healthy"}


class PredictionService:
    """Wrap a model so that prediction failures never expose internal details."""

    def __init__(self, model: Any):
        # The wrapped object only needs a ``predict(inputs=...)`` method.
        self.model = model

    def predict(self, input_data: PredictionRequest) -> PredictionResponse:
        """Run the model on the request payload and build the response.

        On success the response carries the predictions as a nested list,
        ``quality`` 1 and ``error`` None. On any exception the full details
        are logged and the response carries ``inference`` 0, ``quality`` 0
        and a fixed, generic error message.
        """
        response: PredictionResponse = PredictionResponse()
        try:
            frame = pd.DataFrame(input_data.input_data)
            validated = InputsSchema.check(frame)
            outputs: Outputs = self.model.predict(inputs=validated)
            inference = outputs.to_numpy().tolist()
        except Exception as e:
            # Full details stay in the server log; the caller gets a generic message.
            logger.exception(f"Prediction failed: {e}")
            response.result["inference"] = 0
            response.result["quality"] = 0
            response.result["error"] = "An error occurred during prediction processing."
        else:
            response.result["inference"] = inference
            response.result["quality"] = 1
            response.result["error"] = None
        return response


def main() -> None:
global fastapi_kafka_service
# Configuration
Expand All @@ -270,20 +288,8 @@ def main() -> None:
loader = CustomLoader()
model = loader.load(uri=model_uri)

# Prediction Callback Function
def my_prediction_function(input_data: PredictionRequest) -> PredictionResponse:
predictionresponse: PredictionResponse = PredictionResponse()
try:
outputs: Outputs = model.predict(inputs=InputsSchema.check(pd.DataFrame(input_data.input_data)))
predictionresponse.result["inference"] = outputs.to_numpy().tolist()
predictionresponse.result["quality"] = 1
predictionresponse.result["error"] = None
except Exception:
logger.exception("Prediction callback failed")
predictionresponse.result["inference"] = 0
predictionresponse.result["quality"] = 0
predictionresponse.result["error"] = "Prediction failed."
return predictionresponse
# Initialize Prediction Service
prediction_service = PredictionService(model)

# Kafka Configuration
kafka_config = {
Expand All @@ -293,7 +299,7 @@ def my_prediction_function(input_data: PredictionRequest) -> PredictionResponse:
}
# Initialize and Start Service
fastapi_kafka_service = FastAPIKafkaService(
prediction_callback=my_prediction_function,
prediction_callback=prediction_service.predict,
kafka_config=kafka_config,
input_topic=DEFAULT_INPUT_TOPIC,
output_topic=DEFAULT_OUTPUT_TOPIC,
Expand Down
4 changes: 2 additions & 2 deletions src/regression_model_template/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def fit(self, inputs: schemas.Inputs, targets: schemas.Targets) -> T.Self:
"""

@abc.abstractmethod
def predict(self, inputs: schemas.Inputs) -> schemas.Outputs:
def predict(self, inputs: T.Any) -> schemas.Outputs:
"""Generate outputs with the model for the given inputs.

Args:
Expand Down Expand Up @@ -172,7 +172,7 @@ def fit(self, inputs: schemas.Inputs, targets: schemas.Targets) -> "BaselineSkle
self._pipeline.fit(X=inputs, y=targets[schemas.TargetsSchema.cnt])
return self

def predict(self, inputs: schemas.Inputs) -> schemas.Outputs:
def predict(self, inputs: T.Any) -> schemas.Outputs:
model = self.get_internal_model()
prediction = model.predict(inputs)
outputs = schemas.Outputs({schemas.OutputsSchema.prediction: prediction}, index=inputs.index)
Expand Down
4 changes: 2 additions & 2 deletions src/regression_model_template/io/registries.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def __init__(self, model: models.Model):
def predict(
self,
context: mlflow.pyfunc.PythonModelContext, # type: ignore[name-defined]
model_input: schemas.Inputs,
model_input: T.Any,
params: dict[str, T.Any] | None = None,
) -> schemas.Outputs:
"""Generate predictions with a custom model for the given inputs.
Expand Down Expand Up @@ -189,7 +189,7 @@ class Adapter(abc.ABC):
"""Adapt any model for the project inference."""

@abc.abstractmethod
def predict(self, inputs: schemas.Inputs) -> schemas.Outputs:
def predict(self, inputs: T.Any) -> schemas.Outputs:
"""Generate predictions with the internal model for the given inputs.

Args:
Expand Down
75 changes: 75 additions & 0 deletions tests/controller/simulated_integration_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import requests
import time
import subprocess
import os
import sys

# This script will run the actual FastAPI app but mock the Kafka part to allow it to start
# without a real Kafka server.


def run_simulated_test(startup_timeout: float = 30.0):
    """Start the FastAPI app with Kafka mocked out and check ``/health``.

    A temporary wrapper script is written next to this file; it patches
    ``confluent_kafka.Producer``/``Consumer`` before importing the app and
    serves it with uvicorn on ``127.0.0.1:8123``.

    Args:
        startup_timeout: Maximum number of seconds to wait for the server to
            accept connections before giving up.

    Raises:
        RuntimeError: If the server process exits before answering, or if the
            health endpoint answers with a status other than 200.
        Exception: Any error raised while contacting the server is printed
            and re-raised.
    """
    print("Starting Kafka app integration test (Simulated with Mocks)...")

    wrapper_path = "tests/controller/app_wrapper.py"
    health_url = "http://127.0.0.1:8123/health"

    # We'll use a wrapper script to run the app with mocks
    wrapper_code = """
import sys
from unittest.mock import patch, MagicMock

# Mock Kafka before importing the app
mock_producer = patch('confluent_kafka.Producer').start()
mock_consumer = patch('confluent_kafka.Consumer').start()

from regression_model_template.controller.kafka_app import app
import uvicorn

if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=8123)
"""

    env = os.environ.copy()
    env["DEFAULT_KAFKA_SERVER"] = "localhost:9092"
    env["DEFAULT_FASTAPI_HOST"] = "127.0.0.1"
    env["DEFAULT_FASTAPI_PORT"] = "8123"

    process = None
    passed = False
    try:
        # Writing the wrapper and launching the server happen inside the try
        # block so the finally clause always removes the temporary file.
        with open(wrapper_path, "w") as f:
            f.write(wrapper_code)

        print("Launching FastAPI server via wrapper...")
        process = subprocess.Popen(
            [sys.executable, wrapper_path],
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )

        # Poll until the server answers instead of sleeping a fixed time:
        # startup duration varies and a fixed delay makes the check flaky.
        print("Checking health endpoint...")
        deadline = time.monotonic() + startup_timeout
        while True:
            if process.poll() is not None:
                raise RuntimeError(f"Server exited before becoming healthy (exit code {process.returncode})")
            try:
                response = requests.get(health_url, timeout=5)
                break
            except requests.exceptions.ConnectionError:
                if time.monotonic() >= deadline:
                    raise
                time.sleep(0.5)

        print(f"Health Response: {response.status_code} - {response.json()}")

        if response.status_code == 200:
            print("Integration test passed!")
            passed = True
        else:
            print(f"Integration test failed with status: {response.status_code}")
            # Fail loudly so the script exits with a non-zero status.
            raise RuntimeError(f"Health check returned status {response.status_code}")

    except Exception as e:
        print(f"Integration test encountered an error: {e}")
        raise

    finally:
        print("Shutting down the server...")
        if process is not None:
            process.terminate()
            try:
                # communicate() drains both pipes while waiting, so the
                # child cannot block on a full pipe buffer during shutdown.
                _, server_stderr = process.communicate(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
                _, server_stderr = process.communicate()
            if not passed and server_stderr:
                print(f"Server stderr:\n{server_stderr}")
        if os.path.exists(wrapper_path):
            os.remove(wrapper_path)


if __name__ == "__main__":
    run_simulated_test()
1 change: 0 additions & 1 deletion tests/controller/test_kafka_app_leakage.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from unittest.mock import MagicMock
import json
import pytest
from regression_model_template.controller.kafka_app import FastAPIKafkaService


Expand Down
Loading