From 04a9f721eebceb31d4b902d288c69dc71bfbb23c Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 18 Nov 2025 00:03:01 +0000 Subject: [PATCH] docs: add multi-controller architecture design and implementation guide Add comprehensive documentation for multi-controller architecture that enables StreamSpace to support multiple backend types (Kubernetes, Docker, ESXi, KVM, Proxmox, Hyper-V) while maintaining backward compatibility. Architecture changes: - gRPC-based controller interface contract - Controller registration and discovery system - API server as central orchestrator - Database schema changes for controller tracking - Session routing to appropriate controllers Implementation guide includes: - Week-by-week implementation plan - Protocol Buffer definitions - Code examples for API server and K8s controller - Database migration scripts - Testing strategy and rollback plan Phase 1 focuses on foundation with backward compatibility: - Existing Kubernetes controller continues to work - Database changes are additive - No breaking changes for existing deployments This design supports the strategic goal of enabling StreamSpace to manage sessions across diverse infrastructure types beyond Kubernetes. --- docs/MULTI_CONTROLLER_ARCHITECTURE.md | 1004 +++++++++++++++++++++ docs/MULTI_CONTROLLER_IMPLEMENTATION.md | 1058 +++++++++++++++++++++++ 2 files changed, 2062 insertions(+) create mode 100644 docs/MULTI_CONTROLLER_ARCHITECTURE.md create mode 100644 docs/MULTI_CONTROLLER_IMPLEMENTATION.md diff --git a/docs/MULTI_CONTROLLER_ARCHITECTURE.md b/docs/MULTI_CONTROLLER_ARCHITECTURE.md new file mode 100644 index 00000000..51aa2e85 --- /dev/null +++ b/docs/MULTI_CONTROLLER_ARCHITECTURE.md @@ -0,0 +1,1004 @@ +# Multi-Controller Architecture Design + +**Version**: 1.0 +**Date**: 2025-11-17 +**Status**: Proposed Design + +--- + +## Executive Summary + +This document outlines the architectural changes needed to evolve StreamSpace from a Kubernetes-only platform to a **multi-backend orchestration platform** that can manage sessions across different infrastructure types: + +- **Kubernetes** (existing) +- **Docker** (standalone containers) +- **VMware ESXi** (virtual machines) +- **KVM/QEMU** (Linux virtual machines) +- **Proxmox VE** (containers and VMs) +- **Hyper-V / App-V** (Windows containers and VMs) +- **Future**: Cloud VMs (EC2, Azure VMs, GCE) + +This design maintains **backward compatibility** with the existing Kubernetes controller while enabling extensibility for new controller types. + +--- + +## Table of Contents + +1. [Current Architecture](#current-architecture) +2. [Design Goals](#design-goals) +3. [Architecture Overview](#architecture-overview) +4. [Controller Interface Contract](#controller-interface-contract) +5. [Controller Registration System](#controller-registration-system) +6. [API Server Changes](#api-server-changes) +7. [Database Schema Changes](#database-schema-changes) +8. [Session Model Evolution](#session-model-evolution) +9. [Kubernetes Controller Refactoring](#kubernetes-controller-refactoring) +10. [Implementation Phases](#implementation-phases) +11. [Backward Compatibility](#backward-compatibility) +12. [Example: Docker Controller](#example-docker-controller) + +--- + +## Current Architecture + +### Component Overview + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Web UI (React) │ +└────────────────────────┬────────────────────────────────────┘ + │ REST/WebSocket + ↓ +┌─────────────────────────────────────────────────────────────┐ +│ API Backend (Go) │ +│ - REST handlers │ +│ - Kubernetes client (k8s.Client) │ +│ - Creates Session CRDs │ +└────────────────────────┬────────────────────────────────────┘ + │ Kubernetes API + ↓ +┌─────────────────────────────────────────────────────────────┐ +│ Kubernetes Controller (Kubebuilder) │ +│ - Watches Session CRDs │ +│ - Creates Deployments, Services, Ingress, PVCs │ +│ - Manages hibernation (scale to 0) │ +└────────────────────────┬────────────────────────────────────┘ + │ Creates + ↓ +┌─────────────────────────────────────────────────────────────┐ +│ Kubernetes Resources │ +│ - Pods (containerized sessions) │ +│ - Services (networking) │ +│ - Ingress (external access) │ +│ - PVCs (persistent storage) │ +└─────────────────────────────────────────────────────────────┘ +``` + +### Limitations + +- **Kubernetes-only**: Can only run sessions on Kubernetes clusters +- **Tight coupling**: API server directly uses Kubernetes client +- **CRD-based**: Session state stored in Kubernetes CRDs (not in PostgreSQL) +- **No abstraction**: Session creation assumes Kubernetes primitives + +--- + +## Design Goals + +### Primary Goals + +1. **Multi-Backend Support**: Enable sessions on Docker, ESXi, KVM, Proxmox, Hyper-V +2. **Controller Abstraction**: Define a common interface for all controller types +3. **Backward Compatibility**: Existing Kubernetes deployments continue to work +4. **Centralized State**: API server becomes the source of truth (not Kubernetes) +5. **Controller Independence**: Controllers can be deployed separately, scaled independently + +### Non-Goals (Out of Scope) + +- Cross-controller session migration (Phase 1) +- Multi-controller load balancing (Phase 1) +- Automatic controller failover (Phase 1) + +--- + +## Architecture Overview + +### New Architecture + +``` +┌────────────────────────────────────────────────────────────────┐ +│ Web UI (React) │ +└───────────────────────────┬────────────────────────────────────┘ + │ REST/WebSocket + ↓ +┌────────────────────────────────────────────────────────────────┐ +│ API Backend (Go) │ +│ ┌──────────────────────────────────────────────────────────┐ │ +│ │ Controller Router │ │ +│ │ - Routes requests to appropriate controller │ │ +│ │ - Controller selection logic │ │ +│ │ - Health checks and discovery │ │ +│ └──────────────────────────────────────────────────────────┘ │ +│ │ +│ ┌──────────────────────────────────────────────────────────┐ │ +│ │ PostgreSQL Database │ │ +│ │ - Sessions table (source of truth) │ │ +│ │ - Controllers table (registration) │ │ +│ │ - Controller capabilities │ │ +│ └──────────────────────────────────────────────────────────┘ │ +└────────┬────────────┬────────────┬────────────┬───────────────┘ + │ │ │ │ + │ gRPC │ gRPC │ gRPC │ gRPC + ↓ ↓ ↓ ↓ +┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ +│ Kubernetes │ │ Docker │ │ ESXi │ │ KVM │ +│ Controller │ │ Controller │ │ Controller │ │ Controller │ +└──────┬─────┘ └──────┬─────┘ └──────┬─────┘ └──────┬─────┘ + │ │ │ │ + ↓ ↓ ↓ ↓ +┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ +│ K8s Pods │ │ Containers │ │ VMs │ │ VMs │ +└────────────┘ └────────────┘ └────────────┘ └────────────┘ +``` + +### Key Changes + +1. **API Server** becomes the orchestrator +2. **Controllers** are independent services (not just Kubernetes controllers) +3. **gRPC** protocol for API ↔ Controller communication +4. **PostgreSQL** stores session state (single source of truth) +5. **Controller registration** allows dynamic discovery + +--- + +## Controller Interface Contract + +### gRPC Service Definition + +All controllers must implement this gRPC service: + +```protobuf +// File: api/proto/controller.proto +syntax = "proto3"; +package streamspace.controller.v1; + +service ControllerService { + // Lifecycle operations + rpc CreateSession(CreateSessionRequest) returns (CreateSessionResponse); + rpc GetSession(GetSessionRequest) returns (GetSessionResponse); + rpc DeleteSession(DeleteSessionRequest) returns (DeleteSessionResponse); + + // State transitions + rpc HibernateSession(HibernateSessionRequest) returns (HibernateSessionResponse); + rpc WakeSession(WakeSessionRequest) returns (WakeSessionResponse); + + // Monitoring + rpc GetSessionStatus(GetSessionStatusRequest) returns (SessionStatusResponse); + rpc GetSessionMetrics(GetSessionMetricsRequest) returns (SessionMetricsResponse); + rpc GetSessionLogs(GetSessionLogsRequest) returns (stream LogEntry); + + // Controller metadata + rpc GetCapabilities(GetCapabilitiesRequest) returns (CapabilitiesResponse); + rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse); +} + +message CreateSessionRequest { + string session_id = 1; + string user_id = 2; + string template_id = 3; + + // Generic resource spec (controller interprets) + SessionSpec spec = 4; + + // Controller-specific config (JSON) + string controller_config = 5; +} + +message SessionSpec { + // Generic fields (all controllers support) + string base_image = 1; + ResourceRequirements resources = 2; + repeated Port ports = 3; + repeated EnvVar env = 4; + repeated VolumeMount volume_mounts = 5; + + // VNC/webapp config + VNCConfig vnc = 6; + WebAppConfig webapp = 7; + + // Storage + bool persistent_home = 8; + string home_size = 9; +} + +message ResourceRequirements { + string memory = 1; // e.g., "2Gi" + string cpu = 2; // e.g., "1000m" or "1" + string storage = 3; // e.g., "50Gi" +} + +message SessionStatusResponse { + string session_id = 1; + string phase = 2; // Pending, Running, Hibernated, Failed, Terminated + string url = 3; // Access URL + string node_id = 4; // Controller-specific identifier (pod name, container ID, VM ID) + google.protobuf.Timestamp last_activity = 5; + ResourceUsage resource_usage = 6; + repeated Condition conditions = 7; +} + +message CapabilitiesResponse { + string controller_type = 1; // "kubernetes", "docker", "esxi", etc. + string version = 2; + + // Features + bool supports_hibernation = 3; + bool supports_gpu = 4; + bool supports_persistent_storage = 5; + bool supports_networking = 6; + + // Capacity + ResourceCapacity capacity = 7; + + // Metadata + map labels = 8; +} +``` + +### Controller Responsibilities + +Each controller must: + +1. **Implement gRPC service**: All methods in `ControllerService` +2. **Register on startup**: Call API server registration endpoint +3. **Heartbeat**: Send periodic health checks (every 30s) +4. **State sync**: Report session status back to API server +5. **Resource management**: Allocate/deallocate resources on backend +6. **Lifecycle management**: Handle creation, hibernation, wake, deletion + +--- + +## Controller Registration System + +### Registration Flow + +``` +┌─────────────────┐ ┌─────────────────┐ +│ Controller │ │ API Server │ +│ (any type) │ │ │ +└────────┬────────┘ └────────┬────────┘ + │ │ + │ 1. Start controller │ + │ (read config) │ + │ │ + │ 2. POST /api/v1/controllers/register│ + │ { │ + │ "type": "docker", │ + │ "endpoint": "grpc://...:50051", │ + │ "capabilities": {...} │ + │ } │ + │─────────────────────────────────────>│ + │ │ + │ 3. Registration OK │ + │ (controller_id, token) │ + │<─────────────────────────────────────│ + │ │ + │ 4. Periodic heartbeat (30s) │ + │ POST /api/v1/controllers/:id/heartbeat + │─────────────────────────────────────>│ + │ │ + │ 5. gRPC HealthCheck() │ + │<─────────────────────────────────────│ + │ │ + │ 6. Health OK │ + │─────────────────────────────────────>│ + │ │ +``` + +### API Endpoints + +**Register Controller**: +``` +POST /api/v1/controllers/register +{ + "type": "docker", + "name": "docker-controller-1", + "endpoint": "grpc://docker-controller.local:50051", + "capabilities": { + "supports_hibernation": true, + "supports_gpu": false, + "capacity": { + "max_sessions": 50, + "memory": "128Gi", + "cpu": "32" + } + }, + "labels": { + "region": "us-west-2", + "environment": "production" + } +} + +Response: +{ + "controller_id": "ctrl-abc123", + "auth_token": "jwt-token-for-controller", + "api_version": "v1alpha1" +} +``` + +**Heartbeat**: +``` +POST /api/v1/controllers/:id/heartbeat +Authorization: Bearer +{ + "status": "healthy", + "current_sessions": 15, + "resource_usage": { + "memory": "32Gi", + "cpu": "8" + } +} +``` + +**List Controllers** (Admin): +``` +GET /api/v1/admin/controllers +Response: +[ + { + "id": "ctrl-abc123", + "type": "kubernetes", + "name": "k8s-controller-1", + "status": "healthy", + "sessions": 25, + "last_heartbeat": "2025-11-17T10:00:00Z" + }, + { + "id": "ctrl-def456", + "type": "docker", + "name": "docker-controller-1", + "status": "healthy", + "sessions": 10, + "last_heartbeat": "2025-11-17T10:00:05Z" + } +] +``` + +--- + +## API Server Changes + +### 1. Controller Manager + +New component in API server: + +```go +// File: api/internal/controller/manager.go +package controller + +import ( + "context" + "sync" + "time" + + pb "github.com/streamspace/streamspace/api/proto" + "google.golang.org/grpc" +) + +// ControllerManager manages connections to all registered controllers +type ControllerManager struct { + mu sync.RWMutex + controllers map[string]*ControllerClient // controller_id -> client + db *db.Database +} + +// ControllerClient wraps a gRPC connection to a controller +type ControllerClient struct { + ID string + Type string // "kubernetes", "docker", etc. + Name string + Endpoint string + Conn *grpc.ClientConn + Client pb.ControllerServiceClient + Capabilities *pb.CapabilitiesResponse + Status string // "healthy", "unhealthy", "offline" + LastHeartbeat time.Time + SessionCount int +} + +// RegisterController adds a new controller +func (m *ControllerManager) RegisterController(ctx context.Context, req *RegisterRequest) (*RegisterResponse, error) { + // 1. Validate request + // 2. Connect to controller gRPC endpoint + // 3. Call GetCapabilities() + // 4. Store in database + // 5. Add to in-memory map + // 6. Generate auth token + // 7. Start health check goroutine +} + +// GetController returns a controller client by ID +func (m *ControllerManager) GetController(controllerID string) (*ControllerClient, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + client, ok := m.controllers[controllerID] + if !ok { + return nil, ErrControllerNotFound + } + return client, nil +} + +// SelectController chooses a controller for a new session +func (m *ControllerManager) SelectController(ctx context.Context, req *SessionCreateRequest) (*ControllerClient, error) { + // Selection strategies: + // 1. Explicit: User/admin specifies controller_id or controller_type + // 2. Template-based: Template has preferred controller type + // 3. Load balancing: Round-robin or least-loaded + // 4. Capability-based: GPU requirements, storage type, etc. + // 5. Label-based: Region, environment, etc. +} +``` + +### 2. Session Handler Refactoring + +```go +// File: api/internal/handlers/sessions.go + +// CreateSession now routes to appropriate controller +func (h *SessionHandler) CreateSession(c *gin.Context) { + var req CreateSessionRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(400, gin.H{"error": err.Error()}) + return + } + + // 1. Select controller + controller, err := h.controllerManager.SelectController(c.Request.Context(), &req) + if err != nil { + c.JSON(500, gin.H{"error": "No available controller"}) + return + } + + // 2. Create session in database (pending state) + session := &db.Session{ + ID: generateSessionID(), + UserID: req.UserID, + TemplateID: req.TemplateID, + ControllerID: controller.ID, // NEW FIELD + State: "pending", + CreatedAt: time.Now(), + } + if err := h.db.CreateSession(session); err != nil { + c.JSON(500, gin.H{"error": err.Error()}) + return + } + + // 3. Call controller via gRPC + grpcReq := &pb.CreateSessionRequest{ + SessionId: session.ID, + UserId: req.UserID, + TemplateId: req.TemplateID, + Spec: convertToProtoSpec(req.Spec), + } + + grpcResp, err := controller.Client.CreateSession(c.Request.Context(), grpcReq) + if err != nil { + // Update session state to failed + h.db.UpdateSessionState(session.ID, "failed") + c.JSON(500, gin.H{"error": err.Error()}) + return + } + + // 4. Update session with controller response + session.NodeID = grpcResp.NodeId // Pod name, container ID, VM ID, etc. + session.State = grpcResp.Phase + session.URL = grpcResp.Url + h.db.UpdateSession(session) + + c.JSON(201, session) +} +``` + +### 3. Status Sync Worker + +```go +// File: api/internal/controller/status_sync.go + +// StatusSyncWorker periodically syncs session status from controllers +func (m *ControllerManager) StartStatusSyncWorker(ctx context.Context) { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + m.syncAllSessionStatuses(ctx) + } + } +} + +func (m *ControllerManager) syncAllSessionStatuses(ctx context.Context) { + // 1. Get all active sessions from database + sessions := m.db.GetActiveSessions() + + // 2. Group by controller + sessionsByController := make(map[string][]string) + for _, session := range sessions { + sessionsByController[session.ControllerID] = append( + sessionsByController[session.ControllerID], + session.ID, + ) + } + + // 3. Call each controller for status updates + for controllerID, sessionIDs := range sessionsByController { + controller, err := m.GetController(controllerID) + if err != nil { + continue + } + + for _, sessionID := range sessionIDs { + status, err := controller.Client.GetSessionStatus(ctx, &pb.GetSessionStatusRequest{ + SessionId: sessionID, + }) + if err != nil { + // Mark session as unhealthy + continue + } + + // Update database + m.db.UpdateSessionStatus(sessionID, status) + } + } +} +``` + +--- + +## Database Schema Changes + +### New Tables + +```sql +-- Controllers table +CREATE TABLE controllers ( + id VARCHAR(255) PRIMARY KEY, + type VARCHAR(50) NOT NULL, -- "kubernetes", "docker", "esxi", etc. + name VARCHAR(255) NOT NULL, + endpoint TEXT NOT NULL, -- gRPC endpoint (grpc://host:port) + status VARCHAR(50) NOT NULL DEFAULT 'offline', -- "healthy", "unhealthy", "offline" + capabilities JSONB, -- Full capabilities JSON + labels JSONB, -- Custom labels for selection + current_sessions INTEGER DEFAULT 0, + last_heartbeat TIMESTAMP, + created_at TIMESTAMP DEFAULT NOW(), + updated_at TIMESTAMP DEFAULT NOW() +); + +CREATE INDEX idx_controllers_type ON controllers(type); +CREATE INDEX idx_controllers_status ON controllers(status); + +-- Controller authentication tokens +CREATE TABLE controller_tokens ( + controller_id VARCHAR(255) REFERENCES controllers(id) ON DELETE CASCADE, + token_hash VARCHAR(255) NOT NULL, + expires_at TIMESTAMP, + created_at TIMESTAMP DEFAULT NOW(), + PRIMARY KEY (controller_id) +); +``` + +### Modified Tables + +```sql +-- Add controller_id and node_id to sessions table +ALTER TABLE sessions ADD COLUMN controller_id VARCHAR(255) REFERENCES controllers(id); +ALTER TABLE sessions ADD COLUMN controller_type VARCHAR(50); -- Denormalized for performance +ALTER TABLE sessions ADD COLUMN node_id VARCHAR(255); -- Pod name, container ID, VM ID, etc. + +CREATE INDEX idx_sessions_controller ON sessions(controller_id); +CREATE INDEX idx_sessions_controller_type ON sessions(controller_type); +``` + +--- + +## Session Model Evolution + +### Generic Session Spec + +Sessions become infrastructure-agnostic: + +```go +// File: api/internal/models/session.go + +type Session struct { + ID string `json:"id"` + UserID string `json:"user_id"` + TemplateID string `json:"template_id"` + + // Controller information + ControllerID string `json:"controller_id"` + ControllerType string `json:"controller_type"` // "kubernetes", "docker", etc. + NodeID string `json:"node_id"` // Backend-specific ID + + // Generic fields + State string `json:"state"` // "pending", "running", "hibernated", "terminated" + Phase string `json:"phase"` // "Pending", "Running", "Hibernated", "Failed", "Terminated" + URL string `json:"url"` + + // Generic spec (all controllers support) + Spec SessionSpec `json:"spec"` + + // Controller-specific config (optional) + ControllerConfig map[string]interface{} `json:"controller_config,omitempty"` + + // Status + Status SessionStatus `json:"status"` + + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +type SessionSpec struct { + BaseImage string `json:"base_image"` + Resources ResourceRequirements `json:"resources"` + Ports []Port `json:"ports"` + Env []EnvVar `json:"env"` + VolumeMounts []VolumeMount `json:"volume_mounts"` + VNC *VNCConfig `json:"vnc,omitempty"` + WebApp *WebAppConfig `json:"webapp,omitempty"` + PersistentHome bool `json:"persistent_home"` + HomeSize string `json:"home_size"` + IdleTimeout string `json:"idle_timeout"` +} +``` + +--- + +## Kubernetes Controller Refactoring + +### Changes Required + +1. **Rename**: `controller/` → `controller-kubernetes/` (or keep as is) +2. **Add gRPC server**: Implement `ControllerService` interface +3. **Remove CRD dependency** (optional): Use database as source of truth +4. **Registration**: Call API server on startup to register +5. **Heartbeat**: Send periodic health checks + +### Option A: Keep CRDs (Recommended for Phase 1) + +- Maintain existing CRD-based architecture +- Add gRPC server layer on top +- API server creates Sessions via gRPC, controller creates CRDs +- Minimal changes to existing reconciliation logic + +``` +API Server ──gRPC──> K8s Controller ──CRD──> Kubernetes API ──> Pods +``` + +### Option B: Remove CRDs (Future) + +- Controller talks directly to Kubernetes API (no CRDs) +- API server database is source of truth +- More flexibility, but larger refactoring + +``` +API Server ──gRPC──> K8s Controller ──Deployments/Services──> Kubernetes API ──> Pods +``` + +**Recommendation**: Start with Option A, migrate to Option B later. + +--- + +## Implementation Phases + +### Phase 1: Foundation (Current Priority) + +**Goal**: Prepare architecture for multi-controller support without breaking existing functionality. + +**Tasks**: + +1. ✅ **Design**: Create this document +2. **Database**: + - Add `controllers` and `controller_tokens` tables + - Add `controller_id`, `controller_type`, `node_id` to `sessions` table + - Migration scripts +3. **API Server**: + - Implement `ControllerManager` + - Add controller registration endpoints + - Add heartbeat handling +4. **Proto Definitions**: + - Define gRPC service in `api/proto/controller.proto` + - Generate Go and Python clients +5. **Kubernetes Controller**: + - Add gRPC server + - Implement `ControllerService` interface + - Add registration logic on startup + - Keep existing CRD reconciliation (no changes) +6. **Backward Compatibility**: + - Default to Kubernetes controller if `controller_id` not specified + - Auto-register Kubernetes controller on API server startup +7. **Testing**: + - Integration tests for controller registration + - Session creation via gRPC + - Status sync worker + +**Deliverables**: +- Working multi-controller infrastructure +- Kubernetes controller as first gRPC-based controller +- No breaking changes for existing users + +### Phase 2: Docker Controller (Next) + +**Goal**: Implement first alternative controller type. + +**Tasks**: + +1. **Docker Controller**: + - New Go service: `controller-docker/` + - Implements `ControllerService` gRPC + - Uses Docker API to manage containers + - Hibernation = `docker stop`, wake = `docker start` + - Persistent storage via Docker volumes +2. **Templates**: + - Mark templates as compatible with multiple controller types + - Add `compatible_controllers: ["kubernetes", "docker"]` field +3. **UI**: + - Show controller type in session list + - Allow controller selection when creating session (optional) +4. **Documentation**: + - Docker controller setup guide + - Template compatibility guide + +### Phase 3: ESXi/KVM Controllers (Future) + +**Goal**: Add VM-based controller types. + +**Tasks**: +- ESXi controller using vSphere API +- KVM controller using libvirt +- Proxmox controller using Proxmox VE API +- UI enhancements for controller selection +- Load balancing across controllers + +### Phase 4: Advanced Features (Future) + +**Goal**: Cross-controller capabilities. + +**Tasks**: +- Session migration between controllers (same type) +- Multi-controller load balancing +- Controller auto-scaling +- Automatic failover + +--- + +## Backward Compatibility + +### Compatibility Guarantees + +1. **Existing Deployments**: Continue to work without changes +2. **Kubernetes Controller**: Remains default if no controller specified +3. **CRDs**: Still supported (Option A approach) +4. **API Endpoints**: Existing endpoints unchanged +5. **Database**: Migrations are backward-compatible + +### Migration Path for Existing Users + +```bash +# 1. Upgrade API server (includes new tables) +kubectl apply -f manifests/config/api-deployment.yaml + +# 2. Run database migration +kubectl exec -it api-pod -- /app/migrate + +# 3. Upgrade Kubernetes controller (adds gRPC server) +kubectl apply -f manifests/config/controller-deployment.yaml + +# 4. Verify auto-registration +curl http://api-server/api/v1/admin/controllers +# Should show Kubernetes controller registered + +# 5. Existing sessions continue to work +kubectl get sessions +# All existing sessions now have controller_id populated +``` + +### Deprecation Policy + +- **CRDs**: Not deprecated in Phase 1-2, may be optional in Phase 3 +- **Direct Kubernetes API access**: No changes, still supported +- **API fields**: `controller_id` is optional (defaults to Kubernetes) + +--- + +## Example: Docker Controller + +### High-Level Architecture + +``` +┌──────────────────────────────────────────────────────────────┐ +│ Docker Controller (Go) │ +│ ┌────────────────────────────────────────────────────────┐ │ +│ │ gRPC Server (:50052) │ │ +│ │ - ControllerService implementation │ │ +│ └────────────────────────────────────────────────────────┘ │ +│ │ +│ ┌────────────────────────────────────────────────────────┐ │ +│ │ Docker API Client │ │ +│ │ - Container lifecycle (create, start, stop, remove) │ │ +│ │ - Volume management │ │ +│ │ - Network configuration │ │ +│ └────────────────────────────────────────────────────────┘ │ +│ │ +│ ┌────────────────────────────────────────────────────────┐ │ +│ │ Session Manager │ │ +│ │ - Tracks active containers │ │ +│ │ - Status reporting │ │ +│ │ - Resource monitoring │ │ +│ └────────────────────────────────────────────────────────┘ │ +└───────────────────────────┬──────────────────────────────────┘ + │ Docker API + ↓ +┌────────────────────────────────────────────────────────────────┐ +│ Docker Engine │ +│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ +│ │ Container │ │ Container │ │ Container │ │ +│ │ (Firefox) │ │ (VS Code) │ │ (Blender) │ │ +│ │ │ │ │ │ │ │ +│ │ Volume: │ │ Volume: │ │ Volume: │ │ +│ │ user1-home │ │ user2-home │ │ user1-home │ │ +│ └──────────────┘ └──────────────┘ └──────────────┘ │ +└────────────────────────────────────────────────────────────────┘ +``` + +### Implementation Sketch + +```go +// File: controller-docker/cmd/main.go +package main + +import ( + "context" + "fmt" + "net" + + "github.com/docker/docker/client" + pb "github.com/streamspace/streamspace/api/proto" + "google.golang.org/grpc" +) + +type DockerController struct { + pb.UnimplementedControllerServiceServer + dockerClient *client.Client + apiServerURL string +} + +func (c *DockerController) CreateSession(ctx context.Context, req *pb.CreateSessionRequest) (*pb.CreateSessionResponse, error) { + // 1. Parse session spec + spec := req.Spec + + // 2. Create Docker container + containerName := fmt.Sprintf("ss-%s", req.SessionId) + + containerConfig := &container.Config{ + Image: spec.BaseImage, + Env: convertEnvVars(spec.Env), + Labels: map[string]string{ + "streamspace.session_id": req.SessionId, + "streamspace.user_id": req.UserId, + }, + } + + hostConfig := &container.HostConfig{ + Resources: container.Resources{ + Memory: parseMemory(spec.Resources.Memory), + CPUCount: parseCPU(spec.Resources.Cpu), + }, + Binds: []string{ + fmt.Sprintf("user-%s:/config", req.UserId), + }, + } + + resp, err := c.dockerClient.ContainerCreate(ctx, containerConfig, hostConfig, nil, nil, containerName) + if err != nil { + return nil, err + } + + // 3. Start container + if err := c.dockerClient.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil { + return nil, err + } + + // 4. Get container IP for URL + inspect, _ := c.dockerClient.ContainerInspect(ctx, resp.ID) + url := fmt.Sprintf("http://%s:%d", inspect.NetworkSettings.IPAddress, spec.Vnc.Port) + + return &pb.CreateSessionResponse{ + SessionId: req.SessionId, + NodeId: resp.ID, // Docker container ID + Phase: "Running", + Url: url, + }, nil +} + +func (c *DockerController) HibernateSession(ctx context.Context, req *pb.HibernateSessionRequest) (*pb.HibernateSessionResponse, error) { + // Stop container (don't remove) + timeout := 30 + if err := c.dockerClient.ContainerStop(ctx, req.NodeId, &timeout); err != nil { + return nil, err + } + + return &pb.HibernateSessionResponse{ + SessionId: req.SessionId, + Phase: "Hibernated", + }, nil +} + +func (c *DockerController) WakeSession(ctx context.Context, req *pb.WakeSessionRequest) (*pb.WakeSessionResponse, error) { + // Start stopped container + if err := c.dockerClient.ContainerStart(ctx, req.NodeId, types.ContainerStartOptions{}); err != nil { + return nil, err + } + + return &pb.WakeSessionResponse{ + SessionId: req.SessionId, + Phase: "Running", + }, nil +} + +func main() { + // 1. Create Docker client + dockerClient, _ := client.NewClientWithOpts(client.FromEnv) + + // 2. Create controller + controller := &DockerController{ + dockerClient: dockerClient, + apiServerURL: os.Getenv("API_SERVER_URL"), + } + + // 3. Start gRPC server + lis, _ := net.Listen("tcp", ":50052") + grpcServer := grpc.NewServer() + pb.RegisterControllerServiceServer(grpcServer, controller) + + // 4. Register with API server + registerWithAPIServer(controller) + + // 5. Start heartbeat + go startHeartbeat(controller) + + // 6. Serve + grpcServer.Serve(lis) +} +``` + +--- + +## Conclusion + +This multi-controller architecture provides: + +1. **Flexibility**: Support for Kubernetes, Docker, ESXi, KVM, Proxmox, Hyper-V +2. **Backward Compatibility**: Existing Kubernetes deployments continue to work +3. **Scalability**: Controllers can be deployed and scaled independently +4. **Extensibility**: Easy to add new controller types via gRPC interface +5. **Centralized Management**: API server as single source of truth + +### Next Steps + +1. **Review & Approve**: Team review of this design +2. **Implement Phase 1**: Database changes, ControllerManager, gRPC definitions +3. **Refactor K8s Controller**: Add gRPC server, keep CRD reconciliation +4. **Test**: End-to-end testing with Kubernetes controller via gRPC +5. **Docker Controller**: Implement as proof of concept for multi-backend +6. **Documentation**: Update deployment guides, API docs + +--- + +**Authors**: StreamSpace Team +**Last Updated**: 2025-11-17 diff --git a/docs/MULTI_CONTROLLER_IMPLEMENTATION.md b/docs/MULTI_CONTROLLER_IMPLEMENTATION.md new file mode 100644 index 00000000..8688487c --- /dev/null +++ b/docs/MULTI_CONTROLLER_IMPLEMENTATION.md @@ -0,0 +1,1058 @@ +# Multi-Controller Implementation Guide + +**Date**: 2025-11-17 +**Status**: Implementation Roadmap +**Related**: See `MULTI_CONTROLLER_ARCHITECTURE.md` for full design + +--- + +## Quick Summary + +This document provides a practical, step-by-step implementation guide for adding multi-controller support to StreamSpace. Changes are designed to be **backward compatible** and **incremental**. + +--- + +## Phase 1: Core Infrastructure (Weeks 1-3) + +### Week 1: Database & Proto Definitions + +#### 1.1 Database Schema Changes + +**Location**: `api/internal/db/database.go` + +**Add new tables**: + +```sql +-- controllers table +CREATE TABLE IF NOT EXISTS controllers ( + id VARCHAR(255) PRIMARY KEY, + type VARCHAR(50) NOT NULL, -- "kubernetes", "docker", "esxi", "kvm", etc. + name VARCHAR(255) NOT NULL, + endpoint TEXT NOT NULL, -- gRPC endpoint: "grpc://host:port" + status VARCHAR(50) NOT NULL DEFAULT 'offline', -- "healthy", "unhealthy", "offline" + capabilities JSONB, -- Full capabilities: {supports_hibernation, supports_gpu, ...} + labels JSONB, -- Custom labels: {region: "us-west", env: "prod", ...} + current_sessions INTEGER DEFAULT 0, + capacity JSONB, -- {max_sessions, memory, cpu, storage} + last_heartbeat TIMESTAMP, + registered_at TIMESTAMP DEFAULT NOW(), + updated_at TIMESTAMP DEFAULT NOW() +); + +CREATE INDEX idx_controllers_type ON controllers(type); +CREATE INDEX idx_controllers_status ON controllers(status); + +-- controller_tokens table +CREATE TABLE IF NOT EXISTS controller_tokens ( + controller_id VARCHAR(255) PRIMARY KEY REFERENCES controllers(id) ON DELETE CASCADE, + token_hash VARCHAR(255) NOT NULL, + expires_at TIMESTAMP, + created_at TIMESTAMP DEFAULT NOW() +); + +-- Modify sessions table +ALTER TABLE sessions ADD COLUMN controller_id VARCHAR(255) REFERENCES controllers(id); +ALTER TABLE sessions ADD COLUMN controller_type VARCHAR(50); -- Denormalized for queries +ALTER TABLE sessions ADD COLUMN node_id VARCHAR(255); -- Backend-specific ID (pod name, container ID, etc.) + +CREATE INDEX idx_sessions_controller_id ON sessions(controller_id); +CREATE INDEX idx_sessions_controller_type ON sessions(controller_type); +CREATE INDEX idx_sessions_node_id ON sessions(node_id); +``` + +**Migration Script**: Create `api/migrations/007_multi_controller.sql` + +**Backward compatibility**: Existing sessions will have `controller_id = NULL`, which will be handled as "default Kubernetes controller" in the code. + +#### 1.2 Protocol Buffers Definitions + +**Location**: Create new directory `api/proto/` + +**File**: `api/proto/controller.proto` + +```protobuf +syntax = "proto3"; +package streamspace.controller.v1; +option go_package = "github.com/streamspace/streamspace/api/proto/v1"; + +import "google/protobuf/timestamp.proto"; + +// ControllerService defines the interface all controllers must implement +service ControllerService { + // Lifecycle operations + rpc CreateSession(CreateSessionRequest) returns (CreateSessionResponse); + rpc DeleteSession(DeleteSessionRequest) returns (DeleteSessionResponse); + + // State transitions + rpc HibernateSession(HibernateSessionRequest) returns (HibernateSessionResponse); + rpc WakeSession(WakeSessionRequest) returns (WakeSessionResponse); + + // Monitoring + rpc GetSessionStatus(GetSessionStatusRequest) returns (SessionStatusResponse); + rpc GetSessionMetrics(GetSessionMetricsRequest) returns (SessionMetricsResponse); + rpc StreamLogs(StreamLogsRequest) returns (stream LogEntry); + + // Controller metadata + rpc GetCapabilities(GetCapabilitiesRequest) returns (CapabilitiesResponse); + rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse); +} + +// CreateSession messages +message CreateSessionRequest { + string session_id = 1; + string user_id = 2; + string template_id = 3; + SessionSpec spec = 4; + map controller_config = 5; // Controller-specific config +} + +message SessionSpec { + string base_image = 1; + ResourceRequirements resources = 2; + repeated Port ports = 3; + repeated EnvVar env = 4; + repeated VolumeMount volume_mounts = 5; + VNCConfig vnc = 6; + bool persistent_home = 7; + string home_size = 8; + string idle_timeout = 9; +} + +message ResourceRequirements { + string memory = 1; // e.g., "2Gi" + string cpu = 2; // e.g., "1000m" + string storage = 3; // e.g., "50Gi" +} + +message Port { + string name = 1; + int32 container_port = 2; + string protocol = 3; +} + +message EnvVar { + string name = 1; + string value = 2; +} + +message VolumeMount { + string name = 1; + string mount_path = 2; +} + +message VNCConfig { + bool enabled = 1; + int32 port = 2; + string protocol = 3; +} + +message CreateSessionResponse { + string session_id = 1; + string node_id = 2; // Backend-specific ID + string phase = 3; // "Pending", "Running", etc. + string url = 4; // Access URL + string message = 5; // Optional status message +} + +// Hibernate/Wake messages +message HibernateSessionRequest { + string session_id = 1; + string node_id = 2; +} + +message HibernateSessionResponse { + string session_id = 1; + string phase = 2; + string message = 3; +} + +message WakeSessionRequest { + string session_id = 1; + string node_id = 2; +} + +message WakeSessionResponse { + string session_id = 1; + string phase = 2; + string url = 3; + string message = 4; +} + +// Delete session +message DeleteSessionRequest { + string session_id = 1; + string node_id = 2; +} + +message DeleteSessionResponse { + string session_id = 1; + bool success = 2; + string message = 3; +} + +// Status messages +message GetSessionStatusRequest { + string session_id = 1; + string node_id = 2; +} + +message SessionStatusResponse { + string session_id = 1; + string phase = 2; + string url = 3; + string node_id = 4; + google.protobuf.Timestamp last_activity = 5; + ResourceUsage resource_usage = 6; + repeated Condition conditions = 7; +} + +message ResourceUsage { + string memory = 1; + string cpu = 2; + float memory_percent = 3; + float cpu_percent = 4; +} + +message Condition { + string type = 1; + string status = 2; + string reason = 3; + string message = 4; + google.protobuf.Timestamp last_transition_time = 5; +} + +// Metrics +message GetSessionMetricsRequest { + string session_id = 1; + string node_id = 2; +} + +message SessionMetricsResponse { + string session_id = 1; + ResourceUsage current_usage = 2; + map custom_metrics = 3; +} + +// Logs +message StreamLogsRequest { + string session_id = 1; + string node_id = 2; + bool follow = 3; + int32 tail_lines = 4; +} + +message LogEntry { + google.protobuf.Timestamp timestamp = 1; + string message = 2; + string stream = 3; // "stdout" or "stderr" +} + +// Capabilities +message GetCapabilitiesRequest {} + +message CapabilitiesResponse { + string controller_type = 1; + string version = 2; + bool supports_hibernation = 3; + bool supports_gpu = 4; + bool supports_persistent_storage = 5; + bool supports_networking = 6; + ResourceCapacity capacity = 7; + map labels = 8; +} + +message ResourceCapacity { + int32 max_sessions = 1; + string total_memory = 2; + string total_cpu = 3; + string total_storage = 4; +} + +// Health check +message HealthCheckRequest {} + +message HealthCheckResponse { + string status = 1; // "healthy", "unhealthy" + string message = 2; + int32 current_sessions = 3; + ResourceUsage resource_usage = 4; +} +``` + +**Generate Go code**: +```bash +# Install protoc and plugins +go install google.golang.org/protobuf/cmd/protoc-gen-go@latest +go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest + +# Generate +protoc --go_out=. --go_opt=paths=source_relative \ + --go-grpc_out=. --go-grpc_opt=paths=source_relative \ + api/proto/controller.proto +``` + +### Week 2: API Server Changes + +#### 2.1 Controller Manager + +**Location**: Create `api/internal/controller/manager.go` + +```go +package controller + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "sync" + "time" + + "github.com/streamspace/streamspace/api/internal/db" + pb "github.com/streamspace/streamspace/api/proto/v1" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +// ControllerManager manages all registered controllers +type ControllerManager struct { + mu sync.RWMutex + controllers map[string]*ControllerClient // controller_id -> client + db *db.Database + defaultKubernetesControllerID string // For backward compatibility +} + +// ControllerClient wraps a gRPC connection to a controller +type ControllerClient struct { + ID string + Type string + Name string + Endpoint string + Conn *grpc.ClientConn + Client pb.ControllerServiceClient + Capabilities *pb.CapabilitiesResponse + Status string + LastHeartbeat time.Time + SessionCount int +} + +// NewControllerManager creates a new controller manager +func NewControllerManager(database *db.Database) *ControllerManager { + return &ControllerManager{ + controllers: make(map[string]*ControllerClient), + db: database, + } +} + +// Initialize loads existing controllers from database +func (m *ControllerManager) Initialize(ctx context.Context) error { + m.mu.Lock() + defer m.mu.Unlock() + + // Load all controllers from database + controllers, err := m.db.ListControllers(ctx) + if err != nil { + return err + } + + // Connect to each controller + for _, ctrl := range controllers { + if err := m.connectToController(ctx, ctrl); err != nil { + // Log error but continue + fmt.Printf("Failed to connect to controller %s: %v\n", ctrl.ID, err) + continue + } + } + + return nil +} + +// RegisterController registers a new controller +func (m *ControllerManager) RegisterController(ctx context.Context, req *RegisterControllerRequest) (*RegisterControllerResponse, error) { + m.mu.Lock() + defer m.mu.Unlock() + + // Generate controller ID + controllerID := generateControllerID(req.Type, req.Name) + + // Connect to controller gRPC endpoint + conn, err := grpc.Dial(req.Endpoint, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return nil, fmt.Errorf("failed to connect to controller: %w", err) + } + + client := pb.NewControllerServiceClient(conn) + + // Get capabilities + capResp, err := client.GetCapabilities(ctx, &pb.GetCapabilitiesRequest{}) + if err != nil { + conn.Close() + return nil, fmt.Errorf("failed to get capabilities: %w", err) + } + + // Generate auth token + token := generateAuthToken() + tokenHash := hashToken(token) + + // Store in database + controller := &db.Controller{ + ID: controllerID, + Type: req.Type, + Name: req.Name, + Endpoint: req.Endpoint, + Status: "healthy", + Capabilities: capResp, + Labels: req.Labels, + RegisteredAt: time.Now(), + } + + if err := m.db.CreateController(ctx, controller); err != nil { + conn.Close() + return nil, err + } + + if err := m.db.CreateControllerToken(ctx, controllerID, tokenHash); err != nil { + conn.Close() + return nil, err + } + + // Add to in-memory map + m.controllers[controllerID] = &ControllerClient{ + ID: controllerID, + Type: req.Type, + Name: req.Name, + Endpoint: req.Endpoint, + Conn: conn, + Client: client, + Capabilities: capResp, + Status: "healthy", + LastHeartbeat: time.Now(), + } + + // Start health check for this controller + go m.healthCheckLoop(controllerID) + + return &RegisterControllerResponse{ + ControllerID: controllerID, + AuthToken: token, + APIVersion: "v1alpha1", + }, nil +} + +// GetController returns a controller by ID +func (m *ControllerManager) GetController(controllerID string) (*ControllerClient, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + ctrl, ok := m.controllers[controllerID] + if !ok { + return nil, fmt.Errorf("controller not found: %s", controllerID) + } + + return ctrl, nil +} + +// SelectController chooses a controller for a new session +func (m *ControllerManager) SelectController(ctx context.Context, req *SelectControllerRequest) (*ControllerClient, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + // Strategy 1: Explicit controller ID + if req.ControllerID != "" { + return m.controllers[req.ControllerID], nil + } + + // Strategy 2: Explicit controller type + if req.ControllerType != "" { + return m.selectByType(req.ControllerType) + } + + // Strategy 3: Template preference + if req.Template != nil && req.Template.PreferredControllerType != "" { + return m.selectByType(req.Template.PreferredControllerType) + } + + // Strategy 4: Default to Kubernetes + if m.defaultKubernetesControllerID != "" { + return m.controllers[m.defaultKubernetesControllerID], nil + } + + // Strategy 5: Least loaded + return m.selectLeastLoaded() +} + +func (m *ControllerManager) selectByType(controllerType string) (*ControllerClient, error) { + candidates := []*ControllerClient{} + + for _, ctrl := range m.controllers { + if ctrl.Type == controllerType && ctrl.Status == "healthy" { + candidates = append(candidates, ctrl) + } + } + + if len(candidates) == 0 { + return nil, fmt.Errorf("no healthy controller of type %s", controllerType) + } + + // Return least loaded + leastLoaded := candidates[0] + for _, ctrl := range candidates { + if ctrl.SessionCount < leastLoaded.SessionCount { + leastLoaded = ctrl + } + } + + return leastLoaded, nil +} + +func (m *ControllerManager) selectLeastLoaded() (*ControllerClient, error) { + var leastLoaded *ControllerClient + + for _, ctrl := range m.controllers { + if ctrl.Status != "healthy" { + continue + } + + if leastLoaded == nil || ctrl.SessionCount < leastLoaded.SessionCount { + leastLoaded = ctrl + } + } + + if leastLoaded == nil { + return nil, fmt.Errorf("no healthy controllers available") + } + + return leastLoaded, nil +} + +// Health check loop for a controller +func (m *ControllerManager) healthCheckLoop(controllerID string) { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for range ticker.C { + m.checkControllerHealth(controllerID) + } +} + +func (m *ControllerManager) checkControllerHealth(controllerID string) { + m.mu.RLock() + ctrl, ok := m.controllers[controllerID] + m.mu.RUnlock() + + if !ok { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + resp, err := ctrl.Client.HealthCheck(ctx, &pb.HealthCheckRequest{}) + if err != nil { + // Mark as unhealthy + m.mu.Lock() + ctrl.Status = "unhealthy" + m.mu.Unlock() + m.db.UpdateControllerStatus(context.Background(), controllerID, "unhealthy") + return + } + + // Update status + m.mu.Lock() + ctrl.Status = resp.Status + ctrl.LastHeartbeat = time.Now() + ctrl.SessionCount = int(resp.CurrentSessions) + m.mu.Unlock() + + m.db.UpdateControllerStatus(context.Background(), controllerID, resp.Status) + m.db.UpdateControllerSessionCount(context.Background(), controllerID, int(resp.CurrentSessions)) +} + +// Helper functions +func generateControllerID(ctrlType, name string) string { + timestamp := time.Now().UnixNano() + raw := fmt.Sprintf("%s-%s-%d", ctrlType, name, timestamp) + hash := sha256.Sum256([]byte(raw)) + return fmt.Sprintf("ctrl-%s", hex.EncodeToString(hash[:8])) +} + +func generateAuthToken() string { + // Generate secure random token + // Implementation: use crypto/rand + return "jwt-token-here" // TODO: Implement JWT generation +} + +func hashToken(token string) string { + hash := sha256.Sum256([]byte(token)) + return hex.EncodeToString(hash[:]) +} + +// Request/Response types +type RegisterControllerRequest struct { + Type string + Name string + Endpoint string + Labels map[string]string +} + +type RegisterControllerResponse struct { + ControllerID string + AuthToken string + APIVersion string +} + +type SelectControllerRequest struct { + ControllerID string + ControllerType string + Template *Template +} + +type Template struct { + PreferredControllerType string +} +``` + +#### 2.2 Controller API Endpoints + +**Location**: Create `api/internal/handlers/controllers.go` + +```go +package handlers + +import ( + "net/http" + + "github.com/gin-gonic/gin" + "github.com/streamspace/streamspace/api/internal/controller" +) + +type ControllerHandler struct { + controllerManager *controller.ControllerManager +} + +func NewControllerHandler(cm *controller.ControllerManager) *ControllerHandler { + return &ControllerHandler{ + controllerManager: cm, + } +} + +// RegisterController handles controller registration +// POST /api/v1/controllers/register +func (h *ControllerHandler) RegisterController(c *gin.Context) { + var req controller.RegisterControllerRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + resp, err := h.controllerManager.RegisterController(c.Request.Context(), &req) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + + c.JSON(http.StatusCreated, resp) +} + +// ListControllers lists all registered controllers (admin only) +// GET /api/v1/admin/controllers +func (h *ControllerHandler) ListControllers(c *gin.Context) { + // TODO: Implement list controllers from database + c.JSON(http.StatusOK, gin.H{"controllers": []string{}}) +} + +// GetController gets a specific controller (admin only) +// GET /api/v1/admin/controllers/:id +func (h *ControllerHandler) GetController(c *gin.Context) { + controllerID := c.Param("id") + ctrl, err := h.controllerManager.GetController(controllerID) + if err != nil { + c.JSON(http.StatusNotFound, gin.H{"error": "Controller not found"}) + return + } + + c.JSON(http.StatusOK, gin.H{ + "id": ctrl.ID, + "type": ctrl.Type, + "name": ctrl.Name, + "status": ctrl.Status, + "session_count": ctrl.SessionCount, + "last_heartbeat": ctrl.LastHeartbeat, + "capabilities": ctrl.Capabilities, + }) +} + +// Heartbeat handles controller heartbeat +// POST /api/v1/controllers/:id/heartbeat +func (h *ControllerHandler) Heartbeat(c *gin.Context) { + // TODO: Verify controller auth token + // TODO: Update last_heartbeat in database + c.JSON(http.StatusOK, gin.H{"status": "ok"}) +} +``` + +**Register routes** in `api/internal/api/handlers.go`: + +```go +// Controller routes +controllerHandler := handlers.NewControllerHandler(controllerManager) +api.POST("/controllers/register", controllerHandler.RegisterController) +api.POST("/controllers/:id/heartbeat", controllerHandler.Heartbeat) + +admin := api.Group("/admin") +admin.Use(middleware.RequireAdmin()) +{ + admin.GET("/controllers", controllerHandler.ListControllers) + admin.GET("/controllers/:id", controllerHandler.GetController) +} +``` + +### Week 3: Session Handler Refactoring + +**Location**: Modify `api/internal/handlers/sessiontemplates.go` (or create new session handler) + +**Key changes**: + +1. Add controller selection logic +2. Call controller via gRPC instead of creating Kubernetes CRDs directly +3. Store `controller_id` and `node_id` in database + +```go +// CreateSession (modified) +func (h *SessionHandler) CreateSession(c *gin.Context) { + var req CreateSessionRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(400, gin.H{"error": err.Error()}) + return + } + + // Get user from context + userID := c.GetString("user_id") + + // Get template + template, err := h.db.GetTemplate(c.Request.Context(), req.TemplateID) + if err != nil { + c.JSON(404, gin.H{"error": "Template not found"}) + return + } + + // SELECT CONTROLLER + selectReq := &controller.SelectControllerRequest{ + ControllerID: req.ControllerID, // Optional explicit selection + ControllerType: req.ControllerType, // Optional type selection + Template: template, // For template preferences + } + + ctrl, err := h.controllerManager.SelectController(c.Request.Context(), selectReq) + if err != nil { + c.JSON(500, gin.H{"error": "No available controller"}) + return + } + + // Generate session ID + sessionID := generateSessionID(userID, template.Name) + + // CREATE SESSION IN DATABASE (pending state) + session := &db.Session{ + ID: sessionID, + UserID: userID, + TemplateID: req.TemplateID, + ControllerID: ctrl.ID, + ControllerType: ctrl.Type, + State: "pending", + Spec: req.Spec, + CreatedAt: time.Now(), + } + + if err := h.db.CreateSession(c.Request.Context(), session); err != nil { + c.JSON(500, gin.H{"error": err.Error()}) + return + } + + // CALL CONTROLLER VIA GRPC + grpcReq := &pb.CreateSessionRequest{ + SessionId: sessionID, + UserId: userID, + TemplateId: req.TemplateID, + Spec: convertToProtoSpec(req.Spec), + } + + grpcResp, err := ctrl.Client.CreateSession(c.Request.Context(), grpcReq) + if err != nil { + // Update session to failed + h.db.UpdateSessionState(c.Request.Context(), sessionID, "failed") + c.JSON(500, gin.H{"error": fmt.Sprintf("Controller error: %v", err)}) + return + } + + // UPDATE SESSION WITH CONTROLLER RESPONSE + session.NodeID = grpcResp.NodeId + session.State = grpcResp.Phase + session.URL = grpcResp.Url + session.UpdatedAt = time.Now() + + if err := h.db.UpdateSession(c.Request.Context(), session); err != nil { + c.JSON(500, gin.H{"error": err.Error()}) + return + } + + c.JSON(201, session) +} +``` + +--- + +## Phase 2: Kubernetes Controller Refactoring (Weeks 4-5) + +### 4.1 Add gRPC Server to Kubernetes Controller + +**Location**: Create `controller/internal/grpc/server.go` + +```go +package grpc + +import ( + "context" + "fmt" + + pb "github.com/streamspace/streamspace/api/proto/v1" + "github.com/streamspace/streamspace/controller/internal/session" +) + +type ControllerServer struct { + pb.UnimplementedControllerServiceServer + sessionManager *session.Manager +} + +func NewControllerServer(sessionMgr *session.Manager) *ControllerServer { + return &ControllerServer{ + sessionManager: sessionMgr, + } +} + +func (s *ControllerServer) CreateSession(ctx context.Context, req *pb.CreateSessionRequest) (*pb.CreateSessionResponse, error) { + // Call existing session creation logic (which creates CRD) + result, err := s.sessionManager.CreateSession(ctx, &session.CreateRequest{ + SessionID: req.SessionId, + UserID: req.UserId, + TemplateID: req.TemplateId, + Spec: convertFromProtoSpec(req.Spec), + }) + + if err != nil { + return nil, err + } + + return &pb.CreateSessionResponse{ + SessionId: result.SessionID, + NodeId: result.PodName, // Pod name is the "node ID" + Phase: result.Phase, + Url: result.URL, + }, nil +} + +func (s *ControllerServer) HibernateSession(ctx context.Context, req *pb.HibernateSessionRequest) (*pb.HibernateSessionResponse, error) { + err := s.sessionManager.HibernateSession(ctx, req.SessionId, req.NodeId) + if err != nil { + return nil, err + } + + return &pb.HibernateSessionResponse{ + SessionId: req.SessionId, + Phase: "Hibernated", + }, nil +} + +// Implement other methods... + +func (s *ControllerServer) GetCapabilities(ctx context.Context, req *pb.GetCapabilitiesRequest) (*pb.CapabilitiesResponse, error) { + return &pb.CapabilitiesResponse{ + ControllerType: "kubernetes", + Version: "v1.0.0", + SupportsHibernation: true, + SupportsGpu: true, + SupportsPersistentStorage: true, + SupportsNetworking: true, + Capacity: &pb.ResourceCapacity{ + MaxSessions: 100, + TotalMemory: "256Gi", + TotalCpu: "64", + TotalStorage: "1Ti", + }, + }, nil +} + +func (s *ControllerServer) HealthCheck(ctx context.Context, req *pb.HealthCheckRequest) (*pb.HealthCheckResponse, error) { + // Get current session count from Kubernetes + count, usage := s.sessionManager.GetStats() + + return &pb.HealthCheckResponse{ + Status: "healthy", + CurrentSessions: int32(count), + ResourceUsage: &pb.ResourceUsage{ + Memory: usage.Memory, + Cpu: usage.CPU, + MemoryPercent: usage.MemoryPercent, + CpuPercent: usage.CPUPercent, + }, + }, nil +} +``` + +### 4.2 Add Registration Logic + +**Location**: Modify `controller/cmd/main.go` + +```go +func main() { + // ... existing setup ... + + // Start gRPC server + grpcServer := startGRPCServer(sessionManager) + + // Register with API server + if err := registerWithAPIServer(); err != nil { + log.Fatalf("Failed to register with API server: %v", err) + } + + // Start heartbeat + go startHeartbeat() + + // ... existing controller manager run ... +} + +func startGRPCServer(sessionMgr *session.Manager) *grpc.Server { + lis, err := net.Listen("tcp", ":50051") + if err != nil { + log.Fatalf("Failed to listen: %v", err) + } + + grpcServer := grpc.NewServer() + pb.RegisterControllerServiceServer(grpcServer, grpc.NewControllerServer(sessionMgr)) + + go func() { + if err := grpcServer.Serve(lis); err != nil { + log.Fatalf("Failed to serve: %v", err) + } + }() + + return grpcServer +} + +func registerWithAPIServer() error { + apiServerURL := os.Getenv("API_SERVER_URL") + if apiServerURL == "" { + apiServerURL = "http://streamspace-api:8080" + } + + req := map[string]interface{}{ + "type": "kubernetes", + "name": os.Getenv("CONTROLLER_NAME"), + "endpoint": fmt.Sprintf("grpc://%s:50051", os.Getenv("POD_IP")), + "labels": map[string]string{ + "cluster": os.Getenv("CLUSTER_NAME"), + }, + } + + // POST to /api/v1/controllers/register + // Store returned auth token for heartbeats + // ... +} + +func startHeartbeat() { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for range ticker.C { + // POST to /api/v1/controllers/:id/heartbeat + // Include current session count and resource usage + } +} +``` + +--- + +## Testing Strategy + +### Unit Tests + +1. **Controller Manager**: Registration, selection logic +2. **gRPC handlers**: Session create/delete/hibernate/wake +3. **Database**: New tables and queries + +### Integration Tests + +1. **End-to-end session creation**: API → Controller → Kubernetes +2. **Controller registration**: Controller → API Server +3. **Health checks**: API Server → Controller +4. **Session lifecycle**: Create → Hibernate → Wake → Delete + +### Manual Testing + +```bash +# 1. Start API server with controller manager +make run-api + +# 2. Start Kubernetes controller with gRPC server +make run-controller-kubernetes + +# 3. Verify controller registered +curl http://localhost:8080/api/v1/admin/controllers + +# 4. Create session (should use Kubernetes controller) +curl -X POST http://localhost:8080/api/v1/sessions \ + -H "Authorization: Bearer $TOKEN" \ + -d '{ + "template_id": "firefox-browser", + "resources": {"memory": "2Gi", "cpu": "1000m"} + }' + +# 5. Verify session created in Kubernetes +kubectl get sessions -n streamspace + +# 6. Check database +psql -c "SELECT id, controller_id, controller_type, node_id FROM sessions;" +``` + +--- + +## Rollback Plan + +If issues arise, the rollback is simple: + +1. **Database**: Controllers and sessions work without new fields (NULL is OK) +2. **API Server**: Can run without ControllerManager (falls back to direct Kubernetes client) +3. **Controller**: Can run without gRPC server (CRD reconciliation continues) + +**To rollback**: +```bash +# Redeploy previous versions +kubectl rollout undo deployment/streamspace-api +kubectl rollout undo deployment/streamspace-controller +``` + +--- + +## Next Steps + +1. **Review this document** with the team +2. **Create JIRA tickets** for each week's tasks +3. **Implement Week 1** (database + proto) +4. **Code review** after each week +5. **Document as you go** (update API docs, deployment guides) + +--- + +## Summary + +This phased approach allows incremental implementation with backward compatibility at every step. The Kubernetes controller remains functional while we add the multi-controller infrastructure around it. + +**Key Benefits**: +- ✅ No breaking changes for existing deployments +- ✅ Kubernetes controller works during entire migration +- ✅ New controllers can be added without API changes +- ✅ Database migrations are additive (no data loss) +- ✅ Rollback is straightforward + +**Estimated Timeline**: 5 weeks for Phase 1 (core infrastructure + Kubernetes refactoring) +