diff --git a/src/regression_model_template/controller/kafka_app.py b/src/regression_model_template/controller/kafka_app.py index 7b69d15..79b234c 100644 --- a/src/regression_model_template/controller/kafka_app.py +++ b/src/regression_model_template/controller/kafka_app.py @@ -14,6 +14,7 @@ import uvicorn from confluent_kafka import Consumer, KafkaError, Message, Producer from fastapi import FastAPI, HTTPException, Request +from fastapi.concurrency import run_in_threadpool from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.trustedhost import TrustedHostMiddleware from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware @@ -371,7 +372,7 @@ async def predict(request_data: PredictionRequest, request: Request) -> Predicti except Exception: logger.info("Received HTTP prediction request with unknown data structure") - prediction_result = fastapi_kafka_service.prediction_callback(request_data) + prediction_result = await run_in_threadpool(fastapi_kafka_service.prediction_callback, request_data) logger.debug(f"HTTP prediction result: {prediction_result}") try: diff --git a/tests/performance/benchmark_blocking.py b/tests/performance/benchmark_blocking.py new file mode 100644 index 0000000..2f57136 --- /dev/null +++ b/tests/performance/benchmark_blocking.py @@ -0,0 +1,64 @@ +import asyncio +import time +import threading + + +def sync_prediction(): + """Simulates a CPU-bound synchronous prediction call.""" + print(" [Sync] Starting prediction...") + time.sleep(2) # Simulate 2 seconds of blocking work + print(" [Sync] Prediction complete.") + return {"result": "success"} + + +async def async_health_check(): + """Simulates an asynchronous health check that should remain responsive.""" + for i in range(5): + print(f" [Async] Health check {i + 1}...") + await asyncio.sleep(0.5) + + +async def run_blocking_scenario(): + print("\nScenario: Blocking Sync Call in Event Loop") + print("-" * 40) + start_time = time.time() + + # In a real FastAPI app, calling sync_prediction() directly in an async def endpoint + # would block the entire event loop. + + health_task = asyncio.create_task(async_health_check()) + + print("Calling sync_prediction() directly...") + sync_prediction() + + await health_task + + end_time = time.time() + print(f"Total time: {end_time - start_time:.2f}s") + print("Note: Notice how health checks were delayed until prediction finished.") + + +async def run_non_blocking_scenario(): + print("\nScenario: Non-Blocking Call using ThreadPool") + print("-" * 40) + + # We'll use asyncio.to_thread which is similar to what run_in_threadpool does + # (available in Python 3.9+) + + start_time = time.time() + + health_task = asyncio.create_task(async_health_check()) + + print("Calling sync_prediction() in a thread...") + prediction_task = asyncio.to_thread(sync_prediction) + + await asyncio.gather(health_task, prediction_task) + + end_time = time.time() + print(f"Total time: {end_time - start_time:.2f}s") + print("Note: Notice how health checks ran concurrently with prediction.") + + +if __name__ == "__main__": + asyncio.run(run_blocking_scenario()) + asyncio.run(run_non_blocking_scenario())