From 9f2d09ca5463aa78cdcf1060f3e2a6a96edb7299 Mon Sep 17 00:00:00 2001 From: songwongtp <16089160+songwongtp@users.noreply.github.com> Date: Tue, 24 Mar 2026 16:08:59 +0700 Subject: [PATCH 1/9] feat: tx account cleanup extension --- config/config.go | 12 +- config/txaccountcleanup.go | 5 + indexer/collector/tx/address.go | 2 +- indexer/collector/tx/collect.go | 2 +- indexer/extension/manager.go | 5 + indexer/extension/txaccountcleanup/cleanup.go | 205 ++++++++++++++++++ .../extension/txaccountcleanup/extension.go | 142 ++++++++++++ ...09000000_add_tx_account_cleanup_status.sql | 6 + orm/migrations/atlas.sum | 3 +- types/table.go | 10 + 10 files changed, 387 insertions(+), 5 deletions(-) create mode 100644 config/txaccountcleanup.go create mode 100644 indexer/extension/txaccountcleanup/cleanup.go create mode 100644 indexer/extension/txaccountcleanup/extension.go create mode 100644 orm/migrations/20260409000000_add_tx_account_cleanup_status.sql diff --git a/config/config.go b/config/config.go index 570f838d..b3b089f3 100644 --- a/config/config.go +++ b/config/config.go @@ -144,8 +144,9 @@ type Config struct { pollingInterval time.Duration // for api only internalTxConfig *InternalTxConfig richListConfig *RichListConfig - evmRetCleanupConfig *EvmRetCleanupConfig - metricsConfig *MetricsConfig + evmRetCleanupConfig *EvmRetCleanupConfig + txAccountCleanupConfig *TxAccountCleanupConfig + metricsConfig *MetricsConfig cacheConfig *CacheConfig sentryConfig *SentryConfig corsConfig *CORSConfig @@ -297,6 +298,9 @@ func loadConfig() (*Config, error) { evmRetCleanupConfig: &EvmRetCleanupConfig{ Enabled: viper.GetBool("EVM_RET_CLEANUP"), }, + txAccountCleanupConfig: &TxAccountCleanupConfig{ + Enabled: viper.GetBool("TX_ACCOUNT_CLEANUP"), + }, metricsConfig: &MetricsConfig{ Enabled: viper.GetBool("METRICS_ENABLED"), Path: viper.GetString("METRICS_PATH"), @@ -471,6 +475,10 @@ func (c *Config) SetEvmRetCleanupConfig(evmRetCleanCgf *EvmRetCleanupConfig) { c.evmRetCleanupConfig = evmRetCleanCgf } +func (c Config) TxAccountCleanupEnabled() bool { + return c.txAccountCleanupConfig.Enabled +} + func (c Config) GetSentryConfig() *SentryConfig { if c.sentryConfig == nil || c.sentryConfig.DSN == "" { return nil diff --git a/config/txaccountcleanup.go b/config/txaccountcleanup.go new file mode 100644 index 00000000..fbd03ae0 --- /dev/null +++ b/config/txaccountcleanup.go @@ -0,0 +1,5 @@ +package config + +type TxAccountCleanupConfig struct { + Enabled bool +} diff --git a/indexer/collector/tx/address.go b/indexer/collector/tx/address.go index f636de97..9a696959 100644 --- a/indexer/collector/tx/address.go +++ b/indexer/collector/tx/address.go @@ -39,7 +39,7 @@ func findAllMoveHexAddress(attr string) []string { return regexMoveHex.FindAllString(attr, -1) } -func grepAddressesFromTx(events []abci.Event, tx *gorm.DB) (grepped []string, err error) { +func GrepAddressesFromTx(events []abci.Event, tx *gorm.DB) (grepped []string, err error) { storeAddrMap := make(map[string]interface{}) // set of fa store addrs for _, event := range events { diff --git a/indexer/collector/tx/collect.go b/indexer/collector/tx/collect.go index d24641cd..291d5319 100644 --- a/indexer/collector/tx/collect.go +++ b/indexer/collector/tx/collect.go @@ -128,7 +128,7 @@ func (sub *TxSubmodule) collect(block indexertypes.ScrapedBlock, tx *gorm.DB) er } // grep addresses from events - addrs, err := grepAddressesFromTx(res.Events, tx) + addrs, err := GrepAddressesFromTx(res.Events, tx) if err != nil { return err } diff --git a/indexer/extension/manager.go b/indexer/extension/manager.go index caf07495..d7807805 100644 --- a/indexer/extension/manager.go +++ b/indexer/extension/manager.go @@ -11,6 +11,7 @@ import ( evmret "github.com/initia-labs/rollytics/indexer/extension/evmret" internaltx "github.com/initia-labs/rollytics/indexer/extension/internaltx" richlist "github.com/initia-labs/rollytics/indexer/extension/richlist" + txaccountcleanup "github.com/initia-labs/rollytics/indexer/extension/txaccountcleanup" "github.com/initia-labs/rollytics/indexer/extension/types" "github.com/initia-labs/rollytics/orm" ) @@ -36,6 +37,10 @@ func New(cfg *config.Config, logger *slog.Logger, db *orm.Database) *ExtensionMa if retCleanup := evmret.New(cfg, logger, db); retCleanup != nil { extensions = append(extensions, retCleanup) } + // TX Account Cleanup + if taCleanup := txaccountcleanup.New(cfg, logger, db); taCleanup != nil { + extensions = append(extensions, taCleanup) + } return &ExtensionManager{ cfg: cfg, logger: logger, diff --git a/indexer/extension/txaccountcleanup/cleanup.go b/indexer/extension/txaccountcleanup/cleanup.go new file mode 100644 index 00000000..e00ad124 --- /dev/null +++ b/indexer/extension/txaccountcleanup/cleanup.go @@ -0,0 +1,205 @@ +package txaccountcleanup + +import ( + "context" + "encoding/hex" + "encoding/json" + "fmt" + "log/slog" + + abci "github.com/cometbft/cometbft/abci/types" + "gorm.io/gorm" + + "github.com/initia-labs/rollytics/config" + tx "github.com/initia-labs/rollytics/indexer/collector/tx" + "github.com/initia-labs/rollytics/indexer/extension/evmret" + "github.com/initia-labs/rollytics/orm" + "github.com/initia-labs/rollytics/types" + "github.com/initia-labs/rollytics/util/cache" +) + +type txDataWithEvents struct { + Events []abci.Event `json:"events"` +} + +func ProcessBatch(ctx context.Context, db *gorm.DB, cfg *config.Config, logger *slog.Logger, startSeq, endSeq int64) (totalDeleted, totalInserted int64, err error) { + // Query txs in sequence range + var txs []types.CollectedTx + if err := db.WithContext(ctx). + Where("sequence >= ? AND sequence <= ?", startSeq, endSeq). + Order("sequence ASC"). + Find(&txs).Error; err != nil { + return 0, 0, fmt.Errorf("failed to query transactions: %w", err) + } + + if len(txs) == 0 { + return 0, 0, nil + } + + // Query actual tx_accounts in sequence range + var actualEntries []types.CollectedTxAccount + if err := db.WithContext(ctx). + Where("sequence >= ? AND sequence <= ?", startSeq, endSeq). + Find(&actualEntries).Error; err != nil { + return 0, 0, fmt.Errorf("failed to query tx_accounts: %w", err) + } + + // Group actual entries by sequence + actualBySeq := make(map[int64][]types.CollectedTxAccount) + for _, entry := range actualEntries { + actualBySeq[entry.Sequence] = append(actualBySeq[entry.Sequence], entry) + } + + isEVM := cfg.GetVmType() == types.EVM + + for _, collectedTx := range txs { + select { + case <-ctx.Done(): + return totalDeleted, totalInserted, ctx.Err() + default: + } + + deleted, inserted, err := reconcileTx(ctx, db, logger, collectedTx, actualBySeq[collectedTx.Sequence], isEVM) + if err != nil { + hashStr := hex.EncodeToString(collectedTx.Hash) + logger.Warn("failed to reconcile tx", + slog.String("tx_hash", hashStr), + slog.Int64("sequence", collectedTx.Sequence), + slog.Any("error", err)) + continue + } + + totalDeleted += deleted + totalInserted += inserted + } + + return totalDeleted, totalInserted, nil +} + +func reconcileTx(ctx context.Context, db *gorm.DB, logger *slog.Logger, collectedTx types.CollectedTx, actual []types.CollectedTxAccount, isEVM bool) (deleted, inserted int64, err error) { + // Parse events from stored tx data + var txData txDataWithEvents + if err := json.Unmarshal(collectedTx.Data, &txData); err != nil { + return 0, 0, fmt.Errorf("failed to unmarshal tx data: %w", err) + } + + // Re-derive addresses from events + addrs, err := tx.GrepAddressesFromTx(txData.Events, db) + if err != nil { + return 0, 0, fmt.Errorf("failed to grep addresses: %w", err) + } + + // Deduplicate addresses + addrMap := make(map[string]struct{}) + for _, addr := range addrs { + addrMap[addr] = struct{}{} + } + var uniqueAddrs []string + for addr := range addrMap { + uniqueAddrs = append(uniqueAddrs, addr) + } + + // Convert to account IDs (don't create new accounts) + accountIdMap, err := cache.GetOrCreateAccountIds(db, uniqueAddrs, false) + if err != nil { + return 0, 0, fmt.Errorf("failed to get account IDs: %w", err) + } + + // Build expected set (filter out id=0) + expectedSet := make(map[int64]struct{}) + for _, id := range accountIdMap { + if id != 0 { + expectedSet[id] = struct{}{} + } + } + + // EVM ret-only safety: remove ret-only addresses from expected set before insert + if isEVM { + retOnlyAddrs, err := evmret.FindRetOnlyAddresses(collectedTx.Data) + if err != nil { + return 0, 0, fmt.Errorf("failed to find ret-only addresses: %w", err) + } + if len(retOnlyAddrs) > 0 { + retAccountIds, err := cache.GetOrCreateAccountIds(db, retOnlyAddrs, false) + if err != nil { + return 0, 0, fmt.Errorf("failed to get ret-only account IDs: %w", err) + } + for _, id := range retAccountIds { + delete(expectedSet, id) + } + } + } + + // Add signer AFTER ret-only filtering (signer is always expected) + if collectedTx.SignerId != 0 { + expectedSet[collectedTx.SignerId] = struct{}{} + } + + // Build actual set + actualSet := make(map[int64]types.CollectedTxAccount) + for _, entry := range actual { + actualSet[entry.AccountId] = entry + } + + // Compute extras (in actual but not expected) → DELETE + var extraIds []int64 + for accountId := range actualSet { + if _, ok := expectedSet[accountId]; !ok { + extraIds = append(extraIds, accountId) + } + } + + // Compute missing (in expected but not actual) → INSERT + var missingEntries []types.CollectedTxAccount + for accountId := range expectedSet { + if _, ok := actualSet[accountId]; !ok { + missingEntries = append(missingEntries, types.CollectedTxAccount{ + AccountId: accountId, + Sequence: collectedTx.Sequence, + Signer: accountId == collectedTx.SignerId, + }) + } + } + + if len(extraIds) == 0 && len(missingEntries) == 0 { + return 0, 0, nil + } + + hashStr := hex.EncodeToString(collectedTx.Hash) + + err = db.WithContext(ctx).Transaction(func(txDB *gorm.DB) error { + // Delete extras + if len(extraIds) > 0 { + result := txDB. + Where("account_id IN ? AND sequence = ?", extraIds, collectedTx.Sequence). + Delete(&types.CollectedTxAccount{}) + if result.Error != nil { + return fmt.Errorf("failed to delete extra entries: %w", result.Error) + } + deleted = result.RowsAffected + + logger.Info("deleted extra tx_accounts entries", + slog.String("tx_hash", hashStr), + slog.Int64("sequence", collectedTx.Sequence), + slog.Int64("count", deleted)) + } + + // Insert missing + if len(missingEntries) > 0 { + result := txDB.Clauses(orm.DoNothingWhenConflict).Create(&missingEntries) + if result.Error != nil { + return fmt.Errorf("failed to insert missing entries: %w", result.Error) + } + inserted = result.RowsAffected + + logger.Info("inserted missing tx_accounts entries", + slog.String("tx_hash", hashStr), + slog.Int64("sequence", collectedTx.Sequence), + slog.Int64("count", inserted)) + } + + return nil + }) + + return deleted, inserted, err +} diff --git a/indexer/extension/txaccountcleanup/extension.go b/indexer/extension/txaccountcleanup/extension.go new file mode 100644 index 00000000..e5292cde --- /dev/null +++ b/indexer/extension/txaccountcleanup/extension.go @@ -0,0 +1,142 @@ +package txaccountcleanup + +import ( + "context" + "errors" + "fmt" + "log/slog" + "time" + + "gorm.io/gorm" + + "github.com/initia-labs/rollytics/config" + exttypes "github.com/initia-labs/rollytics/indexer/extension/types" + "github.com/initia-labs/rollytics/orm" + "github.com/initia-labs/rollytics/types" +) + +const ( + ExtensionName = "tx-account-cleanup" + BatchSize = 1000 +) + +var _ exttypes.Extension = (*TxAccountCleanupExtension)(nil) + +type TxAccountCleanupExtension struct { + cfg *config.Config + logger *slog.Logger + db *orm.Database +} + +func New(cfg *config.Config, logger *slog.Logger, db *orm.Database) *TxAccountCleanupExtension { + if !cfg.TxAccountCleanupEnabled() { + return nil + } + + return &TxAccountCleanupExtension{ + cfg: cfg, + logger: logger.With("extension", ExtensionName), + db: db, + } +} + +func (e *TxAccountCleanupExtension) Name() string { + return ExtensionName +} + +func (e *TxAccountCleanupExtension) Initialize(ctx context.Context) (*types.CollectedTxAccountCleanupStatus, error) { + var status types.CollectedTxAccountCleanupStatus + err := e.db.WithContext(ctx).First(&status).Error + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + status = types.CollectedTxAccountCleanupStatus{ + LastCleanedSequence: 0, + DeletedRecords: 0, + InsertedRecords: 0, + } + if err := e.db.WithContext(ctx).Create(&status).Error; err != nil { + return nil, fmt.Errorf("failed to create initial status: %w", err) + } + e.logger.Info("initialized cleanup status") + return &status, nil + } + return nil, fmt.Errorf("failed to retrieve cleanup status: %w", err) + } + + return &status, nil +} + +func (e *TxAccountCleanupExtension) Run(ctx context.Context) error { + status, err := e.Initialize(ctx) + if err != nil { + return fmt.Errorf("failed to initialize: %w", err) + } + + currentSeq := status.LastCleanedSequence + 1 + e.logger.Info("starting tx account cleanup", + slog.Int64("start_sequence", currentSeq)) + + for { + select { + case <-ctx.Done(): + e.logger.Info("cleanup stopped", + slog.Int64("last_cleaned_sequence", status.LastCleanedSequence), + slog.Int64("deleted_records", status.DeletedRecords), + slog.Int64("inserted_records", status.InsertedRecords)) + return ctx.Err() + default: + } + + var maxSeq int64 + if err := e.db.WithContext(ctx). + Model(&types.CollectedTx{}). + Select("COALESCE(MAX(sequence), 0)"). + Scan(&maxSeq).Error; err != nil { + return fmt.Errorf("failed to get max sequence: %w", err) + } + + if currentSeq > maxSeq { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(1 * time.Second): + } + continue + } + + endSeq := min(currentSeq+BatchSize-1, maxSeq) + + deleted, inserted, err := ProcessBatch(ctx, e.db.DB, e.cfg, e.logger, currentSeq, endSeq) + if err != nil { + return fmt.Errorf("failed to process batch [%d-%d]: %w", currentSeq, endSeq, err) + } + + status.LastCleanedSequence = endSeq + status.DeletedRecords += deleted + status.InsertedRecords += inserted + + if err := e.updateStatus(ctx, status); err != nil { + return fmt.Errorf("failed to update status: %w", err) + } + + if deleted > 0 || inserted > 0 { + e.logger.Info("tx account cleanup processed batch", + slog.Int64("end_sequence", endSeq), + slog.Int64("batch_deleted", deleted), + slog.Int64("batch_inserted", inserted)) + } + + currentSeq = endSeq + 1 + } +} + +func (e *TxAccountCleanupExtension) updateStatus(ctx context.Context, status *types.CollectedTxAccountCleanupStatus) error { + return e.db.WithContext(ctx). + Model(&types.CollectedTxAccountCleanupStatus{}). + Where("1 = 1"). + Updates(map[string]interface{}{ + "last_cleaned_sequence": status.LastCleanedSequence, + "deleted_records": status.DeletedRecords, + "inserted_records": status.InsertedRecords, + }).Error +} diff --git a/orm/migrations/20260409000000_add_tx_account_cleanup_status.sql b/orm/migrations/20260409000000_add_tx_account_cleanup_status.sql new file mode 100644 index 00000000..38305c85 --- /dev/null +++ b/orm/migrations/20260409000000_add_tx_account_cleanup_status.sql @@ -0,0 +1,6 @@ +-- Create "tx_account_cleanup_status" table +CREATE TABLE "public"."tx_account_cleanup_status" ( + "last_cleaned_sequence" bigint NULL, + "deleted_records" bigint NULL, + "inserted_records" bigint NULL +); diff --git a/orm/migrations/atlas.sum b/orm/migrations/atlas.sum index 71fcf941..f990bc4e 100644 --- a/orm/migrations/atlas.sum +++ b/orm/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:9m7zdGKVJkdc2VOvgpWviZE0aliHxKJ1keT+XydJNvo= +h1:kdj8FhrIlTWlahVSPIhdZiwfWXkeyE4uCptZLnuYiOc= 20250806084521_migration.sql h1:Qdn42AgebdtLQoc+aUfautynU10/oHxL8wjXusSqQaE= 20250822034114_migration.sql h1:ybJSC6AlidSpXS+oup6aYHchZFaOEkJU9C8lOnF0S68= 20250902111542_add_partial_indices.sql h1:Qc5PA4bCNP5tjhZrHFhscgc/Ap/Ee/mnmoPixefeRtw= @@ -10,3 +10,4 @@ h1:9m7zdGKVJkdc2VOvgpWviZE0aliHxKJ1keT+XydJNvo= 20251112100000_add_tx_height_sequence_composite_index.sql h1:bgbWL2lGapZrH6cgc5z+ALSCh3QjeDvX1sdI+7kXHG4= 20251119052849_migration.sql h1:hHv9owtwZfsRqC8Ad7ZL3FNP9aGPSbRPZbHgrpdtyu8= 20260408163700_add_tx_accounts_sequence_index.sql h1:yzHQY8tFAm2+eFqoMg/eRnkDtVt33LN+hwPdbaF0y8E= +20260409000000_add_tx_account_cleanup_status.sql h1:OUN7L2AycU9G6g54K8hGUkII4vvf57QltgRci88itOo= diff --git a/types/table.go b/types/table.go index f8ecc01c..36f6a1ce 100644 --- a/types/table.go +++ b/types/table.go @@ -182,6 +182,12 @@ type CollectedEvmRetCleanupStatus struct { CorrectedRecords int64 `gorm:"type:bigint;column:corrected_records"` } +type CollectedTxAccountCleanupStatus struct { + LastCleanedSequence int64 `gorm:"type:bigint;column:last_cleaned_sequence"` + DeletedRecords int64 `gorm:"type:bigint;column:deleted_records"` + InsertedRecords int64 `gorm:"type:bigint;column:inserted_records"` +} + func (CollectedUpgradeHistory) TableName() string { return "upgrade_history" } @@ -274,6 +280,10 @@ func (CollectedEvmRetCleanupStatus) TableName() string { return "evm_ret_cleanup_status" } +func (CollectedTxAccountCleanupStatus) TableName() string { + return "tx_account_cleanup_status" +} + // CursorRecord interface implementations // Sequence-based tables From 411b43e87f496fccb03e28fe5b47651bc6b53b19 Mon Sep 17 00:00:00 2001 From: songwongtp <16089160+songwongtp@users.noreply.github.com> Date: Tue, 24 Mar 2026 16:10:23 +0700 Subject: [PATCH 2/9] style: adjust indentation of Config struct fields. --- config/config.go | 42 +++++++++++++++++++++--------------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/config/config.go b/config/config.go index b3b089f3..ceae9311 100644 --- a/config/config.go +++ b/config/config.go @@ -129,27 +129,27 @@ func SetBuildInfo(v, commit string) { } type Config struct { - listenPort string - recvBufSize uint - indexerListenPort string - dbConfig *dbconfig.Config - chainConfig *ChainConfig - logLevel string - logFormat string - coolingDuration time.Duration // for indexer only - queryTimeout time.Duration // for indexer only - maxConcurrentRequests int // for indexer only - cacheSize int - cacheTTL time.Duration // for api only - pollingInterval time.Duration // for api only - internalTxConfig *InternalTxConfig - richListConfig *RichListConfig - evmRetCleanupConfig *EvmRetCleanupConfig - txAccountCleanupConfig *TxAccountCleanupConfig - metricsConfig *MetricsConfig - cacheConfig *CacheConfig - sentryConfig *SentryConfig - corsConfig *CORSConfig + listenPort string + recvBufSize uint + indexerListenPort string + dbConfig *dbconfig.Config + chainConfig *ChainConfig + logLevel string + logFormat string + coolingDuration time.Duration // for indexer only + queryTimeout time.Duration // for indexer only + maxConcurrentRequests int // for indexer only + cacheSize int + cacheTTL time.Duration // for api only + pollingInterval time.Duration // for api only + internalTxConfig *InternalTxConfig + richListConfig *RichListConfig + evmRetCleanupConfig *EvmRetCleanupConfig + txAccountCleanupConfig *TxAccountCleanupConfig + metricsConfig *MetricsConfig + cacheConfig *CacheConfig + sentryConfig *SentryConfig + corsConfig *CORSConfig // Start height configuration startHeight int64 // explicit start height when set From 086fa27395beee78b7f0cff9e7db2aa2f88c702a Mon Sep 17 00:00:00 2001 From: songwongtp <16089160+songwongtp@users.noreply.github.com> Date: Tue, 24 Mar 2026 16:12:58 +0700 Subject: [PATCH 3/9] feat: Add default configuration for TX_ACCOUNT_CLEANUP. --- config/config.go | 1 + 1 file changed, 1 insertion(+) diff --git a/config/config.go b/config/config.go index ceae9311..6b46ba10 100644 --- a/config/config.go +++ b/config/config.go @@ -178,6 +178,7 @@ func setDefaults() { viper.SetDefault("INTERNAL_TX_BATCH_SIZE", DefaultInternalTxBatchSize) viper.SetDefault("INTERNAL_TX_QUEUE_SIZE", DefaultInternalTxQueueSize) viper.SetDefault("RICH_LIST", true) + viper.SetDefault("TX_ACCOUNT_CLEANUP", true) viper.SetDefault("METRICS_ENABLED", false) viper.SetDefault("METRICS_PATH", DefaultMetricsPath) viper.SetDefault("METRICS_PORT", DefaultMetricsPort) From 1a84b320ebad5c228bd660038d742449ab117d90 Mon Sep 17 00:00:00 2001 From: songwongtp <16089160+songwongtp@users.noreply.github.com> Date: Tue, 24 Mar 2026 17:42:46 +0700 Subject: [PATCH 4/9] fix: `ProcessBatch` now returns the last processed sequence to enable checkpointing partial progress in cleanup. --- indexer/extension/txaccountcleanup/cleanup.go | 21 ++++++----- .../extension/txaccountcleanup/extension.go | 36 ++++++++++--------- 2 files changed, 30 insertions(+), 27 deletions(-) diff --git a/indexer/extension/txaccountcleanup/cleanup.go b/indexer/extension/txaccountcleanup/cleanup.go index e00ad124..f83f8d15 100644 --- a/indexer/extension/txaccountcleanup/cleanup.go +++ b/indexer/extension/txaccountcleanup/cleanup.go @@ -22,18 +22,20 @@ type txDataWithEvents struct { Events []abci.Event `json:"events"` } -func ProcessBatch(ctx context.Context, db *gorm.DB, cfg *config.Config, logger *slog.Logger, startSeq, endSeq int64) (totalDeleted, totalInserted int64, err error) { +func ProcessBatch(ctx context.Context, db *gorm.DB, cfg *config.Config, logger *slog.Logger, startSeq, endSeq int64) (lastProcessedSeq, totalDeleted, totalInserted int64, err error) { + lastProcessedSeq = startSeq - 1 + // Query txs in sequence range var txs []types.CollectedTx if err := db.WithContext(ctx). Where("sequence >= ? AND sequence <= ?", startSeq, endSeq). Order("sequence ASC"). Find(&txs).Error; err != nil { - return 0, 0, fmt.Errorf("failed to query transactions: %w", err) + return lastProcessedSeq, 0, 0, fmt.Errorf("failed to query transactions: %w", err) } if len(txs) == 0 { - return 0, 0, nil + return endSeq, 0, 0, nil } // Query actual tx_accounts in sequence range @@ -41,7 +43,7 @@ func ProcessBatch(ctx context.Context, db *gorm.DB, cfg *config.Config, logger * if err := db.WithContext(ctx). Where("sequence >= ? AND sequence <= ?", startSeq, endSeq). Find(&actualEntries).Error; err != nil { - return 0, 0, fmt.Errorf("failed to query tx_accounts: %w", err) + return lastProcessedSeq, 0, 0, fmt.Errorf("failed to query tx_accounts: %w", err) } // Group actual entries by sequence @@ -55,25 +57,22 @@ func ProcessBatch(ctx context.Context, db *gorm.DB, cfg *config.Config, logger * for _, collectedTx := range txs { select { case <-ctx.Done(): - return totalDeleted, totalInserted, ctx.Err() + return lastProcessedSeq, totalDeleted, totalInserted, ctx.Err() default: } deleted, inserted, err := reconcileTx(ctx, db, logger, collectedTx, actualBySeq[collectedTx.Sequence], isEVM) if err != nil { hashStr := hex.EncodeToString(collectedTx.Hash) - logger.Warn("failed to reconcile tx", - slog.String("tx_hash", hashStr), - slog.Int64("sequence", collectedTx.Sequence), - slog.Any("error", err)) - continue + return lastProcessedSeq, totalDeleted, totalInserted, fmt.Errorf("failed to reconcile tx %s at sequence %d: %w", hashStr, collectedTx.Sequence, err) } totalDeleted += deleted totalInserted += inserted + lastProcessedSeq = collectedTx.Sequence } - return totalDeleted, totalInserted, nil + return endSeq, totalDeleted, totalInserted, nil } func reconcileTx(ctx context.Context, db *gorm.DB, logger *slog.Logger, collectedTx types.CollectedTx, actual []types.CollectedTxAccount, isEVM bool) (deleted, inserted int64, err error) { diff --git a/indexer/extension/txaccountcleanup/extension.go b/indexer/extension/txaccountcleanup/extension.go index e5292cde..ef0b7fb4 100644 --- a/indexer/extension/txaccountcleanup/extension.go +++ b/indexer/extension/txaccountcleanup/extension.go @@ -106,27 +106,31 @@ func (e *TxAccountCleanupExtension) Run(ctx context.Context) error { endSeq := min(currentSeq+BatchSize-1, maxSeq) - deleted, inserted, err := ProcessBatch(ctx, e.db.DB, e.cfg, e.logger, currentSeq, endSeq) - if err != nil { - return fmt.Errorf("failed to process batch [%d-%d]: %w", currentSeq, endSeq, err) - } + lastProcessedSeq, deleted, inserted, batchErr := ProcessBatch(ctx, e.db.DB, e.cfg, e.logger, currentSeq, endSeq) - status.LastCleanedSequence = endSeq - status.DeletedRecords += deleted - status.InsertedRecords += inserted + // Checkpoint progress if any sequences were successfully processed + if lastProcessedSeq >= currentSeq { + status.LastCleanedSequence = lastProcessedSeq + status.DeletedRecords += deleted + status.InsertedRecords += inserted - if err := e.updateStatus(ctx, status); err != nil { - return fmt.Errorf("failed to update status: %w", err) - } + if err := e.updateStatus(ctx, status); err != nil { + return fmt.Errorf("failed to update status: %w", err) + } - if deleted > 0 || inserted > 0 { - e.logger.Info("tx account cleanup processed batch", - slog.Int64("end_sequence", endSeq), - slog.Int64("batch_deleted", deleted), - slog.Int64("batch_inserted", inserted)) + if deleted > 0 || inserted > 0 { + e.logger.Info("tx account cleanup processed batch", + slog.Int64("end_sequence", lastProcessedSeq), + slog.Int64("batch_deleted", deleted), + slog.Int64("batch_inserted", inserted)) + } + + currentSeq = lastProcessedSeq + 1 } - currentSeq = endSeq + 1 + if batchErr != nil { + return fmt.Errorf("failed to process batch [%d-%d]: %w", currentSeq, endSeq, batchErr) + } } } From 6dab705c57fb8fc208553b219adf8c08c07c6eec Mon Sep 17 00:00:00 2001 From: songwongtp <16089160+songwongtp@users.noreply.github.com> Date: Wed, 25 Mar 2026 13:59:43 +0700 Subject: [PATCH 5/9] refactor: improve tx account cleanup by processing sequences in reverse, fixing stale signer flags, and centralizing address deduplication. --- indexer/collector/tx/address.go | 2 +- indexer/extension/txaccountcleanup/cleanup.go | 78 ++++++++++--------- .../extension/txaccountcleanup/extension.go | 63 ++++++++------- util/cache/cache.go | 12 ++- 4 files changed, 88 insertions(+), 67 deletions(-) diff --git a/indexer/collector/tx/address.go b/indexer/collector/tx/address.go index 9a696959..772c368c 100644 --- a/indexer/collector/tx/address.go +++ b/indexer/collector/tx/address.go @@ -40,7 +40,7 @@ func findAllMoveHexAddress(attr string) []string { } func GrepAddressesFromTx(events []abci.Event, tx *gorm.DB) (grepped []string, err error) { - storeAddrMap := make(map[string]interface{}) // set of fa store addrs + storeAddrMap := make(map[string]any) // set of fa store addrs for _, event := range events { for idx, attr := range event.Attributes { diff --git a/indexer/extension/txaccountcleanup/cleanup.go b/indexer/extension/txaccountcleanup/cleanup.go index f83f8d15..5f2ff4e3 100644 --- a/indexer/extension/txaccountcleanup/cleanup.go +++ b/indexer/extension/txaccountcleanup/cleanup.go @@ -10,9 +10,7 @@ import ( abci "github.com/cometbft/cometbft/abci/types" "gorm.io/gorm" - "github.com/initia-labs/rollytics/config" tx "github.com/initia-labs/rollytics/indexer/collector/tx" - "github.com/initia-labs/rollytics/indexer/extension/evmret" "github.com/initia-labs/rollytics/orm" "github.com/initia-labs/rollytics/types" "github.com/initia-labs/rollytics/util/cache" @@ -22,7 +20,7 @@ type txDataWithEvents struct { Events []abci.Event `json:"events"` } -func ProcessBatch(ctx context.Context, db *gorm.DB, cfg *config.Config, logger *slog.Logger, startSeq, endSeq int64) (lastProcessedSeq, totalDeleted, totalInserted int64, err error) { +func ProcessBatch(ctx context.Context, db *gorm.DB, logger *slog.Logger, startSeq, endSeq int64) (lastProcessedSeq, totalDeleted, totalInserted int64, err error) { lastProcessedSeq = startSeq - 1 // Query txs in sequence range @@ -52,8 +50,6 @@ func ProcessBatch(ctx context.Context, db *gorm.DB, cfg *config.Config, logger * actualBySeq[entry.Sequence] = append(actualBySeq[entry.Sequence], entry) } - isEVM := cfg.GetVmType() == types.EVM - for _, collectedTx := range txs { select { case <-ctx.Done(): @@ -61,7 +57,7 @@ func ProcessBatch(ctx context.Context, db *gorm.DB, cfg *config.Config, logger * default: } - deleted, inserted, err := reconcileTx(ctx, db, logger, collectedTx, actualBySeq[collectedTx.Sequence], isEVM) + deleted, inserted, err := reconcileTx(ctx, db, logger, collectedTx, actualBySeq[collectedTx.Sequence]) if err != nil { hashStr := hex.EncodeToString(collectedTx.Hash) return lastProcessedSeq, totalDeleted, totalInserted, fmt.Errorf("failed to reconcile tx %s at sequence %d: %w", hashStr, collectedTx.Sequence, err) @@ -75,7 +71,9 @@ func ProcessBatch(ctx context.Context, db *gorm.DB, cfg *config.Config, logger * return endSeq, totalDeleted, totalInserted, nil } -func reconcileTx(ctx context.Context, db *gorm.DB, logger *slog.Logger, collectedTx types.CollectedTx, actual []types.CollectedTxAccount, isEVM bool) (deleted, inserted int64, err error) { +func reconcileTx(ctx context.Context, db *gorm.DB, logger *slog.Logger, collectedTx types.CollectedTx, actual []types.CollectedTxAccount) (deleted, inserted int64, err error) { + ctxDB := db.WithContext(ctx) + // Parse events from stored tx data var txData txDataWithEvents if err := json.Unmarshal(collectedTx.Data, &txData); err != nil { @@ -83,23 +81,13 @@ func reconcileTx(ctx context.Context, db *gorm.DB, logger *slog.Logger, collecte } // Re-derive addresses from events - addrs, err := tx.GrepAddressesFromTx(txData.Events, db) + addrs, err := tx.GrepAddressesFromTx(txData.Events, ctxDB) if err != nil { return 0, 0, fmt.Errorf("failed to grep addresses: %w", err) } - // Deduplicate addresses - addrMap := make(map[string]struct{}) - for _, addr := range addrs { - addrMap[addr] = struct{}{} - } - var uniqueAddrs []string - for addr := range addrMap { - uniqueAddrs = append(uniqueAddrs, addr) - } - // Convert to account IDs (don't create new accounts) - accountIdMap, err := cache.GetOrCreateAccountIds(db, uniqueAddrs, false) + accountIdMap, err := cache.GetOrCreateAccountIds(ctxDB, addrs, false) if err != nil { return 0, 0, fmt.Errorf("failed to get account IDs: %w", err) } @@ -112,24 +100,7 @@ func reconcileTx(ctx context.Context, db *gorm.DB, logger *slog.Logger, collecte } } - // EVM ret-only safety: remove ret-only addresses from expected set before insert - if isEVM { - retOnlyAddrs, err := evmret.FindRetOnlyAddresses(collectedTx.Data) - if err != nil { - return 0, 0, fmt.Errorf("failed to find ret-only addresses: %w", err) - } - if len(retOnlyAddrs) > 0 { - retAccountIds, err := cache.GetOrCreateAccountIds(db, retOnlyAddrs, false) - if err != nil { - return 0, 0, fmt.Errorf("failed to get ret-only account IDs: %w", err) - } - for _, id := range retAccountIds { - delete(expectedSet, id) - } - } - } - - // Add signer AFTER ret-only filtering (signer is always expected) + // Add signer (always expected) if collectedTx.SignerId != 0 { expectedSet[collectedTx.SignerId] = struct{}{} } @@ -160,7 +131,19 @@ func reconcileTx(ctx context.Context, db *gorm.DB, logger *slog.Logger, collecte } } - if len(extraIds) == 0 && len(missingEntries) == 0 { + // Compute stale signer flags (in both sets but wrong Signer value) → UPDATE + var staleSigner []int64 + for accountId, entry := range actualSet { + if _, ok := expectedSet[accountId]; !ok { + continue + } + expectedSigner := accountId == collectedTx.SignerId + if entry.Signer != expectedSigner { + staleSigner = append(staleSigner, accountId) + } + } + + if len(extraIds) == 0 && len(missingEntries) == 0 && len(staleSigner) == 0 { return 0, 0, nil } @@ -197,6 +180,25 @@ func reconcileTx(ctx context.Context, db *gorm.DB, logger *slog.Logger, collecte slog.Int64("count", inserted)) } + // Fix stale signer flags + if len(staleSigner) > 0 { + for _, accountId := range staleSigner { + expectedSigner := accountId == collectedTx.SignerId + result := txDB. + Model(&types.CollectedTxAccount{}). + Where("account_id = ? AND sequence = ?", accountId, collectedTx.Sequence). + Update("signer", expectedSigner) + if result.Error != nil { + return fmt.Errorf("failed to update signer flag: %w", result.Error) + } + } + + logger.Info("fixed stale signer flags", + slog.String("tx_hash", hashStr), + slog.Int64("sequence", collectedTx.Sequence), + slog.Int("count", len(staleSigner))) + } + return nil }) diff --git a/indexer/extension/txaccountcleanup/extension.go b/indexer/extension/txaccountcleanup/extension.go index ef0b7fb4..06a3f913 100644 --- a/indexer/extension/txaccountcleanup/extension.go +++ b/indexer/extension/txaccountcleanup/extension.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "log/slog" - "time" "gorm.io/gorm" @@ -49,8 +48,10 @@ func (e *TxAccountCleanupExtension) Initialize(ctx context.Context) (*types.Coll err := e.db.WithContext(ctx).First(&status).Error if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { + // Initialize with -1 to indicate no cleanup has been done yet; + // Run() will set it to max(sequence) on first iteration. status = types.CollectedTxAccountCleanupStatus{ - LastCleanedSequence: 0, + LastCleanedSequence: -1, DeletedRecords: 0, InsertedRecords: 0, } @@ -72,8 +73,25 @@ func (e *TxAccountCleanupExtension) Run(ctx context.Context) error { return fmt.Errorf("failed to initialize: %w", err) } - currentSeq := status.LastCleanedSequence + 1 - e.logger.Info("starting tx account cleanup", + // Determine starting point: scan from max sequence down to 0 + currentSeq := status.LastCleanedSequence + if currentSeq < 0 { + // First run: start from the current max sequence + var maxSeq int64 + if err := e.db.WithContext(ctx). + Model(&types.CollectedTx{}). + Select("COALESCE(MAX(sequence), 0)"). + Scan(&maxSeq).Error; err != nil { + return fmt.Errorf("failed to get max sequence: %w", err) + } + currentSeq = maxSeq + status.LastCleanedSequence = currentSeq + if err := e.updateStatus(ctx, status); err != nil { + return fmt.Errorf("failed to update status: %w", err) + } + } + + e.logger.Info("starting tx account cleanup (reverse)", slog.Int64("start_sequence", currentSeq)) for { @@ -87,30 +105,20 @@ func (e *TxAccountCleanupExtension) Run(ctx context.Context) error { default: } - var maxSeq int64 - if err := e.db.WithContext(ctx). - Model(&types.CollectedTx{}). - Select("COALESCE(MAX(sequence), 0)"). - Scan(&maxSeq).Error; err != nil { - return fmt.Errorf("failed to get max sequence: %w", err) - } - - if currentSeq > maxSeq { - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(1 * time.Second): - } - continue + if currentSeq <= 0 { + e.logger.Info("cleanup complete", + slog.Int64("deleted_records", status.DeletedRecords), + slog.Int64("inserted_records", status.InsertedRecords)) + return nil } - endSeq := min(currentSeq+BatchSize-1, maxSeq) + startSeq := max(currentSeq-BatchSize+1, 1) - lastProcessedSeq, deleted, inserted, batchErr := ProcessBatch(ctx, e.db.DB, e.cfg, e.logger, currentSeq, endSeq) + lastProcessedSeq, deleted, inserted, batchErr := ProcessBatch(ctx, e.db.DB, e.logger, startSeq, currentSeq) // Checkpoint progress if any sequences were successfully processed - if lastProcessedSeq >= currentSeq { - status.LastCleanedSequence = lastProcessedSeq + if lastProcessedSeq >= startSeq { + status.LastCleanedSequence = startSeq - 1 status.DeletedRecords += deleted status.InsertedRecords += inserted @@ -120,16 +128,17 @@ func (e *TxAccountCleanupExtension) Run(ctx context.Context) error { if deleted > 0 || inserted > 0 { e.logger.Info("tx account cleanup processed batch", - slog.Int64("end_sequence", lastProcessedSeq), + slog.Int64("start_sequence", startSeq), + slog.Int64("end_sequence", currentSeq), slog.Int64("batch_deleted", deleted), slog.Int64("batch_inserted", inserted)) } - currentSeq = lastProcessedSeq + 1 + currentSeq = startSeq - 1 } if batchErr != nil { - return fmt.Errorf("failed to process batch [%d-%d]: %w", currentSeq, endSeq, batchErr) + return fmt.Errorf("failed to process batch [%d-%d]: %w", startSeq, currentSeq, batchErr) } } } @@ -138,7 +147,7 @@ func (e *TxAccountCleanupExtension) updateStatus(ctx context.Context, status *ty return e.db.WithContext(ctx). Model(&types.CollectedTxAccountCleanupStatus{}). Where("1 = 1"). - Updates(map[string]interface{}{ + Updates(map[string]any{ "last_cleaned_sequence": status.LastCleanedSequence, "deleted_records": status.DeletedRecords, "inserted_records": status.InsertedRecords, diff --git a/util/cache/cache.go b/util/cache/cache.go index 91bb16ea..7cc121fe 100644 --- a/util/cache/cache.go +++ b/util/cache/cache.go @@ -141,8 +141,18 @@ func updateAccountCacheAndResult(uncached []string, accountIdMap map[string]int6 } func GetOrCreateAccountIds(db *gorm.DB, accounts []string, createNew bool) (idMap map[string]int64, err error) { + // Deduplicate input accounts + seen := make(map[string]struct{}, len(accounts)) + deduped := make([]string, 0, len(accounts)) + for _, account := range accounts { + if _, ok := seen[account]; !ok { + seen[account] = struct{}{} + deduped = append(deduped, account) + } + } + // Check cache and collect uncached accounts - idMap, uncached := checkAccountCache(accounts) + idMap, uncached := checkAccountCache(deduped) if len(uncached) == 0 { return idMap, nil From 879c443ffacd2a1fff045e890fd3ea75fe859a6c Mon Sep 17 00:00:00 2001 From: songwongtp <16089160+songwongtp@users.noreply.github.com> Date: Wed, 25 Mar 2026 14:16:06 +0700 Subject: [PATCH 6/9] Refactor: Unconditionally update tx account cleanup batch status and optimize stale signer flag corrections. --- indexer/extension/txaccountcleanup/cleanup.go | 27 +++++++++---- .../extension/txaccountcleanup/extension.go | 38 +++++++++---------- 2 files changed, 36 insertions(+), 29 deletions(-) diff --git a/indexer/extension/txaccountcleanup/cleanup.go b/indexer/extension/txaccountcleanup/cleanup.go index 5f2ff4e3..f8214b5d 100644 --- a/indexer/extension/txaccountcleanup/cleanup.go +++ b/indexer/extension/txaccountcleanup/cleanup.go @@ -182,14 +182,25 @@ func reconcileTx(ctx context.Context, db *gorm.DB, logger *slog.Logger, collecte // Fix stale signer flags if len(staleSigner) > 0 { - for _, accountId := range staleSigner { - expectedSigner := accountId == collectedTx.SignerId - result := txDB. - Model(&types.CollectedTxAccount{}). - Where("account_id = ? AND sequence = ?", accountId, collectedTx.Sequence). - Update("signer", expectedSigner) - if result.Error != nil { - return fmt.Errorf("failed to update signer flag: %w", result.Error) + // Set signer=false for all stale entries + if err := txDB. + Model(&types.CollectedTxAccount{}). + Where("account_id IN ? AND sequence = ?", staleSigner, collectedTx.Sequence). + Update("signer", false).Error; err != nil { + return fmt.Errorf("failed to clear stale signer flags: %w", err) + } + // Set signer=true for the actual signer if it was in the stale list + if collectedTx.SignerId != 0 { + for _, id := range staleSigner { + if id == collectedTx.SignerId { + if err := txDB. + Model(&types.CollectedTxAccount{}). + Where("account_id = ? AND sequence = ?", collectedTx.SignerId, collectedTx.Sequence). + Update("signer", true).Error; err != nil { + return fmt.Errorf("failed to set signer flag: %w", err) + } + break + } } } diff --git a/indexer/extension/txaccountcleanup/extension.go b/indexer/extension/txaccountcleanup/extension.go index 06a3f913..e56d348f 100644 --- a/indexer/extension/txaccountcleanup/extension.go +++ b/indexer/extension/txaccountcleanup/extension.go @@ -114,32 +114,28 @@ func (e *TxAccountCleanupExtension) Run(ctx context.Context) error { startSeq := max(currentSeq-BatchSize+1, 1) - lastProcessedSeq, deleted, inserted, batchErr := ProcessBatch(ctx, e.db.DB, e.logger, startSeq, currentSeq) - - // Checkpoint progress if any sequences were successfully processed - if lastProcessedSeq >= startSeq { - status.LastCleanedSequence = startSeq - 1 - status.DeletedRecords += deleted - status.InsertedRecords += inserted - - if err := e.updateStatus(ctx, status); err != nil { - return fmt.Errorf("failed to update status: %w", err) - } + _, deleted, inserted, batchErr := ProcessBatch(ctx, e.db.DB, e.logger, startSeq, currentSeq) + if batchErr != nil { + return fmt.Errorf("failed to process batch [%d-%d]: %w", startSeq, currentSeq, batchErr) + } - if deleted > 0 || inserted > 0 { - e.logger.Info("tx account cleanup processed batch", - slog.Int64("start_sequence", startSeq), - slog.Int64("end_sequence", currentSeq), - slog.Int64("batch_deleted", deleted), - slog.Int64("batch_inserted", inserted)) - } + status.LastCleanedSequence = startSeq - 1 + status.DeletedRecords += deleted + status.InsertedRecords += inserted - currentSeq = startSeq - 1 + if err := e.updateStatus(ctx, status); err != nil { + return fmt.Errorf("failed to update status: %w", err) } - if batchErr != nil { - return fmt.Errorf("failed to process batch [%d-%d]: %w", startSeq, currentSeq, batchErr) + if deleted > 0 || inserted > 0 { + e.logger.Info("tx account cleanup processed batch", + slog.Int64("start_sequence", startSeq), + slog.Int64("end_sequence", currentSeq), + slog.Int64("batch_deleted", deleted), + slog.Int64("batch_inserted", inserted)) } + + currentSeq = startSeq - 1 } } From 7f01999d95c9f87ba0a1c1b0269e453d2cf2a7ce Mon Sep 17 00:00:00 2001 From: songwongtp <16089160+songwongtp@users.noreply.github.com> Date: Wed, 25 Mar 2026 15:11:56 +0700 Subject: [PATCH 7/9] feat: Add Tx Account Cleanup status fields to the `/status` endpoint and implement logic to retrieve and expose them. --- util/common-handler/status/status.go | 59 +++++++++++++++++++++++----- util/common-handler/status/types.go | 17 ++++---- 2 files changed, 59 insertions(+), 17 deletions(-) diff --git a/util/common-handler/status/status.go b/util/common-handler/status/status.go index d80e2217..bc58d14b 100644 --- a/util/common-handler/status/status.go +++ b/util/common-handler/status/status.go @@ -13,9 +13,10 @@ import ( ) var ( - lastEvmInternalTxHeight atomic.Int64 - lastRichListHeight atomic.Int64 - lastEvmRetCleanupHeight atomic.Int64 + lastEvmInternalTxHeight atomic.Int64 + lastRichListHeight atomic.Int64 + lastEvmRetCleanupHeight atomic.Int64 + lastTxAccountCleanupStatus atomic.Pointer[types.CollectedTxAccountCleanupStatus] ) // status handles GET /status @@ -68,14 +69,26 @@ func (h *StatusHandler) GetStatus(c *fiber.Ctx) error { evmRetCleanupHeight = height } + var txAccountCleanupStatus types.CollectedTxAccountCleanupStatus + if h.isTxAccountCleanupEnabled() { + status, err := h.getTxAccountCleanupStatus(tx) + if err != nil { + return err + } + txAccountCleanupStatus = *status + } + return c.JSON(&StatusResponse{ - Version: config.Version, - CommitHash: config.CommitHash, - ChainId: h.GetChainId(), - Height: lastBlock.Height, - InternalTxHeight: internalTxHeight, - RichListHeight: richListHeight, - EvmRetCleanupHeight: evmRetCleanupHeight, + Version: config.Version, + CommitHash: config.CommitHash, + ChainId: h.GetChainId(), + Height: lastBlock.Height, + InternalTxHeight: internalTxHeight, + RichListHeight: richListHeight, + EvmRetCleanupHeight: evmRetCleanupHeight, + TxAccountCleanupSequence: txAccountCleanupStatus.LastCleanedSequence, + TxAccountCleanupDeleted: txAccountCleanupStatus.DeletedRecords, + TxAccountCleanupInserted: txAccountCleanupStatus.InsertedRecords, }) } @@ -147,6 +160,32 @@ func (h *StatusHandler) isEvmRetCleanupEnabled() bool { return (h.GetChainConfig().VmType == types.EVM) && h.GetConfig().EvmRetCleanupEnabled() } +func (h *StatusHandler) isTxAccountCleanupEnabled() bool { + return h.GetConfig().TxAccountCleanupEnabled() +} + +func (h *StatusHandler) getTxAccountCleanupStatus(tx *gorm.DB) (*types.CollectedTxAccountCleanupStatus, error) { + var cleanupStatus types.CollectedTxAccountCleanupStatus + err := tx.Model(&types.CollectedTxAccountCleanupStatus{}).First(&cleanupStatus).Error + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return &types.CollectedTxAccountCleanupStatus{ + LastCleanedSequence: -1, + DeletedRecords: -1, + InsertedRecords: -1, + }, nil + } + return nil, fiber.NewError(fiber.StatusInternalServerError, err.Error()) + } + + cached := lastTxAccountCleanupStatus.Load() + if cached == nil || cleanupStatus.LastCleanedSequence < cached.LastCleanedSequence { + lastTxAccountCleanupStatus.Store(&cleanupStatus) + } + + return &cleanupStatus, nil +} + func (h *StatusHandler) getEvmRetCleanupStatus(tx *gorm.DB) (int64, error) { height := lastEvmRetCleanupHeight.Load() diff --git a/util/common-handler/status/types.go b/util/common-handler/status/types.go index c79db914..460eef0a 100644 --- a/util/common-handler/status/types.go +++ b/util/common-handler/status/types.go @@ -1,11 +1,14 @@ package status type StatusResponse struct { - Version string `json:"version" extensions:"x-order:0"` - CommitHash string `json:"commit_hash" extensions:"x-order:1"` - ChainId string `json:"chain_id" extensions:"x-order:2"` - Height int64 `json:"height" extensions:"x-order:3"` - InternalTxHeight int64 `json:"internal_tx_height,omitempty" extensions:"x-order:4"` - RichListHeight int64 `json:"rich_list_height,omitempty" extensions:"x-order:5"` - EvmRetCleanupHeight int64 `json:"evm_ret_cleanup_height,omitempty" extensions:"x-order:6"` + Version string `json:"version" extensions:"x-order:0"` + CommitHash string `json:"commit_hash" extensions:"x-order:1"` + ChainId string `json:"chain_id" extensions:"x-order:2"` + Height int64 `json:"height" extensions:"x-order:3"` + InternalTxHeight int64 `json:"internal_tx_height,omitempty" extensions:"x-order:4"` + RichListHeight int64 `json:"rich_list_height,omitempty" extensions:"x-order:5"` + EvmRetCleanupHeight int64 `json:"evm_ret_cleanup_height,omitempty" extensions:"x-order:6"` + TxAccountCleanupSequence int64 `json:"tx_account_cleanup_sequence,omitempty" extensions:"x-order:7"` + TxAccountCleanupDeleted int64 `json:"tx_account_cleanup_deleted,omitempty" extensions:"x-order:8"` + TxAccountCleanupInserted int64 `json:"tx_account_cleanup_inserted,omitempty" extensions:"x-order:9"` } From c7b8c3e2299958e3e1c672073db6f8b051ba5dc7 Mon Sep 17 00:00:00 2001 From: songwongtp <16089160+songwongtp@users.noreply.github.com> Date: Wed, 25 Mar 2026 15:26:50 +0700 Subject: [PATCH 8/9] feat: add EVM return and transaction account cleanup fields to API documentation. --- api/docs/docs.go | 16 ++++++++++++++++ api/docs/swagger.json | 16 ++++++++++++++++ api/docs/swagger.yaml | 12 ++++++++++++ 3 files changed, 44 insertions(+) diff --git a/api/docs/docs.go b/api/docs/docs.go index 236f18a7..06aef1bb 100644 --- a/api/docs/docs.go +++ b/api/docs/docs.go @@ -1465,6 +1465,10 @@ const docTemplate = `{ "type": "string", "x-order:1": true }, + "evm_ret_cleanup_height": { + "type": "integer", + "x-order:6": true + }, "height": { "type": "integer", "x-order:3": true @@ -1477,6 +1481,18 @@ const docTemplate = `{ "type": "integer", "x-order:5": true }, + "tx_account_cleanup_deleted": { + "type": "integer", + "x-order:8": true + }, + "tx_account_cleanup_inserted": { + "type": "integer", + "x-order:9": true + }, + "tx_account_cleanup_sequence": { + "type": "integer", + "x-order:7": true + }, "version": { "type": "string", "x-order:0": true diff --git a/api/docs/swagger.json b/api/docs/swagger.json index 18a59e04..f336936a 100644 --- a/api/docs/swagger.json +++ b/api/docs/swagger.json @@ -1454,6 +1454,10 @@ "type": "string", "x-order:1": true }, + "evm_ret_cleanup_height": { + "type": "integer", + "x-order:6": true + }, "height": { "type": "integer", "x-order:3": true @@ -1466,6 +1470,18 @@ "type": "integer", "x-order:5": true }, + "tx_account_cleanup_deleted": { + "type": "integer", + "x-order:8": true + }, + "tx_account_cleanup_inserted": { + "type": "integer", + "x-order:9": true + }, + "tx_account_cleanup_sequence": { + "type": "integer", + "x-order:7": true + }, "version": { "type": "string", "x-order:0": true diff --git a/api/docs/swagger.yaml b/api/docs/swagger.yaml index b7d31536..fc771f28 100644 --- a/api/docs/swagger.yaml +++ b/api/docs/swagger.yaml @@ -143,6 +143,9 @@ definitions: commit_hash: type: string x-order:1: true + evm_ret_cleanup_height: + type: integer + x-order:6: true height: type: integer x-order:3: true @@ -152,6 +155,15 @@ definitions: rich_list_height: type: integer x-order:5: true + tx_account_cleanup_deleted: + type: integer + x-order:8: true + tx_account_cleanup_inserted: + type: integer + x-order:9: true + tx_account_cleanup_sequence: + type: integer + x-order:7: true version: type: string x-order:0: true From e2b9d02ef08ab4d18c329e9cb9832dd8372edc30 Mon Sep 17 00:00:00 2001 From: songwongtp <16089160+songwongtp@users.noreply.github.com> Date: Wed, 25 Mar 2026 15:33:05 +0700 Subject: [PATCH 9/9] fix: Add nil check for `txAccountCleanupConfig` in `TxAccountCleanupEnabled` method. --- config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/config.go b/config/config.go index 6b46ba10..f25f2517 100644 --- a/config/config.go +++ b/config/config.go @@ -477,7 +477,7 @@ func (c *Config) SetEvmRetCleanupConfig(evmRetCleanCgf *EvmRetCleanupConfig) { } func (c Config) TxAccountCleanupEnabled() bool { - return c.txAccountCleanupConfig.Enabled + return c.txAccountCleanupConfig != nil && c.txAccountCleanupConfig.Enabled } func (c Config) GetSentryConfig() *SentryConfig {