diff --git a/src/auth/guards/jwt-auth.guard.ts b/src/auth/guards/jwt-auth.guard.ts index 2155290e..91447a67 100644 --- a/src/auth/guards/jwt-auth.guard.ts +++ b/src/auth/guards/jwt-auth.guard.ts @@ -1,5 +1,6 @@ import { Injectable } from '@nestjs/common'; import { AuthGuard } from '@nestjs/passport'; +import { AUTH_STRATEGY } from '../../common/constants/auth.constants'; @Injectable() -export class JwtAuthGuard extends AuthGuard('jwt') {} +export class JwtAuthGuard extends AuthGuard(AUTH_STRATEGY.JWT) {} diff --git a/src/backup/backup.module.ts b/src/backup/backup.module.ts index 602141f4..0d8ee817 100644 --- a/src/backup/backup.module.ts +++ b/src/backup/backup.module.ts @@ -3,6 +3,7 @@ import { TypeOrmModule } from '@nestjs/typeorm'; import { BullModule } from '@nestjs/bull'; import { ConfigModule } from '@nestjs/config'; import { ScheduleModule } from '@nestjs/schedule'; +import { QUEUE_NAMES } from '../common/constants/queue.constants'; // Entities import { BackupRecord } from './entities/backup-record.entity'; @@ -31,7 +32,7 @@ import { MonitoringModule } from '../monitoring/monitoring.module'; ScheduleModule.forRoot(), TypeOrmModule.forFeature([BackupRecord, RecoveryTest]), BullModule.registerQueue({ - name: 'backup-processing', + name: QUEUE_NAMES.BACKUP_PROCESSING, }), MediaModule, // For FileStorageService MonitoringModule, // For AlertingService and MetricsCollectionService diff --git a/src/backup/backup.service.ts b/src/backup/backup.service.ts index 554e141f..a7a034b4 100644 --- a/src/backup/backup.service.ts +++ b/src/backup/backup.service.ts @@ -3,6 +3,7 @@ import { InjectRepository } from '@nestjs/typeorm'; import { Repository, LessThan } from 'typeorm'; import { InjectQueue } from '@nestjs/bull'; import { Queue } from 'bull'; +import { QUEUE_NAMES, JOB_NAMES } from './../../common/constants/queue.constants'; import { ConfigService } from '@nestjs/config'; import { Cron } from '@nestjs/schedule'; import { BackupRecord } from './entities/backup-record.entity'; @@ -29,7 +30,7 @@ export class BackupService { constructor( @InjectRepository(BackupRecord) private readonly backupRepository: Repository, - @InjectQueue('backup-processing') + @InjectQueue(QUEUE_NAMES.BACKUP_PROCESSING) private readonly backupQueue: Queue, private readonly configService: ConfigService, private readonly alertingService: AlertingService, @@ -103,7 +104,7 @@ export class BackupService { // Queue backup job await this.backupQueue.add( - 'create-backup', + JOB_NAMES.CREATE_BACKUP, { backupRecordId: backupRecord.id, backupType: BackupType.FULL, @@ -157,7 +158,7 @@ export class BackupService { for (const backup of expiredBackups) { await this.backupQueue.add( - 'delete-backup', + JOB_NAMES.DELETE_BACKUP, { backupRecordId: backup.id }, { attempts: 3, diff --git a/src/backup/processing/backup-queue.processor.ts b/src/backup/processing/backup-queue.processor.ts index e5cdb61f..94e4a7fd 100644 --- a/src/backup/processing/backup-queue.processor.ts +++ b/src/backup/processing/backup-queue.processor.ts @@ -1,6 +1,7 @@ import { Process, Processor } from '@nestjs/bull'; import { Logger } from '@nestjs/common'; import { Job } from 'bull'; +import { QUEUE_NAMES, JOB_NAMES } from '../../common/constants/queue.constants'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; import { ConfigService } from '@nestjs/config'; @@ -24,7 +25,7 @@ import { S3Client, CopyObjectCommand, DeleteObjectCommand } from '@aws-sdk/clien const execAsync = promisify(exec); const MAX_RETRIES = 3; -@Processor('backup-processing') +@Processor(QUEUE_NAMES.BACKUP_PROCESSING) export class BackupQueueProcessor { private readonly logger = new Logger(BackupQueueProcessor.name); private readonly kmsClient: KMSClient; @@ -57,7 +58,7 @@ export class BackupQueueProcessor { }); } - @Process('create-backup') + @Process(JOB_NAMES.CREATE_BACKUP) async handleCreateBackup(job: Job) { const { backupRecordId, databaseName } = job.data; this.logger.log(`Processing backup creation for: ${backupRecordId}`); @@ -136,7 +137,7 @@ export class BackupQueueProcessor { // Queue verification job await (job.queue as any).add( - 'verify-backup', + JOB_NAMES.VERIFY_BACKUP, { backupRecordId, storageKey: encryptedKey }, { attempts: 3, @@ -151,7 +152,7 @@ export class BackupQueueProcessor { } } - @Process('verify-backup') + @Process(JOB_NAMES.VERIFY_BACKUP) async handleVerifyBackup(job: Job) { const { backupRecordId } = job.data; this.logger.log(`Verifying backup integrity: ${backupRecordId}`); @@ -177,13 +178,13 @@ export class BackupQueueProcessor { } } - @Process('recovery-test') + @Process(JOB_NAMES.RECOVERY_TEST) async handleRecoveryTest(_job: Job) { this.logger.log(`Recovery test processing handled by RecoveryTestingService`); // Delegated to RecoveryTestingService.executeRecoveryTest() } - @Process('delete-backup') + @Process(JOB_NAMES.DELETE_BACKUP) async handleDeleteBackup(job: Job<{ backupRecordId: string }>) { const { backupRecordId } = job.data; this.logger.log(`Deleting expired backup: ${backupRecordId}`); diff --git a/src/backup/testing/recovery-testing.service.ts b/src/backup/testing/recovery-testing.service.ts index 33de4391..2abdde1f 100644 --- a/src/backup/testing/recovery-testing.service.ts +++ b/src/backup/testing/recovery-testing.service.ts @@ -3,6 +3,7 @@ import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; import { InjectQueue } from '@nestjs/bull'; import { Queue } from 'bull'; +import { QUEUE_NAMES, JOB_NAMES } from '../../common/constants/queue.constants'; import { ConfigService } from '@nestjs/config'; import { RecoveryTest } from '../entities/recovery-test.entity'; import { RecoveryTestStatus } from '../enums/recovery-test-status.enum'; @@ -27,7 +28,7 @@ export class RecoveryTestingService { constructor( @InjectRepository(RecoveryTest) private readonly recoveryTestRepository: Repository, - @InjectQueue('backup-processing') + @InjectQueue(QUEUE_NAMES.BACKUP_PROCESSING) private readonly backupQueue: Queue, private readonly backupService: BackupService, private readonly fileStorageService: FileStorageService, @@ -59,7 +60,7 @@ export class RecoveryTestingService { // Queue recovery test job await this.backupQueue.add( - 'recovery-test', + JOB_NAMES.RECOVERY_TEST, { recoveryTestId: recoveryTest.id, backupRecordId: backupId, diff --git a/src/caching/cache-management.controller.ts b/src/caching/cache-management.controller.ts index b51d13b8..5331f8c2 100644 --- a/src/caching/cache-management.controller.ts +++ b/src/caching/cache-management.controller.ts @@ -16,14 +16,13 @@ import { CacheInvalidationService } from './invalidation/invalidation.service'; import { CacheWarmingService } from './warming/cache-warming.service'; import { CacheStrategiesService } from './strategies/cache-strategies.service'; import { RolesGuard } from '../common/guards/roles.guard'; -import { Roles } from '../common/decorators/roles.decorator'; -// import { Role } from '../common/decorators/roles.decorator'; +import { Roles, Role } from '../common/decorators/roles.decorator'; @ApiTags('Cache Management') @ApiBearerAuth() @Controller('cache') @UseGuards(RolesGuard) -@Roles('admin') +@Roles(Role.ADMIN) export class CacheManagementController { constructor( private readonly cachingService: CachingService, diff --git a/src/caching/caching.constants.ts b/src/caching/caching.constants.ts index b2ac0da5..7b911828 100644 --- a/src/caching/caching.constants.ts +++ b/src/caching/caching.constants.ts @@ -20,6 +20,8 @@ export const CACHE_PREFIXES = { POPULAR: 'cache:popular', ENROLLMENT: 'cache:enrollment', FEATURED: 'cache:featured', + SYSTEM_CONFIG: 'cache:system:config', + USERS_LIST: 'cache:users:list', } as const; export const CACHE_EVENTS = { @@ -30,4 +32,6 @@ export const CACHE_EVENTS = { ENROLLMENT_CREATED: 'cache.enrollment.created', ENROLLMENT_UPDATED: 'cache.enrollment.updated', SEARCH_INDEX_UPDATED: 'cache.search.updated', + CACHE_INVALIDATED: 'cache.invalidated', + CACHE_PURGED: 'cache.purged', } as const; diff --git a/src/caching/warming/cache-warming.service.ts b/src/caching/warming/cache-warming.service.ts index b7303e10..fd1a781e 100644 --- a/src/caching/warming/cache-warming.service.ts +++ b/src/caching/warming/cache-warming.service.ts @@ -219,7 +219,7 @@ export class CacheWarmingService implements OnModuleInit, OnModuleDestroy { private async warmSystemConfig(): Promise { this.logger.debug('Warming system configuration...'); - const configKey = 'cache:system:config'; + const configKey = CACHE_PREFIXES.SYSTEM_CONFIG; const configData = { version: this.configService.get('npm_package_version') || '1.0.0', diff --git a/src/collaboration/constants/collaboration-events.constants.ts b/src/collaboration/constants/collaboration-events.constants.ts new file mode 100644 index 00000000..99586236 --- /dev/null +++ b/src/collaboration/constants/collaboration-events.constants.ts @@ -0,0 +1,25 @@ +export const COLLABORATION_EVENTS = { + // Inbound WebSocket messages + JOIN_SESSION: 'join-session', + COLLABORATIVE_OPERATION: 'collaborative-operation', + REQUEST_SYNC: 'request-sync', + RESOLVE_CONFLICT: 'resolve-conflict', + + // Outbound WebSocket messages + USER_JOINED: 'user-joined', + SESSION_STATE: 'session-state', + OPERATION_APPLIED: 'operation-applied', + FULL_SYNC: 'full-sync', + CONFLICT_RESOLVED: 'conflict-resolved', +} as const; + +export const NOTIFICATION_GATEWAY_EVENTS = { + SUBSCRIBE: 'subscribe', + NOTIFICATION: 'notification', + BROADCAST_NOTIFICATION: 'broadcast_notification', + SUBSCRIBE_NOTIFICATIONS: 'subscribe_notifications', +} as const; + +export const MESSAGING_GATEWAY_EVENTS = { + SEND_MESSAGE: 'send_message', +} as const; diff --git a/src/collaboration/gateway/collaboration.gateway.ts b/src/collaboration/gateway/collaboration.gateway.ts index ae02702e..0611f7a8 100644 --- a/src/collaboration/gateway/collaboration.gateway.ts +++ b/src/collaboration/gateway/collaboration.gateway.ts @@ -8,6 +8,7 @@ import { MessageBody, ConnectedSocket, } from '@nestjs/websockets'; +import { COLLABORATION_EVENTS } from '../constants/collaboration-events.constants'; import { Server, Socket } from 'socket.io'; import { Logger, UseGuards } from '@nestjs/common'; import { CollaborationService } from '../collaboration.service'; @@ -75,7 +76,7 @@ export class CollaborationGateway // Remove client from any active sessions } - @SubscribeMessage('join-session') + @SubscribeMessage(COLLABORATION_EVENTS.JOIN_SESSION) async handleJoinSession( @MessageBody() data: { sessionId: string; userId: string; resourceType: string }, @ConnectedSocket() client: Socket, @@ -122,10 +123,10 @@ export class CollaborationGateway } // Notify other users in the session - client.to(sessionId).emit('user-joined', { userId, sessionId }); + client.to(sessionId).emit(COLLABORATION_EVENTS.USER_JOINED, { userId, sessionId }); // Send current resource state to the joining user - client.emit('session-state', { + client.emit(COLLABORATION_EVENTS.SESSION_STATE, { sessionId, resourceType, resource, @@ -139,7 +140,7 @@ export class CollaborationGateway } } - @SubscribeMessage('collaborative-operation') + @SubscribeMessage(COLLABORATION_EVENTS.COLLABORATIVE_OPERATION) async handleCollaborativeOperation( @MessageBody() operation: CollaborativeOperation, @ConnectedSocket() client: Socket, @@ -175,7 +176,7 @@ export class CollaborationGateway }); // Broadcast the operation to all other clients in the session - client.to(sessionId).emit('operation-applied', { + client.to(sessionId).emit(COLLABORATION_EVENTS.OPERATION_APPLIED, { operation: opData, userId, timestamp: Date.now(), @@ -189,7 +190,7 @@ export class CollaborationGateway } } - @SubscribeMessage('request-sync') + @SubscribeMessage(COLLABORATION_EVENTS.REQUEST_SYNC) async handleSyncRequest( @MessageBody() data: { sessionId: string; userId: string }, @ConnectedSocket() client: Socket, @@ -207,7 +208,7 @@ export class CollaborationGateway const document = await this.sharedDocumentService.getDocument(sessionId); const whiteboard = await this.whiteboardService.getWhiteboard(sessionId); - client.emit('full-sync', { + client.emit(COLLABORATION_EVENTS.FULL_SYNC, { sessionId, document: document || null, whiteboard: whiteboard || null, @@ -218,7 +219,7 @@ export class CollaborationGateway } } - @SubscribeMessage('resolve-conflict') + @SubscribeMessage(COLLABORATION_EVENTS.RESOLVE_CONFLICT) async handleConflictResolution( @MessageBody() data: { sessionId: string; userId: string; resourceType: string; operations: any[] }, @@ -246,7 +247,7 @@ export class CollaborationGateway } // Broadcast resolved state to all clients - this.server.to(sessionId).emit('conflict-resolved', { + this.server.to(sessionId).emit(COLLABORATION_EVENTS.CONFLICT_RESOLVED, { sessionId, resourceType, resolvedState: result, diff --git a/src/common/constants/auth.constants.ts b/src/common/constants/auth.constants.ts new file mode 100644 index 00000000..5b3dd5df --- /dev/null +++ b/src/common/constants/auth.constants.ts @@ -0,0 +1,3 @@ +export const AUTH_STRATEGY = { + JWT: 'jwt', +} as const; diff --git a/src/common/constants/event.constants.ts b/src/common/constants/event.constants.ts new file mode 100644 index 00000000..91b65d2c --- /dev/null +++ b/src/common/constants/event.constants.ts @@ -0,0 +1,31 @@ +export const APP_EVENTS = { + // Data sync events + DATA_UPDATED: 'data.updated', + + // Cache events + CACHE_INVALIDATED: 'cache.invalidated', + CACHE_PURGED: 'cache.purged', + + // Data integrity events + DATA_CONSISTENCY_SCHEDULED: 'data.consistency.scheduled', + DATA_INTEGRITY_VIOLATION: 'data.integrity.violation', + + // User events + USER_SIGNUP: 'user.signup', + USER_ADD_TAG: 'user.addTag', + USER_REMOVE_TAG: 'user.removeTag', + + // Course events + COURSE_ENROLLED: 'course.enrolled', + COURSE_COMPLETED: 'course.completed', + + // Payment events + PAYMENT_COMPLETED: 'payment.completed', + + // Segment events + SEGMENT_ADD_USER: 'segment.addUser', + + // Notification events + NOTIFICATION_SEND: 'notification.send', + NOTIFICATION_TEMPLATE_SEND: 'notification.template.send', +} as const; diff --git a/src/common/constants/queue.constants.ts b/src/common/constants/queue.constants.ts new file mode 100644 index 00000000..cf131b2a --- /dev/null +++ b/src/common/constants/queue.constants.ts @@ -0,0 +1,42 @@ +export const QUEUE_NAMES = { + EMAIL: 'email', + EMAIL_MARKETING: 'email-marketing', + SYNC_TASKS: 'sync-tasks', + BACKUP_PROCESSING: 'backup-processing', + MESSAGE_QUEUE: 'message-queue', + MEDIA_PROCESSING: 'media-processing', + DEFAULT: 'default', + USER_DATA_EXPORT: 'user-data-export', + SUBSCRIPTIONS: 'subscriptions', + WEBHOOKS: 'webhooks', +} as const; + +export const JOB_NAMES = { + // Email queue + SEND_EMAIL: 'send-email', + + // Email marketing queue + SEND_CAMPAIGN: 'send-campaign', + PROCESS_CAMPAIGN: 'process-campaign', + RESUME_CAMPAIGN: 'resume-campaign', + SEND_AUTOMATION_EMAIL: 'send-automation-email', + CONTINUE_AUTOMATION: 'continue-automation', + CALL_WEBHOOK: 'call-webhook', + + // Sync tasks queue + CONSISTENCY_CHECK: 'consistency-check', + REPLICATE_DATA: 'replicate-data', + + // Backup processing queue + CREATE_BACKUP: 'create-backup', + VERIFY_BACKUP: 'verify-backup', + RECOVERY_TEST: 'recovery-test', + DELETE_BACKUP: 'delete-backup', + + // Media processing queue + TRANSCODE_VIDEO: 'transcode-video', + + // Payments queues + PROCESS_SUBSCRIPTION: 'process_subscription', + PROCESS_WEBHOOK: 'process-webhook', +} as const; diff --git a/src/email-marketing/automation/automation.service.ts b/src/email-marketing/automation/automation.service.ts index 06281fc5..b4b5f76a 100644 --- a/src/email-marketing/automation/automation.service.ts +++ b/src/email-marketing/automation/automation.service.ts @@ -4,6 +4,8 @@ import { Repository } from 'typeorm'; import { InjectQueue } from '@nestjs/bull'; import { Queue } from 'bull'; import { EventEmitter2, OnEvent } from '@nestjs/event-emitter'; +import { QUEUE_NAMES, JOB_NAMES } from '../../common/constants/queue.constants'; +import { APP_EVENTS } from '../../common/constants/event.constants'; import { AutomationWorkflow } from '../entities/automation-workflow.entity'; import { AutomationTrigger } from '../entities/automation-trigger.entity'; @@ -24,7 +26,7 @@ export class AutomationService { private readonly triggerRepository: Repository, @InjectRepository(AutomationAction) private readonly actionRepository: Repository, - @InjectQueue('email-marketing') + @InjectQueue(QUEUE_NAMES.EMAIL_MARKETING) private readonly emailQueue: Queue, private readonly eventEmitter: EventEmitter2, ) {} @@ -206,7 +208,7 @@ export class AutomationService { /** * Handle user signup event */ - @OnEvent('user.signup') + @OnEvent(APP_EVENTS.USER_SIGNUP) async handleUserSignup(payload: { userId: string; email: string }) { await this.executeTriggeredWorkflows(TriggerType.USER_SIGNUP, payload); } @@ -214,7 +216,7 @@ export class AutomationService { /** * Handle course enrollment event */ - @OnEvent('course.enrolled') + @OnEvent(APP_EVENTS.COURSE_ENROLLED) async handleCourseEnrollment(payload: { userId: string; courseId: string }) { await this.executeTriggeredWorkflows(TriggerType.COURSE_ENROLLED, payload); } @@ -222,7 +224,7 @@ export class AutomationService { /** * Handle course completion event */ - @OnEvent('course.completed') + @OnEvent(APP_EVENTS.COURSE_COMPLETED) async handleCourseCompletion(payload: { userId: string; courseId: string }) { await this.executeTriggeredWorkflows(TriggerType.COURSE_COMPLETED, payload); } @@ -230,7 +232,7 @@ export class AutomationService { /** * Handle purchase event */ - @OnEvent('payment.completed') + @OnEvent(APP_EVENTS.PAYMENT_COMPLETED) async handlePurchase(payload: { userId: string; amount: number; productId: string }) { await this.executeTriggeredWorkflows(TriggerType.PURCHASE_MADE, payload); } @@ -316,7 +318,7 @@ export class AutomationService { ): Promise { switch (action.type) { case ActionType.SEND_EMAIL: - await this.emailQueue.add('send-automation-email', { + await this.emailQueue.add(JOB_NAMES.SEND_AUTOMATION_EMAIL, { actionId: action.id, templateId: action.config.templateId, userId: payload.userId, @@ -326,7 +328,7 @@ export class AutomationService { case ActionType.WAIT: await this.emailQueue.add( - 'continue-automation', + JOB_NAMES.CONTINUE_AUTOMATION, { workflowId: action.workflowId, nextActionOrder: action.order + 1, @@ -337,28 +339,28 @@ export class AutomationService { break; case ActionType.ADD_TAG: - this.eventEmitter.emit('user.addTag', { + this.eventEmitter.emit(APP_EVENTS.USER_ADD_TAG, { userId: payload.userId, tag: action.config.tag, }); break; case ActionType.REMOVE_TAG: - this.eventEmitter.emit('user.removeTag', { + this.eventEmitter.emit(APP_EVENTS.USER_REMOVE_TAG, { userId: payload.userId, tag: action.config.tag, }); break; case ActionType.ADD_TO_SEGMENT: - this.eventEmitter.emit('segment.addUser', { + this.eventEmitter.emit(APP_EVENTS.SEGMENT_ADD_USER, { userId: payload.userId, segmentId: action.config.segmentId, }); break; case ActionType.WEBHOOK: - await this.emailQueue.add('call-webhook', { + await this.emailQueue.add(JOB_NAMES.CALL_WEBHOOK, { url: action.config.webhookUrl, method: action.config.method || 'POST', payload: { ...payload, ...action.config.webhookPayload }, diff --git a/src/email-marketing/email-marketing.module.ts b/src/email-marketing/email-marketing.module.ts index 62b56b4e..8f7c11d6 100644 --- a/src/email-marketing/email-marketing.module.ts +++ b/src/email-marketing/email-marketing.module.ts @@ -2,6 +2,7 @@ import { Module } from '@nestjs/common'; import { TypeOrmModule } from '@nestjs/typeorm'; import { BullModule } from '@nestjs/bull'; import { ConfigModule } from '@nestjs/config'; +import { QUEUE_NAMES } from '../common/constants/queue.constants'; // Services import { EmailMarketingService } from './email-marketing.service'; @@ -56,7 +57,7 @@ import { EmailQueueProcessor } from './processors/email-queue.processor'; EmailSubscription, ]), BullModule.registerQueue({ - name: 'email-marketing', + name: QUEUE_NAMES.EMAIL_MARKETING, }), ], controllers: [ diff --git a/src/email-marketing/email-marketing.service.ts b/src/email-marketing/email-marketing.service.ts index 0d363a27..f5cc3e0e 100644 --- a/src/email-marketing/email-marketing.service.ts +++ b/src/email-marketing/email-marketing.service.ts @@ -3,6 +3,7 @@ import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; import { InjectQueue } from '@nestjs/bull'; import { Queue } from 'bull'; +import { QUEUE_NAMES, JOB_NAMES } from '../common/constants/queue.constants'; import { Campaign } from './entities/campaign.entity'; import { CampaignRecipient } from './entities/campaign-recipient.entity'; @@ -23,7 +24,7 @@ export class EmailMarketingService { private readonly campaignRepository: Repository, @InjectRepository(CampaignRecipient) private readonly recipientRepository: Repository, - @InjectQueue('email-marketing') + @InjectQueue(QUEUE_NAMES.EMAIL_MARKETING) private readonly emailQueue: Queue, private readonly segmentationService: SegmentationService, private readonly templateService: TemplateManagementService, @@ -150,7 +151,7 @@ export class EmailMarketingService { // Add to queue with delay const delay = scheduledDate.getTime() - Date.now(); await this.emailQueue.add( - 'send-campaign', + JOB_NAMES.SEND_CAMPAIGN, { campaignId: id }, { delay, jobId: `campaign-${id}` }, ); @@ -184,7 +185,7 @@ export class EmailMarketingService { await this.campaignRepository.save(campaign); // Queue emails for sending - await this.emailQueue.add('process-campaign', { + await this.emailQueue.add(JOB_NAMES.PROCESS_CAMPAIGN, { campaignId: id, recipients: recipients.map((r) => r.id), }); @@ -231,7 +232,7 @@ export class EmailMarketingService { // Otherwise, resume sending campaign.status = CampaignStatus.SENDING; - await this.emailQueue.add('resume-campaign', { campaignId: id }); + await this.emailQueue.add(JOB_NAMES.RESUME_CAMPAIGN, { campaignId: id }); return this.campaignRepository.save(campaign); } diff --git a/src/email-marketing/processors/email-queue.processor.ts b/src/email-marketing/processors/email-queue.processor.ts index 341bb2d4..d6108f9f 100644 --- a/src/email-marketing/processors/email-queue.processor.ts +++ b/src/email-marketing/processors/email-queue.processor.ts @@ -1,6 +1,7 @@ import { Process, Processor } from '@nestjs/bull'; import { Logger } from '@nestjs/common'; import { Job } from 'bull'; +import { QUEUE_NAMES, JOB_NAMES } from '../../common/constants/queue.constants'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; @@ -12,7 +13,7 @@ import { ABTestingService } from '../ab-testing/ab-testing.service'; import { CampaignStatus } from '../enums/campaign-status.enum'; import { RecipientStatus } from '../enums/recipient-status.enum'; -@Processor('email-marketing') +@Processor(QUEUE_NAMES.EMAIL_MARKETING) export class EmailQueueProcessor { private readonly logger = new Logger(EmailQueueProcessor.name); @@ -26,7 +27,7 @@ export class EmailQueueProcessor { private readonly abTestingService: ABTestingService, ) {} - @Process('send-campaign') + @Process(JOB_NAMES.SEND_CAMPAIGN) async handleScheduledCampaign(job: Job<{ campaignId: string }>) { this.logger.log(`Processing scheduled campaign: ${job.data.campaignId}`); @@ -51,7 +52,7 @@ export class EmailQueueProcessor { await this.processRecipients(campaign, users); } - @Process('process-campaign') + @Process(JOB_NAMES.PROCESS_CAMPAIGN) async handleCampaignProcessing(job: Job<{ campaignId: string; recipients: string[] }>) { this.logger.log(`Processing campaign: ${job.data.campaignId}`); @@ -74,7 +75,7 @@ export class EmailQueueProcessor { await this.processRecipients(campaign, users); } - @Process('send-automation-email') + @Process(JOB_NAMES.SEND_AUTOMATION_EMAIL) async handleAutomationEmail( job: Job<{ actionId: string; @@ -97,7 +98,7 @@ export class EmailQueueProcessor { }); } - @Process('resume-campaign') + @Process(JOB_NAMES.RESUME_CAMPAIGN) async handleResumeCampaign(job: Job<{ campaignId: string }>) { this.logger.log(`Resuming campaign: ${job.data.campaignId}`); diff --git a/src/gateways/messaging.gateway.ts b/src/gateways/messaging.gateway.ts index 39b84845..31f04e3f 100644 --- a/src/gateways/messaging.gateway.ts +++ b/src/gateways/messaging.gateway.ts @@ -12,6 +12,7 @@ import { WsJwtAuthGuard } from '../auth/guards/ws-jwt-auth.guard'; import { WsThrottlerGuard } from '../common/guards/ws-throttler.guard'; import { wsManager } from '../common/utils/websocket.utils'; import { JwtService } from '@nestjs/jwt'; +import { MESSAGING_GATEWAY_EVENTS } from '../collaboration/constants/collaboration-events.constants'; @WebSocketGateway({ namespace: '/messaging' }) @UseGuards(WsThrottlerGuard) @@ -48,7 +49,7 @@ export class MessagingGateway implements OnGatewayConnection, OnGatewayDisconnec } @UseGuards(WsJwtAuthGuard) - @SubscribeMessage('send_message') + @SubscribeMessage(MESSAGING_GATEWAY_EVENTS.SEND_MESSAGE) async handleMessage(@MessageBody() data: any, @ConnectedSocket() client: Socket) { const user = (client as any).user; return { userId: user.sub, message: data }; diff --git a/src/gateways/notifications.gateway.ts b/src/gateways/notifications.gateway.ts index ec5f494b..ffb912f3 100644 --- a/src/gateways/notifications.gateway.ts +++ b/src/gateways/notifications.gateway.ts @@ -11,6 +11,7 @@ import { WsJwtAuthGuard } from '../auth/guards/ws-jwt-auth.guard'; import { WsThrottlerGuard } from '../common/guards/ws-throttler.guard'; import { wsManager } from '../common/utils/websocket.utils'; import { JwtService } from '@nestjs/jwt'; +import { NOTIFICATION_GATEWAY_EVENTS } from '../collaboration/constants/collaboration-events.constants'; @WebSocketGateway({ namespace: '/notifications' }) @UseGuards(WsThrottlerGuard) @@ -48,7 +49,7 @@ export class NotificationsGateway implements OnGatewayConnection, OnGatewayDisco } @UseGuards(WsJwtAuthGuard) - @SubscribeMessage('subscribe_notifications') + @SubscribeMessage(NOTIFICATION_GATEWAY_EVENTS.SUBSCRIBE_NOTIFICATIONS) async handleSubscribe(@ConnectedSocket() client: Socket) { const user = (client as any).user; return { userId: user.sub, subscribed: true }; diff --git a/src/media/media.module.ts b/src/media/media.module.ts index c131f0d9..ba34cbe1 100644 --- a/src/media/media.module.ts +++ b/src/media/media.module.ts @@ -1,6 +1,7 @@ import { Module } from '@nestjs/common'; import { TypeOrmModule } from '@nestjs/typeorm'; import { BullModule } from '@nestjs/bull'; +import { QUEUE_NAMES } from '../common/constants/queue.constants'; import { MediaController } from './media.controller'; import { MediaService } from './media.service'; import { FileStorageService } from './storage/file-storage.service'; @@ -16,7 +17,7 @@ import { VideoProcessor } from './processing/video.processor'; @Module({ imports: [ TypeOrmModule.forFeature([ContentMetadata]), - BullModule.registerQueue({ name: 'media-processing' }), + BullModule.registerQueue({ name: QUEUE_NAMES.MEDIA_PROCESSING }), ], controllers: [MediaController], providers: [ diff --git a/src/media/processing/video-processing.service.ts b/src/media/processing/video-processing.service.ts index f7cfba80..ef3cf2eb 100644 --- a/src/media/processing/video-processing.service.ts +++ b/src/media/processing/video-processing.service.ts @@ -1,17 +1,18 @@ import { Injectable, Logger } from '@nestjs/common'; import { InjectQueue } from '@nestjs/bull'; import { Queue } from 'bull'; +import { QUEUE_NAMES, JOB_NAMES } from '../../common/constants/queue.constants'; import { ContentMetadata } from '../../cdn/entities/content-metadata.entity'; @Injectable() export class VideoProcessingService { private readonly logger = new Logger(VideoProcessingService.name); - constructor(@InjectQueue('media-processing') private readonly queue: Queue) {} + constructor(@InjectQueue(QUEUE_NAMES.MEDIA_PROCESSING) private readonly queue: Queue) {} async enqueueTranscode(content: ContentMetadata) { await this.queue.add( - 'transcode-video', + JOB_NAMES.TRANSCODE_VIDEO, { contentId: content.contentId, url: content.cdnUrl, diff --git a/src/media/processing/video.processor.ts b/src/media/processing/video.processor.ts index 54079211..f3d570f6 100644 --- a/src/media/processing/video.processor.ts +++ b/src/media/processing/video.processor.ts @@ -1,4 +1,5 @@ import { Processor, Process, OnQueueFailed, OnQueueCompleted } from '@nestjs/bull'; +import { QUEUE_NAMES, JOB_NAMES } from '../../common/constants/queue.constants'; import { Job } from 'bull'; import { Logger } from '@nestjs/common'; import * as fs from 'fs'; @@ -11,7 +12,7 @@ import { Repository } from 'typeorm'; import { UploadedFile } from '../../common/types/file.types'; import { ContentMetadata } from '../../cdn/entities/content-metadata.entity'; -@Processor('media-processing') +@Processor(QUEUE_NAMES.MEDIA_PROCESSING) export class VideoProcessor { private readonly logger = new Logger(VideoProcessor.name); @@ -21,7 +22,7 @@ export class VideoProcessor { private readonly contentRepo: Repository, ) {} - @Process('transcode-video') + @Process(JOB_NAMES.TRANSCODE_VIDEO) async handleTranscode(job: Job) { const { contentId, url, fileName } = job.data as { contentId: string; diff --git a/src/messaging/messaging.module.ts b/src/messaging/messaging.module.ts index 3cff0671..88b407e1 100644 --- a/src/messaging/messaging.module.ts +++ b/src/messaging/messaging.module.ts @@ -1,5 +1,6 @@ import { Module } from '@nestjs/common'; import { BullModule } from '@nestjs/bull'; +import { QUEUE_NAMES } from '../common/constants/queue.constants'; import { EventEmitterModule } from '@nestjs/event-emitter'; import { MessagingService } from './messaging.service'; import { EventBusService } from './event-bus/event-bus.service'; @@ -18,7 +19,7 @@ import { createBullRedisClient } from '../common/utils/bull-redis.util'; createClient: createBullRedisClient, }), BullModule.registerQueue({ - name: 'message-queue', + name: QUEUE_NAMES.MESSAGE_QUEUE, }), EventEmitterModule.forRoot(), ], diff --git a/src/messaging/messaging.service.ts b/src/messaging/messaging.service.ts index da953cb8..291a50e0 100644 --- a/src/messaging/messaging.service.ts +++ b/src/messaging/messaging.service.ts @@ -1,6 +1,7 @@ import { Injectable, Logger } from '@nestjs/common'; import { InjectQueue } from '@nestjs/bull'; import { Queue, Job } from 'bull'; +import { QUEUE_NAMES } from '../common/constants/queue.constants'; import { TracingService } from './tracing/tracing.service'; @Injectable() @@ -8,7 +9,7 @@ export class MessagingService { private readonly logger = new Logger(MessagingService.name); constructor( - @InjectQueue('message-queue') + @InjectQueue(QUEUE_NAMES.MESSAGE_QUEUE) private readonly messageQueue: Queue, private readonly tracingService: TracingService, ) {} diff --git a/src/notifications/email/email.processor.ts b/src/notifications/email/email.processor.ts index 67c0e809..9d3e1715 100644 --- a/src/notifications/email/email.processor.ts +++ b/src/notifications/email/email.processor.ts @@ -2,14 +2,15 @@ import { Process, Processor } from '@nestjs/bull'; import { Logger } from '@nestjs/common'; import { Job } from 'bull'; import { EmailService, EmailOptions } from './email.service'; +import { QUEUE_NAMES, JOB_NAMES } from '../../common/constants/queue.constants'; -@Processor('email') +@Processor(QUEUE_NAMES.EMAIL) export class EmailProcessor { private readonly logger = new Logger(EmailProcessor.name); constructor(private readonly emailService: EmailService) {} - @Process('send-email') + @Process(JOB_NAMES.SEND_EMAIL) async handleSendEmail(job: Job) { this.logger.log(`Processing email job ${job.id} for ${job.data.to}`); diff --git a/src/notifications/email/email.service.ts b/src/notifications/email/email.service.ts index 717e5021..e34bae0f 100644 --- a/src/notifications/email/email.service.ts +++ b/src/notifications/email/email.service.ts @@ -6,6 +6,7 @@ import * as fs from 'fs'; import * as path from 'path'; import { InjectQueue } from '@nestjs/bull'; import { Queue } from 'bull'; +import { QUEUE_NAMES, JOB_NAMES } from '../../common/constants/queue.constants'; export interface EmailOptions { to: string; @@ -22,7 +23,7 @@ export class EmailService { constructor( private readonly configService: ConfigService, - @InjectQueue('email') private readonly emailQueue: Queue, + @InjectQueue(QUEUE_NAMES.EMAIL) private readonly emailQueue: Queue, ) { this.initializeTransporter(); } @@ -43,7 +44,7 @@ export class EmailService { const appUrl = this.configService.get('APP_URL') || 'http://localhost:3000'; await this.emailQueue.add( - 'send-email', + JOB_NAMES.SEND_EMAIL, { to: email, subject: 'Verify Your Email - TeachLink', @@ -69,7 +70,7 @@ export class EmailService { const appUrl = this.configService.get('APP_URL') || 'http://localhost:3000'; await this.emailQueue.add( - 'send-email', + JOB_NAMES.SEND_EMAIL, { to: email, subject: 'Reset Your Password - TeachLink', diff --git a/src/notifications/notifications.gateway.ts b/src/notifications/notifications.gateway.ts index 5eba6f7a..b17e6a71 100644 --- a/src/notifications/notifications.gateway.ts +++ b/src/notifications/notifications.gateway.ts @@ -13,6 +13,7 @@ import { WsJwtAuthGuard } from '../auth/guards/ws-jwt-auth.guard'; import { Notification } from './entities/notification.entity'; import { wsManager } from '../common/utils/websocket.utils'; import { WsThrottlerGuard } from '../common/guards/ws-throttler.guard'; +import { NOTIFICATION_GATEWAY_EVENTS } from '../collaboration/constants/collaboration-events.constants'; @WebSocketGateway({ cors: { @@ -48,7 +49,7 @@ export class NotificationsGateway implements OnGatewayConnection, OnGatewayDisco } @UseGuards(WsJwtAuthGuard) - @SubscribeMessage('subscribe') + @SubscribeMessage(NOTIFICATION_GATEWAY_EVENTS.SUBSCRIBE) async handleSubscribe( @ConnectedSocket() client: Socket, @MessageBody() data: { userId: string }, @@ -77,7 +78,7 @@ export class NotificationsGateway implements OnGatewayConnection, OnGatewayDisco * Send notification to a specific user in real-time */ async sendToUser(userId: string, notification: Notification) { - this.server.to(`user:${userId}`).emit('notification', notification); + this.server.to(`user:${userId}`).emit(NOTIFICATION_GATEWAY_EVENTS.NOTIFICATION, notification); this.logger.debug(`Notification sent to user:${userId}`); } @@ -85,7 +86,7 @@ export class NotificationsGateway implements OnGatewayConnection, OnGatewayDisco * Broadcast notification to all users */ async broadcast(notification: Partial) { - this.server.emit('broadcast_notification', notification); + this.server.emit(NOTIFICATION_GATEWAY_EVENTS.BROADCAST_NOTIFICATION, notification); this.logger.debug('Broadcast notification sent'); } diff --git a/src/notifications/notifications.module.ts b/src/notifications/notifications.module.ts index 6ec8dd7b..cd19b321 100644 --- a/src/notifications/notifications.module.ts +++ b/src/notifications/notifications.module.ts @@ -1,6 +1,7 @@ import { Module, Global } from '@nestjs/common'; import { TypeOrmModule } from '@nestjs/typeorm'; import { BullModule } from '@nestjs/bull'; +import { QUEUE_NAMES } from '../common/constants/queue.constants'; import { ConfigModule } from '@nestjs/config'; import { JwtModule } from '@nestjs/jwt'; import { NotificationsService } from './notifications.service'; @@ -23,7 +24,7 @@ import { NotificationPreferences } from './entities/notification-preferences.ent signOptions: { expiresIn: '24h' }, }), BullModule.registerQueue({ - name: 'email', + name: QUEUE_NAMES.EMAIL, }), ], controllers: [NotificationsController], diff --git a/src/notifications/notifications.service.ts b/src/notifications/notifications.service.ts index 3fe5a128..a38c210a 100644 --- a/src/notifications/notifications.service.ts +++ b/src/notifications/notifications.service.ts @@ -2,6 +2,7 @@ import { Injectable, Logger, NotFoundException } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; import { OnEvent } from '@nestjs/event-emitter'; +import { APP_EVENTS } from '../common/constants/event.constants'; import { Notification, NotificationType, @@ -227,7 +228,7 @@ export class NotificationsService { /** * Event listener for system-wide notifications */ - @OnEvent('notification.send') + @OnEvent(APP_EVENTS.NOTIFICATION_SEND) async handleSendNotification(payload: CreateNotificationDto): Promise { await this.create(payload); } @@ -235,7 +236,7 @@ export class NotificationsService { /** * Event listener for specific templates */ - @OnEvent('notification.template.send') + @OnEvent(APP_EVENTS.NOTIFICATION_TEMPLATE_SEND) async handleSendTemplateNotification(payload: { userId: string; templateType: string; diff --git a/src/payments/payments.module.ts b/src/payments/payments.module.ts index ab70d8e7..d1da1112 100644 --- a/src/payments/payments.module.ts +++ b/src/payments/payments.module.ts @@ -1,6 +1,7 @@ import { Module } from '@nestjs/common'; import { TypeOrmModule } from '@nestjs/typeorm'; import { BullModule } from '@nestjs/bull'; +import { QUEUE_NAMES } from '../common/constants/queue.constants'; import { PaymentsService } from './payments.service'; import { PaymentsController } from './payments.controller'; import { WebhookController } from './webhooks/webhook.controller'; @@ -27,10 +28,10 @@ import { TransactionHelperService } from '../common/database/transaction-helper. TypeOrmModule.forFeature([Payment, Subscription, Invoice, Refund, User, WebhookRetry]), BullModule.registerQueue( { - name: 'subscriptions', + name: QUEUE_NAMES.SUBSCRIPTIONS, }, { - name: 'webhooks', + name: QUEUE_NAMES.WEBHOOKS, }, ), UsersModule, diff --git a/src/payments/subscriptions/subscription-job.processor.ts b/src/payments/subscriptions/subscription-job.processor.ts index 103590ea..96972a05 100644 --- a/src/payments/subscriptions/subscription-job.processor.ts +++ b/src/payments/subscriptions/subscription-job.processor.ts @@ -1,11 +1,12 @@ import { Processor, Process } from '@nestjs/bull'; import { Job } from 'bull'; import { Logger } from '@nestjs/common'; +import { QUEUE_NAMES, JOB_NAMES } from '../../common/constants/queue.constants'; -@Processor('subscriptions') +@Processor(QUEUE_NAMES.SUBSCRIPTIONS) export class SubscriptionJobProcessor { private readonly logger = new Logger(SubscriptionJobProcessor.name); - @Process('process_subscription') + @Process(JOB_NAMES.PROCESS_SUBSCRIPTION) async handleSubscription(job: Job) { // Process subscription job this.logger.log('Processing subscription job:', job.data); diff --git a/src/payments/webhooks/webhook-queue.service.ts b/src/payments/webhooks/webhook-queue.service.ts index 02a79871..2f4fc953 100644 --- a/src/payments/webhooks/webhook-queue.service.ts +++ b/src/payments/webhooks/webhook-queue.service.ts @@ -1,6 +1,7 @@ import { Injectable, Logger } from '@nestjs/common'; import { Queue } from 'bull'; import { InjectQueue } from '@nestjs/bull'; +import { QUEUE_NAMES, JOB_NAMES } from '../../common/constants/queue.constants'; import { Repository } from 'typeorm'; import { InjectRepository } from '@nestjs/typeorm'; import { WebhookRetry, WebhookStatus, WebhookProvider } from './entities/webhook-retry.entity'; @@ -19,7 +20,7 @@ export class WebhookQueueService { private readonly logger = new Logger(WebhookQueueService.name); constructor( - @InjectQueue('webhooks') + @InjectQueue(QUEUE_NAMES.WEBHOOKS) private readonly webhookQueue: Queue, @InjectRepository(WebhookRetry) private readonly webhookRetryRepository: Repository, @@ -65,7 +66,7 @@ export class WebhookQueueService { } // Queue the job for processing - const job = await this.webhookQueue.add('process-webhook', payload, { + const job = await this.webhookQueue.add(JOB_NAMES.PROCESS_WEBHOOK, payload, { attempts: 1, // Let our processor handle retries backoff: { type: 'exponential', diff --git a/src/payments/webhooks/webhook-retry.processor.ts b/src/payments/webhooks/webhook-retry.processor.ts index 68cb4531..ee33e1f7 100644 --- a/src/payments/webhooks/webhook-retry.processor.ts +++ b/src/payments/webhooks/webhook-retry.processor.ts @@ -1,6 +1,7 @@ import { Process, Processor } from '@nestjs/bull'; import { Job } from 'bull'; import { Injectable, Logger } from '@nestjs/common'; +import { QUEUE_NAMES, JOB_NAMES } from '../../common/constants/queue.constants'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; import { WebhookRetry, WebhookStatus, WebhookProvider } from './entities/webhook-retry.entity'; @@ -45,7 +46,7 @@ interface WebhookJobData { } @Injectable() -@Processor('webhooks') +@Processor(QUEUE_NAMES.WEBHOOKS) export class WebhookRetryProcessor { private readonly logger = new Logger(WebhookRetryProcessor.name); @@ -61,7 +62,7 @@ export class WebhookRetryProcessor { private readonly paymentsService: PaymentsService, ) {} - @Process('process-webhook') + @Process(JOB_NAMES.PROCESS_WEBHOOK) async processWebhook(job: Job) { const { webhookRetryId, provider, payload, signature, externalEventId, headers } = job.data; diff --git a/src/queues/monitoring/queue-monitoring.service.ts b/src/queues/monitoring/queue-monitoring.service.ts index 29bc51c0..a79e30b0 100644 --- a/src/queues/monitoring/queue-monitoring.service.ts +++ b/src/queues/monitoring/queue-monitoring.service.ts @@ -1,6 +1,7 @@ import { Injectable, Logger } from '@nestjs/common'; import { InjectQueue } from '@nestjs/bull'; import { Queue, Job } from 'bull'; +import { QUEUE_NAMES } from '../../common/constants/queue.constants'; import { Cron, CronExpression } from '@nestjs/schedule'; import { QueueMetrics } from '../interfaces/queue.interfaces'; @@ -14,7 +15,7 @@ export class QueueMonitoringService { private metricsHistory: Map = new Map(); private readonly MAX_HISTORY_SIZE = 100; - constructor(@InjectQueue('default') private readonly defaultQueue: Queue) {} + constructor(@InjectQueue(QUEUE_NAMES.DEFAULT) private readonly defaultQueue: Queue) {} /** * Get current queue metrics diff --git a/src/queues/processors/default-queue.processor.ts b/src/queues/processors/default-queue.processor.ts index efd4d1f9..328db8bc 100644 --- a/src/queues/processors/default-queue.processor.ts +++ b/src/queues/processors/default-queue.processor.ts @@ -1,4 +1,5 @@ import { Processor, Process, OnQueueActive, OnQueueCompleted, OnQueueFailed } from '@nestjs/bull'; +import { QUEUE_NAMES } from '../../common/constants/queue.constants'; import { Logger } from '@nestjs/common'; import { Job } from 'bull'; import { RetryLogicService } from '../retry/retry-logic.service'; @@ -8,7 +9,7 @@ import { sanitizeEmail, sanitizePii } from '../../common/utils/pii-sanitizer.uti * Default Queue Processor * Processes jobs from the default queue */ -@Processor('default') +@Processor(QUEUE_NAMES.DEFAULT) export class DefaultQueueProcessor { private readonly logger = new Logger(DefaultQueueProcessor.name); diff --git a/src/queues/queue.module.ts b/src/queues/queue.module.ts index c155fc89..ae4cf0cf 100644 --- a/src/queues/queue.module.ts +++ b/src/queues/queue.module.ts @@ -1,5 +1,6 @@ import { Module } from '@nestjs/common'; import { BullModule } from '@nestjs/bull'; +import { QUEUE_NAMES } from '../common/constants/queue.constants'; import { ScheduleModule } from '@nestjs/schedule'; import { QueueService } from './queue.service'; import { QueueController } from './queue.controller'; @@ -16,7 +17,7 @@ import { DefaultQueueProcessor } from './processors/default-queue.processor'; @Module({ imports: [ BullModule.registerQueue({ - name: 'default', + name: QUEUE_NAMES.DEFAULT, defaultJobOptions: { attempts: 3, backoff: { diff --git a/src/queues/queue.service.ts b/src/queues/queue.service.ts index 35f413a1..4b16f0e1 100644 --- a/src/queues/queue.service.ts +++ b/src/queues/queue.service.ts @@ -1,6 +1,7 @@ import { Injectable, Logger } from '@nestjs/common'; import { InjectQueue } from '@nestjs/bull'; import { Queue, Job } from 'bull'; +import { QUEUE_NAMES } from '../common/constants/queue.constants'; import { JobOptions, JobMetrics } from './interfaces/queue.interfaces'; import { JobPriority, JobStatus } from './enums/job-priority.enum'; @@ -12,7 +13,7 @@ import { JobPriority, JobStatus } from './enums/job-priority.enum'; export class QueueService { private readonly logger = new Logger(QueueService.name); - constructor(@InjectQueue('default') private readonly defaultQueue: Queue) {} + constructor(@InjectQueue(QUEUE_NAMES.DEFAULT) private readonly defaultQueue: Queue) {} /** * Add a job to the queue with priority and options diff --git a/src/queues/scheduler/job-scheduler.service.ts b/src/queues/scheduler/job-scheduler.service.ts index ae82605f..84bb185b 100644 --- a/src/queues/scheduler/job-scheduler.service.ts +++ b/src/queues/scheduler/job-scheduler.service.ts @@ -1,6 +1,7 @@ import { Injectable, Logger } from '@nestjs/common'; import { InjectQueue } from '@nestjs/bull'; import { Queue } from 'bull'; +import { QUEUE_NAMES } from '../../common/constants/queue.constants'; import { Cron, CronExpression, SchedulerRegistry } from '@nestjs/schedule'; import { CronJob } from 'cron'; import { JobOptions } from '../interfaces/queue.interfaces'; @@ -14,7 +15,7 @@ export class JobSchedulerService { private readonly logger = new Logger(JobSchedulerService.name); constructor( - @InjectQueue('default') private readonly defaultQueue: Queue, + @InjectQueue(QUEUE_NAMES.DEFAULT) private readonly defaultQueue: Queue, private readonly schedulerRegistry: SchedulerRegistry, ) {} diff --git a/src/sync/cache/cache-invalidation.service.ts b/src/sync/cache/cache-invalidation.service.ts index 6035fda1..90310b0f 100644 --- a/src/sync/cache/cache-invalidation.service.ts +++ b/src/sync/cache/cache-invalidation.service.ts @@ -2,6 +2,7 @@ import { Injectable, Logger, Inject } from '@nestjs/common'; import { CACHE_MANAGER } from '@nestjs/cache-manager'; import { Cache } from 'cache-manager'; import { EventEmitter2 } from '@nestjs/event-emitter'; +import { APP_EVENTS } from '../../common/constants/event.constants'; @Injectable() export class CacheInvalidationService { @@ -18,7 +19,7 @@ export class CacheInvalidationService { async invalidateKey(key: string): Promise { this.logger.log(`Invalidating cache key: ${key}`); await this.cacheManager.del(key); - this.eventEmitter.emit('cache.invalidated', { key, type: 'single' }); + this.eventEmitter.emit(APP_EVENTS.CACHE_INVALIDATED, { key, type: 'single' }); } /** @@ -31,7 +32,7 @@ export class CacheInvalidationService { // In a production environment with Redis, we'd use 'SCAN' and 'DEL' // For now, we'll emit an event that other specialized listeners might handle - this.eventEmitter.emit('cache.invalidated', { pattern, type: 'pattern' }); + this.eventEmitter.emit(APP_EVENTS.CACHE_INVALIDATED, { pattern, type: 'pattern' }); // If the store supports a store-specific method, call it here. const store: any = (this.cacheManager as any).store; @@ -67,6 +68,6 @@ export class CacheInvalidationService { } else if (typeof (this.cacheManager as any).reset === 'function') { await (this.cacheManager as any).reset(); } - this.eventEmitter.emit('cache.purged', { timestamp: new Date() }); + this.eventEmitter.emit(APP_EVENTS.CACHE_PURGED, { timestamp: new Date() }); } } diff --git a/src/sync/consistency/data-consistency.service.ts b/src/sync/consistency/data-consistency.service.ts index 27bb00cd..b5c3eb5f 100644 --- a/src/sync/consistency/data-consistency.service.ts +++ b/src/sync/consistency/data-consistency.service.ts @@ -2,6 +2,8 @@ import { Injectable, Logger } from '@nestjs/common'; import { EventEmitter2 } from '@nestjs/event-emitter'; import { InjectQueue } from '@nestjs/bull'; import { Queue } from 'bull'; +import { QUEUE_NAMES, JOB_NAMES } from '../../common/constants/queue.constants'; +import { APP_EVENTS } from '../../common/constants/event.constants'; export interface IntegrityCheckResult { consistent: boolean; @@ -15,7 +17,7 @@ export class DataConsistencyService { constructor( private eventEmitter: EventEmitter2, - @InjectQueue('sync-tasks') private syncQueue: Queue, + @InjectQueue(QUEUE_NAMES.SYNC_TASKS) private syncQueue: Queue, ) {} /** @@ -26,7 +28,7 @@ export class DataConsistencyService { // Add to queue for background processing await this.syncQueue.add( - 'consistency-check', + JOB_NAMES.CONSISTENCY_CHECK, { dataId, payload, @@ -42,7 +44,7 @@ export class DataConsistencyService { ); // Emit event for real-time subscribers - this.eventEmitter.emit('data.consistency.scheduled', { dataId, timestamp: new Date() }); + this.eventEmitter.emit(APP_EVENTS.DATA_CONSISTENCY_SCHEDULED, { dataId, timestamp: new Date() }); } /** @@ -69,7 +71,7 @@ export class DataConsistencyService { if (!consistent) { this.logger.warn(`Integrity check failed with issues: ${issues.join(', ')}`); - this.eventEmitter.emit('data.integrity.violation', { issues, timestamp: new Date() }); + this.eventEmitter.emit(APP_EVENTS.DATA_INTEGRITY_VIOLATION, { issues, timestamp: new Date() }); } return { diff --git a/src/sync/replication/replication.service.ts b/src/sync/replication/replication.service.ts index a7394aff..57fc8cd0 100644 --- a/src/sync/replication/replication.service.ts +++ b/src/sync/replication/replication.service.ts @@ -2,6 +2,7 @@ import { Injectable, Logger } from '@nestjs/common'; import { EventEmitter2 } from '@nestjs/event-emitter'; import { InjectQueue } from '@nestjs/bull'; import { Queue } from 'bull'; +import { QUEUE_NAMES, JOB_NAMES } from '../../common/constants/queue.constants'; export interface ReplicationEvent { entityId: string; @@ -18,7 +19,7 @@ export class ReplicationService { constructor( private eventEmitter: EventEmitter2, - @InjectQueue('sync-tasks') private syncQueue: Queue, + @InjectQueue(QUEUE_NAMES.SYNC_TASKS) private syncQueue: Queue, ) {} /** @@ -41,7 +42,7 @@ export class ReplicationService { }; // Add to queue for asynchronous replication - await this.syncQueue.add('replicate-data', event, { + await this.syncQueue.add(JOB_NAMES.REPLICATE_DATA, event, { attempts: 5, backoff: { type: 'exponential', diff --git a/src/sync/sync.module.ts b/src/sync/sync.module.ts index 31fea08d..21f09848 100644 --- a/src/sync/sync.module.ts +++ b/src/sync/sync.module.ts @@ -1,5 +1,6 @@ import { Module } from '@nestjs/common'; import { BullModule } from '@nestjs/bull'; +import { QUEUE_NAMES } from '../common/constants/queue.constants'; import { SyncService } from './sync.service'; import { DataConsistencyService } from './consistency/data-consistency.service'; import { ConflictResolutionService } from './conflicts/conflict-resolution.service'; @@ -9,7 +10,7 @@ import { ReplicationService } from './replication/replication.service'; @Module({ imports: [ BullModule.registerQueue({ - name: 'sync-tasks', + name: QUEUE_NAMES.SYNC_TASKS, }), ], providers: [ diff --git a/src/sync/sync.service.ts b/src/sync/sync.service.ts index 7b09a2d1..ee84e9cd 100644 --- a/src/sync/sync.service.ts +++ b/src/sync/sync.service.ts @@ -1,5 +1,6 @@ import { Injectable, Logger } from '@nestjs/common'; import { OnEvent } from '@nestjs/event-emitter'; +import { APP_EVENTS } from '../common/constants/event.constants'; import { ConflictResolutionService, ConflictResolutionStrategy, @@ -46,7 +47,7 @@ export class SyncService { return resolvedData; } - @OnEvent('data.updated') + @OnEvent(APP_EVENTS.DATA_UPDATED) async handleDataUpdate(payload: { entity: string; id: string; data: any }) { this.logger.log(`Handling data update event for ${payload.entity}:${payload.id}`); diff --git a/src/users/users.module.ts b/src/users/users.module.ts index 0372d320..cc2bf8f4 100644 --- a/src/users/users.module.ts +++ b/src/users/users.module.ts @@ -1,6 +1,7 @@ import { Module } from '@nestjs/common'; import { TypeOrmModule } from '@nestjs/typeorm'; import { BullModule } from '@nestjs/bull'; +import { QUEUE_NAMES } from '../common/constants/queue.constants'; import { UsersService } from './users.service'; import { UsersController } from './users.controller'; import { User } from './entities/user.entity'; @@ -16,7 +17,7 @@ import { @Module({ imports: [ TypeOrmModule.forFeature([User, Enrollment, UserExportHistory]), - BullModule.registerQueue({ name: 'user-data-export' }), + BullModule.registerQueue({ name: QUEUE_NAMES.USER_DATA_EXPORT }), ], controllers: [UsersController], providers: [UsersService, ExportService, UserDataExportProcessor, RolesGuard, JwtAuthGuard], diff --git a/src/users/users.service.ts b/src/users/users.service.ts index d08258c0..3fcc813a 100644 --- a/src/users/users.service.ts +++ b/src/users/users.service.ts @@ -47,7 +47,7 @@ export class UsersService { } async findAll(filter?: GetUsersDto): Promise> { - const cacheKey = `cache:users:list:${JSON.stringify(filter || {})}`; + const cacheKey = `${CACHE_PREFIXES.USERS_LIST}:${JSON.stringify(filter || {})}`; return this.cachingService.getOrSet( cacheKey,