Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/go/pt-k8s-debug-collector/dumper/dumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type individualFile struct {
resourceName string
containerName string
filepaths []string
dirpaths map[string][]string // map[tarFolder][]dirPaths
}

// resourceMap struct is used to dump the resources from namespace scope or cluster scope
Expand Down Expand Up @@ -504,15 +505,17 @@ func matchesCR(cr string, podLabels map[string]string) bool {

// exportPodSummaryAndFiles walks d.crTypes and, for every CR type that
// matchesCR accepts for the pod's labels, optionally collects the pod summary
// (skipped when d.skipPodSummary is set) and then collects the individual
// files registered for that CR type via getIndividualFiles.
//
// NOTE(review): this span is a rendered diff hunk, so the removed lines (which
// pass the raw cr) and the added lines (which pass normalizedCR, the result of
// resourceType(cr)) appear side by side. The normalizedCR variants look like
// the post-change code; confirm against the repository.
func (d *Dumper) exportPodSummaryAndFiles(ctx context.Context, job exportJob) {
for _, cr := range d.crTypes {
if !matchesCR(cr, job.Pod.Labels) {
normalizedCR := resourceType(cr)

if !matchesCR(normalizedCR, job.Pod.Labels) {
continue
}

if !d.skipPodSummary {
d.getSummary(ctx, job, cr, d.PodSummaryPath(job.Pod.Namespace, job.Pod.Name))
d.getSummary(ctx, job, normalizedCR, d.PodSummaryPath(job.Pod.Namespace, job.Pod.Name))
}

d.getIndividualFiles(ctx, job, cr)
d.getIndividualFiles(ctx, job, normalizedCR)
}
}

Expand Down
164 changes: 130 additions & 34 deletions src/go/pt-k8s-debug-collector/dumper/individual_files.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,69 +2,165 @@ package dumper

import (
"archive/tar"
"bytes"
"context"
"errors"
"fmt"
"io"
"path"
"strings"

log "github.com/sirupsen/logrus"

corev1 "k8s.io/api/core/v1"
)

// getContainerEnvMap builds a name -> value lookup from the Env list of the
// container called containerName in pod's spec.
//
// Only the literal Value field of each entry is copied; if a name occurs more
// than once the last occurrence wins. The returned map is non-nil (possibly
// empty) on success. An error is returned when pod.Spec.Containers has no
// container with that name.
//
// NOTE(review): entries populated through valueFrom presumably carry an empty
// Value here and would therefore expand to "" — confirm whether that matters
// for the paths being resolved.
func (d *Dumper) getContainerEnvMap(pod corev1.Pod, containerName string) (map[string]string, error) {
	containers := pod.Spec.Containers
	for i := range containers {
		if containers[i].Name != containerName {
			continue
		}

		vars := containers[i].Env
		lookup := make(map[string]string, len(vars))
		for j := range vars {
			lookup[vars[j].Name] = vars[j].Value
		}
		return lookup, nil
	}

	return nil, fmt.Errorf("container %s not found in pod %s/%s", containerName, pod.Namespace, pod.Name)
}

// replaceEnvVars expands "$NAME" references in input using envMap and returns
// the expanded string.
//
// The input is scanned once, left to right. At each '$' the longest key of
// envMap that directly follows it is replaced by its value; when no key
// matches, the '$' is copied through unchanged. This makes the result
// independent of map iteration order:
//
//   - a name that is a prefix of another ("PGDATA" vs "PGDATA_OLD") can no
//     longer clobber the longer reference, and
//   - substituted values are never rescanned, so a value that itself contains
//     "$OTHER" is inserted verbatim.
//
// As before, matching is purely textual: there is no word-boundary check and
// the "${NAME}" form is not recognised. Empty keys are ignored.
func replaceEnvVars(input string, envMap map[string]string) string {
	if len(envMap) == 0 || strings.IndexByte(input, '$') < 0 {
		return input
	}

	var out strings.Builder
	out.Grow(len(input))

	rest := input
	for {
		i := strings.IndexByte(rest, '$')
		if i < 0 {
			out.WriteString(rest)
			break
		}
		out.WriteString(rest[:i])
		tail := rest[i+1:]

		// Two distinct keys of equal length cannot both prefix tail, so the
		// longest match is unique regardless of iteration order.
		longest := ""
		for name := range envMap {
			if len(name) > len(longest) && strings.HasPrefix(tail, name) {
				longest = name
			}
		}

		if longest == "" {
			// Not a known variable: keep the '$' literally.
			out.WriteByte('$')
			rest = tail
			continue
		}

		out.WriteString(envMap[longest])
		rest = tail[len(longest):]
	}

	return out.String()
}

// getIndividualFiles collects the files and directories registered in
// d.individualFiles for the given CR type from the job's pod.
//
// For every entry whose resourceName normalizes (via resourceType) to the same
// value as crType it builds the container's env map once with
// getContainerEnvMap, expands env references in each configured path with
// replaceEnvVars, and hands single files to processSingleFile and directories
// to processDir. Failures are logged as warnings and never abort the loop.
//
// NOTE(review): this span is a rendered diff hunk. The block that calls
// getFileFromPod and d.archive.WriteVirtualFile appears to be the removed
// implementation shown next to the added one; confirm against the repository.
func (d *Dumper) getIndividualFiles(ctx context.Context, job exportJob, crType string) {
normalizedCRType := resourceType(crType)

for _, indf := range d.individualFiles {
if indf.resourceName == crType {
for _, indPath := range indf.filepaths {
file, err := d.getFileFromPod(ctx, job.Pod, indPath, indf.containerName)
if err != nil {
log.Infof("skipping file dump for %s/%s due to error: %s", job.Pod.Namespace, job.Pod.Name, err)
continue
}
if resourceType(indf.resourceName) != normalizedCRType {
continue
}

// Parse environment variables once for this container
envMap, err := d.getContainerEnvMap(job.Pod, indf.containerName)
if err != nil {
log.Warnf("Failed to get env for container %q: %v", indf.containerName, err)
continue
}

if len(file) != 0 {
log.Infof("pod: %q writing individual file with path %s to dump", job.Pod.Name, indPath)
path := d.PodIndividualFilesPath(job.Pod.Namespace, job.Pod.Name, indPath)
err = d.archive.WriteVirtualFile(path, file)
if err != nil {
log.Errorf("error while dumping individual files for %s/%s: %s", job.Pod.Namespace, job.Pod.Name, err)
}
// Process individual files
for _, indPath := range indf.filepaths {
resolvedPath := replaceEnvVars(indPath, envMap)
if err := d.processSingleFile(ctx, job, indf.containerName, "", resolvedPath); err != nil {
log.Warnf("Failed to process file %q: %v", resolvedPath, err)
}
}

// Process directories
for tarFolder, dirPaths := range indf.dirpaths {
for _, dirPath := range dirPaths {
resolvedPath := replaceEnvVars(dirPath, envMap)
if err := d.processDir(ctx, job, indf.containerName, tarFolder, resolvedPath); err != nil {
log.Warnf("Skipping directory %q: %v", resolvedPath, err)
}
}
}
}
}

// NOTE(review): this span is a rendered diff hunk. The getFileFromPod header
// and the lines that buffer into fileContentBuffer appear to be the removed
// implementation, interleaved with the added processSingleFile; confirm
// against the repository.
func (d *Dumper) getFileFromPod(ctx context.Context, pod corev1.Pod, filepath, containerName string) ([]byte, error) {
if len(filepath) == 0 || len(containerName) == 0 {
return nil, errors.New("container name or filepath is not specified")
}
// processSingleFile streams filePath out of the given container as a tar
// archive (via tarFromPod) and writes the first regular-file entry whose base
// name equals the base name of filePath into the dump archive. The destination
// is PodIndividualFilesPath(namespace, pod, tarFolder/<filePath without its
// leading slash>), and the entry is copied straight from the tar reader using
// the size from its header.
//
// It returns an error when the tar stream cannot be started, when a header
// cannot be read, when writing fails, or when the stream ends without a
// matching entry.
func (d *Dumper) processSingleFile(
ctx context.Context,
job exportJob,
container, tarFolder, filePath string,
) error {

cmd := []string{"tar", "cf", "-", filepath}
stdout, stderr, err := d.executeInPod(ctx, cmd, pod, containerName, nil)
tr, rc, err := d.tarFromPod(ctx, job.Pod, container, filePath)
if err != nil {
return nil, fmt.Errorf("failed to execute command in Pod: stderr: %s: %w", &stderr, err)
return fmt.Errorf("exec tar: %w", err)
}
defer rc.Close()

tarReader := tar.NewReader(&stdout)
var fileContentBuffer bytes.Buffer
for {
header, err := tarReader.Next()
hdr, err := tr.Next()
if err == io.EOF {
break
}
if err != nil {
return nil, fmt.Errorf("error reading tar header: %w", err)
return err
}

if header.Typeflag == tar.TypeReg && header.Name == filepath {
_, copyErr := io.Copy(&fileContentBuffer, tarReader)
if copyErr != nil {
return nil, fmt.Errorf("error copying file content: %w", copyErr)
}
if hdr.Typeflag != tar.TypeReg {
continue
}

if path.Base(hdr.Name) != path.Base(filePath) {
continue
}

dst := d.PodIndividualFilesPath(
job.Pod.Namespace,
job.Pod.Name,
path.Join(tarFolder, path.Clean(strings.TrimPrefix(filePath, "/"))),
)

return d.archive.WriteFile(dst, tr, hdr.Size)
}

return fmt.Errorf("file %q not found", filePath)
}

// processDir streams the contents of dir from the given container as a tar
// archive (tar is invoked with "-C dir ." through tarFromPod) and writes every
// regular-file entry into the dump archive under
// tarFolder/<dir without its leading slash>/<entry path>.
//
// Non-regular entries are skipped. Entry names are cleaned and stripped of
// leading "./" and "../" segments before being joined to the destination. The
// function returns nil when the tar stream ends, and the first header-read or
// write error otherwise.
//
// NOTE(review): this span is a rendered diff hunk. It contains an embedded
// review comment and a stray `return fileContentBuffer.Bytes(), nil` line that
// appears to belong to the removed getFileFromPod; confirm against the
// repository.
func (d *Dumper) processDir(
ctx context.Context,
job exportJob,
container, tarFolder, dir string,
) error {

tr, rc, err := d.tarFromPod(ctx, job.Pod, container, "-C", dir, ".")
if err != nil {
return err
Comment on lines +115 to +123
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR description mentions adding a getAllFilesFromDirectory function in dumper.go, but the implementation here introduces processDir/tarFromPod instead. Consider updating the PR description (or naming) to match the actual approach so future maintainers can find the relevant code quickly.

Copilot uses AI. Check for mistakes.
}
defer rc.Close()

baseDir := path.Clean(strings.TrimPrefix(dir, "/"))

return fileContentBuffer.Bytes(), nil
for {
hdr, err := tr.Next()
if err == io.EOF {
return nil
}
if err != nil {
return err
}

if hdr.Typeflag != tar.TypeReg {
continue
}

// Preserve the relative path from the tar header while ensuring it
// cannot escape the intended destination directory.
relPath := path.Clean(hdr.Name)
// Normalize common tar prefixes like "./"
relPath = strings.TrimPrefix(relPath, "./")
// Prevent path traversal outside tarFolder by stripping leading "../"
for strings.HasPrefix(relPath, "../") {
relPath = strings.TrimPrefix(relPath, "../")
}
// Skip entries that do not resolve to a meaningful relative path
if relPath == "" || relPath == "." {
continue
}

dst := d.PodIndividualFilesPath(
job.Pod.Namespace,
job.Pod.Name,
path.Join(tarFolder, baseDir, relPath),
)

if err := d.archive.WriteFile(dst, tr, hdr.Size); err != nil {
return err
}
}
}
111 changes: 111 additions & 0 deletions src/go/pt-k8s-debug-collector/dumper/kube_utils.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dumper

import (
"archive/tar"
"bytes"
"context"
"errors"
Expand All @@ -10,10 +11,12 @@ import (
"net/url"
"path"
"strconv"
"strings"

log "github.com/sirupsen/logrus"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/tools/remotecommand"
Expand Down Expand Up @@ -153,3 +156,111 @@ func (d *Dumper) executeInPod(ctx context.Context, command []string, pod corev1.

return outb, errb, nil
}

// tarFromPod runs `tar cf - <args...>` inside the given container through
// executeInPodStream and exposes the command's stdout as a tar stream.
//
// It returns a *tar.Reader positioned at the start of the stream together with
// the underlying io.ReadCloser; the caller must Close the latter when done.
// Both are nil when the exec stream could not be set up, in which case the
// error from executeInPodStream is returned unchanged.
func (d *Dumper) tarFromPod(
	ctx context.Context,
	pod corev1.Pod,
	container string,
	args ...string,
) (*tar.Reader, io.ReadCloser, error) {
	// Fixed tar invocation followed by the caller-supplied arguments.
	argv := make([]string, 0, 3+len(args))
	argv = append(argv, "tar", "cf", "-")
	argv = append(argv, args...)

	stream, err := d.executeInPodStream(ctx, argv, pod, container, nil)
	if err != nil {
		return nil, nil, err
	}

	reader := tar.NewReader(stream)
	return reader, stream, nil
}

// DrainCloser wraps an io.ReadCloser to ensure proper closure of pod exec streams.
// Kubernetes SPDY transport may try to write to a closed pipe if stdout is closed
// before fully read, causing "io: read/write on closed pipe" logs.
// Close() drains the remaining data to io.Discard to avoid these errors.
type DrainCloser struct{ io.ReadCloser }

// Close reads whatever is left in the wrapped stream into io.Discard and then
// closes it, returning the wrapped Close error. A DrainCloser with a nil
// ReadCloser closes as a no-op.
//
// Close has a value receiver, so it cannot record that it already ran; calling
// it more than once is only as safe as calling Read and Close again on the
// wrapped ReadCloser.
func (d DrainCloser) Close() error {
	if d.ReadCloser == nil {
		return nil
	}

	// Best-effort drain: a read error only means there is nothing more to
	// consume, and the stream is closed regardless.
	_, _ = io.Copy(io.Discard, d.ReadCloser)

	return d.ReadCloser.Close()
}

// executeInPodStream starts command in the given container of pod over a SPDY
// exec session and returns the command's stdout as a stream.
//
// stdin, when non-nil, is forwarded to the command. The returned reader is a
// DrainCloser around the read end of an in-memory pipe; the exec session runs
// in a background goroutine that writes stdout into the pipe and buffers
// stderr. An error is returned synchronously only when the SPDY executor
// cannot be created.
//
// Streaming failures surface later: they are logged from the goroutine (they
// can occur after this function has returned) and delivered to the reader via
// CloseWithError, with trimmed stderr appended when there is any. A nil result
// or context.Canceled closes the pipe normally, so the reader sees EOF.
func (d *Dumper) executeInPodStream(ctx context.Context, command []string, pod corev1.Pod, container string, stdin io.Reader) (io.ReadCloser, error) {
	execOpts := &corev1.PodExecOptions{
		Command:   command,
		Stdin:     stdin != nil,
		Stdout:    true,
		Stderr:    true,
		TTY:       false,
		Container: container,
	}

	req := d.clientSet.CoreV1().RESTClient().Post().
		Resource("pods").
		Name(pod.Name).
		Namespace(pod.Namespace).
		SubResource("exec").
		VersionedParams(execOpts, scheme.ParameterCodec)

	executor, err := remotecommand.NewSPDYExecutor(d.restConfig, "POST", req.URL())
	if err != nil {
		return nil, fmt.Errorf("error creating SPDY executor: %w", err)
	}

	reader, writer := io.Pipe()

	go func() {
		var errOut bytes.Buffer

		streamErr := executor.StreamWithContext(ctx, remotecommand.StreamOptions{
			Stdin:  stdin,
			Stdout: writer,
			Stderr: &errOut,
			Tty:    false,
		})

		switch {
		case streamErr == nil, errors.Is(streamErr, context.Canceled):
			_ = writer.Close()
		case errOut.Len() > 0:
			log.Errorf("error while streaming files from pod: %s (stderr: %s)", streamErr, errOut.String())
			_ = writer.CloseWithError(fmt.Errorf("%w: %s", streamErr, strings.TrimSpace(errOut.String())))
		default:
			log.Errorf("error while streaming files from pod: %s", streamErr)
			_ = writer.CloseWithError(streamErr)
		}
	}()

	return DrainCloser{reader}, nil
}

// ParseEnvsFromSpec expands "$NAME" references in input using the Env list of
// the named container, as declared in the spec of pod namespace/podName.
//
// Input without a '$' is returned unchanged and no API call is made.
// Otherwise the pod is fetched from the API server and each env entry of the
// matching container is applied in spec order with strings.ReplaceAll, using
// the entry's literal Value. An error is returned when the pod cannot be
// fetched or when it has no container with that name.
//
// NOTE(review): because entries are applied one after another, a name that is
// a prefix of a later name (e.g. FOO before FOO_BAR) is substituted inside the
// longer reference — confirm whether callers can hit this.
func (d *Dumper) ParseEnvsFromSpec(ctx context.Context, namespace, podName, container, input string) (string, error) {
	if strings.IndexByte(input, '$') < 0 {
		return input, nil
	}

	pod, err := d.clientSet.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
	if err != nil {
		return "", err
	}

	containers := pod.Spec.Containers
	for i := range containers {
		if containers[i].Name != container {
			continue
		}

		expanded := input
		for _, env := range containers[i].Env {
			expanded = strings.ReplaceAll(expanded, "$"+env.Name, env.Value)
		}
		return expanded, nil
	}

	return "", fmt.Errorf("container %s not found in pod %s", container, podName)
}
18 changes: 18 additions & 0 deletions src/go/pt-k8s-debug-collector/dumper/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,28 @@ import (
var resourcesRe = regexp.MustCompile(`(\w+\.(\w+).percona\.com)`)

// addPg1 appends an individualFile entry for resource "pgo" to
// d.individualFiles. The entry targets the "database" container and registers
// the directory "$PGBACKREST_DB_PATH/pg_log" under the "pg_log" key of
// dirpaths. It always returns nil.
func (d *Dumper) addPg1() error {
	d.individualFiles = append(d.individualFiles, individualFile{
		resourceName:  "pgo",
		containerName: "database",
		dirpaths: map[string][]string{
			"pg_log": {"$PGBACKREST_DB_PATH/pg_log"},
		},
	})

	return nil
}

// addPg2 appends an individualFile entry for resource "pgv2" to
// d.individualFiles. The entry targets the "database" container and registers
// the directory "$PGDATA/log" under the "pg_log" key of dirpaths. It always
// returns nil.
func (d *Dumper) addPg2() error {
	d.individualFiles = append(d.individualFiles, individualFile{
		resourceName:  "pgv2",
		containerName: "database",
		dirpaths: map[string][]string{
			"pg_log": {"$PGDATA/log"},
		},
	})

	return nil
}

Expand Down
Loading
Loading