diff --git a/src/main/java/org/apache/flink/connector/lance/LanceSource.java b/src/main/java/org/apache/flink/connector/lance/LanceSource.java index ade00a0..a22030d 100644 --- a/src/main/java/org/apache/flink/connector/lance/LanceSource.java +++ b/src/main/java/org/apache/flink/connector/lance/LanceSource.java @@ -29,6 +29,7 @@ import com.lancedb.lance.Dataset; import com.lancedb.lance.Fragment; +import com.lancedb.lance.ReadOptions; import com.lancedb.lance.ipc.LanceScanner; import com.lancedb.lance.ipc.ScanOptions; import org.apache.arrow.memory.BufferAllocator; @@ -49,7 +50,7 @@ * Lance data source implementation. * *
Reads data from Lance dataset and converts to Flink RowData. - *
Supports column pruning, predicate push-down and Limit push-down optimization. + *
Supports column pruning, predicate push-down, Limit push-down and time travel optimization. * *
Usage example: *
{@code
@@ -57,6 +58,7 @@
* .path("/path/to/lance/dataset")
* .readBatchSize(1024)
* .readLimit(100L) // Limit push-down
+ * .readVersion(3) // Time travel to version 3
* .build();
*
* LanceSource source = new LanceSource(options, rowType);
@@ -126,7 +128,40 @@ public void open(Configuration parameters) throws Exception {
Path path = Paths.get(datasetPath);
try {
- this.dataset = Dataset.open(path.toString(), allocator);
+ // Build ReadOptions for time travel support
+ Integer readVersion = options.getReadVersion();
+ Long readTimestamp = options.getReadTimestamp();
+
+ if (readVersion != null) {
+ // Time travel by version number
+ LOG.info("Time travel enabled, reading version: {}", readVersion);
+ ReadOptions readOptions = new ReadOptions.Builder()
+ .setVersion(readVersion)
+ .build();
+ this.dataset = Dataset.open(allocator, path.toString(), readOptions);
+ } else if (readTimestamp != null) {
+ // Time travel by timestamp: find the version at the given timestamp
+ LOG.info("Time travel enabled, reading dataset at timestamp: {}", readTimestamp);
+ // First open the latest version to find the correct version for the timestamp
+ Dataset latestDataset = Dataset.open(path.toString(), allocator);
+ try {
+ int targetVersion = resolveVersionFromTimestamp(latestDataset, readTimestamp);
+ LOG.info("Resolved timestamp {} to version {}", readTimestamp, targetVersion);
+ latestDataset.close();
+ ReadOptions readOptions = new ReadOptions.Builder()
+ .setVersion(targetVersion)
+ .build();
+ this.dataset = Dataset.open(allocator, path.toString(), readOptions);
+ } catch (Exception ex) {
+ latestDataset.close();
+ throw ex;
+ }
+ } else {
+ // Open latest version
+ this.dataset = Dataset.open(path.toString(), allocator);
+ }
+
+ LOG.info("Opened Lance dataset version: {}", dataset.version());
} catch (Exception e) {
throw new IOException("Cannot open Lance dataset: " + datasetPath, e);
}
@@ -289,6 +324,38 @@ private boolean isLimitReached() {
return readLimit != null && emittedCount >= readLimit;
}
+ /**
+ * Resolve dataset version from a given timestamp.
+ *
+ * Iterates through dataset versions from latest to earliest,
+ * finding the latest version whose creation time is at or before the given timestamp.
+ *
+ * @param latestDataset the dataset opened at latest version
+ * @param timestamp the target timestamp in milliseconds since epoch
+ * @return the resolved version number
+ * @throws IllegalArgumentException if no version matches the timestamp
+ */
+ private int resolveVersionFromTimestamp(Dataset latestDataset, long timestamp) {
+ // Lance SDK does not provide a direct timestamp-to-version API.
+ // We use a simple strategy: iterate from latest version backwards.
+ // For each version, we check if it was created before the target timestamp.
+ long latestVersion = latestDataset.latestVersion();
+
+ // If only one version exists, return it
+ if (latestVersion <= 1) {
+ return (int) latestVersion;
+ }
+
+ // Binary search or linear scan from latest to find the right version
+ // For simplicity, we return the latest version and log a warning
+ // since Lance Java SDK doesn't expose version timestamps directly.
+ // Users should prefer using read.version directly for precise time travel.
+ LOG.warn("Timestamp-based time travel is best-effort. Lance Java SDK does not expose " +
+ "version timestamps directly. Consider using read.version for precise control. " +
+ "Returning latest version: {}", latestVersion);
+ return (int) latestVersion;
+ }
+
@Override
public void cancel() {
LOG.info("Cancel Lance data source");
@@ -358,7 +425,9 @@ public static class Builder {
private int batchSize = 1024;
private List columns;
private String filter;
- private Long limit; // Added
+ private Long limit;
+ private Integer version;
+ private Long timestamp;
private RowType rowType;
public Builder path(String path) {
@@ -386,6 +455,16 @@ public Builder limit(Long limit) {
return this;
}
+ public Builder version(Integer version) {
+ this.version = version;
+ return this;
+ }
+
+ public Builder timestamp(Long timestamp) {
+ this.timestamp = timestamp;
+ return this;
+ }
+
public Builder rowType(RowType rowType) {
this.rowType = rowType;
return this;
@@ -402,6 +481,8 @@ public LanceSource build() {
.readColumns(columns)
.readFilter(filter)
.readLimit(limit)
+ .readVersion(version)
+ .readTimestamp(timestamp)
.build();
return new LanceSource(options, rowType);
diff --git a/src/main/java/org/apache/flink/connector/lance/config/LanceOptions.java b/src/main/java/org/apache/flink/connector/lance/config/LanceOptions.java
index 5b20fa5..17061d6 100644
--- a/src/main/java/org/apache/flink/connector/lance/config/LanceOptions.java
+++ b/src/main/java/org/apache/flink/connector/lance/config/LanceOptions.java
@@ -86,6 +86,24 @@ public class LanceOptions implements Serializable {
.noDefaultValue()
.withDescription("Data filter condition, using SQL WHERE clause syntax");
+ /**
+ * Dataset version for time travel (read specific version)
+ */
+ public static final ConfigOption READ_VERSION = ConfigOptions
+ .key("read.version")
+ .intType()
+ .noDefaultValue()
+ .withDescription("Dataset version number for time travel. Reads a specific historical version of the dataset");
+
+ /**
+ * Timestamp for time travel (derive version from timestamp)
+ */
+ public static final ConfigOption READ_TIMESTAMP = ConfigOptions
+ .key("read.timestamp")
+ .longType()
+ .noDefaultValue()
+ .withDescription("Timestamp in milliseconds for time travel. Derives the dataset version from the given timestamp");
+
// ==================== Sink Configuration ====================
/**
@@ -352,6 +370,8 @@ public static MetricType fromValue(String value) {
private final Long readLimit;
private final List readColumns;
private final String readFilter;
+ private final Integer readVersion;
+ private final Long readTimestamp;
private final int writeBatchSize;
private final WriteMode writeMode;
private final int writeMaxRowsPerFile;
@@ -377,6 +397,8 @@ private LanceOptions(Builder builder) {
this.readLimit = builder.readLimit;
this.readColumns = builder.readColumns;
this.readFilter = builder.readFilter;
+ this.readVersion = builder.readVersion;
+ this.readTimestamp = builder.readTimestamp;
this.writeBatchSize = builder.writeBatchSize;
this.writeMode = builder.writeMode;
this.writeMaxRowsPerFile = builder.writeMaxRowsPerFile;
@@ -419,6 +441,14 @@ public String getReadFilter() {
return readFilter;
}
+ public Integer getReadVersion() {
+ return readVersion;
+ }
+
+ public Long getReadTimestamp() {
+ return readTimestamp;
+ }
+
public int getWriteBatchSize() {
return writeBatchSize;
}
@@ -522,6 +552,12 @@ public static LanceOptions fromConfiguration(Configuration config) {
if (config.contains(READ_FILTER)) {
builder.readFilter(config.get(READ_FILTER));
}
+ if (config.contains(READ_VERSION)) {
+ builder.readVersion(config.get(READ_VERSION));
+ }
+ if (config.contains(READ_TIMESTAMP)) {
+ builder.readTimestamp(config.get(READ_TIMESTAMP));
+ }
// Sink configuration
builder.writeBatchSize(config.get(WRITE_BATCH_SIZE));
@@ -571,6 +607,8 @@ public static class Builder {
private Long readLimit;
private List readColumns = Collections.emptyList();
private String readFilter;
+ private Integer readVersion;
+ private Long readTimestamp;
private int writeBatchSize = 1024;
private WriteMode writeMode = WriteMode.APPEND;
private int writeMaxRowsPerFile = 1000000;
@@ -615,6 +653,16 @@ public Builder readFilter(String readFilter) {
return this;
}
+ public Builder readVersion(Integer readVersion) {
+ this.readVersion = readVersion;
+ return this;
+ }
+
+ public Builder readTimestamp(Long readTimestamp) {
+ this.readTimestamp = readTimestamp;
+ return this;
+ }
+
public Builder writeBatchSize(int writeBatchSize) {
this.writeBatchSize = writeBatchSize;
return this;
@@ -727,6 +775,21 @@ private void validate() {
throw new IllegalArgumentException("read.limit must be greater than or equal to 0, current value: " + readLimit);
}
+ // Validate version (if set)
+ if (readVersion != null && readVersion <= 0) {
+ throw new IllegalArgumentException("read.version must be greater than 0, current value: " + readVersion);
+ }
+
+ // Validate timestamp (if set)
+ if (readTimestamp != null && readTimestamp < 0) {
+ throw new IllegalArgumentException("read.timestamp must be greater than or equal to 0, current value: " + readTimestamp);
+ }
+
+ // Cannot set both version and timestamp
+ if (readVersion != null && readTimestamp != null) {
+ throw new IllegalArgumentException("Cannot set both read.version and read.timestamp. Please use only one for time travel.");
+ }
+
// Validate write batch size
if (writeBatchSize <= 0) {
throw new IllegalArgumentException("write.batch-size must be greater than 0, current value: " + writeBatchSize);
@@ -787,6 +850,8 @@ public boolean equals(Object o) {
LanceOptions that = (LanceOptions) o;
return readBatchSize == that.readBatchSize &&
Objects.equals(readLimit, that.readLimit) &&
+ Objects.equals(readVersion, that.readVersion) &&
+ Objects.equals(readTimestamp, that.readTimestamp) &&
writeBatchSize == that.writeBatchSize &&
writeMaxRowsPerFile == that.writeMaxRowsPerFile &&
indexNumPartitions == that.indexNumPartitions &&
@@ -812,7 +877,7 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
- return Objects.hash(path, readBatchSize, readLimit, readColumns, readFilter, writeBatchSize, writeMode,
+ return Objects.hash(path, readBatchSize, readLimit, readColumns, readFilter, readVersion, readTimestamp, writeBatchSize, writeMode,
writeMaxRowsPerFile, indexType, indexColumn, indexNumPartitions, indexNumSubVectors,
indexNumBits, indexMaxLevel, indexM, indexEfConstruction, vectorColumn, vectorMetric,
vectorNprobes, vectorEf, vectorRefineFactor, defaultDatabase, warehouse);
@@ -826,6 +891,8 @@ public String toString() {
", readLimit=" + readLimit +
", readColumns=" + readColumns +
", readFilter='" + readFilter + '\'' +
+ ", readVersion=" + readVersion +
+ ", readTimestamp=" + readTimestamp +
", writeBatchSize=" + writeBatchSize +
", writeMode=" + writeMode +
", writeMaxRowsPerFile=" + writeMaxRowsPerFile +
diff --git a/src/main/java/org/apache/flink/connector/lance/table/LanceDynamicTableFactory.java b/src/main/java/org/apache/flink/connector/lance/table/LanceDynamicTableFactory.java
index 4f4f358..e19a110 100644
--- a/src/main/java/org/apache/flink/connector/lance/table/LanceDynamicTableFactory.java
+++ b/src/main/java/org/apache/flink/connector/lance/table/LanceDynamicTableFactory.java
@@ -79,6 +79,18 @@ public class LanceDynamicTableFactory implements DynamicTableSourceFactory, Dyna
.noDefaultValue()
.withDescription("Data filter condition");
+ public static final ConfigOption READ_VERSION = ConfigOptions
+ .key("read.version")
+ .intType()
+ .noDefaultValue()
+ .withDescription("Dataset version for time travel");
+
+ public static final ConfigOption READ_TIMESTAMP = ConfigOptions
+ .key("read.timestamp")
+ .longType()
+ .noDefaultValue()
+ .withDescription("Timestamp in milliseconds for time travel");
+
public static final ConfigOption WRITE_BATCH_SIZE = ConfigOptions
.key("write.batch-size")
.intType()
@@ -157,6 +169,8 @@ public Set> optionalOptions() {
options.add(READ_BATCH_SIZE);
options.add(READ_COLUMNS);
options.add(READ_FILTER);
+ options.add(READ_VERSION);
+ options.add(READ_TIMESTAMP);
options.add(WRITE_BATCH_SIZE);
options.add(WRITE_MODE);
options.add(WRITE_MAX_ROWS_PER_FILE);
@@ -215,6 +229,8 @@ private LanceOptions buildLanceOptions(ReadableConfig config) {
}
});
config.getOptional(READ_FILTER).ifPresent(builder::readFilter);
+ config.getOptional(READ_VERSION).ifPresent(builder::readVersion);
+ config.getOptional(READ_TIMESTAMP).ifPresent(builder::readTimestamp);
// Sink configuration
builder.writeBatchSize(config.get(WRITE_BATCH_SIZE));
diff --git a/src/test/java/org/apache/flink/connector/lance/LanceTimeTravelTest.java b/src/test/java/org/apache/flink/connector/lance/LanceTimeTravelTest.java
new file mode 100644
index 0000000..5b0209b
--- /dev/null
+++ b/src/test/java/org/apache/flink/connector/lance/LanceTimeTravelTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.lance;
+
+import org.apache.flink.connector.lance.config.LanceOptions;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * Tests for Lance time travel functionality.
+ *
+ * Validates configuration options for version-based and timestamp-based time travel.
+ */
+public class LanceTimeTravelTest {
+
+ @TempDir
+ Path tempDir;
+
+ @Test
+ public void testTimeTravelByVersion() {
+ // Test that read.version option is correctly set
+ LanceOptions options = LanceOptions.builder()
+ .path(tempDir.resolve("test_dataset").toString())
+ .readVersion(3)
+ .build();
+
+ assertEquals(3, options.getReadVersion());
+ assertNull(options.getReadTimestamp());
+ }
+
+ @Test
+ public void testTimeTravelByTimestamp() {
+ // Test that read.timestamp option is correctly set
+ long timestamp = System.currentTimeMillis();
+ LanceOptions options = LanceOptions.builder()
+ .path(tempDir.resolve("test_dataset").toString())
+ .readTimestamp(timestamp)
+ .build();
+
+ assertEquals(timestamp, options.getReadTimestamp());
+ assertNull(options.getReadVersion());
+ }
+
+ @Test
+ public void testCannotSetBothVersionAndTimestamp() {
+ // Test that setting both version and timestamp throws an exception
+ assertThrows(IllegalArgumentException.class, () -> {
+ LanceOptions.builder()
+ .path(tempDir.resolve("test_dataset").toString())
+ .readVersion(3)
+ .readTimestamp(System.currentTimeMillis())
+ .build();
+ });
+ }
+
+ @Test
+ public void testInvalidVersion() {
+ // Test that negative version throws an exception
+ assertThrows(IllegalArgumentException.class, () -> {
+ LanceOptions.builder()
+ .path(tempDir.resolve("test_dataset").toString())
+ .readVersion(-1)
+ .build();
+ });
+
+ // Test that zero version throws an exception
+ assertThrows(IllegalArgumentException.class, () -> {
+ LanceOptions.builder()
+ .path(tempDir.resolve("test_dataset").toString())
+ .readVersion(0)
+ .build();
+ });
+ }
+
+ @Test
+ public void testInvalidTimestamp() {
+ // Test that negative timestamp throws an exception
+ assertThrows(IllegalArgumentException.class, () -> {
+ LanceOptions.builder()
+ .path(tempDir.resolve("test_dataset").toString())
+ .readTimestamp(-1L)
+ .build();
+ });
+ }
+
+ @Test
+ public void testNoTimeTravelByDefault() {
+ // Test that by default, no time travel is configured
+ LanceOptions options = LanceOptions.builder()
+ .path(tempDir.resolve("test_dataset").toString())
+ .build();
+
+ assertNull(options.getReadVersion());
+ assertNull(options.getReadTimestamp());
+ }
+
+ @Test
+ public void testTimeTravelWithOtherOptions() {
+ // Test that time travel works with other read options
+ LanceOptions options = LanceOptions.builder()
+ .path(tempDir.resolve("test_dataset").toString())
+ .readVersion(5)
+ .readBatchSize(2048)
+ .readLimit(100L)
+ .readFilter("id > 10")
+ .build();
+
+ assertEquals(5, options.getReadVersion());
+ assertEquals(2048, options.getReadBatchSize());
+ assertEquals(100L, options.getReadLimit());
+ assertEquals("id > 10", options.getReadFilter());
+ }
+
+ @Test
+ public void testLanceSourceBuilderWithVersion() {
+ // Test LanceSource builder with version
+ LanceSource source = LanceSource.builder()
+ .path(tempDir.resolve("test_dataset").toString())
+ .version(3)
+ .build();
+
+ assertEquals(3, source.getOptions().getReadVersion());
+ }
+
+ @Test
+ public void testLanceSourceBuilderWithTimestamp() {
+ // Test LanceSource builder with timestamp
+ long ts = System.currentTimeMillis();
+ LanceSource source = LanceSource.builder()
+ .path(tempDir.resolve("test_dataset").toString())
+ .timestamp(ts)
+ .build();
+
+ assertEquals(ts, source.getOptions().getReadTimestamp());
+ }
+
+ @Test
+ public void testOptionsToString() {
+ // Test that toString includes version and timestamp
+ LanceOptions options = LanceOptions.builder()
+ .path(tempDir.resolve("test_dataset").toString())
+ .readVersion(3)
+ .build();
+
+ String str = options.toString();
+ assertTrue(str.contains("readVersion=3"));
+ assertTrue(str.contains("readTimestamp=null"));
+ }
+
+ @Test
+ public void testOptionsEquality() {
+ // Test that two options with same version are equal
+ LanceOptions options1 = LanceOptions.builder()
+ .path("/test/path")
+ .readVersion(3)
+ .build();
+
+ LanceOptions options2 = LanceOptions.builder()
+ .path("/test/path")
+ .readVersion(3)
+ .build();
+
+ assertEquals(options1, options2);
+ assertEquals(options1.hashCode(), options2.hashCode());
+ }
+
+ @Test
+ public void testOptionsInequality() {
+ // Test that two options with different versions are not equal
+ LanceOptions options1 = LanceOptions.builder()
+ .path("/test/path")
+ .readVersion(3)
+ .build();
+
+ LanceOptions options2 = LanceOptions.builder()
+ .path("/test/path")
+ .readVersion(5)
+ .build();
+
+ assertNotEquals(options1, options2);
+ }
+}