diff --git a/api/docs/docs.go b/api/docs/docs.go index 236f18a7..06aef1bb 100644 --- a/api/docs/docs.go +++ b/api/docs/docs.go @@ -1465,6 +1465,10 @@ const docTemplate = `{ "type": "string", "x-order:1": true }, + "evm_ret_cleanup_height": { + "type": "integer", + "x-order:6": true + }, "height": { "type": "integer", "x-order:3": true @@ -1477,6 +1481,18 @@ const docTemplate = `{ "type": "integer", "x-order:5": true }, + "tx_account_cleanup_deleted": { + "type": "integer", + "x-order:8": true + }, + "tx_account_cleanup_inserted": { + "type": "integer", + "x-order:9": true + }, + "tx_account_cleanup_sequence": { + "type": "integer", + "x-order:7": true + }, "version": { "type": "string", "x-order:0": true diff --git a/api/docs/swagger.json b/api/docs/swagger.json index 18a59e04..f336936a 100644 --- a/api/docs/swagger.json +++ b/api/docs/swagger.json @@ -1454,6 +1454,10 @@ "type": "string", "x-order:1": true }, + "evm_ret_cleanup_height": { + "type": "integer", + "x-order:6": true + }, "height": { "type": "integer", "x-order:3": true @@ -1466,6 +1470,18 @@ "type": "integer", "x-order:5": true }, + "tx_account_cleanup_deleted": { + "type": "integer", + "x-order:8": true + }, + "tx_account_cleanup_inserted": { + "type": "integer", + "x-order:9": true + }, + "tx_account_cleanup_sequence": { + "type": "integer", + "x-order:7": true + }, "version": { "type": "string", "x-order:0": true diff --git a/api/docs/swagger.yaml b/api/docs/swagger.yaml index b7d31536..fc771f28 100644 --- a/api/docs/swagger.yaml +++ b/api/docs/swagger.yaml @@ -143,6 +143,9 @@ definitions: commit_hash: type: string x-order:1: true + evm_ret_cleanup_height: + type: integer + x-order:6: true height: type: integer x-order:3: true @@ -152,6 +155,15 @@ definitions: rich_list_height: type: integer x-order:5: true + tx_account_cleanup_deleted: + type: integer + x-order:8: true + tx_account_cleanup_inserted: + type: integer + x-order:9: true + tx_account_cleanup_sequence: + type: integer + x-order:7: true version: type: string x-order:0: true diff --git a/config/config.go b/config/config.go index 570f838d..f25f2517 100644 --- a/config/config.go +++ b/config/config.go @@ -129,26 +129,27 @@ func SetBuildInfo(v, commit string) { } type Config struct { - listenPort string - recvBufSize uint - indexerListenPort string - dbConfig *dbconfig.Config - chainConfig *ChainConfig - logLevel string - logFormat string - coolingDuration time.Duration // for indexer only - queryTimeout time.Duration // for indexer only - maxConcurrentRequests int // for indexer only - cacheSize int - cacheTTL time.Duration // for api only - pollingInterval time.Duration // for api only - internalTxConfig *InternalTxConfig - richListConfig *RichListConfig - evmRetCleanupConfig *EvmRetCleanupConfig - metricsConfig *MetricsConfig - cacheConfig *CacheConfig - sentryConfig *SentryConfig - corsConfig *CORSConfig + listenPort string + recvBufSize uint + indexerListenPort string + dbConfig *dbconfig.Config + chainConfig *ChainConfig + logLevel string + logFormat string + coolingDuration time.Duration // for indexer only + queryTimeout time.Duration // for indexer only + maxConcurrentRequests int // for indexer only + cacheSize int + cacheTTL time.Duration // for api only + pollingInterval time.Duration // for api only + internalTxConfig *InternalTxConfig + richListConfig *RichListConfig + evmRetCleanupConfig *EvmRetCleanupConfig + txAccountCleanupConfig *TxAccountCleanupConfig + metricsConfig *MetricsConfig + cacheConfig *CacheConfig + sentryConfig *SentryConfig + corsConfig *CORSConfig // Start height configuration startHeight int64 // explicit start height when set @@ -177,6 +178,7 @@ func setDefaults() { viper.SetDefault("INTERNAL_TX_BATCH_SIZE", DefaultInternalTxBatchSize) viper.SetDefault("INTERNAL_TX_QUEUE_SIZE", DefaultInternalTxQueueSize) viper.SetDefault("RICH_LIST", true) + viper.SetDefault("TX_ACCOUNT_CLEANUP", true) viper.SetDefault("METRICS_ENABLED", false) viper.SetDefault("METRICS_PATH", DefaultMetricsPath) viper.SetDefault("METRICS_PORT", DefaultMetricsPort) @@ -297,6 +299,9 @@ func loadConfig() (*Config, error) { evmRetCleanupConfig: &EvmRetCleanupConfig{ Enabled: viper.GetBool("EVM_RET_CLEANUP"), }, + txAccountCleanupConfig: &TxAccountCleanupConfig{ + Enabled: viper.GetBool("TX_ACCOUNT_CLEANUP"), + }, metricsConfig: &MetricsConfig{ Enabled: viper.GetBool("METRICS_ENABLED"), Path: viper.GetString("METRICS_PATH"), @@ -471,6 +476,10 @@ func (c *Config) SetEvmRetCleanupConfig(evmRetCleanCgf *EvmRetCleanupConfig) { c.evmRetCleanupConfig = evmRetCleanCgf } +func (c Config) TxAccountCleanupEnabled() bool { + return c.txAccountCleanupConfig != nil && c.txAccountCleanupConfig.Enabled +} + func (c Config) GetSentryConfig() *SentryConfig { if c.sentryConfig == nil || c.sentryConfig.DSN == "" { return nil diff --git a/config/txaccountcleanup.go b/config/txaccountcleanup.go new file mode 100644 index 00000000..fbd03ae0 --- /dev/null +++ b/config/txaccountcleanup.go @@ -0,0 +1,5 @@ +package config + +type TxAccountCleanupConfig struct { + Enabled bool +} diff --git a/indexer/collector/tx/address.go b/indexer/collector/tx/address.go index f636de97..772c368c 100644 --- a/indexer/collector/tx/address.go +++ b/indexer/collector/tx/address.go @@ -39,8 +39,8 @@ func findAllMoveHexAddress(attr string) []string { return regexMoveHex.FindAllString(attr, -1) } -func grepAddressesFromTx(events []abci.Event, tx *gorm.DB) (grepped []string, err error) { - storeAddrMap := make(map[string]interface{}) // set of fa store addrs +func GrepAddressesFromTx(events []abci.Event, tx *gorm.DB) (grepped []string, err error) { + storeAddrMap := make(map[string]any) // set of fa store addrs for _, event := range events { for idx, attr := range event.Attributes { diff --git a/indexer/collector/tx/collect.go b/indexer/collector/tx/collect.go index d24641cd..291d5319 100644 --- a/indexer/collector/tx/collect.go +++ b/indexer/collector/tx/collect.go @@ -128,7 +128,7 @@ func (sub *TxSubmodule) collect(block indexertypes.ScrapedBlock, tx *gorm.DB) er } // grep addresses from events - addrs, err := grepAddressesFromTx(res.Events, tx) + addrs, err := GrepAddressesFromTx(res.Events, tx) if err != nil { return err } diff --git a/indexer/extension/manager.go b/indexer/extension/manager.go index caf07495..d7807805 100644 --- a/indexer/extension/manager.go +++ b/indexer/extension/manager.go @@ -11,6 +11,7 @@ import ( evmret "github.com/initia-labs/rollytics/indexer/extension/evmret" internaltx "github.com/initia-labs/rollytics/indexer/extension/internaltx" richlist "github.com/initia-labs/rollytics/indexer/extension/richlist" + txaccountcleanup "github.com/initia-labs/rollytics/indexer/extension/txaccountcleanup" "github.com/initia-labs/rollytics/indexer/extension/types" "github.com/initia-labs/rollytics/orm" ) @@ -36,6 +37,10 @@ func New(cfg *config.Config, logger *slog.Logger, db *orm.Database) *ExtensionMa if retCleanup := evmret.New(cfg, logger, db); retCleanup != nil { extensions = append(extensions, retCleanup) } + // TX Account Cleanup + if taCleanup := txaccountcleanup.New(cfg, logger, db); taCleanup != nil { + extensions = append(extensions, taCleanup) + } return &ExtensionManager{ cfg: cfg, logger: logger, diff --git a/indexer/extension/txaccountcleanup/cleanup.go b/indexer/extension/txaccountcleanup/cleanup.go new file mode 100644 index 00000000..f8214b5d --- /dev/null +++ b/indexer/extension/txaccountcleanup/cleanup.go @@ -0,0 +1,217 @@ +package txaccountcleanup + +import ( + "context" + "encoding/hex" + "encoding/json" + "fmt" + "log/slog" + + abci "github.com/cometbft/cometbft/abci/types" + "gorm.io/gorm" + + tx "github.com/initia-labs/rollytics/indexer/collector/tx" + "github.com/initia-labs/rollytics/orm" + "github.com/initia-labs/rollytics/types" + "github.com/initia-labs/rollytics/util/cache" +) + +type txDataWithEvents struct { + Events []abci.Event `json:"events"` +} + +func ProcessBatch(ctx context.Context, db *gorm.DB, logger *slog.Logger, startSeq, endSeq int64) (lastProcessedSeq, totalDeleted, totalInserted int64, err error) { + lastProcessedSeq = startSeq - 1 + + // Query txs in sequence range + var txs []types.CollectedTx + if err := db.WithContext(ctx). + Where("sequence >= ? AND sequence <= ?", startSeq, endSeq). + Order("sequence ASC"). + Find(&txs).Error; err != nil { + return lastProcessedSeq, 0, 0, fmt.Errorf("failed to query transactions: %w", err) + } + + if len(txs) == 0 { + return endSeq, 0, 0, nil + } + + // Query actual tx_accounts in sequence range + var actualEntries []types.CollectedTxAccount + if err := db.WithContext(ctx). + Where("sequence >= ? AND sequence <= ?", startSeq, endSeq). + Find(&actualEntries).Error; err != nil { + return lastProcessedSeq, 0, 0, fmt.Errorf("failed to query tx_accounts: %w", err) + } + + // Group actual entries by sequence + actualBySeq := make(map[int64][]types.CollectedTxAccount) + for _, entry := range actualEntries { + actualBySeq[entry.Sequence] = append(actualBySeq[entry.Sequence], entry) + } + + for _, collectedTx := range txs { + select { + case <-ctx.Done(): + return lastProcessedSeq, totalDeleted, totalInserted, ctx.Err() + default: + } + + deleted, inserted, err := reconcileTx(ctx, db, logger, collectedTx, actualBySeq[collectedTx.Sequence]) + if err != nil { + hashStr := hex.EncodeToString(collectedTx.Hash) + return lastProcessedSeq, totalDeleted, totalInserted, fmt.Errorf("failed to reconcile tx %s at sequence %d: %w", hashStr, collectedTx.Sequence, err) + } + + totalDeleted += deleted + totalInserted += inserted + lastProcessedSeq = collectedTx.Sequence + } + + return endSeq, totalDeleted, totalInserted, nil +} + +func reconcileTx(ctx context.Context, db *gorm.DB, logger *slog.Logger, collectedTx types.CollectedTx, actual []types.CollectedTxAccount) (deleted, inserted int64, err error) { + ctxDB := db.WithContext(ctx) + + // Parse events from stored tx data + var txData txDataWithEvents + if err := json.Unmarshal(collectedTx.Data, &txData); err != nil { + return 0, 0, fmt.Errorf("failed to unmarshal tx data: %w", err) + } + + // Re-derive addresses from events + addrs, err := tx.GrepAddressesFromTx(txData.Events, ctxDB) + if err != nil { + return 0, 0, fmt.Errorf("failed to grep addresses: %w", err) + } + + // Convert to account IDs (don't create new accounts) + accountIdMap, err := cache.GetOrCreateAccountIds(ctxDB, addrs, false) + if err != nil { + return 0, 0, fmt.Errorf("failed to get account IDs: %w", err) + } + + // Build expected set (filter out id=0) + expectedSet := make(map[int64]struct{}) + for _, id := range accountIdMap { + if id != 0 { + expectedSet[id] = struct{}{} + } + } + + // Add signer (always expected) + if collectedTx.SignerId != 0 { + expectedSet[collectedTx.SignerId] = struct{}{} + } + + // Build actual set + actualSet := make(map[int64]types.CollectedTxAccount) + for _, entry := range actual { + actualSet[entry.AccountId] = entry + } + + // Compute extras (in actual but not expected) → DELETE + var extraIds []int64 + for accountId := range actualSet { + if _, ok := expectedSet[accountId]; !ok { + extraIds = append(extraIds, accountId) + } + } + + // Compute missing (in expected but not actual) → INSERT + var missingEntries []types.CollectedTxAccount + for accountId := range expectedSet { + if _, ok := actualSet[accountId]; !ok { + missingEntries = append(missingEntries, types.CollectedTxAccount{ + AccountId: accountId, + Sequence: collectedTx.Sequence, + Signer: accountId == collectedTx.SignerId, + }) + } + } + + // Compute stale signer flags (in both sets but wrong Signer value) → UPDATE + var staleSigner []int64 + for accountId, entry := range actualSet { + if _, ok := expectedSet[accountId]; !ok { + continue + } + expectedSigner := accountId == collectedTx.SignerId + if entry.Signer != expectedSigner { + staleSigner = append(staleSigner, accountId) + } + } + + if len(extraIds) == 0 && len(missingEntries) == 0 && len(staleSigner) == 0 { + return 0, 0, nil + } + + hashStr := hex.EncodeToString(collectedTx.Hash) + + err = db.WithContext(ctx).Transaction(func(txDB *gorm.DB) error { + // Delete extras + if len(extraIds) > 0 { + result := txDB. + Where("account_id IN ? AND sequence = ?", extraIds, collectedTx.Sequence). + Delete(&types.CollectedTxAccount{}) + if result.Error != nil { + return fmt.Errorf("failed to delete extra entries: %w", result.Error) + } + deleted = result.RowsAffected + + logger.Info("deleted extra tx_accounts entries", + slog.String("tx_hash", hashStr), + slog.Int64("sequence", collectedTx.Sequence), + slog.Int64("count", deleted)) + } + + // Insert missing + if len(missingEntries) > 0 { + result := txDB.Clauses(orm.DoNothingWhenConflict).Create(&missingEntries) + if result.Error != nil { + return fmt.Errorf("failed to insert missing entries: %w", result.Error) + } + inserted = result.RowsAffected + + logger.Info("inserted missing tx_accounts entries", + slog.String("tx_hash", hashStr), + slog.Int64("sequence", collectedTx.Sequence), + slog.Int64("count", inserted)) + } + + // Fix stale signer flags + if len(staleSigner) > 0 { + // Set signer=false for all stale entries + if err := txDB. + Model(&types.CollectedTxAccount{}). + Where("account_id IN ? AND sequence = ?", staleSigner, collectedTx.Sequence). + Update("signer", false).Error; err != nil { + return fmt.Errorf("failed to clear stale signer flags: %w", err) + } + // Set signer=true for the actual signer if it was in the stale list + if collectedTx.SignerId != 0 { + for _, id := range staleSigner { + if id == collectedTx.SignerId { + if err := txDB. + Model(&types.CollectedTxAccount{}). + Where("account_id = ? AND sequence = ?", collectedTx.SignerId, collectedTx.Sequence). + Update("signer", true).Error; err != nil { + return fmt.Errorf("failed to set signer flag: %w", err) + } + break + } + } + } + + logger.Info("fixed stale signer flags", + slog.String("tx_hash", hashStr), + slog.Int64("sequence", collectedTx.Sequence), + slog.Int("count", len(staleSigner))) + } + + return nil + }) + + return deleted, inserted, err +} diff --git a/indexer/extension/txaccountcleanup/extension.go b/indexer/extension/txaccountcleanup/extension.go new file mode 100644 index 00000000..e56d348f --- /dev/null +++ b/indexer/extension/txaccountcleanup/extension.go @@ -0,0 +1,151 @@ +package txaccountcleanup + +import ( + "context" + "errors" + "fmt" + "log/slog" + + "gorm.io/gorm" + + "github.com/initia-labs/rollytics/config" + exttypes "github.com/initia-labs/rollytics/indexer/extension/types" + "github.com/initia-labs/rollytics/orm" + "github.com/initia-labs/rollytics/types" +) + +const ( + ExtensionName = "tx-account-cleanup" + BatchSize = 1000 +) + +var _ exttypes.Extension = (*TxAccountCleanupExtension)(nil) + +type TxAccountCleanupExtension struct { + cfg *config.Config + logger *slog.Logger + db *orm.Database +} + +func New(cfg *config.Config, logger *slog.Logger, db *orm.Database) *TxAccountCleanupExtension { + if !cfg.TxAccountCleanupEnabled() { + return nil + } + + return &TxAccountCleanupExtension{ + cfg: cfg, + logger: logger.With("extension", ExtensionName), + db: db, + } +} + +func (e *TxAccountCleanupExtension) Name() string { + return ExtensionName +} + +func (e *TxAccountCleanupExtension) Initialize(ctx context.Context) (*types.CollectedTxAccountCleanupStatus, error) { + var status types.CollectedTxAccountCleanupStatus + err := e.db.WithContext(ctx).First(&status).Error + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + // Initialize with -1 to indicate no cleanup has been done yet; + // Run() will set it to max(sequence) on first iteration. + status = types.CollectedTxAccountCleanupStatus{ + LastCleanedSequence: -1, + DeletedRecords: 0, + InsertedRecords: 0, + } + if err := e.db.WithContext(ctx).Create(&status).Error; err != nil { + return nil, fmt.Errorf("failed to create initial status: %w", err) + } + e.logger.Info("initialized cleanup status") + return &status, nil + } + return nil, fmt.Errorf("failed to retrieve cleanup status: %w", err) + } + + return &status, nil +} + +func (e *TxAccountCleanupExtension) Run(ctx context.Context) error { + status, err := e.Initialize(ctx) + if err != nil { + return fmt.Errorf("failed to initialize: %w", err) + } + + // Determine starting point: scan from max sequence down to 0 + currentSeq := status.LastCleanedSequence + if currentSeq < 0 { + // First run: start from the current max sequence + var maxSeq int64 + if err := e.db.WithContext(ctx). + Model(&types.CollectedTx{}). + Select("COALESCE(MAX(sequence), 0)"). + Scan(&maxSeq).Error; err != nil { + return fmt.Errorf("failed to get max sequence: %w", err) + } + currentSeq = maxSeq + status.LastCleanedSequence = currentSeq + if err := e.updateStatus(ctx, status); err != nil { + return fmt.Errorf("failed to update status: %w", err) + } + } + + e.logger.Info("starting tx account cleanup (reverse)", + slog.Int64("start_sequence", currentSeq)) + + for { + select { + case <-ctx.Done(): + e.logger.Info("cleanup stopped", + slog.Int64("last_cleaned_sequence", status.LastCleanedSequence), + slog.Int64("deleted_records", status.DeletedRecords), + slog.Int64("inserted_records", status.InsertedRecords)) + return ctx.Err() + default: + } + + if currentSeq <= 0 { + e.logger.Info("cleanup complete", + slog.Int64("deleted_records", status.DeletedRecords), + slog.Int64("inserted_records", status.InsertedRecords)) + return nil + } + + startSeq := max(currentSeq-BatchSize+1, 1) + + _, deleted, inserted, batchErr := ProcessBatch(ctx, e.db.DB, e.logger, startSeq, currentSeq) + if batchErr != nil { + return fmt.Errorf("failed to process batch [%d-%d]: %w", startSeq, currentSeq, batchErr) + } + + status.LastCleanedSequence = startSeq - 1 + status.DeletedRecords += deleted + status.InsertedRecords += inserted + + if err := e.updateStatus(ctx, status); err != nil { + return fmt.Errorf("failed to update status: %w", err) + } + + if deleted > 0 || inserted > 0 { + e.logger.Info("tx account cleanup processed batch", + slog.Int64("start_sequence", startSeq), + slog.Int64("end_sequence", currentSeq), + slog.Int64("batch_deleted", deleted), + slog.Int64("batch_inserted", inserted)) + } + + currentSeq = startSeq - 1 + } +} + +func (e *TxAccountCleanupExtension) updateStatus(ctx context.Context, status *types.CollectedTxAccountCleanupStatus) error { + return e.db.WithContext(ctx). + Model(&types.CollectedTxAccountCleanupStatus{}). + Where("1 = 1"). + Updates(map[string]any{ + "last_cleaned_sequence": status.LastCleanedSequence, + "deleted_records": status.DeletedRecords, + "inserted_records": status.InsertedRecords, + }).Error +} diff --git a/orm/migrations/20260409000000_add_tx_account_cleanup_status.sql b/orm/migrations/20260409000000_add_tx_account_cleanup_status.sql new file mode 100644 index 00000000..38305c85 --- /dev/null +++ b/orm/migrations/20260409000000_add_tx_account_cleanup_status.sql @@ -0,0 +1,6 @@ +-- Create "tx_account_cleanup_status" table +CREATE TABLE "public"."tx_account_cleanup_status" ( + "last_cleaned_sequence" bigint NULL, + "deleted_records" bigint NULL, + "inserted_records" bigint NULL +); diff --git a/orm/migrations/atlas.sum b/orm/migrations/atlas.sum index 71fcf941..f990bc4e 100644 --- a/orm/migrations/atlas.sum +++ b/orm/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:9m7zdGKVJkdc2VOvgpWviZE0aliHxKJ1keT+XydJNvo= +h1:kdj8FhrIlTWlahVSPIhdZiwfWXkeyE4uCptZLnuYiOc= 20250806084521_migration.sql h1:Qdn42AgebdtLQoc+aUfautynU10/oHxL8wjXusSqQaE= 20250822034114_migration.sql h1:ybJSC6AlidSpXS+oup6aYHchZFaOEkJU9C8lOnF0S68= 20250902111542_add_partial_indices.sql h1:Qc5PA4bCNP5tjhZrHFhscgc/Ap/Ee/mnmoPixefeRtw= @@ -10,3 +10,4 @@ h1:9m7zdGKVJkdc2VOvgpWviZE0aliHxKJ1keT+XydJNvo= 20251112100000_add_tx_height_sequence_composite_index.sql h1:bgbWL2lGapZrH6cgc5z+ALSCh3QjeDvX1sdI+7kXHG4= 20251119052849_migration.sql h1:hHv9owtwZfsRqC8Ad7ZL3FNP9aGPSbRPZbHgrpdtyu8= 20260408163700_add_tx_accounts_sequence_index.sql h1:yzHQY8tFAm2+eFqoMg/eRnkDtVt33LN+hwPdbaF0y8E= +20260409000000_add_tx_account_cleanup_status.sql h1:OUN7L2AycU9G6g54K8hGUkII4vvf57QltgRci88itOo= diff --git a/types/table.go b/types/table.go index f8ecc01c..36f6a1ce 100644 --- a/types/table.go +++ b/types/table.go @@ -182,6 +182,12 @@ type CollectedEvmRetCleanupStatus struct { CorrectedRecords int64 `gorm:"type:bigint;column:corrected_records"` } +type CollectedTxAccountCleanupStatus struct { + LastCleanedSequence int64 `gorm:"type:bigint;column:last_cleaned_sequence"` + DeletedRecords int64 `gorm:"type:bigint;column:deleted_records"` + InsertedRecords int64 `gorm:"type:bigint;column:inserted_records"` +} + func (CollectedUpgradeHistory) TableName() string { return "upgrade_history" } @@ -274,6 +280,10 @@ func (CollectedEvmRetCleanupStatus) TableName() string { return "evm_ret_cleanup_status" } +func (CollectedTxAccountCleanupStatus) TableName() string { + return "tx_account_cleanup_status" +} + // CursorRecord interface implementations // Sequence-based tables diff --git a/util/cache/cache.go b/util/cache/cache.go index 91bb16ea..7cc121fe 100644 --- a/util/cache/cache.go +++ b/util/cache/cache.go @@ -141,8 +141,18 @@ func updateAccountCacheAndResult(uncached []string, accountIdMap map[string]int6 } func GetOrCreateAccountIds(db *gorm.DB, accounts []string, createNew bool) (idMap map[string]int64, err error) { + // Deduplicate input accounts + seen := make(map[string]struct{}, len(accounts)) + deduped := make([]string, 0, len(accounts)) + for _, account := range accounts { + if _, ok := seen[account]; !ok { + seen[account] = struct{}{} + deduped = append(deduped, account) + } + } + // Check cache and collect uncached accounts - idMap, uncached := checkAccountCache(accounts) + idMap, uncached := checkAccountCache(deduped) if len(uncached) == 0 { return idMap, nil diff --git a/util/common-handler/status/status.go b/util/common-handler/status/status.go index d80e2217..bc58d14b 100644 --- a/util/common-handler/status/status.go +++ b/util/common-handler/status/status.go @@ -13,9 +13,10 @@ import ( ) var ( - lastEvmInternalTxHeight atomic.Int64 - lastRichListHeight atomic.Int64 - lastEvmRetCleanupHeight atomic.Int64 + lastEvmInternalTxHeight atomic.Int64 + lastRichListHeight atomic.Int64 + lastEvmRetCleanupHeight atomic.Int64 + lastTxAccountCleanupStatus atomic.Pointer[types.CollectedTxAccountCleanupStatus] ) // status handles GET /status @@ -68,14 +69,26 @@ func (h *StatusHandler) GetStatus(c *fiber.Ctx) error { evmRetCleanupHeight = height } + var txAccountCleanupStatus types.CollectedTxAccountCleanupStatus + if h.isTxAccountCleanupEnabled() { + status, err := h.getTxAccountCleanupStatus(tx) + if err != nil { + return err + } + txAccountCleanupStatus = *status + } + return c.JSON(&StatusResponse{ - Version: config.Version, - CommitHash: config.CommitHash, - ChainId: h.GetChainId(), - Height: lastBlock.Height, - InternalTxHeight: internalTxHeight, - RichListHeight: richListHeight, - EvmRetCleanupHeight: evmRetCleanupHeight, + Version: config.Version, + CommitHash: config.CommitHash, + ChainId: h.GetChainId(), + Height: lastBlock.Height, + InternalTxHeight: internalTxHeight, + RichListHeight: richListHeight, + EvmRetCleanupHeight: evmRetCleanupHeight, + TxAccountCleanupSequence: txAccountCleanupStatus.LastCleanedSequence, + TxAccountCleanupDeleted: txAccountCleanupStatus.DeletedRecords, + TxAccountCleanupInserted: txAccountCleanupStatus.InsertedRecords, }) } @@ -147,6 +160,32 @@ func (h *StatusHandler) isEvmRetCleanupEnabled() bool { return (h.GetChainConfig().VmType == types.EVM) && h.GetConfig().EvmRetCleanupEnabled() } +func (h *StatusHandler) isTxAccountCleanupEnabled() bool { + return h.GetConfig().TxAccountCleanupEnabled() +} + +func (h *StatusHandler) getTxAccountCleanupStatus(tx *gorm.DB) (*types.CollectedTxAccountCleanupStatus, error) { + var cleanupStatus types.CollectedTxAccountCleanupStatus + err := tx.Model(&types.CollectedTxAccountCleanupStatus{}).First(&cleanupStatus).Error + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return &types.CollectedTxAccountCleanupStatus{ + LastCleanedSequence: -1, + DeletedRecords: -1, + InsertedRecords: -1, + }, nil + } + return nil, fiber.NewError(fiber.StatusInternalServerError, err.Error()) + } + + cached := lastTxAccountCleanupStatus.Load() + if cached == nil || cleanupStatus.LastCleanedSequence < cached.LastCleanedSequence { + lastTxAccountCleanupStatus.Store(&cleanupStatus) + } + + return &cleanupStatus, nil +} + func (h *StatusHandler) getEvmRetCleanupStatus(tx *gorm.DB) (int64, error) { height := lastEvmRetCleanupHeight.Load() diff --git a/util/common-handler/status/types.go b/util/common-handler/status/types.go index c79db914..460eef0a 100644 --- a/util/common-handler/status/types.go +++ b/util/common-handler/status/types.go @@ -1,11 +1,14 @@ package status type StatusResponse struct { - Version string `json:"version" extensions:"x-order:0"` - CommitHash string `json:"commit_hash" extensions:"x-order:1"` - ChainId string `json:"chain_id" extensions:"x-order:2"` - Height int64 `json:"height" extensions:"x-order:3"` - InternalTxHeight int64 `json:"internal_tx_height,omitempty" extensions:"x-order:4"` - RichListHeight int64 `json:"rich_list_height,omitempty" extensions:"x-order:5"` - EvmRetCleanupHeight int64 `json:"evm_ret_cleanup_height,omitempty" extensions:"x-order:6"` + Version string `json:"version" extensions:"x-order:0"` + CommitHash string `json:"commit_hash" extensions:"x-order:1"` + ChainId string `json:"chain_id" extensions:"x-order:2"` + Height int64 `json:"height" extensions:"x-order:3"` + InternalTxHeight int64 `json:"internal_tx_height,omitempty" extensions:"x-order:4"` + RichListHeight int64 `json:"rich_list_height,omitempty" extensions:"x-order:5"` + EvmRetCleanupHeight int64 `json:"evm_ret_cleanup_height,omitempty" extensions:"x-order:6"` + TxAccountCleanupSequence int64 `json:"tx_account_cleanup_sequence,omitempty" extensions:"x-order:7"` + TxAccountCleanupDeleted int64 `json:"tx_account_cleanup_deleted,omitempty" extensions:"x-order:8"` + TxAccountCleanupInserted int64 `json:"tx_account_cleanup_inserted,omitempty" extensions:"x-order:9"` }