Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ charts/*/charts/
*.log
logs/

# Port forward artifacts
.port-forward-logs/
.port-forward-pids/

# Temporary files
tmp/
temp/
Expand Down
66 changes: 55 additions & 11 deletions api/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/streamspace/streamspace/api/internal/activity"
"github.com/streamspace/streamspace/api/internal/api"
"github.com/streamspace/streamspace/api/internal/auth"
Expand All @@ -23,7 +24,7 @@ import (
"github.com/streamspace/streamspace/api/internal/quota"
"github.com/streamspace/streamspace/api/internal/sync"
"github.com/streamspace/streamspace/api/internal/tracker"
"github.com/streamspace/streamspace/api/internal/websocket"
internalWebsocket "github.com/streamspace/streamspace/api/internal/websocket"
)

func main() {
Expand Down Expand Up @@ -118,7 +119,7 @@ func main() {

// Initialize WebSocket manager
log.Println("Initializing WebSocket manager...")
wsManager := websocket.NewManager(database, k8sClient)
wsManager := internalWebsocket.NewManager(database, k8sClient)
wsManager.Start()

// Initialize activity tracker
Expand Down Expand Up @@ -253,7 +254,7 @@ func main() {
batchHandler := handlers.NewBatchHandler(database)
monitoringHandler := handlers.NewMonitoringHandler(database)
quotasHandler := handlers.NewQuotasHandler(database)
websocketHandler := handlers.NewWebSocketHandler(database)
// NOTE: WebSocket routes now use wsManager directly (see ws.GET routes below)
consoleHandler := handlers.NewConsoleHandler(database)
collaborationHandler := handlers.NewCollaborationHandler(database)
integrationsHandler := handlers.NewIntegrationsHandler(database)
Expand All @@ -272,7 +273,7 @@ func main() {
}

// Setup routes
setupRoutes(router, apiHandler, userHandler, groupHandler, authHandler, activityHandler, catalogHandler, sharingHandler, pluginHandler, dashboardHandler, sessionActivityHandler, apiKeyHandler, teamHandler, preferencesHandler, notificationsHandler, searchHandler, sessionTemplatesHandler, batchHandler, monitoringHandler, quotasHandler, websocketHandler, consoleHandler, collaborationHandler, integrationsHandler, loadBalancingHandler, schedulingHandler, securityHandler, templateVersioningHandler, setupHandler, jwtManager, userDB, redisCache, webhookSecret)
setupRoutes(router, apiHandler, userHandler, groupHandler, authHandler, activityHandler, catalogHandler, sharingHandler, pluginHandler, dashboardHandler, sessionActivityHandler, apiKeyHandler, teamHandler, preferencesHandler, notificationsHandler, searchHandler, sessionTemplatesHandler, batchHandler, monitoringHandler, quotasHandler, wsManager, consoleHandler, collaborationHandler, integrationsHandler, loadBalancingHandler, schedulingHandler, securityHandler, templateVersioningHandler, setupHandler, jwtManager, userDB, redisCache, webhookSecret)

// Create HTTP server with security timeouts
srv := &http.Server{
Expand Down Expand Up @@ -353,7 +354,7 @@ func main() {
log.Println("Graceful shutdown completed")
}

func setupRoutes(router *gin.Engine, h *api.Handler, userHandler *handlers.UserHandler, groupHandler *handlers.GroupHandler, authHandler *auth.AuthHandler, activityHandler *handlers.ActivityHandler, catalogHandler *handlers.CatalogHandler, sharingHandler *handlers.SharingHandler, pluginHandler *handlers.PluginHandler, dashboardHandler *handlers.DashboardHandler, sessionActivityHandler *handlers.SessionActivityHandler, apiKeyHandler *handlers.APIKeyHandler, teamHandler *handlers.TeamHandler, preferencesHandler *handlers.PreferencesHandler, notificationsHandler *handlers.NotificationsHandler, searchHandler *handlers.SearchHandler, sessionTemplatesHandler *handlers.SessionTemplatesHandler, batchHandler *handlers.BatchHandler, monitoringHandler *handlers.MonitoringHandler, quotasHandler *handlers.QuotasHandler, websocketHandler *handlers.WebSocketHandler, consoleHandler *handlers.ConsoleHandler, collaborationHandler *handlers.CollaborationHandler, integrationsHandler *handlers.IntegrationsHandler, loadBalancingHandler *handlers.LoadBalancingHandler, schedulingHandler *handlers.SchedulingHandler, securityHandler *handlers.SecurityHandler, templateVersioningHandler *handlers.TemplateVersioningHandler, setupHandler *handlers.SetupHandler, jwtManager *auth.JWTManager, userDB *db.UserDB, redisCache *cache.Cache, webhookSecret string) {
func setupRoutes(router *gin.Engine, h *api.Handler, userHandler *handlers.UserHandler, groupHandler *handlers.GroupHandler, authHandler *auth.AuthHandler, activityHandler *handlers.ActivityHandler, catalogHandler *handlers.CatalogHandler, sharingHandler *handlers.SharingHandler, pluginHandler *handlers.PluginHandler, dashboardHandler *handlers.DashboardHandler, sessionActivityHandler *handlers.SessionActivityHandler, apiKeyHandler *handlers.APIKeyHandler, teamHandler *handlers.TeamHandler, preferencesHandler *handlers.PreferencesHandler, notificationsHandler *handlers.NotificationsHandler, searchHandler *handlers.SearchHandler, sessionTemplatesHandler *handlers.SessionTemplatesHandler, batchHandler *handlers.BatchHandler, monitoringHandler *handlers.MonitoringHandler, quotasHandler *handlers.QuotasHandler, wsManager *internalWebsocket.Manager, consoleHandler *handlers.ConsoleHandler, collaborationHandler *handlers.CollaborationHandler, integrationsHandler *handlers.IntegrationsHandler, loadBalancingHandler *handlers.LoadBalancingHandler, schedulingHandler *handlers.SchedulingHandler, securityHandler *handlers.SecurityHandler, templateVersioningHandler *handlers.TemplateVersioningHandler, setupHandler *handlers.SetupHandler, jwtManager *auth.JWTManager, userDB *db.UserDB, redisCache *cache.Cache, webhookSecret string) {
// SECURITY: Create authentication middleware
authMiddleware := auth.Middleware(jwtManager, userDB)
adminMiddleware := auth.RequireRole("admin")
Expand All @@ -365,6 +366,16 @@ func setupRoutes(router *gin.Engine, h *api.Handler, userHandler *handlers.UserH
webhookAuth = middleware.NewWebhookAuth(webhookSecret)
}

// WebSocket upgrader for real-time connections
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
// Allow all origins for development (should be restricted in production)
return true
},
}

// Health check (public - no auth required)
router.GET("/health", h.Health)
router.GET("/version", h.Version)
Expand Down Expand Up @@ -784,16 +795,49 @@ func setupRoutes(router *gin.Engine, h *api.Handler, userHandler *handlers.UserH
ws := router.Group("/api/v1/ws")
ws.Use(authMiddleware)
{
// NOTE: /ws/sessions is now handled by websocketHandler.RegisterRoutes() below
ws.GET("/cluster", operatorMiddleware, h.ClusterWebSocket)
// Session updates WebSocket - connects to wsManager for real-time session broadcasts
ws.GET("/sessions", func(c *gin.Context) {
// Get user ID from auth middleware
userID, exists := c.Get("userID")
if !exists {
c.JSON(http.StatusUnauthorized, gin.H{"error": "User not authenticated"})
return
}

userIDStr, ok := userID.(string)
if !ok {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Invalid user ID"})
return
}

// Upgrade HTTP connection to WebSocket
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
log.Printf("Failed to upgrade WebSocket connection: %v", err)
return
}

// Delegate to wsManager which broadcasts sessions every 3 seconds
wsManager.HandleSessionsWebSocket(conn, userIDStr, "")
})

// Metrics WebSocket - connects to wsManager for real-time metrics broadcasts
ws.GET("/cluster", operatorMiddleware, func(c *gin.Context) {
// Upgrade HTTP connection to WebSocket
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
log.Printf("Failed to upgrade WebSocket connection: %v", err)
return
}

// Delegate to wsManager which broadcasts metrics every 5 seconds
wsManager.HandleMetricsWebSocket(conn)
})

ws.GET("/logs/:namespace/:pod", operatorMiddleware, h.LogsWebSocket)
ws.GET("/enterprise", handlers.HandleEnterpriseWebSocket) // Real-time enterprise features
}

// Real-time updates via WebSocket - using dedicated handler (all authenticated users)
// Registers: /ws/sessions, /ws/notifications, /ws/metrics, /ws/alerts
websocketHandler.RegisterRoutes(router.Group("/api/v1", authMiddleware))

// Webhook endpoints (HMAC signature validation required)
webhooks := router.Group("/webhooks")
{
Expand Down
32 changes: 25 additions & 7 deletions scripts/local-deploy-kubectl.sh
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,24 @@ show_status() {
echo ""
}

# Start port forwards
start_port_forwards() {
if [ "${AUTO_PORT_FORWARD:-true}" = "true" ]; then
echo ""
log "Starting port forwards automatically..."

if [ -f "${PROJECT_ROOT}/scripts/local-port-forward.sh" ]; then
"${PROJECT_ROOT}/scripts/local-port-forward.sh"
return 0
else
log_warning "Port forward script not found, skipping"
show_access_info
fi
else
show_access_info
fi
}

# Show access instructions
show_access_info() {
echo ""
Expand All @@ -550,14 +568,13 @@ show_access_info() {
echo -e "${COLOR_BOLD}═══════════════════════════════════════════════════${COLOR_RESET}"
echo ""

log_info "Port-forward UI (in a separate terminal):"
echo " kubectl port-forward -n ${NAMESPACE} svc/streamspace-ui 3000:80"
echo " Then access: http://localhost:3000"
log_info "Start automatic port forwards:"
echo " ./scripts/local-port-forward.sh"
echo ""

log_info "Port-forward API (in a separate terminal):"
log_info "Or manually port-forward (in separate terminals):"
echo " kubectl port-forward -n ${NAMESPACE} svc/streamspace-ui 3000:80"
echo " kubectl port-forward -n ${NAMESPACE} svc/streamspace-api 8000:8000"
echo " Then access: http://localhost:8000"
echo ""

log_info "View logs:"
Expand All @@ -568,7 +585,8 @@ show_access_info() {
echo ""

log_info "When finished testing:"
echo " kubectl delete namespace ${NAMESPACE}"
echo " ./scripts/local-stop-port-forward.sh # Stop port forwards"
echo " kubectl delete namespace ${NAMESPACE} # Delete everything"
echo ""
}

Expand All @@ -594,7 +612,7 @@ main() {
deploy_ui
wait_for_pods
show_status
show_access_info
start_port_forwards

echo -e "${COLOR_BOLD}═══════════════════════════════════════════════════${COLOR_RESET}"
log_success "Deployment complete!"
Expand Down
32 changes: 25 additions & 7 deletions scripts/local-deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,24 @@ show_status() {
helm status "${RELEASE_NAME}" -n "${NAMESPACE}"
}

# Start port forwards
start_port_forwards() {
if [ "${AUTO_PORT_FORWARD:-true}" = "true" ]; then
echo ""
log "Starting port forwards automatically..."

if [ -f "${PROJECT_ROOT}/scripts/local-port-forward.sh" ]; then
"${PROJECT_ROOT}/scripts/local-port-forward.sh"
return 0
else
log_warning "Port forward script not found, skipping"
show_access_info
fi
else
show_access_info
fi
}

# Show access instructions
show_access_info() {
echo ""
Expand All @@ -277,14 +295,13 @@ show_access_info() {
echo -e "${COLOR_BOLD}═══════════════════════════════════════════════════${COLOR_RESET}"
echo ""

log_info "Port-forward UI (in a separate terminal):"
echo " kubectl port-forward -n ${NAMESPACE} svc/${RELEASE_NAME}-ui 3000:80"
echo " Then access: http://localhost:3000"
log_info "Start automatic port forwards:"
echo " ./scripts/local-port-forward.sh"
echo ""

log_info "Port-forward API (in a separate terminal):"
log_info "Or manually port-forward (in separate terminals):"
echo " kubectl port-forward -n ${NAMESPACE} svc/${RELEASE_NAME}-ui 3000:80"
echo " kubectl port-forward -n ${NAMESPACE} svc/${RELEASE_NAME}-api 8000:8000"
echo " Then access: http://localhost:8000"
echo ""

log_info "View logs:"
Expand All @@ -294,7 +311,8 @@ show_access_info() {
echo ""

log_info "When finished testing:"
echo " ./scripts/local-teardown.sh"
echo " ./scripts/local-stop-port-forward.sh # Stop port forwards"
echo " ./scripts/local-teardown.sh # Full teardown"
echo ""
}

Expand All @@ -316,7 +334,7 @@ main() {
deploy_helm
wait_for_pods
show_status
show_access_info
start_port_forwards

echo -e "${COLOR_BOLD}═══════════════════════════════════════════════════${COLOR_RESET}"
log_success "Deployment complete!"
Expand Down
Loading
Loading