Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ jobs:
- name: Checkout code
uses: actions/checkout@v4

- name: Set up QEMU
uses: docker/setup-qemu-action@v3

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3

Expand Down Expand Up @@ -79,7 +82,7 @@ jobs:
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max
platforms: linux/amd64
platforms: linux/amd64,linux/arm64

# Notify on completion
notify:
Expand Down
8 changes: 5 additions & 3 deletions backend/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ WORKDIR /app

# Install docker CLI for local worker spawn feature
# Using Docker 27.x for API version 1.47 compatibility
RUN apt-get update && apt-get install -y --no-install-recommends \
curl \
&& curl -fsSL https://download.docker.com/linux/static/stable/x86_64/docker-27.4.1.tgz | tar xz --strip-components=1 -C /usr/local/bin docker/docker \
# Auto-detect architecture (x86_64 or aarch64)
RUN apt-get update && apt-get install -y --no-install-recommends curl \
&& ARCH=$(uname -m) \
&& if [ "$ARCH" = "aarch64" ] || [ "$ARCH" = "arm64" ]; then DOCKER_ARCH="aarch64"; else DOCKER_ARCH="x86_64"; fi \
&& curl -fsSL "https://download.docker.com/linux/static/stable/${DOCKER_ARCH}/docker-27.4.1.tgz" | tar xz --strip-components=1 -C /usr/local/bin docker/docker \
&& rm -rf /var/lib/apt/lists/*

# Copy installed packages from builder
Expand Down
5 changes: 4 additions & 1 deletion backend/app/api/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,9 @@ def _generate_docker_command(token: str, name: str, backend_url: str) -> str:

Command is single-line for cross-platform compatibility (Linux/Mac/Windows).
"""
from app.config import get_settings

settings = get_settings()
return (
f"docker run -d --name lmstack-worker --restart unless-stopped "
f"--network host --gpus all --privileged "
Expand All @@ -540,7 +543,7 @@ def _generate_docker_command(token: str, name: str, backend_url: str) -> str:
f"-e BACKEND_URL={backend_url} "
f"-e WORKER_NAME={name} "
f"-e REGISTRATION_TOKEN={token} "
f"infinirc/lmstack-worker:latest"
f"{settings.worker_image}"
)


Expand Down
1 change: 1 addition & 0 deletions backend/app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class Settings(BaseSettings):
# Worker settings
worker_heartbeat_interval: int = 10 # seconds between status checks
worker_timeout: int = 30 # seconds to consider worker offline
worker_image: str = "infinirc/lmstack-worker:latest" # Docker image for local worker

# vLLM defaults
vllm_default_image: str = "vllm/vllm-openai:latest"
Expand Down
155 changes: 84 additions & 71 deletions backend/app/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,85 +37,98 @@ async def get_db() -> AsyncSession:
await session.close()


def _get_column_type_sql(column) -> str:
"""Convert SQLAlchemy column type to SQLite type string."""
from sqlalchemy import JSON, Boolean, DateTime, Float, Integer, String, Text

col_type = type(column.type)

if col_type == Integer or "Integer" in str(col_type):
return "INTEGER"
elif col_type == String or "String" in str(col_type):
length = getattr(column.type, "length", None)
return f"VARCHAR({length})" if length else "VARCHAR(255)"
elif col_type == Text or "Text" in str(col_type):
return "TEXT"
elif col_type == Boolean or "Boolean" in str(col_type):
return "BOOLEAN"
elif col_type == Float or "Float" in str(col_type):
return "FLOAT"
elif col_type == DateTime or "DateTime" in str(col_type):
return "DATETIME"
elif col_type == JSON or "JSON" in str(col_type):
return "JSON"
else:
# Default fallback
return "TEXT"


async def _run_migrations(conn):
"""Run schema migrations for new columns (SQLite compatible)."""
"""Auto-detect and add missing columns by comparing models with database schema."""
from sqlalchemy import text

async def column_exists(table_name: str, column_name: str) -> bool:
"""Check if a column exists in a table."""
result = await conn.execute(text(f"PRAGMA table_info({table_name})"))
columns = [row[1] for row in result.fetchall()]
return column_name in columns

# Migration: Add container_name to deployments (for Windows Docker compatibility)
if not await column_exists("deployments", "container_name"):
logger.info("Adding 'container_name' column to deployments table...")
await conn.execute(text("ALTER TABLE deployments ADD COLUMN container_name VARCHAR(255)"))
logger.info("'container_name' column added!")

# Migration: Add is_local to registration_tokens (for local worker detection)
if not await column_exists("registration_tokens", "is_local"):
logger.info("Adding 'is_local' column to registration_tokens table...")
await conn.execute(
text("ALTER TABLE registration_tokens ADD COLUMN is_local BOOLEAN DEFAULT 0")
)
logger.info("'is_local' column added!")

# Migration: Add conversation_type to conversations (for Agent chat support)
if not await column_exists("conversations", "conversation_type"):
logger.info("Adding 'conversation_type' column to conversations table...")
await conn.execute(
text(
"ALTER TABLE conversations ADD COLUMN conversation_type VARCHAR(20) DEFAULT 'chat' NOT NULL"
)
async def get_table_columns(table_name: str) -> set[str]:
"""Get all column names from a database table."""
try:
result = await conn.execute(text(f"PRAGMA table_info({table_name})"))
return {row[1] for row in result.fetchall()}
except Exception:
return set()

async def table_exists(table_name: str) -> bool:
"""Check if a table exists in the database."""
result = await conn.execute(
text("SELECT name FROM sqlite_master WHERE type='table' AND name=:name"),
{"name": table_name},
)
logger.info("'conversation_type' column added!")

# Migration: Add agent_config to conversations (for Agent configuration)
if not await column_exists("conversations", "agent_config"):
logger.info("Adding 'agent_config' column to conversations table...")
await conn.execute(text("ALTER TABLE conversations ADD COLUMN agent_config JSON"))
logger.info("'agent_config' column added!")

# Migration: Add tool_calls to messages (for Agent tool calls)
if not await column_exists("messages", "tool_calls"):
logger.info("Adding 'tool_calls' column to messages table...")
await conn.execute(text("ALTER TABLE messages ADD COLUMN tool_calls JSON"))
logger.info("'tool_calls' column added!")

# Migration: Add tool_call_id to messages (for Agent tool results)
if not await column_exists("messages", "tool_call_id"):
logger.info("Adding 'tool_call_id' column to messages table...")
await conn.execute(text("ALTER TABLE messages ADD COLUMN tool_call_id VARCHAR(100)"))
logger.info("'tool_call_id' column added!")

# Migration: Add step_type to messages (for Agent execution steps)
if not await column_exists("messages", "step_type"):
logger.info("Adding 'step_type' column to messages table...")
await conn.execute(text("ALTER TABLE messages ADD COLUMN step_type VARCHAR(50)"))
logger.info("'step_type' column added!")

# Migration: Add execution_time_ms to messages (for tool execution timing)
if not await column_exists("messages", "execution_time_ms"):
logger.info("Adding 'execution_time_ms' column to messages table...")
await conn.execute(text("ALTER TABLE messages ADD COLUMN execution_time_ms FLOAT"))
logger.info("'execution_time_ms' column added!")

# Migration: Add tuning_config to tuning_jobs (for multi-framework testing)
if not await column_exists("tuning_jobs", "tuning_config"):
logger.info("Adding 'tuning_config' column to tuning_jobs table...")
await conn.execute(text("ALTER TABLE tuning_jobs ADD COLUMN tuning_config JSON"))
logger.info("'tuning_config' column added!")

# Migration: Add conversation_id to tuning_jobs (for Agent Chat integration)
if not await column_exists("tuning_jobs", "conversation_id"):
logger.info("Adding 'conversation_id' column to tuning_jobs table...")
await conn.execute(text("ALTER TABLE tuning_jobs ADD COLUMN conversation_id INTEGER"))
logger.info("'conversation_id' column added!")
return result.fetchone() is not None

# Iterate through all tables defined in models
for table_name, table in Base.metadata.tables.items():
# Skip if table doesn't exist yet (will be created by create_all)
if not await table_exists(table_name):
continue

# Get existing columns in database
existing_columns = await get_table_columns(table_name)

# Check each column in the model
for column in table.columns:
if column.name not in existing_columns:
# Build ALTER TABLE statement
col_type = _get_column_type_sql(column)

# Handle default values
default_clause = ""
if column.default is not None:
default_val = column.default.arg
if callable(default_val):
default_val = default_val(None)
if isinstance(default_val, str):
default_clause = f" DEFAULT '{default_val}'"
elif isinstance(default_val, bool):
default_clause = f" DEFAULT {1 if default_val else 0}"
elif default_val is not None:
default_clause = f" DEFAULT {default_val}"

sql = (
f"ALTER TABLE {table_name} ADD COLUMN {column.name} {col_type}{default_clause}"
)

logger.info(f"Auto-migration: Adding '{column.name}' column to {table_name}...")
try:
await conn.execute(text(sql))
logger.info(f"Column '{column.name}' added to {table_name}!")
except Exception as e:
logger.warning(f"Failed to add column {column.name} to {table_name}: {e}")


async def init_db():
"""Initialize database tables and run migrations"""
# Import all models to register them with Base.metadata
# This ensures all tables are created by create_all()
import app.models # noqa: F401

try:
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
Expand Down
67 changes: 67 additions & 0 deletions backend/app/services/deployment_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,69 @@ async def check_with_semaphore(deployment: Deployment):

return stats

def _is_native_deployment(self, deployment: Deployment) -> bool:
"""Check if this is a native Mac deployment (not Docker)."""
# Native deployments have container_id like "native-123"
if deployment.container_id and deployment.container_id.startswith("native-"):
return True

# Mac-only backends are always native
native_only_backends = {"mlx", "llama_cpp"}
if deployment.backend in native_only_backends:
return True

# For Ollama, check if worker is Mac
if deployment.backend == BackendType.OLLAMA.value:
if deployment.worker and deployment.worker.is_mac:
return True

return False

async def _check_native_deployment(self, deployment: Deployment) -> str:
"""Check a native Mac deployment's API health.

Native deployments run as processes, not Docker containers.
We can only check if the API endpoint is responding.
"""
try:
# For native deployments, if worker is offline, keep current status
# and let the health check loop retry later (worker may be reconnecting)
if deployment.worker.status != "online":
logger.info(
f"Native deployment {deployment.name}: worker offline, "
"keeping current status (may be reconnecting)"
)
# Don't change status - worker might be in the process of reconnecting
return "skipped"

# Check API health via worker
api_healthy = await self._check_api_health(
deployment.worker.address,
deployment.port,
deployment.backend,
None, # No container_name for native
)

if api_healthy:
if deployment.status != DeploymentStatus.RUNNING.value:
deployment.status = DeploymentStatus.RUNNING.value
deployment.status_message = "Model ready (native process verified)"
logger.info(f"Native deployment {deployment.name}: healthy")
return "running_verified"
else:
# Process might have died or not started yet
# Mark as STARTING instead of ERROR to allow retry
deployment.status = DeploymentStatus.STARTING.value
deployment.status_message = "Native process not responding. Waiting for recovery..."
logger.info(f"Native deployment {deployment.name}: API not responding, waiting...")
return "api_not_ready"

except Exception as e:
logger.error(f"Error checking native deployment {deployment.name}: {e}")
deployment.status = DeploymentStatus.STARTING.value
deployment.status_message = f"Checking status: {e}"
return "api_not_ready"

async def _check_and_update_deployment(self, deployment: Deployment, db) -> str:
"""Check a single deployment and update its status.

Expand All @@ -134,6 +197,10 @@ async def _check_and_update_deployment(self, deployment: Deployment, db) -> str:
logger.warning(f"Deployment {deployment.id} has no worker, skipping")
return "skipped"

# Check if this is a native deployment (Mac without Docker)
if self._is_native_deployment(deployment):
return await self._check_native_deployment(deployment)

if not deployment.container_id:
# If deployment is still starting, skip it
if deployment.status == DeploymentStatus.STARTING.value:
Expand Down
16 changes: 14 additions & 2 deletions backend/app/services/local_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,12 +205,24 @@ def spawn_docker_worker(
backend_url: str,
registration_token: str,
container_name: str = "lmstack-worker",
worker_image: str | None = None,
) -> dict:
"""Spawn a Docker worker container on the local machine.

Args:
worker_name: Name for the worker
backend_url: URL of the backend server
registration_token: Token for worker registration
container_name: Name for the Docker container
worker_image: Docker image to use (defaults to settings.worker_image)

Returns:
dict with keys: success, message, container_id (if success)
"""
from app.config import get_settings

settings = get_settings()
image = worker_image or settings.worker_image
# On Mac, ensure Ollama is running with external access before starting Docker
if platform.system() == "Darwin":
logger.info("Mac detected, ensuring Ollama is running with external access...")
Expand Down Expand Up @@ -270,11 +282,11 @@ def spawn_docker_worker(
f"WORKER_NAME={worker_name}",
"-e",
f"REGISTRATION_TOKEN={registration_token}",
"infinirc/lmstack-worker:latest",
image,
]

try:
logger.info(f"Spawning Docker worker: {worker_name}")
logger.info(f"Spawning Docker worker: {worker_name} with image {image}")
result = subprocess.run(
cmd,
capture_output=True,
Expand Down
5 changes: 5 additions & 0 deletions backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ dependencies = [
"httpx>=0.26.0",
"docker>=7.0.0",
"python-multipart>=0.0.6",
"python-jose[cryptography]>=3.3.0",
"email-validator>=2.0.0",
"psutil>=5.9.0",
"optuna>=3.5.0",
"openai>=1.0.0",
]

[project.optional-dependencies]
Expand Down
1 change: 1 addition & 0 deletions backend/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ python-jose[cryptography]>=3.3.0
email-validator>=2.0.0
psutil>=5.9.0
optuna>=3.5.0
openai>=1.0.0
Loading
Loading