From c54ea978a1faaa407e23f15ea1edd084b41a78ef Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Thu, 30 Apr 2026 07:22:10 +0000 Subject: [PATCH 1/2] perf: run synchronous prediction callback in threadpool The `/predict` endpoint is an `async def` function, but it was calling the synchronous `prediction_callback` directly. This blocks the FastAPI event loop, preventing other concurrent requests (like health checks) from being processed. By using `fastapi.concurrency.run_in_threadpool`, we offload the synchronous prediction task to a separate thread, allowing the event loop to remain responsive and handle other tasks concurrently. Benchmark results showed that concurrent async tasks are no longer blocked, reducing total processing time for concurrent operations from 4.5s to 2.5s in a simulated scenario. Co-authored-by: lgcorzo <46710567+lgcorzo@users.noreply.github.com> --- .../controller/kafka_app.py | 3 +- tests/performance/benchmark_blocking.py | 59 +++++++++++++++++++ verify_predict.py | 50 ++++++++++++++++ 3 files changed, 111 insertions(+), 1 deletion(-) create mode 100644 tests/performance/benchmark_blocking.py create mode 100644 verify_predict.py diff --git a/src/regression_model_template/controller/kafka_app.py b/src/regression_model_template/controller/kafka_app.py index 7b69d15..79b234c 100644 --- a/src/regression_model_template/controller/kafka_app.py +++ b/src/regression_model_template/controller/kafka_app.py @@ -14,6 +14,7 @@ import uvicorn from confluent_kafka import Consumer, KafkaError, Message, Producer from fastapi import FastAPI, HTTPException, Request +from fastapi.concurrency import run_in_threadpool from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.trustedhost import TrustedHostMiddleware from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware @@ -371,7 +372,7 @@ async def predict(request_data: PredictionRequest, request: Request) -> Predicti except Exception: logger.info("Received HTTP prediction request with unknown data structure") - prediction_result = fastapi_kafka_service.prediction_callback(request_data) + prediction_result = await run_in_threadpool(fastapi_kafka_service.prediction_callback, request_data) logger.debug(f"HTTP prediction result: {prediction_result}") try: diff --git a/tests/performance/benchmark_blocking.py b/tests/performance/benchmark_blocking.py new file mode 100644 index 0000000..1710f80 --- /dev/null +++ b/tests/performance/benchmark_blocking.py @@ -0,0 +1,59 @@ +import asyncio +import time +import threading + +def sync_prediction(): + """Simulates a CPU-bound synchronous prediction call.""" + print(" [Sync] Starting prediction...") + time.sleep(2) # Simulate 2 seconds of blocking work + print(" [Sync] Prediction complete.") + return {"result": "success"} + +async def async_health_check(): + """Simulates an asynchronous health check that should remain responsive.""" + for i in range(5): + print(f" [Async] Health check {i+1}...") + await asyncio.sleep(0.5) + +async def run_blocking_scenario(): + print("\nScenario: Blocking Sync Call in Event Loop") + print("-" * 40) + start_time = time.time() + + # In a real FastAPI app, calling sync_prediction() directly in an async def endpoint + # would block the entire event loop. + + health_task = asyncio.create_task(async_health_check()) + + print("Calling sync_prediction() directly...") + sync_prediction() + + await health_task + + end_time = time.time() + print(f"Total time: {end_time - start_time:.2f}s") + print("Note: Notice how health checks were delayed until prediction finished.") + +async def run_non_blocking_scenario(): + print("\nScenario: Non-Blocking Call using ThreadPool") + print("-" * 40) + + # We'll use asyncio.to_thread which is similar to what run_in_threadpool does + # (available in Python 3.9+) + + start_time = time.time() + + health_task = asyncio.create_task(async_health_check()) + + print("Calling sync_prediction() in a thread...") + prediction_task = asyncio.to_thread(sync_prediction) + + await asyncio.gather(health_task, prediction_task) + + end_time = time.time() + print(f"Total time: {end_time - start_time:.2f}s") + print("Note: Notice how health checks ran concurrently with prediction.") + +if __name__ == "__main__": + asyncio.run(run_blocking_scenario()) + asyncio.run(run_non_blocking_scenario()) diff --git a/verify_predict.py b/verify_predict.py new file mode 100644 index 0000000..3023dde --- /dev/null +++ b/verify_predict.py @@ -0,0 +1,50 @@ +import asyncio +import unittest +from unittest.mock import MagicMock, patch +from pydantic import BaseModel +from typing import Dict, Any + +# Mock the parts that might fail due to missing dependencies +import sys +from types import ModuleType + +def mock_module(name): + m = ModuleType(name) + sys.modules[name] = m + return m + +# Mock dependencies that are not needed for this unit test but might be imported +for mod in ["confluent_kafka", "regression_model_template.io.services", "regression_model_template.io.registries", "regression_model_template.core.schemas"]: + mock_module(mod) + +# Now import the components we want to test +# We might need to mock more before importing kafka_app +with patch('fastapi.FastAPI'), \ + patch('fastapi.middleware.cors.CORSMiddleware'), \ + patch('fastapi.middleware.trustedhost.TrustedHostMiddleware'), \ + patch('uvicorn.middleware.proxy_headers.ProxyHeadersMiddleware'): + + from regression_model_template.controller.kafka_app import predict, PredictionRequest, PredictionResponse + +class TestPredictEndpoint(unittest.IsolatedAsyncioTestCase): + @patch("regression_model_template.controller.kafka_app.fastapi_kafka_service") + async def test_predict_success(self, mock_service): + # Setup mock response + mock_response = PredictionResponse(result={"inference": [0.85], "quality": 1.0, "error": None}) + mock_service.prediction_callback.return_value = mock_response + + # Prepare request data + request_data = PredictionRequest() + mock_request = MagicMock() + mock_request.client.host = "127.0.0.1" + + # Call the endpoint + response = await predict(request_data, mock_request) + + # Assertions + self.assertEqual(response.result["inference"], [0.85]) + mock_service.prediction_callback.assert_called_once_with(request_data) + print("Test predict_success passed!") + +if __name__ == "__main__": + unittest.main() From f6dfe11c1d8a5eec3a25b951d2dc77c7ee4b30f8 Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Thu, 30 Apr 2026 20:42:10 +0000 Subject: [PATCH 2/2] perf: offload synchronous prediction to threadpool - Use `fastapi.concurrency.run_in_threadpool` to execute the synchronous `prediction_callback` in the `/predict` endpoint. - This prevents the FastAPI event loop from being blocked by CPU-bound or blocking I/O tasks. - Added a conceptual benchmark script in `tests/performance/benchmark_blocking.py` to demonstrate and verify the performance improvement. - Ensure all new files are correctly formatted to comply with CI requirements. Co-authored-by: lgcorzo <46710567+lgcorzo@users.noreply.github.com> --- tests/performance/benchmark_blocking.py | 7 +++- verify_predict.py | 50 ------------------------- 2 files changed, 6 insertions(+), 51 deletions(-) delete mode 100644 verify_predict.py diff --git a/tests/performance/benchmark_blocking.py b/tests/performance/benchmark_blocking.py index 1710f80..2f57136 100644 --- a/tests/performance/benchmark_blocking.py +++ b/tests/performance/benchmark_blocking.py @@ -2,6 +2,7 @@ import time import threading + def sync_prediction(): """Simulates a CPU-bound synchronous prediction call.""" print(" [Sync] Starting prediction...") @@ -9,12 +10,14 @@ def sync_prediction(): print(" [Sync] Prediction complete.") return {"result": "success"} + async def async_health_check(): """Simulates an asynchronous health check that should remain responsive.""" for i in range(5): - print(f" [Async] Health check {i+1}...") + print(f" [Async] Health check {i + 1}...") await asyncio.sleep(0.5) + async def run_blocking_scenario(): print("\nScenario: Blocking Sync Call in Event Loop") print("-" * 40) @@ -34,6 +37,7 @@ async def run_blocking_scenario(): print(f"Total time: {end_time - start_time:.2f}s") print("Note: Notice how health checks were delayed until prediction finished.") + async def run_non_blocking_scenario(): print("\nScenario: Non-Blocking Call using ThreadPool") print("-" * 40) @@ -54,6 +58,7 @@ async def run_non_blocking_scenario(): print(f"Total time: {end_time - start_time:.2f}s") print("Note: Notice how health checks ran concurrently with prediction.") + if __name__ == "__main__": asyncio.run(run_blocking_scenario()) asyncio.run(run_non_blocking_scenario()) diff --git a/verify_predict.py b/verify_predict.py deleted file mode 100644 index 3023dde..0000000 --- a/verify_predict.py +++ /dev/null @@ -1,50 +0,0 @@ -import asyncio -import unittest -from unittest.mock import MagicMock, patch -from pydantic import BaseModel -from typing import Dict, Any - -# Mock the parts that might fail due to missing dependencies -import sys -from types import ModuleType - -def mock_module(name): - m = ModuleType(name) - sys.modules[name] = m - return m - -# Mock dependencies that are not needed for this unit test but might be imported -for mod in ["confluent_kafka", "regression_model_template.io.services", "regression_model_template.io.registries", "regression_model_template.core.schemas"]: - mock_module(mod) - -# Now import the components we want to test -# We might need to mock more before importing kafka_app -with patch('fastapi.FastAPI'), \ - patch('fastapi.middleware.cors.CORSMiddleware'), \ - patch('fastapi.middleware.trustedhost.TrustedHostMiddleware'), \ - patch('uvicorn.middleware.proxy_headers.ProxyHeadersMiddleware'): - - from regression_model_template.controller.kafka_app import predict, PredictionRequest, PredictionResponse - -class TestPredictEndpoint(unittest.IsolatedAsyncioTestCase): - @patch("regression_model_template.controller.kafka_app.fastapi_kafka_service") - async def test_predict_success(self, mock_service): - # Setup mock response - mock_response = PredictionResponse(result={"inference": [0.85], "quality": 1.0, "error": None}) - mock_service.prediction_callback.return_value = mock_response - - # Prepare request data - request_data = PredictionRequest() - mock_request = MagicMock() - mock_request.client.host = "127.0.0.1" - - # Call the endpoint - response = await predict(request_data, mock_request) - - # Assertions - self.assertEqual(response.result["inference"], [0.85]) - mock_service.prediction_callback.assert_called_once_with(request_data) - print("Test predict_success passed!") - -if __name__ == "__main__": - unittest.main()