From ffafce9dcdd3381b82a7df6bdbe364357eb886ae Mon Sep 17 00:00:00 2001 From: Stepanishchev Stepan s60073796 Date: Thu, 16 Apr 2026 15:11:22 +0700 Subject: [PATCH] Bump lance version to 5.0.0-beta.6 and arrow to 15.0.2 --- pom.xml | 14 +++-- .../connector/lance/LanceAggregateSource.java | 10 ++-- .../connector/lance/LanceIndexBuilder.java | 42 ++++++++------- .../connector/lance/LanceInputFormat.java | 12 ++--- .../flink/connector/lance/LanceSink.java | 51 ++++++++++++------- .../flink/connector/lance/LanceSource.java | 10 ++-- .../connector/lance/LanceVectorSearch.java | 12 ++--- .../connector/lance/table/LanceCatalog.java | 6 +-- 8 files changed, 87 insertions(+), 70 deletions(-) diff --git a/pom.xml b/pom.xml index d78486c..601835d 100644 --- a/pom.xml +++ b/pom.xml @@ -20,9 +20,9 @@ 1.16.1 - 0.23.3 - 14.0.0 - 5.9.3 + 5.0.0-beta.6 + 15.0.2 + 5.10.1 1.7.36 2.20.0 5.3.1 @@ -73,9 +73,15 @@ - com.lancedb + org.lance lance-core ${lance.version} + + + org.slf4j + slf4j-api + + diff --git a/src/main/java/org/apache/flink/connector/lance/LanceAggregateSource.java b/src/main/java/org/apache/flink/connector/lance/LanceAggregateSource.java index ed512e5..6cf8d6c 100644 --- a/src/main/java/org/apache/flink/connector/lance/LanceAggregateSource.java +++ b/src/main/java/org/apache/flink/connector/lance/LanceAggregateSource.java @@ -28,10 +28,10 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; -import com.lancedb.lance.Dataset; -import com.lancedb.lance.Fragment; -import com.lancedb.lance.ipc.LanceScanner; -import com.lancedb.lance.ipc.ScanOptions; +import org.lance.Dataset; +import org.lance.Fragment; +import org.lance.ipc.LanceScanner; +import org.lance.ipc.ScanOptions; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.VectorSchemaRoot; @@ -111,7 +111,7 @@ public void open(Configuration parameters) throws Exception { } try { - this.dataset = Dataset.open(datasetPath, allocator); + this.dataset = Dataset.open().allocator(allocator).uri(datasetPath).build(); } catch (Exception e) { throw new IOException("Failed to open Lance dataset: " + datasetPath, e); } diff --git a/src/main/java/org/apache/flink/connector/lance/LanceIndexBuilder.java b/src/main/java/org/apache/flink/connector/lance/LanceIndexBuilder.java index 3f93897..55aa98d 100644 --- a/src/main/java/org/apache/flink/connector/lance/LanceIndexBuilder.java +++ b/src/main/java/org/apache/flink/connector/lance/LanceIndexBuilder.java @@ -20,14 +20,15 @@ import org.apache.flink.connector.lance.config.LanceOptions; -import com.lancedb.lance.Dataset; -import com.lancedb.lance.index.DistanceType; -import com.lancedb.lance.index.IndexParams; -import com.lancedb.lance.index.IndexType; -import com.lancedb.lance.index.vector.HnswBuildParams; -import com.lancedb.lance.index.vector.IvfBuildParams; -import com.lancedb.lance.index.vector.PQBuildParams; -import com.lancedb.lance.index.vector.VectorIndexParams; +import org.lance.Dataset; +import org.lance.index.DistanceType; +import org.lance.index.IndexOptions; +import org.lance.index.IndexParams; +import org.lance.index.IndexType; +import org.lance.index.vector.HnswBuildParams; +import org.lance.index.vector.IvfBuildParams; +import org.lance.index.vector.PQBuildParams; +import org.lance.index.vector.VectorIndexParams; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.slf4j.Logger; @@ -105,7 +106,7 @@ public IndexBuildResult buildIndex() throws IOException { try { // Initialize resources this.allocator = new RootAllocator(Long.MAX_VALUE); - this.dataset = Dataset.open(datasetPath, allocator); + this.dataset = Dataset.open().allocator(allocator).uri(datasetPath).build(); // Validate column exists validateColumn(); @@ -131,8 +132,7 @@ public IndexBuildResult buildIndex() throws IOException { .build(); VectorIndexParams ivfPqParams = VectorIndexParams.withIvfPqParams( distanceType, ivfParams, pqParams); - indexParams = new IndexParams.Builder() - .setDistanceType(distanceType) + indexParams = IndexParams.builder() .setVectorIndexParams(ivfPqParams) .build(); break; @@ -150,8 +150,7 @@ public IndexBuildResult buildIndex() throws IOException { .build(); VectorIndexParams ivfHnswParams = VectorIndexParams.withIvfHnswPqParams( distanceType, ivfParams, hnswParams, hnswPqParams); - indexParams = new IndexParams.Builder() - .setDistanceType(distanceType) + indexParams = IndexParams.builder() .setVectorIndexParams(ivfHnswParams) .build(); break; @@ -159,8 +158,7 @@ public IndexBuildResult buildIndex() throws IOException { case IVF_FLAT: lanceIndexType = IndexType.IVF_FLAT; VectorIndexParams ivfFlatParams = VectorIndexParams.ivfFlat(numPartitions, distanceType); - indexParams = new IndexParams.Builder() - .setDistanceType(distanceType) + indexParams = IndexParams.builder() .setVectorIndexParams(ivfFlatParams) .build(); break; @@ -170,13 +168,13 @@ public IndexBuildResult buildIndex() throws IOException { } // Create index - dataset.createIndex( - Collections.singletonList(columnName), - lanceIndexType, - Optional.empty(), // Index name, use default - indexParams, - replace - ); + IndexOptions indexOptions = IndexOptions + .builder(Collections.singletonList(columnName), lanceIndexType, indexParams) + .replace(replace) + .withIndexName(null) + .build(); + + dataset.createIndex(indexOptions); long endTime = System.currentTimeMillis(); long duration = endTime - startTime; diff --git a/src/main/java/org/apache/flink/connector/lance/LanceInputFormat.java b/src/main/java/org/apache/flink/connector/lance/LanceInputFormat.java index 2785a00..43d9f2c 100644 --- a/src/main/java/org/apache/flink/connector/lance/LanceInputFormat.java +++ b/src/main/java/org/apache/flink/connector/lance/LanceInputFormat.java @@ -28,10 +28,10 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; -import com.lancedb.lance.Dataset; -import com.lancedb.lance.Fragment; -import com.lancedb.lance.ipc.LanceScanner; -import com.lancedb.lance.ipc.ScanOptions; +import org.lance.Dataset; +import org.lance.Fragment; +import org.lance.ipc.LanceScanner; +import org.lance.ipc.ScanOptions; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.VectorSchemaRoot; @@ -107,7 +107,7 @@ public LanceSplit[] createInputSplits(int minNumSplits) throws IOException { BufferAllocator tempAllocator = new RootAllocator(Long.MAX_VALUE); try { - Dataset tempDataset = Dataset.open(datasetPath, tempAllocator); + Dataset tempDataset = Dataset.open().allocator(tempAllocator).uri(datasetPath).build(); try { List fragments = tempDataset.getFragments(); LanceSplit[] splits = new LanceSplit[fragments.size()]; @@ -143,7 +143,7 @@ public void open(LanceSplit split) throws IOException { // Open dataset String datasetPath = split.getDatasetPath(); try { - this.dataset = Dataset.open(datasetPath, allocator); + this.dataset = Dataset.open().allocator(allocator).uri(datasetPath).build(); } catch (Exception e) { throw new IOException("Cannot open dataset: " + datasetPath, e); } diff --git a/src/main/java/org/apache/flink/connector/lance/LanceSink.java b/src/main/java/org/apache/flink/connector/lance/LanceSink.java index feeec90..a0da278 100644 --- a/src/main/java/org/apache/flink/connector/lance/LanceSink.java +++ b/src/main/java/org/apache/flink/connector/lance/LanceSink.java @@ -29,15 +29,18 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; -import com.lancedb.lance.Dataset; -import com.lancedb.lance.Fragment; -import com.lancedb.lance.FragmentMetadata; -import com.lancedb.lance.FragmentOperation; -import com.lancedb.lance.WriteParams; +import org.lance.Dataset; +import org.lance.Fragment; +import org.lance.FragmentMetadata; +import org.lance.WriteParams; +import org.lance.CommitBuilder; +import org.lance.Transaction; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.types.pojo.Schema; +import org.lance.operation.Append; +import org.lance.operation.Overwrite; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -159,19 +162,23 @@ public void flush() throws IOException { WriteParams writeParams = new WriteParams.Builder() .withMaxRowsPerFile(options.getWriteMaxRowsPerFile()) .build(); - + // Create Fragment - List fragments = Fragment.create( - datasetPath, - allocator, - root, - writeParams - ); - + List fragments = Fragment.write() + .datasetUri(datasetPath) + .allocator(allocator) + .data(root) + .writeParams(writeParams) + .execute(); + if (!datasetExists) { // Create new dataset (using Overwrite operation) - FragmentOperation.Overwrite overwrite = new FragmentOperation.Overwrite(fragments, arrowSchema); - dataset = overwrite.commit(allocator, datasetPath, Optional.empty(), Collections.emptyMap()); + Overwrite operation = Overwrite.builder().fragments(fragments).schema(arrowSchema).build(); + final CommitBuilder builder = + new CommitBuilder(datasetPath, allocator).writeParams(Collections.emptyMap()); + try (Transaction txn = new Transaction.Builder().operation(operation).build()) { + dataset = builder.execute(txn); + } datasetExists = true; isFirstWrite = false; LOG.info("Created new dataset: {}", datasetPath); @@ -179,13 +186,19 @@ public void flush() throws IOException { // Append data if (isFirstWrite && options.getWriteMode() == LanceOptions.WriteMode.OVERWRITE) { // First write and overwrite mode - FragmentOperation.Overwrite overwrite = new FragmentOperation.Overwrite(fragments, arrowSchema); - dataset = overwrite.commit(allocator, datasetPath, Optional.empty(), Collections.emptyMap()); + Overwrite operation = Overwrite.builder().fragments(fragments).schema(arrowSchema).build(); + final CommitBuilder builder = new CommitBuilder(datasetPath, allocator); + try (Transaction txn = new Transaction.Builder().operation(operation).build()) { + dataset = builder.execute(txn); + } isFirstWrite = false; } else { // Append mode - FragmentOperation.Append append = new FragmentOperation.Append(fragments); - dataset = append.commit(allocator, datasetPath, Optional.empty(), Collections.emptyMap()); + Append operation = Append.builder().fragments(fragments).build(); + final CommitBuilder builder = new CommitBuilder(datasetPath, allocator); + try (Transaction txn = new Transaction.Builder().operation(operation).build()) { + dataset = builder.execute(txn); + } } } diff --git a/src/main/java/org/apache/flink/connector/lance/LanceSource.java b/src/main/java/org/apache/flink/connector/lance/LanceSource.java index ade00a0..6a12720 100644 --- a/src/main/java/org/apache/flink/connector/lance/LanceSource.java +++ b/src/main/java/org/apache/flink/connector/lance/LanceSource.java @@ -27,10 +27,10 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; -import com.lancedb.lance.Dataset; -import com.lancedb.lance.Fragment; -import com.lancedb.lance.ipc.LanceScanner; -import com.lancedb.lance.ipc.ScanOptions; +import org.lance.Dataset; +import org.lance.Fragment; +import org.lance.ipc.LanceScanner; +import org.lance.ipc.ScanOptions; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.VectorSchemaRoot; @@ -126,7 +126,7 @@ public void open(Configuration parameters) throws Exception { Path path = Paths.get(datasetPath); try { - this.dataset = Dataset.open(path.toString(), allocator); + this.dataset = Dataset.open().allocator(allocator).uri(path.toString()).build(); } catch (Exception e) { throw new IOException("Cannot open Lance dataset: " + datasetPath, e); } diff --git a/src/main/java/org/apache/flink/connector/lance/LanceVectorSearch.java b/src/main/java/org/apache/flink/connector/lance/LanceVectorSearch.java index faf0c5a..5cab763 100644 --- a/src/main/java/org/apache/flink/connector/lance/LanceVectorSearch.java +++ b/src/main/java/org/apache/flink/connector/lance/LanceVectorSearch.java @@ -26,11 +26,11 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; -import com.lancedb.lance.Dataset; -import com.lancedb.lance.index.DistanceType; -import com.lancedb.lance.ipc.LanceScanner; -import com.lancedb.lance.ipc.Query; -import com.lancedb.lance.ipc.ScanOptions; +import org.lance.Dataset; +import org.lance.index.DistanceType; +import org.lance.ipc.LanceScanner; +import org.lance.ipc.Query; +import org.lance.ipc.ScanOptions; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.Float8Vector; @@ -99,7 +99,7 @@ public void open() throws IOException { this.allocator = new RootAllocator(Long.MAX_VALUE); try { - this.dataset = Dataset.open(datasetPath, allocator); + this.dataset = Dataset.open().allocator(allocator).uri(datasetPath).build(); // Get Schema and create converter Schema arrowSchema = dataset.getSchema(); diff --git a/src/main/java/org/apache/flink/connector/lance/table/LanceCatalog.java b/src/main/java/org/apache/flink/connector/lance/table/LanceCatalog.java index 74fed94..686a208 100644 --- a/src/main/java/org/apache/flink/connector/lance/table/LanceCatalog.java +++ b/src/main/java/org/apache/flink/connector/lance/table/LanceCatalog.java @@ -49,7 +49,7 @@ import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; -import com.lancedb.lance.Dataset; +import org.lance.Dataset; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.slf4j.Logger; @@ -421,7 +421,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep if (isRemoteStorage) { configureStorageEnvironment(); } - Dataset dataset = Dataset.open(datasetPath, allocator); + Dataset dataset = Dataset.open().allocator(allocator).uri(datasetPath).build(); try { // Infer Flink Schema from Lance Schema @@ -476,7 +476,7 @@ public boolean tableExists(ObjectPath tablePath) throws CatalogException { // Try to open dataset to verify existence try { configureStorageEnvironment(); - Dataset dataset = Dataset.open(datasetPath, allocator); + Dataset dataset = Dataset.open().allocator(allocator).uri(datasetPath).build(); dataset.close(); knownTables.add(tableKey); return true;