Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions backend/app/api/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,15 @@ async def get_or_create_agent(
llm_api_key = llm_config.api_key
llm_model = llm_config.model

# Track if tool calling is supported
supports_tool_calling = True

if llm_config.provider == "system" and llm_config.deployment_id:
# Look up the deployment
# Look up the deployment with model info
result = await db.execute(
select(Deployment)
.where(Deployment.id == llm_config.deployment_id)
.options(selectinload(Deployment.worker))
.options(selectinload(Deployment.worker), selectinload(Deployment.model))
)
deployment = result.scalar_one_or_none()

Expand All @@ -184,9 +187,16 @@ async def get_or_create_agent(
raise HTTPException(status_code=400, detail="Deployment is not running")

worker = deployment.worker
llm_base_url = f"http://{worker.host}:{deployment.port}/v1"
# Extract IP from worker address (format: IP:Port)
worker_ip = worker.address.split(":")[0]
llm_base_url = f"http://{worker_ip}:{deployment.port}/v1"
llm_api_key = "dummy"
llm_model = llm_config.model or "default"
# Use the actual model_id from the LLMModel (e.g., "Qwen/Qwen2.5-0.5B-Instruct")
llm_model = deployment.model.model_id

# Check if deployment has tool calling enabled
extra_params = deployment.extra_params or {}
supports_tool_calling = extra_params.get("enable-auto-tool-choice", False)

elif llm_config.provider == "openai":
llm_base_url = "https://api.openai.com/v1"
Expand Down Expand Up @@ -216,6 +226,7 @@ async def get_or_create_agent(
llm_model=llm_model,
mcp_api_url=mcp_api_url,
mcp_api_token=api_token,
supports_tool_calling=supports_tool_calling,
)
await agent.initialize()

Expand Down Expand Up @@ -443,7 +454,7 @@ async def agent_chat_simple(
result = await db.execute(
select(Deployment)
.where(Deployment.id == request.llm_config.deployment_id)
.options(selectinload(Deployment.worker))
.options(selectinload(Deployment.worker), selectinload(Deployment.model))
)
deployment = result.scalar_one_or_none()

Expand All @@ -453,9 +464,12 @@ async def agent_chat_simple(
raise HTTPException(status_code=400, detail="Deployment is not running")

worker = deployment.worker
llm_base_url = f"http://{worker.host}:{deployment.port}/v1"
# Extract IP from worker address (format: IP:Port)
worker_ip = worker.address.split(":")[0]
llm_base_url = f"http://{worker_ip}:{deployment.port}/v1"
llm_api_key = "dummy"
llm_model = request.llm_config.model or "default"
# Use the actual model_id from the LLMModel (e.g., "Qwen/Qwen2.5-0.5B-Instruct")
llm_model = deployment.model.model_id

elif request.llm_config.provider == "openai":
llm_base_url = "https://api.openai.com/v1"
Expand Down
241 changes: 225 additions & 16 deletions backend/app/api/auto_tuning.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,17 @@
BenchmarkRequest,
BenchmarkResultListResponse,
BenchmarkResultResponse,
ComprehensiveBenchmarkMetrics,
ComprehensiveBenchmarkRequest,
ComprehensiveBenchmarkResponse,
ConcurrencyLevelResult,
KnowledgeQuery,
KnowledgeQueryResponse,
KnowledgeRecord,
KnowledgeSaveRequest,
LatencyPercentiles,
SaturationDetectionRequest,
SaturationDetectionResponse,
TuningJobCreate,
TuningJobListResponse,
TuningJobProgress,
Expand Down Expand Up @@ -397,6 +404,198 @@ async def list_benchmark_results(
)


# ============================================================================
# Comprehensive Benchmark Endpoints
# ============================================================================


@router.post("/benchmarks/comprehensive", response_model=ComprehensiveBenchmarkResponse)
async def run_comprehensive_benchmark(
    request: ComprehensiveBenchmarkRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_operator),
):
    """
    Run a comprehensive benchmark with detailed metrics.

    Returns metrics including:
    - TTFT, ITL, TPOT with percentiles (p50, p90, p95, p99)
    - Throughput (tokens/sec, requests/sec)
    - Success rate and error statistics

    Raises:
        HTTPException: 404 when the deployment does not exist; 400 when the
            deployment is not running or has no worker attached.
    """
    from app.services.benchmark import BenchmarkConfig, BenchmarkRunner, LoadPattern

    # Verify deployment exists and is running; eager-load the relationships
    # that are read below so no lazy load happens on the async session.
    result = await db.execute(
        select(Deployment)
        .where(Deployment.id == request.deployment_id)
        .options(
            selectinload(Deployment.model),
            selectinload(Deployment.worker),
        )
    )
    deployment = result.scalar_one_or_none()

    if not deployment:
        raise HTTPException(status_code=404, detail="Deployment not found")

    if deployment.status != DeploymentStatus.RUNNING.value:
        raise HTTPException(status_code=400, detail="Deployment is not running")

    worker = deployment.worker
    if not worker:
        raise HTTPException(status_code=400, detail="Worker not found")

    # Extract IP from worker address (format: IP:Port); the deployment is
    # reached on its own port, not on the worker's port.
    # NOTE(review): assumes the Worker model exposes `address` rather than
    # `host` -- confirm against the model definition.
    worker_ip = worker.address.split(":")[0]
    endpoint = f"http://{worker_ip}:{deployment.port}/v1"
    model_name = deployment.model.model_id if deployment.model else "default"

    # Configure benchmark
    config = BenchmarkConfig(
        endpoint=endpoint,
        model_name=model_name,
        load_pattern=LoadPattern.FIXED,
        concurrency=request.concurrency,
        num_requests=request.num_requests,
        warmup_requests=request.warmup_requests,
        prompt_tokens=request.prompt_tokens,
        output_tokens=request.output_tokens,
        custom_prompt=request.custom_prompt,
        stream=True,
    )

    # Run benchmark
    runner = BenchmarkRunner(config)
    bench_result = await runner.run()

    # Convert metrics to response format
    m = bench_result.metrics

    def to_percentiles(lm) -> LatencyPercentiles:
        """Copy one latency metric's summary statistics into the response schema."""
        return LatencyPercentiles(
            mean=lm.mean,
            median=lm.median,
            min=lm.min,
            max=lm.max,
            std=lm.std,
            p50=lm.p50,
            p90=lm.p90,
            p95=lm.p95,
            p99=lm.p99,
        )

    metrics = ComprehensiveBenchmarkMetrics(
        ttft=to_percentiles(m.ttft),
        itl=to_percentiles(m.itl),
        tpot=to_percentiles(m.tpot),
        e2e_latency=to_percentiles(m.e2e_latency),
        throughput_tps=m.throughput_tps,
        throughput_rps=m.throughput_rps,
        output_tps=m.output_tps,
        total_requests=m.total_requests,
        successful_requests=m.successful_requests,
        failed_requests=m.failed_requests,
        success_rate=m.success_rate,
        total_prompt_tokens=m.total_prompt_tokens,
        total_completion_tokens=m.total_completion_tokens,
        avg_prompt_tokens=m.avg_prompt_tokens,
        avg_completion_tokens=m.avg_completion_tokens,
        total_duration_seconds=m.total_duration_seconds,
        concurrency=m.concurrency,
    )

    # A runner-level error is passed through in the response body instead of
    # being raised, alongside whatever metrics were collected.
    return ComprehensiveBenchmarkResponse(
        metrics=metrics,
        config={
            "endpoint": endpoint,
            "model_name": model_name,
            "concurrency": request.concurrency,
            "num_requests": request.num_requests,
        },
        error=bench_result.error,
        started_at=bench_result.started_at,
        completed_at=bench_result.completed_at,
        duration_seconds=bench_result.duration_seconds,
    )


@router.post("/benchmarks/saturation", response_model=SaturationDetectionResponse)
async def run_saturation_detection(
    request: SaturationDetectionRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_operator),
):
    """
    Run saturation detection to find optimal concurrency.

    Automatically increases concurrency and detects:
    - Throughput plateau (where adding more concurrency doesn't help)
    - Latency degradation (where latency increases significantly)
    - Error rate increases

    Returns the optimal concurrency level for the deployment.

    Raises:
        HTTPException: 404 when the deployment does not exist; 400 when the
            deployment is not running or has no worker attached.
    """
    from app.services.benchmark import SaturationConfig, SaturationDetector

    # Verify deployment exists and is running; eager-load the relationships
    # that are read below so no lazy load happens on the async session.
    result = await db.execute(
        select(Deployment)
        .where(Deployment.id == request.deployment_id)
        .options(
            selectinload(Deployment.model),
            selectinload(Deployment.worker),
        )
    )
    deployment = result.scalar_one_or_none()

    if not deployment:
        raise HTTPException(status_code=404, detail="Deployment not found")

    if deployment.status != DeploymentStatus.RUNNING.value:
        raise HTTPException(status_code=400, detail="Deployment is not running")

    worker = deployment.worker
    if not worker:
        raise HTTPException(status_code=400, detail="Worker not found")

    # Extract IP from worker address (format: IP:Port); the deployment is
    # reached on its own port, not on the worker's port.
    # NOTE(review): assumes the Worker model exposes `address` rather than
    # `host` -- confirm against the model definition.
    worker_ip = worker.address.split(":")[0]
    endpoint = f"http://{worker_ip}:{deployment.port}/v1"
    model_name = deployment.model.model_id if deployment.model else "default"

    # Configure saturation detection
    config = SaturationConfig(
        enabled=True,
        start_concurrency=request.start_concurrency,
        max_concurrency=request.max_concurrency,
        requests_per_level=request.requests_per_level,
        use_exponential=request.use_exponential,
        step_size=request.step_size,
        step_multiplier=request.step_multiplier,
    )

    # Run saturation detection
    detector = SaturationDetector(endpoint, model_name, config)
    sat_result = await detector.detect()

    # Convert to response: one entry per concurrency level that was measured.
    return SaturationDetectionResponse(
        optimal_concurrency=sat_result.optimal_concurrency,
        max_throughput_tps=sat_result.max_throughput_tps,
        latency_at_optimal_ms=sat_result.latency_at_optimal_ms,
        saturation_concurrency=sat_result.saturation_concurrency,
        saturation_detected=sat_result.saturation_detected,
        stop_reason=sat_result.stop_reason,
        concurrency_results=[
            ConcurrencyLevelResult(
                concurrency=r.concurrency,
                throughput_tps=r.throughput_tps,
                avg_latency_ms=r.avg_latency_ms,
                p95_latency_ms=r.p95_latency_ms,
                success_rate=r.success_rate,
            )
            for r in sat_result.results_by_concurrency
        ],
    )


# ============================================================================
# Knowledge Base Endpoints
# ============================================================================
Expand Down Expand Up @@ -558,7 +757,7 @@ async def agent_chat(
current_user: User = Depends(require_viewer),
):
"""Chat with the Auto-Tuning Agent"""
from app.services.tuning_agent import AGENT_SYSTEM_PROMPT, AgentToolExecutor, get_agent_tools
from app.services.tuning import AGENT_SYSTEM_PROMPT, AgentToolExecutor, get_agent_tools

config = request.config
provider = config.get("provider", "system")
Expand Down Expand Up @@ -706,34 +905,44 @@ async def run_auto_tuning(job_id: int, llm_config: dict | None = None):


async def _run_benchmark_test(deployment: Deployment, request: BenchmarkRequest) -> dict:
    """Run actual benchmark test on a deployment using the benchmark module.

    Args:
        deployment: Deployment to benchmark; its ``worker`` and ``model``
            relationships are read here, so they must already be loaded.
        request: Benchmark parameters (concurrency, input and output length).

    Returns:
        A dict with a single ``"error"`` key when the worker is missing or the
        benchmark reports an error; otherwise a dict of headline metrics with
        the full metrics under ``"raw"``.
    """
    from app.services.benchmark import BenchmarkConfig, BenchmarkRunner, LoadPattern

    # Get worker info
    worker = deployment.worker
    if not worker:
        return {"error": "Worker not found"}

    # Extract IP from worker address (format: IP:Port); the deployment is
    # reached on its own port, not on the worker's port.
    # NOTE(review): assumes the Worker model exposes `address` rather than
    # `host` -- confirm against the model definition.
    worker_ip = worker.address.split(":")[0]
    endpoint = f"http://{worker_ip}:{deployment.port}/v1"
    model_name = deployment.model.model_id if deployment.model else "default"

    # Configure benchmark
    config = BenchmarkConfig(
        endpoint=endpoint,
        model_name=model_name,
        load_pattern=LoadPattern.FIXED,
        concurrency=request.concurrency,
        # At least 20 requests, scaling with the requested concurrency.
        num_requests=max(20, request.concurrency * 5),
        warmup_requests=5,
        prompt_tokens=request.input_length,
        output_tokens=request.output_length,
        stream=True,
    )

    # Run benchmark
    runner = BenchmarkRunner(config)
    result = await runner.run()

    if result.error:
        return {"error": result.error}

    metrics = result.metrics
    return {
        "throughput_tps": metrics.output_tps,
        "ttft_ms": metrics.ttft.mean,
        "tpot_ms": metrics.tpot.mean,
        "total_latency_ms": metrics.e2e_latency.mean,
        "gpu_utilization": None,  # Would need GPU monitoring
        "vram_usage_gb": None,  # Would need GPU monitoring
        "raw": metrics.to_dict(),
    }
Loading
Loading