Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 101 additions & 5 deletions project-portal/project-portal-backend/cmd/workers/alert_worker.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,103 @@
//go:build future
// +build future

package workers

// This file won't be compiled in normal builds
// Implementation pending
import (
"context"
"errors"
"log"
"sync"
"time"
)

// AlertEvaluationWorker manages periodic evaluation of monitoring alerts.
type AlertEvaluationWorker struct {
interval time.Duration
logger *log.Logger
mu sync.RWMutex
}

// NewAlertEvaluationWorker creates a new alert evaluation worker.
func NewAlertEvaluationWorker(interval time.Duration, logger *log.Logger) *AlertEvaluationWorker {
if interval <= 0 {
interval = 1 * time.Minute
}
if logger == nil {
logger = log.Default()
}
return &AlertEvaluationWorker{
interval: interval,
logger: logger,
}
}

// Start begins the alert evaluation loop and blocks until context is cancelled.
func (w *AlertEvaluationWorker) Start(ctx context.Context) error {
if ctx == nil {
return errors.New("context cannot be nil")
}

ticker := time.NewTicker(w.interval)
defer ticker.Stop()

w.logger.Printf("alert evaluation worker started with interval: %v\n", w.interval)

for {
select {
case <-ctx.Done():
w.logger.Println("alert evaluation worker: context cancelled, initiating graceful shutdown")
return ctx.Err()
case <-ticker.C:
w.evaluateAlerts(ctx)
}
}
}

// evaluateAlerts runs the alert evaluation cycle.
func (w *AlertEvaluationWorker) evaluateAlerts(ctx context.Context) {
w.mu.RLock()
defer w.mu.RUnlock()

w.logger.Println("alert evaluation worker: triggered evaluation cycle")

activeAlerts := w.getActiveAlertsMock()
if len(activeAlerts) == 0 {
w.logger.Println("alert evaluation worker: no active alerts to evaluate")
return
}

w.logger.Printf("alert evaluation worker: evaluating %d alerts\n", len(activeAlerts))

for _, alertID := range activeAlerts {
if err := w.evaluateSingleAlert(ctx, alertID); err != nil {
w.logger.Printf("alert evaluation worker: error evaluating alert %s: %v\n", alertID, err)
} else {
w.logger.Printf("alert evaluation worker: successfully evaluated alert %s\n", alertID)
}
}

w.logger.Println("alert evaluation worker: evaluation cycle completed")
}

// evaluateSingleAlert evaluates a single alert.
func (w *AlertEvaluationWorker) evaluateSingleAlert(ctx context.Context, alertID string) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

// Mock: Retrieve alert configuration
// Mock: Fetch latest monitoring data
// Mock: Evaluate alert conditions
// Mock: Trigger notifications if threshold breached

return nil
}

// getActiveAlertsMock returns a list of mock active alert IDs.
func (w *AlertEvaluationWorker) getActiveAlertsMock() []string {
return []string{
"alert-001",
"alert-002",
"alert-003",
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,147 @@
//go:build future
// +build future
package channels

package notifications
import (
"context"
"errors"
"fmt"
"time"

// This file won't be compiled in normal builds
// Implementation pending
"carbon-scribe/project-portal/project-portal-backend/pkg/aws"
)

// WebSocketConnection mirrors the type from notifications package to avoid import cycle
type WebSocketConnection struct {
ConnectionID string `json:"connection_id" bson:"_id"`
UserID string `json:"user_id" bson:"user_id"`
ProjectIDs []string `json:"project_ids" bson:"project_ids"`
ConnectedAt time.Time `json:"connected_at" bson:"connected_at"`
LastActivity time.Time `json:"last_activity" bson:"last_activity"`
UserAgent string `json:"user_agent,omitempty" bson:"user_agent,omitempty"`
IPAddress string `json:"ip_address,omitempty" bson:"ip_address,omitempty"`
}

// WebSocketNotification mirrors the type from notifications package to avoid import cycle
type WebSocketNotification struct {
ID string `json:"id" bson:"_id"`
UserID string `json:"user_id" bson:"user_id"`
ProjectID string `json:"project_id,omitempty" bson:"project_id,omitempty"`
Category string `json:"category" bson:"category"`
Subject string `json:"subject" bson:"subject"`
Content string `json:"content" bson:"content"`
Channels []string `json:"channels" bson:"channels"`
Status string `json:"status" bson:"status"`
TemplateID string `json:"template_id,omitempty" bson:"template_id,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty" bson:"metadata,omitempty"`
CreatedAt time.Time `json:"created_at" bson:"created_at"`
UpdatedAt time.Time `json:"updated_at" bson:"updated_at"`
DeliveredAt *time.Time `json:"delivered_at,omitempty" bson:"delivered_at,omitempty"`
}

// WebSocketRepo defines the minimal interface needed from the repository to avoid import cycle
type WebSocketRepo interface {
ListConnections(ctx context.Context, projectID string, userID string) ([]WebSocketConnection, error)
}

// WebSocketSender defines the interface for sending WebSocket notifications.
type WebSocketSender interface {
Send(ctx context.Context, userID string, notification *WebSocketNotification) error
Broadcast(ctx context.Context, projectID string, notification *WebSocketNotification) (int, error)
}

// WebSocketChannel implements WebSocket notification delivery.
type WebSocketChannel struct {
repo WebSocketRepo
apiClient *aws.APIGatewayClient
retryLimit int
}

// NewWebSocketChannel creates a new WebSocket notification channel.
func NewWebSocketChannel(repo WebSocketRepo, apiClient *aws.APIGatewayClient) *WebSocketChannel {
return &WebSocketChannel{
repo: repo,
apiClient: apiClient,
retryLimit: 3,
}
}

// Send sends a notification to all active WebSocket connections for a user.
func (w *WebSocketChannel) Send(ctx context.Context, userID string, notification *WebSocketNotification) error {
if userID == "" {
return errors.New("user ID is required")
}
if notification == nil {
return errors.New("notification is required")
}

conns, err := w.repo.ListConnections(ctx, "", userID)
if err != nil {
return fmt.Errorf("failed to list connections: %w", err)
}

if len(conns) == 0 {
return fmt.Errorf("no active connections for user %s", userID)
}

var lastErr error
sentCount := 0

for _, conn := range conns {
for attempt := 0; attempt <= w.retryLimit; attempt++ {
err := w.sendToConnection(ctx, conn.ConnectionID, notification)
if err == nil {
sentCount++
break
}
lastErr = err
if attempt < w.retryLimit {
time.Sleep(time.Duration((attempt + 1) * 100) * time.Millisecond)
}
}
}

if sentCount == 0 && lastErr != nil {
return lastErr
}

return nil
}

// Broadcast sends a notification to all connections in a project.
func (w *WebSocketChannel) Broadcast(ctx context.Context, projectID string, notification *WebSocketNotification) (int, error) {
if projectID == "" {
return 0, errors.New("project ID is required")
}
if notification == nil {
return 0, errors.New("notification is required")
}

conns, err := w.repo.ListConnections(ctx, projectID, "")
if err != nil {
return 0, fmt.Errorf("failed to list connections: %w", err)
}

sentCount := 0
for _, conn := range conns {
for attempt := 0; attempt <= w.retryLimit; attempt++ {
err := w.sendToConnection(ctx, conn.ConnectionID, notification)
if err == nil {
sentCount++
break
}
if attempt < w.retryLimit {
time.Sleep(time.Duration((attempt + 1) * 100) * time.Millisecond)
}
}
}

return sentCount, nil
}

// sendToConnection sends a notification to a single WebSocket connection.
func (w *WebSocketChannel) sendToConnection(ctx context.Context, connectionID string, notification *WebSocketNotification) error {
if w.apiClient != nil {
return w.apiClient.PostToConnection(ctx, connectionID, notification)
}
// If no API client (local dev), just return success (mock delivery)
return nil
}
120 changes: 115 additions & 5 deletions project-portal/project-portal-backend/pkg/aws/apigateway.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,117 @@
//go:build future
// +build future

package aws

// This file won't be compiled in normal builds
// Implementation pending
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"

"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
)

// APIGatewayConfig holds configuration for API Gateway client.
type APIGatewayConfig struct {
Region string
AccessKeyID string
SecretAccessKey string
Endpoint string
Stage string
}

// APIGatewayClient manages interactions with AWS API Gateway's WebSocket API.
type APIGatewayClient struct {
httpClient *http.Client
endpoint string
}

// NewAPIGatewayClient creates a new API Gateway WebSocket client.
func NewAPIGatewayClient(cfg APIGatewayConfig) (*APIGatewayClient, error) {
opts := []func(*config.LoadOptions) error{
config.WithRegion(cfg.Region),
}

if cfg.AccessKeyID != "" && cfg.SecretAccessKey != "" {
opts = append(opts, config.WithCredentialsProvider(
credentials.NewStaticCredentialsProvider(cfg.AccessKeyID, cfg.SecretAccessKey, ""),
))
}

_, err := config.LoadDefaultConfig(context.Background(), opts...)
if err != nil {
return nil, fmt.Errorf("failed to load AWS config: %w", err)
}

endpoint := cfg.Endpoint
if cfg.Stage != "" && endpoint != "" {
endpoint = fmt.Sprintf("%s/%s", endpoint, cfg.Stage)
}

return &APIGatewayClient{
httpClient: &http.Client{Timeout: 10},
endpoint: endpoint,
}, nil
}

// PostToConnection sends a message to a specific WebSocket connection.
func (c *APIGatewayClient) PostToConnection(ctx context.Context, connectionID string, data interface{}) error {
if c.endpoint == "" {
return fmt.Errorf("API Gateway endpoint not configured")
}
if connectionID == "" {
return fmt.Errorf("connection ID is required")
}

payload, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("failed to marshal payload: %w", err)
}

url := fmt.Sprintf("%s/@connections/%s", c.endpoint, connectionID)
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(payload))
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}
req.Header.Set("Content-Type", "application/json")

resp, err := c.httpClient.Do(req)
if err != nil {
return fmt.Errorf("failed to send message: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
return fmt.Errorf("API Gateway returned status %d", resp.StatusCode)
}

return nil
}

// DeleteConnection closes a specific WebSocket connection.
func (c *APIGatewayClient) DeleteConnection(ctx context.Context, connectionID string) error {
if c.endpoint == "" {
return fmt.Errorf("API Gateway endpoint not configured")
}
if connectionID == "" {
return fmt.Errorf("connection ID is required")
}

url := fmt.Sprintf("%s/@connections/%s", c.endpoint, connectionID)
req, err := http.NewRequestWithContext(ctx, "DELETE", url, nil)
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}

resp, err := c.httpClient.Do(req)
if err != nil {
return fmt.Errorf("failed to delete connection: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
return fmt.Errorf("API Gateway returned status %d", resp.StatusCode)
}

return nil
}
Loading
Loading