Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,12 @@ omit = ["__main__.py"]

[tool.mypy]
pretty = true
strict = false
strict = true
python_version = "3.12"
check_untyped_defs = false
check_untyped_defs = true
ignore_missing_imports = true
plugins = ["pandera.mypy", "pydantic.mypy"]
exclude = ["src/regression_model_template/jobs/evaluations.py",
"src/regression_model_template/jobs/training.py",
"src/regression_model_template/io/registries.py"]
exclude = []

[tool.pytest.ini_options]
addopts = "--verbosity=2"
Expand Down
18 changes: 9 additions & 9 deletions src/regression_model_template/controller/kafka_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from confluent_kafka import Producer, Consumer, KafkaError
from confluent_kafka import Producer, Consumer, KafkaError, Message

from regression_model_template.core.schemas import InputsSchema, Outputs
from regression_model_template.io import services, registries
Expand Down Expand Up @@ -65,9 +65,9 @@ class PredictionRequest(BaseModel):
"registered": [0, 50, 100, 150],
}

def model_validate(self):
def validate_schema(self) -> pd.DataFrame:
"""Validates the input data against InputsSchema."""
return InputsSchema.validate(pd.DataFrame([self.input_data]))
return InputsSchema.validate(pd.DataFrame([self.input_data])) # type: ignore[return-value]


class PredictionResponse(BaseModel):
Expand Down Expand Up @@ -96,7 +96,7 @@ def __init__(
self.producer: Producer | None = None
self.consumer: Consumer | None = None

def delivery_report(self, err, msg):
def delivery_report(self, err: KafkaError | None, msg: Message) -> None:
"""Called once for each message produced to indicate delivery result."""
if err is not None:
logger.error(f"Message delivery failed: {err}")
Expand Down Expand Up @@ -155,15 +155,15 @@ def _consume_messages(self) -> None:
self._process_message(msg)
self._close_consumer()

def _poll_message(self):
def _poll_message(self) -> Message | None:
"""Poll message from Kafka consumer."""
if self.consumer:
return self.consumer.poll(1.0)
else:
logger.error("Kafka consumer is not initialized.")
return None

def _handle_message_error(self, msg) -> bool:
def _handle_message_error(self, msg: Message) -> bool:
"""Handle errors in polled messages."""
if msg.error().code() == KafkaError._PARTITION_EOF:
logger.debug("Reached end of partition.")
Expand All @@ -172,7 +172,7 @@ def _handle_message_error(self, msg) -> bool:
logger.error(f"Consumer error: {msg.error()}")
return False

def _process_message(self, msg) -> None:
def _process_message(self, msg: Message) -> None:
"""Process a valid Kafka message."""
predictionresponse: PredictionResponse = PredictionResponse()
try:
Expand Down Expand Up @@ -251,12 +251,12 @@ async def predict(request: PredictionRequest) -> PredictionResponse: # Use glob


@app.get("/health", summary="Health Check", tags=["System"])
async def health_check():
async def health_check() -> dict[str, str]:
"""Simple health check endpoint to verify that the service is running."""
return {"status": "healthy"}


def main():
def main() -> None:
global fastapi_kafka_service
# Configuration
alias_or_version: str | int = "Champion"
Expand Down
8 changes: 6 additions & 2 deletions src/regression_model_template/core/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ def eval_fn(predictions: pd.Series[int], targets: pd.Series[int]) -> MlflowMetri
score = self.score(targets=score_targets, outputs=score_outputs)
return MlflowMetric(aggregate_results={self.name: score * sign})

return mlflow.metrics.make_metric(eval_fn=eval_fn, name=self.name, greater_is_better=self.greater_is_better)
return mlflow.metrics.make_metric( # type: ignore[no-any-return]
eval_fn=eval_fn, name=self.name, greater_is_better=self.greater_is_better
)


class SklearnMetric(Metric):
Expand Down Expand Up @@ -141,4 +143,6 @@ def to_mlflow(self) -> MlflowThreshold:
Returns:
MlflowThreshold: the mlflow threshold.
"""
return MlflowThreshold(threshold=self.threshold, greater_is_better=self.greater_is_better)
return MlflowThreshold( # type: ignore[no-untyped-call]
threshold=self.threshold, greater_is_better=self.greater_is_better
)
1 change: 1 addition & 0 deletions src/regression_model_template/io/osvariables.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ class Config:
case_sensitive: bool = False # Optional: make env var lookup case-insensitive
env_file: str = ".env" # Enable reading from .env file
env_file_encoding: str = "utf-8"
extra = "ignore"
14 changes: 7 additions & 7 deletions src/regression_model_template/io/registries.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class CustomSaver(Saver):

KIND: T.Literal["CustomSaver"] = "CustomSaver"

class Adapter(mlflow.pyfunc.PythonModel): # type: ignore[misc]
class Adapter(mlflow.pyfunc.PythonModel): # type: ignore[misc, name-defined]
"""Adapt a custom model to the Mlflow PyFunc flavor for saving operations.

https://mlflow.org/docs/latest/python_api/mlflow.pyfunc.html?#mlflow.pyfunc.PythonModel
Expand All @@ -118,7 +118,7 @@ def __init__(self, model: models.Model):

def predict(
self,
context: mlflow.pyfunc.PythonModelContext,
context: mlflow.pyfunc.PythonModelContext, # type: ignore[name-defined]
model_input: schemas.Inputs,
params: dict[str, T.Any] | None = None,
) -> schemas.Outputs:
Expand All @@ -137,7 +137,7 @@ def predict(

def save(self, model: models.Model, signature: signers.Signature, input_example: schemas.Inputs) -> Info:
adapter = CustomSaver.Adapter(model=model)
return mlflow.pyfunc.log_model(
return mlflow.pyfunc.log_model( # type: ignore[no-any-return]
python_model=adapter,
signature=signature,
artifact_path=self.path,
Expand All @@ -163,10 +163,10 @@ def save(
model: models.Model,
signature: signers.Signature,
input_example: schemas.Inputs | None = None,
) -> mlflow.entities.model_registry.ModelVersion:
) -> Info:
builtin_model = model.get_internal_model()
module = getattr(mlflow, self.flavor)
return module.log_model(
return module.log_model( # type: ignore[no-any-return]
builtin_model, artifact_path=self.path, signature=signature, input_example=input_example
)

Expand Down Expand Up @@ -222,7 +222,7 @@ class CustomLoader(Loader):
class Adapter(Loader.Adapter):
"""Adapt a custom model for the project inference."""

def __init__(self, model: mlflow.pyfunc.PyFuncModel) -> None:
def __init__(self, model: mlflow.pyfunc.PyFuncModel) -> None: # type: ignore[name-defined]
"""Initialize the adapter from an mlflow pyfunc model.

Args:
Expand Down Expand Up @@ -254,7 +254,7 @@ class BuiltinLoader(Loader):
class Adapter(Loader.Adapter):
"""Adapt a builtin model for the project inference."""

def __init__(self, model: mlflow.pyfunc.PyFuncModel) -> None:
def __init__(self, model: mlflow.pyfunc.PyFuncModel) -> None: # type: ignore[name-defined]
"""Initialize the adapter from an mlflow pyfunc model.

Args:
Expand Down
6 changes: 3 additions & 3 deletions src/regression_model_template/jobs/evaluations.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ def run(self) -> base.Locals:
logger.debug("- Targets lineage: {}", targets_lineage.to_dict())
# dataset
logger.info("Create dataset: inputs & targets")
dataset = mlflow.data.from_pandas(
dataset = mlflow.data.from_pandas( # type: ignore[attr-defined]
df=pd.concat([inputs, targets], axis="columns"),
name="evaluation",
source=f"{inputs_lineage.source.uri} & {targets_lineage.source.uri}",
source=f"{inputs_lineage.source.uri} & {targets_lineage.source.uri}", # type: ignore[attr-defined]
targets=schemas.TargetsSchema.cnt,
)
logger.debug("- Dataset: {}", dataset.to_dict())
Expand All @@ -104,7 +104,7 @@ def run(self) -> base.Locals:
logger.debug("- Validation thresholds: {}", validation_thresholds)
# evaluations
logger.info("Compute evaluations: {}", self.model_type)
evaluations = mlflow.evaluate(
evaluations = mlflow.evaluate( # type: ignore[no-untyped-call]
data=dataset,
model=model_uri,
model_type=self.model_type,
Expand Down
2 changes: 1 addition & 1 deletion src/regression_model_template/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# %% SETTINGS


class Settings(pdts.BaseSettings, strict=True, frozen=True, extra="forbid"):
class Settings(pdts.BaseSettings, strict=True, frozen=True, extra="ignore"):
"""Base class for application settings.

Use settings to provide high-level preferences.
Expand Down
Loading