diff --git a/api/src/ol/validators/validators.service.ts b/api/src/ol/validators/validators.service.ts index b1b9fae1..6f6c72d1 100644 --- a/api/src/ol/validators/validators.service.ts +++ b/api/src/ol/validators/validators.service.ts @@ -64,23 +64,23 @@ export class ValidatorsService { if (this.cacheEnabled) { const cachedValidators = await this.getFromCache(VALIDATORS_CACHE_KEY); if (cachedValidators) { - this.logger.debug('Returning cached validators') - this.logger.debug(`Read this data from cache: ${JSON.stringify(cachedValidators).slice(0, 200)}`) + // this.logger.debug('Returning cached validators') + // this.logger.debug(`Read this data from cache: ${JSON.stringify(cachedValidators).slice(0, 200)}`) return cachedValidators; } } const validators = await this.queryValidators(); await this.setCache(VALIDATORS_CACHE_KEY, validators); - this.logger.debug('Stored validators in cache') - this.logger.debug(`This data written back: ${JSON.stringify(validators).slice(0, 200)}`) + // this.logger.debug('Stored validators in cache') + // this.logger.debug(`This data written back: ${JSON.stringify(validators).slice(0, 200)}`) return validators; } public async getValidatorsHandlers(): Promise> { if (this.cacheEnabled) { - this.logger.debug('Cache is enabled') + // this.logger.debug('Cache is enabled') const cacheHandlersString = await this.getFromCache(VALIDATORS_HANDLERS_CACHE_KEY); // NOTE: cacheHandlersString is NOT a string (it is an Object) let result:Map = new Map([['bad', 'data']]); @@ -90,19 +90,19 @@ export class ValidatorsService { } else { result = new Map(); } - this.logger.debug(`returning handles map with ${result.size} entries`) + // this.logger.debug(`returning handles map with ${result.size} entries`) return result; } let handlers = new Map(); try { handlers = await this.loadValidatorHandles(); - this.logger.debug(`Loaded validator handles: ${handlers}, ${JSON.stringify(handlers)}`) + // this.logger.debug(`Loaded validator handles: ${handlers}, ${JSON.stringify(handlers)}`) } catch (error) { this.logger.error('Error loading validators handlers', error); } finally { const obj = Object.fromEntries(handlers); - this.logger.debug(`Storing validator handles: ${obj}, ${JSON.stringify(obj)}`) + // this.logger.debug(`Storing validator handles: ${obj}, ${JSON.stringify(obj)}`) await redisClient.set(VALIDATORS_HANDLERS_CACHE_KEY, JSON.stringify(obj)); this.logger.log('Validators handlers cache updated'); } diff --git a/api/src/redis/redis.service.ts b/api/src/redis/redis.service.ts index 86626877..bf492619 100644 --- a/api/src/redis/redis.service.ts +++ b/api/src/redis/redis.service.ts @@ -1,8 +1,223 @@ -import { Redis } from 'ioredis'; - -export const redisClient = new Redis({ - host: process.env.REDIS_HOST ?? '127.0.0.1', - port: process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT, 10) : 6379, - maxRetriesPerRequest: null, - enableOfflineQueue: false, -}); +import { createClient, RedisClientOptions } from 'redis'; +import { ConfigService } from '@nestjs/config'; +import { Logger } from '@nestjs/common'; +import { ConnectionOptions } from 'bullmq'; + +const logger = new Logger('RedisService'); + +// Function to create Redis connection options for BullMQ +export function createRedisConnectionOptions(configService: ConfigService): ConnectionOptions { + const host = configService.get('REDIS_HOST') || 'localhost'; + const port = parseInt(configService.get('REDIS_PORT') || '6379', 10); + + logger.log(`Creating Redis connection options for ${host}:${port}`); + + return { + host, + port, + // BullMQ specific settings for reliability + enableReadyCheck: true, + retryStrategy: (times) => { + // conservative retry strategy + const delay = Math.min(Math.pow(2, times) * 500, 30000); // Start with shorter delays + if (times > 50) { + logger.warn(`Redis retry attempts exceeding 50, consider checking server status`); + } + logger.log(`Redis retry in ${delay}ms (attempt ${times})`); + return delay; + } + }; +} + +// A wrapper with safe methods for direct Redis operations +export class SafeRedisClient { + private client: any; // Using any to avoid type issues + private readonly logger = new Logger('SafeRedisClient'); + private connected = false; + private connecting = false; + private readonly maxRetries = 3; + private readonly operationTimeout = 5000; // 5 seconds timeout for operations + + constructor(private readonly configService: ConfigService) { + const host = this.configService.get('REDIS_HOST') || 'localhost'; + const port = parseInt(this.configService.get('REDIS_PORT') || '6379', 10); + const password = this.configService.get('REDIS_PASSWORD'); // Get password from env if set + + this.logger.log(`Initializing Redis client for ${host}:${port}`); + + const options: RedisClientOptions = { + socket: { + host, + port, + connectTimeout: 10000, // 10 seconds connect timeout + reconnectStrategy: (retries) => { + if (retries > 10) { + // After 10 retries, slow down significantly + const delay = Math.min(5000 + (retries - 10) * 1000, 30000); + this.logger.log(`Redis reconnecting in ${delay}ms (attempt ${retries})`); + return delay; + } + const delay = Math.min(retries * 500, 5000); + this.logger.log(`Redis reconnecting in ${delay}ms (attempt ${retries})`); + return delay; + }, + }, + disableOfflineQueue: false, // Keep offline queue enabled + }; + + // Add password if configured + if (password) { + options.password = password; + this.logger.log('Using password authentication for Redis'); + } else { + this.logger.warn('No Redis password set, using non-authenticated connection'); + } + + this.initClient(options); + } + + private initClient(options: RedisClientOptions) { + try { + this.client = createClient(options); + this.setupEventHandlers(); + this.connectWithRetry(); + } catch (error) { + this.logger.error(`Redis client initialization error: ${error.message}`); + } + } + + private setupEventHandlers() { + this.client.on('connect', () => { + this.logger.log('Redis client connected'); + this.connected = true; + this.connecting = false; + }); + + this.client.on('ready', () => { + this.logger.log('Redis client ready'); + }); + + this.client.on('error', (err: Error) => { + // Only log detailed errors for non-connection issues + if (!err.message.includes('ECONNREFUSED')) { + this.logger.error(`Redis client error: ${err.message}`); + } + this.connected = false; + }); + + this.client.on('reconnecting', () => { + if (!this.connecting) { + this.logger.warn('Redis client reconnecting'); + this.connecting = true; + } + }); + + this.client.on('end', () => { + this.logger.warn('Redis client connection closed'); + this.connected = false; + this.connecting = false; + }); + } + + private async connectWithRetry(attempt = 0) { + if (attempt >= 3) { + this.logger.warn('Max Redis connection attempts reached, will try again on next operation'); + return; + } + + try { + this.connecting = true; + await this.client.connect(); + } catch (error) { + this.connecting = false; + this.connected = false; + + // Avoid logging too many connection refused errors + if (!error.message.includes('ECONNREFUSED') || attempt === 0) { + this.logger.error(`Failed to connect to Redis: ${error.message}`); + } + + // Schedule a retry with exponential backoff + const delay = Math.min(Math.pow(2, attempt) * 1000, 10000); + setTimeout(() => this.connectWithRetry(attempt + 1), delay); + } + } + + // Safely execute a Redis operation with timeout and fallback + private async executeWithFallback(operation: () => Promise, defaultValue: T): Promise { + if (!this.connected && !this.connecting) { + try { + await this.connectWithRetry(); + } catch (error) { + // Already logged in connectWithRetry + return defaultValue; + } + } + + try { + // Add a timeout to the operation + const result = await Promise.race([ + operation(), + new Promise((_, reject) => { + setTimeout(() => reject(new Error('Redis operation timeout')), this.operationTimeout); + }) + ]); + return result; + } catch (error) { + // Only log errors that aren't connection refused + if (!error.message.includes('ECONNREFUSED') && !error.message.includes('timeout')) { + this.logger.error(`Redis operation error: ${error.message}`); + } + return defaultValue; + } + } + + async get(key: string): Promise { + return this.executeWithFallback(async () => { + if (!this.client.isOpen) { + await this.connectWithRetry(); + } + return await this.client.get(key); + }, null); + } + + async set(key: string, value: string, options: any = {}): Promise { + await this.executeWithFallback(async () => { + if (!this.client.isOpen) { + await this.connectWithRetry(); + } + await this.client.set(key, value, options); + return true; + }, false); + } + + async del(key: string): Promise { + await this.executeWithFallback(async () => { + if (!this.client.isOpen) { + await this.connectWithRetry(); + } + await this.client.del(key); + return true; + }, false); + } + + get isConnected(): boolean { + return this.connected && (this.client?.isOpen || false); + } + + async ping(): Promise { + return this.executeWithFallback(async () => { + if (!this.client.isOpen) { + await this.connectWithRetry(); + } + const response = await this.client.ping(); + return response === 'PONG'; + }, false); + } +} + +// Create a Singleton instance +export const redisClient = new SafeRedisClient(new ConfigService()); + +// Export connection options for BullMQ - this should be used in all queue registrations +export const redisConnectionOptions = createRedisConnectionOptions(new ConfigService()); diff --git a/api/src/stats/services/accounts.service.ts b/api/src/stats/services/accounts.service.ts new file mode 100644 index 00000000..c3e54aa6 --- /dev/null +++ b/api/src/stats/services/accounts.service.ts @@ -0,0 +1,204 @@ +import { Injectable } from '@nestjs/common'; +import { ClickhouseService } from '../../clickhouse/clickhouse.service.js'; +import { StatsUtils } from '../utils/stats.utils.js'; +import { TimestampValue, AccountsStats } from '../interfaces/stats.interface.js'; + +@Injectable() +export class AccountsService { + constructor( + private readonly clickhouseService: ClickhouseService, + private readonly statsUtils: StatsUtils, + ) {} + + async getAccountsOnChainOverTime(): Promise { + try { + const query = ` + SELECT + version, + address + FROM coin_balance + WHERE coin_module = 'libra_coin' + ORDER BY version ASC + `; + + const resultSet = await this.clickhouseService.client.query({ + query: query, + format: 'JSONEachRow', + }); + + const rows = await resultSet.json<{ + version: string; + address: string; + }>(); + + if (!rows.length) { + return []; + } + + // Extract versions and convert them to timestamps + const versions = rows.map((row) => parseInt(row.version, 10)); + const chunkSize = 1000; + const versionChunks = this.statsUtils.chunkArray(versions, chunkSize); + + const allTimestampMappings = ( + await Promise.all(versionChunks.map((chunk) => this.statsUtils.mapVersionsToTimestamps(chunk))) + ).flat(); + + const versionToTimestampMap = new Map( + allTimestampMappings.map(({ version, timestamp }) => [version, timestamp]), + ); + + const accountsOverTime: TimestampValue[] = []; + const seenAddresses = new Set(); + const dailyCounts = new Map(); + + rows.forEach((row) => { + const version = parseInt(row.version, 10); + const timestamp = versionToTimestampMap.get(version) ?? 0; + const dayTimestamp = Math.floor(timestamp / 86400) * 86400; + + if (!seenAddresses.has(row.address)) { + seenAddresses.add(row.address); + const currentCount = dailyCounts.get(dayTimestamp) || 0; + dailyCounts.set(dayTimestamp, currentCount + 1); + } + }); + + // Convert daily counts to the desired format + dailyCounts.forEach((count, timestamp) => { + accountsOverTime.push({ + timestamp: timestamp, + value: count, + }); + }); + + // Ensure the array is sorted by timestamp + accountsOverTime.sort((a, b) => a.timestamp - b.timestamp); + + // Accumulate counts over time + let runningTotal = 0; + accountsOverTime.forEach((entry) => { + runningTotal += entry.value; + entry.value = runningTotal; + }); + + return accountsOverTime; + } catch (error) { + console.error('Error in getAccountsOnChainOverTime:', error); + throw error; + } + } + + async getAccountsStats(): Promise { + const totalAccounts = await this.getTotalUniqueAccounts(); + const activeAddressesCount = await this.getActiveAddressesCount(); + return { totalAccounts, activeAddressesCount }; + } + + async getActiveAddressesCount(): Promise<{ + lastDay: number; + last30Days: number; + last90Days: number; + }> { + try { + const query = ` + SELECT + version, + address + FROM olfyi.coin_balance + WHERE coin_module = 'libra_coin' + ORDER BY version ASC + `; + + const resultSet = await this.clickhouseService.client.query({ + query: query, + format: 'JSONEachRow', + }); + + const rows = await resultSet.json<{ + version: string; + address: string; + }>(); + + if (!rows.length) { + return { + lastDay: 0, + last30Days: 0, + last90Days: 0, + }; + } + + // Extract versions and convert them to timestamps + const versions = rows.map((row) => parseInt(row.version, 10)); + const chunkSize = 1000; + const versionChunks = this.statsUtils.chunkArray(versions, chunkSize); + const allTimestampMappings = ( + await Promise.all(versionChunks.map((chunk) => this.statsUtils.mapVersionsToTimestamps(chunk))) + ).flat(); + + const versionToTimestampMap = new Map( + allTimestampMappings.map(({ version, timestamp }) => [version, timestamp]), + ); + + const now = Math.floor(Date.now() / 1000); + const oneDayAgo = now - 86400; + const thirtyDaysAgo = now - 30 * 86400; + const ninetyDaysAgo = now - 90 * 86400; + + const seenAddressesLastDay = new Set(); + const seenAddressesLast30Days = new Set(); + const seenAddressesLast90Days = new Set(); + + rows.forEach((row) => { + const version = parseInt(row.version, 10); + const timestamp = versionToTimestampMap.get(version) ?? 0; + + if (timestamp >= oneDayAgo) { + seenAddressesLastDay.add(row.address); + } + if (timestamp >= thirtyDaysAgo) { + seenAddressesLast30Days.add(row.address); + } + if (timestamp >= ninetyDaysAgo) { + seenAddressesLast90Days.add(row.address); + } + }); + + return { + lastDay: seenAddressesLastDay.size, + last30Days: seenAddressesLast30Days.size, + last90Days: seenAddressesLast90Days.size, + }; + } catch (error) { + console.error('Error in getActiveAddressesCount:', error); + throw error; + } + } + + async getTotalUniqueAccounts(): Promise { + try { + const query = ` + SELECT COUNT(DISTINCT address) AS unique_accounts + FROM olfyi.coin_balance + WHERE coin_module = 'libra_coin' + `; + + const resultSet = await this.clickhouseService.client.query({ + query: query, + format: 'JSONEachRow', + }); + + const rows = await resultSet.json<{ unique_accounts: number }[]>(); + + if (rows.length === 0) { + return 0; + } + const uniqueAccountsCount = Number(rows[0]['unique_accounts']); + + return uniqueAccountsCount; + } catch (error) { + console.error('Error in getTotalUniqueAccounts:', error); + throw error; + } + } +}