Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion src/binder/bind/bind_ddl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ BoundCreateTableInfo Binder::bindCreateRelTableGroupInfo(const CreateTableInfo*
// Handle special case where icebug-disk storage could contain a dot
// Otherwise, treat as file path (e.g., "dataset/demo-db/icebug-disk/demo" or
// "data.parquet")
if (storageFormat != StorageFormat::ICEBUG_DISK && dotPos != std::string::npos) {
if (storageFormat == StorageFormat::NONE && dotPos != std::string::npos) {
std::string dbName = storage.substr(0, dotPos);
std::string tableName = storage.substr(dotPos + 1);
if (!dbName.empty()) {
Expand Down Expand Up @@ -365,6 +365,25 @@ BoundCreateTableInfo Binder::bindCreateRelTableGroupInfo(const CreateTableInfo*
"Cannot mix icebug-disk tables with non-icebug-disk tables in CREATE REL TABLE.");
}

bool isSrcExternal = srcEntry->getType() == CatalogEntryType::NODE_TABLE_ENTRY ?
srcEntry->ptrCast<NodeTableCatalogEntry>()->getStorageFormat() ==
StorageFormat::EXTERNAL :
false;
bool isDstExternal = dstEntry->getType() == CatalogEntryType::NODE_TABLE_ENTRY ?
dstEntry->ptrCast<NodeTableCatalogEntry>()->getStorageFormat() ==
StorageFormat::EXTERNAL :
false;
bool isRelExternal = (storageFormat == StorageFormat::EXTERNAL);

// External rel tables must connect external node tables, and non-external rel tables
// cannot connect external node tables.
if ((!isRelExternal && (isSrcExternal || isDstExternal)) ||
(isRelExternal && (!isSrcExternal || !isDstExternal))) {
throw BinderException(
"Cannot mix external tables with non-external tables in CREATE REL TABLE. "
"External rel tables must connect external node tables.");
}

// Use the actual shadow table IDs, not FOREIGN_TABLE_ID
// The shadow tables allow the query planner to distinguish between different node tables
auto srcTableID = srcEntry->getTableID();
Expand Down
7 changes: 5 additions & 2 deletions src/common/enums/storage_format.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ StorageFormat StorageFormatUtils::fromString(const std::string& str) {
if (str == "icebug-disk") {
return StorageFormat::ICEBUG_DISK;
}
throw BinderException(
std::format("Unsupported storage format '{}'. Valid options are: icebug-disk.", str));
if (str == "external") {
return StorageFormat::EXTERNAL;
}
throw BinderException(std::format(
"Unsupported storage format '{}'. Valid options are: icebug-disk, external.", str));
}

} // namespace common
Expand Down
3 changes: 3 additions & 0 deletions src/extension/extension_entries.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ static constexpr std::array vectorExtensionFunctions = {"QUERY_VECTOR_INDEX", "C
"DROP_VECTOR_INDEX"};
static constexpr std::array llmExtensionFunctions = {"CREATE_EMBEDDING"};
static constexpr std::array neo4jExtensionFunctions = {"NEO4J_MIGRATE"};
static constexpr std::array lanceExtensionFunctions = {"LANCE_VECTOR_SEARCH", "LANCE_FTS",
"LANCE_HYBRID_SEARCH"};
static constexpr std::array algoExtensionFunctions = {"K_CORE_DECOMPOSITION", "PAGE_RANK",
"STRONGLY_CONNECTED_COMPONENTS_KOSARAJU", "STRONGLY_CONNECTED_COMPONENTS",
"WEAKLY_CONNECTED_COMPONENTS"};
Expand All @@ -40,6 +42,7 @@ static constexpr EntriesForExtension functionsForExtensionsRaw[] = {
{"LLM", llmExtensionFunctions, llmExtensionFunctions.size()},
{"NEO4J", neo4jExtensionFunctions, neo4jExtensionFunctions.size()},
{"ALGO", algoExtensionFunctions, algoExtensionFunctions.size()},
{"LANCE", lanceExtensionFunctions, lanceExtensionFunctions.size()},
};
static constexpr std::array functionsForExtensions = std::to_array(functionsForExtensionsRaw);

Expand Down
2 changes: 2 additions & 0 deletions src/graph/on_disk_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ OnDiskGraphNbrScanState::OnDiskGraphNbrScanState(ClientContext* context,
if (dynamic_cast<IceDiskRelTable*>(table) != nullptr) {
scanState = std::make_unique<IceDiskRelTableScanState>(*mm, srcNodeIDVector.get(),
outVectors, state);
} else if (auto ext = table->createScanState(srcNodeIDVector.get(), outVectors, mm)) {
scanState = std::move(ext);
} else {
scanState = std::make_unique<RelTableScanState>(*MemoryManager::Get(*context),
srcNodeIDVector.get(), outVectors, dstNodeIDVector->state, randomLookup);
Expand Down
15 changes: 15 additions & 0 deletions src/include/common/arrow/arrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@ struct ArrowArray {
void* private_data;
};

// Arrow C Stream Interface
// https://arrow.apache.org/docs/format/CStreamInterface.html
#ifndef ARROW_C_STREAM_INTERFACE
#define ARROW_C_STREAM_INTERFACE

// Handle for a stream of Arrow arrays, declared to match the Arrow C Stream Interface
// (see the specification link above). The member order and types form a C ABI shared with
// other Arrow implementations, so they must not be reordered or changed.
struct ArrowArrayStream {
// Writes the stream's schema into `out` and returns an int status code.
// NOTE(review): per the Arrow specification 0 means success and a non-zero value is an
// errno-compatible error code -- confirm against the spec before relying on it.
int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);
// Writes the next array of the stream into `out` and returns an int status code.
// NOTE(review): per the Arrow specification end-of-stream is reported as success with a
// released `out` array -- confirm against the spec.
int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);
// Returns a textual description of the last error, as a C string.
// NOTE(review): lifetime of the returned pointer is defined by the spec -- confirm.
const char* (*get_last_error)(struct ArrowArrayStream*);
// Callback used to release the stream.
void (*release)(struct ArrowArrayStream*);
// Opaque pointer carried alongside the callbacks; not interpreted by this declaration.
void* private_data;
};

#endif // ARROW_C_STREAM_INTERFACE

#endif // ARROW_C_DATA_INTERFACE

#ifdef __cplusplus
Expand Down
8 changes: 7 additions & 1 deletion src/include/common/enums/storage_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@
namespace lbug {
namespace common {

enum class StorageFormat : uint8_t { NONE, ICEBUG_DISK };
// Identifies the storage format of a table. StorageFormatUtils::fromString() maps the
// strings "icebug-disk" and "external" to the corresponding enumerators.
enum class StorageFormat : uint8_t {
// No explicit storage format.
NONE = 0,
// First-class (built-in) formats.
ICEBUG_DISK = 1,
// External formats.
// NOTE(review): the gap between 1 and 20 presumably reserves values for further built-in
// formats -- confirm before assigning values in that range.
EXTERNAL = 20
};

struct StorageFormatUtils {
static StorageFormat fromString(const std::string& str);
Expand Down
2 changes: 1 addition & 1 deletion src/include/extension/extension.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ struct LBUG_API ExtensionUtils {

static constexpr const char* OFFICIAL_EXTENSION[] = {"ADBC", "HTTPFS", "POSTGRES", "DUCKDB",
"JSON", "SQLITE", "FTS", "DELTA", "ICEBERG", "AZURE", "UNITY_CATALOG", "VECTOR", "NEO4J",
"ALGO", "LLM"};
"ALGO", "LLM", "LANCE"};

static constexpr const char* EXTENSION_LOADER_SUFFIX = "_loader";

Expand Down
27 changes: 27 additions & 0 deletions src/include/storage/storage_manager.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
#pragma once

#include <functional>
#include <mutex>
#include <shared_mutex>
#include <unordered_map>

#include "common/enums/storage_format.h"
#include "shadow_file.h"
#include "storage/index/index.h"
#include "storage/stats/planner_stats.h"
Expand Down Expand Up @@ -33,6 +36,15 @@ class RelTable;
class DiskArrayCollection;
struct DatabaseHeader;

/// Factory signature for extension-provided node tables (e.g. LanceNodeTable).
/// The callable receives the storage manager, the node table's catalog entry, a memory
/// manager and the client context, and returns the created table as std::unique_ptr<Table>.
using NodeTableFactory = std::function<std::unique_ptr<Table>(const StorageManager*,
const catalog::NodeTableCatalogEntry*, MemoryManager*, main::ClientContext*)>;

/// Factory signature for extension-provided rel tables (e.g. LanceRelTable).
/// The callable receives the rel group catalog entry, two table IDs, the storage manager,
/// a memory manager and the client context, and returns the created table as
/// std::unique_ptr<Table>.
/// NOTE(review): the two table IDs are presumably the source and destination node table IDs,
/// in that order -- confirm against the call site.
using RelTableFactory =
std::function<std::unique_ptr<Table>(catalog::RelGroupCatalogEntry*, common::table_id_t,
common::table_id_t, const StorageManager*, MemoryManager*, main::ClientContext*)>;

class LBUG_API StorageManager {
public:
StorageManager(const std::string& databasePath, bool readOnly, bool enableChecksums,
Expand Down Expand Up @@ -83,6 +95,16 @@ class LBUG_API StorageManager {
std::optional<std::reference_wrapper<const IndexType>> getIndexType(
const std::string& typeName) const;

/// Installs the node- and rel-table factories an extension provides for `format`.
/// Factories already registered for the same format are replaced. Registration is
/// serialized through formatFactoryMtx.
/// Extensions are expected to call this from their load() function, before any
/// CREATE TABLE statement using that format is attempted.
void registerStorageFormatHandler(common::StorageFormat format, NodeTableFactory nodeFactory,
    RelTableFactory relFactory) {
    const std::lock_guard<std::mutex> guard{formatFactoryMtx};
    nodeTableFactories.insert_or_assign(format, std::move(nodeFactory));
    relTableFactories.insert_or_assign(format, std::move(relFactory));
}

void serialize(const catalog::Catalog& catalog, common::Serializer& ser);
void serialize(const catalog::Catalog& catalog, const transaction::Transaction& snapshotTxn,
common::Serializer& ser);
Expand Down Expand Up @@ -132,6 +154,11 @@ class LBUG_API StorageManager {
std::unordered_map<common::table_id_t, PlannerTableStats> plannerStatsCache;
std::unordered_map<common::table_id_t, std::string> tableNameCache;
common::VirtualFileSystem* vfs_; // non-owning, owned by Database

// Extension-provided storage format factories (protected by formatFactoryMtx)
mutable std::mutex formatFactoryMtx;
std::unordered_map<common::StorageFormat, NodeTableFactory> nodeTableFactories;
std::unordered_map<common::StorageFormat, RelTableFactory> relTableFactories;
};

} // namespace storage
Expand Down
9 changes: 7 additions & 2 deletions src/include/storage/table/arrow_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,19 @@ class ArrowNodeTable final : public ColumnarNodeTableBase {
bool isVisibleNoLock(const transaction::Transaction* transaction,
common::offset_t offset) const override;

// Virtual dispatch methods for scan_node_table.cpp extensibility
bool requiresExplicitScanInit() const override { return true; }
bool usesMorselScan() const override { return true; }
size_t getNumScanMorsels(const transaction::Transaction* transaction) const override;
// Note: createScanState() left at base default; ArrowNodeTable scan state is
// created via the existing dynamic_cast path in createNodeTableScanState().

const ArrowSchemaWrapper& getArrowSchema() const { return schema; }
const std::vector<ArrowArrayWrapper>& getArrowArrays() const { return arrays; }

common::node_group_idx_t getNumBatches(
const transaction::Transaction* transaction) const override;

size_t getNumScanMorsels(const transaction::Transaction* transaction) const;

const catalog::NodeTableCatalogEntry* getCatalogEntry() const { return nodeTableCatalogEntry; }

protected:
Expand Down
5 changes: 5 additions & 0 deletions src/include/storage/table/ice_disk_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ class IceDiskNodeTable final : public ColumnarNodeTableBase {
bool isVisibleNoLock(const transaction::Transaction* transaction,
common::offset_t offset) const override;

// Virtual dispatch for scan_node_table.cpp extensibility
bool requiresExplicitScanInit() const override { return true; }
// IceDisk uses nodeGroupIdx-based scanning, not morsel-based.
// usesMorselScan() and getNumScanMorsels() are left at NodeTable defaults (false/0).

const std::string& getParquetFilePath() const { return parquetFilePath; }

protected:
Expand Down
30 changes: 30 additions & 0 deletions src/include/storage/table/node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,36 @@ class LBUG_API NodeTable : public Table {
virtual void initializeScanCoordination(
[[maybe_unused]] const transaction::Transaction* transaction) {}

// Virtual dispatch methods for scan_node_table.cpp extensibility.
// These allow extension-defined table types (e.g. LanceNodeTable) to plug
// into the core scan infrastructure without requiring dynamic_casts to
// concrete types that live in extension libraries.

/// Tells the node-table scan operator whether it has to call initScanState() on this
/// table explicitly from initCurrentTable(). The base answer is false; columnar table
/// types (e.g. Arrow, Lance, IceDisk) override it to return true.
virtual bool requiresExplicitScanInit() const {
    return false;
}

/// Tells the scan operator how morsels are handed out for this table: true means
/// scanning is driven by ColumnarNodeTableScanSharedState::getNextMorsel() (e.g. Arrow,
/// Lance); false (the base answer) means morsels are assigned via nodeGroupIdx.
virtual bool usesMorselScan() const {
    return false;
}

/// Number of scan morsels this table exposes, used for progress tracking.
/// The base implementation reports 0, which is what tables that are not morsel-scanned
/// (including those using the IceDisk row-group path) leave in place.
virtual size_t getNumScanMorsels(
    [[maybe_unused]] const transaction::Transaction* transaction) const {
    constexpr size_t noMorsels = 0;
    return noMorsels;
}

/// Factory hook for a format-specific TableScanState.
/// The base implementation produces an empty pointer, which signals the caller to fall
/// back to the built-in dispatch (ArrowNodeTable / IceDiskNodeTable / default).
virtual std::unique_ptr<TableScanState> createScanState(
    [[maybe_unused]] common::ValueVector* nodeIDVector,
    [[maybe_unused]] const std::vector<common::ValueVector*>& outVectors,
    [[maybe_unused]] MemoryManager* memoryManager) const {
    return {};
}

bool scanInternal(transaction::Transaction* transaction, TableScanState& scanState) override;
template<bool lock = true>
bool lookup(const transaction::Transaction* transaction, const TableScanState& scanState) const;
Expand Down
5 changes: 5 additions & 0 deletions src/include/storage/table/rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ class LBUG_API RelTable : public Table {
common::table_id_t toTableID, const StorageManager* storageManager,
MemoryManager* memoryManager);

virtual std::unique_ptr<RelTableScanState> createScanState(common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& outVectors, MemoryManager* memoryManager) const {
return nullptr;
}

common::table_id_t getFromNodeTableID() const { return fromNodeTableID; }
common::table_id_t getToNodeTableID() const { return toNodeTableID; }

Expand Down
18 changes: 12 additions & 6 deletions src/processor/operator/scan/scan_multi_rel_tables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "storage/local_storage/local_storage.h"
#include "storage/table/arrow_rel_table.h"
#include "storage/table/ice_disk_rel_table.h"
#include "storage/table/rel_table.h"

using namespace lbug::common;
using namespace lbug::storage;
Expand Down Expand Up @@ -67,27 +68,32 @@ void ScanMultiRelTable::initLocalStateInternal(ResultSet* resultSet, ExecutionCo
auto nbrNodeIDVector = outVectors[0];

// Check if any table in any scanner is an external rel table with a custom scan state.
// First, try the extension scan state mechanism (e.g. Lance rel tables).
std::unique_ptr<storage::RelTableScanState> extensionScanState;
bool hasArrowTable = false;
bool hasIceDiskTable = false;
for (auto& [_, scanner] : scanners) {
for (auto& relInfo : scanner.relInfos) {
if (!extensionScanState) {
if (auto* relTable = dynamic_cast<storage::RelTable*>(relInfo.table)) {
extensionScanState = relTable->createScanState(boundNodeIDVector, outVectors,
MemoryManager::Get(*clientContext));
}
}
if (dynamic_cast<storage::ArrowRelTable*>(relInfo.table) != nullptr) {
hasArrowTable = true;
break;
}
if (dynamic_cast<storage::IceDiskRelTable*>(relInfo.table) != nullptr) {
hasIceDiskTable = true;
break;
}
}
if (hasArrowTable || hasIceDiskTable) {
break;
}
}

// IceDisk scan state extends the common rel scan state and Arrow stores its per-table state
// there, so one scan state can now cover IceDisk, Arrow, and native rel tables.
if (hasIceDiskTable) {
if (extensionScanState) {
scanState = std::move(extensionScanState);
} else if (hasIceDiskTable) {
scanState =
std::make_unique<storage::IceDiskRelTableScanState>(*MemoryManager::Get(*clientContext),
boundNodeIDVector, outVectors, nbrNodeIDVector->state);
Expand Down
52 changes: 33 additions & 19 deletions src/processor/operator/scan/scan_node_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "storage/local_storage/local_node_table.h"
#include "storage/local_storage/local_storage.h"
#include "storage/table/arrow_node_table.h"
#include "storage/table/columnar_node_table_base.h"
#include "storage/table/ice_disk_node_table.h"

using namespace lbug::common;
Expand All @@ -17,6 +18,11 @@ namespace processor {
static std::unique_ptr<TableScanState> createNodeTableScanState(NodeTable* table,
ValueVector* nodeIDVector, const std::vector<ValueVector*>& outVectors,
MemoryManager* memoryManager) {
// Allow extension-defined table types to supply their own scan state via virtual factory.
auto extensionState = table->createScanState(nodeIDVector, outVectors, memoryManager);
if (extensionState) {
return extensionState;
}
if (dynamic_cast<IceDiskNodeTable*>(table) != nullptr) {
return std::make_unique<IceDiskNodeTableScanState>(*memoryManager, nodeIDVector, outVectors,
nodeIDVector->state);
Expand Down Expand Up @@ -69,12 +75,15 @@ void ScanNodeTableSharedState::initialize(const transaction::Transaction* transa
} catch (const std::exception& e) {
this->numCommittedNodeGroups = 1;
}
} else if (const auto arrowTable = dynamic_cast<ArrowNodeTable*>(table)) {
// For Arrow tables, set numCommittedNodeGroups to number of morsels
this->numCommittedNodeGroups =
static_cast<common::node_group_idx_t>(arrowTable->getNumScanMorsels(transaction));
} else {
this->numCommittedNodeGroups = table->getNumCommittedNodeGroups();
// For morsel-based columnar tables (Arrow, Lance, etc.) getNumScanMorsels() returns
// the morsel count; for regular NodeTable it returns 0 → fall back to node groups.
auto morselCount = table->getNumScanMorsels(transaction);
if (morselCount > 0) {
this->numCommittedNodeGroups = static_cast<common::node_group_idx_t>(morselCount);
} else {
this->numCommittedNodeGroups = table->getNumCommittedNodeGroups();
}
}
if (transaction->isWriteTransaction()) {
if (const auto localTable =
Expand All @@ -90,18 +99,22 @@ void ScanNodeTableSharedState::nextMorsel(TableScanState& scanState,
ScanNodeTableProgressSharedState& progressSharedState) {
std::unique_lock lck{mtx};

// ColumnarNodeTables handle morsel assignment internally
// Morsel-based columnar tables (Arrow, Lance, …) dispatch through
// ColumnarNodeTableBase::getTableScanSharedState() / usesMorselScan().
// TODO: icebug-disk tables https://github.com/LadybugDB/ladybug/issues/245
if (const auto arrowTable = dynamic_cast<ArrowNodeTable*>(this->table)) {
const auto tableSharedState = arrowTable->getTableScanSharedState();
if (tableSharedState->getNextMorsel(static_cast<ColumnarNodeTableScanState*>(&scanState))) {
scanState.source = TableScanSource::COMMITTED;
progressSharedState.numMorselsScanned++;
} else {
scanState.source = TableScanSource::NONE;
if (this->table->usesMorselScan()) {
const auto columnarTable = dynamic_cast<ColumnarNodeTableBase*>(this->table);
if (columnarTable) {
const auto tableSharedState = columnarTable->getTableScanSharedState();
if (tableSharedState->getNextMorsel(
static_cast<ColumnarNodeTableScanState*>(&scanState))) {
scanState.source = TableScanSource::COMMITTED;
progressSharedState.numMorselsScanned++;
} else {
scanState.source = TableScanSource::NONE;
}
return;
}

return;
}

auto& nodeScanState = scanState.cast<NodeTableScanState>();
Expand Down Expand Up @@ -149,11 +162,12 @@ void ScanNodeTable::initCurrentTable(ExecutionContext* context) {
outVectors, MemoryManager::Get(*context->clientContext));
currentInfo.initScanState(*scanState, outVectors, context->clientContext);
scanState->semiMask = sharedStates[currentTableIdx]->getSemiMask();
// Call table->initScanState for IceDiskNodeTable or ArrowNodeTable
if (dynamic_cast<IceDiskNodeTable*>(tableInfos[currentTableIdx].table) ||
dynamic_cast<ArrowNodeTable*>(tableInfos[currentTableIdx].table)) {
// Use virtual dispatch so extension-defined table types (Lance, etc.) work without
// hard-coding their concrete types here.
auto* nodeTable = tableInfos[currentTableIdx].table->ptrCast<NodeTable>();
if (nodeTable->requiresExplicitScanInit()) {
auto transaction = transaction::Transaction::Get(*context->clientContext);
tableInfos[currentTableIdx].table->initScanState(transaction, *scanState);
nodeTable->initScanState(transaction, *scanState);
}
}

Expand Down
Loading
Loading