Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 84 additions & 3 deletions src/main/java/org/apache/flink/connector/lance/LanceSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import com.lancedb.lance.Dataset;
import com.lancedb.lance.Fragment;
import com.lancedb.lance.ReadOptions;
import com.lancedb.lance.ipc.LanceScanner;
import com.lancedb.lance.ipc.ScanOptions;
import org.apache.arrow.memory.BufferAllocator;
Expand All @@ -49,14 +50,15 @@
* Lance data source implementation.
*
* <p>Reads data from Lance dataset and converts to Flink RowData.
* <p>Supports column pruning, predicate push-down and Limit push-down optimization.
* <p>Supports column pruning, predicate push-down and limit push-down optimizations, as well as time travel to a historical dataset version.
*
* <p>Usage example:
* <pre>{@code
* LanceOptions options = LanceOptions.builder()
* .path("/path/to/lance/dataset")
* .readBatchSize(1024)
* .readLimit(100L) // Limit push-down
* .readVersion(3) // Time travel to version 3
* .build();
*
* LanceSource source = new LanceSource(options, rowType);
Expand Down Expand Up @@ -126,7 +128,40 @@ public void open(Configuration parameters) throws Exception {

Path path = Paths.get(datasetPath);
try {
this.dataset = Dataset.open(path.toString(), allocator);
// Build ReadOptions for time travel support
Integer readVersion = options.getReadVersion();
Long readTimestamp = options.getReadTimestamp();

if (readVersion != null) {
// Time travel by version number
LOG.info("Time travel enabled, reading version: {}", readVersion);
ReadOptions readOptions = new ReadOptions.Builder()
.setVersion(readVersion)
.build();
this.dataset = Dataset.open(allocator, path.toString(), readOptions);
} else if (readTimestamp != null) {
// Time travel by timestamp: find the version at the given timestamp
LOG.info("Time travel enabled, reading dataset at timestamp: {}", readTimestamp);
// First open the latest version to find the correct version for the timestamp
Dataset latestDataset = Dataset.open(path.toString(), allocator);
try {
int targetVersion = resolveVersionFromTimestamp(latestDataset, readTimestamp);
LOG.info("Resolved timestamp {} to version {}", readTimestamp, targetVersion);
latestDataset.close();
ReadOptions readOptions = new ReadOptions.Builder()
.setVersion(targetVersion)
.build();
this.dataset = Dataset.open(allocator, path.toString(), readOptions);
} catch (Exception ex) {
latestDataset.close();
throw ex;
}
} else {
// Open latest version
this.dataset = Dataset.open(path.toString(), allocator);
}

LOG.info("Opened Lance dataset version: {}", dataset.version());
} catch (Exception e) {
throw new IOException("Cannot open Lance dataset: " + datasetPath, e);
}
Expand Down Expand Up @@ -289,6 +324,38 @@ private boolean isLimitReached() {
return readLimit != null && emittedCount >= readLimit;
}

/**
 * Resolve the dataset version to read for a timestamp-based time travel request.
 *
 * <p><b>Limitation:</b> this method does not inspect version creation times, so
 * {@code timestamp} currently has no influence on the result. It always resolves to the
 * latest version reported by {@code latestDataset}. When more than one version exists a
 * warning is logged so that the best-effort behavior is visible to the user. Use
 * {@code read.version} for precise time travel.
 *
 * @param latestDataset the dataset opened at latest version
 * @param timestamp the requested timestamp in milliseconds since epoch (currently unused)
 * @return the latest version number of the dataset
 * @throws ArithmeticException if the latest version number does not fit in an {@code int}
 */
private int resolveVersionFromTimestamp(Dataset latestDataset, long timestamp) {
    long latestVersion = latestDataset.latestVersion();

    // Fail loudly instead of silently truncating: a plain (int) cast would wrap around and
    // make the caller open an unrelated version number.
    int resolvedVersion = Math.toIntExact(latestVersion);

    // With at most one version there is nothing to choose between, so no warning is needed.
    if (latestVersion > 1) {
        LOG.warn("Timestamp-based time travel is best-effort. Lance Java SDK does not expose " +
                "version timestamps directly. Consider using read.version for precise control. " +
                "Returning latest version: {}", latestVersion);
    }
    return resolvedVersion;
}

@Override
public void cancel() {
LOG.info("Cancel Lance data source");
Expand Down Expand Up @@ -358,7 +425,9 @@ public static class Builder {
private int batchSize = 1024;
private List<String> columns;
private String filter;
private Long limit; // Added
private Long limit;
private Integer version;
private Long timestamp;
private RowType rowType;

public Builder path(String path) {
Expand Down Expand Up @@ -386,6 +455,16 @@ public Builder limit(Long limit) {
return this;
}

/**
* Sets the dataset version to read for time travel.
*
* <p>The value is forwarded to {@code readVersion} on the options builder when
* {@code build()} is called.
*
* @param version the version number, or {@code null} to leave the version unset
* @return this builder
*/
public Builder version(Integer version) {
this.version = version;
return this;
}

/**
* Sets the timestamp used for timestamp-based time travel.
*
* <p>The value is forwarded to {@code readTimestamp} on the options builder when
* {@code build()} is called.
*
* @param timestamp the timestamp in milliseconds, or {@code null} to leave it unset
* @return this builder
*/
public Builder timestamp(Long timestamp) {
this.timestamp = timestamp;
return this;
}

public Builder rowType(RowType rowType) {
this.rowType = rowType;
return this;
Expand All @@ -402,6 +481,8 @@ public LanceSource build() {
.readColumns(columns)
.readFilter(filter)
.readLimit(limit)
.readVersion(version)
.readTimestamp(timestamp)
.build();

return new LanceSource(options, rowType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,24 @@ public class LanceOptions implements Serializable {
.noDefaultValue()
.withDescription("Data filter condition, using SQL WHERE clause syntax");

/**
* Dataset version to read for time travel (reads a specific version).
*
* <p>Optional, with no default value. Options validation rejects values less than or equal
* to 0, and rejects setting this option together with {@code read.timestamp}.
*/
public static final ConfigOption<Integer> READ_VERSION = ConfigOptions
.key("read.version")
.intType()
.noDefaultValue()
.withDescription("Dataset version number for time travel. Reads a specific historical version of the dataset");

/**
* Timestamp, in milliseconds, for timestamp-based time travel.
*
* <p>Optional, with no default value. Options validation rejects negative values, and
* rejects setting this option together with {@code read.version}.
*/
public static final ConfigOption<Long> READ_TIMESTAMP = ConfigOptions
.key("read.timestamp")
.longType()
.noDefaultValue()
.withDescription("Timestamp in milliseconds for time travel. Derives the dataset version from the given timestamp");

// ==================== Sink Configuration ====================

/**
Expand Down Expand Up @@ -352,6 +370,8 @@ public static MetricType fromValue(String value) {
private final Long readLimit;
private final List<String> readColumns;
private final String readFilter;
private final Integer readVersion;
private final Long readTimestamp;
private final int writeBatchSize;
private final WriteMode writeMode;
private final int writeMaxRowsPerFile;
Expand All @@ -377,6 +397,8 @@ private LanceOptions(Builder builder) {
this.readLimit = builder.readLimit;
this.readColumns = builder.readColumns;
this.readFilter = builder.readFilter;
this.readVersion = builder.readVersion;
this.readTimestamp = builder.readTimestamp;
this.writeBatchSize = builder.writeBatchSize;
this.writeMode = builder.writeMode;
this.writeMaxRowsPerFile = builder.writeMaxRowsPerFile;
Expand Down Expand Up @@ -419,6 +441,14 @@ public String getReadFilter() {
return readFilter;
}

/**
* Returns the dataset version configured for time travel.
*
* @return the configured version, or {@code null} if no version was set
*/
public Integer getReadVersion() {
return readVersion;
}

/**
* Returns the timestamp configured for timestamp-based time travel.
*
* @return the configured timestamp in milliseconds, or {@code null} if none was set
*/
public Long getReadTimestamp() {
return readTimestamp;
}

/**
* Returns the configured write batch size.
*
* @return the write batch size
*/
public int getWriteBatchSize() {
return writeBatchSize;
}
Expand Down Expand Up @@ -522,6 +552,12 @@ public static LanceOptions fromConfiguration(Configuration config) {
if (config.contains(READ_FILTER)) {
builder.readFilter(config.get(READ_FILTER));
}
if (config.contains(READ_VERSION)) {
builder.readVersion(config.get(READ_VERSION));
}
if (config.contains(READ_TIMESTAMP)) {
builder.readTimestamp(config.get(READ_TIMESTAMP));
}

// Sink configuration
builder.writeBatchSize(config.get(WRITE_BATCH_SIZE));
Expand Down Expand Up @@ -571,6 +607,8 @@ public static class Builder {
private Long readLimit;
private List<String> readColumns = Collections.emptyList();
private String readFilter;
private Integer readVersion;
private Long readTimestamp;
private int writeBatchSize = 1024;
private WriteMode writeMode = WriteMode.APPEND;
private int writeMaxRowsPerFile = 1000000;
Expand Down Expand Up @@ -615,6 +653,16 @@ public Builder readFilter(String readFilter) {
return this;
}

/**
* Sets the dataset version to read for time travel.
*
* @param readVersion the version number, or {@code null} to leave the version unset
* @return this builder
*/
public Builder readVersion(Integer readVersion) {
this.readVersion = readVersion;
return this;
}

/**
* Sets the timestamp used for timestamp-based time travel.
*
* @param readTimestamp the timestamp in milliseconds, or {@code null} to leave it unset
* @return this builder
*/
public Builder readTimestamp(Long readTimestamp) {
this.readTimestamp = readTimestamp;
return this;
}

public Builder writeBatchSize(int writeBatchSize) {
this.writeBatchSize = writeBatchSize;
return this;
Expand Down Expand Up @@ -727,6 +775,21 @@ private void validate() {
throw new IllegalArgumentException("read.limit must be greater than or equal to 0, current value: " + readLimit);
}

// Validate version (if set)
if (readVersion != null && readVersion <= 0) {
throw new IllegalArgumentException("read.version must be greater than 0, current value: " + readVersion);
}

// Validate timestamp (if set)
if (readTimestamp != null && readTimestamp < 0) {
throw new IllegalArgumentException("read.timestamp must be greater than or equal to 0, current value: " + readTimestamp);
}

// Cannot set both version and timestamp
if (readVersion != null && readTimestamp != null) {
throw new IllegalArgumentException("Cannot set both read.version and read.timestamp. Please use only one for time travel.");
}

// Validate write batch size
if (writeBatchSize <= 0) {
throw new IllegalArgumentException("write.batch-size must be greater than 0, current value: " + writeBatchSize);
Expand Down Expand Up @@ -787,6 +850,8 @@ public boolean equals(Object o) {
LanceOptions that = (LanceOptions) o;
return readBatchSize == that.readBatchSize &&
Objects.equals(readLimit, that.readLimit) &&
Objects.equals(readVersion, that.readVersion) &&
Objects.equals(readTimestamp, that.readTimestamp) &&
writeBatchSize == that.writeBatchSize &&
writeMaxRowsPerFile == that.writeMaxRowsPerFile &&
indexNumPartitions == that.indexNumPartitions &&
Expand All @@ -812,7 +877,7 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(path, readBatchSize, readLimit, readColumns, readFilter, writeBatchSize, writeMode,
return Objects.hash(path, readBatchSize, readLimit, readColumns, readFilter, readVersion, readTimestamp, writeBatchSize, writeMode,
writeMaxRowsPerFile, indexType, indexColumn, indexNumPartitions, indexNumSubVectors,
indexNumBits, indexMaxLevel, indexM, indexEfConstruction, vectorColumn, vectorMetric,
vectorNprobes, vectorEf, vectorRefineFactor, defaultDatabase, warehouse);
Expand All @@ -826,6 +891,8 @@ public String toString() {
", readLimit=" + readLimit +
", readColumns=" + readColumns +
", readFilter='" + readFilter + '\'' +
", readVersion=" + readVersion +
", readTimestamp=" + readTimestamp +
", writeBatchSize=" + writeBatchSize +
", writeMode=" + writeMode +
", writeMaxRowsPerFile=" + writeMaxRowsPerFile +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,18 @@ public class LanceDynamicTableFactory implements DynamicTableSourceFactory, Dyna
.noDefaultValue()
.withDescription("Data filter condition");

/**
* Table option {@code read.version}: dataset version for time travel.
*
* <p>Optional, with no default value.
*/
public static final ConfigOption<Integer> READ_VERSION = ConfigOptions
.key("read.version")
.intType()
.noDefaultValue()
.withDescription("Dataset version for time travel");

/**
* Table option {@code read.timestamp}: timestamp in milliseconds for time travel.
*
* <p>Optional, with no default value.
*/
public static final ConfigOption<Long> READ_TIMESTAMP = ConfigOptions
.key("read.timestamp")
.longType()
.noDefaultValue()
.withDescription("Timestamp in milliseconds for time travel");

public static final ConfigOption<Integer> WRITE_BATCH_SIZE = ConfigOptions
.key("write.batch-size")
.intType()
Expand Down Expand Up @@ -157,6 +169,8 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(READ_BATCH_SIZE);
options.add(READ_COLUMNS);
options.add(READ_FILTER);
options.add(READ_VERSION);
options.add(READ_TIMESTAMP);
options.add(WRITE_BATCH_SIZE);
options.add(WRITE_MODE);
options.add(WRITE_MAX_ROWS_PER_FILE);
Expand Down Expand Up @@ -215,6 +229,8 @@ private LanceOptions buildLanceOptions(ReadableConfig config) {
}
});
config.getOptional(READ_FILTER).ifPresent(builder::readFilter);
config.getOptional(READ_VERSION).ifPresent(builder::readVersion);
config.getOptional(READ_TIMESTAMP).ifPresent(builder::readTimestamp);

// Sink configuration
builder.writeBatchSize(config.get(WRITE_BATCH_SIZE));
Expand Down
Loading