From 70eb791f85268ce5219a9ef2c8e9d3fbf8ddffab Mon Sep 17 00:00:00 2001 From: zhangdove Date: Wed, 14 Jan 2026 23:07:24 +0800 Subject: [PATCH] [core] Fix the issue where aggregate function columns without defining sequenceGroup cause the table to be unavailable --- .../paimon/schema/SchemaValidation.java | 41 ++++++++++++++++++- .../paimon/schema/SchemaValidationTest.java | 13 ++++++ 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 1378ecc58651..e69ba193b794 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -26,6 +26,7 @@ import org.apache.paimon.format.FileFormat; import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator; import org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastNonNullValueAggFactory; import org.apache.paimon.options.ConfigOption; import org.apache.paimon.options.Options; import org.apache.paimon.table.BucketMode; @@ -450,11 +451,15 @@ private static void validateFieldsPrefix(TableSchema schema, CoreOptions options private static void validateSequenceGroup(TableSchema schema, CoreOptions options) { Map> fields2Group = new HashMap<>(); + Set sequenceGroupFieldIndexs = new HashSet<>(); + List fieldNames = schema.fieldNames(); for (Map.Entry entry : options.toMap().entrySet()) { String k = entry.getKey(); String v = entry.getValue(); - List fieldNames = schema.fieldNames(); if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP)) { + Arrays.stream(v.split(FIELDS_SEPARATOR)) + .map(fieldName -> requireField(fieldName, fieldNames)) + .forEach(sequenceGroupFieldIndexs::add); String[] sequenceFieldNames = k.substring( FIELDS_PREFIX.length() + 1, @@ -492,8 +497,33 @@ private static void validateSequenceGroup(TableSchema schema, CoreOptions option Set group = fields2Group.computeIfAbsent(field, p -> new HashSet<>()); group.addAll(sequenceFieldsList); } + + // add self + Arrays.stream(sequenceFieldNames) + .mapToInt(fieldName -> requireField(fieldName, fieldNames)) + .forEach(sequenceGroupFieldIndexs::add); } } + + if (options.mergeEngine() == MergeEngine.PARTIAL_UPDATE) { + for (String fieldName : fieldNames) { + String aggFunc = options.fieldAggFunc(fieldName); + String aggFuncName = aggFunc == null ? options.fieldsDefaultFunc() : aggFunc; + if (schema.primaryKeys().contains(fieldName)) { + continue; + } + if (aggFuncName != null) { + // last_non_null_value doesn't require sequence group + checkArgument( + aggFuncName.equals(FieldLastNonNullValueAggFactory.NAME) + || sequenceGroupFieldIndexs.contains( + fieldNames.indexOf(fieldName)), + "Must use sequence group for aggregation functions but not found for field %s.", + fieldName); + } + } + } + Set illegalGroup = fields2Group.values().stream() .flatMap(Collection::stream) @@ -689,6 +719,15 @@ private static void validateIncrementalClustering(TableSchema schema, CoreOption } } + private static int requireField(String fieldName, List fieldNames) { + int field = fieldNames.indexOf(fieldName); + if (field == -1) { + throw new IllegalArgumentException( + String.format("Field %s can not be found in table schema.", fieldName)); + } + return field; + } + public static void validateChainTable(TableSchema schema, CoreOptions options) { if (options.isChainTable()) { boolean isPrimaryTbl = schema.primaryKeys() != null && !schema.primaryKeys().isEmpty(); diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java index 57c659e5c752..1af8c8334649 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java @@ -151,4 +151,17 @@ public void testBlobTableSchema() { assertThatThrownBy(() -> validateBlobSchema(options, Collections.singletonList("f2"))) .hasMessage("The BLOB type column can not be part of partition keys."); } + + @Test + public void testPartialUpdateTableAggregateFunctionWithoutSequenceGroup() { + Map options = new HashMap<>(2); + options.put("merge-engine", "partial-update"); + options.put("fields.f3.aggregate-function", "max"); + assertThatThrownBy(() -> validateTableSchemaExec(options)) + .hasMessageContaining( + "Must use sequence group for aggregation functions but not found for field"); + + options.put("fields.f2.sequence-group", "f3"); + assertThatCode(() -> validateTableSchemaExec(options)).doesNotThrowAnyException(); + } }