diff --git a/src/go/pt-k8s-debug-collector/dumper/dumper.go b/src/go/pt-k8s-debug-collector/dumper/dumper.go
index 3303e9fbe..00cee465e 100644
--- a/src/go/pt-k8s-debug-collector/dumper/dumper.go
+++ b/src/go/pt-k8s-debug-collector/dumper/dumper.go
@@ -70,6 +70,7 @@ type individualFile struct {
 	resourceName  string
 	containerName string
 	filepaths     []string
+	dirpaths      map[string][]string // map[tarFolder][]dirPaths
 }
 
 // resourceMap struct is used to dump the resources from namespace scope or cluster scope
@@ -504,15 +505,17 @@ func matchesCR(cr string, podLabels map[string]string) bool {
 
 func (d *Dumper) exportPodSummaryAndFiles(ctx context.Context, job exportJob) {
 	for _, cr := range d.crTypes {
-		if !matchesCR(cr, job.Pod.Labels) {
+		normalizedCR := resourceType(cr)
+
+		if !matchesCR(normalizedCR, job.Pod.Labels) {
 			continue
 		}
 
 		if !d.skipPodSummary {
-			d.getSummary(ctx, job, cr, d.PodSummaryPath(job.Pod.Namespace, job.Pod.Name))
+			d.getSummary(ctx, job, normalizedCR, d.PodSummaryPath(job.Pod.Namespace, job.Pod.Name))
 		}
 
-		d.getIndividualFiles(ctx, job, cr)
+		d.getIndividualFiles(ctx, job, normalizedCR)
 	}
 }
 
diff --git a/src/go/pt-k8s-debug-collector/dumper/individual_files.go b/src/go/pt-k8s-debug-collector/dumper/individual_files.go
index 046b8b020..e8bb9916e 100644
--- a/src/go/pt-k8s-debug-collector/dumper/individual_files.go
+++ b/src/go/pt-k8s-debug-collector/dumper/individual_files.go
@@ -2,69 +2,190 @@ package dumper
 
 import (
 	"archive/tar"
-	"bytes"
 	"context"
-	"errors"
 	"fmt"
 	"io"
+	"path"
+	"sort"
+	"strings"
 
 	log "github.com/sirupsen/logrus"
-
 	corev1 "k8s.io/api/core/v1"
 )
 
+// getContainerEnvMap parses environment variables from pod container spec once
+func (d *Dumper) getContainerEnvMap(pod corev1.Pod, containerName string) (map[string]string, error) {
+	envMap := make(map[string]string)
+	for _, c := range pod.Spec.Containers {
+		if c.Name == containerName {
+			for _, e := range c.Env {
+				envMap[e.Name] = e.Value
+			}
+			return envMap, nil
+		}
+	}
+
+	return nil, fmt.Errorf("container %s not found in pod %s/%s", containerName, pod.Namespace, pod.Name)
+}
+
+// replaceEnvVars replaces $NAME references in input using the provided env map.
+// Names are matched longest-first in a single pass, so a variable whose name is
+// a prefix of another (for example $PG and $PGDATA) cannot clobber the longer
+// reference, and the result does not depend on map iteration order.
+func replaceEnvVars(input string, envMap map[string]string) string {
+	if len(envMap) == 0 || !strings.Contains(input, "$") {
+		return input
+	}
+
+	names := make([]string, 0, len(envMap))
+	for name := range envMap {
+		names = append(names, name)
+	}
+	// strings.Replacer tries old strings in argument order: longest names first.
+	sort.Slice(names, func(i, j int) bool {
+		if len(names[i]) != len(names[j]) {
+			return len(names[i]) > len(names[j])
+		}
+		return names[i] < names[j]
+	})
+
+	pairs := make([]string, 0, 2*len(names))
+	for _, name := range names {
+		pairs = append(pairs, "$"+name, envMap[name])
+	}
+	return strings.NewReplacer(pairs...).Replace(input)
+}
+
 func (d *Dumper) getIndividualFiles(ctx context.Context, job exportJob, crType string) {
+	normalizedCRType := resourceType(crType)
+
 	for _, indf := range d.individualFiles {
-		if indf.resourceName == crType {
-			for _, indPath := range indf.filepaths {
-				file, err := d.getFileFromPod(ctx, job.Pod, indPath, indf.containerName)
-				if err != nil {
-					log.Infof("skipping file dump for %s/%s due to error: %s", job.Pod.Namespace, job.Pod.Name, err)
-					continue
-				}
+		if resourceType(indf.resourceName) != normalizedCRType {
+			continue
+		}
+
+		// Parse environment variables once for this container
+		envMap, err := d.getContainerEnvMap(job.Pod, indf.containerName)
+		if err != nil {
+			log.Warnf("Failed to get env for container %q: %v", indf.containerName, err)
+			continue
+		}
 
-				if len(file) != 0 {
-					log.Infof("pod: %q writing individual file with path %s to dump", job.Pod.Name, indPath)
-					path := d.PodIndividualFilesPath(job.Pod.Namespace, job.Pod.Name, indPath)
-					err = d.archive.WriteVirtualFile(path, file)
-					if err != nil {
-						log.Errorf("error while dumping individual files for %s/%s: %s", job.Pod.Namespace, job.Pod.Name, err)
-					}
+		// Process individual files
+		for _, indPath := range indf.filepaths {
+			resolvedPath := replaceEnvVars(indPath, envMap)
+			if err := d.processSingleFile(ctx, job, indf.containerName, "", resolvedPath); err != nil {
+				log.Warnf("Failed to process file %q: %v", resolvedPath, err)
+			}
+		}
+
+		// Process directories
+		for tarFolder, dirPaths := range indf.dirpaths {
+			for _, dirPath := range dirPaths {
+				resolvedPath := replaceEnvVars(dirPath, envMap)
+				if err := d.processDir(ctx, job, indf.containerName, tarFolder, resolvedPath); err != nil {
+					log.Warnf("Skipping directory %q: %v", resolvedPath, err)
 				}
 			}
 		}
 	}
 }
 
-func (d *Dumper) getFileFromPod(ctx context.Context, pod corev1.Pod, filepath, containerName string) ([]byte, error) {
-	if len(filepath) == 0 || len(containerName) == 0 {
-		return nil, errors.New("container name or filepath is not specified")
-	}
+// processSingleFile streams filePath from the given container as a tar archive
+// and writes the first regular entry whose base name matches into the dump
+// under tarFolder. It returns an error when no such entry is found.
+func (d *Dumper) processSingleFile(
+	ctx context.Context,
+	job exportJob,
+	container, tarFolder, filePath string,
+) error {
 
-	cmd := []string{"tar", "cf", "-", filepath}
-	stdout, stderr, err := d.executeInPod(ctx, cmd, pod, containerName, nil)
+	tr, rc, err := d.tarFromPod(ctx, job.Pod, container, filePath)
 	if err != nil {
-		return nil, fmt.Errorf("failed to execute command in Pod: stderr: %s: %w", &stderr, err)
+		return fmt.Errorf("exec tar: %w", err)
 	}
+	defer rc.Close()
 
-	tarReader := tar.NewReader(&stdout)
-	var fileContentBuffer bytes.Buffer
 	for {
-		header, err := tarReader.Next()
+		hdr, err := tr.Next()
 		if err == io.EOF {
 			break
 		}
 		if err != nil {
-			return nil, fmt.Errorf("error reading tar header: %w", err)
+			return err
 		}
 
-		if header.Typeflag == tar.TypeReg && header.Name == filepath {
-			_, copyErr := io.Copy(&fileContentBuffer, tarReader)
-			if copyErr != nil {
-				return nil, fmt.Errorf("error copying file content: %w", copyErr)
-			}
+		if hdr.Typeflag != tar.TypeReg {
+			continue
+		}
+
+		if path.Base(hdr.Name) != path.Base(filePath) {
+			continue
 		}
+
+		dst := d.PodIndividualFilesPath(
+			job.Pod.Namespace,
+			job.Pod.Name,
+			path.Join(tarFolder, path.Clean(strings.TrimPrefix(filePath, "/"))),
+		)
+
+		return d.archive.WriteFile(dst, tr, hdr.Size)
+	}
+
+	return fmt.Errorf("file %q not found", filePath)
+}
+
+// processDir streams every regular file under dir from the given container and
+// writes each one into the dump under tarFolder/<dir>/<relative path>.
+func (d *Dumper) processDir(
+	ctx context.Context,
+	job exportJob,
+	container, tarFolder, dir string,
+) error {
+
+	tr, rc, err := d.tarFromPod(ctx, job.Pod, container, "-C", dir, ".")
+	if err != nil {
+		return err
 	}
+	defer rc.Close()
+
+	baseDir := path.Clean(strings.TrimPrefix(dir, "/"))
 
-	return fileContentBuffer.Bytes(), nil
+	for {
+		hdr, err := tr.Next()
+		if err == io.EOF {
+			return nil
+		}
+		if err != nil {
+			return err
+		}
+
+		if hdr.Typeflag != tar.TypeReg {
+			continue
+		}
+
+		// Preserve the relative path from the tar header while ensuring it
+		// cannot escape the intended destination directory.
+		relPath := path.Clean(hdr.Name)
+		// Normalize common tar prefixes like "./"
+		relPath = strings.TrimPrefix(relPath, "./")
+		// Prevent path traversal outside tarFolder by stripping leading "../"
+		for strings.HasPrefix(relPath, "../") {
+			relPath = strings.TrimPrefix(relPath, "../")
+		}
+		// Skip entries that do not resolve to a meaningful relative path
+		if relPath == "" || relPath == "." || relPath == ".." {
+			continue
+		}
+
+		dst := d.PodIndividualFilesPath(
+			job.Pod.Namespace,
+			job.Pod.Name,
+			path.Join(tarFolder, baseDir, relPath),
+		)
+
+		if err := d.archive.WriteFile(dst, tr, hdr.Size); err != nil {
+			return err
+		}
+	}
 }
diff --git a/src/go/pt-k8s-debug-collector/dumper/kube_utils.go b/src/go/pt-k8s-debug-collector/dumper/kube_utils.go
index 5cead2994..195bda474 100644
--- a/src/go/pt-k8s-debug-collector/dumper/kube_utils.go
+++ b/src/go/pt-k8s-debug-collector/dumper/kube_utils.go
@@ -1,6 +1,7 @@
 package dumper
 
 import (
+	"archive/tar"
 	"bytes"
 	"context"
 	"errors"
@@ -10,10 +11,12 @@ import (
 	"net/url"
 	"path"
 	"strconv"
+	"strings"
 
 	log "github.com/sirupsen/logrus"
 
 	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/tools/portforward"
 	"k8s.io/client-go/tools/remotecommand"
@@ -153,3 +156,111 @@ func (d *Dumper) executeInPod(ctx context.Context, command []string, pod corev1.
 
 	return outb, errb, nil
 }
+
+// tarFromPod executes tar command in pod and returns tar reader and read closer.
+func (d *Dumper) tarFromPod(
+	ctx context.Context,
+	pod corev1.Pod,
+	container string,
+	args ...string,
+) (*tar.Reader, io.ReadCloser, error) {
+	cmd := append([]string{"tar", "cf", "-"}, args...)
+
+	stdout, err := d.executeInPodStream(ctx, cmd, pod, container, nil)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	return tar.NewReader(stdout), stdout, nil
+}
+
+// DrainCloser wraps an io.ReadCloser to ensure proper closure of pod exec streams.
+// Kubernetes SPDY transport may try to write to a closed pipe if stdout is closed
+// before fully read, causing "io: read/write on closed pipe" logs.
+// Close() drains the remaining data to io.Discard first and is safe to call twice.
+type DrainCloser struct{ io.ReadCloser }
+
+func (d *DrainCloser) Close() error {
+	if d.ReadCloser == nil {
+		return nil
+	}
+	_, _ = io.Copy(io.Discard, d.ReadCloser)
+	err := d.ReadCloser.Close()
+	d.ReadCloser = nil
+	return err
+}
+
+// executeInPodStream executes command in pod and streams the output.
+// Streaming errors are logged from the background goroutine because they can
+// happen after this function has already returned the stdout reader.
+func (d *Dumper) executeInPodStream(ctx context.Context, command []string, pod corev1.Pod, container string, stdin io.Reader) (io.ReadCloser, error) {
+	stdinFlag := stdin != nil
+	var stderr bytes.Buffer
+
+	req := d.clientSet.CoreV1().RESTClient().Post().
+		Resource("pods").
+		Name(pod.Name).
+		Namespace(pod.Namespace).
+		SubResource("exec").
+		VersionedParams(&corev1.PodExecOptions{
+			Command:   command,
+			Stdin:     stdinFlag,
+			Stdout:    true,
+			Stderr:    true,
+			TTY:       false,
+			Container: container,
+		}, scheme.ParameterCodec)
+
+	exec, err := remotecommand.NewSPDYExecutor(d.restConfig, "POST", req.URL())
+	if err != nil {
+		return nil, fmt.Errorf("error creating SPDY executor: %w", err)
+	}
+
+	pr, pw := io.Pipe()
+
+	go func() {
+		if err := exec.StreamWithContext(ctx, remotecommand.StreamOptions{
+			Stdin:  stdin,
+			Stdout: pw,
+			Stderr: &stderr,
+			Tty:    false,
+		}); err != nil && !errors.Is(err, context.Canceled) {
+			if stderr.Len() > 0 {
+				log.Errorf("error while streaming files from pod: %s (stderr: %s)", err, stderr.String())
+				_ = pw.CloseWithError(fmt.Errorf("%w: %s", err, strings.TrimSpace(stderr.String())))
+				return
+			}
+			log.Errorf("error while streaming files from pod: %s", err)
+			_ = pw.CloseWithError(err)
+			return
+		}
+
+		_ = pw.Close()
+	}()
+
+	return &DrainCloser{pr}, nil
+}
+
+// ParseEnvsFromSpec parses environment variables in input string
+func (d *Dumper) ParseEnvsFromSpec(ctx context.Context, namespace, podName, container, input string) (string, error) {
+	if !strings.Contains(input, "$") {
+		return input, nil
+	}
+
+	pod, err := d.clientSet.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
+	if err != nil {
+		return "", err
+	}
+
+	for _, c := range pod.Spec.Containers {
+		if c.Name == container {
+			resolved := input
+			for _, e := range c.Env {
+				resolved = strings.ReplaceAll(resolved, "$"+e.Name, e.Value)
+			}
+			return resolved, nil
+		}
+	}
+
+	return "", fmt.Errorf("container %s not found in pod %s", container, podName)
+}
diff --git a/src/go/pt-k8s-debug-collector/dumper/resources.go b/src/go/pt-k8s-debug-collector/dumper/resources.go
index 04d939d7e..95e482943 100644
--- a/src/go/pt-k8s-debug-collector/dumper/resources.go
+++ b/src/go/pt-k8s-debug-collector/dumper/resources.go
@@ -12,10 +12,28 @@ import (
 var resourcesRe = regexp.MustCompile(`(\w+\.(\w+).percona\.com)`)
 
 func (d *Dumper) addPg1() error {
+	dirpaths := map[string][]string{
+		"pg_log": {"$PGBACKREST_DB_PATH/pg_log"},
+	}
+
+	d.individualFiles = append(d.individualFiles, individualFile{
+		resourceName:  "pgo",
+		containerName: "database",
+		dirpaths:      dirpaths,
+	})
 	return nil
 }
 
 func (d *Dumper) addPg2() error {
+	dirpaths := map[string][]string{
+		"pg_log": {"$PGDATA/log"},
+	}
+
+	d.individualFiles = append(d.individualFiles, individualFile{
+		resourceName:  "pgv2",
+		containerName: "database",
+		dirpaths:      dirpaths,
+	})
 	return nil
 }
 
diff --git a/src/go/pt-k8s-debug-collector/main_test.go b/src/go/pt-k8s-debug-collector/main_test.go
index 834018645..e6cf4b934 100644
--- a/src/go/pt-k8s-debug-collector/main_test.go
+++ b/src/go/pt-k8s-debug-collector/main_test.go
@@ -328,10 +328,63 @@ func (s *CollectorSuite) TestIndividualFiles() {
 				return in[:nl]
 			},
 		},
+		{
+			namespace: "pgo",
+			// If the tool collects PostgreSQL log files
+			name: "pgo_pg_logs_exist",
+			// tar -tf cluster-dump.tar.gz --wildcards 'cluster-dump/*/pg_log/*.log'
+			cmd:  []string{"tar", "-tf", "cluster-dump.tar.gz", "--wildcards", "cluster-dump/*/pg_log/*.log"},
+			want: []string{".log"},
+			preprocessor: func(in string) string {
+				files := strings.Split(in, "\n")
+				var result []string
+				for _, f := range files {
+					if strings.Contains(f, "pg_log") && strings.HasSuffix(f, ".log") {
+						result = append(result, ".log")
+						break // Just check if at least one .log file exists
+					}
+				}
+				return strings.Join(result, "")
+			},
+		},
+		{
+			namespace: "pgv2",
+			// If the tool collects PostgreSQL log files for pgv2
+			name: "pgv2_pg_logs_exist",
+			// tar -tf cluster-dump.tar.gz --wildcards 'cluster-dump/*/pg_log/*.log'
+			cmd:  []string{"tar", "-tf", "cluster-dump.tar.gz", "--wildcards", "cluster-dump/*/pg_log/*.log"},
+			want: []string{".log"},
+			preprocessor: func(in string) string {
+				files := strings.Split(in, "\n")
+				var result []string
+				for _, f := range files {
+					if strings.Contains(f, "pg_log") && strings.HasSuffix(f, ".log") {
+						result = append(result, ".log")
+						break // Just check if at least one .log file exists
+					}
+				}
+				return strings.Join(result, "")
+			},
+		},
 	}
 
-	if s.Namespace != "pxc" {
-		s.T().Skip("This test is specifically for pxc namespace")
+	// Filter tests for current namespace
+	nsTests := []struct {
+		namespace    string
+		name         string
+		cmd          []string
+		want         []string
+		preprocessor func(string) string
+	}{}
+
+	for _, test := range tests {
+		if test.namespace == s.Namespace {
+			nsTests = append(nsTests, test)
+		}
+	}
+
+	if len(nsTests) == 0 {
+		s.T().Skip("No tests configured for namespace " + s.Namespace)
 	}
 
 	for _, resource := range s.Resources {
@@ -340,7 +393,7 @@ func (s *CollectorSuite) TestIndividualFiles() {
 		err := cmd.Run()
 		s.NoError(err)
 
-		for _, test := range tests {
+		for _, test := range nsTests {
 			out, err := exec.Command(test.cmd[0], test.cmd[1:]...).CombinedOutput()
 			if err != nil && resource == "none" {
 				continue