diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedRow.java b/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedRow.java
index f9b78a8bc676..18c7e5db798d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedRow.java
@@ -188,32 +188,6 @@ public String toString() {
+ '}';
}
- /**
- * Like {@link #from(int[])}, but throws {@link IllegalArgumentException} if the provided {@code
- * projection} array contains nested projections, which are not supported by {@link
- * ProjectedRow}.
- *
- * <p>The array represents the mapping of the fields of the original {@link DataType}, including
- * nested rows. For example, {@code [[0, 2, 1], ...]} specifies to include the 2nd field of the
- * 3rd field of the 1st field in the top-level row.
- *
- * @see Projection
- * @see ProjectedRow
- */
- public static ProjectedRow from(int[][] projection) throws IllegalArgumentException {
- return new ProjectedRow(
- Arrays.stream(projection)
- .mapToInt(
- arr -> {
- if (arr.length != 1) {
- throw new IllegalArgumentException(
- "ProjectedRowData doesn't support nested projections");
- }
- return arr[0];
- })
- .toArray());
- }
-
/**
* Create an empty {@link ProjectedRow} starting from a {@code projection} array.
*
@@ -234,17 +208,4 @@ public static ProjectedRow from(RowType readType, RowType tableType) {
.mapToInt(field -> tableType.getFieldIndexByFieldId(field.id()))
.toArray());
}
-
- /**
- * Create an empty {@link ProjectedRow} starting from a {@link Projection}.
- *
- * <p>Throws {@link IllegalStateException} if the provided {@code projection} array contains
- * nested projections, which are not supported by {@link ProjectedRow}.
- *
- * @see Projection
- * @see ProjectedRow
- */
- public static ProjectedRow from(Projection projection) {
- return new ProjectedRow(projection.toTopLevelIndexes());
- }
}
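Note: with the array-based from(int[][]) and from(Projection) factories removed, the remaining from(RowType, RowType) overload maps each read field to its position in the table row by field id. A minimal sketch of that surviving API (not from this patch; field names and values are illustrative):

    RowType tableType =
            RowType.builder()
                    .fields(
                            new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()},
                            new String[] {"id", "cnt", "ts"})
                    .build();
    RowType readType = tableType.project("ts", "id");

    // Each read field is located in the table row via its field id.
    ProjectedRow projected = ProjectedRow.from(readType, tableType);
    // Wrap a row laid out in table order; getters now follow the read order (ts, id).
    InternalRow narrowed = projected.replaceRow(GenericRow.of(1, 10, 100L));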
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
index 5422f5469e0b..50fdec2ad9cf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
@@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.options.Options;
+import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
@@ -73,7 +74,7 @@ public static MergeFunctionFactory<KeyValue> factory(Options options) {
private static class Factory implements MergeFunctionFactory<KeyValue> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final boolean ignoreDelete;
@@ -82,7 +83,7 @@ private Factory(boolean ignoreDelete) {
}
@Override
- public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
+ public MergeFunction<KeyValue> create(@Nullable RowType readType) {
return new DeduplicateMergeFunction(ignoreDelete);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
index 4b9e4a6ff100..1815d1f5dfca 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
@@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.options.Options;
+import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
@@ -82,7 +83,7 @@ public static MergeFunctionFactory<KeyValue> factory(Options options) {
private static class Factory implements MergeFunctionFactory<KeyValue> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final boolean ignoreDelete;
public Factory(boolean ignoreDelete) {
@@ -90,7 +91,7 @@ public Factory(boolean ignoreDelete) {
}
@Override
- public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
+ public MergeFunction<KeyValue> create(@Nullable RowType readType) {
return new FirstRowMergeFunction(ignoreDelete);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
index 1bd9aaa84339..cb494b86d07a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
@@ -143,7 +143,7 @@ public static MergeFunctionFactory<KeyValue> wrap(
/** Factory to create {@link LookupMergeFunction}. */
public static class Factory implements MergeFunctionFactory<KeyValue> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final MergeFunctionFactory<KeyValue> wrapped;
private final CoreOptions options;
@@ -168,14 +168,14 @@ public void withIOManager(@Nullable IOManager ioManager) {
}
@Override
- public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
+ public MergeFunction<KeyValue> create(@Nullable RowType readType) {
return new LookupMergeFunction(
- wrapped.create(projection), options, keyType, valueType, ioManager);
+ wrapped.create(readType), options, keyType, valueType, ioManager);
}
@Override
- public AdjustedProjection adjustProjection(@Nullable int[][] projection) {
- return wrapped.adjustProjection(projection);
+ public RowType adjustReadType(RowType readType) {
+ return wrapped.adjustReadType(readType);
}
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeFunctionFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeFunctionFactory.java
index 8b0d3909ae1e..d05b5f73ea41 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeFunctionFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeFunctionFactory.java
@@ -18,6 +18,8 @@
package org.apache.paimon.mergetree.compact;
+import org.apache.paimon.types.RowType;
+
import javax.annotation.Nullable;
import java.io.Serializable;
@@ -30,23 +32,10 @@ default MergeFunction<T> create() {
return create(null);
}
- MergeFunction<T> create(@Nullable int[][] projection);
-
- // todo: replace projection with rowType
- default AdjustedProjection adjustProjection(@Nullable int[][] projection) {
- return new AdjustedProjection(projection, null);
- }
-
- /** Result of adjusted projection. */
- class AdjustedProjection {
-
- @Nullable public final int[][] pushdownProjection;
-
- @Nullable public final int[][] outerProjection;
+ MergeFunction<T> create(@Nullable RowType readType);
- public AdjustedProjection(int[][] pushdownProjection, int[][] outerProjection) {
- this.pushdownProjection = pushdownProjection;
- this.outerProjection = outerProjection;
- }
+ /** Adjusts the read type; if no adjustment is needed, returns the original read type. */
+ default RowType adjustReadType(RowType readType) {
+ return readType;
}
}
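The AdjustedProjection pair (pushdown plus outer projection) is replaced by a single adjusted RowType; restoring the caller's requested schema is now done with ProjectedRow. A simplified sketch of the intended call pattern, loosely following MergeFileSplitRead (the helper name and wiring are illustrative, not from this patch):

    static MergeFunction<KeyValue> setUpMerge(
            MergeFunctionFactory<KeyValue> mfFactory, RowType requestedReadType) {
        // The factory may widen the read type, e.g. to pull in sequence-group fields.
        RowType pushdownReadType = mfFactory.adjustReadType(requestedReadType);

        // Readers and the merge function both operate on the widened type.
        MergeFunction<KeyValue> mergeFunction = mfFactory.create(pushdownReadType);

        if (pushdownReadType != requestedReadType) {
            // Extra columns are dropped again before rows leave the merge reader:
            ProjectedRow outer = ProjectedRow.from(requestedReadType, pushdownReadType);
            // reader.transform(kv -> kv.replaceValue(outer.replaceRow(kv.value())));
        }
        return mergeFunction;
    }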
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index 3b6a89d6640c..2f429191c41a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -34,7 +34,6 @@
import org.apache.paimon.utils.ArrayUtils;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.Projection;
import org.apache.paimon.utils.UserDefinedSeqComparator;
import javax.annotation.Nullable;
@@ -51,8 +50,6 @@
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
import static org.apache.paimon.CoreOptions.FIELDS_SEPARATOR;
@@ -379,13 +376,11 @@ public static MergeFunctionFactory<KeyValue> factory(
private static class Factory implements MergeFunctionFactory<KeyValue> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final boolean ignoreDelete;
private final RowType rowType;
- private final List<DataType> tableTypes;
-
private final Map<Integer, Supplier<FieldsComparator>> fieldSeqComparators;
private final Map<Integer, Supplier<FieldAggregator>> fieldAggregators;
@@ -397,7 +392,6 @@ private static class Factory implements MergeFunctionFactory<KeyValue> {
private Factory(Options options, RowType rowType, List<String> primaryKeys) {
this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE);
this.rowType = rowType;
- this.tableTypes = rowType.getFieldTypes();
this.removeRecordOnDelete = options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE);
String removeRecordOnSequenceGroup =
options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP);
@@ -498,26 +492,29 @@ private Factory(Options options, RowType rowType, List<String> primaryKeys) {
}
@Override
- public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
- if (projection != null) {
- Map<Integer, FieldsComparator> projectedSeqComparators = new HashMap<>();
- Map<Integer, FieldAggregator> projectedAggregators = new HashMap<>();
- int[] projects = Projection.of(projection).toTopLevelIndexes();
+ public MergeFunction<KeyValue> create(@Nullable RowType readType) {
+ RowType targetType = readType != null ? readType : rowType;
+ Map<Integer, FieldsComparator> projectedSeqComparators = new HashMap<>();
+ Map<Integer, FieldAggregator> projectedAggregators = new HashMap<>();
+
+ if (readType != null) {
+ // Build index mapping from table schema to read schema
+ List<String> readFieldNames = readType.getFieldNames();
Map<Integer, Integer> indexMap = new HashMap<>();
- List<DataField> dataFields = rowType.getFields();
- List<DataType> newDataTypes = new ArrayList<>();
-
- for (int i = 0; i < projects.length; i++) {
- indexMap.put(projects[i], i);
- newDataTypes.add(dataFields.get(projects[i]).type());
+ for (int i = 0; i < readType.getFieldCount(); i++) {
+ String fieldName = readFieldNames.get(i);
+ int oldIndex = rowType.getFieldIndex(fieldName);
+ if (oldIndex >= 0) {
+ indexMap.put(oldIndex, i);
+ }
}
- RowType newRowType = RowType.builder().fields(newDataTypes).build();
+ // Remap sequence comparators
fieldSeqComparators.forEach(
(field, comparatorSupplier) -> {
- FieldsComparator comparator = comparatorSupplier.get();
int newField = indexMap.getOrDefault(field, -1);
if (newField != -1) {
+ FieldsComparator comparator = comparatorSupplier.get();
int[] newSequenceFields =
Arrays.stream(comparator.compareFields())
.map(
@@ -532,94 +529,76 @@ public MergeFunction create(@Nullable int[][] projection) {
+ "for new field. new field "
+ "index is %s",
newField));
- } else {
- return newIndex;
}
+ return newIndex;
})
.toArray();
projectedSeqComparators.put(
newField,
UserDefinedSeqComparator.create(
- newRowType, newSequenceFields, true));
+ readType, newSequenceFields, true));
}
});
- for (int i = 0; i < projects.length; i++) {
- if (fieldAggregators.containsKey(projects[i])) {
- projectedAggregators.put(i, fieldAggregators.get(projects[i]).get());
+
+ // Remap field aggregators
+ for (int oldIndex : indexMap.keySet()) {
+ if (fieldAggregators.containsKey(oldIndex)) {
+ int newIndex = indexMap.get(oldIndex);
+ projectedAggregators.put(newIndex, fieldAggregators.get(oldIndex).get());
}
}
-
- List<DataType> projectedTypes = Projection.of(projection).project(tableTypes);
- return new PartialUpdateMergeFunction(
- createFieldGetters(projectedTypes),
- ignoreDelete,
- projectedSeqComparators,
- projectedAggregators,
- !fieldSeqComparators.isEmpty(),
- removeRecordOnDelete,
- sequenceGroupPartialDelete,
- ArrayUtils.toPrimitiveBoolean(
- projectedTypes.stream()
- .map(DataType::isNullable)
- .toArray(Boolean[]::new)));
} else {
- Map<Integer, FieldsComparator> fieldSeqComparators = new HashMap<>();
+ // Use original mappings
this.fieldSeqComparators.forEach(
- (f, supplier) -> fieldSeqComparators.put(f, supplier.get()));
- Map<Integer, FieldAggregator> fieldAggregators = new HashMap<>();
+ (f, supplier) -> projectedSeqComparators.put(f, supplier.get()));
this.fieldAggregators.forEach(
- (f, supplier) -> fieldAggregators.put(f, supplier.get()));
- return new PartialUpdateMergeFunction(
- createFieldGetters(tableTypes),
- ignoreDelete,
- fieldSeqComparators,
- fieldAggregators,
- !fieldSeqComparators.isEmpty(),
- removeRecordOnDelete,
- sequenceGroupPartialDelete,
- ArrayUtils.toPrimitiveBoolean(
- rowType.getFieldTypes().stream()
- .map(DataType::isNullable)
- .toArray(Boolean[]::new)));
+ (f, supplier) -> projectedAggregators.put(f, supplier.get()));
}
+
+ List<DataType> fieldTypes = targetType.getFieldTypes();
+ return new PartialUpdateMergeFunction(
+ createFieldGetters(fieldTypes),
+ ignoreDelete,
+ projectedSeqComparators,
+ projectedAggregators,
+ !fieldSeqComparators.isEmpty(),
+ removeRecordOnDelete,
+ sequenceGroupPartialDelete,
+ ArrayUtils.toPrimitiveBoolean(
+ fieldTypes.stream().map(DataType::isNullable).toArray(Boolean[]::new)));
}
@Override
- public AdjustedProjection adjustProjection(@Nullable int[][] projection) {
+ public RowType adjustReadType(RowType readType) {
if (fieldSeqComparators.isEmpty()) {
- return new AdjustedProjection(projection, null);
+ return readType;
}
- if (projection == null) {
- return new AdjustedProjection(null, null);
- }
- LinkedHashSet<Integer> extraFields = new LinkedHashSet<>();
- int[] topProjects = Projection.of(projection).toTopLevelIndexes();
- Set<Integer> indexSet = Arrays.stream(topProjects).boxed().collect(Collectors.toSet());
- for (int index : topProjects) {
+ LinkedHashSet<DataField> extraFields = new LinkedHashSet<>();
+ List<String> readFieldNames = readType.getFieldNames();
+ for (DataField readField : readType.getFields()) {
+ int index = rowType.getFieldIndex(readField.name());
Supplier<FieldsComparator> comparatorSupplier = fieldSeqComparators.get(index);
if (comparatorSupplier == null) {
continue;
}
FieldsComparator comparator = comparatorSupplier.get();
- for (int field : comparator.compareFields()) {
- if (!indexSet.contains(field)) {
+ for (int fieldIndex : comparator.compareFields()) {
+ DataField field = rowType.getFields().get(fieldIndex);
+ if (!readFieldNames.contains(field.name())) {
extraFields.add(field);
}
}
}
- int[] allProjects =
- Stream.concat(Arrays.stream(topProjects).boxed(), extraFields.stream())
- .mapToInt(Integer::intValue)
- .toArray();
+ if (extraFields.isEmpty()) {
+ return readType;
+ }
- int[][] pushDown = Projection.of(allProjects).toNestedIndexes();
- int[][] outer =
- Projection.of(IntStream.range(0, topProjects.length).toArray())
- .toNestedIndexes();
- return new AdjustedProjection(pushDown, outer);
+ List<DataField> allFields = new ArrayList<>(readType.getFields());
+ allFields.addAll(extraFields);
+ return new RowType(allFields);
}
private int requireField(String fieldName, List<String> fieldNames) {
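For orientation, the name-based remapping that create(readType) performs can be reproduced in isolation; a self-contained sketch using the 8-column schema from PartialUpdateMergeFunctionTest (not part of the patch):

    RowType tableType =
            RowType.builder()
                    .fields(
                            new DataType[] {
                                DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(),
                                DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT()
                            },
                            new String[] {"f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7"})
                    .build();
    RowType readType = tableType.project("f1", "f4", "f3", "f7");

    // Table index -> read index, keyed by field name.
    Map<Integer, Integer> indexMap = new HashMap<>();
    for (int i = 0; i < readType.getFieldCount(); i++) {
        indexMap.put(tableType.getFieldIndex(readType.getFieldNames().get(i)), i);
    }
    // indexMap == {1=0, 3=2, 4=1, 7=3}: a sequence comparator registered on table index 4
    // is re-keyed to read index 1, and its compareFields() are translated into read
    // positions before UserDefinedSeqComparator.create(readType, ...) is built.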
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
index 5f3fa3b4b31b..eea3c7e499b0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
@@ -31,8 +31,8 @@
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ArrayUtils;
-import org.apache.paimon.utils.Projection;
import javax.annotation.Nullable;
@@ -133,44 +133,31 @@ public boolean requireCopy() {
}
public static MergeFunctionFactory<KeyValue> factory(
- Options conf,
- List<String> fieldNames,
- List<DataType> fieldTypes,
- List<String> primaryKeys) {
- return new Factory(conf, fieldNames, fieldTypes, primaryKeys);
+ Options conf, RowType rowType, List<String> primaryKeys) {
+ return new Factory(conf, rowType, primaryKeys);
}
private static class Factory implements MergeFunctionFactory<KeyValue> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final CoreOptions options;
- private final List<String> fieldNames;
- private final List<DataType> fieldTypes;
+ private final RowType rowType;
private final List<String> primaryKeys;
private final boolean removeRecordOnDelete;
- private Factory(
- Options conf,
- List<String> fieldNames,
- List<DataType> fieldTypes,
- List<String> primaryKeys) {
+ private Factory(Options conf, RowType rowType, List<String> primaryKeys) {
this.options = new CoreOptions(conf);
- this.fieldNames = fieldNames;
- this.fieldTypes = fieldTypes;
+ this.rowType = rowType;
this.primaryKeys = primaryKeys;
this.removeRecordOnDelete = options.aggregationRemoveRecordOnDelete();
}
@Override
- public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
- List<String> fieldNames = this.fieldNames;
- List<DataType> fieldTypes = this.fieldTypes;
- if (projection != null) {
- Projection project = Projection.of(projection);
- fieldNames = project.project(fieldNames);
- fieldTypes = project.project(fieldTypes);
- }
+ public MergeFunction<KeyValue> create(@Nullable RowType readType) {
+ RowType targetType = readType != null ? readType : rowType;
+ List<String> fieldNames = targetType.getFieldNames();
+ List<DataType> fieldTypes = targetType.getFieldTypes();
FieldAggregator[] fieldAggregators = new FieldAggregator[fieldNames.size()];
List<String> sequenceFields = options.sequenceField();
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
index 2cd3a5322e9a..f51f5928899b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
@@ -24,6 +24,7 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.variant.VariantAccessInfo;
+import org.apache.paimon.data.variant.VariantAccessInfoUtils;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
@@ -39,7 +40,6 @@
import org.apache.paimon.mergetree.compact.IntervalPartition;
import org.apache.paimon.mergetree.compact.LookupMergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
-import org.apache.paimon.mergetree.compact.MergeFunctionFactory.AdjustedProjection;
import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper;
import org.apache.paimon.predicate.Predicate;
@@ -54,14 +54,12 @@
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ProjectedRow;
-import org.apache.paimon.utils.Projection;
import org.apache.paimon.utils.UserDefinedSeqComparator;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
@@ -88,15 +86,11 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> {
private final boolean sequenceOrder;
@Nullable private RowType readKeyType;
-
+ @Nullable private RowType outerReadType;
+ @Nullable private VariantAccessInfo[] variantAccess;
@Nullable private List<Predicate> filtersForKeys;
-
@Nullable private List<Predicate> filtersForAll;
- @Nullable private int[][] pushdownProjection;
- @Nullable private int[][] outerProjection;
- @Nullable private VariantAccessInfo[] variantAccess;
-
private boolean forceKeepDelete = false;
public MergeFileSplitRead(
@@ -139,58 +133,32 @@ public MergeFileSplitRead withReadKeyType(RowType readKeyType) {
@Override
public MergeFileSplitRead withReadType(RowType readType) {
- // todo: replace projectedFields with readType
RowType tableRowType = tableSchema.logicalRowType();
- int[][] projectedFields =
- Arrays.stream(tableRowType.getFieldIndices(readType.getFieldNames()))
- .mapToObj(i -> new int[] {i})
- .toArray(int[][]::new);
- int[][] newProjectedFields = projectedFields;
- if (sequenceFields.size() > 0) {
- // make sure projection contains sequence fields
- List<String> fieldNames = tableSchema.fieldNames();
- List<String> projectedNames = Projection.of(projectedFields).project(fieldNames);
- int[] lackFields =
- sequenceFields.stream()
- .filter(f -> !projectedNames.contains(f))
- .mapToInt(fieldNames::indexOf)
- .toArray();
- if (lackFields.length > 0) {
- newProjectedFields =
- Arrays.copyOf(projectedFields, projectedFields.length + lackFields.length);
- for (int i = 0; i < lackFields.length; i++) {
- newProjectedFields[projectedFields.length + i] = new int[] {lackFields[i]};
+ RowType adjustedReadType = readType;
+
+ if (!sequenceFields.isEmpty()) {
+ // make sure actual readType contains sequence fields
+ List<String> readFieldNames = readType.getFieldNames();
+ List<DataField> extraFields = new ArrayList<>();
+ for (String seqField : sequenceFields) {
+ if (!readFieldNames.contains(seqField)) {
+ extraFields.add(tableRowType.getField(seqField));
}
}
- }
-
- AdjustedProjection projection = mfFactory.adjustProjection(newProjectedFields);
- this.pushdownProjection = projection.pushdownProjection;
- this.outerProjection = projection.outerProjection;
- if (pushdownProjection != null) {
- List<DataField> tableFields = tableRowType.getFields();
- List<DataField> readFields = readType.getFields();
- List<DataField> finalReadFields = new ArrayList<>();
- for (int i : Arrays.stream(pushdownProjection).mapToInt(arr -> arr[0]).toArray()) {
- DataField requiredField = tableFields.get(i);
- finalReadFields.add(
- readFields.stream()
- .filter(x -> x.name().equals(requiredField.name()))
- .findFirst()
- .orElse(requiredField));
+ if (!extraFields.isEmpty()) {
+ List<DataField> allFields = new ArrayList<>(readType.getFields());
+ allFields.addAll(extraFields);
+ adjustedReadType = new RowType(allFields);
}
- RowType pushdownRowType = new RowType(finalReadFields);
- readerFactoryBuilder.withReadValueType(pushdownRowType);
- mergeSorter.setProjectedValueType(pushdownRowType);
}
+ adjustedReadType = mfFactory.adjustReadType(adjustedReadType);
- if (newProjectedFields != projectedFields) {
- // Discard the completed sequence fields
- if (outerProjection == null) {
- outerProjection = Projection.range(0, projectedFields.length).toNestedIndexes();
- } else {
- outerProjection = Arrays.copyOf(outerProjection, projectedFields.length);
- }
+ readerFactoryBuilder.withReadValueType(adjustedReadType);
+ mergeSorter.setProjectedValueType(adjustedReadType);
+
+ // When adjustedReadType != readType, project the merged rows back to the requested read type
+ if (adjustedReadType != readType) {
+ outerReadType = readType;
}
return this;
@@ -339,8 +307,12 @@ public RecordReader<KeyValue> createMergeReader(
boolean keepDelete)
throws IOException {
List<ReaderSupplier<KeyValue>> sectionReaders = new ArrayList<>();
+ RowType mfReadType =
+ variantAccess != null
+ ? VariantAccessInfoUtils.buildReadRowType(actualReadType(), variantAccess)
+ : actualReadType();
MergeFunctionWrapper<KeyValue> mergeFuncWrapper =
- new ReducerMergeFunctionWrapper(mfFactory.create(pushdownProjection));
+ new ReducerMergeFunctionWrapper(mfFactory.create(mfReadType));
for (List<SortedRun> section : new IntervalPartition(files, keyComparator).partition()) {
sectionReaders.add(
() ->
@@ -386,6 +358,14 @@ public RecordReader<KeyValue> createNoMergeReader(
return projectOuter(ConcatRecordReader.create(suppliers));
}
+ /**
+ * Returns the pushed-down read type if {@link #withReadType(RowType)} was called, otherwise
+ * the default read type.
+ */
+ private RowType actualReadType() {
+ return readerFactoryBuilder.readValueType();
+ }
+
private RecordReader<KeyValue> projectKey(RecordReader<KeyValue> reader) {
if (readKeyType == null) {
return reader;
@@ -396,8 +376,8 @@ private RecordReader projectKey(RecordReader reader) {
}
private RecordReader<KeyValue> projectOuter(RecordReader<KeyValue> reader) {
- if (outerProjection != null) {
- ProjectedRow projectedRow = ProjectedRow.from(outerProjection);
+ if (outerReadType != null) {
+ ProjectedRow projectedRow = ProjectedRow.from(outerReadType, actualReadType());
reader = reader.transform(kv -> kv.replaceValue(projectedRow.replaceRow(kv.value())));
}
return reader;
@@ -405,7 +385,6 @@ private RecordReader projectOuter(RecordReader reader) {
@Nullable
public UserDefinedSeqComparator createUdsComparator() {
- return UserDefinedSeqComparator.create(
- readerFactoryBuilder.readValueType(), sequenceFields, sequenceOrder);
+ return UserDefinedSeqComparator.create(actualReadType(), sequenceFields, sequenceOrder);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 62631aaf8053..5e4cffcbf672 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -80,7 +80,7 @@ public KeyValueFileStore store() {
RowType keyType = new RowType(extractor.keyFields(tableSchema));
MergeFunctionFactory<KeyValue> mfFactory =
- PrimaryKeyTableUtils.createMergeFunctionFactory(tableSchema, extractor);
+ PrimaryKeyTableUtils.createMergeFunctionFactory(tableSchema);
if (options.needLookup()) {
mfFactory = LookupMergeFunction.wrap(mfFactory, options, keyType, rowType);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
index de320d78f96c..904ca51de708 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
@@ -56,7 +56,7 @@ public static List<DataField> addKeyNamePrefix(List<DataField> keyFields) {
}
public static MergeFunctionFactory<KeyValue> createMergeFunctionFactory(
- TableSchema tableSchema, KeyValueFieldsExtractor extractor) {
+ TableSchema tableSchema) {
RowType rowType = tableSchema.logicalRowType();
Options conf = Options.fromMap(tableSchema.options());
CoreOptions options = new CoreOptions(conf);
@@ -68,11 +68,7 @@ public static MergeFunctionFactory createMergeFunctionFactory(
case PARTIAL_UPDATE:
return PartialUpdateMergeFunction.factory(conf, rowType, tableSchema.primaryKeys());
case AGGREGATE:
- return AggregateMergeFunction.factory(
- conf,
- tableSchema.fieldNames(),
- rowType.getFieldTypes(),
- tableSchema.primaryKeys());
+ return AggregateMergeFunction.factory(conf, rowType, tableSchema.primaryKeys());
case FIRST_ROW:
return FirstRowMergeFunction.factory(conf);
default:
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
index 19af07dda624..ff8249c22153 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
@@ -34,6 +34,7 @@
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.sort.BinaryInMemorySortBuffer;
+import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ReusingKeyValue;
@@ -45,7 +46,6 @@
import java.io.EOFException;
import java.io.IOException;
-import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -225,8 +225,13 @@ protected MergeFunction<KeyValue> createMergeFunction() {
options.set(CoreOptions.AGGREGATION_REMOVE_RECORD_ON_DELETE, removeRecordOnDelete);
return AggregateMergeFunction.factory(
options,
- Arrays.asList("f0", "f1"),
- Arrays.asList(DataTypes.INT().notNull(), DataTypes.BIGINT()),
+ RowType.builder()
+ .fields(
+ new DataType[] {
+ DataTypes.INT().notNull(), DataTypes.BIGINT()
+ },
+ new String[] {"f0", "f1"})
+ .build(),
Collections.singletonList("f0"))
.create();
}
@@ -263,8 +268,13 @@ protected MergeFunction<KeyValue> createMergeFunction() {
MergeFunctionFactory<KeyValue> aggMergeFunction =
AggregateMergeFunction.factory(
options,
- Arrays.asList("f0", "f1"),
- Arrays.asList(DataTypes.INT().notNull(), DataTypes.BIGINT()),
+ RowType.builder()
+ .fields(
+ new DataType[] {
+ DataTypes.INT().notNull(), DataTypes.BIGINT()
+ },
+ new String[] {"f0", "f1"})
+ .build(),
Collections.singletonList("f0"));
return LookupMergeFunction.wrap(aggMergeFunction, null, null, null).create();
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
index 5e88d2758ede..846734b5551d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
@@ -25,7 +25,6 @@
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.Projection;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
@@ -332,15 +331,15 @@ public void testAdjustProjectionSequenceFieldsProject() {
DataTypes.INT(),
DataTypes.INT());
// the sequence field 'f4' is projected too
- int[][] projection = new int[][] {{1}, {4}, {3}, {7}};
+ RowType readType = rowType.project("f1", "f4", "f3", "f7");
MergeFunctionFactory<KeyValue> factory =
PartialUpdateMergeFunction.factory(options, rowType, ImmutableList.of("f0"));
- MergeFunctionFactory.AdjustedProjection adjustedProjection =
- factory.adjustProjection(projection);
+ RowType pushdownType = factory.adjustReadType(readType);
- validate(adjustedProjection, new int[] {1, 4, 3, 7, 5}, new int[] {0, 1, 2, 3});
+ assertThat(pushdownType).isNotNull();
+ assertThat(pushdownType.getFieldNames()).containsExactly("f1", "f4", "f3", "f7", "f5");
- MergeFunction<KeyValue> func = factory.create(adjustedProjection.pushdownProjection);
+ MergeFunction<KeyValue> func = factory.create(pushdownType);
func.reset();
// if sequence field is null, the related fields should not be updated
add(func, 1, 1, 1, 1, 1);
@@ -364,15 +363,16 @@ public void testMultiSequenceFieldsAdjustProjectionProject() {
DataTypes.INT(),
DataTypes.INT());
// the sequence field 'f4' is projected too
- int[][] projection = new int[][] {{1}, {4}, {3}, {7}};
+ RowType readType = rowType.project("f1", "f4", "f3", "f7");
MergeFunctionFactory<KeyValue> factory =
PartialUpdateMergeFunction.factory(options, rowType, ImmutableList.of("f0"));
- MergeFunctionFactory.AdjustedProjection adjustedProjection =
- factory.adjustProjection(projection);
+ RowType pushdownType = factory.adjustReadType(readType);
- validate(adjustedProjection, new int[] {1, 4, 3, 7, 2, 5, 6}, new int[] {0, 1, 2, 3});
+ assertThat(pushdownType).isNotNull();
+ assertThat(pushdownType.getFieldNames())
+ .containsExactly("f1", "f4", "f3", "f7", "f2", "f5", "f6");
- MergeFunction<KeyValue> func = factory.create(adjustedProjection.pushdownProjection);
+ MergeFunction<KeyValue> func = factory.create(pushdownType);
func.reset();
// if sequence field is null, the related fields should not be updated
add(func, 1, 1, 1, 1, 1, 1, 1);
@@ -396,18 +396,16 @@ public void testAdjustProjectionAllFieldsProject() {
DataTypes.INT(),
DataTypes.INT());
// all fields are projected
- int[][] projection = new int[][] {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}};
+ RowType readType = rowType;
MergeFunctionFactory<KeyValue> factory =
PartialUpdateMergeFunction.factory(options, rowType, ImmutableList.of("f0"));
- MergeFunctionFactory.AdjustedProjection adjustedProjection =
- factory.adjustProjection(projection);
+ RowType pushdownType = factory.adjustReadType(readType);
- validate(
- adjustedProjection,
- new int[] {0, 1, 2, 3, 4, 5, 6, 7},
- new int[] {0, 1, 2, 3, 4, 5, 6, 7});
+ assertThat(pushdownType).isNotNull();
+ assertThat(pushdownType.getFieldNames())
+ .containsExactly("f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7");
- MergeFunction<KeyValue> func = factory.create(adjustedProjection.pushdownProjection);
+ MergeFunction<KeyValue> func = factory.create(pushdownType);
func.reset();
// 'f6' has no sequence group, it should not be updated by null
add(func, 1, 1, 1, 1, 1, 1, 1, 1);
@@ -431,18 +429,16 @@ public void testMultiSequenceFieldsAdjustProjectionAllFieldsProject() {
DataTypes.INT(),
DataTypes.INT());
// all fields are projected
- int[][] projection = new int[][] {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}};
+ RowType readType = rowType;
MergeFunctionFactory<KeyValue> factory =
PartialUpdateMergeFunction.factory(options, rowType, ImmutableList.of("f0"));
- MergeFunctionFactory.AdjustedProjection adjustedProjection =
- factory.adjustProjection(projection);
+ RowType pushdownType = factory.adjustReadType(readType);
- validate(
- adjustedProjection,
- new int[] {0, 1, 2, 3, 4, 5, 6, 7},
- new int[] {0, 1, 2, 3, 4, 5, 6, 7});
+ assertThat(pushdownType).isNotNull();
+ assertThat(pushdownType.getFieldNames())
+ .containsExactly("f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7");
- MergeFunction<KeyValue> func = factory.create(adjustedProjection.pushdownProjection);
+ MergeFunction<KeyValue> func = factory.create(pushdownType);
func.reset();
// 'f6' has no sequence group, it should not be updated by null
add(func, 1, 1, 1, 1, 1, 1, 1, 1);
@@ -465,16 +461,11 @@ public void testAdjustProjectionNonProject() {
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT());
- // set the projection = null
MergeFunctionFactory<KeyValue> factory =
PartialUpdateMergeFunction.factory(options, rowType, ImmutableList.of("f0"));
- MergeFunctionFactory.AdjustedProjection adjustedProjection = factory.adjustProjection(null);
- validate(adjustedProjection, null, null);
-
- MergeFunction<KeyValue> func = factory.create(adjustedProjection.pushdownProjection);
+ MergeFunction<KeyValue> func = factory.create(rowType);
func.reset();
- // Setting projection with null is similar with projecting all fields
add(func, 1, 1, 1, 1, 1, 1, 1, 1);
add(func, 4, 2, 4, 2, 2, 0, null, 3);
validate(func, 4, 2, 4, 2, 2, 1, 1, 1);
@@ -493,15 +484,14 @@ public void testAdjustProjectionNoSequenceGroup() {
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT());
- int[][] projection = new int[][] {{0}, {1}, {3}, {4}, {7}};
+ RowType readType = rowType.project("f0", "f1", "f3", "f4", "f7");
MergeFunctionFactory<KeyValue> factory =
PartialUpdateMergeFunction.factory(options, rowType, ImmutableList.of("f0"));
- MergeFunctionFactory.AdjustedProjection adjustedProjection =
- factory.adjustProjection(projection);
+ RowType pushdownType = factory.adjustReadType(readType);
- validate(adjustedProjection, new int[] {0, 1, 3, 4, 7}, null);
+ assertThat(pushdownType).isEqualTo(readType);
- MergeFunction<KeyValue> func = factory.create(adjustedProjection.pushdownProjection);
+ MergeFunction<KeyValue> func = factory.create(pushdownType);
func.reset();
// Without sequence group, all the fields should not be updated by null
add(func, 1, 1, 1, 1, 1);
@@ -526,12 +516,12 @@ public void testAdjustProjectionCreateDirectly() {
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT());
- int[][] projection = new int[][] {{1}, {7}};
+ RowType readType = rowType.project("f1", "f7");
assertThatThrownBy(
() ->
PartialUpdateMergeFunction.factory(
options, rowType, ImmutableList.of("f0"))
- .create(projection))
+ .create(readType))
.hasMessageContaining("Can not find new sequence field for new field.");
}
@@ -734,12 +724,12 @@ public void testPartialUpdateWithAggregationProjectPushDown() {
MergeFunctionFactory<KeyValue> factory =
PartialUpdateMergeFunction.factory(options, rowType, ImmutableList.of("f0"));
- MergeFunctionFactory.AdjustedProjection adjustedProjection =
- factory.adjustProjection(new int[][] {{3}, {2}, {5}});
+ RowType pushdownType = factory.adjustReadType(rowType.project("f3", "f2", "f5"));
- validate(adjustedProjection, new int[] {3, 2, 5, 1}, new int[] {0, 1, 2});
+ assertThat(pushdownType).isNotNull();
+ assertThat(pushdownType.getFieldNames()).containsExactly("f3", "f2", "f5", "f1");
- MergeFunction<KeyValue> func = factory.create(adjustedProjection.pushdownProjection);
+ MergeFunction<KeyValue> func = factory.create(pushdownType);
func.reset();
// f0 pk
@@ -790,12 +780,12 @@ public void testMultiSequenceFieldsPartialUpdateWithAggregationProjectPushDown()
MergeFunctionFactory<KeyValue> factory =
PartialUpdateMergeFunction.factory(options, rowType, ImmutableList.of("f0"));
- MergeFunctionFactory.AdjustedProjection adjustedProjection =
- factory.adjustProjection(new int[][] {{3}, {2}, {5}});
+ RowType pushdownType = factory.adjustReadType(rowType.project("f3", "f2", "f5"));
- validate(adjustedProjection, new int[] {3, 2, 5, 1, 8}, new int[] {0, 1, 2});
+ assertThat(pushdownType).isNotNull();
+ assertThat(pushdownType.getFieldNames()).containsExactly("f3", "f2", "f5", "f1", "f8");
- MergeFunction<KeyValue> func = factory.create(adjustedProjection.pushdownProjection);
+ MergeFunction<KeyValue> func = factory.create(pushdownType);
func.reset();
// f0 pk
@@ -894,22 +884,4 @@ private void add(MergeFunction<KeyValue> function, RowKind rowKind, Integer... f
private void validate(MergeFunction<KeyValue> function, Integer... f) {
assertThat(function.getResult().value()).isEqualTo(GenericRow.of(f));
}
-
- private void validate(
- MergeFunctionFactory.AdjustedProjection projection, int[] pushdown, int[] outer) {
- if (projection.pushdownProjection == null) {
- assertThat(pushdown).isNull();
- } else {
- assertThat(pushdown)
- .containsExactly(
- Projection.of(projection.pushdownProjection).toTopLevelIndexes());
- }
-
- if (projection.outerProjection == null) {
- assertThat(outer).isNull();
- } else {
- assertThat(outer)
- .containsExactly(Projection.of(projection.outerProjection).toTopLevelIndexes());
- }
- }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java
index 70cdbe310350..c1b2e9dcd872 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java
@@ -23,12 +23,13 @@
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
import org.junit.jupiter.api.Test;
-import java.util.Arrays;
import java.util.Collections;
import static org.apache.paimon.CoreOptions.FIELDS_DEFAULT_AGG_FUNC;
@@ -45,13 +46,17 @@ void testDefaultAggFunc() {
MergeFunction<KeyValue> aggregateFunction =
AggregateMergeFunction.factory(
options,
- Arrays.asList("k", "a", "b", "c", "d"),
- Arrays.asList(
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.INT()),
+ RowType.builder()
+ .fields(
+ new DataType[] {
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT()
+ },
+ new String[] {"k", "a", "b", "c", "d"})
+ .build(),
Collections.singletonList("k"))
.create();
aggregateFunction.reset();
@@ -76,13 +81,17 @@ void tesListAggFunc() {
MergeFunction<KeyValue> aggregateFunction =
AggregateMergeFunction.factory(
options,
- Arrays.asList("k", "a", "b", "c", "d"),
- Arrays.asList(
- DataTypes.INT(),
- DataTypes.STRING(),
- DataTypes.STRING(),
- DataTypes.INT(),
- DataTypes.STRING()),
+ RowType.builder()
+ .fields(
+ new DataType[] {
+ DataTypes.INT(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.INT(),
+ DataTypes.STRING()
+ },
+ new String[] {"k", "a", "b", "c", "d"})
+ .build(),
Collections.singletonList("k"))
.create();
aggregateFunction.reset();
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
index 44a02699e426..db65eb8d1660 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
@@ -363,7 +363,7 @@ private static class Factory implements MergeFunctionFactory<KeyValue> {
private static final long serialVersionUID = 1L;
@Override
- public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
+ public MergeFunction<KeyValue> create(@Nullable RowType readType) {
return new TestValueCountMergeFunction();
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
index bb48dbf36317..401ca436828d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
@@ -32,7 +32,6 @@
import org.apache.paimon.mergetree.localmerge.LocalMerger;
import org.apache.paimon.mergetree.localmerge.SortBufferLocalMerger;
import org.apache.paimon.options.MemorySize;
-import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.PrimaryKeyTableUtils;
import org.apache.paimon.table.sink.RowKindGenerator;
@@ -61,8 +60,6 @@
import java.util.List;
-import static org.apache.paimon.table.PrimaryKeyTableUtils.addKeyNamePrefix;
-
/**
* {@link AbstractStreamOperator} which buffer input record and apply merge function when the buffer
* is full. Mainly to resolve data skew on primary keys.
@@ -106,24 +103,7 @@ public void open() throws Exception {
rowKindGenerator = RowKindGenerator.create(schema, options);
MergeFunction<KeyValue> mergeFunction =
- PrimaryKeyTableUtils.createMergeFunctionFactory(
- schema,
- new KeyValueFieldsExtractor() {
- private static final long serialVersionUID = 1L;
-
- // At local merge operator, the key extractor should include
- // partition fields.
- @Override
- public List<DataField> keyFields(TableSchema schema) {
- return addKeyNamePrefix(schema.primaryKeysFields());
- }
-
- @Override
- public List<DataField> valueFields(TableSchema schema) {
- return schema.fields();
- }
- })
- .create();
+ PrimaryKeyTableUtils.createMergeFunctionFactory(schema).create();
boolean canHashMerger = true;
for (DataField field : valueType.getFields()) {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
index 9fddb2f64eaf..5e4c074dbfa8 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
@@ -715,4 +715,158 @@ abstract class VariantTestBase extends PaimonSparkTestBase {
)
)
}
+
+ test("Paimon Variant: partial update with variant") {
+ withTable("t") {
+ sql("""
+ |CREATE table t (
+ | id INT,
+ | ts INT,
+ | dt INT,
+ | v VARIANT
+ |)
+ |TBLPROPERTIES (
+ | 'primary-key' = 'id',
+ | 'bucket' = '1',
+ | 'changelog-producer' = 'lookup',
+ | 'merge-engine' = 'partial-update',
+ | 'fields.dt.sequence-group' = 'ts',
+ | 'fields.ts.aggregate-function' = 'max',
+ | 'write-only' = 'true'
+ |)
+ |""".stripMargin)
+
+ sql("""
+ |INSERT INTO t VALUES
+ | (1, 1, 1, parse_json('{"c":{"a1":1,"a2":2}}'))
+ | """.stripMargin)
+
+ sql("""
+ |INSERT INTO t VALUES
+ | (1, 2, 2, parse_json('{"c":{"a1":3,"a2":4}}'))
+ | """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT * FROM t"),
+ sql("""SELECT 1, 2, 2, parse_json('{"c":{"a1":3,"a2":4}}')""")
+ )
+ checkAnswer(
+ sql("SELECT variant_get(v, '$.c', 'string') FROM t"),
+ Seq(
+ Row("{\"a1\":3,\"a2\":4}")
+ )
+ )
+ }
+ }
+
+ test("Paimon Variant: deduplicate with variant") {
+ withTable("t_dedup") {
+ sql("""
+ |CREATE table t_dedup (
+ | id INT,
+ | name STRING,
+ | v VARIANT
+ |) TBLPROPERTIES
+ |(
+ | 'primary-key' = 'id',
+ | 'bucket' = '1',
+ | 'merge-engine' = 'deduplicate',
+ | 'write-only' = 'true'
+ |)
+ |""".stripMargin)
+
+ sql("""
+ |INSERT INTO t_dedup VALUES
+ | (1, 'Alice', parse_json('{"age":30,"city":"NYC"}'))
+ | """.stripMargin)
+
+ sql("""
+ |INSERT INTO t_dedup VALUES
+ | (1, 'Bob', parse_json('{"age":25,"city":"LA"}'))
+ | """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT * FROM t_dedup"),
+ sql("""SELECT 1, 'Bob', parse_json('{"age":25,"city":"LA"}')""")
+ )
+ checkAnswer(
+ sql("SELECT variant_get(v, '$.age', 'int') FROM t_dedup"),
+ Seq(Row(25))
+ )
+ }
+ }
+
+ test("Paimon Variant: aggregate with variant") {
+ withTable("t_agg") {
+ sql("""
+ |CREATE table t_agg (
+ | id INT,
+ | cnt INT,
+ | v VARIANT
+ |) TBLPROPERTIES
+ |(
+ | 'primary-key' = 'id',
+ | 'bucket' = '1',
+ | 'merge-engine' = 'aggregation',
+ | 'fields.cnt.aggregate-function' = 'sum',
+ | 'write-only' = 'true'
+ |)
+ |""".stripMargin)
+
+ sql("""
+ |INSERT INTO t_agg VALUES
+ | (1, 10, parse_json('{"data":{"x":1,"y":2}}'))
+ | """.stripMargin)
+
+ sql("""
+ |INSERT INTO t_agg VALUES
+ | (1, 20, parse_json('{"data":{"x":3,"y":4}}'))
+ | """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT * FROM t_agg"),
+ sql("""SELECT 1, 30, parse_json('{"data":{"x":3,"y":4}}')""")
+ )
+ checkAnswer(
+ sql("SELECT variant_get(v, '$.data.x', 'int') FROM t_agg"),
+ Seq(Row(3))
+ )
+ }
+ }
+
+ test("Paimon Variant: first-row with variant") {
+ withTable("t_first") {
+ sql("""
+ |CREATE table t_first (
+ | id INT,
+ | seq INT,
+ | v VARIANT
+ |) TBLPROPERTIES
+ |(
+ | 'primary-key' = 'id',
+ | 'bucket' = '1',
+ | 'merge-engine' = 'first-row'
+ |)
+ |""".stripMargin)
+
+ sql("""
+ |INSERT INTO t_first VALUES
+ | (1, 100, parse_json('{"status":"active"}'))
+ | """.stripMargin)
+
+ sql("""
+ |INSERT INTO t_first VALUES
+ | (1, 200, parse_json('{"status":"inactive"}'))
+ | """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT * FROM t_first"),
+ sql("""SELECT 1, 100, parse_json('{"status":"active"}')""")
+ )
+ checkAnswer(
+ sql("SELECT variant_get(v, '$.status', 'string') FROM t_first"),
+ Seq(Row("active"))
+ )
+ }
+ }
}