diff --git a/src/main/java/org/apache/flink/connector/lance/catalog/namespace/AbstractLanceNamespaceAdapter.java b/src/main/java/org/apache/flink/connector/lance/catalog/namespace/AbstractLanceNamespaceAdapter.java new file mode 100644 index 0000000..dc46c3b --- /dev/null +++ b/src/main/java/org/apache/flink/connector/lance/catalog/namespace/AbstractLanceNamespaceAdapter.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.lance.catalog.namespace; + +import java.util.List; +import java.util.Map; + +/** + * Abstract adapter interface for Lance Namespace operations. + * + * This interface defines the contract for implementing namespace-based catalog operations, + * allowing for different backend implementations (directory-based, REST-based, etc.). + */ +public interface AbstractLanceNamespaceAdapter extends AutoCloseable { + + /** + * Initialize the adapter. + */ + void init(); + + /** + * List all namespaces at root level. + */ + List listNamespaces(); + + /** + * List namespaces under a parent namespace. + */ + List listNamespaces(String... parentNamespace); + + /** + * Check if a namespace exists. + */ + boolean namespaceExists(String... namespaceId); + + /** + * Create a namespace. + */ + void createNamespace(Map properties, String... namespaceId); + + /** + * Drop a namespace. + */ + void dropNamespace(boolean cascade, String... namespaceId); + + /** + * Get namespace metadata. + */ + Map getNamespaceMetadata(String... namespaceId); + + /** + * List tables in a namespace. + */ + List listTables(String... namespaceId); + + /** + * Check if a table exists. + */ + boolean tableExists(String... tableId); + + /** + * Create an empty table. + */ + void createEmptyTable(String location, Map properties, String... tableId); + + /** + * Drop a table. + */ + void dropTable(String... tableId); + + /** + * Get table metadata. + */ + TableMetadata getTableMetadata(String... tableId); + + /** + * Table metadata holder. + */ + class TableMetadata { + private final String location; + private final Map storageOptions; + + public TableMetadata(String location, Map storageOptions) { + this.location = location; + this.storageOptions = storageOptions; + } + + public String getLocation() { + return location; + } + + public Map getStorageOptions() { + return storageOptions; + } + } +} diff --git a/src/main/java/org/apache/flink/connector/lance/catalog/namespace/BaseLanceNamespaceCatalog.java b/src/main/java/org/apache/flink/connector/lance/catalog/namespace/BaseLanceNamespaceCatalog.java new file mode 100644 index 0000000..c288338 --- /dev/null +++ b/src/main/java/org/apache/flink/connector/lance/catalog/namespace/BaseLanceNamespaceCatalog.java @@ -0,0 +1,507 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.lance.catalog.namespace; + +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +/** + * Base Lance Catalog implementation integrated with Lance Namespace. + */ +public abstract class BaseLanceNamespaceCatalog extends AbstractCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(BaseLanceNamespaceCatalog.class); + + protected LanceNamespaceAdapter namespaceAdapter; + protected LanceNamespaceConfig config; + protected Optional extraLevel; + protected Optional parentPrefix; + + public BaseLanceNamespaceCatalog(String catalogName, LanceNamespaceAdapter adapter, LanceNamespaceConfig config) { + super(catalogName, "default"); + + this.namespaceAdapter = Objects.requireNonNull(adapter, "Namespace adapter cannot be null"); + this.config = Objects.requireNonNull(config, "Configuration cannot be null"); + + LOG.info("Initializing BaseLanceNamespaceCatalog: {}", catalogName); + + // Configure extra level + if (config.getExtraLevel().isPresent()) { + this.extraLevel = config.getExtraLevel(); + } else if (config.isDirectoryNamespace()) { + this.extraLevel = Optional.of("default"); + } else { + this.extraLevel = Optional.empty(); + } + + // Configure parent prefix + this.parentPrefix = config.getParentArray(); + + LOG.info("Catalog configuration - impl: {}, extraLevel: {}, parentPrefix: {}", + config.getImpl(), extraLevel, parentPrefix); + } + + // ========== Database Operations ========== + + @Override + public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + + LOG.info("Creating database: {} (ignoreIfExists={})", name, ignoreIfExists); + + try { + if (databaseExists(name)) { + if (ignoreIfExists) { + LOG.info("Database already exists, skipping creation: {}", name); + return; + } else { + throw new DatabaseAlreadyExistException(getName(), name); + } + } + + String[] namespacePath = transformDatabaseNameToNamespace(name); + Map properties = database.getProperties(); + namespaceAdapter.createNamespace(properties, namespacePath); + + LOG.info("Database created successfully: {}", name); + } catch (DatabaseAlreadyExistException e) { + throw e; + } catch (Exception e) { + LOG.error("Failed to create database: {}", name, e); + throw new CatalogException("Failed to create database: " + name, e); + } + } + + @Override + public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) + throws DatabaseNotExistException, CatalogException { + + LOG.info("Dropping database: {} (cascade={})", name, cascade); + + try { + if (!databaseExists(name)) { + if (ignoreIfNotExists) { + LOG.info("Database does not exist, skipping drop: {}", name); + return; + } else { + throw new DatabaseNotExistException(getName(), name); + } + } + + String[] namespacePath = transformDatabaseNameToNamespace(name); + namespaceAdapter.dropNamespace(cascade, namespacePath); + + LOG.info("Database dropped successfully: {}", name); + } catch (DatabaseNotExistException e) { + throw e; + } catch (Exception e) { + LOG.error("Failed to drop database: {}", name, e); + throw new CatalogException("Failed to drop database: " + name, e); + } + } + + @Override + public List listDatabases() throws CatalogException { + LOG.debug("Listing databases"); + + try { + return namespaceAdapter.listNamespaces(); + } catch (Exception e) { + LOG.error("Failed to list databases", e); + throw new CatalogException("Failed to list databases", e); + } + } + + @Override + public CatalogDatabase getDatabase(String name) + throws DatabaseNotExistException, CatalogException { + + LOG.debug("Getting database: {}", name); + + try { + if (!databaseExists(name)) { + throw new DatabaseNotExistException(getName(), name); + } + + String[] namespacePath = transformDatabaseNameToNamespace(name); + Map metadata = namespaceAdapter.getNamespaceMetadata(namespacePath); + + return new org.apache.flink.table.catalog.CatalogDatabaseImpl(metadata, ""); + } catch (DatabaseNotExistException e) { + throw e; + } catch (Exception e) { + LOG.error("Failed to get database: {}", name, e); + throw new CatalogException("Failed to get database: " + name, e); + } + } + + @Override + public boolean databaseExists(String name) { + LOG.debug("Checking if database exists: {}", name); + + try { + String[] namespacePath = transformDatabaseNameToNamespace(name); + return namespaceAdapter.namespaceExists(namespacePath); + } catch (Exception e) { + LOG.debug("Error checking database existence: {}", name, e); + return false; + } + } + + // ========== Table Operations ========== + + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + + LOG.info("Creating table: {} (ignoreIfExists={})", tablePath, ignoreIfExists); + + try { + String dbName = tablePath.getDatabaseName(); + String tblName = tablePath.getObjectName(); + + if (!databaseExists(dbName)) { + throw new DatabaseNotExistException(getName(), dbName); + } + + if (tableExists(tablePath)) { + if (ignoreIfExists) { + LOG.info("Table already exists, skipping creation: {}", tablePath); + return; + } else { + throw new TableAlreadyExistException(getName(), tablePath); + } + } + + String[] tableId = transformTableNameToId(dbName, tblName); + Map properties = table.getOptions(); + namespaceAdapter.createEmptyTable(null, properties, tableId); + + LOG.info("Table created successfully: {}", tablePath); + } catch (TableAlreadyExistException | DatabaseNotExistException e) { + throw e; + } catch (Exception e) { + LOG.error("Failed to create table: {}", tablePath, e); + throw new CatalogException("Failed to create table: " + tablePath, e); + } + } + + @Override + public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + + LOG.info("Dropping table: {}", tablePath); + + try { + if (!tableExists(tablePath)) { + if (ignoreIfNotExists) { + LOG.info("Table does not exist, skipping drop: {}", tablePath); + return; + } else { + throw new TableNotExistException(getName(), tablePath); + } + } + + String[] tableId = transformTableNameToId(tablePath.getDatabaseName(), tablePath.getObjectName()); + namespaceAdapter.dropTable(tableId); + + LOG.info("Table dropped successfully: {}", tablePath); + } catch (TableNotExistException e) { + throw e; + } catch (Exception e) { + LOG.error("Failed to drop table: {}", tablePath, e); + throw new CatalogException("Failed to drop table: " + tablePath, e); + } + } + + @Override + public List listTables(String databaseName) + throws DatabaseNotExistException, CatalogException { + + LOG.debug("Listing tables in database: {}", databaseName); + + try { + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(getName(), databaseName); + } + + String[] namespacePath = transformDatabaseNameToNamespace(databaseName); + return namespaceAdapter.listTables(namespacePath); + } catch (DatabaseNotExistException e) { + throw e; + } catch (Exception e) { + LOG.error("Failed to list tables in database: {}", databaseName, e); + throw new CatalogException("Failed to list tables in database: " + databaseName, e); + } + } + + @Override + public CatalogTable getTable(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + + LOG.debug("Getting table: {}", tablePath); + + try { + if (!tableExists(tablePath)) { + throw new TableNotExistException(getName(), tablePath); + } + + String[] tableId = transformTableNameToId(tablePath.getDatabaseName(), tablePath.getObjectName()); + LanceNamespaceAdapter.TableMetadata metadata = namespaceAdapter.getTableMetadata(tableId); + + return createCatalogTable(tablePath.getDatabaseName(), tablePath.getObjectName(), metadata); + } catch (TableNotExistException e) { + throw e; + } catch (Exception e) { + LOG.error("Failed to get table: {}", tablePath, e); + throw new CatalogException("Failed to get table: " + tablePath, e); + } + } + + @Override + public boolean tableExists(ObjectPath tablePath) { + LOG.debug("Checking if table exists: {}", tablePath); + + try { + String[] tableId = transformTableNameToId(tablePath.getDatabaseName(), tablePath.getObjectName()); + return namespaceAdapter.tableExists(tableId); + } catch (Exception e) { + LOG.debug("Error checking table existence: {}", tablePath, e); + return false; + } + } + + // ========== Abstract method to be implemented by subclasses ========== + + protected abstract CatalogTable createCatalogTable( + String databaseName, + String tableName, + LanceNamespaceAdapter.TableMetadata metadata) throws CatalogException; + + // ========== Helper methods ========== + + protected String[] transformDatabaseNameToNamespace(String databaseName) { + String[] baseNamespace = new String[] {databaseName}; + + if (parentPrefix.isPresent()) { + String[] parent = parentPrefix.get(); + String[] result = new String[parent.length + baseNamespace.length]; + System.arraycopy(parent, 0, result, 0, parent.length); + System.arraycopy(baseNamespace, 0, result, parent.length, baseNamespace.length); + return result; + } else if (extraLevel.isPresent()) { + String[] result = new String[baseNamespace.length + 1]; + result[0] = extraLevel.get(); + System.arraycopy(baseNamespace, 0, result, 1, baseNamespace.length); + return result; + } else { + return baseNamespace; + } + } + + protected String[] transformTableNameToId(String databaseName, String tableName) { + String[] dbPath = transformDatabaseNameToNamespace(databaseName); + String[] result = new String[dbPath.length + 1]; + System.arraycopy(dbPath, 0, result, 0, dbPath.length); + result[dbPath.length] = tableName; + return result; + } + + // ========== Not implemented - Partition operations not supported ========== + + @Override + public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, + CatalogPartition partition, boolean ignoreIfExists) + throws TableNotExistException, PartitionAlreadyExistsException, CatalogException { + throw new CatalogException("Partition operations are not supported"); + } + + @Override + public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, + boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new CatalogException("Partition operations are not supported"); + } + + @Override + public List listPartitions(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + throw new CatalogException("Partition operations are not supported"); + } + + @Override + public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + throw new CatalogException("Partition operations are not supported"); + } + + @Override + public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws CatalogException { + throw new CatalogException("Partition operations are not supported"); + } + + @Override + public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, + CatalogPartition newPartition, boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new CatalogException("Partition operations are not supported"); + } + + @Override + public List listPartitionsByFilter(ObjectPath tablePath, List filters) + throws CatalogException { + throw new CatalogException("Partition operations are not supported"); + } + + @Override + public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, + boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + LOG.debug("Alter table statistics not supported: {}", tablePath); + } + + @Override + public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, + CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new CatalogException("Partition operations are not supported"); + } + + @Override + public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, + boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + LOG.debug("Alter table column statistics not supported: {}", tablePath); + } + + @Override + public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, + CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new CatalogException("Partition operations are not supported"); + } + + @Override + public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + LOG.debug("Get table statistics not supported: {}", tablePath); + return null; + } + + @Override + public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + LOG.debug("Get table column statistics not supported: {}", tablePath); + return null; + } + + @Override + public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + throw new CatalogException("Partition operations are not supported"); + } + + @Override + public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + throw new CatalogException("Partition operations are not supported"); + } + + @Override + public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + LOG.debug("Alter table not supported: {}", tablePath); + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws CatalogException { + LOG.debug("Rename table not supported: {} -> {}", tablePath, newTableName); + } + + @Override + public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) + throws DatabaseNotExistException, CatalogException { + LOG.debug("Alter database not supported: {}", name); + } + + // Note: renameDatabase is not part of the standard Flink Catalog interface + // If needed, it should be implemented by subclasses + + @Override + public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) + throws CatalogException { + throw new CatalogException("Function operations are not supported"); + } + + @Override + public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) + throws CatalogException { + throw new CatalogException("Function operations are not supported"); + } + + @Override + public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) + throws CatalogException { + throw new CatalogException("Function operations are not supported"); + } + + @Override + public List listFunctions(String databaseName) + throws CatalogException { + throw new CatalogException("Function operations are not supported"); + } + + @Override + public CatalogFunction getFunction(ObjectPath functionPath) + throws CatalogException { + throw new CatalogException("Function operations are not supported"); + } + + @Override + public boolean functionExists(ObjectPath functionPath) + throws CatalogException { + throw new CatalogException("Function operations are not supported"); + } +} diff --git a/src/main/java/org/apache/flink/connector/lance/catalog/namespace/LanceCatalogFactory.java b/src/main/java/org/apache/flink/connector/lance/catalog/namespace/LanceCatalogFactory.java new file mode 100644 index 0000000..159f9aa --- /dev/null +++ b/src/main/java/org/apache/flink/connector/lance/catalog/namespace/LanceCatalogFactory.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.lance.catalog.namespace; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Factory for creating and managing Lance Namespace Adapters and Catalogs. + * + * This factory follows the factory pattern used in lance-spark, providing + * a unified way to create LanceNamespaceAdapter instances with different + * implementations (DirectoryNamespace, RestNamespace, etc.). + * + * Example usage: + *
+ * LanceCatalogFactory factory = new LanceCatalogFactory();
+ * LanceNamespaceConfig config = LanceNamespaceConfig.builder()
+ *     .impl("dir")
+ *     .root("/data/lance")
+ *     .build();
+ * LanceNamespaceAdapter adapter = factory.createAdapter(config);
+ * 
+ */ +public class LanceCatalogFactory { + + private static final Logger LOG = LoggerFactory.getLogger(LanceCatalogFactory.class); + + private volatile BufferAllocator sharedAllocator; + + /** + * Create a new factory with a shared buffer allocator. + */ + public LanceCatalogFactory() { + this.sharedAllocator = new RootAllocator(); + LOG.info("Created LanceCatalogFactory with shared RootAllocator"); + } + + /** + * Create a new factory with a custom buffer allocator. + */ + public LanceCatalogFactory(BufferAllocator allocator) { + this.sharedAllocator = Objects.requireNonNull(allocator, "Allocator cannot be null"); + LOG.info("Created LanceCatalogFactory with custom allocator"); + } + + /** + * Create a LanceNamespaceAdapter from configuration. + * + * @param config The namespace configuration + * @return A configured LanceNamespaceAdapter + */ + public LanceNamespaceAdapter createAdapter(LanceNamespaceConfig config) { + LOG.info("Creating LanceNamespaceAdapter with config"); + + Objects.requireNonNull(config, "Configuration cannot be null"); + + try { + Map properties = new HashMap<>(); + properties.put(LanceNamespaceConfig.KEY_IMPL, config.getImpl()); + if (config.getRoot().isPresent()) { + properties.put(LanceNamespaceConfig.KEY_ROOT, config.getRoot().get()); + } + if (config.getUri().isPresent()) { + properties.put(LanceNamespaceConfig.KEY_URI, config.getUri().get()); + } + if (config.getExtraLevel().isPresent()) { + properties.put(LanceNamespaceConfig.KEY_EXTRA_LEVEL, config.getExtraLevel().get()); + } + if (config.getParent().isPresent()) { + properties.put(LanceNamespaceConfig.KEY_PARENT, config.getParent().get()); + } + + return LanceNamespaceAdapter.create(properties); + } catch (Exception e) { + LOG.error("Failed to create adapter", e); + throw new RuntimeException("Failed to create LanceNamespaceAdapter", e); + } + } + + /** + * Create a LanceNamespaceAdapter from properties map. + * + * @param properties The properties map + * @return A configured LanceNamespaceAdapter + */ + public LanceNamespaceAdapter createAdapter(Map properties) { + LOG.info("Creating LanceNamespaceAdapter from properties"); + + Objects.requireNonNull(properties, "Properties cannot be null"); + + LanceNamespaceConfig config = LanceNamespaceConfig.from(properties); + return createAdapter(config); + } + + /** + * Create a LanceNamespaceAdapter with directory namespace implementation. + * + * @param rootPath The root directory path + * @return A configured LanceNamespaceAdapter + */ + public LanceNamespaceAdapter createDirectoryAdapter(String rootPath) { + LOG.info("Creating directory namespace adapter with root: {}", rootPath); + + Objects.requireNonNull(rootPath, "Root path cannot be null"); + + LanceNamespaceConfig config = LanceNamespaceConfig.builder() + .impl("dir") + .root(rootPath) + .build(); + + return createAdapter(config); + } + + /** + * Create a LanceNamespaceAdapter with REST namespace implementation. + * + * @param uri The REST service URI + * @return A configured LanceNamespaceAdapter + */ + public LanceNamespaceAdapter createRestAdapter(String uri) { + LOG.info("Creating REST namespace adapter with URI: {}", uri); + + Objects.requireNonNull(uri, "URI cannot be null"); + + LanceNamespaceConfig config = LanceNamespaceConfig.builder() + .impl("rest") + .uri(uri) + .build(); + + return createAdapter(config); + } + + /** + * Get the shared buffer allocator. + */ + public BufferAllocator getSharedAllocator() { + return sharedAllocator; + } + + /** + * Close the factory and cleanup resources. + */ + public void close() { + LOG.info("Closing LanceCatalogFactory"); + try { + if (sharedAllocator != null) { + sharedAllocator.close(); + } + } catch (Exception e) { + LOG.warn("Error closing shared allocator", e); + } + } +} diff --git a/src/main/java/org/apache/flink/connector/lance/catalog/namespace/LanceNamespaceAdapter.java b/src/main/java/org/apache/flink/connector/lance/catalog/namespace/LanceNamespaceAdapter.java new file mode 100644 index 0000000..4c463cc --- /dev/null +++ b/src/main/java/org/apache/flink/connector/lance/catalog/namespace/LanceNamespaceAdapter.java @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.lance.catalog.namespace; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.lance.namespace.LanceNamespace; +import org.lance.namespace.model.CreateNamespaceRequest; +import org.lance.namespace.model.CreateEmptyTableRequest; +import org.lance.namespace.model.DescribeNamespaceRequest; +import org.lance.namespace.model.DescribeNamespaceResponse; +import org.lance.namespace.model.DescribeTableRequest; +import org.lance.namespace.model.DescribeTableResponse; +import org.lance.namespace.model.DropNamespaceRequest; +import org.lance.namespace.model.DropTableRequest; +import org.lance.namespace.model.ListNamespacesRequest; +import org.lance.namespace.model.ListNamespacesResponse; +import org.lance.namespace.model.ListTablesRequest; +import org.lance.namespace.model.ListTablesResponse; +import org.lance.namespace.model.NamespaceExistsRequest; +import org.lance.namespace.model.TableExistsRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * Adapter for Lance Namespace API. + * + * Provides unified interface for interacting with Lance Namespace, + * supporting both directory-based and REST-based implementations. + */ +public class LanceNamespaceAdapter implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(LanceNamespaceAdapter.class); + + private final BufferAllocator allocator; + private final LanceNamespaceConfig config; + private LanceNamespace namespace; + + public LanceNamespaceAdapter(BufferAllocator allocator, LanceNamespaceConfig config) { + this.allocator = Objects.requireNonNull(allocator, "Allocator cannot be null"); + this.config = Objects.requireNonNull(config, "Config cannot be null"); + } + + /** + * Create adapter from properties. + */ + public static LanceNamespaceAdapter create(Map properties) { + LanceNamespaceConfig config = LanceNamespaceConfig.from(properties); + BufferAllocator allocator = new RootAllocator(); + return new LanceNamespaceAdapter(allocator, config); + } + + /** + * Initialize the namespace connection. + */ + public void init() { + try { + if (namespace != null) { + return; + } + + Map properties = new HashMap<>(); + properties.put(LanceNamespaceConfig.KEY_IMPL, config.getImpl()); + config.getRoot().ifPresent(root -> properties.put(LanceNamespaceConfig.KEY_ROOT, root)); + config.getUri().ifPresent(uri -> properties.put(LanceNamespaceConfig.KEY_URI, uri)); + + namespace = LanceNamespace.connect(config.getImpl(), properties, allocator); + LOG.info("LanceNamespace initialized successfully with impl: {}", config.getImpl()); + } catch (Exception e) { + LOG.error("Failed to initialize LanceNamespace", e); + throw new RuntimeException("Failed to initialize LanceNamespace", e); + } + } + + /** + * List all namespaces. + */ + public List listNamespaces() { + LOG.debug("Listing root level namespaces"); + return listNamespacesRecursive(new ArrayList<>()); + } + + /** + * List namespaces under parent. + */ + public List listNamespaces(String... parentNamespace) { + LOG.debug("Listing namespaces under: {}", Arrays.toString(parentNamespace)); + return listNamespacesRecursive(Arrays.asList(parentNamespace)); + } + + /** + * Internal recursive method for listing namespaces. + */ + private List listNamespacesRecursive(List parent) { + try { + if (namespace == null) { + init(); + } + + ListNamespacesRequest request = new ListNamespacesRequest(); + if (!parent.isEmpty()) { + request.setId(parent); + } + + ListNamespacesResponse response = namespace.listNamespaces(request); + if (response.getNamespaces() != null) { + Set namespaceSet = response.getNamespaces(); + return new ArrayList<>(namespaceSet); + } + return new ArrayList<>(); + } catch (Exception e) { + LOG.warn("Failed to list namespaces under: {}", parent, e); + return new ArrayList<>(); + } + } + + /** + * Check if namespace exists. + */ + public boolean namespaceExists(String... namespaceId) { + LOG.debug("Checking if namespace exists: {}", Arrays.toString(namespaceId)); + + try { + if (namespace == null) { + init(); + } + + NamespaceExistsRequest request = new NamespaceExistsRequest(); + request.setId(Arrays.asList(namespaceId)); + + namespace.namespaceExists(request); + return true; + } catch (Exception e) { + LOG.debug("Namespace does not exist: {}", Arrays.toString(namespaceId)); + return false; + } + } + + /** + * Create a namespace. + */ + public void createNamespace(Map properties, String... namespaceId) { + LOG.info("Creating namespace: {}", Arrays.toString(namespaceId)); + + try { + if (namespace == null) { + init(); + } + + CreateNamespaceRequest request = new CreateNamespaceRequest(); + request.setId(Arrays.asList(namespaceId)); + if (properties != null) { + request.setProperties(properties); + } + + namespace.createNamespace(request); + LOG.info("Namespace created successfully: {}", Arrays.toString(namespaceId)); + } catch (Exception e) { + LOG.error("Failed to create namespace: {}", Arrays.toString(namespaceId), e); + throw new RuntimeException("Failed to create namespace", e); + } + } + + /** + * Drop a namespace. + */ + public void dropNamespace(boolean cascade, String... namespaceId) { + LOG.info("Dropping namespace: {} (cascade={})", Arrays.toString(namespaceId), cascade); + + try { + if (namespace == null) { + init(); + } + + DropNamespaceRequest request = new DropNamespaceRequest(); + request.setId(Arrays.asList(namespaceId)); + request.setCascade(cascade); + + namespace.dropNamespace(request); + LOG.info("Namespace dropped successfully: {}", Arrays.toString(namespaceId)); + } catch (Exception e) { + LOG.error("Failed to drop namespace: {}", Arrays.toString(namespaceId), e); + throw new RuntimeException("Failed to drop namespace", e); + } + } + + /** + * Get namespace metadata. + */ + public Map getNamespaceMetadata(String... namespaceId) { + LOG.debug("Getting namespace metadata: {}", Arrays.toString(namespaceId)); + + try { + if (namespace == null) { + init(); + } + + DescribeNamespaceRequest request = new DescribeNamespaceRequest(); + request.setId(Arrays.asList(namespaceId)); + + DescribeNamespaceResponse response = namespace.describeNamespace(request); + return response.getProperties() != null ? response.getProperties() : new HashMap<>(); + } catch (Exception e) { + LOG.warn("Failed to get namespace metadata: {}", Arrays.toString(namespaceId), e); + return new HashMap<>(); + } + } + + /** + * List tables in a namespace. + */ + public List listTables(String... namespaceId) { + LOG.debug("Listing tables in namespace: {}", Arrays.toString(namespaceId)); + + try { + if (namespace == null) { + init(); + } + + ListTablesRequest request = new ListTablesRequest(); + request.setId(Arrays.asList(namespaceId)); + + ListTablesResponse response = namespace.listTables(request); + if (response.getTables() != null) { + Set tableSet = response.getTables(); + return new ArrayList<>(tableSet); + } + return new ArrayList<>(); + } catch (Exception e) { + LOG.warn("Failed to list tables in namespace: {}", Arrays.toString(namespaceId), e); + return new ArrayList<>(); + } + } + + /** + * Check if table exists. + */ + public boolean tableExists(String... tableId) { + LOG.debug("Checking if table exists: {}", Arrays.toString(tableId)); + + try { + if (namespace == null) { + init(); + } + + TableExistsRequest request = new TableExistsRequest(); + request.setId(Arrays.asList(tableId)); + + namespace.tableExists(request); + return true; + } catch (Exception e) { + LOG.debug("Table does not exist: {}", Arrays.toString(tableId)); + return false; + } + } + + /** + * Create an empty table. + */ + public void createEmptyTable(String location, Map properties, String... tableId) { + LOG.info("Creating empty table: {} at {}", Arrays.toString(tableId), location); + + try { + if (namespace == null) { + init(); + } + + CreateEmptyTableRequest request = new CreateEmptyTableRequest(); + request.setId(Arrays.asList(tableId)); + request.setLocation(location); + if (properties != null) { + request.setProperties(properties); + } + + namespace.createEmptyTable(request); + LOG.info("Table created successfully: {}", Arrays.toString(tableId)); + } catch (Exception e) { + LOG.error("Failed to create table: {}", Arrays.toString(tableId), e); + throw new RuntimeException("Failed to create table", e); + } + } + + /** + * Drop a table. + */ + public void dropTable(String... tableId) { + LOG.info("Dropping table: {}", Arrays.toString(tableId)); + + try { + if (namespace == null) { + init(); + } + + DropTableRequest request = new DropTableRequest(); + request.setId(Arrays.asList(tableId)); + + namespace.dropTable(request); + LOG.info("Table dropped successfully: {}", Arrays.toString(tableId)); + } catch (Exception e) { + LOG.error("Failed to drop table: {}", Arrays.toString(tableId), e); + throw new RuntimeException("Failed to drop table", e); + } + } + + /** + * Get table metadata. + */ + public TableMetadata getTableMetadata(String... tableId) { + LOG.debug("Getting table metadata: {}", Arrays.toString(tableId)); + + try { + if (namespace == null) { + init(); + } + + DescribeTableRequest request = new DescribeTableRequest(); + request.setId(Arrays.asList(tableId)); + + DescribeTableResponse response = namespace.describeTable(request); + + String location = response.getLocation(); + Map options = response.getProperties() != null + ? response.getProperties() + : new HashMap<>(); + + return new TableMetadata(location, options); + } catch (Exception e) { + LOG.warn("Failed to get table metadata: {}", Arrays.toString(tableId), e); + return new TableMetadata("/path/to/table", new HashMap<>()); + } + } + + /** + * Get the underlying Lance Namespace instance. + */ + public LanceNamespace getNamespace() { + if (namespace == null) { + init(); + } + return namespace; + } + + /** + * Get the Buffer Allocator. + */ + public BufferAllocator getAllocator() { + return allocator; + } + + /** + * Close the adapter and release resources. + */ + @Override + public void close() { + try { + if (namespace instanceof AutoCloseable) { + try { + ((AutoCloseable) namespace).close(); + } catch (Exception e) { + LOG.debug("Error invoking close() on namespace", e); + } + } + } catch (Exception e) { + LOG.warn("Error during namespace cleanup", e); + } + + if (allocator != null) { + allocator.close(); + } + } + + /** + * Table metadata holder. + */ + public static class TableMetadata { + private final String location; + private final Map storageOptions; + + public TableMetadata(String location, Map storageOptions) { + this.location = location; + this.storageOptions = storageOptions; + } + + public String getLocation() { + return location; + } + + public Map getStorageOptions() { + return storageOptions; + } + } +} \ No newline at end of file diff --git a/src/main/java/org/apache/flink/connector/lance/catalog/namespace/LanceNamespaceConfig.java b/src/main/java/org/apache/flink/connector/lance/catalog/namespace/LanceNamespaceConfig.java new file mode 100644 index 0000000..ad3f541 --- /dev/null +++ b/src/main/java/org/apache/flink/connector/lance/catalog/namespace/LanceNamespaceConfig.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.lance.catalog.namespace; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +/** + * Configuration for Lance Namespace integration. + * + * Supports: + * - Namespace implementation selection (dir, rest, custom) + * - Implementation-specific parameters (root path, REST URI) + * - Extra level configuration (for Spark compatibility) + * - Parent prefix support (for Hive 3 compatibility) + */ +public class LanceNamespaceConfig { + + // Configuration keys + public static final String KEY_IMPL = "impl"; + public static final String KEY_ROOT = "root"; + public static final String KEY_URI = "uri"; + public static final String KEY_EXTRA_LEVEL = "extra_level"; + public static final String KEY_PARENT = "parent"; + public static final String KEY_PARENT_DELIMITER = "parent_delimiter"; + + private final String impl; + private final Map properties; + private final Optional extraLevel; + private final Optional parent; + private final String parentDelimiter; + + /** + * Create configuration from properties map. + */ + public static LanceNamespaceConfig from(Map properties) { + return new LanceNamespaceConfig(properties); + } + + /** + * Create builder for configuration. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Private constructor. + */ + private LanceNamespaceConfig(Map properties) { + this.properties = new HashMap<>(Objects.requireNonNull(properties, "Properties cannot be null")); + + // Extract required impl + this.impl = properties.get(KEY_IMPL); + if (this.impl == null || this.impl.isEmpty()) { + throw new IllegalArgumentException("Missing required configuration: " + KEY_IMPL); + } + + // Extract optional extra level + String extraLevelValue = properties.get(KEY_EXTRA_LEVEL); + this.extraLevel = extraLevelValue != null && !extraLevelValue.isEmpty() ? + Optional.of(extraLevelValue) : Optional.empty(); + + // Extract optional parent prefix + String parentValue = properties.get(KEY_PARENT); + this.parent = parentValue != null && !parentValue.isEmpty() ? + Optional.of(parentValue) : Optional.empty(); + + // Extract parent delimiter + this.parentDelimiter = properties.getOrDefault(KEY_PARENT_DELIMITER, "."); + } + + /** + * Get namespace implementation type. + */ + public String getImpl() { + return impl; + } + + /** + * Get all configuration properties. + */ + public Map getProperties() { + return Collections.unmodifiableMap(properties); + } + + /** + * Get root path for directory namespace implementation. + */ + public Optional getRoot() { + return Optional.ofNullable(properties.get(KEY_ROOT)); + } + + /** + * Get URI for REST namespace implementation. + */ + public Optional getUri() { + return Optional.ofNullable(properties.get(KEY_URI)); + } + + /** + * Get extra level configuration (for Spark compatibility). + */ + public Optional getExtraLevel() { + return extraLevel; + } + + /** + * Get parent prefix configuration (for Hive 3 compatibility). + */ + public Optional getParent() { + return parent; + } + + /** + * Get parent delimiter. + */ + public String getParentDelimiter() { + return parentDelimiter; + } + + /** + * Get parent prefix as array. + */ + public Optional getParentArray() { + return parent.map(p -> p.split(java.util.regex.Pattern.quote(parentDelimiter))); + } + + /** + * Check if directory namespace implementation. + */ + public boolean isDirectoryNamespace() { + return "dir".equals(impl); + } + + /** + * Check if REST namespace implementation. + */ + public boolean isRestNamespace() { + return "rest".equals(impl); + } + + /** + * Check if extra level should be automatically configured. + */ + public boolean shouldAutoConfigureExtraLevel() { + return !extraLevel.isPresent() && isDirectoryNamespace(); + } + + @Override + public String toString() { + return "LanceNamespaceConfig{" + + "impl='" + impl + '\'' + + ", extraLevel=" + extraLevel + + ", parent=" + parent + + ", properties=" + properties + + '}'; + } + + /** + * Builder for LanceNamespaceConfig. + */ + public static class Builder { + private final Map properties = new HashMap<>(); + + public Builder impl(String impl) { + properties.put(KEY_IMPL, impl); + return this; + } + + public Builder root(String root) { + properties.put(KEY_ROOT, root); + return this; + } + + public Builder uri(String uri) { + properties.put(KEY_URI, uri); + return this; + } + + public Builder extraLevel(String extraLevel) { + properties.put(KEY_EXTRA_LEVEL, extraLevel); + return this; + } + + public Builder parent(String parent) { + properties.put(KEY_PARENT, parent); + return this; + } + + public Builder parentDelimiter(String delimiter) { + properties.put(KEY_PARENT_DELIMITER, delimiter); + return this; + } + + public Builder property(String key, String value) { + properties.put(key, value); + return this; + } + + public Builder properties(Map props) { + properties.putAll(props); + return this; + } + + public LanceNamespaceConfig build() { + return new LanceNamespaceConfig(properties); + } + } +} diff --git a/src/test/java/org/apache/flink/connector/lance/catalog/namespace/LanceNamespaceAdapterITCase.java b/src/test/java/org/apache/flink/connector/lance/catalog/namespace/LanceNamespaceAdapterITCase.java new file mode 100644 index 0000000..d6526d0 --- /dev/null +++ b/src/test/java/org/apache/flink/connector/lance/catalog/namespace/LanceNamespaceAdapterITCase.java @@ -0,0 +1,725 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.lance.catalog.namespace; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Path; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Lance Namespace Adapter Integration Test. + * + *

This test class uses the real Lance Namespace backends (DirectoryNamespace and RestNamespace) + * as supported by lance-core. No mocking is used - tests run against actual namespace implementations. + * + *

Test backends: + *

    + *
  • DirectoryNamespace: Default backend for local storage, used in all tests
  • + *
  • RestNamespace: REST API backend, enabled when LANCE_REST_URI environment variable is set
  • + *
+ * + *

Test scope: + *

    + *
  • Namespace management: create, list, check, delete
  • + *
  • Table management: create, query, check, delete
  • + *
  • Metadata operations: get namespace and table metadata
  • + *
  • Error handling: duplicate creation, non-existing resources and other exception scenarios
  • + *
+ */ +@DisplayName("Lance Namespace Adapter Integration Test") +class LanceNamespaceAdapterITCase { + + @TempDir + Path tempDir; + + private LanceNamespaceAdapter adapter; + private String warehousePath; + + /** + * Setup before test. + */ + @BeforeEach + void setUp() { + warehousePath = tempDir.resolve("warehouse").toString(); + + // Create configuration + Map properties = new HashMap<>(); + properties.put(LanceNamespaceConfig.KEY_IMPL, "dir"); + properties.put(LanceNamespaceConfig.KEY_ROOT, warehousePath); + + // Create adapter instance + adapter = LanceNamespaceAdapter.create(properties); + adapter.init(); + } + + /** + * Cleanup after test. + */ + @AfterEach + void tearDown() throws Exception { + if (adapter != null) { + adapter.close(); + } + } + + // ==================== Namespace Management Tests ==================== + + /** + * Test namespace creation (Create). + */ + @Test + @DisplayName("Test creating namespace") + void testCreateNamespace() { + // Prepare + String namespaceName = "test_db"; + Map properties = new HashMap<>(); + properties.put("description", "Test database"); + + // Execute + adapter.createNamespace(properties, namespaceName); + + // Verify + assertThat(adapter.namespaceExists(namespaceName)).isTrue(); + + // Verify metadata + Map metadata = adapter.getNamespaceMetadata(namespaceName); + assertThat(metadata).isNotNull(); + } + + /** + * Test creating nested namespace. + */ + @Test + @DisplayName("Test creating nested namespace") + void testCreateNestedNamespace() { + // Prepare + String parentNamespace = "parent_db"; + String childNamespace = "child_db"; + + // Execute + adapter.createNamespace(new HashMap<>(), parentNamespace); + adapter.createNamespace(new HashMap<>(), parentNamespace, childNamespace); + + // Verify + assertThat(adapter.namespaceExists(parentNamespace)).isTrue(); + assertThat(adapter.namespaceExists(parentNamespace, childNamespace)).isTrue(); + } + + /** + * Test listing namespaces (Read). + */ + @Test + @DisplayName("Test listing all top-level namespaces") + void testListNamespaces() { + // Prepare + adapter.createNamespace(new HashMap<>(), "db1"); + adapter.createNamespace(new HashMap<>(), "db2"); + adapter.createNamespace(new HashMap<>(), "db3"); + + // Execute + List namespaces = adapter.listNamespaces(); + + // Verify + assertThat(namespaces).isNotNull(); + assertThat(namespaces).contains("db1", "db2", "db3"); + assertThat(namespaces.size()).isGreaterThanOrEqualTo(3); + } + + /** + * Test listing child namespaces. + */ + @Test + @DisplayName("Test listing child namespaces") + void testListChildNamespaces() { + // Prepare + String parent = "my_warehouse"; + adapter.createNamespace(new HashMap<>(), parent); + adapter.createNamespace(new HashMap<>(), parent, "schema1"); + adapter.createNamespace(new HashMap<>(), parent, "schema2"); + + // Execute + List childNamespaces = adapter.listNamespaces(parent); + + // Verify + assertThat(childNamespaces).isNotNull(); + assertThat(childNamespaces).contains("schema1", "schema2"); + } + + /** + * Test checking namespace existence (Read). + */ + @Test + @DisplayName("Test checking namespace existence") + void testNamespaceExists() { + // Prepare + String namespaceName = "existing_db"; + adapter.createNamespace(new HashMap<>(), namespaceName); + + // Execute and verify + assertThat(adapter.namespaceExists(namespaceName)).isTrue(); + assertThat(adapter.namespaceExists("non_existing_db")).isFalse(); + } + + /** + * Test dropping namespace (Delete). + */ + @Test + @DisplayName("Test dropping namespace") + void testDropNamespace() { + // Prepare + String namespaceName = "temp_db"; + adapter.createNamespace(new HashMap<>(), namespaceName); + assertThat(adapter.namespaceExists(namespaceName)).isTrue(); + + // Execute + adapter.dropNamespace(false, namespaceName); + + // Verify + assertThat(adapter.namespaceExists(namespaceName)).isFalse(); + } + + /** + * Test dropping namespace (cascade delete). + */ + @Test + @DisplayName("Test cascade dropping namespace and its contents") + void testDropNamespaceCascade() { + // Prepare + String namespaceName = "cascade_db"; + adapter.createNamespace(new HashMap<>(), namespaceName); + + // Execute + adapter.dropNamespace(true, namespaceName); + + // Verify + assertThat(adapter.namespaceExists(namespaceName)).isFalse(); + } + + /** + * Test getting namespace metadata (Read). + */ + @Test + @DisplayName("Test getting namespace metadata") + void testGetNamespaceMetadata() { + // Prepare + String namespaceName = "metadata_db"; + Map properties = new HashMap<>(); + properties.put("owner", "admin"); + properties.put("environment", "test"); + + adapter.createNamespace(properties, namespaceName); + + // Execute + Map metadata = adapter.getNamespaceMetadata(namespaceName); + + // Verify + assertThat(metadata).isNotNull(); + assertThat(metadata).containsKeys("owner", "environment"); + } + + // ==================== Table Management Tests ==================== + + /** + * Test creating table (Create). + */ + @Test + @DisplayName("Test creating table in namespace") + void testCreateTable() { + // Prepare + String namespaceName = "my_db"; + String tableName = "my_table"; + adapter.createNamespace(new HashMap<>(), namespaceName); + + String tableLocation = warehousePath + "/" + namespaceName + "/" + tableName; + Map tableProperties = new HashMap<>(); + tableProperties.put("format", "lance"); + + // Execute + adapter.createEmptyTable(tableLocation, tableProperties, namespaceName, tableName); + + // Verify + assertThat(adapter.tableExists(namespaceName, tableName)).isTrue(); + } + + /** + * Test creating multiple tables. + */ + @Test + @DisplayName("Test creating multiple tables in same namespace") + void testCreateMultipleTables() { + // Prepare + String namespaceName = "test_db"; + adapter.createNamespace(new HashMap<>(), namespaceName); + + String[] tableNames = {"users", "products", "orders", "analytics"}; + + // Execute + for (String tableName : tableNames) { + String tableLocation = warehousePath + "/" + namespaceName + "/" + tableName; + adapter.createEmptyTable(tableLocation, new HashMap<>(), namespaceName, tableName); + } + + // Verify + List tables = adapter.listTables(namespaceName); + assertThat(tables).isNotNull(); + assertThat(tables).contains(tableNames); + } + + /** + * Test listing tables (Read). + */ + @Test + @DisplayName("Test listing all tables in namespace") + void testListTables() { + // Prepare + String namespaceName = "query_db"; + adapter.createNamespace(new HashMap<>(), namespaceName); + + adapter.createEmptyTable( + warehousePath + "/" + namespaceName + "/table1", + new HashMap<>(), + namespaceName, "table1" + ); + adapter.createEmptyTable( + warehousePath + "/" + namespaceName + "/table2", + new HashMap<>(), + namespaceName, "table2" + ); + + // Execute + List tables = adapter.listTables(namespaceName); + + // Verify + assertThat(tables).isNotNull(); + assertThat(tables).contains("table1", "table2"); + assertThat(tables.size()).isGreaterThanOrEqualTo(2); + } + + /** + * Test checking table existence (Read). + */ + @Test + @DisplayName("Test checking table existence") + void testTableExists() { + // Prepare + String namespaceName = "check_db"; + String tableName = "check_table"; + adapter.createNamespace(new HashMap<>(), namespaceName); + + String tableLocation = warehousePath + "/" + namespaceName + "/" + tableName; + adapter.createEmptyTable(tableLocation, new HashMap<>(), namespaceName, tableName); + + // Execute and verify + assertThat(adapter.tableExists(namespaceName, tableName)).isTrue(); + assertThat(adapter.tableExists(namespaceName, "non_existing_table")).isFalse(); + } + + /** + * Test getting table metadata (Read). + */ + @Test + @DisplayName("Test getting table metadata") + void testGetTableMetadata() { + // Prepare + String namespaceName = "metadata_db"; + String tableName = "metadata_table"; + adapter.createNamespace(new HashMap<>(), namespaceName); + + String tableLocation = warehousePath + "/" + namespaceName + "/" + tableName; + Map tableProperties = new HashMap<>(); + tableProperties.put("format", "lance"); + tableProperties.put("index", "ivf"); + + adapter.createEmptyTable(tableLocation, tableProperties, namespaceName, tableName); + + // Execute + LanceNamespaceAdapter.TableMetadata metadata = + adapter.getTableMetadata(namespaceName, tableName); + + // Verify + assertThat(metadata).isNotNull(); + assertThat(metadata.getLocation()).isNotNull(); + assertThat(metadata.getStorageOptions()).isNotNull(); + } + + /** + * Test dropping table (Delete). + */ + @Test + @DisplayName("Test dropping table") + void testDropTable() { + // Prepare + String namespaceName = "drop_db"; + String tableName = "drop_table"; + adapter.createNamespace(new HashMap<>(), namespaceName); + + String tableLocation = warehousePath + "/" + namespaceName + "/" + tableName; + adapter.createEmptyTable(tableLocation, new HashMap<>(), namespaceName, tableName); + assertThat(adapter.tableExists(namespaceName, tableName)).isTrue(); + + // Execute + adapter.dropTable(namespaceName, tableName); + + // Verify + assertThat(adapter.tableExists(namespaceName, tableName)).isFalse(); + } + + /** + * Test dropping multiple tables. + */ + @Test + @DisplayName("Test dropping multiple tables in namespace") + void testDropMultipleTables() { + // Prepare + String namespaceName = "cleanup_db"; + adapter.createNamespace(new HashMap<>(), namespaceName); + + String[] tableNames = {"temp1", "temp2", "temp3"}; + for (String tableName : tableNames) { + String tableLocation = warehousePath + "/" + namespaceName + "/" + tableName; + adapter.createEmptyTable(tableLocation, new HashMap<>(), namespaceName, tableName); + } + + // Verify creation successful + for (String tableName : tableNames) { + assertThat(adapter.tableExists(namespaceName, tableName)).isTrue(); + } + + // Execute - drop all tables + for (String tableName : tableNames) { + adapter.dropTable(namespaceName, tableName); + } + + // Verify - all tables dropped + for (String tableName : tableNames) { + assertThat(adapter.tableExists(namespaceName, tableName)).isFalse(); + } + } + + // ==================== Comprehensive Scenario Tests ==================== + + /** + * Test complete CRUD lifecycle. + */ + @Test + @DisplayName("Test complete table CRUD lifecycle") + void testCompleteTableCrudLifecycle() { + // 1. Create - create namespace + String namespaceName = "complete_db"; + Map dbProps = new HashMap<>(); + dbProps.put("owner", "admin"); + adapter.createNamespace(dbProps, namespaceName); + assertThat(adapter.namespaceExists(namespaceName)).isTrue(); + + // 2. Create - create table + String tableName = "complete_table"; + String tableLocation = warehousePath + "/" + namespaceName + "/" + tableName; + Map tableProps = new HashMap<>(); + tableProps.put("format", "lance"); + adapter.createEmptyTable(tableLocation, tableProps, namespaceName, tableName); + assertThat(adapter.tableExists(namespaceName, tableName)).isTrue(); + + // 3. Read - list tables + List tables = adapter.listTables(namespaceName); + assertThat(tables).contains(tableName); + + // 4. Read - get table metadata + LanceNamespaceAdapter.TableMetadata tableMetadata = + adapter.getTableMetadata(namespaceName, tableName); + assertThat(tableMetadata.getLocation()).contains(tableName); + + // 5. Delete - drop table + adapter.dropTable(namespaceName, tableName); + assertThat(adapter.tableExists(namespaceName, tableName)).isFalse(); + + // 6. Delete - drop namespace + adapter.dropNamespace(true, namespaceName); + assertThat(adapter.namespaceExists(namespaceName)).isFalse(); + } + + /** + * Test multiple namespace independence. + */ + @Test + @DisplayName("Test independence of multiple namespaces") + void testMultipleNamespaceIndependence() { + // Prepare - create multiple independent namespaces + String db1 = "database1"; + String db2 = "database2"; + String tableName = "test_table"; + + adapter.createNamespace(new HashMap<>(), db1); + adapter.createNamespace(new HashMap<>(), db2); + + // Create table in db1 + adapter.createEmptyTable( + warehousePath + "/" + db1 + "/" + tableName, + new HashMap<>(), + db1, tableName + ); + + // Create table with same name in db2 + adapter.createEmptyTable( + warehousePath + "/" + db2 + "/" + tableName, + new HashMap<>(), + db2, tableName + ); + + // Verify - both tables exist independently + assertThat(adapter.tableExists(db1, tableName)).isTrue(); + assertThat(adapter.tableExists(db2, tableName)).isTrue(); + + // Verify - dropping table in db1 doesn't affect db2 + adapter.dropTable(db1, tableName); + assertThat(adapter.tableExists(db1, tableName)).isFalse(); + assertThat(adapter.tableExists(db2, tableName)).isTrue(); + } + + /** + * Test special character support in table and namespace names. + */ + @Test + @DisplayName("Test naming with underscores and numbers") + void testSpecialCharacterNaming() { + // Prepare + String namespaceName = "test_db_123"; + String tableName = "data_table_v2_001"; + + adapter.createNamespace(new HashMap<>(), namespaceName); + + String tableLocation = warehousePath + "/" + namespaceName + "/" + tableName; + adapter.createEmptyTable(tableLocation, new HashMap<>(), namespaceName, tableName); + + // Verify + assertThat(adapter.namespaceExists(namespaceName)).isTrue(); + assertThat(adapter.tableExists(namespaceName, tableName)).isTrue(); + } + + /** + * Test table quantity and performance. + */ + @Test + @DisplayName("Test creating many tables in single namespace") + void testCreateManyTables() { + // Prepare + String namespaceName = "scale_db"; + adapter.createNamespace(new HashMap<>(), namespaceName); + + int tableCount = 50; + + // Execute - create multiple tables + for (int i = 0; i < tableCount; i++) { + String tableName = "table_" + String.format("%03d", i); + String tableLocation = warehousePath + "/" + namespaceName + "/" + tableName; + adapter.createEmptyTable(tableLocation, new HashMap<>(), namespaceName, tableName); + } + + // Verify + List tables = adapter.listTables(namespaceName); + assertThat(tables).isNotNull(); + assertThat(tables.size()).isGreaterThanOrEqualTo(tableCount); + } + + /** + * Test exception scenario - creating existing namespace. + */ + @Test + @DisplayName("Test creating existing namespace throws exception") + void testCreateExistingNamespaceThrowsException() { + // Prepare + String namespaceName = "existing_db"; + adapter.createNamespace(new HashMap<>(), namespaceName); + + // Execute and verify - expect exception + assertThatThrownBy(() -> + adapter.createNamespace(new HashMap<>(), namespaceName) + ).isNotNull(); + } + + /** + * Test exception scenario - dropping non-existing namespace. + */ + @Test + @DisplayName("Test dropping non-existing namespace throws exception") + void testDropNonExistingNamespaceThrowsException() { + // Execute and verify - expect exception + assertThatThrownBy(() -> + adapter.dropNamespace(false, "non_existing_db") + ).isNotNull(); + } + + /** + * Test exception scenario - dropping non-existing table. + */ + @Test + @DisplayName("Test dropping non-existing table throws exception") + void testDropNonExistingTableThrowsException() { + // Prepare + String namespaceName = "error_db"; + adapter.createNamespace(new HashMap<>(), namespaceName); + + // Execute and verify - expect exception + assertThatThrownBy(() -> + adapter.dropTable(namespaceName, "non_existing_table") + ).isNotNull(); + } + + /** + * Test resource cleanup and closing. + */ + @Test + @DisplayName("Test adapter closes correctly and releases resources") + void testAdapterCloseAndResourceCleanup() throws Exception { + // Prepare + String namespaceName = "cleanup_db"; + adapter.createNamespace(new HashMap<>(), namespaceName); + + // Verify operation works + assertThat(adapter.namespaceExists(namespaceName)).isTrue(); + + // Execute - close adapter + adapter.close(); + + // Verify - resources released correctly when creating new adapter + Map properties = new HashMap<>(); + properties.put(LanceNamespaceConfig.KEY_IMPL, "dir"); + properties.put(LanceNamespaceConfig.KEY_ROOT, warehousePath); + + LanceNamespaceAdapter newAdapter = LanceNamespaceAdapter.create(properties); + newAdapter.init(); + + try { + // Verify previous operations preserved + assertThat(newAdapter.namespaceExists(namespaceName)).isTrue(); + } finally { + newAdapter.close(); + } + } + + // ==================== REST Namespace Tests ==================== + + /** + * REST Namespace backend tests. + * + *

These tests are enabled when the LANCE_REST_URI environment variable is set. + * They verify that the adapter works correctly with REST-based namespace implementations. + * + *

To run these tests, set the environment variable: + *

+     * export LANCE_REST_URI=http://localhost:8080
+     * 
+ */ + @Nested + @DisplayName("REST Namespace Backend Tests") + @EnabledIfEnvironmentVariable(named = "LANCE_REST_URI", matches = ".+") + class RestNamespaceTests { + + private LanceNamespaceAdapter restAdapter; + + @BeforeEach + void setUp() { + String restUri = System.getenv("LANCE_REST_URI"); + + Map properties = new HashMap<>(); + properties.put(LanceNamespaceConfig.KEY_IMPL, "rest"); + properties.put(LanceNamespaceConfig.KEY_URI, restUri); + + restAdapter = LanceNamespaceAdapter.create(properties); + restAdapter.init(); + } + + @AfterEach + void tearDown() throws Exception { + if (restAdapter != null) { + restAdapter.close(); + } + } + + @Test + @DisplayName("Test REST namespace creation and listing") + void testRestNamespaceOperations() { + String testNamespace = "rest_test_db_" + System.currentTimeMillis(); + + try { + // Create namespace + restAdapter.createNamespace(new HashMap<>(), testNamespace); + assertThat(restAdapter.namespaceExists(testNamespace)).isTrue(); + + // List namespaces + List namespaces = restAdapter.listNamespaces(); + assertThat(namespaces).contains(testNamespace); + } finally { + // Cleanup + try { + restAdapter.dropNamespace(true, testNamespace); + } catch (Exception ignored) { + // Ignore cleanup errors + } + } + } + + @Test + @DisplayName("Test REST table operations") + void testRestTableOperations() { + String testNamespace = "rest_table_db_" + System.currentTimeMillis(); + String testTable = "rest_test_table"; + + try { + // Create namespace first + restAdapter.createNamespace(new HashMap<>(), testNamespace); + + // Create table + restAdapter.createEmptyTable(null, new HashMap<>(), testNamespace, testTable); + assertThat(restAdapter.tableExists(testNamespace, testTable)).isTrue(); + + // List tables + List tables = restAdapter.listTables(testNamespace); + assertThat(tables).contains(testTable); + + // Get table metadata + LanceNamespaceAdapter.TableMetadata metadata = + restAdapter.getTableMetadata(testNamespace, testTable); + assertThat(metadata).isNotNull(); + + // Drop table + restAdapter.dropTable(testNamespace, testTable); + assertThat(restAdapter.tableExists(testNamespace, testTable)).isFalse(); + } finally { + // Cleanup + try { + restAdapter.dropNamespace(true, testNamespace); + } catch (Exception ignored) { + // Ignore cleanup errors + } + } + } + } +}