diff --git a/dataset b/dataset index 1cca9ae642..d8c7946bbc 160000 --- a/dataset +++ b/dataset @@ -1 +1 @@ -Subproject commit 1cca9ae642991d0c57a19070584f1d9958b812d0 +Subproject commit d8c7946bbc0d269d2a2bb6de9661c2f907e0d2e5 diff --git a/src/common/copier_config/csv_reader_config.cpp b/src/common/copier_config/csv_reader_config.cpp index c5a74cc6af..885815c634 100644 --- a/src/common/copier_config/csv_reader_config.cpp +++ b/src/common/copier_config/csv_reader_config.cpp @@ -29,6 +29,8 @@ static void bindBoolParsingOption(CSVReaderConfig& config, const std::string& op config.option.setHeader = true; } else if (optionName == "PARALLEL") { config.parallel = optionValue; + } else if (optionName == "MULTILINE_PARALLEL") { + config.multilineParallel = optionValue; } else if (optionName == "LIST_UNBRACED") { config.option.allowUnbracedList = optionValue; } else if (optionName == CopyConstants::IGNORE_ERRORS_OPTION_NAME) { diff --git a/src/include/common/constants.h b/src/include/common/constants.h index 3f0e9430e3..6cf9d543e3 100644 --- a/src/include/common/constants.h +++ b/src/include/common/constants.h @@ -115,10 +115,11 @@ struct CopyConstants { static constexpr const char* TO_OPTION_NAME = "TO"; static constexpr const char* BOOL_CSV_PARSING_OPTIONS[] = {"HEADER", "PARALLEL", - "LIST_UNBRACED", "AUTODETECT", "AUTO_DETECT", CopyConstants::IGNORE_ERRORS_OPTION_NAME, - CopyConstants::SKIP_DUPLICATE_PK_OPTION_NAME}; + "MULTILINE_PARALLEL", "LIST_UNBRACED", "AUTODETECT", "AUTO_DETECT", + CopyConstants::IGNORE_ERRORS_OPTION_NAME, CopyConstants::SKIP_DUPLICATE_PK_OPTION_NAME}; static constexpr bool DEFAULT_CSV_HAS_HEADER = false; static constexpr bool DEFAULT_CSV_PARALLEL = true; + static constexpr bool DEFAULT_CSV_MULTILINE_PARALLEL = false; // Default configuration for csv file parsing static constexpr const char* STRING_CSV_PARSING_OPTIONS[] = {"ESCAPE", "DELIM", "DELIMITER", diff --git a/src/include/common/copier_config/csv_reader_config.h b/src/include/common/copier_config/csv_reader_config.h index 555fd821ce..cbe0cdb2df 100644 --- a/src/include/common/copier_config/csv_reader_config.h +++ b/src/include/common/copier_config/csv_reader_config.h @@ -97,15 +97,19 @@ struct CSVOption { struct CSVReaderConfig { CSVOption option; bool parallel; + bool multilineParallel; - CSVReaderConfig() : option{}, parallel{CopyConstants::DEFAULT_CSV_PARALLEL} {} + CSVReaderConfig() + : option{}, parallel{CopyConstants::DEFAULT_CSV_PARALLEL}, + multilineParallel{CopyConstants::DEFAULT_CSV_MULTILINE_PARALLEL} {} EXPLICIT_COPY_DEFAULT_MOVE(CSVReaderConfig); static CSVReaderConfig construct(const case_insensitive_map_t& options); private: CSVReaderConfig(const CSVReaderConfig& other) - : option{other.option.copy()}, parallel{other.parallel} {} + : option{other.option.copy()}, parallel{other.parallel}, + multilineParallel{other.multilineParallel} {} }; } // namespace common diff --git a/src/include/processor/operator/persistent/reader/csv/base_csv_reader.h b/src/include/processor/operator/persistent/reader/csv/base_csv_reader.h index 6af57d5eae..a45a700d10 100644 --- a/src/include/processor/operator/persistent/reader/csv/base_csv_reader.h +++ b/src/include/processor/operator/persistent/reader/csv/base_csv_reader.h @@ -110,6 +110,10 @@ class BaseCSVReader { template parse_result_t parseCSV(Driver&); + template + parse_result_t parseCSVStructural(Driver&); + + bool supportsStructuralParser() const; inline bool isNewLine(char c) { return c == '\n' || c == '\r'; } diff --git a/src/include/processor/operator/persistent/reader/csv/csv_boundary_scanner.h b/src/include/processor/operator/persistent/reader/csv/csv_boundary_scanner.h new file mode 100644 index 0000000000..fc258faa89 --- /dev/null +++ b/src/include/processor/operator/persistent/reader/csv/csv_boundary_scanner.h @@ -0,0 +1,49 @@ +#pragma once + +#include +#include +#include + +#include "common/copier_config/csv_reader_config.h" +#include "common/types/types.h" + +namespace lbug { + +namespace main { +class ClientContext; +} + +namespace processor { + +enum class CSVBoundaryScannerState : uint8_t { + OutsideField, + InQuotedField, + AfterQuote, + Escaped, + CarriageReturn, +}; + +struct CSVParseRange { + common::idx_t fileIdx; + uint64_t startOffset; + uint64_t endOffset; + common::block_idx_t rangeIdx; + bool startsAtFileStart; +}; + +struct CSVBoundaryScanResult { + uint64_t fileSize = 0; + bool detectedQuotedMultiline = false; + bool detectedOversizedLogicalRow = false; + bool usePlannedRanges = false; + std::vector ranges; +}; + +class CSVBoundaryScanner { +public: + static CSVBoundaryScanResult planFixedChunkOverlap(const std::string& filePath, + common::idx_t fileIdx, const common::CSVOption& option, main::ClientContext* context); +}; + +} // namespace processor +} // namespace lbug diff --git a/src/include/processor/operator/persistent/reader/csv/parallel_csv_reader.h b/src/include/processor/operator/persistent/reader/csv/parallel_csv_reader.h index 0fceb09ab4..c8a9abebeb 100644 --- a/src/include/processor/operator/persistent/reader/csv/parallel_csv_reader.h +++ b/src/include/processor/operator/persistent/reader/csv/parallel_csv_reader.h @@ -6,6 +6,7 @@ #include "function/table/bind_input.h" #include "function/table/scan_file_function.h" #include "function/table/table_function.h" +#include "processor/operator/persistent/reader/csv/csv_boundary_scanner.h" #include "processor/operator/persistent/reader/file_error_handler.h" namespace lbug { @@ -16,12 +17,15 @@ class ParallelCSVReader final : public BaseCSVReader { friend class ParallelParsingDriver; public: + enum class ParseMode : uint8_t { FIXED_BLOCK, PLANNED_RANGE }; + ParallelCSVReader(const std::string& filePath, common::idx_t fileIdx, common::CSVOption option, CSVColumnInfo columnInfo, main::ClientContext* context, LocalFileErrorHandler* errorHandler); bool hasMoreToRead() const; uint64_t parseBlock(common::block_idx_t blockIdx, common::DataChunk& resultChunk) override; + uint64_t parseRange(const CSVParseRange& range, common::DataChunk& resultChunk); uint64_t continueBlock(common::DataChunk& resultChunk); void reportFinishedBlock(); @@ -32,25 +36,50 @@ class ParallelCSVReader final : public BaseCSVReader { private: bool finishedBlock() const; void seekToBlockStart(); + void seekToRangeStart(); + +private: + ParseMode parseMode = ParseMode::FIXED_BLOCK; + uint64_t currentRangeStartOffset = 0; + uint64_t currentRangeEndOffset = 0; + bool currentUnitStartsAtFileStart = false; }; struct ParallelCSVLocalState final : public function::TableFuncLocalState { std::unique_ptr reader; std::unique_ptr errorHandler; common::idx_t fileIdx = common::INVALID_IDX; + uint64_t currentTaskBytes = 0; }; struct ParallelCSVScanSharedState final : public function::ScanFileWithProgressSharedState { + struct FileScanPlan { + uint64_t fileSize = 0; + uint64_t numFixedBlocks = 0; + bool usePlannedRanges = false; + std::vector ranges; + }; + + struct ParseTask { + common::idx_t fileIdx = common::INVALID_IDX; + common::block_idx_t unitIdx = 0; + uint64_t byteSize = 0; + bool usePlannedRange = false; + CSVParseRange range{}; + }; + common::CSVOption csvOption; CSVColumnInfo columnInfo; - std::atomic numBlocksReadByFiles = 0; + std::atomic completedBytes = 0; std::vector errorHandlers; populate_func_t populateErrorFunc; + std::vector filePlans; ParallelCSVScanSharedState(common::FileScanInfo fileScanInfo, uint64_t numRows, - main::ClientContext* context, common::CSVOption csvOption, CSVColumnInfo columnInfo); + main::ClientContext* context, common::CSVOption csvOption, CSVColumnInfo columnInfo, + std::vector filePlans); - void setFileComplete(uint64_t completedFileIdx); + ParseTask getNextTask(); populate_func_t constructPopulateFunc(); }; diff --git a/src/processor/operator/persistent/reader/csv/CMakeLists.txt b/src/processor/operator/persistent/reader/csv/CMakeLists.txt index 23592dfd1d..39bd290658 100644 --- a/src/processor/operator/persistent/reader/csv/CMakeLists.txt +++ b/src/processor/operator/persistent/reader/csv/CMakeLists.txt @@ -1,6 +1,7 @@ add_library(lbug_processor_operator_csv_reader OBJECT base_csv_reader.cpp + csv_boundary_scanner.cpp driver.cpp parallel_csv_reader.cpp serial_csv_reader.cpp diff --git a/src/processor/operator/persistent/reader/csv/base_csv_reader.cpp b/src/processor/operator/persistent/reader/csv/base_csv_reader.cpp index b589892614..9db3b127b8 100644 --- a/src/processor/operator/persistent/reader/csv/base_csv_reader.cpp +++ b/src/processor/operator/persistent/reader/csv/base_csv_reader.cpp @@ -12,6 +12,13 @@ #include "utf8proc_wrapper.h" #include +#if defined(__x86_64__) || defined(_M_X64) +#include +#endif +#if defined(__ARM_NEON) || defined(__ARM_NEON__) +#include +#endif + using namespace lbug::common; namespace lbug { @@ -518,11 +525,299 @@ BaseCSVReader::parse_result_t BaseCSVReader::parseCSV(Driver& driver) { return {curRowIdx, numErrors}; } column++; - } else if (column > 0) { - // File ends right after a delimiter with an empty trailing field. - // Without this, a row like "a,b," (no trailing newline) loses its last - // empty field and undercounts columns, causing "expected N, got N-1". - if (!addValue(driver, curRowIdx, column, std::string_view{}, escapePositions)) { + } + if (column > 0) { + curRowIdx += driver.addRow(curRowIdx, column, + getOptionalWarningData(columnInfo, option, getWarningSourceData())); + } + return {curRowIdx, numErrors}; + ignore_error: + // we skip the current row then restart the state machine to continue parsing + skipCurrentLine(); + if (driver.done(curRowIdx)) { + return {curRowIdx, numErrors}; + } + continue; + } + UNREACHABLE_CODE; +} + +bool BaseCSVReader::supportsStructuralParser() const { + return option.delimiter == CopyConstants::DEFAULT_CSV_DELIMITER && + option.quoteChar == CopyConstants::DEFAULT_CSV_QUOTE_CHAR && + option.escapeChar == CopyConstants::DEFAULT_CSV_ESCAPE_CHAR && !option.ignoreErrors; +} + +static uint64_t findNextCSVStructuralByteScalar(const char* buffer, uint64_t position, + uint64_t bufferSize, const CSVOption& option, bool inQuotedField) { + for (; position < bufferSize; ++position) { + const auto c = buffer[position]; + if (inQuotedField) { + if (c == option.quoteChar || c == option.escapeChar || c == '\r' || c == '\n') { + return position; + } + } else if (c == option.delimiter || c == '\r' || c == '\n') { + return position; + } + } + return bufferSize; +} + +#if defined(__x86_64__) || defined(_M_X64) +#if defined(__GNUC__) || defined(__clang__) +__attribute__((target("avx2"))) +#endif +static uint64_t +findNextCSVStructuralByteAVX2(const char* buffer, uint64_t position, uint64_t bufferSize, + const CSVOption& option, bool inQuotedField) { + const auto quote = _mm256_set1_epi8(option.quoteChar); + const auto escape = _mm256_set1_epi8(option.escapeChar); + const auto delimiter = _mm256_set1_epi8(option.delimiter); + const auto carriageReturn = _mm256_set1_epi8('\r'); + const auto newline = _mm256_set1_epi8('\n'); + + for (; position + 32 <= bufferSize; position += 32) { + const auto bytes = _mm256_loadu_si256(reinterpret_cast(buffer + position)); + auto mask = _mm256_movemask_epi8(_mm256_or_si256(_mm256_cmpeq_epi8(bytes, carriageReturn), + _mm256_cmpeq_epi8(bytes, newline))); + if (inQuotedField) { + mask |= _mm256_movemask_epi8( + _mm256_or_si256(_mm256_cmpeq_epi8(bytes, quote), _mm256_cmpeq_epi8(bytes, escape))); + } else { + mask |= _mm256_movemask_epi8(_mm256_cmpeq_epi8(bytes, delimiter)); + } + if (mask != 0) { + return position + static_cast(__builtin_ctz(static_cast(mask))); + } + } + return findNextCSVStructuralByteScalar(buffer, position, bufferSize, option, inQuotedField); +} + +static bool hasAVX2() { +#if defined(__GNUC__) || defined(__clang__) + return __builtin_cpu_supports("avx2"); +#else + return false; +#endif +} +#endif + +#if defined(__ARM_NEON) || defined(__ARM_NEON__) +static uint64_t findFirstNeonMatch(uint64_t position, const uint8_t* matches) { + for (auto i = 0u; i < 16; ++i) { + if (matches[i] != 0) { + return position + i; + } + } + return position + 16; +} + +static uint64_t findNextCSVStructuralByteNEON(const char* buffer, uint64_t position, + uint64_t bufferSize, const CSVOption& option, bool inQuotedField) { + const auto quote = vdupq_n_u8(option.quoteChar); + const auto escape = vdupq_n_u8(option.escapeChar); + const auto delimiter = vdupq_n_u8(option.delimiter); + const auto carriageReturn = vdupq_n_u8('\r'); + const auto newline = vdupq_n_u8('\n'); + + alignas(16) uint8_t matches[16]; + for (; position + 16 <= bufferSize; position += 16) { + const auto bytes = vld1q_u8(reinterpret_cast(buffer + position)); + auto mask = vorrq_u8(vceqq_u8(bytes, carriageReturn), vceqq_u8(bytes, newline)); + if (inQuotedField) { + mask = vorrq_u8(mask, vorrq_u8(vceqq_u8(bytes, quote), vceqq_u8(bytes, escape))); + } else { + mask = vorrq_u8(mask, vceqq_u8(bytes, delimiter)); + } + if (vmaxvq_u8(mask) != 0) { + vst1q_u8(matches, mask); + return findFirstNeonMatch(position, matches); + } + } + return findNextCSVStructuralByteScalar(buffer, position, bufferSize, option, inQuotedField); +} +#endif + +static uint64_t findNextCSVStructuralByte(const char* buffer, uint64_t position, + uint64_t bufferSize, const CSVOption& option, bool inQuotedField) { +#if defined(__x86_64__) || defined(_M_X64) + static const bool avx2 = hasAVX2(); + if (avx2) { + return findNextCSVStructuralByteAVX2(buffer, position, bufferSize, option, inQuotedField); + } +#endif +#if defined(__ARM_NEON) || defined(__ARM_NEON__) + return findNextCSVStructuralByteNEON(buffer, position, bufferSize, option, inQuotedField); +#else + return findNextCSVStructuralByteScalar(buffer, position, bufferSize, option, inQuotedField); +#endif +} + +template +BaseCSVReader::parse_result_t BaseCSVReader::parseCSVStructural(Driver& driver) { + DASSERT(nullptr != errorHandler); + DASSERT(supportsStructuralParser()); + + curRowIdx = 0; + numErrors = 0; + + while (true) { + column_id_t column = 0; + auto start = position.load(); + bool hasQuotes = false; + std::vector escapePositions; + lineContext.setNewLine(getFileOffset()); + + if (!maybeReadBuffer(&start)) { + return {curRowIdx, numErrors}; + } + + goto value_start; + value_start: + if (buffer[position] == option.quoteChar) { + start = position + 1; + hasQuotes = true; + ++position; + goto in_quotes; + } + start = position; + hasQuotes = false; + goto normal; + normal: + do { + position = findNextCSVStructuralByte(buffer.get(), position, bufferSize, option, false); + if (position >= bufferSize) { + continue; + } + if (buffer[position] == option.delimiter) { + goto add_value; + } + if (isNewLine(buffer[position])) { + goto add_row; + } + } while (readBuffer(&start)); + goto final_state; + add_value: + DASSERT(buffer[position] == option.delimiter || + buffer[position] == CopyConstants::DEFAULT_CSV_LIST_END_CHAR); + if (!addValue(driver, curRowIdx, column, + std::string_view(buffer.get() + start, position - start - hasQuotes), + escapePositions)) { + goto ignore_error; + } + column++; + ++position; + start = position; + if (!maybeReadBuffer(&start)) { + goto final_state; + } + goto value_start; + add_row: { + DASSERT(isNewLine(buffer[position])); + lineContext.setEndOfLine(getFileOffset()); + bool isCarriageReturn = buffer[position] == '\r'; + if (!addValue(driver, curRowIdx, column, + std::string_view(buffer.get() + start, position - start - hasQuotes), + escapePositions)) { + goto ignore_error; + } + column++; + + curRowIdx += driver.addRow(curRowIdx, column, + getOptionalWarningData(columnInfo, option, getWarningSourceData())); + + column = 0; + position++; + start = position; + lineContext.setNewLine(getFileOffset()); + if (!maybeReadBuffer(&start)) { + goto final_state; + } + if (isCarriageReturn) { + goto carriage_return; + } + if (driver.done(curRowIdx)) { + return {curRowIdx, numErrors}; + } + goto value_start; + } + in_quotes: + do { + position = findNextCSVStructuralByte(buffer.get(), position, bufferSize, option, true); + if (position >= bufferSize) { + continue; + } + if (buffer[position] == option.quoteChar) { + goto unquote; + } + if (buffer[position] == option.escapeChar) { + escapePositions.push_back(position - start); + goto handle_escape; + } + if (isNewLine(buffer[position])) { + if (!handleQuotedNewline()) { + goto ignore_error; + } + ++position; + goto in_quotes; + } + } while (readBuffer(&start)); + lineContext.setEndOfLine(getFileOffset()); + handleCopyException("unterminated quotes."); + goto ignore_error; + unquote: + DASSERT(hasQuotes && buffer[position] == option.quoteChar); + position++; + if (!maybeReadBuffer(&start)) { + goto final_state; + } + if (buffer[position] == option.quoteChar) { + escapePositions.push_back(position - start); + ++position; + goto in_quotes; + } + if (buffer[position] == option.delimiter || + buffer[position] == CopyConstants::DEFAULT_CSV_LIST_END_CHAR) { + goto add_value; + } + if (isNewLine(buffer[position])) { + goto add_row; + } + handleCopyException("quote should be followed by " + "end of file, end of value, end of " + "row or another quote."); + goto ignore_error; + handle_escape: + position++; + if (!maybeReadBuffer(&start)) { + lineContext.setEndOfLine(getFileOffset()); + handleCopyException("escape at end of file."); + goto ignore_error; + } + if (buffer[position] != option.quoteChar && buffer[position] != option.escapeChar) { + ++position; + handleCopyException("neither QUOTE nor ESCAPE is proceeded by ESCAPE."); + goto ignore_error; + } + ++position; + goto in_quotes; + carriage_return: + if (buffer[position] == '\n') { + start = ++position; + if (!maybeReadBuffer(&start)) { + goto final_state; + } + } + if (driver.done(curRowIdx)) { + return {curRowIdx, numErrors}; + } + goto value_start; + final_state: + lineContext.setEndOfLine(getFileOffset()); + if (position > start) { + if (!addValue(driver, curRowIdx, column, + std::string_view(buffer.get() + start, position - start - hasQuotes), + escapePositions)) { return {curRowIdx, numErrors}; } column++; @@ -533,7 +828,6 @@ BaseCSVReader::parse_result_t BaseCSVReader::parseCSV(Driver& driver) { } return {curRowIdx, numErrors}; ignore_error: - // we skip the current row then restart the state machine to continue parsing skipCurrentLine(); if (driver.done(curRowIdx)) { return {curRowIdx, numErrors}; @@ -578,6 +872,8 @@ common::idx_t BaseCSVReader::getFileIdxFunc(const CopyFromFileError& error) { template BaseCSVReader::parse_result_t BaseCSVReader::parseCSV( ParallelParsingDriver&); +template BaseCSVReader::parse_result_t BaseCSVReader::parseCSVStructural( + ParallelParsingDriver&); template BaseCSVReader::parse_result_t BaseCSVReader::parseCSV( SerialParsingDriver&); template BaseCSVReader::parse_result_t BaseCSVReader::parseCSV( diff --git a/src/processor/operator/persistent/reader/csv/csv_boundary_scanner.cpp b/src/processor/operator/persistent/reader/csv/csv_boundary_scanner.cpp new file mode 100644 index 0000000000..b43e03a87a --- /dev/null +++ b/src/processor/operator/persistent/reader/csv/csv_boundary_scanner.cpp @@ -0,0 +1,574 @@ +#include "processor/operator/persistent/reader/csv/csv_boundary_scanner.h" + +#include +#include + +#include "common/constants.h" +#include "common/file_system/virtual_file_system.h" +#include "main/client_context.h" +#include + +#if defined(__SSE2__) +#include +#endif + +#if defined(__ARM_NEON) +#include +#endif + +namespace lbug { +namespace processor { + +using namespace common; + +namespace { + +enum class CSVBoundaryScannerCharClass : uint8_t { + Other, + Delimiter, + Quote, + Escape, + CarriageReturn, + LineFeed, + COUNT, +}; + +enum CSVBoundaryScannerAction : uint8_t { + None = 0, + MarkQuotedMultiline = 1u << 0u, + MarkLogicalBoundary = 1u << 1u, + SetFieldStartTrue = 1u << 2u, + SetFieldStartFalse = 1u << 3u, + MarkInvalid = 1u << 4u, +}; + +struct CSVBoundaryScannerTransition { + CSVBoundaryScannerState nextState; + uint8_t actions; +}; + +struct CSVBoundaryScannerMasks { + uint32_t interesting = 0; +}; + +enum class CSVBoundaryScannerSeed : uint8_t { + OutsideFieldStart, + OutsideFieldMiddle, + InQuotedField, + AfterQuote, + Escaped, + CarriageReturn, + COUNT, +}; + +struct CSVOverlapBoundaryResult { + bool foundBoundary = false; + bool detectedQuotedMultiline = false; + bool sawInvalidQuotedTransition = false; + uint64_t boundaryOffset = 0; +}; + +struct CSVOverlapSeedBoundaryResult { + bool usable = false; + bool hasBoundaryAtOrAfterCut = false; + bool detectedQuotedMultiline = false; + bool sawInvalidQuotedTransition = false; + uint64_t boundaryOffset = 0; +}; + +std::vector makeSingleFileRange(idx_t fileIdx, uint64_t fileSize) { + std::vector ranges; + if (fileSize > 0) { + ranges.push_back(CSVParseRange{fileIdx, 0, fileSize, 0, true}); + } + return ranges; +} + +struct CSVAdjustedChunkRanges { + std::vector ranges; + bool detectedOversizedLogicalRow = false; +}; + +struct CSVFileBoundaryProperties { + bool detectedQuotedMultiline = false; + bool detectedOversizedLogicalRow = false; +}; + +CSVAdjustedChunkRanges makeAdjustedChunkRanges(idx_t fileIdx, uint64_t fileSize, + const std::vector& adjustedBoundaries) { + CSVAdjustedChunkRanges result; + if (fileSize == 0) { + return result; + } + uint64_t currentRangeStart = 0; + uint64_t previousBoundary = 0; + block_idx_t nextRangeIdx = 0; + for (auto boundaryOffset : adjustedBoundaries) { + if (boundaryOffset <= currentRangeStart) { + continue; + } + result.detectedOversizedLogicalRow = + result.detectedOversizedLogicalRow || + boundaryOffset - previousBoundary > CopyConstants::PARALLEL_BLOCK_SIZE; + previousBoundary = boundaryOffset; + result.ranges.push_back(CSVParseRange{fileIdx, currentRangeStart, boundaryOffset, + nextRangeIdx++, currentRangeStart == 0}); + currentRangeStart = boundaryOffset; + } + result.detectedOversizedLogicalRow = + result.detectedOversizedLogicalRow || + fileSize - previousBoundary > CopyConstants::PARALLEL_BLOCK_SIZE; + if (fileSize > currentRangeStart) { + result.ranges.push_back(CSVParseRange{fileIdx, currentRangeStart, fileSize, nextRangeIdx++, + currentRangeStart == 0}); + } + return result; +} + +struct CSVBoundaryScannerRuntimeState { + CSVBoundaryScannerState state = CSVBoundaryScannerState::OutsideField; + bool atFieldStart = true; + uint64_t pendingCarriageReturnOffset = 0; + bool sawInvalidQuotedTransition = false; +}; + +CSVBoundaryScannerRuntimeState runtimeStateFromSeed(CSVBoundaryScannerSeed seed, + uint64_t chunkOffset) { + CSVBoundaryScannerRuntimeState state; + switch (seed) { + case CSVBoundaryScannerSeed::OutsideFieldStart: + state.state = CSVBoundaryScannerState::OutsideField; + state.atFieldStart = true; + break; + case CSVBoundaryScannerSeed::OutsideFieldMiddle: + state.state = CSVBoundaryScannerState::OutsideField; + state.atFieldStart = false; + break; + case CSVBoundaryScannerSeed::InQuotedField: + state.state = CSVBoundaryScannerState::InQuotedField; + state.atFieldStart = false; + break; + case CSVBoundaryScannerSeed::AfterQuote: + state.state = CSVBoundaryScannerState::AfterQuote; + state.atFieldStart = false; + break; + case CSVBoundaryScannerSeed::Escaped: + state.state = CSVBoundaryScannerState::Escaped; + state.atFieldStart = false; + break; + case CSVBoundaryScannerSeed::CarriageReturn: + state.state = CSVBoundaryScannerState::CarriageReturn; + state.atFieldStart = true; + state.pendingCarriageReturnOffset = chunkOffset == 0 ? 0 : chunkOffset - 1; + break; + case CSVBoundaryScannerSeed::COUNT: + UNREACHABLE_CODE; + } + return state; +} + +class CSVBoundaryScannerFSM { +public: + explicit CSVBoundaryScannerFSM(const CSVOption& option) : classTable{buildClassTable(option)} {} + + template + void consumeBoringSpan(uint64_t spanLength, BoundaryFunc&& onBoundary, + CSVBoundaryScannerRuntimeState& runtimeState) const { + if (spanLength == 0) { + return; + } + switch (runtimeState.state) { + case CSVBoundaryScannerState::OutsideField: + runtimeState.atFieldStart = false; + break; + case CSVBoundaryScannerState::InQuotedField: + break; + case CSVBoundaryScannerState::AfterQuote: + runtimeState.sawInvalidQuotedTransition = true; + runtimeState.state = CSVBoundaryScannerState::OutsideField; + runtimeState.atFieldStart = false; + break; + case CSVBoundaryScannerState::Escaped: + runtimeState.sawInvalidQuotedTransition = true; + runtimeState.state = CSVBoundaryScannerState::InQuotedField; + break; + case CSVBoundaryScannerState::CarriageReturn: + onBoundary(runtimeState.pendingCarriageReturnOffset + 1); + runtimeState.state = CSVBoundaryScannerState::OutsideField; + runtimeState.atFieldStart = false; + break; + } + } + + template + void step(char c, uint64_t absoluteOffset, BoundaryFunc&& onBoundary, + bool& detectedQuotedMultiline, CSVBoundaryScannerRuntimeState& runtimeState) const { + auto cls = getCharClass(c); + if (runtimeState.state == CSVBoundaryScannerState::CarriageReturn && + cls == CSVBoundaryScannerCharClass::LineFeed) { + onBoundary(absoluteOffset + 1); + runtimeState.state = CSVBoundaryScannerState::OutsideField; + runtimeState.atFieldStart = true; + return; + } + if (runtimeState.state == CSVBoundaryScannerState::OutsideField && + cls == CSVBoundaryScannerCharClass::Quote) { + if (runtimeState.atFieldStart) { + runtimeState.state = CSVBoundaryScannerState::InQuotedField; + runtimeState.atFieldStart = false; + } else { + runtimeState.atFieldStart = false; + } + return; + } + if (runtimeState.state == CSVBoundaryScannerState::CarriageReturn) { + onBoundary(runtimeState.pendingCarriageReturnOffset + 1); + runtimeState.state = CSVBoundaryScannerState::OutsideField; + } + + const auto transition = + TRANSITIONS[static_cast(runtimeState.state)][static_cast(cls)]; + runtimeState.state = transition.nextState; + applyActions(transition.actions, absoluteOffset, onBoundary, detectedQuotedMultiline, + runtimeState); + } + +private: + static constexpr std::array(CSVBoundaryScannerCharClass::COUNT)>, + 5> + TRANSITIONS{{ + // OutsideField + {{{CSVBoundaryScannerState::OutsideField, SetFieldStartFalse}, + {CSVBoundaryScannerState::OutsideField, SetFieldStartTrue}, + {CSVBoundaryScannerState::OutsideField, None}, + {CSVBoundaryScannerState::OutsideField, SetFieldStartFalse}, + {CSVBoundaryScannerState::CarriageReturn, SetFieldStartTrue}, + {CSVBoundaryScannerState::OutsideField, MarkLogicalBoundary | SetFieldStartTrue}}}, + // InQuotedField + {{{CSVBoundaryScannerState::InQuotedField, None}, + {CSVBoundaryScannerState::InQuotedField, None}, + {CSVBoundaryScannerState::AfterQuote, None}, + {CSVBoundaryScannerState::Escaped, None}, + {CSVBoundaryScannerState::InQuotedField, MarkQuotedMultiline}, + {CSVBoundaryScannerState::InQuotedField, MarkQuotedMultiline}}}, + // AfterQuote + {{{CSVBoundaryScannerState::OutsideField, MarkInvalid | SetFieldStartFalse}, + {CSVBoundaryScannerState::OutsideField, SetFieldStartTrue}, + {CSVBoundaryScannerState::InQuotedField, None}, + {CSVBoundaryScannerState::OutsideField, MarkInvalid | SetFieldStartFalse}, + {CSVBoundaryScannerState::CarriageReturn, SetFieldStartTrue}, + {CSVBoundaryScannerState::OutsideField, MarkLogicalBoundary | SetFieldStartTrue}}}, + // Escaped + {{{CSVBoundaryScannerState::InQuotedField, MarkInvalid}, + {CSVBoundaryScannerState::InQuotedField, MarkInvalid}, + {CSVBoundaryScannerState::InQuotedField, None}, + {CSVBoundaryScannerState::InQuotedField, None}, + {CSVBoundaryScannerState::InQuotedField, MarkInvalid | MarkQuotedMultiline}, + {CSVBoundaryScannerState::InQuotedField, MarkInvalid | MarkQuotedMultiline}}}, + // CarriageReturn + {{{CSVBoundaryScannerState::OutsideField, SetFieldStartFalse}, + {CSVBoundaryScannerState::OutsideField, SetFieldStartTrue}, + {CSVBoundaryScannerState::OutsideField, None}, + {CSVBoundaryScannerState::OutsideField, SetFieldStartFalse}, + {CSVBoundaryScannerState::CarriageReturn, SetFieldStartTrue}, + {CSVBoundaryScannerState::OutsideField, MarkLogicalBoundary | SetFieldStartTrue}}}, + }}; + + static std::array buildClassTable(const CSVOption& option) { + std::array table; + table.fill(CSVBoundaryScannerCharClass::Other); + table[static_cast('\r')] = CSVBoundaryScannerCharClass::CarriageReturn; + table[static_cast('\n')] = CSVBoundaryScannerCharClass::LineFeed; + table[static_cast(option.delimiter)] = CSVBoundaryScannerCharClass::Delimiter; + table[static_cast(option.escapeChar)] = CSVBoundaryScannerCharClass::Escape; + table[static_cast(option.quoteChar)] = CSVBoundaryScannerCharClass::Quote; + return table; + } + + CSVBoundaryScannerCharClass getCharClass(char c) const { + return classTable[static_cast(c)]; + } + + template + static void applyActions(uint8_t actions, uint64_t absoluteOffset, BoundaryFunc&& onBoundary, + bool& detectedQuotedMultiline, CSVBoundaryScannerRuntimeState& runtimeState) { + if ((actions & MarkQuotedMultiline) != 0) { + detectedQuotedMultiline = true; + } + if ((actions & MarkLogicalBoundary) != 0) { + onBoundary(absoluteOffset + 1); + } + if ((actions & SetFieldStartTrue) != 0) { + runtimeState.atFieldStart = true; + } else if ((actions & SetFieldStartFalse) != 0) { + runtimeState.atFieldStart = false; + } + if ((actions & MarkInvalid) != 0) { + runtimeState.sawInvalidQuotedTransition = true; + } + if (runtimeState.state == CSVBoundaryScannerState::CarriageReturn) { + runtimeState.pendingCarriageReturnOffset = absoluteOffset; + } + } + + std::array classTable; +}; + +CSVBoundaryScannerMasks scanScalar(const char* buffer, uint64_t bytesToRead, + const CSVOption& option) { + CSVBoundaryScannerMasks masks; + for (uint64_t i = 0; i < bytesToRead; ++i) { + const auto c = buffer[i]; + if (c == option.delimiter || c == option.quoteChar || c == option.escapeChar || c == '\r' || + c == '\n') { + masks.interesting |= (1u << i); + } + } + return masks; +} + +#if defined(__SSE2__) +CSVBoundaryScannerMasks scanSSE2(const char* buffer, const CSVOption& option) { + const auto data = _mm_loadu_si128(reinterpret_cast(buffer)); + auto mask = _mm_or_si128(_mm_cmpeq_epi8(data, _mm_set1_epi8(option.delimiter)), + _mm_cmpeq_epi8(data, _mm_set1_epi8(option.quoteChar))); + mask = _mm_or_si128(mask, _mm_cmpeq_epi8(data, _mm_set1_epi8(option.escapeChar))); + mask = _mm_or_si128(mask, _mm_cmpeq_epi8(data, _mm_set1_epi8('\r'))); + mask = _mm_or_si128(mask, _mm_cmpeq_epi8(data, _mm_set1_epi8('\n'))); + return {static_cast(_mm_movemask_epi8(mask))}; +} +#endif + +#if defined(__ARM_NEON) +CSVBoundaryScannerMasks scanNEON(const char* buffer, const CSVOption& option) { + const auto data = vld1q_u8(reinterpret_cast(buffer)); + auto mask = vorrq_u8(vceqq_u8(data, vdupq_n_u8(static_cast(option.delimiter))), + vceqq_u8(data, vdupq_n_u8(static_cast(option.quoteChar)))); + mask = vorrq_u8(mask, vceqq_u8(data, vdupq_n_u8(static_cast(option.escapeChar)))); + mask = vorrq_u8(mask, vceqq_u8(data, vdupq_n_u8(static_cast('\r')))); + mask = vorrq_u8(mask, vceqq_u8(data, vdupq_n_u8(static_cast('\n')))); + alignas(16) uint8_t bytes[16]; + vst1q_u8(bytes, mask); + uint32_t interesting = 0; + for (uint32_t i = 0; i < 16; ++i) { + if (bytes[i] != 0) { + interesting |= (1u << i); + } + } + return {interesting}; +} +#endif + +CSVBoundaryScannerMasks scanInterestingBytes(const char* buffer, uint64_t bytesToRead, + const CSVOption& option) { +#if defined(__SSE2__) + if (bytesToRead == 16) { + return scanSSE2(buffer, option); + } +#elif defined(__ARM_NEON) + if (bytesToRead == 16) { + return scanNEON(buffer, option); + } +#endif + return scanScalar(buffer, bytesToRead, option); +} + +template +void processChunk(const char* buffer, uint64_t bytesToRead, uint64_t chunkOffset, + const CSVOption& option, const CSVBoundaryScannerFSM& fsm, + CSVBoundaryScannerRuntimeState& runtimeState, bool& detectedQuotedMultiline, + BoundaryFunc&& onBoundary) { + static constexpr uint64_t SIMD_WIDTH = 16; + uint64_t i = 0; + while (i < bytesToRead) { + const auto laneWidth = std::min(SIMD_WIDTH, bytesToRead - i); + const auto masks = scanInterestingBytes(buffer + i, laneWidth, option); + if (masks.interesting == 0) { + fsm.consumeBoringSpan(laneWidth, onBoundary, runtimeState); + i += laneWidth; + continue; + } + + uint64_t lastProcessed = 0; + auto interesting = masks.interesting; + while (interesting != 0) { + const auto bit = std::countr_zero(interesting); + fsm.consumeBoringSpan(bit - lastProcessed, onBoundary, runtimeState); + const auto absoluteOffset = chunkOffset + i + bit; + fsm.step(buffer[i + bit], absoluteOffset, onBoundary, detectedQuotedMultiline, + runtimeState); + lastProcessed = bit + 1; + interesting &= interesting - 1; + } + fsm.consumeBoringSpan(laneWidth - lastProcessed, onBoundary, runtimeState); + i += laneWidth; + } +} + +CSVFileBoundaryProperties scanWholeFileProperties(FileInfo* fileInfo, uint64_t fileSize, + const CSVOption& option) { + CSVFileBoundaryProperties result; + auto buffer = std::make_unique(fileSize); + fileInfo->readFromFile(buffer.get(), fileSize, 0); + + CSVBoundaryScannerFSM fsm{option}; + auto runtimeState = runtimeStateFromSeed(CSVBoundaryScannerSeed::OutsideFieldStart, 0); + uint64_t previousBoundary = 0; + auto onBoundary = [&](uint64_t boundaryOffset) { + result.detectedOversizedLogicalRow = + result.detectedOversizedLogicalRow || + boundaryOffset - previousBoundary > CopyConstants::PARALLEL_BLOCK_SIZE; + previousBoundary = boundaryOffset; + }; + processChunk(buffer.get(), fileSize, 0, option, fsm, runtimeState, + result.detectedQuotedMultiline, onBoundary); + result.detectedOversizedLogicalRow = + result.detectedOversizedLogicalRow || + fileSize - previousBoundary > CopyConstants::PARALLEL_BLOCK_SIZE; + return result; +} + +CSVOverlapSeedBoundaryResult scanOverlapSeedForBoundary(const char* buffer, uint64_t bytesToRead, + uint64_t windowOffset, uint64_t cutOffset, uint64_t fileSize, const CSVOption& option, + const CSVBoundaryScannerFSM& fsm, CSVBoundaryScannerSeed seed, bool startsAtFileStart) { + CSVOverlapSeedBoundaryResult result; + auto runtimeState = runtimeStateFromSeed(seed, windowOffset); + bool hasBoundaryBeforeOrAtCut = startsAtFileStart; + auto onBoundary = [&](uint64_t boundaryOffset) { + if (boundaryOffset <= cutOffset) { + hasBoundaryBeforeOrAtCut = true; + } + if (boundaryOffset >= cutOffset && !result.hasBoundaryAtOrAfterCut) { + result.hasBoundaryAtOrAfterCut = true; + result.boundaryOffset = boundaryOffset; + } + }; + processChunk(buffer, bytesToRead, windowOffset, option, fsm, runtimeState, + result.detectedQuotedMultiline, onBoundary); + result.sawInvalidQuotedTransition = runtimeState.sawInvalidQuotedTransition; + if (!result.hasBoundaryAtOrAfterCut && windowOffset + bytesToRead == fileSize && + runtimeState.state != CSVBoundaryScannerState::InQuotedField && + runtimeState.state != CSVBoundaryScannerState::Escaped) { + result.hasBoundaryAtOrAfterCut = true; + result.boundaryOffset = fileSize; + } + result.usable = hasBoundaryBeforeOrAtCut && !result.sawInvalidQuotedTransition && + result.hasBoundaryAtOrAfterCut; + return result; +} + +CSVOverlapBoundaryResult scanOverlapForBoundary(FileInfo* fileInfo, uint64_t fileSize, + uint64_t cutOffset, uint64_t overlapSize, const CSVOption& option) { + const auto windowOffset = cutOffset > overlapSize ? cutOffset - overlapSize : 0; + const auto windowEnd = std::min(fileSize, cutOffset + overlapSize); + const auto bytesToRead = windowEnd - windowOffset; + auto buffer = std::make_unique(bytesToRead); + fileInfo->readFromFile(buffer.get(), bytesToRead, windowOffset); + + CSVOverlapBoundaryResult result; + CSVBoundaryScannerFSM fsm{option}; + const auto firstSeed = + windowOffset == 0 ? static_cast(CSVBoundaryScannerSeed::OutsideFieldStart) : 0; + const auto endSeed = + windowOffset == 0 ? firstSeed + 1 : static_cast(CSVBoundaryScannerSeed::COUNT); + for (auto seedIdx = firstSeed; seedIdx < endSeed; ++seedIdx) { + // CSV quoting is not self-synchronizing in general. A cut is safe only when every usable + // seed that reaches a real row boundary before the cut agrees on the same next boundary. + // Disagreement means the window is still ambiguous, so the caller expands the overlap and + // eventually falls back to a single file-sized range if ambiguity remains. + auto seedResult = + scanOverlapSeedForBoundary(buffer.get(), bytesToRead, windowOffset, cutOffset, fileSize, + option, fsm, static_cast(seedIdx), windowOffset == 0); + result.detectedQuotedMultiline = + result.detectedQuotedMultiline || seedResult.detectedQuotedMultiline; + result.sawInvalidQuotedTransition = + result.sawInvalidQuotedTransition || seedResult.sawInvalidQuotedTransition; + if (!seedResult.usable) { + continue; + } + if (!result.foundBoundary) { + result.foundBoundary = true; + result.boundaryOffset = seedResult.boundaryOffset; + } else if (result.boundaryOffset != seedResult.boundaryOffset) { + result.foundBoundary = false; + return result; + } + } + return result; +} + +} // namespace + +CSVBoundaryScanResult CSVBoundaryScanner::planFixedChunkOverlap(const std::string& filePath, + idx_t fileIdx, const CSVOption& option, main::ClientContext* context) { + auto fileInfo = VirtualFileSystem::GetUnsafe(*context)->openFile(filePath, + FileOpenFlags(FileFlags::READ_ONLY +#ifdef _WIN32 + | FileFlags::BINARY +#endif + ), + context); + CSVBoundaryScanResult result; + result.fileSize = fileInfo->getFileSize(); + if (result.fileSize == 0) { + return result; + } + + const auto blockSize = CopyConstants::PARALLEL_BLOCK_SIZE; + const auto numBlocks = (result.fileSize + blockSize - 1) / blockSize; + if (numBlocks == 1) { + const auto properties = scanWholeFileProperties(fileInfo.get(), result.fileSize, option); + result.detectedQuotedMultiline = properties.detectedQuotedMultiline; + result.detectedOversizedLogicalRow = properties.detectedOversizedLogicalRow; + result.usePlannedRanges = + result.detectedQuotedMultiline || result.detectedOversizedLogicalRow; + if (result.usePlannedRanges) { + result.ranges = makeSingleFileRange(fileIdx, result.fileSize); + } + return result; + } + std::vector adjustedBoundaries; + adjustedBoundaries.reserve(numBlocks > 0 ? numBlocks - 1 : 0); + for (uint64_t blockIdx = 1; blockIdx < numBlocks; ++blockIdx) { + const auto cutOffset = blockIdx * blockSize; + CSVOverlapBoundaryResult overlapResult; + auto currentOverlapSize = blockSize; + while (true) { + overlapResult = scanOverlapForBoundary(fileInfo.get(), result.fileSize, cutOffset, + currentOverlapSize, option); + result.detectedQuotedMultiline = + result.detectedQuotedMultiline || overlapResult.detectedQuotedMultiline; + if (overlapResult.foundBoundary) { + break; + } + if (currentOverlapSize >= result.fileSize) { + result.ranges = makeSingleFileRange(fileIdx, result.fileSize); + result.usePlannedRanges = !result.ranges.empty(); + return result; + } + currentOverlapSize = std::min(result.fileSize, currentOverlapSize * 2); + } + if (!adjustedBoundaries.empty() && + overlapResult.boundaryOffset <= adjustedBoundaries.back()) { + continue; + } + // Planned ranges must start and end at logical row boundaries. We only accept a cut when + // every usable FSM seed in the overlap window resolves to the same next boundary; otherwise + // the overlap expands, and an unresolved cut falls back to one file-sized range. + adjustedBoundaries.push_back(overlapResult.boundaryOffset); + } + auto adjustedRanges = makeAdjustedChunkRanges(fileIdx, result.fileSize, adjustedBoundaries); + result.ranges = std::move(adjustedRanges.ranges); + result.detectedOversizedLogicalRow = adjustedRanges.detectedOversizedLogicalRow; + result.usePlannedRanges = + (result.detectedQuotedMultiline || result.detectedOversizedLogicalRow) && + !result.ranges.empty(); + if (!result.usePlannedRanges) { + result.ranges.clear(); + } + return result; +} + +} // namespace processor +} // namespace lbug diff --git a/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp b/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp index a49d82ae2d..1f7f843204 100644 --- a/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp +++ b/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp @@ -1,6 +1,10 @@ #include "processor/operator/persistent/reader/csv/parallel_csv_reader.h" +#include + #include "binder/binder.h" +#include "common/constants.h" +#include "common/file_system/virtual_file_system.h" #include "function/table/bind_data.h" #include "processor/execution_context.h" #include "processor/operator/persistent/reader/csv/serial_csv_reader.h" @@ -32,7 +36,9 @@ bool ParallelCSVReader::hasMoreToRead() const { } uint64_t ParallelCSVReader::parseBlock(block_idx_t blockIdx, DataChunk& resultChunk) { + parseMode = ParseMode::FIXED_BLOCK; currentBlockIdx = blockIdx; + currentUnitStartsAtFileStart = blockIdx == 0; resetNumRowsInCurrentBlock(); seekToBlockStart(); if (blockIdx == 0) { @@ -51,6 +57,31 @@ uint64_t ParallelCSVReader::parseBlock(block_idx_t blockIdx, DataChunk& resultCh return numRowsRead; } +uint64_t ParallelCSVReader::parseRange(const CSVParseRange& range, DataChunk& resultChunk) { + parseMode = ParseMode::PLANNED_RANGE; + currentBlockIdx = range.rangeIdx; + currentRangeStartOffset = range.startOffset; + currentRangeEndOffset = range.endOffset; + currentUnitStartsAtFileStart = range.startsAtFileStart; + resetNumRowsInCurrentBlock(); + seekToRangeStart(); + if (currentUnitStartsAtFileStart) { + readBOM(); + if (option.hasHeader) { + const auto [numRowsRead, numErrors] = readHeader(); + errorHandler->setHeaderNumRows(numRowsRead + numErrors); + } + } + if (finishedBlock()) { + return 0; + } + ParallelParsingDriver driver(resultChunk, this); + const auto [numRowsRead, numErrors] = + supportsStructuralParser() ? parseCSVStructural(driver) : parseCSV(driver); + increaseNumRowsInCurrentBlock(numRowsRead, numErrors); + return numRowsRead; +} + void ParallelCSVReader::reportFinishedBlock() { errorHandler->reportFinishedBlock(currentBlockIdx, getNumRowsInCurrentBlock()); } @@ -58,7 +89,10 @@ void ParallelCSVReader::reportFinishedBlock() { uint64_t ParallelCSVReader::continueBlock(DataChunk& resultChunk) { DASSERT(hasMoreToRead()); ParallelParsingDriver driver(resultChunk, this); - const auto [numRowsParsed, numErrors] = parseCSV(driver); + const auto [numRowsParsed, numErrors] = + parseMode == ParseMode::PLANNED_RANGE && supportsStructuralParser() ? + parseCSVStructural(driver) : + parseCSV(driver); increaseNumRowsInCurrentBlock(numRowsParsed, numErrors); return numRowsParsed; } @@ -107,7 +141,22 @@ void ParallelCSVReader::seekToBlockStart() { } while (readBuffer(nullptr)); } +void ParallelCSVReader::seekToRangeStart() { + if (fileInfo->seek(currentRangeStartOffset, SEEK_SET) == -1) { + handleCopyException( + std::format("Failed to seek to range {}: {}", currentBlockIdx, posixErrMessage()), + true); + } + osFileOffset = currentRangeStartOffset; + position = 0; + bufferSize = 0; + buffer.reset(); +} + bool ParallelCSVReader::handleQuotedNewline() { + if (parseMode == ParseMode::PLANNED_RANGE) { + return true; + } lineContext.setEndOfLine(getFileOffset()); handleCopyException("Quoted newlines are not supported in parallel CSV reader." " Please specify PARALLEL=FALSE in the options."); @@ -115,18 +164,24 @@ bool ParallelCSVReader::handleQuotedNewline() { } bool ParallelCSVReader::finishedBlock() const { + if (parseMode == ParseMode::PLANNED_RANGE) { + return getFileOffset() >= currentRangeEndOffset; + } // Only stop if we've ventured into the next block by at least a byte. // Use `>` because `position` points to just past the newline right now. return getFileOffset() > (currentBlockIdx + 1) * CopyConstants::PARALLEL_BLOCK_SIZE; } ParallelCSVScanSharedState::ParallelCSVScanSharedState(FileScanInfo fileScanInfo, uint64_t numRows, - main::ClientContext* context, CSVOption csvOption, CSVColumnInfo columnInfo) + main::ClientContext* context, CSVOption csvOption, CSVColumnInfo columnInfo, + std::vector filePlans) : ScanFileWithProgressSharedState{std::move(fileScanInfo), numRows, context}, - csvOption{std::move(csvOption)}, columnInfo{std::move(columnInfo)}, numBlocksReadByFiles{0} { + csvOption{std::move(csvOption)}, columnInfo{std::move(columnInfo)}, completedBytes{0}, + filePlans{std::move(filePlans)} { errorHandlers.reserve(this->fileScanInfo.getNumFiles()); for (idx_t i = 0; i < this->fileScanInfo.getNumFiles(); ++i) { errorHandlers.emplace_back(i, &mtx); + totalSize += this->filePlans[i].fileSize; } populateErrorFunc = constructPopulateFunc(); for (auto& errorHandler : errorHandlers) { @@ -153,13 +208,32 @@ populate_func_t ParallelCSVScanSharedState::constructPopulateFunc() { }; } -void ParallelCSVScanSharedState::setFileComplete(uint64_t completedFileIdx) { +ParallelCSVScanSharedState::ParseTask ParallelCSVScanSharedState::getNextTask() { std::lock_guard guard{mtx}; - if (completedFileIdx == fileIdx) { - numBlocksReadByFiles += blockIdx; + while (fileIdx < fileScanInfo.getNumFiles()) { + const auto currentFileIdx = fileIdx.load(); + const auto& plan = filePlans[currentFileIdx]; + const auto unitCount = plan.usePlannedRanges ? plan.ranges.size() : plan.numFixedBlocks; + if (blockIdx < unitCount) { + ParseTask task; + task.fileIdx = currentFileIdx; + task.unitIdx = blockIdx++; + task.usePlannedRange = plan.usePlannedRanges; + if (task.usePlannedRange) { + task.range = plan.ranges[task.unitIdx]; + task.byteSize = task.range.endOffset - task.range.startOffset; + } else { + const auto startOffset = task.unitIdx * CopyConstants::PARALLEL_BLOCK_SIZE; + const auto endOffset = std::min(plan.fileSize, + startOffset + CopyConstants::PARALLEL_BLOCK_SIZE); + task.byteSize = endOffset > startOffset ? endOffset - startOffset : 0; + } + return task; + } blockIdx = 0; fileIdx++; } + return {}; } static offset_t tableFunc(const TableFuncInput& input, TableFuncOutput& output) { @@ -178,12 +252,16 @@ static offset_t tableFunc(const TableFuncInput& input, TableFuncOutput& output) } } localState->reader->reportFinishedBlock(); + sharedState->completedBytes += localState->currentTaskBytes; + localState->currentTaskBytes = 0; } - auto [fileIdx, blockIdx] = sharedState->getNext(); - if (fileIdx == UINT64_MAX) { + auto task = sharedState->getNextTask(); + if (task.fileIdx == INVALID_IDX) { return 0; } - if (fileIdx != localState->fileIdx) { + const auto fileIdx = task.fileIdx; + localState->currentTaskBytes = task.byteSize; + if (fileIdx != localState->fileIdx || localState->reader == nullptr) { localState->fileIdx = fileIdx; localState->errorHandler = std::make_unique(&sharedState->errorHandlers[fileIdx], @@ -193,7 +271,9 @@ static offset_t tableFunc(const TableFuncInput& input, TableFuncOutput& output) fileIdx, sharedState->csvOption.copy(), sharedState->columnInfo.copy(), sharedState->context, localState->errorHandler.get()); } - auto numRowsRead = localState->reader->parseBlock(blockIdx, outputChunk); + auto numRowsRead = task.usePlannedRange ? + localState->reader->parseRange(task.range, outputChunk) : + localState->reader->parseBlock(task.unitIdx, outputChunk); // if there are any pending errors to throw, stop the parsing // the actual error will be thrown during finalize @@ -208,8 +288,9 @@ static offset_t tableFunc(const TableFuncInput& input, TableFuncOutput& output) } if (localState->reader->isEOF()) { localState->reader->reportFinishedBlock(); + sharedState->completedBytes += localState->currentTaskBytes; + localState->currentTaskBytes = 0; localState->errorHandler->finalize(); - sharedState->setFileComplete(localState->fileIdx); localState->reader = nullptr; localState->errorHandler = nullptr; } @@ -270,20 +351,43 @@ static std::unique_ptr bindFunc(main::ClientContext* context, static std::unique_ptr initSharedState( const TableFuncInitSharedStateInput& input) { auto bindData = input.bindData->constPtrCast(); - auto csvOption = CSVReaderConfig::construct(bindData->fileScanInfo.options).option; + auto csvConfig = CSVReaderConfig::construct(bindData->fileScanInfo.options); + auto csvOption = csvConfig.option.copy(); auto columnInfo = CSVColumnInfo(bindData->getNumColumns() - bindData->numWarningDataColumns, bindData->getColumnSkips(), bindData->numWarningDataColumns); - auto sharedState = std::make_unique(bindData->fileScanInfo.copy(), - 0 /* numRows */, bindData->context, csvOption.copy(), columnInfo.copy()); - - for (idx_t i = 0; i < sharedState->fileScanInfo.getNumFiles(); ++i) { - auto filePath = sharedState->fileScanInfo.filePaths[i]; - auto reader = std::make_unique(filePath, i, csvOption.copy(), - columnInfo.copy(), bindData->context, nullptr); - sharedState->totalSize += reader->getFileSize(); + std::vector filePlans; + filePlans.reserve(bindData->fileScanInfo.getNumFiles()); + for (idx_t i = 0; i < bindData->fileScanInfo.getNumFiles(); ++i) { + auto filePath = bindData->fileScanInfo.filePaths[i]; + ParallelCSVScanSharedState::FileScanPlan plan; + if (csvConfig.multilineParallel) { + auto scanResult = CSVBoundaryScanner::planFixedChunkOverlap(filePath, i, + csvOption.copy(), bindData->context); + plan.fileSize = scanResult.fileSize; + plan.usePlannedRanges = scanResult.usePlannedRanges; + if (plan.usePlannedRanges) { + plan.ranges = std::move(scanResult.ranges); + } + } else { + auto fileInfo = VirtualFileSystem::GetUnsafe(*bindData->context) + ->openFile(filePath, + FileOpenFlags(FileFlags::READ_ONLY +#ifdef _WIN32 + | FileFlags::BINARY +#endif + ), + bindData->context); + plan.fileSize = fileInfo->getFileSize(); + } + if (!plan.usePlannedRanges && plan.fileSize > 0) { + plan.numFixedBlocks = (plan.fileSize + CopyConstants::PARALLEL_BLOCK_SIZE - 1) / + CopyConstants::PARALLEL_BLOCK_SIZE; + } + filePlans.push_back(std::move(plan)); } - - return sharedState; + return std::make_unique(bindData->fileScanInfo.copy(), + 0 /* numRows */, bindData->context, csvOption.copy(), columnInfo.copy(), + std::move(filePlans)); } static std::unique_ptr initLocalState(const TableFuncInitLocalStateInput&) { @@ -300,8 +404,7 @@ static double progressFunc(TableFuncSharedState* sharedState) { if (state->totalSize == 0) { return 0.0; } - uint64_t totalReadSize = - (state->numBlocksReadByFiles + state->blockIdx) * CopyConstants::PARALLEL_BLOCK_SIZE; + uint64_t totalReadSize = state->completedBytes; if (totalReadSize > state->totalSize) { return 1.0; } diff --git a/test/copy/copy_test.cpp b/test/copy/copy_test.cpp index 307474f011..fce33697fe 100644 --- a/test/copy/copy_test.cpp +++ b/test/copy/copy_test.cpp @@ -2,6 +2,7 @@ #include #include +#include "common/constants.h" #include "common/file_system/local_file_system.h" #include "common/file_system/virtual_file_system.h" #include "graph_test/base_graph_test.h" @@ -14,6 +15,10 @@ #include "transaction/transaction_manager.h" #include +using ::testing::TestParamInfo; +using ::testing::Values; +using ::testing::WithParamInterface; + namespace lbug { namespace testing { @@ -134,6 +139,98 @@ class CopyTest : public BaseGraphTest { FlakyBufferManager* currentBM; }; +struct StructuralCSVReaderTestCase { + std::string name; + std::vector rows; + std::string query; + std::vector> expectedRows; +}; + +class StructuralCSVReaderTest : public CopyTest, + public WithParamInterface {}; + +static std::string bindStructuralCSVPath(std::string query, const std::string& filePath) { + const auto markerPos = query.find("{}"); + DASSERT(markerPos != std::string::npos); + query.replace(markerPos, 2, filePath); + return query; +} + +static StructuralCSVReaderTestCase makeLargeStructuralCSVTestCase() { + static constexpr auto numRows = common::DEFAULT_VECTOR_CAPACITY + 17; + const auto fieldBody = + std::string(common::CopyConstants::INITIAL_BUFFER_SIZE + 128, 'x') + "\n" + "tail"; + std::vector rows; + rows.reserve(numRows); + for (auto i = 0u; i < numRows; ++i) { + rows.push_back(std::to_string(i) + ",\"" + fieldBody + "\""); + } + return StructuralCSVReaderTestCase{ + "LargeContinuationAndBufferRefill", + std::move(rows), + R"(LOAD FROM "{}" (MULTILINE_PARALLEL=true, AUTO_DETECT=false) RETURN COUNT(*), MAX(SIZE(column1)))", + {{std::to_string(numRows), std::to_string(fieldBody.size())}}, + }; +} + +TEST_P(StructuralCSVReaderTest, MultilineParallelStructuralParser) { + createDBAndConn(); + const auto& testCase = GetParam(); + const auto filePath = writeCSV("structural.csv", testCase.rows); + auto result = conn->query(bindStructuralCSVPath(testCase.query, filePath)); + ASSERT_TRUE(result->isSuccess()) << result->getErrorMessage(); + for (const auto& expectedRow : testCase.expectedRows) { + ASSERT_TRUE(result->hasNext()) << testCase.name; + auto tuple = result->getNext(); + ASSERT_EQ(expectedRow.size(), tuple->len()) << testCase.name; + for (auto i = 0u; i < expectedRow.size(); ++i) { + EXPECT_EQ(expectedRow[i], tuple->getValue(i)->toString()) << testCase.name; + } + } + EXPECT_FALSE(result->hasNext()) << testCase.name; +} + +INSTANTIATE_TEST_SUITE_P(CSV, StructuralCSVReaderTest, + Values( + StructuralCSVReaderTestCase{ + "QuotedNewline", + {R"("abc +def")"}, + R"(LOAD FROM "{}" (MULTILINE_PARALLEL=true, AUTO_DETECT=false) RETURN COUNT(*), SIZE(column0))", + {{"1", "7"}}, + }, + StructuralCSVReaderTestCase{ + "QuotedCRLF", + {"1,\"abc\r\ndef\""}, + R"(LOAD FROM "{}" (MULTILINE_PARALLEL=true, AUTO_DETECT=false) RETURN COUNT(*), SIZE(column1))", + {{"1", "8"}}, + }, + StructuralCSVReaderTestCase{ + "CustomDelimiterScalarPlannedRange", + {"1;\"abc\ndef\""}, + R"(LOAD FROM "{}" (MULTILINE_PARALLEL=true, AUTO_DETECT=false, DELIM=';') RETURN COUNT(*), SIZE(column1))", + {{"1", "7"}}, + }, + StructuralCSVReaderTestCase{ + "EscapedQuotes", + {R"(1,"a ""quoted"" value",tail)"}, + R"(LOAD FROM "{}" (MULTILINE_PARALLEL=true, AUTO_DETECT=false) RETURN column0, SIZE(column1), column2)", + {{"1", "16", "tail"}}, + }, + StructuralCSVReaderTestCase{ + "MultiColumnQuotedDelimiters", + {R"(1,"first +line","middle,with,delimiters","last +line")", + R"(2,"alpha","middle +line","omega +tail")"}, + R"(LOAD FROM "{}" (MULTILINE_PARALLEL=true, AUTO_DETECT=false) RETURN column0, SIZE(column1), SIZE(column2), SIZE(column3) ORDER BY column0)", + {{"1", "10", "22", "9"}, {"2", "5", "11", "10"}}, + }, + makeLargeStructuralCSVTestCase()), + [](const TestParamInfo& info) { return info.param.name; }); + void CopyTest::BMExceptionRecoveryTest(BMExceptionRecoveryTestConfig cfg) { if (inMemMode) { failureFrequency = UINT64_MAX;