diff --git a/CMakeLists.txt b/CMakeLists.txt index ec14971..728a1fa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -223,10 +223,11 @@ add_library(core src/query_execution.cpp src/join.cpp src/row.cpp + src/shard.cpp src/storage.cpp src/metadata.cpp src/file_utils.cpp - src/snapshot.cpp + src/snapshot_manager.cpp src/edge_store.cpp src/table_info.cpp src/utils.cpp diff --git a/antlr/TundraQL.g4 b/antlr/TundraQL.g4 index 9b9df70..3e06a7e 100644 --- a/antlr/TundraQL.g4 +++ b/antlr/TundraQL.g4 @@ -1,7 +1,7 @@ grammar TundraQL; // Entry point for parsing a full command -statement: createSchemaStatement | createNodeStatement | createEdgeStatement | matchStatement | deleteStatement | commitStatement | showStatement EOF; +statement: createSchemaStatement | createNodeStatement | createEdgeStatement | matchStatement | deleteStatement | updateStatement | commitStatement | showStatement EOF; // --- Schema Definition --- createSchemaStatement: K_CREATE K_SCHEMA IDENTIFIER LPAREN schemaFieldList RPAREN SEMI; @@ -47,6 +47,20 @@ edgeDeleteTarget: | K_EDGE IDENTIFIER K_TO nodeSelector // DELETE EDGE edge_type TO node; | K_EDGE IDENTIFIER K_FROM nodeSelector K_TO nodeSelector; // DELETE EDGE edge_type FROM node TO node; +// --- Update Statement --- +// UPDATE User(0) SET name = "Bob", age = 31; +// UPDATE (u:User) SET u.age = 31 WHERE u.name = "Alice"; +// UPDATE MATCH (u:User)-[:WORKS_AT]->(c:Company) SET u.employed = true, c.size = 1 WHERE c.name = "Acme"; +updateStatement: K_UPDATE updateTarget K_SET setClause (K_WHERE whereClause)? SEMI; + +updateTarget: + nodeLocator // UPDATE User(0) SET ...; + | K_MATCH patternList // UPDATE MATCH (u:User)-[:WORKS_AT]->(c:Company) SET ...; + | nodePattern; // UPDATE (u:User) SET ... WHERE ...; + +setClause: setAssignment (COMMA setAssignment)*; +setAssignment: IDENTIFIER (DOT IDENTIFIER)? EQ value; + // --- Commit Statement --- commitStatement: K_COMMIT SEMI; @@ -106,6 +120,8 @@ K_RIGHT: 'RIGHT'; K_FULL: 'FULL'; K_AND: 'AND'; K_OR: 'OR'; +K_UPDATE: 'UPDATE'; +K_SET: 'SET'; K_COMMIT: 'COMMIT'; K_UNIQUE: 'UNIQUE'; K_SHOW: 'SHOW'; diff --git a/docs/tundraql.html b/docs/tundraql.html index ff90f06..e8fa4eb 100644 --- a/docs/tundraql.html +++ b/docs/tundraql.html @@ -129,6 +129,7 @@

Statements

CREATE NODE CREATE EDGE MATCH + UPDATE DELETE COMMIT SHOW @@ -136,6 +137,7 @@

Statements

Clauses

WHERE SELECT + SET JOIN Types

Reference

@@ -300,6 +302,85 @@

DELETE

DELETE EDGE works_at TO Company(0); + +

UPDATE

+

Modifies field values on existing nodes. Supports three forms: by ID (direct), by pattern (single schema + optional WHERE), and by MATCH (traversals / joins + optional WHERE).

+ +

Form 1 — Update by ID

+

Targets a single node using Schema(id). Field names are bare (no alias prefix).

+ +
+
Syntax DML
+
UPDATE Schema(id) SET field = value [, field = value ...] ;
+
+ +
+
Examples
+
// Update a single field
+UPDATE User(0) SET age = 31;
+
+// Update multiple fields at once (creates one version)
+UPDATE User(0) SET name = "Alice B.", age = 31;
+
+ +

Form 2 — Update by Pattern

+

Uses a node pattern (alias:Schema) with an optional WHERE clause to match nodes. Field names must be alias-qualified (alias.field).

+ +
+
Syntax DML
+
UPDATE (alias:Schema) SET alias.field = value [, ...]
+    [WHERE alias.field op value] ;
+
+ +
+
Examples
+
// Update all users named Alice
+UPDATE (u:User) SET u.age = 31
+    WHERE u.name = "Alice";
+
+// Update with compound condition
+UPDATE (u:User) SET u.name = "Senior"
+    WHERE u.age > 30 AND u.age < 50;
+
+// Update all nodes of a schema (no WHERE)
+UPDATE (u:User) SET u.age = 0;
+
+ +

Form 3 — Update by MATCH (traversals / joins)

+

Uses a full MATCH pattern with traversals to find nodes across multiple schemas, then applies SET assignments. Field names must be alias-qualified. Multiple schemas can be updated in a single statement.

+ +
+
Syntax DML
+
UPDATE MATCH (a:Schema1)-[:EDGE_TYPE]->(b:Schema2)
+    SET a.field = value [, b.field = value ...]
+    [WHERE alias.field op value] ;
+
+ +
+
Examples
+
// Update both user and company for a traversal
+UPDATE MATCH (u:User)-[:WORKS_AT]->(c:Company)
+    SET u.employed = true, c.size = 1
+    WHERE c.name = "Acme Corp";
+
+// Update only one side of the relationship
+UPDATE MATCH (u:User)-[:WORKS_AT]->(c:Company)
+    SET u.status = "employed"
+    WHERE c.name = "Google";
+
+// Same-schema traversal (e.g. friends)
+UPDATE MATCH (a:User)-[:FRIEND]->(b:User)
+    SET a.has_friend = true, b.has_friend = true;
+
+ +
+ Versioning: When multiple fields are updated in a single SET clause, TundraDB creates one version for the entire batch — not one per field. +
+ +
+ Pattern/MATCH forms require alias prefix: SET age = 31 is only valid in the by-ID form. In pattern and MATCH forms you must write SET u.age = 31. +
+

COMMIT

Persists the current database state to disk (Parquet files + JSON metadata).

@@ -360,6 +441,37 @@

SELECT Clause

SELECT u.name AS employee, c.name AS company, u.age; + +

SET Clause

+

Specifies field assignments in an UPDATE statement. Comma-separated list of field = value pairs.

+ +
+
Syntax
+
SET field = value [, field = value ...]
+
+ + + + + + + + + + + + + + + + + +
Update FormField Name FormatExample
By IDBare nameSET name = "Alice", age = 31
By PatternAlias-qualifiedSET u.name = "Alice", u.age = 31
+ +
+ Batch semantics: All assignments in a single SET clause are applied atomically — one version is created per node, regardless of how many fields are changed. +
+

JOIN Types

Specified inside the edge pattern -[:EDGE_TYPE JOIN]->. Controls how unmatched nodes are handled.

@@ -481,7 +593,18 @@

Pattern Syntax Reference

WHERE u.name = "Alice" AND c.name = "Google" SELECT u.name AS user, f.name AS friend, c.name AS company; -// 5. Persist +// 5. Update: Alice turned 31 +UPDATE User(0) SET age = 31; + +// 6. Bulk update: set all users older than 30 to "Senior" +UPDATE (u:User) SET u.name = "Senior" WHERE u.age > 30; + +// 7. Update with MATCH (traversal) — set employed flag for Google employees +UPDATE MATCH (u:User)-[:works_at]->(c:Company) + SET u.name = "Employed" + WHERE c.name = "Google"; + +// 8. Persist COMMIT; diff --git a/include/core.hpp b/include/core.hpp index 5305c44..030d090 100644 --- a/include/core.hpp +++ b/include/core.hpp @@ -1,19 +1,10 @@ #pragma once #include -#include #include #include -#include -#include -#include -#include -#include -#include #include -#include -#include #include #include #include @@ -21,661 +12,19 @@ #include "arrow_utils.hpp" #include "config.hpp" #include "edge_store.hpp" -#include "file_utils.hpp" -#include "json.hpp" #include "logger.hpp" #include "metadata.hpp" #include "node.hpp" #include "query.hpp" #include "query_execution.hpp" #include "schema.hpp" +#include "shard.hpp" +#include "snapshot_manager.hpp" #include "storage.hpp" #include "utils.hpp" namespace tundradb { -class Database; -class Node; -class Shard; -class ShardManager; -class MetadataManager; -class Storage; -class NodeManager; - -class SnapshotManager { - public: - explicit SnapshotManager(std::shared_ptr metadata_manager, - std::shared_ptr storage, - std::shared_ptr shard_manager, - std::shared_ptr edge_store, - std::shared_ptr node_manager, - std::shared_ptr schema_registry) - : metadata_manager_(std::move(metadata_manager)), - storage_(std::move(storage)), - shard_manager_(std::move(shard_manager)), - edge_store_(std::move(edge_store)), - node_manager_(std::move(node_manager)), - schema_registry_(std::move(schema_registry)) {} - - arrow::Result initialize(); - arrow::Result commit(); - Snapshot *current_snapshot(); - std::shared_ptr get_manifest(); - - private: - std::shared_ptr metadata_manager_; - std::shared_ptr storage_; - std::shared_ptr shard_manager_; - std::shared_ptr schema_registry_; - std::shared_ptr edge_store_; - std::shared_ptr node_manager_; - Metadata metadata_; - std::shared_ptr manifest_; - std::shared_ptr edge_metadata_; -}; - -class Shard { - private: - std::pmr::monotonic_buffer_resource memory_pool_; - std::pmr::unordered_map> nodes_; - std::set nodes_ids_; - std::atomic dirty_{false}; - std::shared_ptr table_; - std::shared_ptr schema_registry_; - int64_t updated_ts_ = now_millis(); - bool updated_ = true; // todo should be false when we read from snapshot and - // after commit - - public: - const int64_t id; // Unique shard identifier - const int64_t index; // index of the shard in the shard manager - int64_t min_id; // Minimum node ID in this shard - int64_t max_id; // Maximum node ID in this shard - const size_t capacity; // Maximum number of nodes - const size_t chunk_size; // Size of chunks for table creation - std::string schema_name; // Name of the schema this shard holds - - Shard(int64_t id, int64_t index, size_t capacity, int64_t min_id, - int64_t max_id, size_t chunk_size, const std::string &schema_name, - std::shared_ptr schema_registry, - size_t buffer_size = 10 * 1024 * 1024) - : id(id), - index(index), - capacity(capacity), - min_id(min_id), - max_id(max_id), - chunk_size(chunk_size), - memory_pool_(buffer_size), - nodes_(&memory_pool_), - schema_registry_(std::move(schema_registry)), - schema_name(schema_name) {} - - Shard(int64_t id, int64_t index, const DatabaseConfig &config, int64_t min_id, - int64_t max_id, std::string schema_name, - std::shared_ptr schema_registry) - : memory_pool_(config.get_shard_memory_pool_size()), - nodes_(&memory_pool_), - schema_registry_(std::move(schema_registry)), - id(id), - index(index), - min_id(min_id), - max_id(max_id), - capacity(config.get_shard_capacity()), - chunk_size(config.get_chunk_size()), - schema_name(std::move(schema_name)) {} - - ~Shard() { - nodes_.clear(); - nodes_ids_.clear(); - table_.reset(); - - // The memory_pool will be automatically destroyed - // The schema_registry is a shared_ptr, so it will be handled by reference - // counting - } - - bool is_updated() const { return updated_; } - - bool set_updated(bool v) { - updated_ = v; - return updated_; - } - - int64_t get_updated_ts() const { return updated_ts_; } - - std::string compound_id() const { - return this->schema_name + "-" + std::to_string(this->id); - } - - arrow::Result add(const std::shared_ptr &node) { - if (node->id < min_id || node->id > max_id) { - return arrow::Status::Invalid("Node id is out of range"); - } - if (nodes_.contains(node->id)) { - return arrow::Status::KeyError("Node already exists: ", node->id); - } - if (nodes_.size() >= capacity) { - return arrow::Status::KeyError("Shard is full"); - } - nodes_.insert(std::make_pair(node->id, node)); - nodes_ids_.insert(node->id); - dirty_ = true; - updated_ = true; - return true; - } - - arrow::Result extend(const std::shared_ptr &node) { - if (nodes_.contains(node->id)) { - return arrow::Status::KeyError("Node already exists: ", node->id); - } - if (nodes_.size() >= capacity) { - return arrow::Status::KeyError("Shard is full"); - } - - if (empty()) { - min_id = node->id; - max_id = node->id; - } else { - if (node->id < min_id) { - return arrow::Status::Invalid("Node id is below the minimum range"); - } - max_id = std::max(max_id, node->id); - } - - nodes_.insert(std::make_pair(node->id, node)); - nodes_ids_.insert(node->id); - - dirty_ = true; - updated_ = true; - updated_ts_ = now_millis(); - return true; - } - - arrow::Result> remove(int64_t id) { - const auto it = nodes_.find(id); - if (it == nodes_.end()) { - return arrow::Status::Invalid("Node not found: ", id); - } - auto node = it->second; - nodes_.erase(id); - nodes_ids_.erase(id); - dirty_ = true; - updated_ = true; - return node; - } - - arrow::Result> poll_first() { - if (nodes_ids_.empty()) { - return arrow::Status::Invalid("Shard is empty"); - } - auto first = nodes_ids_.begin(); - auto node_id = *first; - nodes_ids_.erase(first); - auto node = nodes_[node_id]; - nodes_.erase(node_id); - - if (!nodes_ids_.empty()) { - min_id = *nodes_ids_.begin(); - } - - dirty_ = true; - updated_ = true; - updated_ts_ = now_millis(); - return node; - } - - arrow::Result update(const int64_t node_id, - const std::shared_ptr field, - const Value &value, const UpdateType update_type) { - updated_ = true; - if (!nodes_.contains(node_id)) { - return arrow::Status::KeyError("Node not found: ", node_id); - } - dirty_ = true; - updated_ = true; - updated_ts_ = now_millis(); - return nodes_[node_id]->update(field, value, update_type); - } - - arrow::Result> get_table(TemporalContext *ctx) { - // if we have ctx we need to create a new table every time - if (dirty_ || !table_ || ctx) { - ARROW_ASSIGN_OR_RAISE(const auto schema, - schema_registry_->get(schema_name)); - auto arrow_schema = schema->arrow(); - - std::vector> result; - std::ranges::transform(nodes_, std::back_inserter(result), - [](const auto &pair) { return pair.second; }); - - std::ranges::sort( - result, [](const std::shared_ptr &a, - const std::shared_ptr &b) { return a->id < b->id; }); - - ARROW_ASSIGN_OR_RAISE(auto table_res, - create_table(schema, result, chunk_size, ctx)); - - if (!ctx) { - // Non-temporal query: cache the table for reuse - table_ = table_res; - dirty_ = false; - } - - // Return the newly created table (temporal or non-temporal) - return table_res; - } - - // Reuse cached table (only for non-temporal queries) - return table_; - } - - size_t size() const { return nodes_.size(); } - - bool has_space() const { return nodes_.size() < capacity; } - - bool empty() const { return nodes_.empty(); } - - std::vector> get_nodes() const { - std::vector> result; - result.reserve(nodes_.size()); - for (const auto &node : nodes_ | std::views::values) { - result.push_back(node); - } - return result; - } -}; - -class ShardManager { - private: - std::pmr::monotonic_buffer_resource memory_pool_; - std::pmr::unordered_map>> - shards_; - std::shared_ptr schema_registry_; - const size_t shard_capacity_; - const size_t chunk_size_; - const DatabaseConfig config_; - std::atomic id_counter_{ - 0}; // Global unique ID counter for all shards - std::unordered_map> - index_counters_; // Per-schema index/position counter - mutable std::mutex index_counter_mutex_; // todo use tbb map instead - - void create_new_shard(const std::shared_ptr &node) { - auto new_min_id = node->id; - auto new_max_id = node->id + shard_capacity_ - 1; - - int64_t shard_index; - { - std::lock_guard lock(index_counter_mutex_); - shard_index = index_counters_[node->schema_name]++; - } - - auto shard = std::make_shared(id_counter_.fetch_add(1), shard_index, - config_, new_min_id, new_max_id, - node->schema_name, schema_registry_); - auto result = shard->add(node); - if (!result.ok()) { - log_error("Error adding node to new shard: {}", - result.status().ToString()); - } - - shards_[node->schema_name].push_back(shard); - } - - public: - explicit ShardManager(std::shared_ptr schema_registry, - const DatabaseConfig &config) - : memory_pool_(config.get_manager_memory_pool_size()), - shards_(&memory_pool_), - schema_registry_(std::move(schema_registry)), - shard_capacity_(config.get_shard_capacity()), - chunk_size_(config.get_chunk_size()), - config_(config) {} - - void set_id_counter(const int64_t value) { id_counter_.store(value); } - int64_t get_id_counter() const { return id_counter_.load(); } - - void set_index_counter(const std::string &schema_name, const int64_t value) { - std::lock_guard lock(index_counter_mutex_); - index_counters_[schema_name].store(value); - } - - arrow::Result> get_shard( - const std::string &schema_name, const int64_t id) { - return shards_[schema_name][id]; - } - - int64_t get_index_counter(const std::string &schema_name) const { - std::lock_guard lock(index_counter_mutex_); - const auto it = index_counters_.find(schema_name); - return it != index_counters_.end() ? it->second.load() : 0; - } - - std::vector get_schema_names() const { - std::vector schema_names; - schema_names.reserve(shards_.size()); - for (const auto &schema_name : shards_ | std::views::keys) { - schema_names.push_back(schema_name); - } - return schema_names; - } - - arrow::Result>> get_shards( - const std::string &schema_name) const { - const auto it = shards_.find(schema_name); - if (it == shards_.end()) { - return arrow::Status::KeyError("Schema '", schema_name, - "' not found in shards"); - } - return it->second; - } - - arrow::Result is_shard_clean(std::string s, int64_t id) { - return !shards_[s][id]->is_updated(); - } - - arrow::Result compact(const std::string &schema_name) { - const auto it = shards_.find(schema_name); - if (it == shards_.end()) { - return arrow::Status::Invalid("Shard not found for the given schema: ", - schema_name); - } - - auto &shard_list = it->second; - if (shard_list.size() <= 1) { - // nothing to compact - return true; - } - - for (size_t i = 1; i < shard_list.size(); i++) { - const auto &prev = shard_list[i - 1]; - const auto &curr = shard_list[i]; - - while (prev->has_space() && !curr->empty()) { - auto node = curr->poll_first().ValueOrDie(); - prev->extend(node).ValueOrDie(); - if (Logger::get_instance().get_level() == LogLevel::DEBUG) { - log_debug("node id: " + std::to_string(node->id) + - " moved from shard: " + std::to_string(i) + - " to shard: " + std::to_string(i - 1)); - log_debug("prev shard id: " + std::to_string(i - 1) + - " min_id=" + std::to_string(prev->min_id) + - " max_id=" + std::to_string(prev->max_id)); - - log_debug("curr shard id: " + std::to_string(i) + - " min_id=" + std::to_string(curr->min_id) + - " max_id=" + std::to_string(curr->max_id)); - } - } - } - - // second pass: remove empty shards - auto it_shard = shard_list.begin(); - while (it_shard != shard_list.end()) { - if ((*it_shard)->empty()) { - it_shard = shard_list.erase(it_shard); - } else { - ++it_shard; - } - } - - return true; - } - - // сompact all schemas in the database - arrow::Result compact_all() { - const std::vector schema_names = - schema_registry_->get_schema_names(); - bool success = true; - - for (const auto &schema_name : schema_names) { - if (auto result = compact(schema_name); !result.ok()) { - log_error("Error compacting schema '{}':{}", schema_name, - result.status().ToString()); - success = false; - } - } - - return success; - } - - arrow::Result insert_node(const std::shared_ptr &node) { - if (Logger::get_instance().get_level() == LogLevel::DEBUG) { - log_debug("inserting node id " + std::to_string(node->id)); - } - const auto it = shards_.find(node->schema_name); - if (it == shards_.end()) { - shards_[node->schema_name] = std::vector>(); - create_new_shard(node); - return true; - } - - const auto &shard_list = it->second; - if (shard_list.empty()) { - create_new_shard(node); - return true; - } - - // first try to find shards that can directly add the node (ID is in range) - for (auto &shard : shard_list) { - if (node->id >= shard->min_id && node->id <= shard->max_id && - shard->has_space()) { - if (auto result = shard->add(node); result.ok()) { - if (Logger::get_instance().get_level() == LogLevel::DEBUG) { - log_debug("node id: '" + std::to_string(node->id) + - "' inserted to shard id: " + std::to_string(shard->id)); - } - - return true; - } - // if there was an error, we'll try the next shard - } - } - - for (auto &shard : shard_list) { - if (shard->has_space()) { - if (node->id > shard->max_id) { - if (auto result = shard->extend(node); result.ok()) { - return true; - } - } - } - } - create_new_shard(node); - return true; - } - - arrow::Result> get_node(const std::string &schema_name, - int64_t node_id) { - const auto schema_it = shards_.find(schema_name); - if (schema_it == shards_.end()) { - return arrow::Status::KeyError("Schema '", schema_name, - "' not found in shards"); - } - - for (const auto &shard : schema_it->second) { - if (node_id >= shard->min_id && node_id <= shard->max_id) { - try { - if (auto node_result = shard->remove(node_id); node_result.ok()) { - return node_result.ValueOrDie(); - } - } catch (...) { - // node wasn't in this shard, continue to next shard - } - } - } - - return arrow::Status::KeyError("Node with id ", node_id, - " not found in schema '", schema_name, "'"); - } - - arrow::Result remove_node(const std::string &schema_name, - int64_t node_id) { - if (!shards_.contains(schema_name)) { - return arrow::Status::KeyError("Schema '", schema_name, - "' not found in shards"); - } - - for (const auto &shard : shards_[schema_name]) { - if (node_id >= shard->min_id && node_id <= shard->max_id) { - if (auto remove_result = shard->remove(node_id); remove_result.ok()) { - return true; - } - } - } - - return arrow::Status::KeyError("Node with id ", node_id, - " not found in schema '", schema_name, "'"); - } - - arrow::Result update_node(const std::string &schema_name, - const int64_t id, - const std::shared_ptr &field, - const Value &value, - const UpdateType update_type) { - auto schema_it = shards_.find(schema_name); - if (schema_it == shards_.end()) { - return arrow::Status::KeyError("Schema not found: ", schema_name); - } - - for (const auto &shard : schema_it->second) { - if (id >= shard->min_id && id <= shard->max_id) { - return shard->update(id, field, value, update_type); - } - } - - return arrow::Status::KeyError("Node with id ", id, " not found in schema ", - schema_name); - } - - arrow::Result update_node(const std::string &schema_name, - const int64_t id, - const std::string &field_name, - const Value &value, - const UpdateType update_type) { - auto schema_it = shards_.find(schema_name); - if (schema_it == shards_.end()) { - return arrow::Status::KeyError("Schema not found: ", schema_name, - " in shards"); - } - - auto field = - schema_registry_->get(schema_name).ValueOrDie()->get_field(field_name); - - for (const auto &shard : schema_it->second) { - if (id >= shard->min_id && id <= shard->max_id) { - return shard->update(id, field, value, update_type); - } - } - - return arrow::Status::KeyError("Node with id ", id, " not found in schema ", - schema_name); - } - - arrow::Result>> get_nodes( - const std::string &schema_name) { - const auto schema_it = shards_.find(schema_name); - if (schema_it == shards_.end()) { - return arrow::Status::KeyError("Schema '", schema_name, - "' not found in shards"); - } - - std::vector> result; - size_t total_estimated_nodes = 0; - for (const auto &shard : schema_it->second) { - total_estimated_nodes += shard->size(); - } - result.reserve(total_estimated_nodes); - - for (const auto &shard : schema_it->second) { - auto nodes = shard->get_nodes(); - result.insert(result.end(), nodes.begin(), nodes.end()); - } - - return result; - } - - arrow::Result>> get_tables( - const std::string &schema_name, TemporalContext *temporal_context) { - const auto schema_it = shards_.find(schema_name); - if (schema_it == shards_.end()) { - return std::vector>{}; - } - - std::vector> sorted_shards = schema_it->second; - - std::ranges::sort(sorted_shards, [](const std::shared_ptr &a, - const std::shared_ptr &b) { - return a->min_id < b->min_id; - }); - - std::vector> tables; - for (const auto &shard : sorted_shards) { - ARROW_ASSIGN_OR_RAISE(auto table, shard->get_table(temporal_context)); - if (table->num_rows() > 0) { - tables.push_back(table); - } - } - - return tables; - } - - bool has_shards(const std::string &schema_name) const { - const auto it = shards_.find(schema_name); - return it != shards_.end() && !it->second.empty(); - } - - arrow::Result get_shard_count(const std::string &schema_name) const { - if (!has_shards(schema_name)) { - return arrow::Status::Invalid("Schema '", schema_name, "' not found"); - } - return shards_.find(schema_name)->second.size(); - } - - arrow::Result> get_shard_sizes( - const std::string &schema_name) const { - if (!has_shards(schema_name)) { - return arrow::Status::Invalid("Schema '", schema_name, "' not found"); - } - std::vector sizes; - for (const auto &shard : shards_.find(schema_name)->second) { - sizes.push_back(shard->size()); - } - return sizes; - } - - arrow::Result>> get_shard_ranges( - const std::string &schema_name) const { - if (!has_shards(schema_name)) { - return arrow::Status::Invalid("Schema '", schema_name, "' not found"); - } - std::vector> ranges; - for (const auto &shard : shards_.find(schema_name)->second) { - ranges.emplace_back(shard->min_id, shard->max_id); - } - return ranges; - } - - arrow::Result add_shard(const std::shared_ptr &shard) { - if (!shard) { - return arrow::Status::Invalid("Cannot add null shard"); - } - - shards_[shard->schema_name].push_back(shard); - return true; - } - - arrow::Result reset_all_updated() { - log_debug("Resetting 'updated' flag for all shards"); - for (auto &schema_shards : shards_ | std::views::values) { - for (auto &shard : schema_shards) { - shard->set_updated(false); - } - } - return true; - } -}; - class Database { private: std::shared_ptr schema_registry_; @@ -757,7 +106,7 @@ class Database { arrow::Result> create_node( const std::string &schema_name, - std::unordered_map &data) { + const std::unordered_map &data) { if (schema_name.empty()) { return arrow::Status::Invalid("Schema name cannot be empty"); } @@ -785,6 +134,18 @@ class Database { update_type); } + /** + * @brief Batch-update multiple fields on one node (creates 1 version). + */ + arrow::Result update_node_fields( + const std::string &schema_name, const int64_t id, + const std::vector, Value>> + &field_updates, + const UpdateType update_type) { + return shard_manager_->update_node_fields(schema_name, id, field_updates, + update_type); + } + arrow::Result remove_node(const std::string &schema_name, int64_t node_id) { if (auto res = node_manager_->remove_node(schema_name, node_id); !res) { @@ -877,6 +238,51 @@ class Database { [[nodiscard]] arrow::Result> query( const Query &query) const; + + /** + * @brief Execute an UpdateQuery. + * + * Mode 1 — by ID (bare field names): + * db.update(UpdateQuery::on("User", 0).set("age", Value(31)).build()); + * + * Mode 2 — by MATCH query (alias-qualified SET, multi-schema): + * db.update(UpdateQuery::match( + * Query::from("u:User") + * .traverse("u", "WORKS_AT", "c:Company") + * .where("c.name", CompareOp::Eq, Value("Google")) + * .build() + * ).set("u.status", Value("employed")) + * .set("c.size", Value(int32_t(5001))) + * .build()); + */ + [[nodiscard]] arrow::Result update(const UpdateQuery &uq); + + private: + /** Mode 1: update a single node by schema + ID. */ + [[nodiscard]] arrow::Result update_by_id(const UpdateQuery &uq); + + /** Mode 2: find nodes via MATCH query, then batch-update each. */ + [[nodiscard]] arrow::Result update_by_match( + const UpdateQuery &uq); + + /** + * Apply field updates to every node whose ID appears in @p id_column. + * One call to update_node_fields() per unique node ID (1 version each). + */ + void apply_updates( + const std::string &schema_name, + const std::shared_ptr &id_column, + const std::vector, Value>> &fields, + UpdateType update_type, UpdateResult &result); + + /** + * Build an alias→schema mapping from a Query's FROM + TRAVERSE clauses. + * Only declarations ("alias:Schema") are recorded; bare references ("alias") + * are skipped. Returns an error if the same alias is bound to two different + * schemas. + */ + static arrow::Result> + resolve_alias_map(const Query &query); }; } // namespace tundradb diff --git a/include/node.hpp b/include/node.hpp index 757aa82..b764bb1 100644 --- a/include/node.hpp +++ b/include/node.hpp @@ -114,6 +114,36 @@ class Node { return true; } + /** + * @brief Batch-update multiple fields in a single version. + * + * When using the arena (versioned storage), this creates exactly ONE + * new version for all field updates instead of N versions. + */ + arrow::Result update_fields( + const std::vector, Value>> + &field_updates, + UpdateType update_type) { + if (field_updates.empty()) return true; + + if (arena_ != nullptr) { + return arena_->update_fields(*handle_, layout_, field_updates); + } + + // Non-arena fallback: update data_ map directly + for (const auto &[field, value] : field_updates) { + if (data_.find(field->name()) == data_.end()) { + return arrow::Status::KeyError("Field not found: ", field->name()); + } + switch (update_type) { + case SET: + data_[field->name()] = value; + break; + } + } + return true; + } + [[deprecated]] arrow::Result set_value(const std::string &field, const Value &value) { log_warn("set_value by string is deprecated"); diff --git a/include/query.hpp b/include/query.hpp index c4cc92f..18d4e69 100644 --- a/include/query.hpp +++ b/include/query.hpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include "node.hpp" @@ -630,6 +631,179 @@ class QueryResult { QueryExecutionStats stats_; }; +/** @brief A single field assignment: field_name = value. */ +struct SetAssignment { + std::string field_name; + Value value; + + SetAssignment(std::string field_name, Value value) + : field_name(std::move(field_name)), value(std::move(value)) {} +}; + +/** + * @brief Describes an UPDATE operation. + * + * Two modes: + * + * **Mode 1 — by ID** (no query engine involved, bare field names): + * @code + * UpdateQuery::on("User", 0).set("age", Value(31)).build(); + * @endcode + * + * **Mode 2 — by MATCH query** (alias-qualified SET fields): + * @code + * // Simple WHERE — update one schema: + * UpdateQuery::match( + * Query::from("u:User") + * .where("u.city", CompareOp::Eq, Value("NYC")) + * .build() + * ).set("u.status", Value("active")).build(); + * + * // Traversal — update multiple schemas: + * UpdateQuery::match( + * Query::from("u:User") + * .traverse("u", "WORKS_AT", "c:Company") + * .where("c.name", CompareOp::Eq, Value("Google")) + * .build() + * ).set("u.status", Value("employed")) + * .set("c.size", Value(int32_t(5001))) + * .build(); + * @endcode + * + * In Mode 2, each SET field must be alias-qualified ("alias.field"). + * The target aliases are derived from the SET assignments automatically. + */ +class UpdateQuery { + public: + class Builder; + + /** @brief Schema name (Mode 1 only). */ + [[nodiscard]] const std::string& schema() const { return schema_; } + + [[nodiscard]] const std::vector& assignments() const { + return assignments_; + } + + /** @brief Node ID for Mode 1 (direct update). */ + [[nodiscard]] const std::optional& node_id() const { + return node_id_; + } + + /** @brief The MATCH query for Mode 2 (query-based update). */ + [[nodiscard]] const std::optional& match_query() const { + return match_query_; + } + + /** @brief True if this is a Mode 2 (query-based) update. */ + [[nodiscard]] bool has_match() const { return match_query_.has_value(); } + + [[nodiscard]] UpdateType update_type() const { return update_type_; } + + /** + * @brief Extract unique aliases referenced in SET assignments. + * + * For Mode 2 only. Parses each "alias.field" to collect the set of + * distinct alias prefixes (e.g. {"u", "c"}). + */ + [[nodiscard]] std::vector target_aliases() const { + std::vector aliases; + for (const auto& a : assignments_) { + auto dot = a.field_name.find('.'); + if (dot == std::string::npos) continue; + std::string alias = a.field_name.substr(0, dot); + if (std::find(aliases.begin(), aliases.end(), alias) == aliases.end()) { + aliases.push_back(alias); + } + } + return aliases; + } + + /** @brief Mode 1 — target a specific node by schema + ID. */ + static Builder on(const std::string& schema, int64_t node_id) { + return {schema, node_id}; + } + + /** @brief Mode 2 — target nodes found by a MATCH query. */ + static Builder match(Query query) { return Builder{std::move(query)}; } + + class Builder { + public: + /** @brief Mode 1 constructor: update a specific node by ID. */ + Builder(std::string schema, int64_t node_id) + : schema_(std::move(schema)), node_id_(node_id) {} + + /** @brief Mode 2 constructor: update nodes found by a MATCH query. */ + explicit Builder(Query query) : match_query_(std::move(query)) {} + + /** + * @brief Add a field assignment. + * + * - Mode 1: bare name — set("age", Value(31)) + * - Mode 2: qualified — set("u.age", Value(31)) + */ + Builder& set(std::string field_name, Value value) { + assignments_.emplace_back(std::move(field_name), std::move(value)); + return *this; + } + + /** @brief Override the update type (default: SET). */ + Builder& type(UpdateType t) { + update_type_ = t; + return *this; + } + + /** @brief Build the immutable UpdateQuery (rvalue). */ + [[nodiscard]] UpdateQuery build() && { + if (assignments_.empty()) { + throw std::runtime_error( + "UpdateQuery must have at least one SET assignment"); + } + return UpdateQuery(std::move(schema_), std::move(assignments_), node_id_, + std::move(match_query_), update_type_); + } + + /** @brief Build the immutable UpdateQuery (lvalue). */ + [[nodiscard]] UpdateQuery build() & { + if (assignments_.empty()) { + throw std::runtime_error( + "UpdateQuery must have at least one SET assignment"); + } + return UpdateQuery(schema_, assignments_, node_id_, match_query_, + update_type_); + } + + private: + std::string schema_; + std::vector assignments_; + std::optional node_id_; + std::optional match_query_; + UpdateType update_type_ = UpdateType::SET; + }; + + private: + UpdateQuery(std::string schema, std::vector assignments, + std::optional node_id, std::optional match_query, + UpdateType update_type) + : schema_(std::move(schema)), + assignments_(std::move(assignments)), + node_id_(std::move(node_id)), + match_query_(std::move(match_query)), + update_type_(update_type) {} + + std::string schema_; + std::vector assignments_; + std::optional node_id_; + std::optional match_query_; + UpdateType update_type_; +}; + +/** @brief Result of an update operation. */ +struct UpdateResult { + int64_t updated_count = 0; ///< Number of nodes updated. + int64_t failed_count = 0; ///< Number of nodes that failed to update. + std::vector errors; ///< Error messages for failed updates. +}; + } // namespace tundradb #endif // QUERY_HPP diff --git a/include/shard.hpp b/include/shard.hpp new file mode 100644 index 0000000..6309c6d --- /dev/null +++ b/include/shard.hpp @@ -0,0 +1,170 @@ +#pragma once + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "config.hpp" +#include "node.hpp" +#include "schema.hpp" + +namespace tundradb { + +class TemporalContext; + +// ========================================================================= +// Shard — a contiguous range of nodes for one schema +// ========================================================================= + +class Shard { + private: + std::pmr::monotonic_buffer_resource memory_pool_; + std::pmr::unordered_map> nodes_; + std::set nodes_ids_; + std::atomic dirty_{false}; + std::shared_ptr table_; + std::shared_ptr schema_registry_; + int64_t updated_ts_; + bool updated_ = true; + + public: + const int64_t id; + const int64_t index; + int64_t min_id; + int64_t max_id; + const size_t capacity; + const size_t chunk_size; + std::string schema_name; + + Shard(int64_t id, int64_t index, size_t capacity, int64_t min_id, + int64_t max_id, size_t chunk_size, std::string schema_name, + std::shared_ptr schema_registry, + size_t buffer_size = 10 * 1024 * 1024); + + Shard(int64_t id, int64_t index, const DatabaseConfig &config, int64_t min_id, + int64_t max_id, std::string schema_name, + std::shared_ptr schema_registry); + + ~Shard(); + + [[nodiscard]] bool is_updated() const; + bool set_updated(bool v); + [[nodiscard]] int64_t get_updated_ts() const; + [[nodiscard]] std::string compound_id() const; + + arrow::Result add(const std::shared_ptr &node); + arrow::Result extend(const std::shared_ptr &node); + arrow::Result> remove(int64_t id); + arrow::Result> poll_first(); + + arrow::Result update(int64_t node_id, std::shared_ptr field, + const Value &value, UpdateType update_type); + + arrow::Result update_fields( + int64_t node_id, + const std::vector, Value>> + &field_updates, + UpdateType update_type); + + arrow::Result> get_table(TemporalContext *ctx); + + [[nodiscard]] size_t size() const; + [[nodiscard]] bool has_space() const; + [[nodiscard]] bool empty() const; + [[nodiscard]] std::vector> get_nodes() const; +}; + +// ========================================================================= +// ShardManager — manages per-schema shard collections +// ========================================================================= + +class ShardManager { + private: + std::pmr::monotonic_buffer_resource memory_pool_; + std::pmr::unordered_map>> + shards_; + std::shared_ptr schema_registry_; + const size_t shard_capacity_; + const size_t chunk_size_; + const DatabaseConfig config_; + std::atomic id_counter_{0}; + std::unordered_map> index_counters_; + mutable std::mutex index_counter_mutex_; + + void create_new_shard(const std::shared_ptr &node); + + public: + explicit ShardManager(std::shared_ptr schema_registry, + const DatabaseConfig &config); + + void set_id_counter(int64_t value); + [[nodiscard]] int64_t get_id_counter() const; + void set_index_counter(const std::string &schema_name, int64_t value); + [[nodiscard]] int64_t get_index_counter(const std::string &schema_name) const; + + arrow::Result> get_shard( + const std::string &schema_name, int64_t id); + + [[nodiscard]] std::vector get_schema_names() const; + + [[nodiscard]] arrow::Result>> get_shards( + const std::string &schema_name) const; + + arrow::Result is_shard_clean(std::string s, int64_t id); + + arrow::Result compact(const std::string &schema_name); + arrow::Result compact_all(); + + arrow::Result insert_node(const std::shared_ptr &node); + + arrow::Result> get_node(const std::string &schema_name, + int64_t node_id); + + arrow::Result remove_node(const std::string &schema_name, + int64_t node_id); + + arrow::Result update_node(const std::string &schema_name, int64_t id, + const std::shared_ptr &field, + const Value &value, UpdateType update_type); + + arrow::Result update_node(const std::string &schema_name, int64_t id, + const std::string &field_name, + const Value &value, UpdateType update_type); + + arrow::Result update_node_fields( + const std::string &schema_name, int64_t id, + const std::vector, Value>> + &field_updates, + UpdateType update_type); + + arrow::Result>> get_nodes( + const std::string &schema_name); + + arrow::Result>> get_tables( + const std::string &schema_name, TemporalContext *temporal_context); + + [[nodiscard]] bool has_shards(const std::string &schema_name) const; + + [[nodiscard]] arrow::Result get_shard_count( + const std::string &schema_name) const; + + [[nodiscard]] arrow::Result> get_shard_sizes( + const std::string &schema_name) const; + + [[nodiscard]] arrow::Result>> + get_shard_ranges(const std::string &schema_name) const; + + arrow::Result add_shard(const std::shared_ptr &shard); + arrow::Result reset_all_updated(); +}; + +} // namespace tundradb diff --git a/include/snapshot_manager.hpp b/include/snapshot_manager.hpp new file mode 100644 index 0000000..5a10edf --- /dev/null +++ b/include/snapshot_manager.hpp @@ -0,0 +1,44 @@ +#pragma once + +#include + +#include + +#include "edge_store.hpp" +#include "metadata.hpp" +#include "schema.hpp" + +namespace tundradb { + +// Forward declarations +class ShardManager; +class Storage; +class NodeManager; + +class SnapshotManager { + public: + explicit SnapshotManager(std::shared_ptr metadata_manager, + std::shared_ptr storage, + std::shared_ptr shard_manager, + std::shared_ptr edge_store, + std::shared_ptr node_manager, + std::shared_ptr schema_registry); + + arrow::Result initialize(); + arrow::Result commit(); + Snapshot *current_snapshot(); + std::shared_ptr get_manifest(); + + private: + std::shared_ptr metadata_manager_; + std::shared_ptr storage_; + std::shared_ptr shard_manager_; + std::shared_ptr schema_registry_; + std::shared_ptr edge_store_; + std::shared_ptr node_manager_; + Metadata metadata_; + std::shared_ptr manifest_; + std::shared_ptr edge_metadata_; +}; + +} // namespace tundradb diff --git a/include/storage.hpp b/include/storage.hpp index ff6e42e..3114faf 100644 --- a/include/storage.hpp +++ b/include/storage.hpp @@ -13,6 +13,7 @@ namespace tundradb { class SchemaRegistry; +class NodeManager; class Shard; class Storage { @@ -48,7 +49,4 @@ class Storage { } // namespace tundradb -// Include core.hpp after our declarations to prevent circular dependencies -#include "core.hpp" - #endif // STORAGE_HPP \ No newline at end of file diff --git a/src/core.cpp b/src/core.cpp index 9f8b3e2..f57e240 100644 --- a/src/core.cpp +++ b/src/core.cpp @@ -848,4 +848,197 @@ arrow::Result> Database::query( return result; } +// --------------------------------------------------------------------------- +// Database::update - dispatch to Mode 1 or Mode 2 +// --------------------------------------------------------------------------- +arrow::Result Database::update(const UpdateQuery& uq) { + if (uq.node_id().has_value()) { + return update_by_id(uq); + } + if (uq.has_match()) { + return update_by_match(uq); + } + return arrow::Status::Invalid( + "UpdateQuery must specify a node ID or a MATCH query"); +} + +// --------------------------------------------------------------------------- +// Mode 1: update a single node by schema + ID +// --------------------------------------------------------------------------- +arrow::Result Database::update_by_id(const UpdateQuery& uq) { + UpdateResult result; + + auto schema_result = schema_registry_->get(uq.schema()); + if (!schema_result.ok()) { + return arrow::Status::KeyError("Schema '", uq.schema(), "' not found"); + } + const auto& schema = schema_result.ValueOrDie(); + + // Resolve fields upfront - fail early on bad field names + std::vector, Value>> resolved; + resolved.reserve(uq.assignments().size()); + for (const auto& a : uq.assignments()) { + auto field = schema->get_field(a.field_name); + if (!field) { + return arrow::Status::Invalid( + "Field '", a.field_name, "' not found in schema '", uq.schema(), "'"); + } + resolved.emplace_back(field, a.value); + } + + const int64_t id = uq.node_id().value(); + if (const auto r = + update_node_fields(uq.schema(), id, resolved, uq.update_type()); + !r.ok()) { + result.failed_count++; + result.errors.push_back(uq.schema() + "(" + std::to_string(id) + + "): " + r.status().ToString()); + } else { + result.updated_count = 1; + } + return result; +} + +// --------------------------------------------------------------------------- +// Mode 2: find nodes via MATCH query, then batch-update each +// --------------------------------------------------------------------------- +arrow::Result Database::update_by_match(const UpdateQuery& uq) { + UpdateResult result; + const auto& match_query = uq.match_query().value(); + + // 1. Resolve alias -> schema mapping (declarations only, with validation) + ARROW_ASSIGN_OR_RAISE(auto alias_to_schema, resolve_alias_map(match_query)); + + // 2. Group SET assignments by alias: { alias -> (schema, [(Field,Value)]) } + struct AliasUpdate { + std::string schema_name; + std::vector, Value>> fields; + }; + std::unordered_map grouped; + + for (const auto& a : uq.assignments()) { + auto dot = a.field_name.find('.'); + if (dot == std::string::npos) { + return arrow::Status::Invalid( + "SET field '", a.field_name, + "' must be alias-qualified (e.g. u.age) in a MATCH-based update"); + } + std::string alias = a.field_name.substr(0, dot); + std::string bare_field = a.field_name.substr(dot + 1); + + auto it = alias_to_schema.find(alias); + if (it == alias_to_schema.end()) { + return arrow::Status::Invalid("Alias '", alias, + "' not found in MATCH query"); + } + + auto schema_result = schema_registry_->get(it->second); + if (!schema_result.ok()) { + return arrow::Status::KeyError("Schema '", it->second, "' not found"); + } + const auto& schema = schema_result.ValueOrDie(); + auto field = schema->get_field(bare_field); + if (!field) { + return arrow::Status::Invalid("Field '", bare_field, + "' not found in schema '", it->second, "'"); + } + + auto& entry = grouped[alias]; + if (entry.schema_name.empty()) entry.schema_name = it->second; + entry.fields.emplace_back(field, a.value); + } + + // 3. Build ID-only SELECT: we only need "u.id", "c.id", etc. + std::vector id_columns; + id_columns.reserve(grouped.size()); + for (const auto& alias : grouped | std::views::keys) { + id_columns.push_back(alias + ".id"); + } + Query id_query(match_query.from(), match_query.clauses(), + std::make_shared