diff --git a/src/main/java/org/apache/flink/connector/lance/LanceDeleteExecutor.java b/src/main/java/org/apache/flink/connector/lance/LanceDeleteExecutor.java new file mode 100644 index 0000000..c0eaaca --- /dev/null +++ b/src/main/java/org/apache/flink/connector/lance/LanceDeleteExecutor.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.lance; + +import com.lancedb.lance.Dataset; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.io.Serializable; + +/** + * Executor for DELETE operations on Lance datasets. + * + *
Wraps the Lance SDK's {@code Dataset.delete(String predicate)} method to provide + * a clean interface for deleting rows from a Lance dataset based on a SQL-like predicate. + * + *
Usage example: + *
{@code
+ * try (LanceDeleteExecutor executor = new LanceDeleteExecutor("/path/to/dataset")) {
+ * long deletedCount = executor.delete("id = 1");
+ * // or delete with complex predicates
+ * long deletedCount2 = executor.delete("age > 30 AND status = 'inactive'");
+ * }
+ * }
+ *
+ * This implementation is inspired by the lance-spark DELETE support. + * The predicate syntax follows Lance's SQL-like filter syntax. + */ +public class LanceDeleteExecutor implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(LanceDeleteExecutor.class); + + private final String datasetPath; + private BufferAllocator allocator; + + /** + * Create a LanceDeleteExecutor for the given dataset path. + * + * @param datasetPath path to the Lance dataset + */ + public LanceDeleteExecutor(String datasetPath) { + if (datasetPath == null || datasetPath.isEmpty()) { + throw new IllegalArgumentException("Dataset path cannot be null or empty"); + } + this.datasetPath = datasetPath; + this.allocator = new RootAllocator(Long.MAX_VALUE); + } + + /** + * Create a LanceDeleteExecutor with a custom allocator. + * + * @param datasetPath path to the Lance dataset + * @param allocator Arrow memory allocator + */ + public LanceDeleteExecutor(String datasetPath, BufferAllocator allocator) { + if (datasetPath == null || datasetPath.isEmpty()) { + throw new IllegalArgumentException("Dataset path cannot be null or empty"); + } + if (allocator == null) { + throw new IllegalArgumentException("Allocator cannot be null"); + } + this.datasetPath = datasetPath; + this.allocator = allocator; + } + + /** + * Delete rows from the dataset matching the given predicate. + * + *
The predicate uses SQL-like syntax supported by Lance, for example: + *
Writes Flink RowData to Lance dataset, supports batch writing and Checkpoint. + *
Writes Flink RowData to Lance dataset, supports batch writing, Checkpoint, and DELETE operations. * *
Usage example: *
{@code
@@ -80,7 +82,9 @@ public class LanceSink extends RichSinkFunction implements Checkpointed
private transient RowDataConverter converter;
private transient Schema arrowSchema;
private transient List buffer;
+ private transient List deleteBuffer;
private transient long totalWrittenRows;
+ private transient long totalDeletedRows;
private transient boolean datasetExists;
private transient boolean isFirstWrite;
@@ -103,7 +107,9 @@ public void open(Configuration parameters) throws Exception {
this.allocator = new RootAllocator(Long.MAX_VALUE);
this.buffer = new ArrayList<>(options.getWriteBatchSize());
+ this.deleteBuffer = new ArrayList<>();
this.totalWrittenRows = 0;
+ this.totalDeletedRows = 0;
this.isFirstWrite = true;
// Initialize converter and Schema
@@ -131,12 +137,25 @@ public void open(Configuration parameters) throws Exception {
@Override
public void invoke(RowData value, Context context) throws Exception {
- buffer.add(value);
+        // Inserts and deletes are staged in separate buffers. To keep changes applied in
+        // arrival order, the opposite buffer is drained whenever the row kind switches;
+        // otherwise an INSERT followed by a DELETE of the same row (or the reverse) could be
+        // applied to the dataset out of order when both buffers are flushed together.
+        // The cost is smaller batches for streams that alternate between the two kinds.
+        RowKind rowKind = value.getRowKind();
- // When buffer reaches batch size, execute write
- if (buffer.size() >= options.getWriteBatchSize()) {
- flush();
+        if (rowKind == RowKind.DELETE) {
+            // Pending writes must land before this delete is staged.
+            if (!buffer.isEmpty()) {
+                flush();
+            }
+            deleteBuffer.add(value);
+
+            // Process deletes when buffer reaches batch size
+            if (deleteBuffer.size() >= options.getWriteBatchSize()) {
+                flushDeletes();
+            }
+        } else if (rowKind == RowKind.INSERT || rowKind == RowKind.UPDATE_AFTER) {
+            // Pending deletes must be applied before this row is staged for writing.
+            if (!deleteBuffer.isEmpty()) {
+                flushDeletes();
+            }
+            buffer.add(value);
+
+            // When buffer reaches batch size, execute write
+            if (buffer.size() >= options.getWriteBatchSize()) {
+                flush();
+            }
}
+        // UPDATE_BEFORE rows are ignored.
+        // NOTE(review): UPDATE_AFTER is written as a plain insert, so the pre-update row is
+        // not removed here - confirm this is the intended update semantics.
}
/**
@@ -203,6 +222,7 @@ public void close() throws Exception {
LOG.info("Closing Lance Sink");
// Flush remaining data
try {
+ flushDeletes();
flush();
} catch (Exception e) {
LOG.warn("Failed to flush data on close", e);
@@ -225,7 +245,7 @@ public void close() throws Exception {
allocator = null;
}
- LOG.info("Lance Sink closed, total written {} rows", totalWrittenRows);
+ LOG.info("Lance Sink closed, total written {} rows, total deleted {} rows", totalWrittenRows, totalDeletedRows);
super.close();
}
@@ -235,6 +255,7 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
LOG.debug("Snapshot state, checkpointId: {}", context.getCheckpointId());
// Flush all buffered data at Checkpoint
+ flushDeletes();
flush();
}
@@ -265,6 +286,134 @@ public long getTotalWrittenRows() {
return totalWrittenRows;
}
+    /**
+     * Returns the running total of deleted rows tracked by this sink.
+     *
+     * @return the current value of the {@code totalDeletedRows} counter
+     */
+    public long getTotalDeletedRows() {
+        return this.totalDeletedRows;
+    }
+
+    /**
+     * Flush buffered DELETE operations.
+     *
+     * <p>Builds one predicate from the buffered DELETE rows and executes it with
+     * {@code Dataset.delete(String)} on a dataset handle opened for this call.
+     *
+     * <p>The buffer is cleared after a successful delete, when the dataset does not exist,
+     * and when no predicate can be built (the last two cases are logged as warnings). If
+     * the delete throws, the buffer is left as it is.
+     *
+     * <p>{@code totalDeletedRows} is advanced by the number of buffered rows that were
+     * submitted, which is not necessarily the number of rows the predicate matched.
+     *
+     * @throws IOException if building the predicate or executing the delete fails
+     */
+    private void flushDeletes() throws IOException {
+        if (deleteBuffer.isEmpty()) {
+            return;
+        }
+
+        final int pendingDeletes = deleteBuffer.size();
+        LOG.debug("Flushing delete buffer, row count: {}", pendingDeletes);
+
+        String datasetPath = options.getPath();
+        if (!datasetExists) {
+            LOG.warn("Cannot delete from non-existent dataset: {}", datasetPath);
+            deleteBuffer.clear();
+            return;
+        }
+
+        try {
+            // Build delete predicate from buffered rows
+            String predicate = buildDeletePredicate(deleteBuffer);
+
+            if (predicate == null || predicate.isEmpty()) {
+                // Nothing can be sent to Lance; make the dropped deletes visible in the logs
+                // instead of discarding them silently.
+                LOG.warn("Dropping {} buffered DELETE rows for dataset {}: no predicate could be built",
+                        pendingDeletes, datasetPath);
+            } else {
+                LOG.info("Executing DELETE with predicate: {}", predicate);
+
+                try (Dataset deleteDataset = Dataset.open(datasetPath, allocator)) {
+                    deleteDataset.delete(predicate);
+                }
+
+                totalDeletedRows += pendingDeletes;
+                LOG.debug("Deleted {} rows, total deleted: {} rows", pendingDeletes, totalDeletedRows);
+            }
+
+            deleteBuffer.clear();
+        } catch (Exception e) {
+            throw new IOException("Failed to execute DELETE on Lance dataset", e);
+        }
+    }
+
+    /**
+     * Build a delete predicate from a list of RowData.
+     *
+     * <p>Constructs an OR-combined predicate where each row contributes an AND-combined
+     * condition over its fields. Non-null fields become equality checks and null fields
+     * become {@code IS NULL} checks, so a row with a null column does not match rows that
+     * hold a value in that column.
+     * For example, for rows (id=1, name='Alice') and (id=2, name=NULL),
+     * the predicate would be:
+     * {@code (id = 1 AND name = 'Alice') OR (id = 2 AND name IS NULL)}
+     *
+     * <p>Non-null fields for which {@code extractFieldValue} returns {@code null}
+     * (unsupported types) are left out of the row's condition.
+     * NOTE(review): field names are emitted unquoted - confirm that all column names are
+     * valid as bare identifiers in Lance filter expressions.
+     *
+     * @param rows the rows to build the predicate from
+     * @return the combined predicate string, or {@code null} if no row produced a condition
+     */
+    private String buildDeletePredicate(List<RowData> rows) {
+        List<String> fieldNames = rowType.getFieldNames();
+        List<LogicalType> fieldTypes = new ArrayList<>(fieldNames.size());
+        for (RowType.RowField field : rowType.getFields()) {
+            fieldTypes.add(field.getType());
+        }
+
+        List<String> rowPredicates = new ArrayList<>(rows.size());
+        for (RowData row : rows) {
+            List<String> fieldPredicates = new ArrayList<>(fieldNames.size());
+            for (int i = 0; i < fieldNames.size(); i++) {
+                String fieldName = fieldNames.get(i);
+                if (row.isNullAt(i)) {
+                    // Dropping a null field would widen the match to every value of that
+                    // column; constrain it explicitly instead.
+                    fieldPredicates.add(fieldName + " IS NULL");
+                    continue;
+                }
+                String value = extractFieldValue(row, i, fieldTypes.get(i));
+                if (value != null) {
+                    fieldPredicates.add(fieldName + " = " + value);
+                }
+            }
+            if (!fieldPredicates.isEmpty()) {
+                rowPredicates.add("(" + String.join(" AND ", fieldPredicates) + ")");
+            }
+        }
+
+        if (rowPredicates.isEmpty()) {
+            return null;
+        }
+
+        return String.join(" OR ", rowPredicates);
+    }
+
+    /**
+     * Render one field of a row as a literal that can be placed in a delete predicate.
+     *
+     * <p>Boolean and numeric fields are rendered through their standard string form;
+     * CHAR/VARCHAR fields are wrapped in single quotes with embedded single quotes doubled.
+     *
+     * @param row the RowData
+     * @param fieldIndex the field index
+     * @param type the logical type of the field
+     * @return the literal text, or null if the field is null or its type is not handled
+     */
+    private String extractFieldValue(RowData row, int fieldIndex, LogicalType type) {
+        final String literal;
+        if (row.isNullAt(fieldIndex)) {
+            literal = null;
+        } else {
+            switch (type.getTypeRoot()) {
+                case BOOLEAN:
+                    literal = Boolean.toString(row.getBoolean(fieldIndex));
+                    break;
+                case TINYINT:
+                    literal = Byte.toString(row.getByte(fieldIndex));
+                    break;
+                case SMALLINT:
+                    literal = Short.toString(row.getShort(fieldIndex));
+                    break;
+                case INTEGER:
+                    literal = Integer.toString(row.getInt(fieldIndex));
+                    break;
+                case BIGINT:
+                    literal = Long.toString(row.getLong(fieldIndex));
+                    break;
+                case FLOAT:
+                    literal = Float.toString(row.getFloat(fieldIndex));
+                    break;
+                case DOUBLE:
+                    literal = Double.toString(row.getDouble(fieldIndex));
+                    break;
+                case CHAR:
+                case VARCHAR:
+                    // Double any embedded single quote so the text stays inside the literal.
+                    String escaped = row.getString(fieldIndex).toString().replace("'", "''");
+                    literal = "'" + escaped + "'";
+                    break;
+                default:
+                    // Types without a literal form are reported and left out by the caller.
+                    LOG.debug("Unsupported type for delete predicate: {}", type);
+                    literal = null;
+                    break;
+            }
+        }
+        return literal;
+    }
+
/**
* Recursively delete directory
*/
diff --git a/src/main/java/org/apache/flink/connector/lance/table/LanceDynamicTableSink.java b/src/main/java/org/apache/flink/connector/lance/table/LanceDynamicTableSink.java
index 09ed914..de86638 100644
--- a/src/main/java/org/apache/flink/connector/lance/table/LanceDynamicTableSink.java
+++ b/src/main/java/org/apache/flink/connector/lance/table/LanceDynamicTableSink.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.lance.table;
+import org.apache.flink.connector.lance.LanceDeleteExecutor;
import org.apache.flink.connector.lance.LanceSink;
import org.apache.flink.connector.lance.config.LanceOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -36,6 +37,8 @@
* Lance dynamic table sink.
*
* Implements DynamicTableSink interface, supports writing Flink data to Lance dataset.
+ *
+ * <p>Supports INSERT and DELETE operations. DELETE is handled via Lance's native
+ * predicate-based deletion using {@link LanceDeleteExecutor}.
*/
public class LanceDynamicTableSink implements DynamicTableSink {
@@ -49,10 +52,16 @@ public LanceDynamicTableSink(LanceOptions options, DataType physicalDataType) {
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
- // Lance only supports INSERT operations
- return ChangelogMode.newBuilder()
- .addContainedKind(RowKind.INSERT)
- .build();
+        // INSERT is always part of the returned mode; DELETE is added only when
+        // requestedMode contains it.
+        ChangelogMode.Builder accepted = ChangelogMode.newBuilder();
+        accepted.addContainedKind(RowKind.INSERT);
+        if (requestedMode.contains(RowKind.DELETE)) {
+            accepted.addContainedKind(RowKind.DELETE);
+        }
+        return accepted.build();
}
@Override
diff --git a/src/test/java/org/apache/flink/connector/lance/LanceDeleteTest.java b/src/test/java/org/apache/flink/connector/lance/LanceDeleteTest.java
new file mode 100644
index 0000000..015c211
--- /dev/null
+++ b/src/test/java/org/apache/flink/connector/lance/LanceDeleteTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.lance;
+
+import org.apache.flink.connector.lance.config.LanceOptions;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.connector.lance.table.LanceDynamicTableSink;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * Tests for Lance DELETE support.
+ *
+ * <p>Exercises argument validation in LanceDeleteExecutor, the changelog modes reported by
+ * LanceDynamicTableSink, and argument validation in the LanceSink builder.
+ */
+public class LanceDeleteTest {
+
+    @TempDir
+    Path tempDir;
+
+    // ==================== Shared fixtures ====================
+
+    /** A call made against an open executor; it may throw anything. */
+    @FunctionalInterface
+    private interface ExecutorAction {
+        void apply(LanceDeleteExecutor executor) throws Exception;
+    }
+
+    /** Dataset location used by every test, resolved under the per-test temp directory. */
+    private String datasetPath() {
+        return tempDir.resolve("test_dataset").toString();
+    }
+
+    /** Creates a table sink over an (INT, VARCHAR) row type that points at the test dataset. */
+    private LanceDynamicTableSink newTableSink() {
+        LanceOptions options = LanceOptions.builder()
+                .path(datasetPath())
+                .build();
+        DataType dataType =
+                TypeConversions.fromLogicalToDataType(RowType.of(new IntType(), new VarCharType()));
+        return new LanceDynamicTableSink(options, dataType);
+    }
+
+    /**
+     * Opens an executor on the test dataset and asserts that the given action is rejected
+     * with an IllegalArgumentException.
+     */
+    private void assertRejectedWithIllegalArgument(ExecutorAction action) {
+        try (LanceDeleteExecutor executor = new LanceDeleteExecutor(datasetPath())) {
+            assertThrows(IllegalArgumentException.class, () -> action.apply(executor));
+        } catch (IOException e) {
+            fail("Should not throw exception: " + e.getMessage());
+        }
+    }
+
+    // ==================== LanceDeleteExecutor Tests ====================
+
+    @Test
+    public void testDeleteExecutorCreation() {
+        String expectedPath = datasetPath();
+        try (LanceDeleteExecutor executor = new LanceDeleteExecutor(expectedPath)) {
+            assertEquals(expectedPath, executor.getDatasetPath());
+        } catch (IOException e) {
+            fail("Should not throw exception on creation: " + e.getMessage());
+        }
+    }
+
+    @Test
+    public void testDeleteExecutorNullPath() {
+        assertThrows(IllegalArgumentException.class, () -> new LanceDeleteExecutor(null));
+    }
+
+    @Test
+    public void testDeleteExecutorEmptyPath() {
+        assertThrows(IllegalArgumentException.class, () -> new LanceDeleteExecutor(""));
+    }
+
+    @Test
+    public void testDeleteExecutorNullAllocator() {
+        assertThrows(IllegalArgumentException.class,
+                () -> new LanceDeleteExecutor(datasetPath(), null));
+    }
+
+    @Test
+    public void testDeleteExecutorNullPredicate() {
+        assertRejectedWithIllegalArgument(executor -> executor.delete(null));
+    }
+
+    @Test
+    public void testDeleteExecutorEmptyPredicate() {
+        assertRejectedWithIllegalArgument(executor -> executor.delete(""));
+    }
+
+    @Test
+    public void testDeleteExecutorBlankPredicate() {
+        assertRejectedWithIllegalArgument(executor -> executor.delete(" "));
+    }
+
+    @Test
+    public void testDeleteAndCountNullPredicate() {
+        assertRejectedWithIllegalArgument(executor -> executor.deleteAndCount(null));
+    }
+
+    // ==================== LanceDynamicTableSink ChangelogMode Tests ====================
+
+    @Test
+    public void testSinkSupportsInsertOnly() {
+        // When only INSERT is requested
+        ChangelogMode result = newTableSink().getChangelogMode(ChangelogMode.insertOnly());
+
+        assertTrue(result.contains(RowKind.INSERT));
+        assertFalse(result.contains(RowKind.DELETE));
+    }
+
+    @Test
+    public void testSinkSupportsDeleteWhenRequested() {
+        // When DELETE is requested
+        ChangelogMode.Builder requested = ChangelogMode.newBuilder();
+        requested.addContainedKind(RowKind.INSERT);
+        requested.addContainedKind(RowKind.DELETE);
+
+        ChangelogMode result = newTableSink().getChangelogMode(requested.build());
+
+        assertTrue(result.contains(RowKind.INSERT));
+        assertTrue(result.contains(RowKind.DELETE));
+    }
+
+    @Test
+    public void testSinkCopy() {
+        LanceDynamicTableSink sink = newTableSink();
+        DynamicTableSink copy = sink.copy();
+
+        assertNotSame(sink, copy);
+        assertTrue(copy instanceof LanceDynamicTableSink);
+    }
+
+    @Test
+    public void testSinkSummary() {
+        assertEquals("Lance Table Sink", newTableSink().asSummaryString());
+    }
+
+    // ==================== LanceSink DELETE Configuration Tests ====================
+
+    @Test
+    public void testLanceSinkBuilder() {
+        LogicalType[] columnTypes = {new BigIntType(), new VarCharType()};
+        String[] columnNames = {"id", "name"};
+        RowType rowType = RowType.of(columnTypes, columnNames);
+
+        LanceSink sink = LanceSink.builder()
+                .path(datasetPath())
+                .rowType(rowType)
+                .batchSize(512)
+                .build();
+
+        assertNotNull(sink);
+        assertEquals(rowType, sink.getRowType());
+    }
+
+    @Test
+    public void testLanceSinkBuilderNullPath() {
+        RowType rowType = RowType.of(new IntType());
+
+        assertThrows(IllegalArgumentException.class,
+                () -> LanceSink.builder().rowType(rowType).build());
+    }
+
+    @Test
+    public void testLanceSinkBuilderNullRowType() {
+        assertThrows(IllegalArgumentException.class,
+                () -> LanceSink.builder().path(datasetPath()).build());
+    }
+}