This repository was archived by the owner on Oct 2, 2025. It is now read-only.
Merged
48 commits
9b7a7c4
ADBDEV-7890
Aug 22, 2025
f52c130
Error handling
Aug 28, 2025
0b3fe62
Fix test
Aug 29, 2025
5ca11b7
Fix test
Aug 29, 2025
e2fab0a
Decrease indentation
Aug 29, 2025
6a71906
Merge branch 'master' into ADBDEV-7890
Aug 29, 2025
5c4db1a
Revert condition
Sep 1, 2025
90d5469
Add tests
Sep 1, 2025
87e6f24
skipoid test
Sep 2, 2025
bbae9ba
Remove parenthesis
Sep 3, 2025
663f7f7
Deduplication
Sep 3, 2025
c9acd36
Fix
Sep 3, 2025
4d66f3e
Revert "Remove parenthesis"
Sep 3, 2025
68aa2ee
Remove parenthesis
Sep 3, 2025
fe9fc5a
Assert
Sep 3, 2025
edeb4d9
fmt
Sep 3, 2025
708cf62
Fix
Sep 3, 2025
852a4a5
Fix tests
Sep 3, 2025
c95ed61
BeNil -> HaveOccurred
Sep 4, 2025
cbaba38
BeforeEach
Sep 4, 2025
4b52d50
writer
Sep 4, 2025
8caf16a
AfterEach
Sep 4, 2025
d3d034b
EOF and discardError are not fatal for seekable
Sep 5, 2025
d18a663
Rename
Sep 5, 2025
3cb04fa
Rewrite test
Sep 7, 2025
82c8ef1
Improve logging
Sep 8, 2025
3b0e82e
Improve logging
Sep 8, 2025
28807a1
Simplify
Sep 8, 2025
327db88
Add comment
Sep 8, 2025
f3b8da0
split conditions
Sep 8, 2025
3cad64a
Remove parenthesis
Sep 8, 2025
84fa2c5
Merge branch 'master' into ADBDEV-7890
Sep 8, 2025
b87a05d
Revert changes about exit from doRestoreAgentInternal
Sep 9, 2025
cb926b2
Add exit on discardError
Sep 11, 2025
12094de
Don't miss CopyN error
Sep 11, 2025
52568ff
change variable name
Sep 11, 2025
8bb569f
Fix hanging
Sep 12, 2025
aa88bf0
Fix logging
Sep 15, 2025
d6663ec
Revert changes in CopyAllData
Sep 15, 2025
c644bea
Rewrite to use Join, go fmt
Sep 15, 2025
5420290
Add RestoreReader tests
Sep 15, 2025
c7c9d66
Remove discardError
Sep 16, 2025
769bddc
Replace fmt.Errorf with errors.Wrap
Sep 16, 2025
bba8d88
Fix error message
Sep 16, 2025
3154bf6
Add test for discard error. Rename discarded to discardedCount
Sep 16, 2025
90420b9
Rename
Sep 16, 2025
4c4c517
Clear discardErr
Sep 16, 2025
3a54c48
Move global variables to struct
Sep 16, 2025
219 changes: 213 additions & 6 deletions helper/helper_test.go
@@ -3,7 +3,9 @@ package helper
import (
"bufio"
"fmt"
"io"
"os"
"strings"

"github.com/greenplum-db/gpbackup/utils"
"golang.org/x/sys/unix"
@@ -21,7 +23,9 @@ var (
)

type restoreReaderTestImpl struct {
waitCount int
waitCount int
discardedBytes int64
discardErr error
}

func (r *restoreReaderTestImpl) waitForPlugin() error {
@@ -45,7 +49,16 @@ func (r *restoreReaderTestImpl) closeFileHandle() {
}

func (r *restoreReaderTestImpl) getReaderType() ReaderType {
return "nil"
return SUBSET
}

func (r *restoreReaderTestImpl) discardData(num int64) (int64, error) {
if r.discardErr != nil {
return 0, r.discardErr
}

r.discardedBytes += num
return num, nil
}

type helperTestStep struct {
@@ -198,6 +211,47 @@ func (pt *testPluginCmd) Wait() error {
func (pt *testPluginCmd) errLog() {
}

type limitReader struct {
remainder int
err error
}

func (r *limitReader) Read(p []byte) (n int, err error) {
if r.remainder <= 0 {
return 0, r.err
}

if len(p) > r.remainder {
p = p[0:r.remainder]
}

n = len(p)
for i := 0; i < n; i++ {
p[i] = 1
}
r.remainder -= n
return
}

type limitWriter struct {
remainder int
}

func (w *limitWriter) Write(p []byte) (n int, err error) {
if w.remainder < len(p) {
n = w.remainder
} else {
n = len(p)
}

if w.remainder == 0 {
err = io.ErrShortWrite
}

w.remainder -= n
return
}

var _ = Describe("helper tests", func() {
var pluginConfig utils.PluginConfig
var isSubset bool
@@ -243,7 +297,7 @@ var _ = Describe("helper tests", func() {
pluginConfig.Options["restore_subset"] = "on"
*onErrorContinue = true
isSubset = getSubsetFlag(fileToRead, &pluginConfig)
Expect(isSubset).To(Equal(false))
Expect(isSubset).To(Equal(true))
})
It("when restore_subset is off, --on-error-continue is false, compression \"gz\" is used", func() {
pluginConfig.Options["restore_subset"] = "off"
@@ -416,6 +470,52 @@ var _ = Describe("helper tests", func() {
err := doRestoreAgentInternal(helper)
Expect(err).To(BeNil())
})
It("discard data if skip file is discovered with single datafile", func() {
*singleDataFile = true
*isResizeRestore = false
*tocFile = testTocFile

writeTestTOC(testTocFile)
defer func() {
_ = os.Remove(*tocFile)
}()

oidBatch := []oidWithBatch{
{1 /* The first oid from TOC */, 0},
}

expectedScenario := []helperTestStep{
{"mock_1_0", false, 1, true, "Can not open pipe for table 1, check_skip_file shall called, skip file exists"},
}

helper := newHelperTest(oidBatch, expectedScenario)
err := doRestoreAgentInternal(helper)
Expect(err).ToNot(HaveOccurred())
Expect(helper.restoreData.discardedBytes).To(Equal(int64(18)))
})
It("discard error data if skip file is discovered with single datafile", func() {
*singleDataFile = true
*isResizeRestore = false
*tocFile = testTocFile

writeTestTOC(testTocFile)
defer func() {
_ = os.Remove(*tocFile)
}()

oidBatch := []oidWithBatch{
{1 /* The first oid from TOC */, 0},
}

expectedScenario := []helperTestStep{
{"mock_1_0", false, 1, true, "Can not open pipe for table 1, check_skip_file shall called, skip file exists"},
}

helper := newHelperTest(oidBatch, expectedScenario)
helper.restoreData.discardErr = io.EOF
err := doRestoreAgentInternal(helper)
Expect(err).To(Equal(io.EOF))
})
It("calls Wait in waitForPlugin doRestoreAgent for single data file", func() {
*singleDataFile = true
*isResizeRestore = false
@@ -476,6 +576,10 @@ var _ = Describe("helper tests", func() {
})
})
Describe("RestoreReader tests", func() {
AfterEach(func() {
*onErrorContinue = false
writer = nil
})
It("waitForPlugin normal completion", func() {
test_cmd1 := testPluginCmd{hasProcess_: true}
test_reader := new(RestoreReader)
@@ -504,20 +608,123 @@ var _ = Describe("helper tests", func() {
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal(msg))
})
It("CopyData, readerType is SUBSET. Normal completion", func() {
writer = bufio.NewWriterSize(&limitWriter{100}, 5)

test_reader := RestoreReader{
readerType: SUBSET,
bufReader: bufio.NewReader(&limitReader{100, io.EOF}),
}

bytesRead, err := test_reader.copyData(18)
Expect(bytesRead).To(Equal(int64(18)))
Expect(err).ToNot(HaveOccurred())
})
It("CopyData, readerType is SUBSET. Error on write", func() {
*onErrorContinue = true
bufSize := 5
toRead := int64(18)
writer = bufio.NewWriterSize(&limitWriter{7}, bufSize)

test_reader := RestoreReader{
readerType: SUBSET,
bufReader: bufio.NewReader(&limitReader{100, io.EOF}),
}

bytesRead, err := test_reader.copyData(toRead)
Expect(bytesRead).To(Equal(toRead))
Expect(errors.Is(err, io.ErrShortWrite)).To(Equal(true))
str := fmt.Sprintf("copied %d bytes from %d: ", bufSize*2, toRead)
Expect(strings.HasPrefix(err.Error(), str)).To(Equal(true))

})
It("CopyData, readerType is SUBSET. EOF", func() {
*onErrorContinue = true
writer = bufio.NewWriterSize(&limitWriter{100}, 5)

test_reader := RestoreReader{
readerType: SUBSET,
bufReader: bufio.NewReader(&limitReader{25, io.EOF}),
}

bytesRead, err := test_reader.copyData(30)
Expect(bytesRead).To(Equal(int64(25)))
Expect(err).To(Equal(io.EOF))
})
It("CopyData, readerType is SUBSET. Error on write and EOF", func() {
*onErrorContinue = true
bufSize := 5
toCopy := int64(30)
rLmt := int64(25)
writer = bufio.NewWriterSize(&limitWriter{7}, bufSize)

test_reader := RestoreReader{
readerType: SUBSET,
bufReader: bufio.NewReader(&limitReader{int(rLmt), io.EOF}),
}

bytesRead, err := test_reader.copyData(toCopy)
Expect(bytesRead).To(Equal(rLmt))
Expect(errors.Is(err, io.ErrShortWrite)).To(Equal(true))
Expect(errors.Is(err, io.EOF)).To(Equal(true))
readBeforeErr := int64(bufSize * 2)
prefix := fmt.Sprintf("discard error in copyData: discarded %d bytes from %d: ", rLmt-readBeforeErr, toCopy-readBeforeErr)
Expect(strings.HasPrefix(err.Error(), prefix)).To(Equal(true))
strCopied := fmt.Sprintf("copied %d bytes from %d: ", readBeforeErr, toCopy)
Expect(strings.Contains(err.Error(), strCopied)).To(Equal(true))

bytesRead, err = test_reader.copyData(10)
Expect(bytesRead).To(Equal(int64(0)))
Expect(err.Error()).To(Equal("10 bytes to copy, but discard error has already occurred. Skipping read."))

bytesRead, err = test_reader.discardData(5)
Expect(bytesRead).To(Equal(int64(0)))
Expect(err.Error()).To(Equal("5 bytes to discard, but discard error has already occurred. Skipping read."))
})
It("CopyData, readerType is SUBSET. Error on write and on read", func() {
*onErrorContinue = true
bufSize := 5
toCopy := int64(30)
rLmt := int64(25)
writer = bufio.NewWriterSize(&limitWriter{7}, bufSize)

test_reader := RestoreReader{
readerType: SUBSET,
bufReader: bufio.NewReader(&limitReader{int(rLmt), io.ErrNoProgress}),
}

bytesRead, err := test_reader.copyData(toCopy)
Expect(bytesRead).To(Equal(rLmt))
Expect(errors.Is(err, io.ErrShortWrite)).To(Equal(true))
Expect(errors.Is(err, io.ErrNoProgress)).To(Equal(true))
readBeforeErr := int64(bufSize * 2)
prefix := fmt.Sprintf("discard error in copyData: discarded %d bytes from %d: ", rLmt-readBeforeErr, toCopy-readBeforeErr)
Expect(strings.HasPrefix(err.Error(), prefix)).To(Equal(true))
strCopied := fmt.Sprintf("copied %d bytes from %d: ", readBeforeErr, toCopy)
Expect(strings.Contains(err.Error(), strCopied)).To(Equal(true))

bytesRead, err = test_reader.copyData(10)
Expect(bytesRead).To(Equal(int64(0)))
Expect(err.Error()).To(Equal("10 bytes to copy, but discard error has already occurred. Skipping read."))

bytesRead, err = test_reader.discardData(5)
Expect(bytesRead).To(Equal(int64(0)))
Expect(err.Error()).To(Equal("5 bytes to discard, but discard error has already occurred. Skipping read."))
})
})
})

func writeTestTOC(tocFile string) {
// Write test TOC. We are not going to read data using it, so dataLength is a random number
dataLength := 100
customTOC := fmt.Sprintf(`dataentries:
1:
1:
startbyte: 0
endbyte: 18
2:
2:
startbyte: 18
endbyte: %[1]d
3:
3:
startbyte: %[1]d
endbyte: %d
`, dataLength+18, dataLength+18+18)
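Note on the test doubles added in this file: limitReader serves a fixed number of bytes and then returns a configurable error, while limitWriter accepts a fixed number of bytes before short-writing. The sketch below (hypothetical names, not the repository's types) illustrates why the copyData tests expect two full buffer flushes, i.e. 10 bytes with a 5-byte bufio.Writer, before io.ErrShortWrite surfaces. Exact counts assume the standard bufio buffering behavior.

package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
)

// cappedWriter accepts at most `remaining` bytes in total, then short-writes,
// mirroring the role of limitWriter in the tests above.
type cappedWriter struct {
	remaining int
}

func (w *cappedWriter) Write(p []byte) (int, error) {
	if w.remaining <= 0 {
		return 0, io.ErrShortWrite
	}
	if len(p) > w.remaining {
		n := w.remaining
		w.remaining = 0
		return n, nil // short write; bufio.Writer converts this to io.ErrShortWrite
	}
	w.remaining -= len(p)
	return len(p), nil
}

func main() {
	src := bytes.NewReader(bytes.Repeat([]byte{1}, 100))
	// 5-byte buffer in front of a writer that accepts only 7 bytes in total.
	dst := bufio.NewWriterSize(&cappedWriter{remaining: 7}, 5)

	// bufio.Writer implements io.ReaderFrom, so CopyN fills the buffer in
	// 5-byte chunks; the second flush exceeds the 7-byte cap and fails.
	n, err := io.CopyN(dst, src, 18)
	fmt.Println(n, err) // expected: 10 short write
}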
50 changes: 48 additions & 2 deletions helper/restore_helper.go
@@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"compress/gzip"
errorsStd "errors"
"fmt"
"io"
"io/ioutil"
@@ -54,6 +55,7 @@ type IRestoreReader interface {
copyAllData() (int64, error)
closeFileHandle()
getReaderType() ReaderType
discardData(num int64) (int64, error)
}

type RestoreReader struct {
@@ -62,6 +64,7 @@ type RestoreReader struct {
seekReader io.ReadSeeker
pluginCmd IPluginCmd
readerType ReaderType
discardErr bool
}

// Wait for plugin process that should be already finished. This should be
@@ -102,14 +105,53 @@ func (r *RestoreReader) positionReader(pos uint64, oid int) error {
return nil
}

func (r *RestoreReader) discardData(num int64) (int64, error) {
if r.readerType != SUBSET {
panic("discardData should be called for readerType == SUBSET only")
}

if r.discardErr {
err := fmt.Errorf("%d bytes to discard, but discard error has already occurred. Skipping read.", num)
logVerbose(err.Error())
return 0, err
}

n, err := io.CopyN(io.Discard, r.bufReader, num)
if err == nil {
logVerbose(fmt.Sprintf("discarded %d bytes", n))
} else {
r.discardErr = true
err = errors.Wrapf(err, "discarded %d bytes from %d", n, num)
logError(err.Error())
}
return n, err
}

func (r *RestoreReader) copyData(num int64) (int64, error) {
var bytesRead int64
var err error
switch r.readerType {
case SEEKABLE:
bytesRead, err = io.CopyN(writer, r.seekReader, num)
case NONSEEKABLE, SUBSET:
case NONSEEKABLE:
bytesRead, err = io.CopyN(writer, r.bufReader, num)
case SUBSET:
if r.discardErr {
err := fmt.Errorf("%d bytes to copy, but discard error has already occurred. Skipping read.", num)
logVerbose(err.Error())
return 0, err
}

bytesRead, err = io.CopyN(writer, r.bufReader, num)
if err != nil && err != io.EOF && *onErrorContinue {
err = errors.Wrapf(err, "copied %d bytes from %d", bytesRead, num)
bytesDiscard, errDiscard := r.discardData(num - bytesRead)
bytesRead += bytesDiscard
if errDiscard != nil {
err = errorsStd.Join(errDiscard, err)
err = errors.Wrap(err, "discard error in copyData")
}
}
}
return bytesRead, err
}
@@ -351,6 +393,10 @@ func doRestoreAgentInternal(restoreHelper IRestoreHelper) error {
logWarn(fmt.Sprintf("Oid %d, Batch %d: Skip file discovered, skipping this relation.", tableOid, batchNum))
err = nil
skipOid = tableOid
if *singleDataFile && readers[contentToRestore] != nil && readers[contentToRestore].getReaderType() == SUBSET {
bytesToDiscard := int64(end[contentToRestore] - start[contentToRestore])
_, err = readers[contentToRestore].discardData(bytesToDiscard)
}
/* Close up to *copyQueue files with this tableOid */
for idx := 0; idx < *copyQueue; idx++ {
batchToDelete := batchNum + idx
@@ -613,7 +659,7 @@ func getSubsetFlag(fileToRead string, pluginConfig *utils.PluginConfig) bool {
return false
}
// Helper's option does not allow to use subset
if !*isFiltered || *onErrorContinue {
if !*isFiltered {
return false
}
// Restore subset and compression does not allow together
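Note on the error composition in copyData: the read-side (discard) error is combined with the write-side error via errorsStd.Join (standard-library errors.Join, Go 1.20+) and then wrapped with errors.Wrap from github.com/pkg/errors, so callers can still match both causes with errors.Is, exactly as the new tests assert. A minimal standard-library sketch of the same idea (fmt.Errorf with %w standing in for errors.Wrap, byte counts made up for illustration):

package main

import (
	"errors"
	"fmt"
	"io"
)

func main() {
	// Write side failed with a short write; the subsequent discard then
	// failed with EOF on the read side.
	copyErr := fmt.Errorf("copied 10 bytes from 30: %w", io.ErrShortWrite)
	discardErr := fmt.Errorf("discarded 5 bytes from 20: %w", io.EOF)

	// Join keeps both causes; wrapping the joined error adds context
	// without hiding either of them from errors.Is.
	err := fmt.Errorf("discard error in copyData: %w", errors.Join(discardErr, copyErr))

	fmt.Println(errors.Is(err, io.EOF))           // true
	fmt.Println(errors.Is(err, io.ErrShortWrite)) // true
}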