Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,27 @@ else()
add_compile_options(-Wno-unknown-pragmas)
endif()

# --- TEST PR ONLY: force SIMD compile flags so CI exercises the SIMD path ---
# Do NOT merge. This unconditionally enables arch SIMD flags to prove the
# structural CSV parser path compiles and runs green across the matrix.
#
# The target architecture is classified from CMAKE_SYSTEM_PROCESSOR
# (lower-cased); at most one extra compile flag and one status line result.
string(TOLOWER "${CMAKE_SYSTEM_PROCESSOR}" _simd_arch)
set(_simd_flag "")
set(_simd_note "")
if(_simd_arch MATCHES "^(x86_64|amd64|x64|i[3-6]86)$")
    # x86 family: request AVX2 using the spelling the active compiler expects.
    if(MSVC)
        set(_simd_flag /arch:AVX2)
    else()
        set(_simd_flag -mavx2)
    endif()
    set(_simd_note "TEST PR: x86 SIMD flags enabled")
elseif(_simd_arch MATCHES "^(arm64|aarch64)$")
    # 64-bit ARM: no flag is added, only the status line is printed.
    set(_simd_note "TEST PR: arm64 baseline NEON (no extra flags)")
elseif(_simd_arch MATCHES "^arm")
    # Remaining ARM targets: NEON is requested explicitly, except under MSVC.
    if(NOT MSVC)
        set(_simd_flag -mfpu=neon)
    endif()
    set(_simd_note "TEST PR: arm32 NEON enabled")
endif()
if(NOT _simd_flag STREQUAL "")
    add_compile_options(${_simd_flag})
endif()
if(NOT _simd_note STREQUAL "")
    message(STATUS "${_simd_note}")
endif()
# Drop the scratch variables so they do not leak into the rest of the file.
unset(_simd_flag)
unset(_simd_note)
# --- end TEST PR block ---

if(${BUILD_WASM})
if(NOT __SINGLE_THREADED__)
add_compile_options(-pthread)
Expand Down
2 changes: 2 additions & 0 deletions src/common/copier_config/csv_reader_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ static void bindBoolParsingOption(CSVReaderConfig& config, const std::string& op
config.option.setHeader = true;
} else if (optionName == "PARALLEL") {
config.parallel = optionValue;
} else if (optionName == "MULTILINE_PARALLEL") {
config.multilineParallel = optionValue;
} else if (optionName == "LIST_UNBRACED") {
config.option.allowUnbracedList = optionValue;
} else if (optionName == CopyConstants::IGNORE_ERRORS_OPTION_NAME) {
Expand Down
5 changes: 3 additions & 2 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,11 @@ struct CopyConstants {
static constexpr const char* TO_OPTION_NAME = "TO";

static constexpr const char* BOOL_CSV_PARSING_OPTIONS[] = {"HEADER", "PARALLEL",
"LIST_UNBRACED", "AUTODETECT", "AUTO_DETECT", CopyConstants::IGNORE_ERRORS_OPTION_NAME,
CopyConstants::SKIP_DUPLICATE_PK_OPTION_NAME};
"MULTILINE_PARALLEL", "LIST_UNBRACED", "AUTODETECT", "AUTO_DETECT",
CopyConstants::IGNORE_ERRORS_OPTION_NAME, CopyConstants::SKIP_DUPLICATE_PK_OPTION_NAME};
static constexpr bool DEFAULT_CSV_HAS_HEADER = false;
static constexpr bool DEFAULT_CSV_PARALLEL = true;
static constexpr bool DEFAULT_CSV_MULTILINE_PARALLEL = false;

// Default configuration for csv file parsing
static constexpr const char* STRING_CSV_PARSING_OPTIONS[] = {"ESCAPE", "DELIM", "DELIMITER",
Expand Down
8 changes: 6 additions & 2 deletions src/include/common/copier_config/csv_reader_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,19 @@ struct CSVOption {
/**
 * CSV reader configuration: the parsing options plus the reader-level
 * parallelism switches.
 *
 * Instances are default-constructed or built via construct(); copying goes
 * through the private copy constructor below together with
 * EXPLICIT_COPY_DEFAULT_MOVE.
 */
struct CSVReaderConfig {
    // Parsing options (copied via CSVOption::copy() in the copy constructor).
    CSVOption option;
    // Bound from the "PARALLEL" option; defaults to
    // CopyConstants::DEFAULT_CSV_PARALLEL.
    bool parallel;
    // Bound from the "MULTILINE_PARALLEL" option; defaults to
    // CopyConstants::DEFAULT_CSV_MULTILINE_PARALLEL.
    bool multilineParallel;

    // Initializes every field, including multilineParallel, from its default.
    CSVReaderConfig()
        : option{}, parallel{CopyConstants::DEFAULT_CSV_PARALLEL},
          multilineParallel{CopyConstants::DEFAULT_CSV_MULTILINE_PARALLEL} {}
    EXPLICIT_COPY_DEFAULT_MOVE(CSVReaderConfig);

    // Builds a config from a case-insensitive option-name -> value map.
    static CSVReaderConfig construct(const case_insensitive_map_t<Value>& options);

private:
    // Copies all three fields; multilineParallel must be carried over here or
    // a copied config would hold an indeterminate value.
    CSVReaderConfig(const CSVReaderConfig& other)
        : option{other.option.copy()}, parallel{other.parallel},
          multilineParallel{other.multilineParallel} {}
};

} // namespace common
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ class BaseCSVReader {

template<typename Driver>
parse_result_t parseCSV(Driver&);
template<typename Driver>
parse_result_t parseCSVStructural(Driver&);

bool supportsStructuralParser() const;

inline bool isNewLine(char c) { return c == '\n' || c == '\r'; }

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#pragma once

#include <cstdint>
#include <string>
#include <vector>

#include "common/copier_config/csv_reader_config.h"
#include "common/types/types.h"

namespace lbug {

namespace main {
class ClientContext;
}

namespace processor {

/// States of the CSV boundary scanner's state machine, stored in one byte.
/// NOTE(review): the per-state descriptions below are inferred from the
/// enumerator names only; the transitions live in the scanner implementation,
/// which is not visible here - confirm against it.
enum class CSVBoundaryScannerState : uint8_t {
    OutsideField = 0,   ///< presumably: not inside a quoted field
    InQuotedField = 1,  ///< presumably: between an opening and closing quote
    AfterQuote = 2,     ///< presumably: a quote was just seen inside a quoted field
    Escaped = 3,        ///< presumably: the previous character was the escape character
    CarriageReturn = 4, ///< presumably: a '\r' was just seen
};

/// Describes one range of a CSV file to be parsed as a unit.
///
/// Every member has a default initializer so that a default-initialized
/// instance (`CSVParseRange r;`) is well-defined and equal to a
/// value-initialized one (`CSVParseRange{}`). The type stays an aggregate, so
/// brace/aggregate initialization by callers keeps working.
struct CSVParseRange {
    /// Index of the file this range belongs to.
    common::idx_t fileIdx = 0;
    /// Start of the range. NOTE(review): presumably a byte offset into the
    /// file - confirm against the scanner implementation.
    uint64_t startOffset = 0;
    /// End of the range. NOTE(review): whether this bound is inclusive or
    /// exclusive is not visible here - confirm before relying on it.
    uint64_t endOffset = 0;
    /// Index of this range. NOTE(review): presumably its position within the
    /// file's list of ranges - confirm.
    common::block_idx_t rangeIdx = 0;
    /// Whether this range begins at the start of the file.
    bool startsAtFileStart = false;
};

/// Value returned by CSVBoundaryScanner::planFixedChunkOverlap for one file.
/// A default-constructed result has size 0, all flags cleared and no ranges.
/// NOTE(review): the exact meaning of each flag is set by the scanner
/// implementation, which is not visible here; descriptions are name-based.
struct CSVBoundaryScanResult {
    /// Size recorded for the scanned file (presumably in bytes - confirm).
    uint64_t fileSize{0};
    /// Presumably set when a quoted field spanning several lines was seen.
    bool detectedQuotedMultiline{false};
    /// Presumably set when a single logical row exceeded a size limit.
    bool detectedOversizedLogicalRow{false};
    /// Presumably tells the caller whether `ranges` should drive parsing.
    bool usePlannedRanges{false};
    /// Parse ranges produced by the scan; empty by default.
    std::vector<CSVParseRange> ranges;
};

// Stateless entry point for planning how a CSV file is split for parsing;
// exposes a single static function and declares no data members.
class CSVBoundaryScanner {
public:
// Scans the file at `filePath` and returns a CSVBoundaryScanResult for it.
// `fileIdx` identifies the file, `option` carries the CSV parsing options and
// `context` is the client context pointer.
// NOTE(review): the planning strategy (suggested by the name "fixed chunk
// overlap"), how `fileIdx` is used, and whether `context` may be null are
// defined in the .cpp, which is not visible here - confirm there.
static CSVBoundaryScanResult planFixedChunkOverlap(const std::string& filePath,
common::idx_t fileIdx, const common::CSVOption& option, main::ClientContext* context);
};

} // namespace processor
} // namespace lbug
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "function/table/bind_input.h"
#include "function/table/scan_file_function.h"
#include "function/table/table_function.h"
#include "processor/operator/persistent/reader/csv/csv_boundary_scanner.h"
#include "processor/operator/persistent/reader/file_error_handler.h"

namespace lbug {
Expand All @@ -16,12 +17,15 @@ class ParallelCSVReader final : public BaseCSVReader {
friend class ParallelParsingDriver;

public:
/// Selects how the reader's current unit of work is delimited.
/// NOTE(review): presumably FIXED_BLOCK pairs with parseBlock() and
/// PLANNED_RANGE with parseRange() - confirm in the implementation.
enum class ParseMode : uint8_t {
    FIXED_BLOCK = 0,
    PLANNED_RANGE = 1,
};

ParallelCSVReader(const std::string& filePath, common::idx_t fileIdx, common::CSVOption option,
CSVColumnInfo columnInfo, main::ClientContext* context,
LocalFileErrorHandler* errorHandler);

bool hasMoreToRead() const;
uint64_t parseBlock(common::block_idx_t blockIdx, common::DataChunk& resultChunk) override;
uint64_t parseRange(const CSVParseRange& range, common::DataChunk& resultChunk);
uint64_t continueBlock(common::DataChunk& resultChunk);

void reportFinishedBlock();
Expand All @@ -32,25 +36,50 @@ class ParallelCSVReader final : public BaseCSVReader {
private:
bool finishedBlock() const;
void seekToBlockStart();
void seekToRangeStart();

private:
ParseMode parseMode = ParseMode::FIXED_BLOCK;
uint64_t currentRangeStartOffset = 0;
uint64_t currentRangeEndOffset = 0;
bool currentUnitStartsAtFileStart = false;
};

/// Local state of the parallel CSV scan table function: owns the reader and
/// the local error handler, and records which file is being read.
/// NOTE(review): presumably one instance exists per worker thread - confirm
/// against the table-function framework.
struct ParallelCSVLocalState final : public function::TableFuncLocalState {
    /// Reader owned by this state; null until one is assigned.
    std::unique_ptr<ParallelCSVReader> reader;
    /// Error handler owned by this state; null until one is assigned.
    std::unique_ptr<LocalFileErrorHandler> errorHandler;
    /// File index associated with this state; INVALID_IDX until set.
    common::idx_t fileIdx{common::INVALID_IDX};
    /// Byte count of the current task (presumably used for progress
    /// accounting - confirm); starts at 0.
    uint64_t currentTaskBytes{0};
};

/// State shared by the workers of a parallel CSV scan: parsing options,
/// column info, progress counters, per-file error handlers and the per-file
/// scan plans from which parse tasks are handed out.
struct ParallelCSVScanSharedState final : public function::ScanFileWithProgressSharedState {
    /// How a single file is to be split up for parsing.
    struct FileScanPlan {
        /// Size recorded for the file (presumably in bytes - confirm).
        uint64_t fileSize = 0;
        /// Number of fixed blocks for the file (presumably used when planned
        /// ranges are not - confirm).
        uint64_t numFixedBlocks = 0;
        /// Presumably tells whether `ranges` should be used instead of fixed
        /// blocks - confirm.
        bool usePlannedRanges = false;
        /// Planned parse ranges for the file; empty by default.
        std::vector<CSVParseRange> ranges;
    };

    /// One unit of work returned by getNextTask().
    struct ParseTask {
        /// File the task refers to; INVALID_IDX by default.
        /// NOTE(review): presumably INVALID_IDX signals "no more work" -
        /// confirm in getNextTask().
        common::idx_t fileIdx = common::INVALID_IDX;
        /// Index of the unit (block or range) within the file.
        common::block_idx_t unitIdx = 0;
        /// Size of the task (presumably in bytes - confirm).
        uint64_t byteSize = 0;
        /// Presumably tells whether `range` is meaningful for this task.
        bool usePlannedRange = false;
        /// Range to parse; value-initialized by default.
        CSVParseRange range{};
    };

    common::CSVOption csvOption;
    CSVColumnInfo columnInfo;
    /// Atomic counters updated during the scan.
    std::atomic<uint64_t> numBlocksReadByFiles = 0;
    std::atomic<uint64_t> completedBytes = 0;
    std::vector<SharedFileErrorHandler> errorHandlers;
    populate_func_t populateErrorFunc;
    /// Scan plans (presumably indexed by file index - confirm).
    std::vector<FileScanPlan> filePlans;

    /// Constructs the shared state; `filePlans` is the trailing parameter of
    /// the single constructor declaration.
    ParallelCSVScanSharedState(common::FileScanInfo fileScanInfo, uint64_t numRows,
        main::ClientContext* context, common::CSVOption csvOption, CSVColumnInfo columnInfo,
        std::vector<FileScanPlan> filePlans);

    void setFileComplete(uint64_t completedFileIdx);
    /// Returns the next ParseTask to work on.
    ParseTask getNextTask();
    populate_func_t constructPopulateFunc();
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
add_library(lbug_processor_operator_csv_reader
OBJECT
base_csv_reader.cpp
csv_boundary_scanner.cpp
driver.cpp
parallel_csv_reader.cpp
serial_csv_reader.cpp
Expand Down
Loading
Loading