Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions api/src/ol/validators/validators.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,23 +64,23 @@ export class ValidatorsService {
if (this.cacheEnabled) {
const cachedValidators = await this.getFromCache<Validator[]>(VALIDATORS_CACHE_KEY);
if (cachedValidators) {
this.logger.debug('Returning cached validators')
this.logger.debug(`Read this data from cache: ${JSON.stringify(cachedValidators).slice(0, 200)}`)
// this.logger.debug('Returning cached validators')
// this.logger.debug(`Read this data from cache: ${JSON.stringify(cachedValidators).slice(0, 200)}`)
return cachedValidators;
}
}

const validators = await this.queryValidators();
await this.setCache(VALIDATORS_CACHE_KEY, validators);
this.logger.debug('Stored validators in cache')
this.logger.debug(`This data written back: ${JSON.stringify(validators).slice(0, 200)}`)
// this.logger.debug('Stored validators in cache')
// this.logger.debug(`This data written back: ${JSON.stringify(validators).slice(0, 200)}`)

return validators;
}

public async getValidatorsHandlers(): Promise<Map<string, string>> {
if (this.cacheEnabled) {
this.logger.debug('Cache is enabled')
// this.logger.debug('Cache is enabled')
const cacheHandlersString = await this.getFromCache<string>(VALIDATORS_HANDLERS_CACHE_KEY);
// NOTE: cacheHandlersString is NOT a string (it is an Object)
let result:Map<string, string> = new Map([['bad', 'data']]);
Expand All @@ -90,19 +90,19 @@ export class ValidatorsService {
} else {
result = new Map();
}
this.logger.debug(`returning handles map with ${result.size} entries`)
// this.logger.debug(`returning handles map with ${result.size} entries`)
return result;
}

let handlers = new Map<string, string>();
try {
handlers = await this.loadValidatorHandles();
this.logger.debug(`Loaded validator handles: ${handlers}, ${JSON.stringify(handlers)}`)
// this.logger.debug(`Loaded validator handles: ${handlers}, ${JSON.stringify(handlers)}`)
} catch (error) {
this.logger.error('Error loading validators handlers', error);
} finally {
const obj = Object.fromEntries(handlers);
this.logger.debug(`Storing validator handles: ${obj}, ${JSON.stringify(obj)}`)
// this.logger.debug(`Storing validator handles: ${obj}, ${JSON.stringify(obj)}`)
await redisClient.set(VALIDATORS_HANDLERS_CACHE_KEY, JSON.stringify(obj));
this.logger.log('Validators handlers cache updated');
}
Expand Down
231 changes: 223 additions & 8 deletions api/src/redis/redis.service.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,223 @@
import { Redis } from 'ioredis';

export const redisClient = new Redis({
host: process.env.REDIS_HOST ?? '127.0.0.1',
port: process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT, 10) : 6379,
maxRetriesPerRequest: null,
enableOfflineQueue: false,
});
import { createClient, RedisClientOptions } from 'redis';
import { ConfigService } from '@nestjs/config';
import { Logger } from '@nestjs/common';
import { ConnectionOptions } from 'bullmq';

const logger = new Logger('RedisService');

// Function to create Redis connection options for BullMQ
export function createRedisConnectionOptions(configService: ConfigService): ConnectionOptions {
const host = configService.get('REDIS_HOST') || 'localhost';
const port = parseInt(configService.get('REDIS_PORT') || '6379', 10);

logger.log(`Creating Redis connection options for ${host}:${port}`);

return {
host,
port,
// BullMQ specific settings for reliability
enableReadyCheck: true,
retryStrategy: (times) => {
// conservative retry strategy
const delay = Math.min(Math.pow(2, times) * 500, 30000); // Start with shorter delays
if (times > 50) {
logger.warn(`Redis retry attempts exceeding 50, consider checking server status`);
}
logger.log(`Redis retry in ${delay}ms (attempt ${times})`);
return delay;
}
};
}

// A wrapper with safe methods for direct Redis operations
export class SafeRedisClient {
private client: any; // Using any to avoid type issues
private readonly logger = new Logger('SafeRedisClient');
private connected = false;
private connecting = false;
private readonly maxRetries = 3;
private readonly operationTimeout = 5000; // 5 seconds timeout for operations

constructor(private readonly configService: ConfigService) {
const host = this.configService.get('REDIS_HOST') || 'localhost';
const port = parseInt(this.configService.get('REDIS_PORT') || '6379', 10);
const password = this.configService.get('REDIS_PASSWORD'); // Get password from env if set

this.logger.log(`Initializing Redis client for ${host}:${port}`);

const options: RedisClientOptions = {
socket: {
host,
port,
connectTimeout: 10000, // 10 seconds connect timeout
reconnectStrategy: (retries) => {
if (retries > 10) {
// After 10 retries, slow down significantly
const delay = Math.min(5000 + (retries - 10) * 1000, 30000);
this.logger.log(`Redis reconnecting in ${delay}ms (attempt ${retries})`);
return delay;
}
const delay = Math.min(retries * 500, 5000);
this.logger.log(`Redis reconnecting in ${delay}ms (attempt ${retries})`);
return delay;
},
},
disableOfflineQueue: false, // Keep offline queue enabled
};

// Add password if configured
if (password) {
options.password = password;
this.logger.log('Using password authentication for Redis');
} else {
this.logger.warn('No Redis password set, using non-authenticated connection');
}

this.initClient(options);
}

private initClient(options: RedisClientOptions) {
try {
this.client = createClient(options);
this.setupEventHandlers();
this.connectWithRetry();
} catch (error) {
this.logger.error(`Redis client initialization error: ${error.message}`);
}
}

private setupEventHandlers() {
this.client.on('connect', () => {
this.logger.log('Redis client connected');
this.connected = true;
this.connecting = false;
});

this.client.on('ready', () => {
this.logger.log('Redis client ready');
});

this.client.on('error', (err: Error) => {
// Only log detailed errors for non-connection issues
if (!err.message.includes('ECONNREFUSED')) {
this.logger.error(`Redis client error: ${err.message}`);
}
this.connected = false;
});

this.client.on('reconnecting', () => {
if (!this.connecting) {
this.logger.warn('Redis client reconnecting');
this.connecting = true;
}
});

this.client.on('end', () => {
this.logger.warn('Redis client connection closed');
this.connected = false;
this.connecting = false;
});
}

private async connectWithRetry(attempt = 0) {
if (attempt >= 3) {
this.logger.warn('Max Redis connection attempts reached, will try again on next operation');
return;
}

try {
this.connecting = true;
await this.client.connect();
} catch (error) {
this.connecting = false;
this.connected = false;

// Avoid logging too many connection refused errors
if (!error.message.includes('ECONNREFUSED') || attempt === 0) {
this.logger.error(`Failed to connect to Redis: ${error.message}`);
}

// Schedule a retry with exponential backoff
const delay = Math.min(Math.pow(2, attempt) * 1000, 10000);
setTimeout(() => this.connectWithRetry(attempt + 1), delay);
}
}

// Safely execute a Redis operation with timeout and fallback
private async executeWithFallback<T>(operation: () => Promise<T>, defaultValue: T): Promise<T> {
if (!this.connected && !this.connecting) {
try {
await this.connectWithRetry();
} catch (error) {
// Already logged in connectWithRetry
return defaultValue;
}
}

try {
// Add a timeout to the operation
const result = await Promise.race([
operation(),
new Promise<T>((_, reject) => {
setTimeout(() => reject(new Error('Redis operation timeout')), this.operationTimeout);
})
]);
return result;
} catch (error) {
// Only log errors that aren't connection refused
if (!error.message.includes('ECONNREFUSED') && !error.message.includes('timeout')) {
this.logger.error(`Redis operation error: ${error.message}`);
}
return defaultValue;
}
}

async get(key: string): Promise<string | null> {
return this.executeWithFallback(async () => {
if (!this.client.isOpen) {
await this.connectWithRetry();
}
return await this.client.get(key);
}, null);
}

async set(key: string, value: string, options: any = {}): Promise<void> {
await this.executeWithFallback(async () => {
if (!this.client.isOpen) {
await this.connectWithRetry();
}
await this.client.set(key, value, options);
return true;
}, false);
}

async del(key: string): Promise<void> {
await this.executeWithFallback(async () => {
if (!this.client.isOpen) {
await this.connectWithRetry();
}
await this.client.del(key);
return true;
}, false);
}

get isConnected(): boolean {
return this.connected && (this.client?.isOpen || false);
}

async ping(): Promise<boolean> {
return this.executeWithFallback(async () => {
if (!this.client.isOpen) {
await this.connectWithRetry();
}
const response = await this.client.ping();
return response === 'PONG';
}, false);
}
}

// Create a Singleton instance
export const redisClient = new SafeRedisClient(new ConfigService());

// Export connection options for BullMQ - this should be used in all queue registrations
export const redisConnectionOptions = createRedisConnectionOptions(new ConfigService());
Loading
Loading