From 9eac2c74d32ddfe30bae0669c8932d4e26c7e013 Mon Sep 17 00:00:00 2001
From: Vova Kolmakov
Date: Tue, 21 Apr 2026 16:29:11 +0700
Subject: [PATCH] feat: push IN predicate down to Lance filter engine

Render `field IN (v1, v2, ...)` from Flink's IN CallExpression using the
existing literal-escaping helper, and expose getFilters() so tests can assert
the exact Lance SQL string.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 .../lance/table/LanceDynamicTableSource.java  | 36 +++++++++++++++++++++++++++++++++++-
 .../table/LanceReadOptimizationsTest.java     |  4 ++++
 2 files changed, 39 insertions(+), 1 deletion(-)

diff --git a/src/main/java/org/apache/flink/connector/lance/table/LanceDynamicTableSource.java b/src/main/java/org/apache/flink/connector/lance/table/LanceDynamicTableSource.java
index dfcf186..43103b4 100644
--- a/src/main/java/org/apache/flink/connector/lance/table/LanceDynamicTableSource.java
+++ b/src/main/java/org/apache/flink/connector/lance/table/LanceDynamicTableSource.java
@@ -50,6 +50,7 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -260,13 +261,38 @@ else if (funcDef == BuiltInFunctionDefinitions.IS_NULL) {
         else if (funcDef == BuiltInFunctionDefinitions.LIKE) {
             return buildComparisonFilter(args, "LIKE");
         }
-        // IN (not supported yet, requires more complex handling)
+        // IN: args[0] is the field reference, args[1..n] are literal values
+        else if (funcDef == BuiltInFunctionDefinitions.IN) {
+            return buildInFilter(args);
+        }
         // BETWEEN (not supported yet)
 
         // Unsupported functions, return null
         return null;
     }
 
+    /**
+     * Build IN filter expression: {@code field IN (v1, v2, ...)}.
+     * Returns null if the field side is not a reference, the list is empty,
+     * or any value cannot be rendered as a literal — pushdown is all-or-nothing
+     * so Lance never sees a partial predicate.
+     */
+    private String buildInFilter(List<ResolvedExpression> args) {
+        if (args.size() < 2 || !(args.get(0) instanceof FieldReferenceExpression)) {
+            return null;
+        }
+        String fieldName = ((FieldReferenceExpression) args.get(0)).getName();
+        List<String> values = new ArrayList<>();
+        for (int i = 1; i < args.size(); i++) {
+            String value = extractLiteralValue(args.get(i));
+            if (value == null) {
+                return null;
+            }
+            values.add(value);
+        }
+        return fieldName + " IN (" + String.join(", ", values) + ")";
+    }
+
     /**
      * Build comparison filter expression
      */
@@ -369,6 +395,14 @@ public LanceOptions getOptions() {
         return options;
     }
 
+    /**
+     * Lance-side filter strings accumulated by {@link #applyFilters(List)}, in acceptance order.
+     * Exposed so callers can inspect what was actually pushed down versus left in Flink.
+     */
+    public List<String> getFilters() {
+        return Collections.unmodifiableList(filters);
+    }
+
     /**
      * Get physical data type
      */
diff --git a/src/test/java/org/apache/flink/connector/lance/table/LanceReadOptimizationsTest.java b/src/test/java/org/apache/flink/connector/lance/table/LanceReadOptimizationsTest.java
index 9b3a5f0..9077d00 100644
--- a/src/test/java/org/apache/flink/connector/lance/table/LanceReadOptimizationsTest.java
+++ b/src/test/java/org/apache/flink/connector/lance/table/LanceReadOptimizationsTest.java
@@ -265,6 +265,10 @@ void testInPredicatePushDown() {
         SupportsFilterPushDown.Result result =
                 source.applyFilters(Collections.singletonList(inExpr));
         assertEquals(1, result.getAcceptedFilters().size(), "IN predicate should be accepted");
+        assertEquals(
+                Collections.singletonList("status IN ('active', 'pending', 'completed')"),
+                source.getFilters(),
+                "IN predicate should be rendered as a Lance SQL IN clause with quoted string literals");
     }
 
     @Test