From d39cc24d0e5912dac7b72136d6b19da09248372e Mon Sep 17 00:00:00 2001 From: rockyyin Date: Wed, 8 Apr 2026 00:57:49 +0800 Subject: [PATCH] feat: support DELETE operation on Lance datasets (#8) - Add LanceDeleteExecutor for predicate-based row deletion - Update LanceSink to handle DELETE RowKind with buffered batch processing - Update LanceDynamicTableSink to support DELETE in ChangelogMode - Build delete predicates from RowData fields (supports common types) - Add comprehensive unit tests for DELETE functionality - Inspired by lance-spark DELETE implementation --- .../connector/lance/LanceDeleteExecutor.java | 202 +++++++++++++++ .../flink/connector/lance/LanceSink.java | 161 +++++++++++- .../lance/table/LanceDynamicTableSink.java | 17 +- .../connector/lance/LanceDeleteTest.java | 245 ++++++++++++++++++ 4 files changed, 615 insertions(+), 10 deletions(-) create mode 100644 src/main/java/org/apache/flink/connector/lance/LanceDeleteExecutor.java create mode 100644 src/test/java/org/apache/flink/connector/lance/LanceDeleteTest.java diff --git a/src/main/java/org/apache/flink/connector/lance/LanceDeleteExecutor.java b/src/main/java/org/apache/flink/connector/lance/LanceDeleteExecutor.java new file mode 100644 index 0000000..c0eaaca --- /dev/null +++ b/src/main/java/org/apache/flink/connector/lance/LanceDeleteExecutor.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.lance; + +import com.lancedb.lance.Dataset; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.io.Serializable; + +/** + * Executor for DELETE operations on Lance datasets. + * + *

Wraps the Lance SDK's {@code Dataset.delete(String predicate)} method to provide + * a clean interface for deleting rows from a Lance dataset based on a SQL-like predicate. + * + *

Usage example: + *

{@code
+ * try (LanceDeleteExecutor executor = new LanceDeleteExecutor("/path/to/dataset")) {
+ *     executor.delete("id = 1");
+ *     // delete with a complex predicate and get the remaining row count
+ *     long remaining = executor.deleteAndCount("age > 30 AND status = 'inactive'");
+ * }
+ * }
+ * + *

This implementation is inspired by the lance-spark DELETE support. + * The predicate syntax follows Lance's SQL-like filter syntax. + */ +public class LanceDeleteExecutor implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(LanceDeleteExecutor.class); + + private final String datasetPath; + private BufferAllocator allocator; + + /** + * Create a LanceDeleteExecutor for the given dataset path. + * + * @param datasetPath path to the Lance dataset + */ + public LanceDeleteExecutor(String datasetPath) { + if (datasetPath == null || datasetPath.isEmpty()) { + throw new IllegalArgumentException("Dataset path cannot be null or empty"); + } + this.datasetPath = datasetPath; + this.allocator = new RootAllocator(Long.MAX_VALUE); + } + + /** + * Create a LanceDeleteExecutor with a custom allocator. + * + * @param datasetPath path to the Lance dataset + * @param allocator Arrow memory allocator + */ + public LanceDeleteExecutor(String datasetPath, BufferAllocator allocator) { + if (datasetPath == null || datasetPath.isEmpty()) { + throw new IllegalArgumentException("Dataset path cannot be null or empty"); + } + if (allocator == null) { + throw new IllegalArgumentException("Allocator cannot be null"); + } + this.datasetPath = datasetPath; + this.allocator = allocator; + } + + /** + * Delete rows from the dataset matching the given predicate. + * + *

The predicate uses SQL-like syntax supported by Lance, for example: + *

+ * + * @param predicate the filter predicate for rows to delete + * @throws IOException if the delete operation fails + * @throws IllegalArgumentException if the predicate is null or empty + */ + public void delete(String predicate) throws IOException { + if (predicate == null || predicate.trim().isEmpty()) { + throw new IllegalArgumentException("Delete predicate cannot be null or empty"); + } + + LOG.info("Executing DELETE on dataset: {} with predicate: {}", datasetPath, predicate); + + try (Dataset dataset = Dataset.open(datasetPath, allocator)) { + long countBefore = dataset.countRows(); + dataset.delete(predicate); + LOG.info("DELETE completed on dataset: {}. Rows before: {}", datasetPath, countBefore); + } catch (Exception e) { + throw new IOException( + String.format("Failed to delete from dataset '%s' with predicate '%s'", + datasetPath, predicate), e); + } + } + + /** + * Delete rows from the dataset matching the given predicate and return the count of remaining rows. + * + * @param predicate the filter predicate for rows to delete + * @return the number of rows remaining after deletion + * @throws IOException if the delete operation fails + */ + public long deleteAndCount(String predicate) throws IOException { + if (predicate == null || predicate.trim().isEmpty()) { + throw new IllegalArgumentException("Delete predicate cannot be null or empty"); + } + + LOG.info("Executing DELETE on dataset: {} with predicate: {}", datasetPath, predicate); + + try (Dataset dataset = Dataset.open(datasetPath, allocator)) { + long countBefore = dataset.countRows(); + dataset.delete(predicate); + LOG.info("DELETE completed. 
Rows before: {}", countBefore); + } catch (Exception e) { + throw new IOException( + String.format("Failed to delete from dataset '%s' with predicate '%s'", + datasetPath, predicate), e); + } + + // Re-open to get the count after deletion + try (Dataset dataset = Dataset.open(datasetPath, allocator)) { + long countAfter = dataset.countRows(); + LOG.info("Rows after deletion: {}", countAfter); + return countAfter; + } catch (Exception e) { + throw new IOException("Failed to count rows after deletion", e); + } + } + + /** + * Get the current row count of the dataset. + * + * @return the number of rows in the dataset + * @throws IOException if the operation fails + */ + public long countRows() throws IOException { + try (Dataset dataset = Dataset.open(datasetPath, allocator)) { + return dataset.countRows(); + } catch (Exception e) { + throw new IOException("Failed to count rows in dataset: " + datasetPath, e); + } + } + + /** + * Get the current row count matching a filter. + * + * @param filter the filter predicate + * @return the number of rows matching the filter + * @throws IOException if the operation fails + */ + public long countRows(String filter) throws IOException { + try (Dataset dataset = Dataset.open(datasetPath, allocator)) { + return dataset.countRows(filter); + } catch (Exception e) { + throw new IOException("Failed to count rows with filter in dataset: " + datasetPath, e); + } + } + + /** + * Get the dataset path. 
+ * + * @return the dataset path + */ + public String getDatasetPath() { + return datasetPath; + } + + @Override + public void close() throws IOException { + if (allocator != null) { + try { + allocator.close(); + } catch (Exception e) { + LOG.warn("Failed to close allocator", e); + } + allocator = null; + } + } +} diff --git a/src/main/java/org/apache/flink/connector/lance/LanceSink.java b/src/main/java/org/apache/flink/connector/lance/LanceSink.java index feeec90..8d618a4 100644 --- a/src/main/java/org/apache/flink/connector/lance/LanceSink.java +++ b/src/main/java/org/apache/flink/connector/lance/LanceSink.java @@ -27,7 +27,9 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; import com.lancedb.lance.Dataset; import com.lancedb.lance.Fragment; @@ -53,7 +55,7 @@ /** * Lance Sink implementation. * - *

Writes Flink RowData to Lance dataset, supports batch writing and Checkpoint. + *

Writes Flink RowData to Lance dataset, supports batch writing, Checkpoint, and DELETE operations. * *

Usage example: *

{@code
@@ -80,7 +82,9 @@ public class LanceSink extends RichSinkFunction implements Checkpointed
     private transient RowDataConverter converter;
     private transient Schema arrowSchema;
     private transient List buffer;
+    private transient List deleteBuffer;
     private transient long totalWrittenRows;
+    private transient long totalDeletedRows;
     private transient boolean datasetExists;
     private transient boolean isFirstWrite;
 
@@ -103,7 +107,9 @@ public void open(Configuration parameters) throws Exception {
         
         this.allocator = new RootAllocator(Long.MAX_VALUE);
         this.buffer = new ArrayList<>(options.getWriteBatchSize());
+        this.deleteBuffer = new ArrayList<>();
         this.totalWrittenRows = 0;
+        this.totalDeletedRows = 0;
         this.isFirstWrite = true;
         
         // Initialize converter and Schema
@@ -131,12 +137,25 @@ public void open(Configuration parameters) throws Exception {
 
     @Override
     public void invoke(RowData value, Context context) throws Exception {
-        buffer.add(value);
+        RowKind rowKind = value.getRowKind();
         
-        // When buffer reaches batch size, execute write
-        if (buffer.size() >= options.getWriteBatchSize()) {
-            flush();
+        if (rowKind == RowKind.DELETE) {
+            // Buffer DELETE rows for batch processing
+            deleteBuffer.add(value);
+            
+            // Process deletes when buffer reaches batch size
+            if (deleteBuffer.size() >= options.getWriteBatchSize()) {
+                flushDeletes();
+            }
+        } else if (rowKind == RowKind.INSERT || rowKind == RowKind.UPDATE_AFTER) {
+            buffer.add(value);
+            
+            // When buffer reaches batch size, execute write
+            if (buffer.size() >= options.getWriteBatchSize()) {
+                flush();
+            }
         }
+        // Ignore UPDATE_BEFORE rows
     }
 
     /**
@@ -203,6 +222,7 @@ public void close() throws Exception {
         LOG.info("Closing Lance Sink");
         // Flush remaining data
         try {
+            flushDeletes();
             flush();
         } catch (Exception e) {
             LOG.warn("Failed to flush data on close", e);
@@ -225,7 +245,7 @@ public void close() throws Exception {
             allocator = null;
         }
         
-        LOG.info("Lance Sink closed, total written {} rows", totalWrittenRows);
+        LOG.info("Lance Sink closed, total written {} rows, total deleted {} rows", totalWrittenRows, totalDeletedRows);
         
         super.close();
     }
@@ -235,6 +255,7 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
         LOG.debug("Snapshot state, checkpointId: {}", context.getCheckpointId());
         
         // Flush all buffered data at Checkpoint
+        flushDeletes();
         flush();
     }
 
@@ -265,6 +286,134 @@ public long getTotalWrittenRows() {
         return totalWrittenRows;
     }
 
+    /**
+     * Get the total count of buffered DELETE records flushed (may differ from the
+     * number of rows actually removed by the generated predicates)
+     */
+    public long getTotalDeletedRows() {
+        return totalDeletedRows;
+    }
+
+    /**
+     * Flush buffered DELETE operations.
+     * 
+     * 

Builds a predicate from the buffered DELETE rows and executes + * the delete operation using Lance's native predicate-based deletion. + */ + private void flushDeletes() throws IOException { + if (deleteBuffer.isEmpty()) { + return; + } + + LOG.debug("Flushing delete buffer, row count: {}", deleteBuffer.size()); + + String datasetPath = options.getPath(); + if (!datasetExists) { + LOG.warn("Cannot delete from non-existent dataset: {}", datasetPath); + deleteBuffer.clear(); + return; + } + + try { + // Build delete predicate from buffered rows + String predicate = buildDeletePredicate(deleteBuffer); + + if (predicate != null && !predicate.isEmpty()) { + LOG.info("Executing DELETE with predicate: {}", predicate); + + try (Dataset deleteDataset = Dataset.open(datasetPath, allocator)) { + deleteDataset.delete(predicate); + } + + totalDeletedRows += deleteBuffer.size(); + LOG.debug("Deleted {} rows, total deleted: {} rows", deleteBuffer.size(), totalDeletedRows); + } + + deleteBuffer.clear(); + } catch (Exception e) { + throw new IOException("Failed to execute DELETE on Lance dataset", e); + } + } + + /** + * Build a delete predicate from a list of RowData. + * + *

Constructs an OR-combined predicate where each row contributes + * an AND-combined equality condition on all fields. + * For example, for rows with fields (id=1, name='Alice') and (id=2, name='Bob'), + * the predicate would be: + * {@code (id = 1 AND name = 'Alice') OR (id = 2 AND name = 'Bob')} + * + * @param rows the rows to build the predicate from + * @return the combined predicate string + */ + private String buildDeletePredicate(List rows) { + List rowPredicates = new ArrayList<>(); + List fieldNames = rowType.getFieldNames(); + List fieldTypes = new ArrayList<>(); + for (RowType.RowField field : rowType.getFields()) { + fieldTypes.add(field.getType()); + } + + for (RowData row : rows) { + List fieldPredicates = new ArrayList<>(); + for (int i = 0; i < fieldNames.size(); i++) { + String fieldName = fieldNames.get(i); + String value = extractFieldValue(row, i, fieldTypes.get(i)); + if (value != null) { + fieldPredicates.add(fieldName + " = " + value); + } + } + if (!fieldPredicates.isEmpty()) { + rowPredicates.add("(" + String.join(" AND ", fieldPredicates) + ")"); + } + } + + if (rowPredicates.isEmpty()) { + return null; + } + + return String.join(" OR ", rowPredicates); + } + + /** + * Extract a field value from RowData as a string suitable for a predicate. 
+ * + * @param row the RowData + * @param fieldIndex the field index + * @param type the logical type of the field + * @return the field value as a predicate string, or null if the field is null + */ + private String extractFieldValue(RowData row, int fieldIndex, LogicalType type) { + if (row.isNullAt(fieldIndex)) { + return null; + } + + switch (type.getTypeRoot()) { + case BOOLEAN: + return String.valueOf(row.getBoolean(fieldIndex)); + case TINYINT: + return String.valueOf(row.getByte(fieldIndex)); + case SMALLINT: + return String.valueOf(row.getShort(fieldIndex)); + case INTEGER: + return String.valueOf(row.getInt(fieldIndex)); + case BIGINT: + return String.valueOf(row.getLong(fieldIndex)); + case FLOAT: + return String.valueOf(row.getFloat(fieldIndex)); + case DOUBLE: + return String.valueOf(row.getDouble(fieldIndex)); + case CHAR: + case VARCHAR: + String strValue = row.getString(fieldIndex).toString(); + return "'" + strValue.replace("'", "''") + "'"; + default: + // For unsupported types, skip this field in the predicate + LOG.debug("Unsupported type for delete predicate: {}", type); + return null; + } + } + /** * Recursively delete directory */ diff --git a/src/main/java/org/apache/flink/connector/lance/table/LanceDynamicTableSink.java b/src/main/java/org/apache/flink/connector/lance/table/LanceDynamicTableSink.java index 09ed914..de86638 100644 --- a/src/main/java/org/apache/flink/connector/lance/table/LanceDynamicTableSink.java +++ b/src/main/java/org/apache/flink/connector/lance/table/LanceDynamicTableSink.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.lance.table; +import org.apache.flink.connector.lance.LanceDeleteExecutor; import org.apache.flink.connector.lance.LanceSink; import org.apache.flink.connector.lance.config.LanceOptions; import org.apache.flink.streaming.api.datastream.DataStream; @@ -36,6 +37,8 @@ * Lance dynamic table sink. * *

Implements DynamicTableSink interface, supports writing Flink data to Lance dataset. + *

Supports INSERT and DELETE operations. DELETE is handled via Lance's native + * predicate-based deletion using {@link LanceDeleteExecutor}. */ public class LanceDynamicTableSink implements DynamicTableSink { @@ -49,10 +52,16 @@ public LanceDynamicTableSink(LanceOptions options, DataType physicalDataType) { @Override public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { - // Lance only supports INSERT operations - return ChangelogMode.newBuilder() - .addContainedKind(RowKind.INSERT) - .build(); + // Lance supports INSERT and DELETE operations + ChangelogMode.Builder builder = ChangelogMode.newBuilder() + .addContainedKind(RowKind.INSERT); + + // Add DELETE support if requested + if (requestedMode.contains(RowKind.DELETE)) { + builder.addContainedKind(RowKind.DELETE); + } + + return builder.build(); } @Override diff --git a/src/test/java/org/apache/flink/connector/lance/LanceDeleteTest.java b/src/test/java/org/apache/flink/connector/lance/LanceDeleteTest.java new file mode 100644 index 0000000..015c211 --- /dev/null +++ b/src/test/java/org/apache/flink/connector/lance/LanceDeleteTest.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.lance; + +import org.apache.flink.connector.lance.config.LanceOptions; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.connector.lance.table.LanceDynamicTableSink; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.types.RowKind; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Tests for Lance DELETE support. + * + *

Validates the LanceDeleteExecutor, LanceSink DELETE handling, + * and LanceDynamicTableSink changelog mode support. + */ +public class LanceDeleteTest { + + @TempDir + Path tempDir; + + // ==================== LanceDeleteExecutor Tests ==================== + + @Test + public void testDeleteExecutorCreation() { + String path = tempDir.resolve("test_dataset").toString(); + try (LanceDeleteExecutor executor = new LanceDeleteExecutor(path)) { + assertEquals(path, executor.getDatasetPath()); + } catch (IOException e) { + fail("Should not throw exception on creation: " + e.getMessage()); + } + } + + @Test + public void testDeleteExecutorNullPath() { + assertThrows(IllegalArgumentException.class, () -> { + new LanceDeleteExecutor(null); + }); + } + + @Test + public void testDeleteExecutorEmptyPath() { + assertThrows(IllegalArgumentException.class, () -> { + new LanceDeleteExecutor(""); + }); + } + + @Test + public void testDeleteExecutorNullPredicate() { + String path = tempDir.resolve("test_dataset").toString(); + try (LanceDeleteExecutor executor = new LanceDeleteExecutor(path)) { + assertThrows(IllegalArgumentException.class, () -> { + executor.delete(null); + }); + } catch (IOException e) { + fail("Should not throw exception: " + e.getMessage()); + } + } + + @Test + public void testDeleteExecutorEmptyPredicate() { + String path = tempDir.resolve("test_dataset").toString(); + try (LanceDeleteExecutor executor = new LanceDeleteExecutor(path)) { + assertThrows(IllegalArgumentException.class, () -> { + executor.delete(""); + }); + } catch (IOException e) { + fail("Should not throw exception: " + e.getMessage()); + } + } + + @Test + public void testDeleteExecutorBlankPredicate() { + String path = tempDir.resolve("test_dataset").toString(); + try (LanceDeleteExecutor executor = new LanceDeleteExecutor(path)) { + assertThrows(IllegalArgumentException.class, () -> { + executor.delete(" "); + }); + } catch (IOException e) { + fail("Should not throw exception: " + 
e.getMessage()); + } + } + + @Test + public void testDeleteAndCountNullPredicate() { + String path = tempDir.resolve("test_dataset").toString(); + try (LanceDeleteExecutor executor = new LanceDeleteExecutor(path)) { + assertThrows(IllegalArgumentException.class, () -> { + executor.deleteAndCount(null); + }); + } catch (IOException e) { + fail("Should not throw exception: " + e.getMessage()); + } + } + + // ==================== LanceDynamicTableSink ChangelogMode Tests ==================== + + @Test + public void testSinkSupportsInsertOnly() { + LanceOptions options = LanceOptions.builder() + .path(tempDir.resolve("test_dataset").toString()) + .build(); + + RowType rowType = RowType.of(new IntType(), new VarCharType()); + DataType dataType = TypeConversions.fromLogicalToDataType(rowType); + + LanceDynamicTableSink sink = new LanceDynamicTableSink(options, dataType); + + // When only INSERT is requested + ChangelogMode insertOnly = ChangelogMode.insertOnly(); + ChangelogMode result = sink.getChangelogMode(insertOnly); + + assertTrue(result.contains(RowKind.INSERT)); + assertFalse(result.contains(RowKind.DELETE)); + } + + @Test + public void testSinkSupportsDeleteWhenRequested() { + LanceOptions options = LanceOptions.builder() + .path(tempDir.resolve("test_dataset").toString()) + .build(); + + RowType rowType = RowType.of(new IntType(), new VarCharType()); + DataType dataType = TypeConversions.fromLogicalToDataType(rowType); + + LanceDynamicTableSink sink = new LanceDynamicTableSink(options, dataType); + + // When DELETE is requested + ChangelogMode withDelete = ChangelogMode.newBuilder() + .addContainedKind(RowKind.INSERT) + .addContainedKind(RowKind.DELETE) + .build(); + ChangelogMode result = sink.getChangelogMode(withDelete); + + assertTrue(result.contains(RowKind.INSERT)); + assertTrue(result.contains(RowKind.DELETE)); + } + + @Test + public void testSinkCopy() { + LanceOptions options = LanceOptions.builder() + .path(tempDir.resolve("test_dataset").toString()) 
+ .build(); + + RowType rowType = RowType.of(new IntType(), new VarCharType()); + DataType dataType = TypeConversions.fromLogicalToDataType(rowType); + + LanceDynamicTableSink sink = new LanceDynamicTableSink(options, dataType); + DynamicTableSink copy = sink.copy(); + + assertNotSame(sink, copy); + assertTrue(copy instanceof LanceDynamicTableSink); + } + + @Test + public void testSinkSummary() { + LanceOptions options = LanceOptions.builder() + .path(tempDir.resolve("test_dataset").toString()) + .build(); + + RowType rowType = RowType.of(new IntType(), new VarCharType()); + DataType dataType = TypeConversions.fromLogicalToDataType(rowType); + + LanceDynamicTableSink sink = new LanceDynamicTableSink(options, dataType); + assertEquals("Lance Table Sink", sink.asSummaryString()); + } + + // ==================== LanceSink DELETE Configuration Tests ==================== + + @Test + public void testLanceSinkBuilder() { + RowType rowType = RowType.of( + new LogicalType[]{new BigIntType(), new VarCharType()}, + new String[]{"id", "name"}); + + LanceSink sink = LanceSink.builder() + .path(tempDir.resolve("test_dataset").toString()) + .rowType(rowType) + .batchSize(512) + .build(); + + assertNotNull(sink); + assertEquals(rowType, sink.getRowType()); + } + + @Test + public void testLanceSinkBuilderNullPath() { + RowType rowType = RowType.of(new IntType()); + + assertThrows(IllegalArgumentException.class, () -> { + LanceSink.builder() + .rowType(rowType) + .build(); + }); + } + + @Test + public void testLanceSinkBuilderNullRowType() { + assertThrows(IllegalArgumentException.class, () -> { + LanceSink.builder() + .path(tempDir.resolve("test_dataset").toString()) + .build(); + }); + } + + @Test + public void testDeleteExecutorNullAllocator() { + assertThrows(IllegalArgumentException.class, () -> { + new LanceDeleteExecutor(tempDir.resolve("test_dataset").toString(), null); + }); + } +}