diff --git a/api/internal/api/handlers.go b/api/internal/api/handlers.go index bed6453a..4c8095b1 100644 --- a/api/internal/api/handlers.go +++ b/api/internal/api/handlers.go @@ -499,14 +499,74 @@ func (h *Handler) CreateSession(c *gin.Context) { // Without a valid template, the session cannot be created template, err := h.k8sClient.GetTemplate(ctx, h.namespace, templateName) if err != nil { - // Provide a more helpful error message - errorMsg := fmt.Sprintf("Template not found: %s.", templateName) + // Template is missing - trigger reinstallation if applicationId was provided if req.ApplicationId != "" { - errorMsg += " The application may still be installing or the Kubernetes controller may not be running." - } else { - errorMsg += " Please ensure the application is properly installed." + // Query application details for reinstall + var ( + installID string + catalogTemplateID int + displayName string + description string + category string + iconURL string + manifest string + installedBy string + ) + reinstallErr := h.db.DB().QueryRowContext(ctx, ` + SELECT + ia.id, + ia.catalog_template_id, + ia.display_name, + COALESCE(ct.description, ''), + COALESCE(ct.category, ''), + COALESCE(ct.icon_url, ''), + COALESCE(ct.manifest, '{}'), + ia.created_by + FROM installed_applications ia + LEFT JOIN catalog_templates ct ON ia.catalog_template_id = ct.id + WHERE ia.id = $1 + `, req.ApplicationId).Scan( + &installID, &catalogTemplateID, &displayName, &description, + &category, &iconURL, &manifest, &installedBy, + ) + + if reinstallErr == nil { + // Publish AppInstallEvent to trigger controller to create template + if err := h.publisher.PublishAppInstall(ctx, &events.AppInstallEvent{ + InstallID: installID, + CatalogTemplateID: catalogTemplateID, + TemplateName: templateName, + DisplayName: displayName, + Description: description, + Category: category, + IconURL: iconURL, + Manifest: manifest, + InstalledBy: installedBy, + Platform: h.platform, + }); err != nil { + 
log.Printf("Failed to publish app reinstall event for %s: %v", templateName, err) + } else { + log.Printf("Triggered reinstall for missing template %s (app: %s)", templateName, installID) + // Update status to creating + h.db.DB().ExecContext(ctx, ` + UPDATE installed_applications + SET install_status = 'creating', install_message = 'Reinstalling missing template', updated_at = NOW() + WHERE id = $1 + `, installID) + } + } + + c.JSON(http.StatusServiceUnavailable, gin.H{ + "error": "Template reinstalling", + "message": fmt.Sprintf("The template for '%s' was missing and is being reinstalled. Please try again in a few seconds.", displayName), + }) + return } - c.JSON(http.StatusBadRequest, gin.H{"error": errorMsg}) + + // No applicationId provided - provide generic error + c.JSON(http.StatusBadRequest, gin.H{ + "error": fmt.Sprintf("Template not found: %s. Please ensure the application is properly installed.", templateName), + }) return } @@ -645,6 +705,25 @@ func (h *Handler) CreateSession(c *gin.Context) { return } + // Cache session in database so status updates can be applied + // This is best-effort - failure doesn't block session creation + dbSession := &db.Session{ + ID: sessionName, + UserID: req.User, + TemplateName: templateName, + State: "pending", + Namespace: h.namespace, + Platform: h.platform, + Memory: memory, + CPU: cpu, + PersistentHome: session.PersistentHome, + IdleTimeout: session.IdleTimeout, + MaxSessionDuration: session.MaxSessionDuration, + } + if err := h.sessionDB.CreateSession(ctx, dbSession); err != nil { + log.Printf("Failed to cache session %s in database (non-fatal): %v", sessionName, err) + } + // Return the session info immediately // The controller will create the actual Kubernetes resources response := map[string]interface{}{ @@ -1934,7 +2013,43 @@ func (h *Handler) convertDBSessionsToResponse(sessions []*db.Session) []map[stri } // convertDBSessionToResponse converts a database session to API response format. 
+// If the database doesn't have the session URL, it fetches the status from Kubernetes. func (h *Handler) convertDBSessionToResponse(session *db.Session) map[string]interface{} { + // Fetch Kubernetes status if database is missing URL or phase is empty + // This handles the case where the controller hasn't yet communicated status back to API + url := session.URL + podName := session.PodName + phase := session.State + + if (url == "" || phase == "") && h.k8sClient != nil { + ctx := context.Background() + k8sSession, err := h.k8sClient.GetSession(ctx, h.namespace, session.ID) + if err == nil && k8sSession != nil { + if k8sSession.Status.URL != "" { + url = k8sSession.Status.URL + } + if k8sSession.Status.PodName != "" { + podName = k8sSession.Status.PodName + } + if k8sSession.Status.Phase != "" { + phase = k8sSession.Status.Phase + } + // Also update resources from Kubernetes if missing + if session.Memory == "" && k8sSession.Resources.Memory != "" { + session.Memory = k8sSession.Resources.Memory + } + if session.CPU == "" && k8sSession.Resources.CPU != "" { + session.CPU = k8sSession.Resources.CPU + } + } + } + + // Capitalize phase for status.phase (UI expects "Running" not "running") + capitalizedPhase := phase + if len(phase) > 0 { + capitalizedPhase = strings.ToUpper(phase[:1]) + phase[1:] + } + result := map[string]interface{}{ "name": session.ID, "namespace": session.Namespace, @@ -1948,9 +2063,9 @@ func (h *Handler) convertDBSessionToResponse(session *db.Session) map[string]int "platform": session.Platform, "activeConnections": session.ActiveConnections, "status": map[string]interface{}{ - "phase": session.State, - "url": session.URL, - "podName": session.PodName, + "phase": capitalizedPhase, + "url": url, + "podName": podName, }, } diff --git a/api/internal/db/applications.go b/api/internal/db/applications.go index 2e27666e..c3a7821e 100644 --- a/api/internal/db/applications.go +++ b/api/internal/db/applications.go @@ -59,14 +59,70 @@ import ( 
"database/sql" "encoding/json" "fmt" + "io" + "net/http" "os" "path/filepath" + "strings" "time" "github.com/google/uuid" "github.com/streamspace/streamspace/api/internal/models" ) +// downloadIcon downloads an icon from a URL and returns the binary data and media type. +// Returns empty values if download fails (non-fatal - app can still be installed without icon). +func downloadIcon(iconURL string) ([]byte, string) { + if iconURL == "" { + return nil, "" + } + + // Create HTTP client with timeout + client := &http.Client{ + Timeout: 30 * time.Second, + } + + resp, err := client.Get(iconURL) + if err != nil { + fmt.Printf("Warning: failed to download icon from %s: %v\n", iconURL, err) + return nil, "" + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + fmt.Printf("Warning: failed to download icon from %s: status %d\n", iconURL, resp.StatusCode) + return nil, "" + } + + // Read the icon data + data, err := io.ReadAll(resp.Body) + if err != nil { + fmt.Printf("Warning: failed to read icon data from %s: %v\n", iconURL, err) + return nil, "" + } + + // Determine media type from Content-Type header or file extension + mediaType := resp.Header.Get("Content-Type") + if mediaType == "" || mediaType == "application/octet-stream" { + // Guess from URL extension + if strings.HasSuffix(strings.ToLower(iconURL), ".svg") { + mediaType = "image/svg+xml" + } else if strings.HasSuffix(strings.ToLower(iconURL), ".png") { + mediaType = "image/png" + } else if strings.HasSuffix(strings.ToLower(iconURL), ".jpg") || strings.HasSuffix(strings.ToLower(iconURL), ".jpeg") { + mediaType = "image/jpeg" + } else if strings.HasSuffix(strings.ToLower(iconURL), ".gif") { + mediaType = "image/gif" + } else if strings.HasSuffix(strings.ToLower(iconURL), ".webp") { + mediaType = "image/webp" + } else { + mediaType = "image/png" // Default assumption + } + } + + return data, mediaType +} + // ApplicationDB handles database operations for installed applications type 
ApplicationDB struct { db *sql.DB @@ -77,16 +133,31 @@ func NewApplicationDB(db *sql.DB) *ApplicationDB { return &ApplicationDB{db: db} } +// InstallApplicationParams contains all data needed to install an application +type InstallApplicationParams struct { + CatalogTemplateID int + DisplayName string + Description string + Category string + IconURL string + IconData []byte + IconMediaType string + Manifest string + Configuration map[string]interface{} +} + // InstallApplication installs a new application from the catalog func (a *ApplicationDB) InstallApplication(ctx context.Context, req *models.InstallApplicationRequest, userID string) (*models.InstalledApplication, error) { appID := uuid.New().String() guidSuffix := uuid.New().String()[:8] - // Get template info for default name - var templateName, templateDisplayName string + // Get full template info including manifest + var templateName, templateDisplayName, description, category, iconURL, manifest string err := a.db.QueryRowContext(ctx, ` - SELECT name, display_name FROM catalog_templates WHERE id = $1 - `, req.CatalogTemplateID).Scan(&templateName, &templateDisplayName) + SELECT name, display_name, COALESCE(description, ''), COALESCE(category, ''), + COALESCE(icon_url, ''), COALESCE(manifest::text, '{}') + FROM catalog_templates WHERE id = $1 + `, req.CatalogTemplateID).Scan(&templateName, &templateDisplayName, &description, &category, &iconURL, &manifest) if err != nil { return nil, fmt.Errorf("failed to get template: %w", err) } @@ -110,11 +181,24 @@ func (a *ApplicationDB) InstallApplication(ctx context.Context, req *models.Inst } } + // Download icon if URL is provided + var iconData []byte + var iconMediaType string + if iconURL != "" { + iconData, iconMediaType = downloadIcon(iconURL) + } + app := &models.InstalledApplication{ ID: appID, CatalogTemplateID: req.CatalogTemplateID, Name: name, DisplayName: displayName, + Description: description, + Category: category, + IconURL: iconURL, + IconData: 
iconData, + IconMediaType: iconMediaType, + Manifest: manifest, FolderPath: folderPath, Enabled: true, Configuration: req.Configuration, @@ -125,14 +209,16 @@ func (a *ApplicationDB) InstallApplication(ctx context.Context, req *models.Inst query := ` INSERT INTO installed_applications ( - id, catalog_template_id, name, display_name, folder_path, + id, catalog_template_id, name, display_name, description, category, + icon_url, icon_data, icon_media_type, manifest, folder_path, enabled, configuration, created_by, created_at, updated_at ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ` _, err = a.db.ExecContext(ctx, query, - app.ID, app.CatalogTemplateID, app.Name, app.DisplayName, app.FolderPath, + app.ID, app.CatalogTemplateID, app.Name, app.DisplayName, app.Description, app.Category, + app.IconURL, app.IconData, app.IconMediaType, app.Manifest, app.FolderPath, app.Enabled, string(configJSON), app.CreatedBy, app.CreatedAt, app.UpdatedAt, ) if err != nil { @@ -230,12 +316,6 @@ func (a *ApplicationDB) ListApplications(ctx context.Context, enabledOnly bool) } defer rows.Close() - // Get base path for folder checks - basePath := os.Getenv("APPS_BASE_PATH") - if basePath == "" { - basePath = "/app" - } - apps := []*models.InstalledApplication{} for rows.Next() { app := &models.InstalledApplication{} @@ -264,20 +344,10 @@ func (a *ApplicationDB) ListApplications(ctx context.Context, enabledOnly bool) json.Unmarshal(configJSON, &app.Configuration) } - // Check if folder exists - if not and app is enabled, auto-disable it - if app.Enabled && app.FolderPath != "" { - fullPath := filepath.Join(basePath, app.FolderPath) - if _, err := os.Stat(fullPath); os.IsNotExist(err) { - // Folder doesn't exist, disable the application - _, updateErr := a.db.ExecContext(ctx, - "UPDATE installed_applications SET enabled = false, updated_at = $1 WHERE id = $2", - time.Now(), app.ID) - if updateErr == nil { 
- app.Enabled = false - fmt.Printf("Auto-disabled application %s: folder %s does not exist\n", app.DisplayName, fullPath) - } - } - } + // Note: We no longer auto-disable applications when folders are missing. + // Instead, the controller sync mechanism will recreate missing resources. + // Applications remain enabled in the database and the controller will + // receive AppInstallEvents during sync to recreate any missing templates. apps = append(apps, app) } diff --git a/api/internal/db/database.go b/api/internal/db/database.go index 539efdf9..54623b71 100644 --- a/api/internal/db/database.go +++ b/api/internal/db/database.go @@ -498,9 +498,15 @@ func (d *Database) Migrate() error { catalog_template_id INT REFERENCES catalog_templates(id) ON DELETE SET NULL, name VARCHAR(255) NOT NULL, display_name VARCHAR(255) NOT NULL, + description TEXT, + category VARCHAR(100), folder_path VARCHAR(255), enabled BOOLEAN DEFAULT true, configuration JSONB DEFAULT '{}', + icon_url TEXT, + icon_data BYTEA, + icon_media_type VARCHAR(100), + manifest JSONB, created_by VARCHAR(255) REFERENCES users(id) ON DELETE SET NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP @@ -2061,9 +2067,19 @@ func (d *Database) Migrate() error { `ALTER TABLE installed_applications ADD COLUMN IF NOT EXISTS install_message TEXT`, `ALTER TABLE installed_applications ADD COLUMN IF NOT EXISTS platform VARCHAR(50) DEFAULT 'kubernetes'`, + // Add icon and metadata columns to installed_applications for persistence + // Icons are downloaded when app is installed, enabling offline/air-gapped deployments + `ALTER TABLE installed_applications ADD COLUMN IF NOT EXISTS description TEXT`, + `ALTER TABLE installed_applications ADD COLUMN IF NOT EXISTS category VARCHAR(100)`, + `ALTER TABLE installed_applications ADD COLUMN IF NOT EXISTS icon_url TEXT`, + `ALTER TABLE installed_applications ADD COLUMN IF NOT EXISTS icon_data BYTEA`, + `ALTER TABLE installed_applications 
ADD COLUMN IF NOT EXISTS icon_media_type VARCHAR(100)`, + `ALTER TABLE installed_applications ADD COLUMN IF NOT EXISTS manifest JSONB`, + // Create index for install status `CREATE INDEX IF NOT EXISTS idx_installed_applications_status ON installed_applications(install_status)`, `CREATE INDEX IF NOT EXISTS idx_installed_applications_platform ON installed_applications(platform)`, + `CREATE INDEX IF NOT EXISTS idx_installed_applications_category ON installed_applications(category)`, // Add platform fields to sessions for multi-platform support `ALTER TABLE sessions ADD COLUMN IF NOT EXISTS platform VARCHAR(50) DEFAULT 'kubernetes'`, diff --git a/api/internal/events/subscriber.go b/api/internal/events/subscriber.go index 7b0eaccb..3d555deb 100644 --- a/api/internal/events/subscriber.go +++ b/api/internal/events/subscriber.go @@ -10,6 +10,7 @@ import ( "encoding/json" "fmt" "log" + "strings" "time" "github.com/nats-io/nats.go" @@ -160,14 +161,18 @@ func (s *Subscriber) handleSessionStatus(data []byte) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - // Update the session state and URL + // Update the session state (using Phase which is the Kubernetes phase like "Running", "Pending"), + // URL, and pod_name query := ` UPDATE sessions - SET state = $1, url = $2, updated_at = $3 - WHERE id = $4 + SET state = $1, url = $2, pod_name = $3, updated_at = $4 + WHERE id = $5 ` - result, err := s.db.ExecContext(ctx, query, event.Status, event.URL, time.Now(), event.SessionID) + // Convert Phase to lowercase for state field (running, hibernated, pending, failed) + // The UI expects lowercase state values for session lifecycle checks + state := strings.ToLower(event.Phase) + result, err := s.db.ExecContext(ctx, query, state, event.URL, event.PodName, time.Now(), event.SessionID) if err != nil { log.Printf("Failed to update session %s status: %v", event.SessionID, err) return @@ -177,7 +182,7 @@ func (s *Subscriber) handleSessionStatus(data 
[]byte) { if rows == 0 { log.Printf("Session %s not found in database (may not be created yet)", event.SessionID) } else { - log.Printf("Updated session %s to status=%s", event.SessionID, event.Status) + log.Printf("Updated session %s to state=%s url=%s", event.SessionID, state, event.URL) } } diff --git a/api/internal/handlers/applications.go b/api/internal/handlers/applications.go index 82716359..7d4c124c 100644 --- a/api/internal/handlers/applications.go +++ b/api/internal/handlers/applications.go @@ -85,6 +85,7 @@ func (h *ApplicationHandler) RegisterRoutes(router *gin.RouterGroup) { apps.POST("", h.InstallApplication) apps.GET("/user", h.GetUserApplications) apps.GET("/:id", h.GetApplication) + apps.GET("/:id/icon", h.GetApplicationIcon) apps.PUT("/:id", h.UpdateApplication) apps.DELETE("/:id", h.DeleteApplication) apps.PUT("/:id/enabled", h.SetApplicationEnabled) @@ -327,6 +328,42 @@ func (h *ApplicationHandler) GetApplication(c *gin.Context) { c.JSON(http.StatusOK, app) } +// GetApplicationIcon godoc +// @Summary Get application icon +// @Description Get the icon image for an installed application +// @Tags applications +// @Produce image/png,image/svg+xml,image/jpeg +// @Param id path string true "Application ID" +// @Success 200 {file} binary +// @Failure 404 {object} ErrorResponse +// @Router /api/v1/applications/{id}/icon [get] +func (h *ApplicationHandler) GetApplicationIcon(c *gin.Context) { + appID := c.Param("id") + ctx := c.Request.Context() + + // Get icon data from database + var iconData []byte + var iconMediaType string + err := h.db.DB().QueryRowContext(ctx, ` + SELECT icon_data, COALESCE(icon_media_type, 'image/png') + FROM installed_applications + WHERE id = $1 AND icon_data IS NOT NULL + `, appID).Scan(&iconData, &iconMediaType) + + if err != nil { + c.JSON(http.StatusNotFound, ErrorResponse{ + Error: "Icon not found", + Message: "Application icon not available", + }) + return + } + + // Set appropriate headers + c.Header("Content-Type", 
iconMediaType) + c.Header("Cache-Control", "public, max-age=86400") // Cache for 24 hours + c.Data(http.StatusOK, iconMediaType, iconData) +} + // UpdateApplication godoc // @Summary Update an application // @Description Update display name, configuration, or enabled status diff --git a/api/internal/k8s/client.go b/api/internal/k8s/client.go index 1d42bea2..467c3331 100644 --- a/api/internal/k8s/client.go +++ b/api/internal/k8s/client.go @@ -579,11 +579,25 @@ func parseSession(obj *unstructured.Unstructured) (*Session, error) { } if resources, ok := spec["resources"].(map[string]interface{}); ok { - if memory, ok := resources["memory"].(string); ok { - session.Resources.Memory = memory + // Try new nested structure first (requests/limits) + if requests, ok := resources["requests"].(map[string]interface{}); ok { + if memory, ok := requests["memory"].(string); ok { + session.Resources.Memory = memory + } + if cpu, ok := requests["cpu"].(string); ok { + session.Resources.CPU = cpu + } } - if cpu, ok := resources["cpu"].(string); ok { - session.Resources.CPU = cpu + // Fall back to flat structure for backwards compatibility + if session.Resources.Memory == "" { + if memory, ok := resources["memory"].(string); ok { + session.Resources.Memory = memory + } + } + if session.Resources.CPU == "" { + if cpu, ok := resources["cpu"].(string); ok { + session.Resources.CPU = cpu + } } } diff --git a/api/internal/models/application.go b/api/internal/models/application.go index 335aa54a..23703599 100644 --- a/api/internal/models/application.go +++ b/api/internal/models/application.go @@ -68,14 +68,16 @@ type InstalledApplication struct { // UpdatedAt is when the application was last modified. 
UpdatedAt time.Time `json:"updatedAt" db:"updated_at"` - // Template information (populated from JOIN) + // Template information (stored in installed_applications for persistence) TemplateName string `json:"templateName,omitempty"` TemplateDisplayName string `json:"templateDisplayName,omitempty"` - Description string `json:"description,omitempty"` - Category string `json:"category,omitempty"` + Description string `json:"description,omitempty" db:"description"` + Category string `json:"category,omitempty" db:"category"` AppType string `json:"appType,omitempty"` - IconURL string `json:"icon,omitempty"` - Manifest string `json:"manifest,omitempty"` + IconURL string `json:"icon,omitempty" db:"icon_url"` + IconData []byte `json:"-" db:"icon_data"` // Binary icon data (not sent in JSON) + IconMediaType string `json:"-" db:"icon_media_type"` // MIME type of icon + Manifest string `json:"manifest,omitempty" db:"manifest"` // InstallStatus tracks the installation state (pending, creating, installed, failed) InstallStatus string `json:"installStatus,omitempty" db:"install_status"` diff --git a/api/internal/websocket/handlers.go b/api/internal/websocket/handlers.go index 9ca3bb29..0917e8f9 100644 --- a/api/internal/websocket/handlers.go +++ b/api/internal/websocket/handlers.go @@ -243,12 +243,17 @@ func (m *Manager) broadcastSessionUpdates() { } sessionData := map[string]interface{}{ - "name": session.Name, - "namespace": session.Namespace, - "user": session.User, - "template": session.Template, - "state": session.State, - "status": session.Status, + "name": session.Name, + "namespace": session.Namespace, + "user": session.User, + "template": session.Template, + "state": session.State, + // Convert status to proper JSON format with lowercase keys + "status": map[string]interface{}{ + "phase": session.Status.Phase, + "podName": session.Status.PodName, + "url": session.Status.URL, + }, "createdAt": session.CreatedAt, "activeConnections": activeConns, } diff --git 
a/chart/crds/stream.space_sessions.yaml b/chart/crds/stream.space_sessions.yaml index da66c726..a8c2c91c 100644 --- a/chart/crds/stream.space_sessions.yaml +++ b/chart/crds/stream.space_sessions.yaml @@ -34,19 +34,26 @@ spec: description: Desired state of the session resources: type: object + description: CPU and memory resource requirements properties: - memory: - type: string - description: Memory limit (e.g., 2Gi, 4Gi) - pattern: '^[0-9]+(Mi|Gi|Ti)$' - minLength: 2 - maxLength: 10 - cpu: - type: string - description: CPU limit (e.g., 1000m, 2000m) - pattern: '^[0-9]+(m)?$' - minLength: 1 - maxLength: 10 + limits: + type: object + description: Maximum resources allowed + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: '^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$' + x-kubernetes-int-or-string: true + requests: + type: object + description: Minimum resources required + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: '^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$' + x-kubernetes-int-or-string: true persistentHome: type: boolean default: true diff --git a/chart/crds/stream.space_templates.yaml b/chart/crds/stream.space_templates.yaml index a4b6c49e..534e4cae 100644 --- a/chart/crds/stream.space_templates.yaml +++ b/chart/crds/stream.space_templates.yaml @@ -92,12 +92,12 @@ spec: status: type: object properties: - phase: - type: string - description: Current phase (Ready, Invalid, etc.) 
+ valid: + type: boolean + description: Whether the template has passed validation message: type: string - description: Additional status information + description: Validation result message subresources: status: {} additionalPrinterColumns: @@ -113,6 +113,9 @@ spec: - name: Memory type: string jsonPath: .spec.defaultResources.memory + - name: Valid + type: boolean + jsonPath: .status.valid - name: Age type: date jsonPath: .metadata.creationTimestamp diff --git a/k8s-controller/cmd/main.go b/k8s-controller/cmd/main.go index 87195182..47a68c7b 100644 --- a/k8s-controller/cmd/main.go +++ b/k8s-controller/cmd/main.go @@ -45,7 +45,9 @@ import ( "context" "flag" "os" + "time" + "github.com/nats-io/nats.go" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" @@ -145,14 +147,35 @@ func main() { os.Exit(1) } + // Create NATS connection for SessionReconciler to publish status events + var sessionNATSConn *nats.Conn + if natsURL != "" { + opts := []nats.Option{ + nats.Name("streamspace-session-reconciler"), + nats.ReconnectWait(2 * time.Second), + nats.MaxReconnects(10), + } + if natsUser != "" { + opts = append(opts, nats.UserInfo(natsUser, natsPassword)) + } + sessionNATSConn, err = nats.Connect(natsURL, opts...) 
+ if err != nil { + setupLog.Info("SessionReconciler NATS connection failed, status events will not be published", "error", err) + } else { + setupLog.Info("SessionReconciler connected to NATS for status publishing") + } + } + // Register SessionReconciler // Manages the lifecycle of Session resources: // - Creates Deployments, Services, and PVCs for user sessions // - Handles state transitions (running, hibernated, terminated) // - Updates status with pod information and resource usage if err = (&controllers.SessionReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + NATSConn: sessionNATSConn, + ControllerID: controllerID, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Session") os.Exit(1) @@ -185,6 +208,25 @@ func main() { os.Exit(1) } + // Create NATS connection for ApplicationInstallReconciler to publish status events + var appInstallNATSConn *nats.Conn + if natsURL != "" { + opts := []nats.Option{ + nats.Name("streamspace-app-install-reconciler"), + nats.ReconnectWait(2 * time.Second), + nats.MaxReconnects(10), + } + if natsUser != "" { + opts = append(opts, nats.UserInfo(natsUser, natsPassword)) + } + appInstallNATSConn, err = nats.Connect(natsURL, opts...) 
+ if err != nil { + setupLog.Info("ApplicationInstallReconciler NATS connection failed, status events will not be published", "error", err) + } else { + setupLog.Info("ApplicationInstallReconciler connected to NATS for status publishing") + } + } + // Register ApplicationInstallReconciler // Handles application installation from the catalog: // - Watches ApplicationInstall CRDs created by the API @@ -192,8 +234,10 @@ func main() { // - Sets owner references for cascading deletion // - Updates status with creation progress (Pending → Creating → Ready/Failed) if err = (&controllers.ApplicationInstallReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + NATSConn: appInstallNATSConn, + ControllerID: controllerID, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "ApplicationInstall") os.Exit(1) diff --git a/k8s-controller/controllers/applicationinstall_controller.go b/k8s-controller/controllers/applicationinstall_controller.go index 81823dfe..b3643a77 100644 --- a/k8s-controller/controllers/applicationinstall_controller.go +++ b/k8s-controller/controllers/applicationinstall_controller.go @@ -6,9 +6,12 @@ package controllers import ( "context" + "encoding/json" "fmt" "time" + "github.com/google/uuid" + "github.com/nats-io/nats.go" "gopkg.in/yaml.v3" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -33,7 +36,9 @@ import ( // This provides automatic retry on failure and clear status reporting. 
type ApplicationInstallReconciler struct {
 	client.Client
-	Scheme *runtime.Scheme
+	Scheme       *runtime.Scheme // Type information for objects
+	NATSConn     *nats.Conn      // NATS connection for status events; nil disables publishing
+	ControllerID string          // Sent as controller_id in every published status event
 }
 // +kubebuilder:rbac:groups=stream.space,resources=applicationinstalls,verbs=get;list;watch;create;update;patch;delete @@ -112,6 +117,8 @@ func (r *ApplicationInstallReconciler) Reconcile(ctx context.Context, req ctrl.R logger.Error(updateErr, "Failed to update status") return ctrl.Result{}, updateErr } + // Publish status event to notify API + r.publishAppStatus(appInstall.Name, "installed", appInstall.Spec.TemplateName, "Template already exists") return ctrl.Result{}, nil } @@ -119,6 +126,8 @@ func (r *ApplicationInstallReconciler) Reconcile(ctx context.Context, req ctrl.R if updateErr := r.updateStatus(ctx, &appInstall, "Failed", fmt.Sprintf("Failed to create Template: %v", err)); updateErr != nil { logger.Error(updateErr, "Failed to update status") } + // Publish failure status + r.publishAppStatus(appInstall.Name, "failed", appInstall.Spec.TemplateName, fmt.Sprintf("Failed to create Template: %v", err)) // Retry after delay return ctrl.Result{RequeueAfter: 30 * time.Second}, err } @@ -133,6 +142,9 @@ func (r *ApplicationInstallReconciler) Reconcile(ctx context.Context, req ctrl.R return ctrl.Result{}, err } + // Publish status event to notify API + r.publishAppStatus(appInstall.Name, "installed", appInstall.Spec.TemplateName, "Template created successfully") + return ctrl.Result{}, nil } @@ -322,6 +334,41 @@ func (r *ApplicationInstallReconciler) updateStatus(ctx context.Context, appInst return r.Status().Update(ctx, latest) } +// publishAppStatus publishes an app installation status event via NATS.
+// Publishing is best-effort: a nil NATSConn is a no-op and failures are only logged.
+func (r *ApplicationInstallReconciler) publishAppStatus(installID, status, templateName, message string) {
+	if r.NATSConn == nil {
+		return
+	}
+
+	event := struct {
+		EventID      string    `json:"event_id"`
+		Timestamp    time.Time `json:"timestamp"`
+		InstallID    string    `json:"install_id"`
+		Status       string    `json:"status"`
+		TemplateName string    `json:"template_name"`
+		Message      string    `json:"message"`
+		ControllerID string    `json:"controller_id"`
+	}{
+		EventID:      uuid.New().String(),
+		Timestamp:    time.Now(),
+		InstallID:    installID,
+		Status:       status,
+		TemplateName: templateName,
+		Message:      message,
+		ControllerID: r.ControllerID,
+	}
+
+	data, err := json.Marshal(event)
+	if err == nil {
+		err = r.NATSConn.Publish("streamspace.app.status", data)
+	}
+	if err != nil {
+		// Report marshal and publish failures via the controller logger, not stdout, and carry on.
+		ctrl.Log.WithName("applicationinstall").Error(err, "Failed to publish app status event", "installID", installID, "status", status)
+	}
+}
+
 // SetupWithManager sets up the controller with the Manager. func (r *ApplicationInstallReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr).
diff --git a/k8s-controller/controllers/session_controller.go b/k8s-controller/controllers/session_controller.go index eb63788e..06deb4c9 100644 --- a/k8s-controller/controllers/session_controller.go +++ b/k8s-controller/controllers/session_controller.go @@ -157,10 +157,13 @@ package controllers import ( "context" + "encoding/json" "fmt" "os" "time" + "github.com/google/uuid" + "github.com/nats-io/nats.go" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" @@ -220,8 +223,10 @@ import ( // - Kubernetes optimistic concurrency prevents conflicts // - Status updates use separate client with retry type SessionReconciler struct { - client.Client // Kubernetes API client - Scheme *runtime.Scheme // Type information for objects + client.Client // Kubernetes API client + Scheme *runtime.Scheme // Type information for objects + NATSConn *nats.Conn // NATS connection for publishing status events + ControllerID string // Unique identifier for this controller instance } // setCondition sets or updates a condition on the Session's status. @@ -264,6 +269,51 @@ func (r *SessionReconciler) setCondition(ctx context.Context, session *streamv1a } }
+// SessionStatusEvent represents a session status update published to NATS.
+// This struct matches the event type expected by the API backend.
+type SessionStatusEvent struct {
+	EventID      string    `json:"event_id"`           // random UUID generated for each published event
+	Timestamp    time.Time `json:"timestamp"`          // time the event was built (time.Now)
+	SessionID    string    `json:"session_id"`         // session identifier; the reconciler passes the Session's name
+	Status       string    `json:"status"`             // state keyword; this reconciler sends "running", "hibernated" or "terminated"
+	Phase        string    `json:"phase"`              // capitalised phase; this reconciler sends "Running", "Hibernated" or "Terminated"
+	URL          string    `json:"url,omitempty"`      // session URL; only sent for running sessions, dropped from JSON when empty
+	PodName      string    `json:"pod_name,omitempty"` // pod name; only sent for running sessions, dropped from JSON when empty
+	Message      string    `json:"message,omitempty"`  // human-readable description; dropped from JSON when empty
+	ControllerID string    `json:"controller_id"`      // ControllerID of the reconciler that published the event
+}
+
+// publishSessionStatus publishes a session status update to NATS so the API can update its database.
+// This is critical for the UI to show the correct session state and enable the Connect button.
+// Publishing is best-effort: it is skipped when NATSConn is nil, and marshal or
+// publish failures are logged without failing the reconcile.
+func (r *SessionReconciler) publishSessionStatus(sessionID, status, phase, url, podName, message string) {
+	if r.NATSConn == nil {
+		return // NATS not configured, skip publishing
+	}
+
+	event := SessionStatusEvent{
+		EventID:      uuid.New().String(),
+		Timestamp:    time.Now(),
+		SessionID:    sessionID,
+		Status:       status,
+		Phase:        phase,
+		URL:          url,
+		PodName:      podName,
+		Message:      message,
+		ControllerID: r.ControllerID,
+	}
+
+	data, err := json.Marshal(event)
+	if err == nil {
+		err = r.NATSConn.Publish("streamspace.session.status", data)
+	}
+	if err != nil {
+		// Log but don't fail - the CRD status is already updated
+		ctrl.Log.WithName("session").Error(err, "Failed to publish session status event", "session", sessionID, "status", status)
+	}
+}
+
 //+kubebuilder:rbac:groups=stream.streamspace.io,resources=sessions,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=stream.streamspace.io,resources=sessions/status,verbs=get;update;patch //+kubebuilder:rbac:groups=stream.streamspace.io,resources=sessions/finalizers,verbs=update @@ -454,6 +504,16 @@ func (r *SessionReconciler) handleRunning(ctx context.Context, session *streamv1 return ctrl.Result{RequeueAfter: 2 * time.Second}, nil } + // BUG FIX: Handle race condition where Valid field is stale but Message shows success + // This can happen when the session controller's cache hasn't been updated yet after + // the template controller set Valid=true. If Message indicates success, wait for + // the Valid field to be updated rather than treating it as an error.
+ if template.Status.Message == "Template is valid and ready to use" { + log.Info("Template validation status inconsistent (Valid=false but success message present), waiting for cache sync", "template", template.Name) + // Requeue after a short delay to allow cache to sync + return ctrl.Result{RequeueAfter: 2 * time.Second}, nil + } + // Template was validated but is invalid - this is a real error err := fmt.Errorf("template %s is not valid: %s", template.Name, template.Status.Message) log.Error(err, "Cannot create session from invalid template") @@ -596,6 +656,10 @@ func (r *SessionReconciler) handleRunning(ctx context.Context, session *streamv1 return ctrl.Result{}, err } + // Publish status to NATS so the API can update its database + // This enables the Connect button in the UI + r.publishSessionStatus(session.Name, "running", "Running", session.Status.URL, session.Status.PodName, "Session is running") + // Record session state in Prometheus for monitoring metrics.RecordSessionState("running", session.Namespace, 1) @@ -698,6 +762,9 @@ func (r *SessionReconciler) handleHibernated(ctx context.Context, session *strea return ctrl.Result{}, err } + // Publish status to NATS so the API can update its database + r.publishSessionStatus(session.Name, "hibernated", "Hibernated", "", "", "Session is hibernated") + // Record session state in Prometheus for dashboards metrics.RecordSessionState("hibernated", session.Namespace, 1) @@ -794,6 +861,9 @@ func (r *SessionReconciler) handleTerminated(ctx context.Context, session *strea return ctrl.Result{}, err } + // Publish status to NATS so the API can update its database + r.publishSessionStatus(session.Name, "terminated", "Terminated", "", "", "Session is terminated") + // Record session state in Prometheus metrics.RecordSessionState("terminated", session.Namespace, 1) diff --git a/k8s-controller/pkg/events/handlers.go b/k8s-controller/pkg/events/handlers.go index 20c7c3c9..1cc94d72 100644 --- 
a/k8s-controller/pkg/events/handlers.go +++ b/k8s-controller/pkg/events/handlers.go @@ -439,12 +439,19 @@ func (s *Subscriber) handleNodeDrain(ctx context.Context, data []byte) error {
 // publishSessionStatus publishes a session status update.
 func (s *Subscriber) publishSessionStatus(sessionID, status, phase, message string) {
+	s.publishSessionStatusWithURL(sessionID, status, phase, "", "", message) // empty url and podName: this variant reports neither
+}
+
+// publishSessionStatusWithURL publishes a session status update including URL and pod name. +func (s *Subscriber) publishSessionStatusWithURL(sessionID, status, phase, url, podName, message string) { event := SessionStatusEvent{ EventID: uuid.New().String(), Timestamp: time.Now(), SessionID: sessionID, Status: status, Phase: phase, + URL: url, + PodName: podName, Message: message, ControllerID: s.controllerID, }