diff --git a/velox/exec/fuzzer/JoinFuzzer.cpp b/velox/exec/fuzzer/JoinFuzzer.cpp index ed608ae9558..d7de2c7c9d3 100644 --- a/velox/exec/fuzzer/JoinFuzzer.cpp +++ b/velox/exec/fuzzer/JoinFuzzer.cpp @@ -26,6 +26,10 @@ #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" #include "velox/vector/fuzzer/VectorFuzzer.h" +#include "velox/exec/PlanNodeStats.h" +#include "velox/experimental/cudf/exec/ToCudf.h" +// ./velox/exec/fuzzer/velox_join_fuzzer --enable_spill=false --v=1 --batch_size=100 --num_batches=1 --steps=10 --seed=2 +// ./velox/exec/fuzzer/velox_join_fuzzer --enable_spill=false --v=1 --batch_size=100 --num_batches=10 --steps=1 --seed=2 DEFINE_int32(steps, 10, "Number of plans to generate and test."); @@ -72,6 +76,21 @@ std::string makePercentageString(size_t value, size_t total) { return fmt::format("{} ({:.2f}%)", value, (double)value / total * 100); } +static std::vector kScalarTypes{ + BOOLEAN(), + TINYINT(), + SMALLINT(), + INTEGER(), + BIGINT(), + REAL(), + DOUBLE(), + VARCHAR(), + // VARBINARY(), + // TIMESTAMP(), + // DATE(), + // INTERVAL_DAY_TIME(), +}; + class JoinFuzzer { public: JoinFuzzer( @@ -216,6 +235,8 @@ JoinFuzzer::JoinFuzzer( dwrf::registerDwrfReaderFactory(); dwrf::registerDwrfWriterFactory(); + // print seed + std::cout << "Seed: " << initialSeed << std::endl; seed(initialSeed); } @@ -249,7 +270,8 @@ std::vector JoinFuzzer::generateJoinKeyTypes(int32_t numKeys) { for (auto i = 0; i < numKeys; ++i) { // Pick random scalar type. types.push_back(vectorFuzzer_.randType( - referenceQueryRunner_->supportedScalarTypes(), /*maxDepth=*/0)); + // referenceQueryRunner_->supportedScalarTypes(), /*maxDepth=*/0)); + kScalarTypes, /*maxDepth=*/0)); } return types; } @@ -273,7 +295,8 @@ std::vector JoinFuzzer::generateProbeInput( for (auto i = 0; i < numPayload; ++i) { names.push_back(fmt::format("tp{}", i + keyNames.size())); types.push_back(vectorFuzzer_.randType( - referenceQueryRunner_->supportedScalarTypes(), /*maxDepth=*/2)); + // referenceQueryRunner_->supportedScalarTypes(), /*maxDepth=*/2)); + kScalarTypes, /*maxDepth=*/0)); } const auto inputType = ROW(std::move(names), std::move(types)); @@ -307,7 +330,8 @@ std::vector JoinFuzzer::generateBuildInput( for (auto i = 0; i < numPayload; ++i) { names.push_back(fmt::format("bp{}", i + buildKeys.size())); types.push_back(vectorFuzzer_.randType( - referenceQueryRunner_->supportedScalarTypes(), /*maxDepth=*/2)); + // referenceQueryRunner_->supportedScalarTypes(), /*maxDepth=*/2)); + kScalarTypes, /*maxDepth=*/0)); } const auto rowType = ROW(std::move(names), std::move(types)); @@ -369,6 +393,12 @@ RowVectorPtr JoinFuzzer::execute( << ": " << std::endl << plan.plan->toString(true, true); + // Print the plan for debugging purposes + std::stringstream planStream; + planStream << "Plan #" << ":\n"; + planStream << plan.plan->toString(true, true); + std::cout << planStream.str() << std::endl; + test::AssertQueryBuilder builder(plan.plan); for (const auto& [planNodeId, nodeSplits] : plan.splits) { builder.splits(planNodeId, nodeSplits); @@ -400,8 +430,9 @@ RowVectorPtr JoinFuzzer::execute( TestScopedSpillInjection scopedSpillInjection(spillPct); RowVectorPtr result; + TaskStats stats; try { - result = builder.maxDrivers(2).copyResults(pool_.get()); + std::tie(result, stats) = builder.maxDrivers(2).copyResultsWithStats(pool_.get()); } catch (VeloxRuntimeError& e) { if (FLAGS_enable_oom_injection && e.errorCode() == facebook::velox::error_code::kMemCapExceeded && @@ -421,6 +452,8 @@ RowVectorPtr JoinFuzzer::execute( // avoid the potential interference of the background activities across query // executions. test::waitForAllTasksToBeDeleted(); + std::cout << exec::printPlanWithStats(*plan.plan, stats, true) + << std::endl; return result; } @@ -521,13 +554,13 @@ RowVectorPtr JoinFuzzer::testCrossProduct( std::vector altPlans; if (joinMaker.supportsTableScan()) { - altPlans.push_back(joinMaker.makeNestedLoopJoinWithTableScan( - JoinMaker::JoinOrder::NATURAL)); + // altPlans.push_back(joinMaker.makeNestedLoopJoinWithTableScan( + // JoinMaker::JoinOrder::NATURAL)); } if (joinMaker.supportsFlippingNestedLoopJoin()) { - altPlans.push_back( - joinMaker.makeNestedLoopJoin(inputType, JoinMaker::JoinOrder::FLIPPED)); + // altPlans.push_back( + // joinMaker.makeNestedLoopJoin(inputType, JoinMaker::JoinOrder::FLIPPED)); } for (const auto& altPlan : altPlans) { @@ -568,17 +601,17 @@ void addPlansForInputType( plans.push_back( joinMaker.makeMergeJoin(inputType, JoinMaker::JoinOrder::NATURAL)); if (joinMaker.supportsFlippingMergeJoin()) { - plans.push_back( - joinMaker.makeMergeJoin(inputType, JoinMaker::JoinOrder::FLIPPED)); + // plans.push_back( + // joinMaker.makeMergeJoin(inputType, JoinMaker::JoinOrder::FLIPPED)); } } if (joinMaker.supportsNestedLoopJoin()) { - plans.push_back( - joinMaker.makeNestedLoopJoin(inputType, JoinMaker::JoinOrder::NATURAL)); + // plans.push_back( + // joinMaker.makeNestedLoopJoin(inputType, JoinMaker::JoinOrder::NATURAL)); if (joinMaker.supportsFlippingNestedLoopJoin()) { - plans.push_back(joinMaker.makeNestedLoopJoin( - inputType, JoinMaker::JoinOrder::FLIPPED)); + // plans.push_back(joinMaker.makeNestedLoopJoin( + // inputType, JoinMaker::JoinOrder::FLIPPED)); } } } @@ -676,14 +709,14 @@ void JoinFuzzer::verify(core::JoinType joinType) { "" // It's a cross join, so no filter. ); - auto result = testCrossProduct( - crossJoinMaker, - JoinMaker::InputType::ENCODED, - probeSource, - buildSource); - auto flatResult = testCrossProduct( - crossJoinMaker, JoinMaker::InputType::FLAT, probeSource, buildSource); - test::assertEqualResults({result}, {flatResult}); + // auto result = testCrossProduct( + // crossJoinMaker, + // JoinMaker::InputType::ENCODED, + // probeSource, + // buildSource); + // auto flatResult = testCrossProduct( + // crossJoinMaker, JoinMaker::InputType::FLAT, probeSource, buildSource); + // test::assertEqualResults({result}, {flatResult}); } } @@ -756,8 +789,8 @@ void JoinFuzzer::verify(core::JoinType joinType) { altPlans.push_back(joinMaker.makeHashJoinWithTableScan( std::nullopt, JoinMaker::JoinOrder::FLIPPED)); // Use grouped execution. - altPlans.push_back(joinMaker.makeHashJoinWithTableScan( - numGroups, JoinMaker::JoinOrder::FLIPPED)); + // altPlans.push_back(joinMaker.makeHashJoinWithTableScan( + // numGroups, JoinMaker::JoinOrder::FLIPPED)); } if (joinMaker.supportsMergeJoin()) { @@ -770,11 +803,11 @@ void JoinFuzzer::verify(core::JoinType joinType) { } if (joinMaker.supportsNestedLoopJoin()) { - altPlans.push_back(joinMaker.makeNestedLoopJoinWithTableScan( - JoinMaker::JoinOrder::NATURAL)); + // altPlans.push_back(joinMaker.makeNestedLoopJoinWithTableScan( + // JoinMaker::JoinOrder::NATURAL)); if (joinMaker.supportsFlippingNestedLoopJoin()) { - altPlans.push_back(joinMaker.makeNestedLoopJoinWithTableScan( - JoinMaker::JoinOrder::FLIPPED)); + // altPlans.push_back(joinMaker.makeNestedLoopJoinWithTableScan( + // JoinMaker::JoinOrder::FLIPPED)); } } } diff --git a/velox/exec/fuzzer/JoinFuzzerRunner.cpp b/velox/exec/fuzzer/JoinFuzzerRunner.cpp index 7c350643793..5f3c6e45aec 100644 --- a/velox/exec/fuzzer/JoinFuzzerRunner.cpp +++ b/velox/exec/fuzzer/JoinFuzzerRunner.cpp @@ -22,6 +22,7 @@ #include "velox/exec/fuzzer/FuzzerUtil.h" #include "velox/exec/fuzzer/JoinFuzzer.h" #include "velox/exec/fuzzer/ReferenceQueryRunner.h" +#include "velox/experimental/cudf/exec/ToCudf.h" #include "velox/functions/prestosql/registration/RegistrationFunctions.h" #include "velox/parse/TypeResolver.h" #include "velox/serializers/CompactRowSerializer.h" @@ -116,5 +117,9 @@ int main(int argc, char** argv) { facebook::velox::serializer::spark::UnsafeRowVectorSerde:: registerNamedVectorSerde(); } + // Register cuDF + facebook::velox::cudf_velox::registerCudf(); + joinFuzzer(initialSeed, std::move(referenceQueryRunner)); + facebook::velox::cudf_velox::unregisterCudf(); } diff --git a/velox/exec/fuzzer/JoinMaker.cpp b/velox/exec/fuzzer/JoinMaker.cpp index 32f17c97cd7..576ad491796 100644 --- a/velox/exec/fuzzer/JoinMaker.cpp +++ b/velox/exec/fuzzer/JoinMaker.cpp @@ -391,17 +391,17 @@ JoinMaker::PlanWithSplits JoinMaker::makeHashJoin( test::PlanBuilder probeSourcePlan; test::PlanBuilder buildSourcePlan; - if (partitionStrategy == PartitionStrategy::NONE) { + // if (partitionStrategy == PartitionStrategy::NONE) { probeSourcePlan = makeJoinSourcePlan(probeSource, inputType, planNodeIdGenerator); buildSourcePlan = makeJoinSourcePlan(buildSource, inputType, planNodeIdGenerator); - } else { - probeSourcePlan = makePartitionedJoinSourcePlan( - partitionStrategy, probeSource, inputType, planNodeIdGenerator); - buildSourcePlan = makePartitionedJoinSourcePlan( - partitionStrategy, buildSource, inputType, planNodeIdGenerator); - } + // } else { + // probeSourcePlan = makePartitionedJoinSourcePlan( + // partitionStrategy, probeSource, inputType, planNodeIdGenerator); + // buildSourcePlan = makePartitionedJoinSourcePlan( + // partitionStrategy, buildSource, inputType, planNodeIdGenerator); + // } return PlanWithSplits(makeHashJoinPlan( probeSourcePlan, diff --git a/velox/exec/fuzzer/JoinMaker.h b/velox/exec/fuzzer/JoinMaker.h index 84f1c2c1f16..0f3b564d27f 100644 --- a/velox/exec/fuzzer/JoinMaker.h +++ b/velox/exec/fuzzer/JoinMaker.h @@ -207,11 +207,13 @@ class JoinMaker { bool supportsFlippingNestedLoopJoin() const; bool supportsMergeJoin() const { - return core::MergeJoinNode::isSupported(joinType_); + return false; + // return core::MergeJoinNode::isSupported(joinType_); } bool supportsNestedLoopJoin() const { - return core::NestedLoopJoinNode::isSupported(joinType_); + return false; + // return core::NestedLoopJoinNode::isSupported(joinType_); } /// Returns whether or not the types of the sources allow them to be read as diff --git a/velox/exec/tests/utils/AssertQueryBuilder.cpp b/velox/exec/tests/utils/AssertQueryBuilder.cpp index 2be9766b762..1842e1153b1 100644 --- a/velox/exec/tests/utils/AssertQueryBuilder.cpp +++ b/velox/exec/tests/utils/AssertQueryBuilder.cpp @@ -226,6 +226,14 @@ RowVectorPtr AssertQueryBuilder::copyResults(memory::MemoryPool* pool) { return copyResults(pool, unused); } +std::pair AssertQueryBuilder::copyResultsWithStats(memory::MemoryPool* pool) { + std::shared_ptr unused; + auto result = copyResults(pool, unused); + auto stats = unused->taskStats(); + return std::make_pair(result, stats); +} + + RowVectorPtr AssertQueryBuilder::copyResults( memory::MemoryPool* pool, std::shared_ptr& task) { diff --git a/velox/exec/tests/utils/AssertQueryBuilder.h b/velox/exec/tests/utils/AssertQueryBuilder.h index 16ca92389af..2376f9c30d1 100644 --- a/velox/exec/tests/utils/AssertQueryBuilder.h +++ b/velox/exec/tests/utils/AssertQueryBuilder.h @@ -178,6 +178,8 @@ class AssertQueryBuilder { /// query returns empty result. RowVectorPtr copyResults(memory::MemoryPool* pool); + std::pair copyResultsWithStats(memory::MemoryPool* pool); + /// Similar to above method and also returns the task. RowVectorPtr copyResults( memory::MemoryPool* pool, diff --git a/velox/experimental/cudf/exec/CudfHashJoin.cpp b/velox/experimental/cudf/exec/CudfHashJoin.cpp index 6761603e997..3686340540f 100644 --- a/velox/experimental/cudf/exec/CudfHashJoin.cpp +++ b/velox/experimental/cudf/exec/CudfHashJoin.cpp @@ -18,12 +18,14 @@ #include "velox/experimental/cudf/exec/ExpressionEvaluator.h" #include "velox/experimental/cudf/exec/ToCudf.h" #include "velox/experimental/cudf/exec/Utilities.h" +#include "velox/experimental/cudf/exec/VeloxCudfInterop.h" #include "velox/exec/Task.h" #include #include #include +#include #include @@ -98,6 +100,7 @@ void CudfHashJoinBuild::addInput(RowVectorPtr input) { std::cout << "Calling CudfHashJoinBuild::addInput" << std::endl; } // Queue inputs, process all at once. + // std::cout << "CHB: AddInput input->size() = " << input->size() << std::endl; if (input->size() > 0) { auto cudfInput = std::dynamic_pointer_cast(input); VELOX_CHECK_NOT_NULL(cudfInput); @@ -147,7 +150,14 @@ void CudfHashJoinBuild::noMoreInput() { }; auto stream = cudfGlobalStreamPool().get_stream(); - auto tbl = getConcatenatedTable(inputs_, stream); + // std::cout << "CHB: NoMoreInput inputs_.size() = " << inputs_.size() << std::endl; + std::unique_ptr tbl; + if (inputs_.size() == 0) { + auto emptyRowVector = RowVector::createEmpty(joinNode_->sources()[1]->outputType(), operatorCtx_->pool()); + tbl = facebook::velox::cudf_velox::with_arrow::toCudfTable(emptyRowVector, operatorCtx_->pool(), stream); + } else { + tbl = getConcatenatedTable(inputs_, stream); + } // Release input data after synchronizing stream.synchronize(); @@ -176,7 +186,7 @@ void CudfHashJoinBuild::noMoreInput() { !joinNode_->filter(); auto hashObject = (buildHashJoin) ? std::make_shared( tbl->view().select(buildKeyIndices), - cudf::null_equality::EQUAL, + cudf::null_equality::UNEQUAL, stream) : nullptr; if (buildHashJoin) { @@ -197,7 +207,7 @@ void CudfHashJoinBuild::noMoreInput() { auto cudfHashJoinBridge = std::dynamic_pointer_cast(joinBridge); cudfHashJoinBridge->setHashTable(std::make_optional( - std::make_pair(std::shared_ptr(std::move(tbl)), std::move(hashObject)))); + std::make_tuple(std::shared_ptr(std::move(tbl)), std::move(hashObject), std::make_shared>(false)))); } exec::BlockingReason CudfHashJoinBuild::isBlocked(ContinueFuture* future) { @@ -367,16 +377,33 @@ RowVectorPtr CudfHashJoinProbe::getOutput() { } VELOX_NVTX_OPERATOR_FUNC_RANGE(); + if (finished_) { + return nullptr; + } if (!input_) { + // right join should be designed to output only matching left table rows immediately, and + // wait for unmatched right table rows until end. + // After noMoreInput_ is set, only one probe driver should output all unmatched right table rows. + // following code is not correct. it was added as debug code. + if (!(joinNode_->isRightJoin() and !std::get<2>(hashObject_.value())->load(std::memory_order_relaxed) and noMoreInput_)) return nullptr; } if (!hashObject_.has_value()) { return nullptr; } auto cudfInput = std::dynamic_pointer_cast(input_); - VELOX_CHECK_NOT_NULL(cudfInput); - auto stream = cudfInput->stream(); - auto leftTable = cudfInput->release(); // probe table + rmm::cuda_stream_view stream; + std::unique_ptr leftTable; + if (!cudfInput) { + auto emptyRowVector = RowVector::createEmpty(joinNode_->sources()[0]->outputType(), operatorCtx_->pool()); + auto stream = cudfGlobalStreamPool().get_stream(); + leftTable = facebook::velox::cudf_velox::with_arrow::toCudfTable(emptyRowVector, operatorCtx_->pool(), stream); + } else { + VELOX_CHECK_NOT_NULL(cudfInput); + stream = cudfInput->stream(); + leftTable = cudfInput->release(); // probe table + } + if (cudfDebugEnabled()) { std::cout << "Probe table number of columns: " << leftTable->num_columns() << std::endl; @@ -387,8 +414,9 @@ RowVectorPtr CudfHashJoinProbe::getOutput() { // TODO pass the input pool !!! // TODO: We should probably subset columns before calling to_cudf_table? // Maybe that isn't a problem if we fuse operators together. - auto& rightTable = hashObject_.value().first; - auto& hb = hashObject_.value().second; + auto& rightTable = std::get<0>(hashObject_.value()); + auto& hb = std::get<1>(hashObject_.value()); + auto& isRightProbed = std::get<2>(hashObject_.value()); VELOX_CHECK_NOT_NULL(rightTable); if (cudfDebugEnabled()) { if (rightTable != nullptr) @@ -409,6 +437,28 @@ RowVectorPtr CudfHashJoinProbe::getOutput() { auto leftTableView = leftTable->view(); auto rightTableView = rightTable->view(); + // print the tables + auto probeType = joinNode_->sources()[0]->outputType(); + auto buildType = joinNode_->sources()[1]->outputType(); + if (std::getenv("PRINT_TABLES") != nullptr && std::string(std::getenv("PRINT_TABLES")) == "1") { + std::lock_guard lock(probePrintMutex_); + // move the table with toVeloxColumn and print it + auto veloxTable = with_arrow::toVeloxColumn(leftTable->view(), pool(), probeType->asRow().names(), stream); + std::cout << "Left table: " << veloxTable->toString() << std::endl; + // print each row in the velox table + for (int i = 0; i < veloxTable->size(); i++) { + std::cout << "Row " << std::setw(3) << i << ": " << veloxTable->toString(i) << std::endl; + } + // do it for right table + auto veloxTable2 = with_arrow::toVeloxColumn(rightTable->view(), pool(), buildType->asRow().names(), stream); + std::cout << "Right table: " << veloxTable2->toString() << std::endl; + // print each row in the velox table + for (int i = 0; i < veloxTable2->size(); i++) { + std::cout << "Row " << std::setw(3) << i << ": " << veloxTable2->toString(i) << std::endl; + } + std::cout << std::flush; + } + if (joinNode_->isInnerJoin()) { // left = probe, right = build if (joinNode_->filter()) { @@ -418,7 +468,7 @@ RowVectorPtr CudfHashJoinProbe::getOutput() { leftTableView, rightTableView, tree_.back(), - cudf::null_equality::EQUAL, + cudf::null_equality::UNEQUAL, std::nullopt, stream); } else { @@ -434,7 +484,7 @@ RowVectorPtr CudfHashJoinProbe::getOutput() { leftTableView, rightTableView, tree_.back(), - cudf::null_equality::EQUAL, + cudf::null_equality::UNEQUAL, std::nullopt, stream); } else { @@ -443,21 +493,24 @@ RowVectorPtr CudfHashJoinProbe::getOutput() { leftTableView.select(leftKeyIndices_), std::nullopt, stream); } } else if (joinNode_->isRightJoin()) { + std::cout << "Right join" << "," << noMoreInput_ << "," << input_ << std::endl; + isRightProbed->store(true, std::memory_order_relaxed); if (joinNode_->filter()) { + // TODO check if tree needs to be flipped. std::tie(rightJoinIndices, leftJoinIndices) = cudf::mixed_left_join( rightTableView.select(rightKeyIndices_), leftTableView.select(leftKeyIndices_), rightTableView, leftTableView, tree_.back(), - cudf::null_equality::EQUAL, + cudf::null_equality::UNEQUAL, std::nullopt, stream); } else { std::tie(rightJoinIndices, leftJoinIndices) = cudf::left_join( rightTableView.select(rightKeyIndices_), leftTableView.select(leftKeyIndices_), - cudf::null_equality::EQUAL, + cudf::null_equality::UNEQUAL, stream, cudf::get_current_device_resource_ref()); } @@ -469,14 +522,14 @@ RowVectorPtr CudfHashJoinProbe::getOutput() { leftTableView, rightTableView, tree_.back(), - cudf::null_equality::EQUAL, + cudf::null_equality::UNEQUAL, stream, cudf::get_current_device_resource_ref()); } else { leftJoinIndices = cudf::left_anti_join( leftTableView.select(leftKeyIndices_), rightTableView.select(rightKeyIndices_), - cudf::null_equality::EQUAL, + cudf::null_equality::UNEQUAL, stream, cudf::get_current_device_resource_ref()); } @@ -488,14 +541,14 @@ RowVectorPtr CudfHashJoinProbe::getOutput() { leftTableView, rightTableView, tree_.back(), - cudf::null_equality::EQUAL, + cudf::null_equality::UNEQUAL, stream, cudf::get_current_device_resource_ref()); } else { leftJoinIndices = cudf::left_semi_join( leftTableView.select(leftKeyIndices_), rightTableView.select(rightKeyIndices_), - cudf::null_equality::EQUAL, + cudf::null_equality::UNEQUAL, stream, cudf::get_current_device_resource_ref()); } @@ -507,14 +560,14 @@ RowVectorPtr CudfHashJoinProbe::getOutput() { rightTableView, leftTableView, tree_.back(), - cudf::null_equality::EQUAL, + cudf::null_equality::UNEQUAL, stream, cudf::get_current_device_resource_ref()); } else { rightJoinIndices = cudf::left_semi_join( rightTableView.select(rightKeyIndices_), leftTableView.select(leftKeyIndices_), - cudf::null_equality::EQUAL, + cudf::null_equality::UNEQUAL, stream, cudf::get_current_device_resource_ref()); } @@ -534,6 +587,38 @@ RowVectorPtr CudfHashJoinProbe::getOutput() { auto leftIndicesCol = cudf::column_view{leftIndicesSpan}; auto rightIndicesCol = cudf::column_view{rightIndicesSpan}; auto constexpr oobPolicy = cudf::out_of_bounds_policy::NULLIFY; + + // print the left indices + if (std::getenv("PRINT_TABLES") != nullptr && std::string(std::getenv("PRINT_TABLES")) == "1") { + std::lock_guard lock(probePrintMutex_); + // move the table with toVeloxColumn and print it + auto veloxTable = with_arrow::toVeloxColumn(cudf::table_view{{leftIndicesCol}}, pool(), "left_indices", stream); + std::cout << "Left indices: " << veloxTable->toString() << std::endl; + // print each row in the velox table + for (int i = 0; i < veloxTable->size(); i++) { + std::cout << "Row " << std::setw(3) << i << ": " << veloxTable->toString(i) << std::endl; + } + // do it for right table + auto veloxTable2 = with_arrow::toVeloxColumn(cudf::table_view{{rightIndicesCol}}, pool(), "right_indices", stream); + std::cout << "Right indices: " << veloxTable2->toString() << std::endl; + // print each row in the velox table + for (int i = 0; i < veloxTable2->size(); i++) { + std::cout << "Row " << std::setw(3) << i << ": " << veloxTable2->toString(i) << std::endl; + } + std::cout << std::flush; + } + + // if(!noMoreInput_ and joinNode_->isRightJoin()) { + // // drop out of bounds indices + // // auto nullified_indices = cudf::gather(cudf::table_view{{rightIndicesCol}}, rightIndicesCol, oobPolicy, stream); + // // nullified_indices = cudf::drop_nulls(nullified_indices->view(), {0}, stream); + // // rightTable = cudf::gather(rightTableView, nullified_indices->get_column(0), oobPolicy, stream); + // rightTable = cudf::gather(rightTableView, rightIndicesCol, oobPolicy, stream); + // // generate output for left table immediately, but update the right table. + + // input_.reset(); + // return nullptr; + // } auto leftResult = cudf::gather(leftInput, leftIndicesCol, oobPolicy, stream); auto rightResult = cudf::gather(rightInput, rightIndicesCol, oobPolicy, stream); @@ -558,6 +643,18 @@ RowVectorPtr CudfHashJoinProbe::getOutput() { auto cudfOutput = std::make_unique(std::move(joinedCols)); stream.synchronize(); + // print the output + if (std::getenv("PRINT_TABLES") != nullptr && std::string(std::getenv("PRINT_TABLES")) == "1") { + std::lock_guard lock(probePrintMutex_); + auto veloxTable = with_arrow::toVeloxColumn(cudfOutput->view(), pool(), outputType_->asRow().names(), stream); + std::cout << "Output table: " << veloxTable->toString() << std::endl; + // print each row in the velox table + for (int i = 0; i < veloxTable->size(); i++) { + std::cout << "Row " << std::setw(3) << i << ": " << veloxTable->toString(i) << std::endl; + } + std::cout << std::flush; + } + input_.reset(); finished_ = noMoreInput_; diff --git a/velox/experimental/cudf/exec/CudfHashJoin.h b/velox/experimental/cudf/exec/CudfHashJoin.h index a9384586fe1..97e66b28ebd 100644 --- a/velox/experimental/cudf/exec/CudfHashJoin.h +++ b/velox/experimental/cudf/exec/CudfHashJoin.h @@ -34,7 +34,7 @@ namespace facebook::velox::cudf_velox { class CudfHashJoinBridge : public exec::JoinBridge { public: using hash_type = - std::pair, std::shared_ptr>; + std::tuple, std::shared_ptr, std::shared_ptr>>; void setHashTable(std::optional hashObject); @@ -89,10 +89,8 @@ class CudfHashJoinProbe : public exec::Operator, public NvtxHelper { static bool isSupportedJoinType(core::JoinType joinType) { return joinType == core::JoinType::kInner || joinType == core::JoinType::kLeft || - joinType == core::JoinType::kRight || joinType == core::JoinType::kAnti || - joinType == core::JoinType::kLeftSemiFilter || - joinType == core::JoinType::kRightSemiFilter; + joinType == core::JoinType::kLeftSemiFilter; } bool isFinished() override; @@ -113,6 +111,7 @@ class CudfHashJoinProbe : public exec::Operator, public NvtxHelper { std::vector rightColumnIndicesToGather_; std::vector leftColumnOutputIndices_; std::vector rightColumnOutputIndices_; + std::mutex probePrintMutex_; bool finished_{false}; }; diff --git a/velox/experimental/cudf/exec/ToCudf.cpp b/velox/experimental/cudf/exec/ToCudf.cpp index 59e2fdf0c20..7f36264456f 100644 --- a/velox/experimental/cudf/exec/ToCudf.cpp +++ b/velox/experimental/cudf/exec/ToCudf.cpp @@ -116,6 +116,7 @@ bool CompileState::compile() { return false; } if (!CudfHashJoinProbe::isSupportedJoinType(planNode->joinType())) { + std::cout << "Unsupported join type: " << planNode->toString() << joinTypeName(planNode->joinType()) << std::endl; return false; } return true;