diff --git a/pom.xml b/pom.xml
index d78486c..601835d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,9 +20,9 @@
1.16.1
- 0.23.3
- 14.0.0
- 5.9.3
+ 5.0.0-beta.6
+ 15.0.2
+ 5.10.1
1.7.36
2.20.0
5.3.1
@@ -73,9 +73,15 @@
- com.lancedb
+ org.lance
lance-core
${lance.version}
+
+
+ org.slf4j
+ slf4j-api
+
+
diff --git a/src/main/java/org/apache/flink/connector/lance/LanceAggregateSource.java b/src/main/java/org/apache/flink/connector/lance/LanceAggregateSource.java
index ed512e5..6cf8d6c 100644
--- a/src/main/java/org/apache/flink/connector/lance/LanceAggregateSource.java
+++ b/src/main/java/org/apache/flink/connector/lance/LanceAggregateSource.java
@@ -28,10 +28,10 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
-import com.lancedb.lance.Dataset;
-import com.lancedb.lance.Fragment;
-import com.lancedb.lance.ipc.LanceScanner;
-import com.lancedb.lance.ipc.ScanOptions;
+import org.lance.Dataset;
+import org.lance.Fragment;
+import org.lance.ipc.LanceScanner;
+import org.lance.ipc.ScanOptions;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
@@ -111,7 +111,7 @@ public void open(Configuration parameters) throws Exception {
}
try {
- this.dataset = Dataset.open(datasetPath, allocator);
+ this.dataset = Dataset.open().allocator(allocator).uri(datasetPath).build();
} catch (Exception e) {
throw new IOException("Failed to open Lance dataset: " + datasetPath, e);
}
diff --git a/src/main/java/org/apache/flink/connector/lance/LanceIndexBuilder.java b/src/main/java/org/apache/flink/connector/lance/LanceIndexBuilder.java
index 3f93897..55aa98d 100644
--- a/src/main/java/org/apache/flink/connector/lance/LanceIndexBuilder.java
+++ b/src/main/java/org/apache/flink/connector/lance/LanceIndexBuilder.java
@@ -20,14 +20,15 @@
import org.apache.flink.connector.lance.config.LanceOptions;
-import com.lancedb.lance.Dataset;
-import com.lancedb.lance.index.DistanceType;
-import com.lancedb.lance.index.IndexParams;
-import com.lancedb.lance.index.IndexType;
-import com.lancedb.lance.index.vector.HnswBuildParams;
-import com.lancedb.lance.index.vector.IvfBuildParams;
-import com.lancedb.lance.index.vector.PQBuildParams;
-import com.lancedb.lance.index.vector.VectorIndexParams;
+import org.lance.Dataset;
+import org.lance.index.DistanceType;
+import org.lance.index.IndexOptions;
+import org.lance.index.IndexParams;
+import org.lance.index.IndexType;
+import org.lance.index.vector.HnswBuildParams;
+import org.lance.index.vector.IvfBuildParams;
+import org.lance.index.vector.PQBuildParams;
+import org.lance.index.vector.VectorIndexParams;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.slf4j.Logger;
@@ -105,7 +106,7 @@ public IndexBuildResult buildIndex() throws IOException {
try {
// Initialize resources
this.allocator = new RootAllocator(Long.MAX_VALUE);
- this.dataset = Dataset.open(datasetPath, allocator);
+ this.dataset = Dataset.open().allocator(allocator).uri(datasetPath).build();
// Validate column exists
validateColumn();
@@ -131,8 +132,7 @@ public IndexBuildResult buildIndex() throws IOException {
.build();
VectorIndexParams ivfPqParams = VectorIndexParams.withIvfPqParams(
distanceType, ivfParams, pqParams);
- indexParams = new IndexParams.Builder()
- .setDistanceType(distanceType)
+ indexParams = IndexParams.builder()
.setVectorIndexParams(ivfPqParams)
.build();
break;
@@ -150,8 +150,7 @@ public IndexBuildResult buildIndex() throws IOException {
.build();
VectorIndexParams ivfHnswParams = VectorIndexParams.withIvfHnswPqParams(
distanceType, ivfParams, hnswParams, hnswPqParams);
- indexParams = new IndexParams.Builder()
- .setDistanceType(distanceType)
+ indexParams = IndexParams.builder()
.setVectorIndexParams(ivfHnswParams)
.build();
break;
@@ -159,8 +158,7 @@ public IndexBuildResult buildIndex() throws IOException {
case IVF_FLAT:
lanceIndexType = IndexType.IVF_FLAT;
VectorIndexParams ivfFlatParams = VectorIndexParams.ivfFlat(numPartitions, distanceType);
- indexParams = new IndexParams.Builder()
- .setDistanceType(distanceType)
+ indexParams = IndexParams.builder()
.setVectorIndexParams(ivfFlatParams)
.build();
break;
@@ -170,13 +168,13 @@ public IndexBuildResult buildIndex() throws IOException {
}
// Create index
- dataset.createIndex(
- Collections.singletonList(columnName),
- lanceIndexType,
- Optional.empty(), // Index name, use default
- indexParams,
- replace
- );
+ IndexOptions indexOptions = IndexOptions
+ .builder(Collections.singletonList(columnName), lanceIndexType, indexParams)
+ .replace(replace)
+ .withIndexName(null)
+ .build();
+
+ dataset.createIndex(indexOptions);
long endTime = System.currentTimeMillis();
long duration = endTime - startTime;
diff --git a/src/main/java/org/apache/flink/connector/lance/LanceInputFormat.java b/src/main/java/org/apache/flink/connector/lance/LanceInputFormat.java
index 2785a00..43d9f2c 100644
--- a/src/main/java/org/apache/flink/connector/lance/LanceInputFormat.java
+++ b/src/main/java/org/apache/flink/connector/lance/LanceInputFormat.java
@@ -28,10 +28,10 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
-import com.lancedb.lance.Dataset;
-import com.lancedb.lance.Fragment;
-import com.lancedb.lance.ipc.LanceScanner;
-import com.lancedb.lance.ipc.ScanOptions;
+import org.lance.Dataset;
+import org.lance.Fragment;
+import org.lance.ipc.LanceScanner;
+import org.lance.ipc.ScanOptions;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
@@ -107,7 +107,7 @@ public LanceSplit[] createInputSplits(int minNumSplits) throws IOException {
BufferAllocator tempAllocator = new RootAllocator(Long.MAX_VALUE);
try {
- Dataset tempDataset = Dataset.open(datasetPath, tempAllocator);
+ Dataset tempDataset = Dataset.open().allocator(tempAllocator).uri(datasetPath).build();
try {
List fragments = tempDataset.getFragments();
LanceSplit[] splits = new LanceSplit[fragments.size()];
@@ -143,7 +143,7 @@ public void open(LanceSplit split) throws IOException {
// Open dataset
String datasetPath = split.getDatasetPath();
try {
- this.dataset = Dataset.open(datasetPath, allocator);
+ this.dataset = Dataset.open().allocator(allocator).uri(datasetPath).build();
} catch (Exception e) {
throw new IOException("Cannot open dataset: " + datasetPath, e);
}
diff --git a/src/main/java/org/apache/flink/connector/lance/LanceSink.java b/src/main/java/org/apache/flink/connector/lance/LanceSink.java
index feeec90..a0da278 100644
--- a/src/main/java/org/apache/flink/connector/lance/LanceSink.java
+++ b/src/main/java/org/apache/flink/connector/lance/LanceSink.java
@@ -29,15 +29,18 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
-import com.lancedb.lance.Dataset;
-import com.lancedb.lance.Fragment;
-import com.lancedb.lance.FragmentMetadata;
-import com.lancedb.lance.FragmentOperation;
-import com.lancedb.lance.WriteParams;
+import org.lance.Dataset;
+import org.lance.Fragment;
+import org.lance.FragmentMetadata;
+import org.lance.WriteParams;
+import org.lance.CommitBuilder;
+import org.lance.Transaction;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;
+import org.lance.operation.Append;
+import org.lance.operation.Overwrite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -159,19 +162,23 @@ public void flush() throws IOException {
WriteParams writeParams = new WriteParams.Builder()
.withMaxRowsPerFile(options.getWriteMaxRowsPerFile())
.build();
-
+
// Create Fragment
- List fragments = Fragment.create(
- datasetPath,
- allocator,
- root,
- writeParams
- );
-
+ List fragments = Fragment.write()
+ .datasetUri(datasetPath)
+ .allocator(allocator)
+ .data(root)
+ .writeParams(writeParams)
+ .execute();
+
if (!datasetExists) {
// Create new dataset (using Overwrite operation)
- FragmentOperation.Overwrite overwrite = new FragmentOperation.Overwrite(fragments, arrowSchema);
- dataset = overwrite.commit(allocator, datasetPath, Optional.empty(), Collections.emptyMap());
+ Overwrite operation = Overwrite.builder().fragments(fragments).schema(arrowSchema).build();
+ final CommitBuilder builder =
+ new CommitBuilder(datasetPath, allocator).writeParams(Collections.emptyMap());
+ try (Transaction txn = new Transaction.Builder().operation(operation).build()) {
+ dataset = builder.execute(txn);
+ }
datasetExists = true;
isFirstWrite = false;
LOG.info("Created new dataset: {}", datasetPath);
@@ -179,13 +186,19 @@ public void flush() throws IOException {
// Append data
if (isFirstWrite && options.getWriteMode() == LanceOptions.WriteMode.OVERWRITE) {
// First write and overwrite mode
- FragmentOperation.Overwrite overwrite = new FragmentOperation.Overwrite(fragments, arrowSchema);
- dataset = overwrite.commit(allocator, datasetPath, Optional.empty(), Collections.emptyMap());
+ Overwrite operation = Overwrite.builder().fragments(fragments).schema(arrowSchema).build();
+ final CommitBuilder builder = new CommitBuilder(datasetPath, allocator);
+ try (Transaction txn = new Transaction.Builder().operation(operation).build()) {
+ dataset = builder.execute(txn);
+ }
isFirstWrite = false;
} else {
// Append mode
- FragmentOperation.Append append = new FragmentOperation.Append(fragments);
- dataset = append.commit(allocator, datasetPath, Optional.empty(), Collections.emptyMap());
+ Append operation = Append.builder().fragments(fragments).build();
+ final CommitBuilder builder = new CommitBuilder(datasetPath, allocator);
+ try (Transaction txn = new Transaction.Builder().operation(operation).build()) {
+ dataset = builder.execute(txn);
+ }
}
}
diff --git a/src/main/java/org/apache/flink/connector/lance/LanceSource.java b/src/main/java/org/apache/flink/connector/lance/LanceSource.java
index ade00a0..6a12720 100644
--- a/src/main/java/org/apache/flink/connector/lance/LanceSource.java
+++ b/src/main/java/org/apache/flink/connector/lance/LanceSource.java
@@ -27,10 +27,10 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
-import com.lancedb.lance.Dataset;
-import com.lancedb.lance.Fragment;
-import com.lancedb.lance.ipc.LanceScanner;
-import com.lancedb.lance.ipc.ScanOptions;
+import org.lance.Dataset;
+import org.lance.Fragment;
+import org.lance.ipc.LanceScanner;
+import org.lance.ipc.ScanOptions;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
@@ -126,7 +126,7 @@ public void open(Configuration parameters) throws Exception {
Path path = Paths.get(datasetPath);
try {
- this.dataset = Dataset.open(path.toString(), allocator);
+ this.dataset = Dataset.open().allocator(allocator).uri(path.toString()).build();
} catch (Exception e) {
throw new IOException("Cannot open Lance dataset: " + datasetPath, e);
}
diff --git a/src/main/java/org/apache/flink/connector/lance/LanceVectorSearch.java b/src/main/java/org/apache/flink/connector/lance/LanceVectorSearch.java
index faf0c5a..5cab763 100644
--- a/src/main/java/org/apache/flink/connector/lance/LanceVectorSearch.java
+++ b/src/main/java/org/apache/flink/connector/lance/LanceVectorSearch.java
@@ -26,11 +26,11 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
-import com.lancedb.lance.Dataset;
-import com.lancedb.lance.index.DistanceType;
-import com.lancedb.lance.ipc.LanceScanner;
-import com.lancedb.lance.ipc.Query;
-import com.lancedb.lance.ipc.ScanOptions;
+import org.lance.Dataset;
+import org.lance.index.DistanceType;
+import org.lance.ipc.LanceScanner;
+import org.lance.ipc.Query;
+import org.lance.ipc.ScanOptions;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.Float8Vector;
@@ -99,7 +99,7 @@ public void open() throws IOException {
this.allocator = new RootAllocator(Long.MAX_VALUE);
try {
- this.dataset = Dataset.open(datasetPath, allocator);
+ this.dataset = Dataset.open().allocator(allocator).uri(datasetPath).build();
// Get Schema and create converter
Schema arrowSchema = dataset.getSchema();
diff --git a/src/main/java/org/apache/flink/connector/lance/table/LanceCatalog.java b/src/main/java/org/apache/flink/connector/lance/table/LanceCatalog.java
index 74fed94..686a208 100644
--- a/src/main/java/org/apache/flink/connector/lance/table/LanceCatalog.java
+++ b/src/main/java/org/apache/flink/connector/lance/table/LanceCatalog.java
@@ -49,7 +49,7 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
-import com.lancedb.lance.Dataset;
+import org.lance.Dataset;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.slf4j.Logger;
@@ -421,7 +421,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
if (isRemoteStorage) {
configureStorageEnvironment();
}
- Dataset dataset = Dataset.open(datasetPath, allocator);
+ Dataset dataset = Dataset.open().allocator(allocator).uri(datasetPath).build();
try {
// Infer Flink Schema from Lance Schema
@@ -476,7 +476,7 @@ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
// Try to open dataset to verify existence
try {
configureStorageEnvironment();
- Dataset dataset = Dataset.open(datasetPath, allocator);
+ Dataset dataset = Dataset.open().allocator(allocator).uri(datasetPath).build();
dataset.close();
knownTables.add(tableKey);
return true;