[Bug] Tables become unavailable everywhere due to aggregate function columns not defining sequenceGroup #7045

@zhangdove

Description

Search before asking

  • I searched in the issues and found nothing similar.

Paimon version

1.3.1

Compute Engine

Flink-1.20.0

Minimal reproduce step

  1. Create table
CREATE TABLE `paimon`.`default`.`partialUpdate01` (
  f1 STRING NOT NULL,
  f2 BIGINT NOT NULL,
  f3 BIGINT,
  f4 BIGINT,
  f5 BIGINT,
  f6 BIGINT,
  log_date STRING NOT NULL,
  log_hour STRING NOT NULL,
  PRIMARY KEY (log_date, log_hour, f1, f2) NOT ENFORCED
)
PARTITIONED BY (log_date, log_hour)
WITH (
  'bucket' = '4',
  'changelog-producer' = 'lookup',
  'file.format' = 'orc',
  'merge-engine' = 'partial-update'
);
  2. Alter table
ALTER TABLE `paimon`.`default`.`partialUpdate01` SET (
  'fields.f1,f2.sequence-group' = 'f3,f4,f5',
  'fields.f3.aggregate-function' = 'last_value',
  'fields.f4.aggregate-function' = 'last_value',
  'fields.f5.aggregate-function' = 'max',
  'fields.f6.aggregate-function' = 'max'
);
  3. Insert into table
INSERT INTO `paimon`.`default`.`partialUpdate01`
SELECT f1, f2, f3, f4, f5, f6, '20260101' AS log_date, '01' AS log_hour FROM source;

What doesn't meet your expectations?

Because the user was not sufficiently familiar with this feature, they accidentally added the option 'fields.f6.aggregate-function' = 'max', even though f6 is not covered by any sequence group. After that, any Flink write to the table fails with the following stack trace:

Exception in thread "main" java.lang.IllegalArgumentException: Must use sequence group for aggregation functions but not found for field f6.
	at org.apache.paimon.utils.Preconditions.checkArgument(Preconditions.java:149)
	at org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.getAggFuncName(PartialUpdateMergeFunction.java:697)
	at org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction$Factory.createFieldAggregators(PartialUpdateMergeFunction.java:655)
	at org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction$Factory.<init>(PartialUpdateMergeFunction.java:452)
	at org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction$Factory.<init>(PartialUpdateMergeFunction.java:380)
	at org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.factory(PartialUpdateMergeFunction.java:377)
	at org.apache.paimon.table.PrimaryKeyTableUtils.createMergeFunctionFactory(PrimaryKeyTableUtils.java:69)
	at org.apache.paimon.table.PrimaryKeyFileStoreTable.store(PrimaryKeyFileStoreTable.java:83)
	at org.apache.paimon.table.PrimaryKeyFileStoreTable.store(PrimaryKeyFileStoreTable.java:53)
	at org.apache.paimon.table.AbstractFileStoreTable.setSnapshotCache(AbstractFileStoreTable.java:133)
	at org.apache.paimon.table.PrimaryKeyFileStoreTable.setSnapshotCache(PrimaryKeyFileStoreTable.java:53)
	at org.apache.paimon.catalog.CachingCatalog.putTableCache(CachingCatalog.java:250)
	at org.apache.paimon.catalog.CachingCatalog.getTable(CachingCatalog.java:243)
	at org.apache.paimon.flink.FlinkCatalog.getTable(FlinkCatalog.java:315)
	at org.apache.paimon.flink.FlinkCatalog.getTable(FlinkCatalog.java:300)
	at org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:740)
	at org.apache.flink.table.catalog.CatalogManager.getTable(CatalogManager.java:634)
	at org.apache.flink.table.catalog.CatalogManager.getTableOrError(CatalogManager.java:691)
	at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertSqlInsert(SqlNodeToOperationConversion.java:770)
	at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertValidatedSqlNode(SqlNodeToOperationConversion.java:350)
	at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:261)
	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
	at org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:74)
	at org.apache.flink.table.api.bridge.java.internal.StreamStatementSetImpl.addInsertSql(StreamStatementSetImpl.java:45)
	at org.apache.flink.table.api.bridge.java.internal.StreamStatementSetImpl.addInsertSql(StreamStatementSetImpl.java:30)

Anything else?

I tried to remove the incorrectly added option via the Flink SQL client, but the RESET statement itself fails with the same stack trace: as the trace above shows, simply getting the table from the catalog already rebuilds the merge function and triggers the check.

ALTER TABLE `paimon`.`default`.`partialUpdate01`
RESET ('fields.f6.aggregate-function');

Because the ALTER TABLE accepted the option without parameter validation, a single incorrect option now makes the table impossible to read or drop: every code path that loads the table hits the same check. This seems like unexpected behavior; the consistency between aggregate-function options and sequence groups should arguably be validated when the schema change is committed, not only when the table is loaded.
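For reference, the consistency rule that fails at load time is simple: every field carrying a `fields.<f>.aggregate-function` option must appear in the value of some `fields.<pk>.sequence-group` option. A minimal standalone sketch of that check (this is not Paimon's actual code, just an illustration using the option keys from the table above):

```java
import java.util.*;

public class SequenceGroupCheck {
    /**
     * Hypothetical validation: returns the fields that declare an
     * aggregate-function option but belong to no sequence group.
     * A non-empty result corresponds to the checkArgument failure
     * in PartialUpdateMergeFunction.
     */
    static List<String> fieldsMissingSequenceGroup(Map<String, String> options) {
        Set<String> grouped = new HashSet<>();
        Set<String> aggregated = new TreeSet<>();
        for (Map.Entry<String, String> e : options.entrySet()) {
            String key = e.getKey();
            if (key.startsWith("fields.") && key.endsWith(".sequence-group")) {
                // value is a comma-separated list of grouped fields
                grouped.addAll(Arrays.asList(e.getValue().split(",")));
            } else if (key.startsWith("fields.") && key.endsWith(".aggregate-function")) {
                // extract the field name between "fields." and ".aggregate-function"
                aggregated.add(key.substring(
                        "fields.".length(),
                        key.length() - ".aggregate-function".length()));
            }
        }
        aggregated.removeAll(grouped);
        return new ArrayList<>(aggregated);
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("fields.f1,f2.sequence-group", "f3,f4,f5");
        options.put("fields.f3.aggregate-function", "last_value");
        options.put("fields.f4.aggregate-function", "last_value");
        options.put("fields.f5.aggregate-function", "max");
        options.put("fields.f6.aggregate-function", "max"); // the accidental option
        System.out.println(fieldsMissingSequenceGroup(options)); // prints [f6]
    }
}
```

Running this check when the SET (or RESET) is committed would reject the bad option up front, instead of leaving behind a schema that poisons every subsequent table load.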

Are you willing to submit a PR?

  • I'm willing to submit a PR!
