diff --git a/deployment/templates/clusterrole.yaml b/deployment/templates/clusterrole.yaml index 54ef43db..c17f275d 100644 --- a/deployment/templates/clusterrole.yaml +++ b/deployment/templates/clusterrole.yaml @@ -9,4 +9,7 @@ rules: - apiGroups: ["", "resource.k8s.io"] resources: ["pods", "resourceslices"] verbs: ["get", "list", "watch"] +- apiGroups: [""] + resources: ["nodes/proxy"] + verbs: ["get"] {{- end }} diff --git a/internal/pkg/appconfig/types.go b/internal/pkg/appconfig/types.go index ce1e02a2..3f0a9f4d 100644 --- a/internal/pkg/appconfig/types.go +++ b/internal/pkg/appconfig/types.go @@ -48,6 +48,8 @@ type Config struct { KubernetesGPUIdType KubernetesGPUIDType KubernetesPodLabelAllowlistRegex []string // Regex patterns for filtering pod labels KubernetesPodLabelCacheSize int // Maximum number of label keys to cache (<=0 means default size) + KubernetesUseKubeletAPI bool // Use kubelet API instead of apiserver for pod metadata + KubernetesKubeletURL string // Kubelet API URL (e.g. https://127.0.0.1:10250) CollectDCP bool UseOldNamespace bool UseRemoteHE bool diff --git a/internal/pkg/os/os.go b/internal/pkg/os/os.go index 2e676fce..23263a79 100644 --- a/internal/pkg/os/os.go +++ b/internal/pkg/os/os.go @@ -28,6 +28,7 @@ type OS interface { IsNotExist(err error) bool MkdirTemp(dir, pattern string) (string, error) Open(name string) (*os.File, error) + ReadFile(name string) ([]byte, error) Remove(name string) error RemoveAll(path string) error Stat(name string) (os.FileInfo, error) @@ -58,6 +59,10 @@ func (RealOS) Open(name string) (*os.File, error) { return os.Open(name) } +func (RealOS) ReadFile(name string) ([]byte, error) { + return os.ReadFile(name) +} + func (RealOS) MkdirTemp(dir, pattern string) (string, error) { return os.MkdirTemp(dir, pattern) } diff --git a/internal/pkg/transformation/kubernetes.go b/internal/pkg/transformation/kubernetes.go index 74f92870..221ce1de 100644 --- a/internal/pkg/transformation/kubernetes.go +++ b/internal/pkg/transformation/kubernetes.go @@ -19,10 +19,14 @@ package transformation import ( "container/list" "context" + "crypto/tls" + "crypto/x509" + "encoding/json" "fmt" "log/slog" "maps" "net" + "net/http" "regexp" "slices" "strings" @@ -33,6 +37,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -54,6 +59,11 @@ var ( gkeVirtualGPUDeviceIDSeparator = "/vgpu" ) +const ( + saTokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token" + saCAPath = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" +) + // DeviceProcessingFunc is a callback function type for processing devices type DeviceProcessingFunc func(pod *podresourcesapi.PodResources, container *podresourcesapi.ContainerResources, device *podresourcesapi.ContainerDevices) @@ -100,6 +110,10 @@ func NewPodMapper(c *appconfig.Config) *PodMapper { labelFilterCache: newLabelFilterCache(c.KubernetesPodLabelAllowlistRegex, cacheSize), } + if c.KubernetesUseKubeletAPI { + slog.Info("Using kubelet API for pod metadata instead of apiserver") + } + if !c.KubernetesEnablePodLabels && !c.KubernetesEnablePodUID && !c.KubernetesEnableDRA { return podMapper } @@ -804,10 +818,21 @@ func (p *PodMapper) createPodInfo(pod *podresourcesapi.PodResources, container * // Only make API call if we need something that's not cached if needLabels || needUID { - if podMetadata, err := p.getPodMetadata(pod.GetNamespace(), pod.GetName()); err != nil { + var podMetadata *PodMetadata + var err error + + // Choose data source based on config: kubelet API or apiserver + if p.Config.KubernetesUseKubeletAPI { + podMetadata, err = p.getPodMetadataFromKubelet(pod.GetNamespace(), pod.GetName()) + } else { + podMetadata, err = p.getPodMetadata(pod.GetNamespace(), pod.GetName()) + } + + if err != nil { slog.Warn("Couldn't get pod metadata", "pod", pod.GetName(), "namespace", pod.GetNamespace(), + "source", map[bool]string{true: "kubelet", false: "apiserver"}[p.Config.KubernetesUseKubeletAPI], "error", err) // Cache empty result to avoid repeated failures, but preserve existing cache data if !hasCache { @@ -842,6 +867,150 @@ func (p *PodMapper) createPodInfo(pod *podresourcesapi.PodResources, container * } } +// getPodMetadataFromKubelet fetches metadata (labels and UID) from kubelet /pods API. +// It sanitizes label names to ensure they are valid for Prometheus metrics and applies allowlist filtering. +func (p *PodMapper) getPodMetadataFromKubelet(namespace, podName string) (*PodMetadata, error) { + ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout) + defer cancel() + + tokenBytes, err := os.ReadFile(saTokenPath) + if err != nil { + slog.Warn("Failed to read serviceaccount token for kubelet /pods", + "path", saTokenPath, + "error", err, + ) + return nil, fmt.Errorf("failed to read serviceaccount token from %s: %w", saTokenPath, err) + } + + token := strings.TrimSpace(string(tokenBytes)) + caPEM, err := os.ReadFile(saCAPath) + if err != nil { + slog.Warn("Failed to read serviceaccount CA for kubelet /pods", + "path", saCAPath, + "error", err, + ) + return nil, fmt.Errorf("failed to read serviceaccount CA from %s: %w", saCAPath, err) + } + + rootCAs := x509.NewCertPool() + if !rootCAs.AppendCertsFromPEM(caPEM) { + slog.Warn("Failed to append CA certs for kubelet /pods", + "path", saCAPath, + ) + return nil, fmt.Errorf("failed to append CA certs from %s", saCAPath) + } + + tlsCfg := &tls.Config{ + RootCAs: rootCAs, + } + + client := &http.Client{ + Timeout: connectionTimeout, + Transport: &http.Transport{TLSClientConfig: tlsCfg}, + } + + base := p.Config.KubernetesKubeletURL + if base == "" { + base = "https://127.0.0.1:10250" + } + + url := strings.TrimRight(base, "/") + "/pods" + slog.Debug("Querying kubelet /pods", + "url", url, + "namespace", namespace, + "pod", podName, + ) + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + slog.Warn("Failed to build kubelet /pods request", + "url", url, + "error", err, + ) + return nil, fmt.Errorf("failed to build kubelet /pods request: %w", err) + } + + req.Header.Set("Authorization", "Bearer "+token) + resp, err := client.Do(req) + if err != nil { + slog.Warn("Failed to query kubelet /pods", + "url", url, + "error", err, + ) + return nil, fmt.Errorf("failed to query kubelet /pods: %w", err) + } + + defer resp.Body.Close() + + slog.Debug("Received response from kubelet /pods", + "url", url, + "statusCode", resp.StatusCode, + "status", resp.Status, + ) + + if resp.StatusCode != http.StatusOK { + slog.Warn("Unexpected status from kubelet /pods", + "url", url, + "statusCode", resp.StatusCode, + "status", resp.Status, + ) + return nil, fmt.Errorf("unexpected status from kubelet /pods: %s", resp.Status) + } + + var podList corev1.PodList + if err := json.NewDecoder(resp.Body).Decode(&podList); err != nil { + slog.Warn("Failed to decode kubelet /pods response", + "url", url, + "error", err, + ) + return nil, fmt.Errorf("failed to decode kubelet /pods response: %w", err) + } + + slog.Debug("Decoded kubelet /pods response", + "totalPods", len(podList.Items), + "namespace", namespace, + "pod", podName, + ) + + for _, pod := range podList.Items { + if pod.Namespace == namespace && pod.Name == podName { + sanitizedLabels := make(map[string]string) + for k, v := range pod.Labels { + if !p.shouldIncludeLabel(k) { + slog.Debug("Filtering out pod label", + "label", k, + "pod", podName, + "namespace", namespace) + continue + } + + sanitizedKey := utils.SanitizeLabelName(k) + sanitizedLabels[sanitizedKey] = v + } + + slog.Debug("Found pod in kubelet /pods response", + "pod", podName, + "namespace", namespace, + "uid", string(pod.UID), + "labelCount", len(sanitizedLabels), + ) + + return &PodMetadata{ + UID: string(pod.UID), + Labels: sanitizedLabels, + }, nil + } + } + + slog.Warn("Pod not found in kubelet /pods response", + "pod", podName, + "namespace", namespace, + "totalPods", len(podList.Items), + ) + + return nil, fmt.Errorf("pod %s/%s not found in kubelet /pods response", namespace, podName) +} + // getPodMetadata fetches metadata (labels and UID) from a Kubernetes pod via the API server. // It sanitizes label names to ensure they are valid for Prometheus metrics and applies allowlist filtering. func (p *PodMapper) getPodMetadata(namespace, podName string) (*PodMetadata, error) { diff --git a/internal/pkg/transformation/types.go b/internal/pkg/transformation/types.go index e49b7599..54e8dbd5 100644 --- a/internal/pkg/transformation/types.go +++ b/internal/pkg/transformation/types.go @@ -80,7 +80,7 @@ type DRAResourceSliceManager struct { migDevices map[string]*DRAMigDeviceInfo // pool/device -> MIG info (for MIG devices) } -// PodMetadata holds pod metadata from API server +// PodMetadata holds pod metadata from API server or kubelet type PodMetadata struct { UID string Labels map[string]string diff --git a/pkg/cmd/app.go b/pkg/cmd/app.go index cf88a949..d9eeb654 100644 --- a/pkg/cmd/app.go +++ b/pkg/cmd/app.go @@ -73,6 +73,9 @@ const ( CLIKubernetesEnablePodUID = "kubernetes-enable-pod-uid" CLIKubernetesGPUIDType = "kubernetes-gpu-id-type" CLIKubernetesPodLabelAllowlistRegex = "kubernetes-pod-label-allowlist-regex" + CLIKubernetesPodLabelCacheSize = "kubernetes-pod-label-cache-size" + CLIKubernetesUseKubeletAPI = "kubernetes-use-kubelet-api" + CLIKubernetesKubeletURL = "kubernetes-kubelet-url" CLIUseOldNamespace = "use-old-namespace" CLIRemoteHEInfo = "remote-hostengine-info" CLIGPUDevices = "devices" @@ -200,6 +203,24 @@ func NewApp(buildVersion ...string) *cli.App { Usage: "Regex patterns for filtering pod labels to include in metrics (comma-separated). Empty means include all labels. This parameter is effective only when '--kubernetes-enable-pod-labels' is true.", EnvVars: []string{"DCGM_EXPORTER_KUBERNETES_POD_LABEL_ALLOWLIST_REGEX"}, }, + &cli.IntFlag{ + Name: CLIKubernetesPodLabelCacheSize, + Value: 150000, + Usage: "Maximum number of label keys to cache for allowlist filtering. Larger values use more memory but reduce regex evaluations.", + EnvVars: []string{"DCGM_EXPORTER_KUBERNETES_POD_LABEL_CACHE_SIZE"}, + }, + &cli.BoolFlag{ + Name: CLIKubernetesUseKubeletAPI, + Value: false, + Usage: "Use kubelet API instead of apiserver for fetching pod metadata (labels, UID). This reduces load on the apiserver in large clusters.", + EnvVars: []string{"DCGM_EXPORTER_KUBERNETES_USE_KUBELET_API"}, + }, + &cli.StringFlag{ + Name: CLIKubernetesKubeletURL, + Value: "https://127.0.0.1:10250", + Usage: "Kubelet API URL for fetching pod metadata. Only used when --kubernetes-use-kubelet-api is true.", + EnvVars: []string{"DCGM_EXPORTER_KUBERNETES_KUBELET_URL"}, + }, &cli.StringFlag{ Name: CLIGPUDevices, Aliases: []string{"d"}, @@ -1071,6 +1092,9 @@ func contextToConfig(c *cli.Context) (*appconfig.Config, error) { KubernetesEnablePodUID: c.Bool(CLIKubernetesEnablePodUID), KubernetesGPUIdType: appconfig.KubernetesGPUIDType(c.String(CLIKubernetesGPUIDType)), KubernetesPodLabelAllowlistRegex: c.StringSlice(CLIKubernetesPodLabelAllowlistRegex), + KubernetesPodLabelCacheSize: c.Int(CLIKubernetesPodLabelCacheSize), + KubernetesUseKubeletAPI: c.Bool(CLIKubernetesUseKubeletAPI), + KubernetesKubeletURL: c.String(CLIKubernetesKubeletURL), CollectDCP: true, UseOldNamespace: c.Bool(CLIUseOldNamespace), UseRemoteHE: c.IsSet(CLIRemoteHEInfo), diff --git a/tests/e2e/e2e_verify_kubelet_api_test.go b/tests/e2e/e2e_verify_kubelet_api_test.go new file mode 100644 index 00000000..6df79011 --- /dev/null +++ b/tests/e2e/e2e_verify_kubelet_api_test.go @@ -0,0 +1,121 @@ +//go:build e2e + +/* + * Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package e2e + +import ( + "bytes" + "context" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/prometheus/common/expfmt" + corev1 "k8s.io/api/core/v1" + "k8s.io/utils/ptr" +) + +var _ = Describe("dcgm-exporter-e2e-suite", func() { + Context("DCGM exporter with kubelet API", Ordered, Label("kubelet-api"), func() { + var ( + dcgmExpPod *corev1.Pod + customLabels = map[string]string{ + "example.com/accelerator": "nvidia-h20", + "example.com/appid": "test-appid", + } + metricsResponse []byte + ) + + AfterAll(func(ctx context.Context) { + if testContext.noCleanup { + return + } + cleanupTestContext(ctx, kubeClient, helmClient) + }) + + It("should install dcgm-exporter with kubelet API enabled", func(ctx context.Context) { + shouldInstallHelmChart(ctx, helmClient, []string{ + "arguments={--kubernetes-enable-pod-labels,--kubernetes-use-kubelet-api,--kubernetes-pod-label-allowlist-regex=^example\\.com/accelerator$,--kubernetes-pod-label-allowlist-regex=^example\\.com/appid$}", + "kubernetes.enablePodLabels=true", + "kubernetes.rbac.create=true", + "serviceAccount.create=true", + }) + }) + + It("should create dcgm-exporter pod", func(ctx context.Context) { + dcgmExpPod = shouldCheckIfPodCreated(ctx, kubeClient, map[string]string{ + dcgmExporterPodNameLabel: dcgmExporterPodNameLabelValue, + }) + }) + + It("should ensure dcgm-exporter pod is ready", func(ctx context.Context) { + shouldCheckIfPodIsReady(ctx, kubeClient, dcgmExpPod.Namespace, dcgmExpPod.Name) + }) + + It("should create workload pod with custom labels", func(ctx context.Context) { + shouldCreateWorkloadPod(ctx, kubeClient, customLabels) + }) + + It("should wait for metrics to be available", func(ctx context.Context) { + shouldWaitForMetrics(ctx, kubeClient, dcgmExpPod, dcgmExporterPort) + }) + + It("should read metrics", func(ctx context.Context) { + metricsResponse = shouldReadMetrics(ctx, kubeClient, dcgmExpPod, dcgmExporterPort) + Expect(metricsResponse).ShouldNot(BeEmpty()) + }) + + It("should verify pod labels from kubelet API", func(ctx context.Context) { + var parser expfmt.TextParser + metricFamilies, err := parser.TextToMetricFamilies(bytes.NewReader(metricsResponse)) + Expect(err).ShouldNot(HaveOccurred()) + Expect(metricFamilies).ShouldNot(BeEmpty()) + + expectedLabels := map[string]string{ + "example_com_accelerator": "nvidia-h20", + "example_com_appid": "test-appid", + } + + labelsFound := map[string]bool{} + + for _, metricFamily := range metricFamilies { + for _, metric := range metricFamily.GetMetric() { + for _, label := range metric.Label { + labelName := ptr.Deref(label.Name, "") + labelValue := ptr.Deref(label.Value, "") + + if expectedValue, exists := expectedLabels[labelName]; exists { + Expect(labelValue).Should(Equal(expectedValue)) + labelsFound[labelName] = true + } + } + } + } + + for expectedLabel := range expectedLabels { + Expect(labelsFound[expectedLabel]).Should(BeTrue(), + "Label %q not found in metrics", expectedLabel) + } + }) + + It("should verify logs show kubelet API usage", func(ctx context.Context) { + tailLines := int64(100) + logs, err := kubeClient.GetPodLogs(ctx, dcgmExpPod.Namespace, dcgmExpPod.Name, "", &tailLines) + Expect(err).ShouldNot(HaveOccurred()) + Expect(logs).Should(ContainSubstring("Using kubelet API")) + }) + }) +})