diff --git a/CMakeLists.txt b/CMakeLists.txt index 98266ee..731d80e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -213,6 +213,7 @@ add_library(core src/core.cpp src/query.cpp src/query_execution.cpp + src/join.cpp src/storage.cpp src/metadata.cpp src/file_utils.cpp diff --git a/include/join.hpp b/include/join.hpp new file mode 100644 index 0000000..ed7ab46 --- /dev/null +++ b/include/join.hpp @@ -0,0 +1,172 @@ +#ifndef JOIN_HPP +#define JOIN_HPP + +#include + +#include + +#include "query.hpp" + +namespace tundradb { + +/** + * @brief Input data for join ID computation + * + * Captures the state accumulated during edge traversal so that + * a JoinStrategy can decide which target (and source) IDs survive. + */ +struct JoinInput { + // All node IDs currently in the source schema within query state + const llvm::DenseSet& source_ids; + + // All node IDs that exist in the target table (full scan of target schema) + const llvm::DenseSet& all_target_ids; + + // Source nodes that had at least one matching edge + const llvm::DenseSet& matched_source_ids; + + // Target nodes that were reached via matching edges + const llvm::DenseSet& matched_target_ids; + + // Target IDs already accumulated from a previous traversal that shares + // the same target alias (e.g. multi-pattern queries). Empty on the first + // pass. + const llvm::DenseSet& existing_target_ids; + + // Source nodes that had NO matching edge + const llvm::DenseSet& unmatched_source_ids; + + // Whether source and target resolve to the same concrete schema + bool is_self_join; +}; + +/** + * @brief Output of join ID computation + */ +struct JoinOutput { + // Final set of target node IDs to store in query_state.ids[target] + llvm::DenseSet target_ids; + + // Source IDs that should be removed from query_state (INNER join pruning) + llvm::DenseSet source_ids_to_remove; + + // Whether the source table needs to be rebuilt after pruning + bool rebuild_source_table = false; +}; + +/** + * @brief Strategy interface for computing join results + * + * Each join type (INNER, LEFT, RIGHT, FULL) implements this interface + * to determine which node IDs should be included in the query result. + * + * The strategy only computes IDs - it does not modify QueryState or + * touch Arrow tables. That keeps it pure, testable, and composable. + */ +class JoinStrategy { + public: + virtual ~JoinStrategy() = default; + + /** + * Compute which target/source IDs survive this join. + */ + [[nodiscard]] virtual JoinOutput compute(const JoinInput& input) const = 0; + + /** + * Human-readable name for logging / debugging. + */ + [[nodiscard]] virtual const char* name() const noexcept = 0; +}; + +/** + * INNER JOIN + * + * Only matched targets survive. + * Unmatched sources are pruned (and the source table is rebuilt). + * + * When existing_target_ids is non-empty (multi-pattern), the result is + * the intersection of existing and newly matched target IDs. + */ +class InnerJoinStrategy final : public JoinStrategy { + public: + [[nodiscard]] JoinOutput compute(const JoinInput& input) const override; + [[nodiscard]] const char* name() const noexcept override { return "INNER"; } +}; + +/** + * LEFT JOIN + * + * All source nodes are kept. Target IDs are the union of matched + * targets and any previously accumulated targets (multi-pattern). + */ +class LeftJoinStrategy final : public JoinStrategy { + public: + [[nodiscard]] JoinOutput compute(const JoinInput& input) const override; + [[nodiscard]] const char* name() const noexcept override { return "LEFT"; } +}; + +/** + * RIGHT JOIN (self-join variant) + * + * target_ids = all_targets − matched_sources + * + * For self-joins the source and target live in the same schema, so + * we exclude matched *source* IDs to prevent a node appearing both + * as a matched source and as an unmatched target. + */ +class RightJoinSelfStrategy final : public JoinStrategy { + public: + [[nodiscard]] JoinOutput compute(const JoinInput& input) const override; + [[nodiscard]] const char* name() const noexcept override { + return "RIGHT_SELF"; + } +}; + +/** + * RIGHT JOIN (cross-schema variant) + * + * target_ids = matched_targets ∪ (all_targets − matched_targets) + * = all_targets (but computed in two steps so logging is clear) + * + * For cross-schema joins, IDs live in separate namespaces, so we compare + * within the target schema only. + */ +class RightJoinCrossSchemaStrategy final : public JoinStrategy { + public: + [[nodiscard]] JoinOutput compute(const JoinInput& input) const override; + [[nodiscard]] const char* name() const noexcept override { + return "RIGHT_CROSS"; + } +}; + +/** + * FULL OUTER JOIN + * + * Combines the RIGHT logic (all targets survive) with the LEFT logic + * (all sources survive). Delegates the target-side computation to an + * inner RIGHT strategy (self or cross-schema). + */ +class FullJoinStrategy final : public JoinStrategy { + public: + explicit FullJoinStrategy(std::unique_ptr right_strategy); + + [[nodiscard]] JoinOutput compute(const JoinInput& input) const override; + [[nodiscard]] const char* name() const noexcept override { return "FULL"; } + + private: + std::unique_ptr right_strategy_; +}; + +/** + * @brief Creates the appropriate JoinStrategy for a given TraverseType + * and join context (self-join vs. cross-schema). + */ +class JoinStrategyFactory { + public: + static std::unique_ptr create(TraverseType type, + bool is_self_join); +}; + +} // namespace tundradb + +#endif // JOIN_HPP diff --git a/include/utils.hpp b/include/utils.hpp index 8a13592..d562892 100644 --- a/include/utils.hpp +++ b/include/utils.hpp @@ -21,6 +21,31 @@ #include "types.hpp" namespace tundradb { + +template +void dense_intersection(const SetA& a, const SetB& b, OutSet& out) { + const auto& small = a.size() < b.size() ? a : b; + const auto& large = a.size() < b.size() ? b : a; + out.clear(); + out.reserve(std::min(a.size(), b.size())); + for (const auto& x : small) { + if (large.contains(x)) { + out.insert(x); + } + } +} + +template +void dense_difference(const SetA& a, const SetB& b, OutSet& out) { + out.clear(); + out.reserve(a.size()); + for (const auto& x : a) { + if (!b.contains(x)) { + out.insert(x); + } + } +} + static std::string generate_uuid() { uuid_t uuid; uuid_generate(uuid); diff --git a/src/core.cpp b/src/core.cpp index f4bb20b..b707dd0 100644 --- a/src/core.cpp +++ b/src/core.cpp @@ -1,5 +1,6 @@ #include "../include/core.hpp" +#include "../include/join.hpp" #include "../include/temporal_context.hpp" using namespace tundradb; @@ -1700,30 +1701,6 @@ arrow::Status prepare_query(Query& query, QueryState& query_state) { return arrow::Status::OK(); } -template -void dense_intersection(const SetA& a, const SetB& b, OutSet& out) { - const auto& small = a.size() < b.size() ? a : b; - const auto& large = a.size() < b.size() ? b : a; - out.clear(); - out.reserve(std::min(a.size(), b.size())); - for (const auto& x : small) { - if (large.contains(x)) { - out.insert(x); - } - } -} - -template -void dense_difference(const SetA& a, const SetB& b, OutSet& out) { - out.clear(); - out.reserve(a.size()); - for (const auto& x : a) { - if (!b.contains(x)) { - out.insert(x); - } - } -} - arrow::Result> Database::query( const Query& query) const { QueryState query_state(this->schema_registry_); @@ -1983,106 +1960,67 @@ arrow::Result> Database::query( unmatched_source_ids.insert(source_id); } } - if (traverse->traverse_type() == TraverseType::Inner && - !unmatched_source_ids.empty()) { - for (auto id : unmatched_source_ids) { - IF_DEBUG_ENABLED { - log_debug("remove unmatched node={}:{}", source.value(), id); - } - query_state.remove_node(id, source); - } - IF_DEBUG_ENABLED { - auto resolved = query_state.resolve_schema(source); - if (resolved.ok()) { - log_debug("rebuild table for schema {}:{}", source.value(), - resolved.ValueOrDie()); - } - } - auto table_result = - filter_table_by_id(query_state.tables[source.value()], - query_state.ids()[source.value()]); - if (!table_result.ok()) { - return table_result.status(); - } - query_state.tables[source.value()] = table_result.ValueOrDie(); - } IF_DEBUG_ENABLED { log_debug("found {} neighbors for {}", matched_target_ids.size(), traverse->target().toString()); } - if (traverse->traverse_type() == TraverseType::Inner) { - // intersect - // a:0 -> c:0 - // b:0 -> c:1 - // after a:0 -> c:0 => ids[c] = {0} - // after b:0 -> c:1 we need to intersect it with ids[c] = - // intersect({0}, {1}) => {} - auto target_ids = query_state.get_ids(traverse->target()); - llvm::DenseSet intersect_ids; - if (target_ids.empty()) { - intersect_ids = matched_target_ids; - } else { - dense_intersection(target_ids, matched_target_ids, intersect_ids); - } - - query_state.ids()[traverse->target().value()] = intersect_ids; - IF_DEBUG_ENABLED { - log_debug("intersect_ids count: {}", intersect_ids.size()); - log_debug("{} intersect_ids: {}", traverse->target().toString(), - join_container(intersect_ids)); - } - - } else if (traverse->traverse_type() == TraverseType::Left) { - query_state.ids()[traverse->target().value()].insert( - matched_target_ids.begin(), matched_target_ids.end()); - } else { // Right, Full: matched targets + unmatched targets - auto target_ids = + // For RIGHT/FULL joins we need all target IDs from the table + llvm::DenseSet all_target_ids; + if (traverse->traverse_type() == TraverseType::Right || + traverse->traverse_type() == TraverseType::Full) { + all_target_ids = get_ids_from_table( get_table(target_schema, query_state.temporal_context.get()) .ValueOrDie()) .ValueOrDie(); + } - llvm::DenseSet result; + const bool is_self_join = source_schema == target_schema; + auto strategy = JoinStrategyFactory::create(traverse->traverse_type(), + is_self_join); - // Check if this is a self-join (same schema for source and target) - if (source_schema == target_schema) { - // Self-join: Exclude nodes that were sources with matches - // (prevents same node appearing as both source and unmatched - // target) - dense_difference(target_ids, matched_source_ids, result); - IF_DEBUG_ENABLED { - log_debug( - "traverse type: '{}' (Right/Full, self-join), " - "matched_source_ids=[{}], unmatched_targets=[{}], total={}", - traverse->target().value(), - join_container(matched_source_ids), join_container(result), - result.size()); - } - } else { - // Cross-schema join: Include matched targets + unmatched targets - // (compare IDs within same schema to avoid ID collision) - result = matched_target_ids; - llvm::DenseSet unmatched_targets; - dense_difference(target_ids, matched_target_ids, unmatched_targets); - result.insert(unmatched_targets.begin(), unmatched_targets.end()); + IF_DEBUG_ENABLED { + log_debug("Using {} join strategy (self_join={})", strategy->name(), + is_self_join); + } + + JoinInput join_input{ + .source_ids = query_state.ids()[source.value()], + .all_target_ids = all_target_ids, + .matched_source_ids = matched_source_ids, + .matched_target_ids = matched_target_ids, + .existing_target_ids = query_state.get_ids(traverse->target()), + .unmatched_source_ids = unmatched_source_ids, + .is_self_join = is_self_join, + }; + + auto join_output = strategy->compute(join_input); + + // Apply target IDs + query_state.ids()[traverse->target().value()] = join_output.target_ids; + + // Apply source pruning (INNER join removes unmatched sources) + if (join_output.rebuild_source_table) { + for (auto id : join_output.source_ids_to_remove) { IF_DEBUG_ENABLED { - log_debug( - "traverse type: '{}' (Right/Full, cross-schema), " - "matched_targets=[{}], unmatched_targets=[{}], total={}", - traverse->target().value(), - join_container(matched_target_ids), - join_container(unmatched_targets), result.size()); + log_debug("remove unmatched node={}:{}", source.value(), id); } + query_state.remove_node(id, source); } - - query_state.ids()[traverse->target().value()] = result; + auto table_result = + filter_table_by_id(query_state.tables[source.value()], + query_state.ids()[source.value()]); + if (!table_result.ok()) { + return table_result.status(); + } + query_state.tables[source.value()] = table_result.ValueOrDie(); } std::vector> neighbors; for (auto id : query_state.ids()[traverse->target().value()]) { - auto node_res = node_manager_->get_node(target_schema, id); - if (node_res.ok()) { + if (auto node_res = node_manager_->get_node(target_schema, id); + node_res.ok()) { neighbors.push_back(node_res.ValueOrDie()); } } diff --git a/src/join.cpp b/src/join.cpp new file mode 100644 index 0000000..e6fb207 --- /dev/null +++ b/src/join.cpp @@ -0,0 +1,378 @@ +#include "join.hpp" + +#include "logger.hpp" +#include "utils.hpp" + +namespace tundradb { + +// ============================================================================ +// INNER JOIN +// ============================================================================ +// +// Semantics: Keep only nodes that have a match on BOTH sides. +// +// - Single-pattern example: +// +// Graph: +// User(0) --works_at--> Company(0) (Google) +// User(1) --works_at--> Company(1) (Apple) +// User(2) (no edge) +// Company(2) (no incoming edge) +// +// Query: FROM u:User TRAVERSE u -works_at-> c:Company (INNER) +// +// After edge traversal: +// matched_source_ids = {0, 1} <- users that had an edge +// matched_target_ids = {0, 1} <- companies that were reached +// unmatched_source_ids = {2} <- User(2) had no edge +// existing_target_ids = {} <- first time we see "c" +// +// Strategy output: +// target_ids = {0, 1} <- only matched companies +// source_ids_to_remove = {2} <- User(2) is pruned +// rebuild_source_table = true +// +// Result rows: (User 0, Company 0), (User 1, Company 1) +// +// - Multi-pattern example: +// +// Two traversals share the same target alias "c": +// TRAVERSE a -works_at-> c:Company +// TRAVERSE b -works_at-> c:Company +// +// After 1st traversal: existing_target_ids = {0} (from pattern a) +// After 2nd traversal: matched_target_ids = {0, 1} (from pattern b) +// +// We INTERSECT existing ∩ matched = {0} +// Only Company(0) survives both patterns. +// +// ============================================================================ + +JoinOutput InnerJoinStrategy::compute(const JoinInput& input) const { + JoinOutput out; + + if (input.existing_target_ids.empty()) { + // First traversal targeting this alias: keep all matched + out.target_ids = input.matched_target_ids; + } else { + // Multi-pattern: intersect with what previous traversals produced + dense_intersection(input.existing_target_ids, input.matched_target_ids, + out.target_ids); + } + + // Source pruning: remove sources that had no match + out.source_ids_to_remove = input.unmatched_source_ids; + out.rebuild_source_table = !out.source_ids_to_remove.empty(); + + IF_DEBUG_ENABLED { + log_debug( + "INNER JOIN: {} matched targets, {} sources to remove, " + "rebuild_source={}", + out.target_ids.size(), out.source_ids_to_remove.size(), + out.rebuild_source_table); + } + + return out; +} + +// ============================================================================ +// LEFT JOIN +// ============================================================================ +// +// Semantics: Keep ALL source nodes (even if they have no match). +// Target side = only nodes that were actually reached. +// Sources without a match get NULL columns for the target. +// +// - Example: +// +// Graph: +// User(0) --works_at--> Company(0) +// User(1) (no edge) +// +// Query: FROM u:User LEFT TRAVERSE u -works_at-> c:Company +// +// After edge traversal: +// matched_source_ids = {0} +// matched_target_ids = {0} +// unmatched_source_ids= {1} +// existing_target_ids = {} +// +// Strategy output: +// target_ids = {0} <- only matched companies +// source_ids_to_remove= {} <- no pruning! +// rebuild_source_table= false +// +// Result rows: +// (User 0, Company 0) <- matched +// (User 1, NULL) <- unmatched source, target is NULL +// +// - Multi-pattern: +// +// Two LEFT traversals share target "c": +// Pattern 1 matched targets: {0} -> existing = {0} +// Pattern 2 matched targets: {0, 1} +// +// Result: existing ∪ matched = {0, 1} (union, not intersection!) +// +// ============================================================================ + +JoinOutput LeftJoinStrategy::compute(const JoinInput& input) const { + JoinOutput out; + + // Start with whatever was accumulated from previous traversals + out.target_ids = input.existing_target_ids; + // Union in newly matched targets + out.target_ids.insert(input.matched_target_ids.begin(), + input.matched_target_ids.end()); + + // Sources are NEVER pruned in a LEFT join + + IF_DEBUG_ENABLED { + log_debug("LEFT JOIN: {} target IDs (matched={}, existing={})", + out.target_ids.size(), input.matched_target_ids.size(), + input.existing_target_ids.size()); + } + + return out; +} + +// ============================================================================ +// RIGHT JOIN - self-join variant +// ============================================================================ +// +// Semantics: Keep ALL target nodes (even unmatched ones). +// Unmatched targets get NULL columns for the source. +// +// Why a separate "self" variant? +// When source and target are the same schema (e.g., User -friends-> User), +// the IDs live in the SAME namespace. A node can be both a source and a +// target. If User(0) matched as a source, we must NOT also include it as +// an "unmatched target" - that would produce a duplicate row. +// +// Formula: target_ids = all_targets − matched_sources +// +// - Example: +// +// Graph (self-join, same schema "User"): +// User(0) --friends--> User(1) +// User(0) --friends--> User(2) +// User(3) (isolated node) +// +// Query: FROM u:User RIGHT TRAVERSE u -friends-> f:User +// +// Note: u and f are aliases but both resolve to schema "User" +// +// After edge traversal: +// matched_source_ids = {0} <- User(0) had outgoing edges +// matched_target_ids = {1, 2} <- users that were reached +// all_target_ids = {0,1,2,3} <- every user in the table +// +// Strategy: all_targets − matched_sources = {0,1,2,3} − {0} = {1,2,3} +// +// Why not all_targets − matched_targets = {0,3}? +// Because we want User(1) and User(2) to appear as matched targets +// with their source connections. The "right" side is all targets +// that are NOT already covered as matched sources. +// +// Result rows: +// (User 0, User 1) <- matched +// (User 0, User 2) <- matched +// (NULL, User 3) <- unmatched target +// +// ============================================================================ + +JoinOutput RightJoinSelfStrategy::compute(const JoinInput& input) const { + JoinOutput out; + + dense_difference(input.all_target_ids, input.matched_source_ids, + out.target_ids); + + IF_DEBUG_ENABLED { + log_debug( + "RIGHT JOIN (self): all_targets={}, matched_sources={}, result={}", + input.all_target_ids.size(), input.matched_source_ids.size(), + out.target_ids.size()); + } + + return out; +} + +// ============================================================================ +// RIGHT JOIN - cross-schema variant +// ============================================================================ +// +// Semantics: Same as RIGHT (keep all targets), but source and target are +// DIFFERENT schemas. IDs live in separate namespaces, so there +// is no collision risk. We simply include all target IDs. +// +// Formula: target_ids = matched_targets ∪ (all_targets − matched_targets) +// = all_targets +// +// We compute it in two steps (matched + unmatched) so the debug log shows +// exactly which targets were reached and which were not. +// +// - Example: +// +// Graph: +// User(0) --works_at--> Company(0) (Google) +// User(1) --works_at--> Company(0) (Google) +// Company(1) (Apple, no incoming edges) +// +// Query: FROM u:User RIGHT TRAVERSE u -works_at-> c:Company +// +// After edge traversal: +// matched_source_ids = {0, 1} +// matched_target_ids = {0} <- only Google was reached +// all_target_ids = {0, 1} <- Google + Apple +// +// Strategy: matched ∪ unmatched = {0} ∪ {1} = {0, 1} +// +// Result rows: +// (User 0, Company 0) <- matched (Google) +// (User 1, Company 0) <- matched (Google) +// (NULL, Company 1) <- unmatched target (Apple) +// +// Why not use the self-join formula here? +// +// With per-schema IDs, User(0) and Company(0) both have id=0 but they +// are in different schemas. If we did all_targets − matched_sources: +// {0, 1} − {0, 1} = {} <- WRONG! We'd lose all companies! +// +// ============================================================================ + +JoinOutput RightJoinCrossSchemaStrategy::compute(const JoinInput& input) const { + JoinOutput out; + + out.target_ids = input.matched_target_ids; + llvm::DenseSet unmatched; + dense_difference(input.all_target_ids, input.matched_target_ids, unmatched); + out.target_ids.insert(unmatched.begin(), unmatched.end()); + + IF_DEBUG_ENABLED { + log_debug( + "RIGHT JOIN (cross): matched_targets={}, unmatched_targets={}, " + "total={}", + input.matched_target_ids.size(), unmatched.size(), + out.target_ids.size()); + } + + return out; +} + +// ============================================================================ +// FULL OUTER JOIN +// ============================================================================ +// +// Semantics: Keep ALL nodes from BOTH sides. +// = LEFT (all sources kept) + RIGHT (all targets kept) +// +// Implementation: delegates to the appropriate RIGHT strategy for the +// target side. The source side is handled by NOT pruning any sources +// (source_ids_to_remove stays empty). +// +// - Cross-schema example: +// +// Graph: +// User(0) --works_at--> Company(0) +// User(1) (no edge) +// Company(1) (no incoming edge) +// +// Query: FROM u:User FULL TRAVERSE u -works_at-> c:Company +// +// After edge traversal: +// matched_source_ids = {0} +// matched_target_ids = {0} +// unmatched_source_ids= {1} +// all_target_ids = {0, 1} +// +// RIGHT strategy (cross): target_ids = {0} ∪ {1} = {0, 1} +// FULL adds: source_ids_to_remove = {} (no pruning) +// +// Result rows: +// (User 0, Company 0) <- matched +// (User 1, NULL) <- unmatched source +// (NULL, Company 1) <- unmatched target +// +// - Self-join example: +// +// Graph: +// User(0) --friends--> User(1) +// User(2) (isolated) +// +// Query: FROM u:User FULL TRAVERSE u -friends-> f:User +// +// RIGHT strategy (self): all_targets − matched_sources = {0,1,2} − {0} +// = {1, 2} +// FULL adds: source_ids_to_remove = {} +// +// Result rows: +// (User 0, User 1) <- matched +// (NULL, User 2) <- unmatched target +// Note: User(0) appears as source only, not duplicated as target +// +// ============================================================================ + +FullJoinStrategy::FullJoinStrategy(std::unique_ptr right_strategy) + : right_strategy_(std::move(right_strategy)) {} + +JoinOutput FullJoinStrategy::compute(const JoinInput& input) const { + // Target side: delegate to the appropriate RIGHT strategy + JoinOutput out = right_strategy_->compute(input); + + // Source side: all sources survive (no pruning) + // source_ids_to_remove stays empty, rebuild_source_table stays false + + IF_DEBUG_ENABLED { + log_debug("FULL JOIN (via {}): {} target IDs", right_strategy_->name(), + out.target_ids.size()); + } + + return out; +} + +// ============================================================================ +// Factory +// ============================================================================ +// +// Picks the right strategy based on TraverseType and whether source/target +// resolve to the same concrete schema (self-join). +// +// The key branching: +// INNER -> InnerJoinStrategy (always) +// LEFT -> LeftJoinStrategy (always) +// RIGHT -> RightJoinSelfStrategy (if self-join) +// -> RightJoinCrossSchemaStrategy (if cross-schema) +// FULL -> FullJoinStrategy wrapping RightJoinSelf (if self-join) +// -> FullJoinStrategy wrapping RightJoinCross (if cross-schema) +// +// ============================================================================ + +std::unique_ptr JoinStrategyFactory::create(TraverseType type, + bool is_self_join) { + switch (type) { + case TraverseType::Inner: + return std::make_unique(); + + case TraverseType::Left: + return std::make_unique(); + + case TraverseType::Right: + if (is_self_join) { + return std::make_unique(); + } + return std::make_unique(); + + case TraverseType::Full: + if (is_self_join) { + return std::make_unique( + std::make_unique()); + } + return std::make_unique( + std::make_unique()); + } + + // Unreachable if the enum is exhaustive, but just in case: + return std::make_unique(); +} + +} // namespace tundradb