diff --git a/internal/controller/v2/challenge_controller.go b/internal/controller/v2/challenge_controller.go index ad4ed46..22f0fe3 100644 --- a/internal/controller/v2/challenge_controller.go +++ b/internal/controller/v2/challenge_controller.go @@ -44,10 +44,6 @@ var log = logr.Log.WithName("ChallengeController") type ChallengeReconciler struct { client.Client Scheme *runtime.Scheme - - // KafkaClient is the Kafka producer client - // 중요한 메세지를 Kafka를 통해 보낸다. - // KafkaClient *kafka.KafkaProducer } // +kubebuilder:rbac:groups=apps.hexactf.io,resources=challenges,verbs=get;list;watch;create;update;patch;delete diff --git a/internal/controller/v2/handler.go b/internal/controller/v2/handler.go index 8d6bf10..3a61667 100644 --- a/internal/controller/v2/handler.go +++ b/internal/controller/v2/handler.go @@ -26,10 +26,6 @@ func (r *ChallengeReconciler) initializeChallenge(ctx context.Context, challenge return fmt.Errorf("failed to initialize status: %w", err) } - // if err := r.KafkaClient.SendStatusChange(challenge.Labels["apps.hexactf.io/userId"], challenge.Labels["apps.hexactf.io/challengeId"], "Pending"); err != nil { - // return fmt.Errorf("failed to send status change: %w", err) - // } - return nil } @@ -54,16 +50,6 @@ func (r *ChallengeReconciler) handlePendingState(ctx context.Context, challenge log.Error(err, "Failed to update Challenge status", "challenge", challenge.Name) } - // Convert endpoint to string - // endpoint := "" - // if challenge.Status.Endpoint != 0 { - // endpoint = fmt.Sprintf("%d", challenge.Status.Endpoint) - // } - - // Send Message - // if err := r.KafkaClient.SendStatusChangeWithEndpoint(challenge.Labels["apps.hexactf.io/userId"], challenge.Labels["apps.hexactf.io/challengeId"], "Running", endpoint); err != nil { - // log.Error(err, "Failed to send status change: %w", err) - // } } return ctrl.Result{RequeueAfter: requeueInterval}, nil @@ -117,13 +103,6 @@ func (r *ChallengeReconciler) handleDeletion(ctx context.Context, challenge *hex return ctrl.Result{RequeueAfter: time.Second * 5}, err } - // Send message to queue - // sendErr := r.KafkaClient.SendStatusChange(challenge.Labels["apps.hexactf.io/userId"], challenge.Labels["apps.hexactf.io/challengeId"], "Deleted") - // if sendErr != nil { - // log.Error(sendErr, "Failed to send status change message") - // return ctrl.Result{}, sendErr - // } - } log.Info("Successfully completed deletion process") @@ -144,15 +123,6 @@ func (r *ChallengeReconciler) handleError(ctx context.Context, req ctrl.Request, return ctrl.Result{}, err } - // crStatusMetric.WithLabelValues(challenge.Labels["apps.hexactf.io/challengeId"], challenge.Name, challenge.Labels["apps.hexactf.io/user"], challenge.Namespace).Set(3) - - // Send message to queue - // sendErr := r.KafkaClient.SendStatusChange(challenge.Labels["apps.hexactf.io/userId"], challenge.Labels["apps.hexactf.io/challengeId"], "Error") - // if sendErr != nil { - // log.Error(sendErr, "Failed to send status change message") - // return ctrl.Result{}, sendErr - // } - // 에러 발생 시 challenge 삭제 if deleteErr := r.Delete(ctx, challenge); deleteErr != nil { log.Error(deleteErr, "Failed to delete Challenge") diff --git a/internal/kafka/kafka.go b/internal/kafka/kafka.go deleted file mode 100644 index 8a93364..0000000 --- a/internal/kafka/kafka.go +++ /dev/null @@ -1,107 +0,0 @@ -package kafka - -import ( - "encoding/json" - "fmt" - "time" - - "github.com/IBM/sarama" -) - -const ( - kafkaTopic = "challenge-status" -) - -// KafkaProducer Kafka producer -type KafkaProducer struct { - producer sarama.SyncProducer -} - -// StatusMessage 는 Kafka에 보낼 메세지 -type StatusMessage struct { - UserId string `json:"userId"` - ProblemID string `json:"problemId"` - NewStatus string `json:"newStatus"` - Endpoint string `json:"endpoint"` - Timestamp time.Time `json:"timestamp"` -} - -// NewKafkaProducer Kafka producer 객체 생성 -// Producer 관련 설정 수행 -func NewKafkaProducer(brokers []string) (*KafkaProducer, error) { - config := sarama.NewConfig() - config.Producer.RequiredAcks = sarama.WaitForAll - config.Producer.Retry.Max = 5 - config.Producer.Return.Successes = true - - var producer sarama.SyncProducer - var err error - maxRetries := 10 - retryInterval := time.Second * 10 - - for i := 0; i < maxRetries; i++ { - producer, err = sarama.NewSyncProducer(brokers, config) - if err == nil { - break - } - //log.Info("Failed to connect to Kafka, retrying...", - // "attempt", i+1, - // "maxRetries", maxRetries, - // "error", err) - time.Sleep(retryInterval) - } - if err != nil { - return nil, fmt.Errorf("failed to create Kafka producer: %w", err) - } - - return &KafkaProducer{ - producer: producer, - }, nil -} - -// SendStatusChange 상태 메세지를 보낼때 사용된다. -func (k *KafkaProducer) SendStatusChange(userId, problemId, newStatus string) error { - return k.SendStatusChangeWithEndpoint(userId, problemId, newStatus, "") -} - -// SendStatusChangeWithEndpoint 상태 메세지를 보낼때 사용된다. -func (k *KafkaProducer) SendStatusChangeWithEndpoint(userId, problemId, newStatus, endpoint string) error { - if k == nil { - return fmt.Errorf("KafkaProducer instance is nil") - } - if k.producer == nil { - return fmt.Errorf("internal Kafka producer is nil") - } - msg := StatusMessage{ - UserId: userId, - ProblemID: problemId, - NewStatus: newStatus, - Endpoint: endpoint, - Timestamp: time.Now(), - } - - payload, err := json.Marshal(msg) - if err != nil { - return fmt.Errorf("failed to marshal status message: %w", err) - } - - _, _, err = k.producer.SendMessage(&sarama.ProducerMessage{ - Topic: kafkaTopic, - Value: sarama.StringEncoder(payload), - Key: sarama.StringEncoder(fmt.Sprintf("%s-%s", userId, problemId)), - }) - - if err != nil { - return fmt.Errorf("failed to send Kafka message: %w", err) - } - - return nil -} - -// Close Kafka producer 종료 -func (k *KafkaProducer) Close() error { - if k.producer != nil { - return k.producer.Close() - } - return nil -}