diff --git a/helper/helper_test.go b/helper/helper_test.go index f9e0d3b83..fa10d7686 100644 --- a/helper/helper_test.go +++ b/helper/helper_test.go @@ -3,7 +3,9 @@ package helper import ( "bufio" "fmt" + "io" "os" + "strings" "github.com/greenplum-db/gpbackup/utils" "golang.org/x/sys/unix" @@ -21,7 +23,9 @@ var ( ) type restoreReaderTestImpl struct { - waitCount int + waitCount int + discardedBytes int64 + discardErr error } func (r *restoreReaderTestImpl) waitForPlugin() error { @@ -45,7 +49,16 @@ func (r *restoreReaderTestImpl) closeFileHandle() { } func (r *restoreReaderTestImpl) getReaderType() ReaderType { - return "nil" + return SUBSET +} + +func (r *restoreReaderTestImpl) discardData(num int64) (int64, error) { + if r.discardErr != nil { + return 0, r.discardErr + } + + r.discardedBytes += num + return num, nil } type helperTestStep struct { @@ -198,6 +211,47 @@ func (pt *testPluginCmd) Wait() error { func (pt *testPluginCmd) errLog() { } +type limitReader struct { + remainder int + err error +} + +func (r *limitReader) Read(p []byte) (n int, err error) { + if r.remainder <= 0 { + return 0, r.err + } + + if len(p) > r.remainder { + p = p[0:r.remainder] + } + + n = len(p) + for i := 0; i < n; i++ { + p[i] = 1 + } + r.remainder -= n + return +} + +type limitWriter struct { + remainder int +} + +func (w *limitWriter) Write(p []byte) (n int, err error) { + if w.remainder < len(p) { + n = w.remainder + } else { + n = len(p) + } + + if w.remainder == 0 { + err = io.ErrShortWrite + } + + w.remainder -= n + return +} + var _ = Describe("helper tests", func() { var pluginConfig utils.PluginConfig var isSubset bool @@ -243,7 +297,7 @@ var _ = Describe("helper tests", func() { pluginConfig.Options["restore_subset"] = "on" *onErrorContinue = true isSubset = getSubsetFlag(fileToRead, &pluginConfig) - Expect(isSubset).To(Equal(false)) + Expect(isSubset).To(Equal(true)) }) It("when restore_subset is off, --on-error-continue is false, compression \"gz\" is used", func() { pluginConfig.Options["restore_subset"] = "off" @@ -416,6 +470,52 @@ var _ = Describe("helper tests", func() { err := doRestoreAgentInternal(helper) Expect(err).To(BeNil()) }) + It("discard data if skip file is discovered with single datafile", func() { + *singleDataFile = true + *isResizeRestore = false + *tocFile = testTocFile + + writeTestTOC(testTocFile) + defer func() { + _ = os.Remove(*tocFile) + }() + + oidBatch := []oidWithBatch{ + {1 /* The first oid from TOC */, 0}, + } + + expectedScenario := []helperTestStep{ + {"mock_1_0", false, 1, true, "Can not open pipe for table 1, check_skip_file shall called, skip file exists"}, + } + + helper := newHelperTest(oidBatch, expectedScenario) + err := doRestoreAgentInternal(helper) + Expect(err).ToNot(HaveOccurred()) + Expect(helper.restoreData.discardedBytes).To(Equal(int64(18))) + }) + It("discard error data if skip file is discovered with single datafile", func() { + *singleDataFile = true + *isResizeRestore = false + *tocFile = testTocFile + + writeTestTOC(testTocFile) + defer func() { + _ = os.Remove(*tocFile) + }() + + oidBatch := []oidWithBatch{ + {1 /* The first oid from TOC */, 0}, + } + + expectedScenario := []helperTestStep{ + {"mock_1_0", false, 1, true, "Can not open pipe for table 1, check_skip_file shall called, skip file exists"}, + } + + helper := newHelperTest(oidBatch, expectedScenario) + helper.restoreData.discardErr = io.EOF + err := doRestoreAgentInternal(helper) + Expect(err).To(Equal(io.EOF)) + }) It("calls Wait in waitForPlugin doRestoreAgent for single data file", func() { *singleDataFile = true *isResizeRestore = false @@ -476,6 +576,10 @@ var _ = Describe("helper tests", func() { }) }) Describe("RestoreReader tests", func() { + AfterEach(func() { + *onErrorContinue = false + writer = nil + }) It("waitForPlugin normal completion", func() { test_cmd1 := testPluginCmd{hasProcess_: true} test_reader := new(RestoreReader) @@ -504,6 +608,109 @@ var _ = Describe("helper tests", func() { Expect(err).To(HaveOccurred()) Expect(err.Error()).To(Equal(msg)) }) + It("CopyData, readerType is SUBSET. Normal completion", func() { + writer = bufio.NewWriterSize(&limitWriter{100}, 5) + + test_reader := RestoreReader{ + readerType: SUBSET, + bufReader: bufio.NewReader(&limitReader{100, io.EOF}), + } + + bytesRead, err := test_reader.copyData(18) + Expect(bytesRead).To(Equal(int64(18))) + Expect(err).ToNot(HaveOccurred()) + }) + It("CopyData, readerType is SUBSET. Error on write", func() { + *onErrorContinue = true + bufSize := 5 + toRead := int64(18) + writer = bufio.NewWriterSize(&limitWriter{7}, bufSize) + + test_reader := RestoreReader{ + readerType: SUBSET, + bufReader: bufio.NewReader(&limitReader{100, io.EOF}), + } + + bytesRead, err := test_reader.copyData(toRead) + Expect(bytesRead).To(Equal(toRead)) + Expect(errors.Is(err, io.ErrShortWrite)).To(Equal(true)) + str := fmt.Sprintf("copied %d bytes from %d: ", bufSize*2, toRead) + Expect(strings.HasPrefix(err.Error(), str)).To(Equal(true)) + + }) + It("CopyData, readerType is SUBSET. EOF", func() { + *onErrorContinue = true + writer = bufio.NewWriterSize(&limitWriter{100}, 5) + + test_reader := RestoreReader{ + readerType: SUBSET, + bufReader: bufio.NewReader(&limitReader{25, io.EOF}), + } + + bytesRead, err := test_reader.copyData(30) + Expect(bytesRead).To(Equal(int64(25))) + Expect(err).To(Equal(io.EOF)) + }) + It("CopyData, readerType is SUBSET. Error on write and EOF", func() { + *onErrorContinue = true + bufSize := 5 + toCopy := int64(30) + rLmt := int64(25) + writer = bufio.NewWriterSize(&limitWriter{7}, bufSize) + + test_reader := RestoreReader{ + readerType: SUBSET, + bufReader: bufio.NewReader(&limitReader{int(rLmt), io.EOF}), + } + + bytesRead, err := test_reader.copyData(toCopy) + Expect(bytesRead).To(Equal(rLmt)) + Expect(errors.Is(err, io.ErrShortWrite)).To(Equal(true)) + Expect(errors.Is(err, io.EOF)).To(Equal(true)) + readBeforeErr := int64(bufSize * 2) + prefix := fmt.Sprintf("discard error in copyData: discarded %d bytes from %d: ", rLmt-readBeforeErr, toCopy-readBeforeErr) + Expect(strings.HasPrefix(err.Error(), prefix)).To(Equal(true)) + strCopied := fmt.Sprintf("copied %d bytes from %d: ", readBeforeErr, toCopy) + Expect(strings.Contains(err.Error(), strCopied)).To(Equal(true)) + + bytesRead, err = test_reader.copyData(10) + Expect(bytesRead).To(Equal(int64(0))) + Expect(err.Error()).To(Equal("10 bytes to copy, but discard error has already occurred. Skipping read.")) + + bytesRead, err = test_reader.discardData(5) + Expect(bytesRead).To(Equal(int64(0))) + Expect(err.Error()).To(Equal("5 bytes to discard, but discard error has already occurred. Skipping read.")) + }) + It("CopyData, readerType is SUBSET. Error on write and on read", func() { + *onErrorContinue = true + bufSize := 5 + toCopy := int64(30) + rLmt := int64(25) + writer = bufio.NewWriterSize(&limitWriter{7}, bufSize) + + test_reader := RestoreReader{ + readerType: SUBSET, + bufReader: bufio.NewReader(&limitReader{int(rLmt), io.ErrNoProgress}), + } + + bytesRead, err := test_reader.copyData(toCopy) + Expect(bytesRead).To(Equal(rLmt)) + Expect(errors.Is(err, io.ErrShortWrite)).To(Equal(true)) + Expect(errors.Is(err, io.ErrNoProgress)).To(Equal(true)) + readBeforeErr := int64(bufSize * 2) + prefix := fmt.Sprintf("discard error in copyData: discarded %d bytes from %d: ", rLmt-readBeforeErr, toCopy-readBeforeErr) + Expect(strings.HasPrefix(err.Error(), prefix)).To(Equal(true)) + strCopied := fmt.Sprintf("copied %d bytes from %d: ", readBeforeErr, toCopy) + Expect(strings.Contains(err.Error(), strCopied)).To(Equal(true)) + + bytesRead, err = test_reader.copyData(10) + Expect(bytesRead).To(Equal(int64(0))) + Expect(err.Error()).To(Equal("10 bytes to copy, but discard error has already occurred. Skipping read.")) + + bytesRead, err = test_reader.discardData(5) + Expect(bytesRead).To(Equal(int64(0))) + Expect(err.Error()).To(Equal("5 bytes to discard, but discard error has already occurred. Skipping read.")) + }) }) }) @@ -511,13 +718,13 @@ func writeTestTOC(tocFile string) { // Write test TOC. We are not going to read data using it, so dataLength is a random number dataLength := 100 customTOC := fmt.Sprintf(`dataentries: -1: + 1: startbyte: 0 endbyte: 18 -2: + 2: startbyte: 18 endbyte: %[1]d -3: + 3: startbyte: %[1]d endbyte: %d `, dataLength+18, dataLength+18+18) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index a4c5120fe..a0374fd03 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "compress/gzip" + errorsStd "errors" "fmt" "io" "io/ioutil" @@ -54,6 +55,7 @@ type IRestoreReader interface { copyAllData() (int64, error) closeFileHandle() getReaderType() ReaderType + discardData(num int64) (int64, error) } type RestoreReader struct { @@ -62,6 +64,7 @@ type RestoreReader struct { seekReader io.ReadSeeker pluginCmd IPluginCmd readerType ReaderType + discardErr bool } // Wait for plugin process that should be already finished. This should be @@ -102,14 +105,53 @@ func (r *RestoreReader) positionReader(pos uint64, oid int) error { return nil } +func (r *RestoreReader) discardData(num int64) (int64, error) { + if r.readerType != SUBSET { + panic("discardData should be called for readerType == SUBSET only") + } + + if r.discardErr { + err := fmt.Errorf("%d bytes to discard, but discard error has already occurred. Skipping read.", num) + logVerbose(err.Error()) + return 0, err + } + + n, err := io.CopyN(io.Discard, r.bufReader, num) + if err == nil { + logVerbose(fmt.Sprintf("discarded %d bytes", n)) + } else { + r.discardErr = true + err = errors.Wrapf(err, "discarded %d bytes from %d", n, num) + logError(err.Error()) + } + return n, err +} + func (r *RestoreReader) copyData(num int64) (int64, error) { var bytesRead int64 var err error switch r.readerType { case SEEKABLE: bytesRead, err = io.CopyN(writer, r.seekReader, num) - case NONSEEKABLE, SUBSET: + case NONSEEKABLE: + bytesRead, err = io.CopyN(writer, r.bufReader, num) + case SUBSET: + if r.discardErr { + err := fmt.Errorf("%d bytes to copy, but discard error has already occurred. Skipping read.", num) + logVerbose(err.Error()) + return 0, err + } + bytesRead, err = io.CopyN(writer, r.bufReader, num) + if err != nil && err != io.EOF && *onErrorContinue { + err = errors.Wrapf(err, "copied %d bytes from %d", bytesRead, num) + bytesDiscard, errDiscard := r.discardData(num - bytesRead) + bytesRead += bytesDiscard + if errDiscard != nil { + err = errorsStd.Join(errDiscard, err) + err = errors.Wrap(err, "discard error in copyData") + } + } } return bytesRead, err } @@ -351,6 +393,10 @@ func doRestoreAgentInternal(restoreHelper IRestoreHelper) error { logWarn(fmt.Sprintf("Oid %d, Batch %d: Skip file discovered, skipping this relation.", tableOid, batchNum)) err = nil skipOid = tableOid + if *singleDataFile && readers[contentToRestore] != nil && readers[contentToRestore].getReaderType() == SUBSET { + bytesToDiscard := int64(end[contentToRestore] - start[contentToRestore]) + _, err = readers[contentToRestore].discardData(bytesToDiscard) + } /* Close up to *copyQueue files with this tableOid */ for idx := 0; idx < *copyQueue; idx++ { batchToDelete := batchNum + idx @@ -613,7 +659,7 @@ func getSubsetFlag(fileToRead string, pluginConfig *utils.PluginConfig) bool { return false } // Helper's option does not allow to use subset - if !*isFiltered || *onErrorContinue { + if !*isFiltered { return false } // Restore subset and compression does not allow together