Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/regression_model_template/controller/kafka_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import uvicorn
from confluent_kafka import Consumer, KafkaError, Message, Producer
from fastapi import FastAPI, HTTPException, Request
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware
Expand Down Expand Up @@ -371,7 +372,7 @@ async def predict(request_data: PredictionRequest, request: Request) -> Predicti
except Exception:
logger.info("Received HTTP prediction request with unknown data structure")

prediction_result = fastapi_kafka_service.prediction_callback(request_data)
prediction_result = await run_in_threadpool(fastapi_kafka_service.prediction_callback, request_data)

logger.debug(f"HTTP prediction result: {prediction_result}")
try:
Expand Down
64 changes: 64 additions & 0 deletions tests/performance/benchmark_blocking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import asyncio
import time
import threading


def sync_prediction(duration: float = 2.0) -> dict[str, str]:
    """Simulate a blocking synchronous prediction call.

    The "work" is a plain ``time.sleep``, so the calling thread is blocked
    for ``duration`` seconds. Note that this models a *blocking* call rather
    than real CPU-bound computation: no cycles are burned while sleeping.

    Args:
        duration: Seconds to block for. Defaults to 2 seconds. Must be
            non-negative (``time.sleep`` rejects negative values).

    Returns:
        The fixed payload ``{"result": "success"}``.
    """
    print(" [Sync] Starting prediction...")
    time.sleep(duration)  # Stand-in for the blocking prediction work
    print(" [Sync] Prediction complete.")
    return {"result": "success"}


async def async_health_check(count: int = 5, interval: float = 0.5) -> None:
    """Simulate a periodic asynchronous health check.

    Prints one numbered line per check and then yields to the event loop via
    ``asyncio.sleep``. If the event loop is blocked by other code, these
    lines are visibly delayed, which is what the scenarios demonstrate.

    Args:
        count: Number of health checks to perform. Defaults to 5.
        interval: Seconds to sleep after each check. Defaults to 0.5.
    """
    for i in range(count):
        print(f" [Async] Health check {i + 1}...")
        # Awaiting here hands control back to the event loop between checks.
        await asyncio.sleep(interval)


async def run_blocking_scenario():
    """Demonstrate a synchronous call blocking the event loop.

    Schedules the health-check task, then calls ``sync_prediction()``
    directly from this coroutine. Because the call never awaits, the event
    loop cannot run the scheduled health checks until it returns. The
    elapsed time is printed at the end.
    """
    print("\nScenario: Blocking Sync Call in Event Loop")
    print("-" * 40)
    # perf_counter is monotonic, so the measured interval cannot be skewed
    # by system clock adjustments the way time.time() can.
    started = time.perf_counter()

    # In a real FastAPI app, calling sync_prediction() directly in an
    # ``async def`` endpoint would block the entire event loop.

    # create_task only schedules the coroutine; it does not start running
    # until this coroutine reaches an await.
    health_task = asyncio.create_task(async_health_check())

    print("Calling sync_prediction() directly...")
    sync_prediction()

    await health_task

    elapsed = time.perf_counter() - started
    print(f"Total time: {elapsed:.2f}s")
    print("Note: Notice how health checks were delayed until prediction finished.")


async def run_non_blocking_scenario():
    """Demonstrate offloading the synchronous call to a worker thread.

    Runs ``sync_prediction`` through ``asyncio.to_thread`` and gathers it
    together with the health-check task, so the event loop stays free to
    run the health checks while the prediction blocks its own thread. The
    elapsed time is printed at the end.
    """
    print("\nScenario: Non-Blocking Call using ThreadPool")
    print("-" * 40)

    # asyncio.to_thread (Python 3.9+) is similar to what FastAPI's
    # run_in_threadpool does.

    # perf_counter is monotonic, so the measured interval cannot be skewed
    # by system clock adjustments the way time.time() can.
    started = time.perf_counter()

    health_task = asyncio.create_task(async_health_check())

    print("Calling sync_prediction() in a thread...")
    # to_thread returns a coroutine, not a Task; gather() below schedules it.
    prediction_coro = asyncio.to_thread(sync_prediction)

    await asyncio.gather(health_task, prediction_coro)

    elapsed = time.perf_counter() - started
    print(f"Total time: {elapsed:.2f}s")
    print("Note: Notice how health checks ran concurrently with prediction.")


if __name__ == "__main__":
    # Run each demonstration in its own fresh event loop, blocking first.
    for scenario in (run_blocking_scenario, run_non_blocking_scenario):
        asyncio.run(scenario())
Loading