Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions deployment/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,7 @@ rules:
- apiGroups: ["", "resource.k8s.io"]
resources: ["pods", "resourceslices"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["nodes/proxy"]
verbs: ["get"]
{{- end }}
2 changes: 2 additions & 0 deletions internal/pkg/appconfig/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type Config struct {
KubernetesGPUIdType KubernetesGPUIDType
KubernetesPodLabelAllowlistRegex []string // Regex patterns for filtering pod labels
KubernetesPodLabelCacheSize int // Maximum number of label keys to cache (<=0 means default size)
KubernetesUseKubeletAPI bool // Use kubelet API instead of apiserver for pod metadata
KubernetesKubeletURL string // Kubelet API URL (e.g. https://127.0.0.1:10250)
CollectDCP bool
UseOldNamespace bool
UseRemoteHE bool
Expand Down
5 changes: 5 additions & 0 deletions internal/pkg/os/os.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type OS interface {
IsNotExist(err error) bool
MkdirTemp(dir, pattern string) (string, error)
Open(name string) (*os.File, error)
ReadFile(name string) ([]byte, error)
Remove(name string) error
RemoveAll(path string) error
Stat(name string) (os.FileInfo, error)
Expand Down Expand Up @@ -58,6 +59,10 @@ func (RealOS) Open(name string) (*os.File, error) {
return os.Open(name)
}

func (RealOS) ReadFile(name string) ([]byte, error) {
return os.ReadFile(name)
}

func (RealOS) MkdirTemp(dir, pattern string) (string, error) {
return os.MkdirTemp(dir, pattern)
}
Expand Down
171 changes: 170 additions & 1 deletion internal/pkg/transformation/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@ package transformation
import (
"container/list"
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"log/slog"
"maps"
"net"
"net/http"
"regexp"
"slices"
"strings"
Expand All @@ -33,6 +37,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand All @@ -54,6 +59,11 @@ var (
gkeVirtualGPUDeviceIDSeparator = "/vgpu"
)

const (
saTokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"
saCAPath = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
)

// DeviceProcessingFunc is a callback function type for processing devices
type DeviceProcessingFunc func(pod *podresourcesapi.PodResources, container *podresourcesapi.ContainerResources, device *podresourcesapi.ContainerDevices)

Expand Down Expand Up @@ -100,6 +110,10 @@ func NewPodMapper(c *appconfig.Config) *PodMapper {
labelFilterCache: newLabelFilterCache(c.KubernetesPodLabelAllowlistRegex, cacheSize),
}

if c.KubernetesUseKubeletAPI {
slog.Info("Using kubelet API for pod metadata instead of apiserver")
}

if !c.KubernetesEnablePodLabels && !c.KubernetesEnablePodUID && !c.KubernetesEnableDRA {
return podMapper
}
Expand Down Expand Up @@ -804,10 +818,21 @@ func (p *PodMapper) createPodInfo(pod *podresourcesapi.PodResources, container *

// Only make API call if we need something that's not cached
if needLabels || needUID {
if podMetadata, err := p.getPodMetadata(pod.GetNamespace(), pod.GetName()); err != nil {
var podMetadata *PodMetadata
var err error

// Choose data source based on config: kubelet API or apiserver
if p.Config.KubernetesUseKubeletAPI {
podMetadata, err = p.getPodMetadataFromKubelet(pod.GetNamespace(), pod.GetName())
} else {
podMetadata, err = p.getPodMetadata(pod.GetNamespace(), pod.GetName())
}

if err != nil {
slog.Warn("Couldn't get pod metadata",
"pod", pod.GetName(),
"namespace", pod.GetNamespace(),
"source", map[bool]string{true: "kubelet", false: "apiserver"}[p.Config.KubernetesUseKubeletAPI],
"error", err)
// Cache empty result to avoid repeated failures, but preserve existing cache data
if !hasCache {
Expand Down Expand Up @@ -842,6 +867,150 @@ func (p *PodMapper) createPodInfo(pod *podresourcesapi.PodResources, container *
}
}

// getPodMetadataFromKubelet fetches metadata (labels and UID) from kubelet /pods API.
// It sanitizes label names to ensure they are valid for Prometheus metrics and applies allowlist filtering.
func (p *PodMapper) getPodMetadataFromKubelet(namespace, podName string) (*PodMetadata, error) {
ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
defer cancel()

tokenBytes, err := os.ReadFile(saTokenPath)
if err != nil {
slog.Warn("Failed to read serviceaccount token for kubelet /pods",
"path", saTokenPath,
"error", err,
)
return nil, fmt.Errorf("failed to read serviceaccount token from %s: %w", saTokenPath, err)
}

token := strings.TrimSpace(string(tokenBytes))
caPEM, err := os.ReadFile(saCAPath)
if err != nil {
slog.Warn("Failed to read serviceaccount CA for kubelet /pods",
"path", saCAPath,
"error", err,
)
return nil, fmt.Errorf("failed to read serviceaccount CA from %s: %w", saCAPath, err)
}

rootCAs := x509.NewCertPool()
if !rootCAs.AppendCertsFromPEM(caPEM) {
slog.Warn("Failed to append CA certs for kubelet /pods",
"path", saCAPath,
)
return nil, fmt.Errorf("failed to append CA certs from %s", saCAPath)
}

tlsCfg := &tls.Config{
RootCAs: rootCAs,
}

client := &http.Client{
Timeout: connectionTimeout,
Transport: &http.Transport{TLSClientConfig: tlsCfg},
}

base := p.Config.KubernetesKubeletURL
if base == "" {
base = "https://127.0.0.1:10250"
}

url := strings.TrimRight(base, "/") + "/pods"
slog.Debug("Querying kubelet /pods",
"url", url,
"namespace", namespace,
"pod", podName,
)

req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
slog.Warn("Failed to build kubelet /pods request",
"url", url,
"error", err,
)
return nil, fmt.Errorf("failed to build kubelet /pods request: %w", err)
}

req.Header.Set("Authorization", "Bearer "+token)
resp, err := client.Do(req)
if err != nil {
slog.Warn("Failed to query kubelet /pods",
"url", url,
"error", err,
)
return nil, fmt.Errorf("failed to query kubelet /pods: %w", err)
}

defer resp.Body.Close()

slog.Debug("Received response from kubelet /pods",
"url", url,
"statusCode", resp.StatusCode,
"status", resp.Status,
)

if resp.StatusCode != http.StatusOK {
slog.Warn("Unexpected status from kubelet /pods",
"url", url,
"statusCode", resp.StatusCode,
"status", resp.Status,
)
return nil, fmt.Errorf("unexpected status from kubelet /pods: %s", resp.Status)
}

var podList corev1.PodList
if err := json.NewDecoder(resp.Body).Decode(&podList); err != nil {
slog.Warn("Failed to decode kubelet /pods response",
"url", url,
"error", err,
)
return nil, fmt.Errorf("failed to decode kubelet /pods response: %w", err)
}

slog.Debug("Decoded kubelet /pods response",
"totalPods", len(podList.Items),
"namespace", namespace,
"pod", podName,
)

for _, pod := range podList.Items {
if pod.Namespace == namespace && pod.Name == podName {
sanitizedLabels := make(map[string]string)
for k, v := range pod.Labels {
if !p.shouldIncludeLabel(k) {
slog.Debug("Filtering out pod label",
"label", k,
"pod", podName,
"namespace", namespace)
continue
}

sanitizedKey := utils.SanitizeLabelName(k)
sanitizedLabels[sanitizedKey] = v
}

slog.Debug("Found pod in kubelet /pods response",
"pod", podName,
"namespace", namespace,
"uid", string(pod.UID),
"labelCount", len(sanitizedLabels),
)

return &PodMetadata{
UID: string(pod.UID),
Labels: sanitizedLabels,
}, nil
}
}

slog.Warn("Pod not found in kubelet /pods response",
"pod", podName,
"namespace", namespace,
"totalPods", len(podList.Items),
)

return nil, fmt.Errorf("pod %s/%s not found in kubelet /pods response", namespace, podName)
}

// getPodMetadata fetches metadata (labels and UID) from a Kubernetes pod via the API server.
// It sanitizes label names to ensure they are valid for Prometheus metrics and applies allowlist filtering.
func (p *PodMapper) getPodMetadata(namespace, podName string) (*PodMetadata, error) {
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/transformation/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type DRAResourceSliceManager struct {
migDevices map[string]*DRAMigDeviceInfo // pool/device -> MIG info (for MIG devices)
}

// PodMetadata holds pod metadata from API server
// PodMetadata holds pod metadata from API server or kubelet
type PodMetadata struct {
UID string
Labels map[string]string
Expand Down
24 changes: 24 additions & 0 deletions pkg/cmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ const (
CLIKubernetesEnablePodUID = "kubernetes-enable-pod-uid"
CLIKubernetesGPUIDType = "kubernetes-gpu-id-type"
CLIKubernetesPodLabelAllowlistRegex = "kubernetes-pod-label-allowlist-regex"
CLIKubernetesPodLabelCacheSize = "kubernetes-pod-label-cache-size"
CLIKubernetesUseKubeletAPI = "kubernetes-use-kubelet-api"
CLIKubernetesKubeletURL = "kubernetes-kubelet-url"
CLIUseOldNamespace = "use-old-namespace"
CLIRemoteHEInfo = "remote-hostengine-info"
CLIGPUDevices = "devices"
Expand Down Expand Up @@ -200,6 +203,24 @@ func NewApp(buildVersion ...string) *cli.App {
Usage: "Regex patterns for filtering pod labels to include in metrics (comma-separated). Empty means include all labels. This parameter is effective only when '--kubernetes-enable-pod-labels' is true.",
EnvVars: []string{"DCGM_EXPORTER_KUBERNETES_POD_LABEL_ALLOWLIST_REGEX"},
},
&cli.IntFlag{
Name: CLIKubernetesPodLabelCacheSize,
Value: 150000,
Usage: "Maximum number of label keys to cache for allowlist filtering. Larger values use more memory but reduce regex evaluations.",
EnvVars: []string{"DCGM_EXPORTER_KUBERNETES_POD_LABEL_CACHE_SIZE"},
},
&cli.BoolFlag{
Name: CLIKubernetesUseKubeletAPI,
Value: false,
Usage: "Use kubelet API instead of apiserver for fetching pod metadata (labels, UID). This reduces load on the apiserver in large clusters.",
EnvVars: []string{"DCGM_EXPORTER_KUBERNETES_USE_KUBELET_API"},
},
&cli.StringFlag{
Name: CLIKubernetesKubeletURL,
Value: "https://127.0.0.1:10250",
Usage: "Kubelet API URL for fetching pod metadata. Only used when --kubernetes-use-kubelet-api is true.",
EnvVars: []string{"DCGM_EXPORTER_KUBERNETES_KUBELET_URL"},
},
&cli.StringFlag{
Name: CLIGPUDevices,
Aliases: []string{"d"},
Expand Down Expand Up @@ -1071,6 +1092,9 @@ func contextToConfig(c *cli.Context) (*appconfig.Config, error) {
KubernetesEnablePodUID: c.Bool(CLIKubernetesEnablePodUID),
KubernetesGPUIdType: appconfig.KubernetesGPUIDType(c.String(CLIKubernetesGPUIDType)),
KubernetesPodLabelAllowlistRegex: c.StringSlice(CLIKubernetesPodLabelAllowlistRegex),
KubernetesPodLabelCacheSize: c.Int(CLIKubernetesPodLabelCacheSize),
KubernetesUseKubeletAPI: c.Bool(CLIKubernetesUseKubeletAPI),
KubernetesKubeletURL: c.String(CLIKubernetesKubeletURL),
CollectDCP: true,
UseOldNamespace: c.Bool(CLIUseOldNamespace),
UseRemoteHE: c.IsSet(CLIRemoteHEInfo),
Expand Down
Loading