Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/functional-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ jobs:
compose-init:
name: Test Compose Init
runs-on: ubuntu-latest
if: github.event_name == 'pull_request' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/trunk'
steps:
- name: Checkout code
uses: actions/checkout@v4
Expand All @@ -69,6 +70,7 @@ jobs:
compose-init-config-flag:
name: Test Compose Init with Config Flag
runs-on: ubuntu-latest
if: github.event_name == 'pull_request' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/trunk'
steps:
- name: Checkout code
uses: actions/checkout@v4
Expand All @@ -91,6 +93,7 @@ jobs:
compose-init-set:
name: Test Compose Init with Set
runs-on: ubuntu-latest
if: github.event_name == 'pull_request' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/trunk'
steps:
- name: Checkout code
uses: actions/checkout@v4
Expand Down
25 changes: 25 additions & 0 deletions cmd/daemon/run/health/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package health

import (
"stamus-ctl/internal/logging"
"stamus-ctl/pkg"

"github.com/gin-gonic/gin"
)

// Health godoc
// @Summary Health check endpoint
// @Description Returns health status of the daemon (liveness probe for Kubernetes)
// @Tags health
// @Accept json
// @Produce json
// @Success 200 {object} pkg.HealthResponse
// @Router /health [get]
func healthHandler(c *gin.Context) {
	logging.LoggerWithRequest(c.Request).Info("Health check")

	// Liveness is unconditional: if this handler runs, the process is alive.
	resp := pkg.HealthResponse{
		Status:  "ok",
		Message: "daemon is running",
	}
	c.JSON(200, resp)
}
139 changes: 139 additions & 0 deletions cmd/daemon/run/health/readiness.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package health

import (
"context"
"os"
"time"

"stamus-ctl/internal/app"
"stamus-ctl/internal/logging"
"stamus-ctl/pkg"

"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/gin-gonic/gin"
)

// Readiness godoc
// @Summary Readiness check endpoint
// @Description Returns readiness status of the daemon with detailed checks (readiness probe for Kubernetes)
// @Tags health
// @Accept json
// @Produce json
// @Success 200 {object} pkg.ReadinessResponse
// @Failure 503 {object} pkg.ReadinessResponse
// @Router /ready [get]
func readinessHandler(c *gin.Context) {
	logging.LoggerWithRequest(c.Request).Info("Readiness check")

	checks := map[string]string{}
	ready := true

	// Docker daemon connectivity is required for readiness.
	dockerOK := checkDockerConnectivity()
	if dockerOK {
		checks["docker_daemon"] = "connected"
	} else {
		checks["docker_daemon"] = "disconnected"
		ready = false
	}

	// A missing configuration is reported but does NOT fail readiness:
	// the daemon can still accept requests that initialize configuration.
	if checkConfigurationValidity() {
		checks["configuration"] = "valid"
	} else {
		checks["configuration"] = "not_found"
	}

	// The config folder must exist and be writable for readiness.
	if checkRequiredResources() {
		checks["resources"] = "available"
	} else {
		checks["resources"] = "unavailable"
		ready = false
	}

	response := pkg.ReadinessResponse{
		Checks:          checks,
		DockerConnected: dockerOK,
	}

	if !ready {
		response.Status = "not_ready"
		response.Message = "daemon is not ready, check individual checks for details"
		c.JSON(503, response)
		return
	}

	response.Status = "ready"
	response.Message = "daemon is ready to accept requests"
	c.JSON(200, response)
}

// checkDockerConnectivity reports whether the Docker daemon is reachable.
// A ping is attempted first; if that fails, listing a single container is
// tried as a secondary connectivity probe. The whole check is bounded by a
// 5-second timeout.
func checkDockerConnectivity() bool {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	apiClient, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		return false
	}
	defer apiClient.Close()

	if _, err := apiClient.Ping(ctx); err == nil {
		return true
	}

	// Ping failed; fall back to a one-container list before giving up.
	_, err = apiClient.ContainerList(ctx, container.ListOptions{Limit: 1})
	return err == nil
}

// checkConfigurationValidity reports whether the default configuration
// folder exists on disk. A missing configuration is not treated as fatal by
// the caller: the daemon can still accept requests that create a new
// configuration.
func checkConfigurationValidity() bool {
	configPath := app.GetConfigsFolder("config")
	_, err := os.Stat(configPath)
	return err == nil
}

// checkRequiredResources verifies that the base configs folder exists (or
// can be created) and is writable.
//
// Writability is probed by creating a uniquely-named temp file via
// os.CreateTemp rather than a fixed ".healthcheck" path: concurrent
// readiness probes would otherwise race on the same file, and CreateTemp
// also builds the path in an OS-correct way instead of string concatenation.
func checkRequiredResources() bool {
	configsFolder := app.ConfigsFolder
	if info, err := os.Stat(configsFolder); err != nil {
		// Missing folder is recoverable: try to create it.
		if os.IsNotExist(err) {
			if err := os.MkdirAll(configsFolder, 0o755); err != nil {
				return false
			}
		} else {
			return false
		}
	} else if !info.IsDir() {
		// A file squatting on the configs path makes it unusable.
		return false
	}

	// Probe write permissions with a unique temp file; best-effort cleanup.
	f, err := os.CreateTemp(configsFolder, ".healthcheck-*")
	if err != nil {
		return false
	}
	name := f.Name()
	f.Close()
	os.Remove(name)

	return true
}
13 changes: 13 additions & 0 deletions cmd/daemon/run/health/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package health

import (
"github.com/gin-gonic/gin"
)

// NewHealth registers the unauthenticated health-check endpoints.
// They are mounted at the router root (not under /api/v1) so Kubernetes
// liveness/readiness probes can reach them without a token.
func NewHealth(router *gin.Engine) {
	handlers := map[string]gin.HandlerFunc{
		"/health": healthHandler,
		"/ready":  readinessHandler,
	}
	for path, h := range handlers {
		router.GET(path, h)
	}
}
95 changes: 84 additions & 11 deletions cmd/daemon/run/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,40 @@ package run
import (
// Common
"context"
"net/http"
"os"
"os/signal"
"time"

// Custom
docs "stamus-ctl/cmd/daemon/docs"
"stamus-ctl/cmd/daemon/run/compose"
"stamus-ctl/cmd/daemon/run/config"
"stamus-ctl/cmd/daemon/run/health"
"stamus-ctl/cmd/daemon/run/troubleshoot"
"stamus-ctl/internal/auth"
"stamus-ctl/internal/logging"
"stamus-ctl/internal/middleware"

// External

"github.com/gin-gonic/gin"
"github.com/redis/go-redis/v9"
"github.com/spf13/cobra"
"github.com/spf13/viper"

ratelimit "github.com/JGLTechnologies/gin-rate-limit"
swaggerfiles "github.com/swaggo/files"
ginSwagger "github.com/swaggo/gin-swagger"
ginprometheus "github.com/zsais/go-gin-prometheus"
"go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin"
"go.opentelemetry.io/otel/trace"
)

func NewPrometheusServer(ctx context.Context) {
engineProm := gin.New()
const (
	// RateLimitPerSecond defines the number of requests allowed per second per IP
	// address; consumed by the RateLimiter middleware for both the Redis-backed
	// and the in-memory fallback stores.
	RateLimitPerSecond = 5
)

p := ginprometheus.NewPrometheus("gin")
p.Use(engineProm)

logging.LoggerWithContextToSpanContext(ctx).Info("Starting prometheus endpoint")
engineProm.Run(":9001")
}

// Ping godoc
// @Summary ping example
Expand All @@ -58,7 +60,10 @@ func RunCmd() *cobra.Command {
_, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()

logging.NewTraceProvider()
// Initialize OpenTelemetry tracing
collectorURL := getEnvFallback("OTEL_COLLECTOR_URL", "")
serviceName := getEnvFallback("OTEL_SERVICE_NAME", "stamus-ctl-daemon")
logging.InitTracer(collectorURL, serviceName)

viper.SetDefault("tokenpath", "")

Expand Down Expand Up @@ -89,7 +94,7 @@ func setupLogging() trace.Span {
c := context.Background()
ctx, span := logging.Tracer.Start(c, "main")
defer span.End()
go NewPrometheusServer(ctx)
go logging.NewPrometheusServer(ctx)
return span
}

Expand All @@ -109,16 +114,25 @@ func SetupRouter(logger func(string)) *gin.Engine {
go auth.WatchForToken(viper.GetString("tokenpath"))
}

// Health endpoints (no auth required for Kubernetes probes)
logger("Setup health endpoints")
health.NewHealth(r)

// Middleware
r.Use(gin.Recovery())
r.Use(middleware.SecurityHeadersMiddleware())
r.Use(middleware.CORSMiddleware())
if viper.GetString("tokenpath") != "" {
r.Use(otelgin.Middleware("stamusd", otelgin.WithTracerProvider(logging.TracerProvider)))
r.Use(logging.SetRequestIDInResponse())
r.Use(logging.LogRequestResponse())
}
r.Use(auth.AuthMiddleware())

// Routes
logger("Setup routes")
v1 := r.Group("/api/v1")
v1.Use(RateLimiter())
{
v1.GET("/ping", ping)
v1.POST("/upload", uploadHandler)
Expand All @@ -135,3 +149,62 @@ func SetupRouter(logger func(string)) *gin.Engine {
// r.RunUnix("./daemon.sock")
return r
}

func getEnvFallback(key, fallback string) string {
if value, ok := os.LookupEnv(key); ok {
return value
}

return fallback
}

// errorHandler is invoked by the rate-limit middleware when a client
// exceeds its quota; it returns 429 with the time remaining until reset.
func errorHandler(c *gin.Context, info ratelimit.Info) {
	wait := time.Until(info.ResetTime).String()
	c.String(http.StatusTooManyRequests, "Too many requests. Try again in "+wait)
}

// keyFunc derives the rate-limit bucket key for a request: the client IP,
// so each IP address is limited independently.
func keyFunc(c *gin.Context) string {
	ip := c.ClientIP()
	return ip
}

// RateLimiter returns a gin middleware that limits each client IP to
// RateLimitPerSecond requests per second. It attempts to use Redis for
// distributed rate limiting, but falls back to an in-memory store if Redis
// is unavailable at startup.
//
// Fix: the previous version leaked the Redis client's connection pool on
// the fallback path; the client is now closed when it will not be used.
func RateLimiter() gin.HandlerFunc {
	redisHost := getEnvFallback("REDIS_HOST", "localhost")
	redisPort := getEnvFallback("REDIS_PORT", "6379")
	redisPass := getEnvFallback("REDIS_PASSWORD", "license")

	client := redis.NewClient(&redis.Options{
		Addr:     redisHost + ":" + redisPort,
		Password: redisPass,
		DB:       0, // use default DB
	})

	// Probe the Redis connection with a short timeout so daemon startup is
	// not blocked when Redis is down.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	var store ratelimit.Store
	if err := client.Ping(ctx).Err(); err != nil {
		// Redis unavailable - fall back to in-memory store. Close the unused
		// client so its connection pool is released.
		logging.Logger.Warn("Redis unavailable, falling back to in-memory rate limiter: " + err.Error())
		client.Close()
		store = ratelimit.InMemoryStore(&ratelimit.InMemoryOptions{
			Rate:  time.Second,
			Limit: RateLimitPerSecond,
		})
	} else {
		// Redis available - use distributed rate limiting.
		logging.Logger.Info("Using Redis for distributed rate limiting at " + redisHost + ":" + redisPort)
		store = ratelimit.RedisStore(&ratelimit.RedisOptions{
			RedisClient: client,
			Rate:        time.Second,
			Limit:       RateLimitPerSecond,
		})
	}

	return ratelimit.RateLimiter(store, &ratelimit.Options{
		ErrorHandler: errorHandler,
		KeyFunc:      keyFunc,
	})
}
Loading