diff --git a/EMAIL_NOTIFICATION_SERVICE_GUIDE.md b/EMAIL_NOTIFICATION_SERVICE_GUIDE.md new file mode 100644 index 0000000..baf952e --- /dev/null +++ b/EMAIL_NOTIFICATION_SERVICE_GUIDE.md @@ -0,0 +1,807 @@ +# Email Notification Service Guide + +## Overview + +The Email Notification Service provides a reliable, scalable system for dispatching transactional emails to users. It features provider abstraction (AWS SES/SendGrid), asynchronous processing with BullMQ, exponential backoff for rate limiting, and comprehensive template variable mapping for personalized communications. + +## Architecture + +### Core Components + +1. **BaseEmailProvider** (`services/emailProviders/BaseEmailProvider.js`) + - Abstract interface for email providers + - Standardized error handling and response formatting + - Email validation and normalization + - Rate limit detection and retry logic + +2. **SESProvider** (`services/emailProviders/SESProvider.js`) + - AWS Simple Email Service integration + - Template and simple email support + - Comprehensive SES API features + - Rate limit handling + +3. **SendGridProvider** (`services/emailProviders/SendGridProvider.js`) + - SendGrid API integration + - Dynamic template support + - Advanced features (validation, suppression) + - Rate limit handling + +4. **EmailQueueService** (`services/emailQueue.js`) + - BullMQ-based asynchronous processing + - Exponential backoff retry logic + - Rate limiting and concurrency control + - Job status tracking + +5. **NotificationService** (`services/notificationService.js`) + - Unified service with provider abstraction + - Template variable mapping system + - Provider switching and management + - Predefined templates + +6. **API Routes** (`routes/notifications.js`) + - RESTful endpoints for email management + - Queue monitoring and control + - Provider management + - Template management + +## Configuration + +### Environment Variables + +```bash +# Email Provider Configuration +DEFAULT_EMAIL_PROVIDER=ses # or 'sendgrid' + +# AWS SES Configuration +AWS_REGION=us-east-1 +AWS_ACCESS_KEY_ID=your-access-key +AWS_SECRET_ACCESS_KEY=your-secret-key +AWS_SESSION_TOKEN=your-session-token # Optional + +# SendGrid Configuration +SENDGRID_API_KEY=your-sendgrid-api-key + +# Redis Configuration (for BullMQ) +REDIS_HOST=localhost +REDIS_PORT=6379 +REDIS_PASSWORD=your-redis-password +REDIS_DB=0 + +# Queue Configuration +NOTIFICATION_QUEUE_NAME=notification-queue +NOTIFICATION_CONCURRENCY=5 +NOTIFICATION_RATE_LIMIT_MAX=100 +NOTIFICATION_RATE_LIMIT_DURATION=60000 + +# Default Email Configuration +DEFAULT_FROM_EMAIL=noreply@substream-protocol.com +``` + +### Service Configuration + +```javascript +const notificationService = new NotificationService({ + defaultProvider: 'ses', + ses: { + region: 'us-east-1', + accessKeyId: process.env.AWS_ACCESS_KEY_ID, + secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY + }, + sendgrid: { + apiKey: process.env.SENDGRID_API_KEY + }, + redis: { + host: process.env.REDIS_HOST, + port: process.env.REDIS_PORT, + password: process.env.REDIS_PASSWORD + }, + queueName: 'notification-queue', + concurrency: 5, + rateLimitMax: 100, + rateLimitDuration: 60000, + globalTemplateMappings: { + companyName: 'SubStream Protocol', + website: 'https://substream-protocol.com' + } +}); +``` + +## API Endpoints + +### Email Sending + +#### Send Template Email +``` +POST /api/v1/notifications/send +Authorization: Bearer + +{ + "to": "user@example.com", + "from": "noreply@example.com", + "subject": "Welcome to SubStream", + "templateId": "welcome-template", + "templateData": { + "name": "John Doe", + "plan": "premium" + }, + "provider": "ses", + "options": { + "attempts": 3, + "delay": 0, + "priority": 0 + } +} +``` + +**Response:** +```json +{ + "success": true, + "data": { + "jobId": "uuid-1234", + "queue": "notification-queue", + "provider": "ses", + "addedAt": "2024-01-15T10:00:00.000Z", + "message": "Email queued for processing" + } +} +``` + +#### Send Simple Email +``` +POST /api/v1/notifications/send-simple +Authorization: Bearer + +{ + "to": "user@example.com", + "from": "noreply@example.com", + "subject": "Simple Email", + "text": "This is a plain text email", + "html": "

This is an HTML email

" +} +``` + +#### Send Bulk Email +``` +POST /api/v1/notifications/send-bulk +Authorization: Bearer + +{ + "recipients": [ + { + "email": "user1@example.com", + "templateData": { "name": "User 1" } + }, + { + "email": "user2@example.com", + "templateData": { "name": "User 2" } + } + ], + "from": "noreply@example.com", + "subject": "Bulk Announcement", + "templateId": "announcement-template", + "templateData": { + "company": "SubStream Protocol" + } +} +``` + +#### Send Predefined Template +``` +POST /api/v1/notifications/send-template +Authorization: Bearer + +{ + "templateType": "welcome", + "to": "user@example.com", + "templateData": { + "name": "John Doe", + "plan": "premium" + } +} +``` + +### Job Management + +#### Get Job Status +``` +GET /api/v1/notifications/job/:jobId +Authorization: Bearer +``` + +**Response:** +```json +{ + "success": true, + "data": { + "jobId": "uuid-1234", + "state": "completed", + "progress": 100, + "createdAt": "2024-01-15T10:00:00.000Z", + "processedOn": "2024-01-15T10:00:05.000Z", + "finishedOn": "2024-01-15T10:00:05.000Z", + "returnvalue": { + "success": true, + "messageId": "aws-ses-message-id", + "provider": "SESProvider" + } + } +} +``` + +### Queue Management + +#### Get Queue Statistics +``` +GET /api/v1/notifications/queue/stats +Authorization: Bearer +``` + +**Response:** +```json +{ + "success": true, + "data": { + "queueName": "notification-queue", + "stats": { + "waiting": 5, + "active": 2, + "completed": 100, + "failed": 3, + "delayed": 1, + "total": 111 + } + } +} +``` + +#### Get Recent Jobs +``` +GET /api/v1/notifications/queue/jobs?state=completed&start=0&end=50 +Authorization: Bearer +``` + +#### Pause/Resume Queue +``` +POST /api/v1/notifications/queue/pause +POST /api/v1/notifications/queue/resume +Authorization: Bearer +``` + +#### Clear Queue +``` +POST /api/v1/notifications/queue/clear +Authorization: Bearer + +{ + "state": "waiting" +} +``` + +### Provider Management + +#### Get Available Providers +``` +GET /api/v1/notifications/providers +Authorization: Bearer +``` + +**Response:** +```json +{ + "success": true, + "data": { + "availableProviders": ["ses", "sendgrid"], + "defaultProvider": "ses", + "providerStats": { + "ses": { + "name": "SESProvider", + "service": "AWS SES", + "region": "us-east-1" + }, + "sendgrid": { + "name": "SendGridProvider", + "service": "SendGrid", + "baseUrl": "https://api.sendgrid.com/v3" + } + } + } +} +``` + +#### Switch Provider +``` +POST /api/v1/notifications/providers/switch +Authorization: Bearer + +{ + "provider": "sendgrid" +} +``` + +#### Test Provider Connection +``` +POST /api/v1/notifications/providers/test +Authorization: Bearer + +{ + "provider": "ses" +} +``` + +### Template Management + +#### Get Template Mappings +``` +GET /api/v1/notifications/templates +Authorization: Bearer +``` + +#### Add Template Mapping +``` +POST /api/v1/notifications/templates +Authorization: Bearer + +{ + "templateId": "custom-welcome", + "mapping": { + "defaultVariables": { + "appName": "My App", + "supportEmail": "support@example.com" + } + } +} +``` + +#### Remove Template Mapping +``` +DELETE /api/v1/notifications/templates/:templateId +Authorization: Bearer +``` + +### Monitoring + +#### Get Health Status +``` +GET /api/v1/notifications/health +``` + +**Response:** +```json +{ + "success": true, + "data": { + "status": "healthy", + "defaultProvider": "ses", + "availableProviders": ["ses", "sendgrid"], + "queue": { + "queueName": "notification-queue", + "provider": "ses", + "redisConnected": true, + "workerActive": true + }, + "providers": { ... }, + "templateMappings": 5 + } +} +``` + +#### Get Comprehensive Statistics +``` +GET /api/v1/notifications/stats +Authorization: Bearer +``` + +## Template System + +### Predefined Templates + +The service includes predefined templates for common use cases: + +#### Welcome Template +```javascript +{ + "templateId": "welcome", + "defaultVariables": { + "appName": "SubStream Protocol", + "supportEmail": "support@substream-protocol.com", + "currentYear": 2024 + } +} +``` + +#### Payment Failure Template +```javascript +{ + "templateId": "payment_failure", + "defaultVariables": { + "appName": "SubStream Protocol", + "supportEmail": "support@substream-protocol.com", + "billingUrl": "https://app.substream-protocol.com/billing" + } +} +``` + +#### Low Balance Warning Template +```javascript +{ + "templateId": "low_balance_warning", + "defaultVariables": { + "appName": "SubStream Protocol", + "supportEmail": "support@substream-protocol.com", + "addFundsUrl": "https://app.substream-protocol.com/wallet/add-funds" + } +} +``` + +#### Pre-Billing Health Check Template +```javascript +{ + "templateId": "pre_billing_warning", + "defaultVariables": { + "appName": "SubStream Protocol", + "supportEmail": "support@substream-protocol.com", + "warningDays": 3 + } +} +``` + +### Custom Templates + +You can create custom templates with dynamic variables: + +```javascript +notificationService.addTemplateMapping('custom_template', { + defaultVariables: { + companyName: 'Your Company', + website: 'https://yourcompany.com', + logoUrl: 'https://yourcompany.com/logo.png', + currentYear: () => new Date().getFullYear(), + timestamp: () => new Date().toISOString() + } +}); +``` + +### Template Variable Processing + +The system supports various variable types: + +- **Static Variables**: Simple key-value pairs +- **Function Variables**: Dynamic values generated at send time +- **Nested Objects**: Complex data structures +- **Global Mappings**: Variables applied to all templates +- **Template-Specific Mappings**: Variables for specific templates + +## Rate Limiting and Retry Logic + +### Exponential Backoff + +The system implements intelligent retry logic with exponential backoff: + +```javascript +{ + "attempts": 3, + "backoff": { + "type": "exponential", + "delay": 2000 // Start with 2 seconds + } +} +``` + +### Rate Limit Detection + +Each provider detects rate limit errors: + +#### AWS SES Rate Limits +- `ThrottlingException`: 30 seconds retry +- `TooManyRequestsException`: 60 seconds retry +- `SendingPausedException`: 60 seconds retry + +#### SendGrid Rate Limits +- HTTP 429: Uses `Retry-After` header or defaults to 60 seconds + +### Queue Rate Limiting + +The queue itself implements rate limiting: + +```javascript +{ + "limiter": { + "max": 100, // Max jobs per duration + "duration": 60000 // 1 minute + } +} +``` + +## Error Handling + +### Standardized Error Responses + +All errors follow a consistent format: + +```json +{ + "success": false, + "error": "Error message", + "code": "ERROR_CODE", + "isRateLimit": false, + "retryAfter": null, + "timestamp": "2024-01-15T10:00:00.000Z", + "context": { ... } +} +``` + +### Error Types + +1. **Validation Errors**: Missing required fields, invalid data +2. **Provider Errors**: AWS SES/SendGrid API errors +3. **Queue Errors**: BullMQ processing errors +4. **Rate Limit Errors**: Provider rate limits +5. **Configuration Errors**: Missing configuration + +## Monitoring and Observability + +### Metrics + +The system provides comprehensive metrics: + +- **Queue Metrics**: Waiting, active, completed, failed jobs +- **Provider Metrics**: Success rates, error rates, response times +- **Template Metrics**: Usage statistics, variable mapping +- **Health Metrics**: Service status, connection status + +### Logging + +All operations are logged with appropriate levels: + +- **INFO**: Successful operations, queue status +- **WARN**: Rate limits, retries, degraded performance +- **ERROR**: Failed operations, provider errors +- **DEBUG**: Detailed processing information + +### Health Checks + +Multiple health check endpoints: + +- `/api/v1/notifications/health`: Service health +- `/api/v1/notifications/providers/test`: Provider connections +- `/api/v1/notifications/queue/stats`: Queue status + +## Security Considerations + +### API Security + +- **Authentication**: JWT token required for all endpoints +- **Authorization**: Role-based access control +- **Input Validation**: All inputs validated and sanitized +- **Rate Limiting**: API-level rate limiting + +### Data Protection + +- **Sensitive Data**: API keys and secrets stored in environment variables +- **Template Data**: User data processed securely +- **Audit Trail**: All email operations logged +- **Data Retention**: Configurable job retention policies + +### Provider Security + +- **AWS SES**: IAM roles with least privilege +- **SendGrid**: API key restrictions and IP allowlists +- **Redis**: Authentication and TLS encryption +- **HTTPS**: All external communications encrypted + +## Performance Optimization + +### Queue Optimization + +- **Concurrency**: Configurable worker concurrency +- **Batching**: Bulk email processing +- **Prioritization**: Job priority levels +- **Memory Management**: Efficient job processing + +### Provider Optimization + +- **Connection Pooling**: Reuse provider connections +- **Caching**: Template and configuration caching +- **Timeouts**: Configurable request timeouts +- **Retry Logic**: Intelligent retry strategies + +### Database Optimization + +- **Indexes**: Optimized database queries +- **Connection Pooling**: Database connection reuse +- **Query Optimization**: Efficient data retrieval +- **Cleanup**: Automatic cleanup of old data + +## Testing + +### Unit Tests + +```bash +# Run all notification tests +npm test notificationService.test.js + +# Run provider-specific tests +npm test -- --testNamePattern="SES Provider" +npm test -- --testNamePattern="SendGrid Provider" +``` + +### Integration Tests + +```bash +# Run queue integration tests +npm test -- --testNamePattern="Email Queue" + +# Run API endpoint tests +npm test -- --testNamePattern="API Endpoints" +``` + +### Acceptance Tests + +The acceptance criteria are tested with: + +```bash +npm test -- --testNamePattern="Acceptance Criteria" +``` + +### Mock Testing + +All providers are mocked for testing: + +```javascript +// Mock AWS SES +jest.mock('aws-sdk'); + +// Mock SendGrid +jest.mock('axios'); + +// Mock BullMQ +jest.mock('bullmq'); +``` + +## Deployment + +### Production Deployment + +1. **Environment Setup** + ```bash + export NODE_ENV=production + export DEFAULT_EMAIL_PROVIDER=ses + export AWS_REGION=us-east-1 + export REDIS_HOST=your-redis-host + ``` + +2. **Service Initialization** + ```javascript + const notificationService = new NotificationService({ + defaultProvider: process.env.DEFAULT_EMAIL_PROVIDER, + concurrency: parseInt(process.env.NOTIFICATION_CONCURRENCY) || 10 + }); + ``` + +3. **Queue Worker** + ```bash + # Start queue worker + node workers/notificationWorker.js + ``` + +### Docker Deployment + +```dockerfile +FROM node:18-alpine + +# Install dependencies +COPY package*.json ./ +RUN npm ci --only=production + +# Copy application code +COPY . . + +# Set environment variables +ENV NODE_ENV=production +ENV DEFAULT_EMAIL_PROVIDER=ses + +# Start application +CMD ["node", "index.js"] +``` + +### Kubernetes Deployment + +```yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: notification-service +spec: + replicas: 3 + selector: + matchLabels: + app: notification-service + template: + metadata: + labels: + app: notification-service + spec: + containers: + - name: notification-service + image: substream/notification-service:latest + env: + - name: DEFAULT_EMAIL_PROVIDER + value: "ses" + - name: REDIS_HOST + value: "redis-service" +``` + +## Troubleshooting + +### Common Issues + +1. **Email Not Sending** + - Check provider configuration + - Verify API keys and credentials + - Check queue status and job details + +2. **Rate Limit Errors** + - Monitor rate limit headers + - Adjust queue concurrency + - Implement provider switching + +3. **Queue Processing Issues** + - Check Redis connection + - Verify worker status + - Monitor queue statistics + +4. **Template Issues** + - Validate template variables + - Check template mappings + - Verify provider template existence + +### Debug Mode + +Enable debug logging: + +```bash +DEBUG=notification:* node index.js +``` + +### Log Analysis + +```bash +# View notification logs +tail -f logs/notification.log + +# Filter errors +grep "ERROR" logs/notification.log + +# Monitor queue status +grep "queue" logs/notification.log +``` + +## Future Enhancements + +1. **Advanced Templates** + - Visual template editor + - A/B testing support + - Template versioning + +2. **Multi-Channel Support** + - SMS notifications + - Push notifications + - In-app notifications + +3. **Advanced Analytics** + - Open tracking + - Click tracking + - Engagement analytics + +4. **Enhanced Security** + - Email encryption + - Advanced authentication + - Compliance features + +5. **Performance Features** + - Distributed processing + - Load balancing + - Auto-scaling + +This comprehensive email notification service provides enterprise-grade functionality with provider abstraction, reliable queue processing, and extensive customization options for all transactional email needs. diff --git a/index.js b/index.js index 6e48006..5b2a929 100644 --- a/index.js +++ b/index.js @@ -829,6 +829,7 @@ app.use("/auth", require("./routes/auth")); app.use("/auth", require("./routes/stellarAuth")); app.use("/api/v1/merchants", require("./routes/merchants")); app.use("/api/v1/pre-billing", require("./routes/preBilling")); +app.use("/api/v1/notifications", require("./routes/notifications")); app.use("/content", require("./routes/content")); app.use("/analytics", require("./routes/analytics")); app.use("/storage", require("./routes/storage")); diff --git a/notificationService.test.js b/notificationService.test.js new file mode 100644 index 0000000..9e6e97e --- /dev/null +++ b/notificationService.test.js @@ -0,0 +1,965 @@ +const NotificationService = require('./services/notificationService'); +const EmailQueueService = require('./services/emailQueue'); +const SESProvider = require('./services/emailProviders/SESProvider'); +const SendGridProvider = require('./services/emailProviders/SendGridProvider'); + +// Mock BullMQ +jest.mock('bullmq', () => { + return { + Queue: jest.fn().mockImplementation(() => ({ + add: jest.fn().mockResolvedValue({ + id: 'mock-job-id', + data: { type: 'sendEmail', data: {} }, + opts: {} + }), + getJob: jest.fn().mockResolvedValue({ + id: 'mock-job-id', + getState: jest.fn().mockResolvedValue('completed'), + progress: 100, + data: { type: 'sendEmail', data: {} }, + timestamp: Date.now(), + processedOn: Date.now(), + finishedOn: Date.now(), + returnvalue: { success: true, messageId: 'mock-message-id' } + }), + getJobs: jest.fn().mockResolvedValue([]), + getWaiting: jest.fn().mockResolvedValue([]), + getActive: jest.fn().mockResolvedValue([]), + getCompleted: jest.fn().mockResolvedValue([]), + getFailed: jest.fn().mockResolvedValue([]), + getDelayed: jest.fn().mockResolvedValue([]), + clean: jest.fn().mockResolvedValue(), + pause: jest.fn().mockResolvedValue(), + resume: jest.fn().mockResolvedValue(), + close: jest.fn().mockResolvedValue(), + on: jest.fn() + })), + Worker: jest.fn().mockImplementation(() => ({ + close: jest.fn().mockResolvedValue(), + on: jest.fn() + })) + }; +}); + +// Mock Redis +jest.mock('ioredis', () => { + return jest.fn().mockImplementation(() => ({ + status: 'ready', + quit: jest.fn().mockResolvedValue() + })); +}); + +// Mock AWS SDK +jest.mock('aws-sdk', () => ({ + config: { + update: jest.fn() + }, + SES: jest.fn().mockImplementation(() => ({ + sendTemplatedEmail: jest.fn().mockReturnValue({ + promise: jest.fn().mockResolvedValue({ + MessageId: 'aws-ses-message-id' + }) + }), + sendEmail: jest.fn().mockReturnValue({ + promise: jest.fn().mockResolvedValue({ + MessageId: 'aws-ses-message-id' + }) + }), + getTemplate: jest.fn().mockReturnValue({ + promise: jest.fn().mockResolvedValue({ + Template: { + TemplateName: 'test-template', + SubjectPart: 'Test Subject', + TextPart: 'Test Text', + HtmlPart: 'Test HTML', + CreatedAt: new Date().toISOString() + } + }) + }), + getSendQuota: jest.fn().mockReturnValue({ + promise: jest.fn().mockResolvedValue({ + Max24HourSend: 1000, + MaxSendRate: 10, + SentLast24Hours: 100 + }) + }) + })) +})); + +// Mock Axios for SendGrid +jest.mock('axios', () => { + return jest.fn().mockImplementation(() => ({ + post: jest.fn().mockResolvedValue({ + headers: { + 'x-message-id': 'sendgrid-message-id', + 'x-request-id': 'sendgrid-request-id' + }, + data: {} + }), + get: jest.fn().mockResolvedValue({ + data: { + username: 'test-user', + email: 'test@example.com', + reputation: 95 + } + }), + interceptors: { + response: { + use: jest.fn() + } + } + })); +}); + +describe('Email Notification Service', () => { + let notificationService; + let mockConfig; + + beforeEach(() => { + mockConfig = { + defaultProvider: 'ses', + ses: { + region: 'us-east-1', + accessKeyId: 'test-key', + secretAccessKey: 'test-secret' + }, + sendgrid: { + apiKey: 'test-api-key' + }, + redis: { + host: 'localhost', + port: 6379 + } + }; + + notificationService = new NotificationService(mockConfig); + }); + + afterEach(() => { + jest.clearAllMocks(); + }); + + describe('Service Initialization', () => { + it('should initialize with both providers', () => { + expect(notificationService.getAvailableProviders()).toContain('ses'); + expect(notificationService.getAvailableProviders()).toContain('sendgrid'); + expect(notificationService.defaultProvider).toBe('ses'); + }); + + it('should throw error when no providers configured', () => { + expect(() => { + new NotificationService({}); + }).toThrow('No email providers configured'); + }); + + it('should initialize queue service', () => { + expect(notificationService.queueService).toBeDefined(); + expect(notificationService.queueService.queueName).toBe('notification-queue'); + }); + }); + + describe('Email Sending', () => { + it('should queue email with default provider', async () => { + const emailData = { + to: 'test@example.com', + from: 'noreply@example.com', + subject: 'Test Email', + templateId: 'test-template', + templateData: { name: 'John' } + }; + + const result = await notificationService.sendEmail(emailData); + + expect(result.success).toBe(true); + expect(result.jobId).toBeDefined(); + expect(result.provider).toBe('ses'); + expect(result.message).toBe('Email queued for processing'); + }); + + it('should queue email with specified provider', async () => { + const emailData = { + to: 'test@example.com', + from: 'noreply@example.com', + subject: 'Test Email', + templateId: 'test-template', + templateData: { name: 'John' } + }; + + const result = await notificationService.sendEmail(emailData, { provider: 'sendgrid' }); + + expect(result.success).toBe(true); + expect(result.provider).toBe('sendgrid'); + }); + + it('should throw error for invalid provider', async () => { + const emailData = { + to: 'test@example.com', + from: 'noreply@example.com', + subject: 'Test Email', + templateId: 'test-template', + templateData: { name: 'John' } + }; + + await expect(notificationService.sendEmail(emailData, { provider: 'invalid' })) + .rejects.toThrow('Email provider not found: invalid'); + }); + + it('should queue simple email', async () => { + const emailData = { + to: 'test@example.com', + from: 'noreply@example.com', + subject: 'Test Simple Email', + text: 'This is a test email', + html: '

This is a test email

' + }; + + const result = await notificationService.sendSimpleEmail(emailData); + + expect(result.success).toBe(true); + expect(result.jobId).toBeDefined(); + expect(result.message).toBe('Simple email queued for processing'); + }); + + it('should queue bulk email', async () => { + const bulkData = { + recipients: [ + { email: 'user1@example.com', templateData: { name: 'User 1' } }, + { email: 'user2@example.com', templateData: { name: 'User 2' } } + ], + from: 'noreply@example.com', + subject: 'Bulk Email', + templateId: 'bulk-template', + templateData: { company: 'Test Corp' } + }; + + const result = await notificationService.sendBulkEmail(bulkData); + + expect(result.success).toBe(true); + expect(result.jobId).toBeDefined(); + expect(result.totalRecipients).toBe(2); + expect(result.message).toBe('Bulk email queued for processing'); + }); + }); + + describe('Template Variable Mapping', () => { + it('should apply template mappings', () => { + notificationService.addTemplateMapping('welcome', { + defaultVariables: { + appName: 'Test App', + supportEmail: 'support@test.com' + } + }); + + const mapping = notificationService.getTemplateMapping('welcome'); + expect(mapping.defaultVariables.appName).toBe('Test App'); + expect(mapping.defaultVariables.supportEmail).toBe('support@test.com'); + }); + + it('should remove template mapping', () => { + notificationService.addTemplateMapping('test', { test: 'value' }); + expect(notificationService.getTemplateMapping('test')).toBeDefined(); + + notificationService.removeTemplateMapping('test'); + expect(notificationService.getTemplateMapping('test')).toBeUndefined(); + }); + + it('should process template variables', () => { + notificationService.addTemplateMapping('welcome', { + defaultVariables: { + appName: 'Test App', + currentYear: () => new Date().getFullYear() + } + }); + + const emailData = { + templateId: 'welcome', + templateData: { name: 'John' } + }; + + const processed = notificationService.processTemplateVariables(emailData); + + expect(processed.templateData.appName).toBe('Test App'); + expect(processed.templateData.currentYear).toBe(new Date().getFullYear()); + expect(processed.templateData.name).toBe('John'); + }); + + it('should apply global mappings', () => { + const service = new NotificationService({ + ...mockConfig, + globalTemplateMappings: { + companyName: 'Test Company', + website: 'https://test.com' + } + }); + + const emailData = { + templateData: { name: 'John' } + }; + + const processed = service.processTemplateVariables(emailData); + + expect(processed.templateData.companyName).toBe('Test Company'); + expect(processed.templateData.website).toBe('https://test.com'); + expect(processed.templateData.name).toBe('John'); + }); + }); + + describe('Provider Management', () => { + it('should switch provider successfully', () => { + const result = notificationService.switchProvider('sendgrid'); + + expect(result.success).toBe(true); + expect(result.provider).toBe('sendgrid'); + expect(notificationService.defaultProvider).toBe('sendgrid'); + }); + + it('should throw error when switching to invalid provider', () => { + expect(() => { + notificationService.switchProvider('invalid'); + }).toThrow('Email provider not found: invalid'); + }); + + it('should get provider statistics', () => { + const stats = notificationService.getProviderStats(); + + expect(stats).toHaveProperty('ses'); + expect(stats).toHaveProperty('sendgrid'); + expect(stats.ses).toHaveProperty('name', 'SESProvider'); + expect(stats.sendgrid).toHaveProperty('name', 'SendGridProvider'); + }); + + it('should get specific provider statistics', () => { + const stats = notificationService.getProviderStats('ses'); + + expect(stats).toHaveProperty('name', 'SESProvider'); + expect(stats).toHaveProperty('service', 'AWS SES'); + }); + + it('should test provider connections', async () => { + const results = await notificationService.testProviderConnection(); + + expect(results).toHaveProperty('ses'); + expect(results).toHaveProperty('sendgrid'); + expect(results.ses.success).toBe(true); + expect(results.sendgrid.success).toBe(true); + }); + }); + + describe('Predefined Templates', () => { + it('should create predefined templates', () => { + notificationService.createPredefinedTemplates(); + + const welcomeMapping = notificationService.getTemplateMapping('welcome'); + expect(welcomeMapping.defaultVariables.appName).toBe('SubStream Protocol'); + expect(welcomeMapping.defaultVariables.supportEmail).toBe('support@substream-protocol.com'); + + const paymentFailureMapping = notificationService.getTemplateMapping('payment_failure'); + expect(paymentFailureMapping.defaultVariables.appName).toBe('SubStream Protocol'); + expect(paymentFailureMapping.defaultVariables.billingUrl).toBe('https://app.substream-protocol.com/billing'); + }); + + it('should send predefined template email', async () => { + notificationService.createPredefinedTemplates(); + + const result = await notificationService.sendTemplateEmail('welcome', { + to: 'test@example.com', + from: 'noreply@example.com', + templateData: { name: 'John' } + }); + + expect(result.success).toBe(true); + expect(result.jobId).toBeDefined(); + }); + + it('should throw error for unknown template type', async () => { + await expect(notificationService.sendTemplateEmail('unknown', {})) + .rejects.toThrow('Template mapping not found: unknown'); + }); + }); + + describe('Queue Management', () => { + it('should get job status', async () => { + const result = await notificationService.getJobStatus('test-job-id'); + + expect(result.success).toBe(true); + expect(result.jobId).toBe('test-job-id'); + expect(result.state).toBe('completed'); + }); + + it('should get queue statistics', async () => { + const result = await notificationService.getQueueStats(); + + expect(result.success).toBe(true); + expect(result.queueName).toBe('notification-queue'); + expect(result.stats).toHaveProperty('waiting'); + expect(result.stats).toHaveProperty('active'); + expect(result.stats).toHaveProperty('completed'); + expect(result.stats).toHaveProperty('failed'); + }); + + it('should get recent jobs', async () => { + const result = await notificationService.getRecentJobs(); + + expect(result.success).toBe(true); + expect(result.state).toBe('completed'); + expect(Array.isArray(result.jobs)).toBe(true); + }); + }); + + describe('Health Status', () => { + it('should return healthy status', () => { + const status = notificationService.getHealthStatus(); + + expect(status.status).toBe('healthy'); + expect(status.defaultProvider).toBe('ses'); + expect(status.availableProviders).toContain('ses'); + expect(status.availableProviders).toContain('sendgrid'); + expect(status.queue).toBeDefined(); + expect(status.providers).toBeDefined(); + expect(status.templateMappings).toBe(0); + }); + + it('should return degraded status when no provider', () => { + const service = new NotificationService({ + defaultProvider: 'invalid', + ses: mockConfig.ses, + sendgrid: mockConfig.sendgrid + }); + + // Remove all providers to simulate degraded state + service.providers.clear(); + service.defaultProvider = 'none'; + + const status = service.getHealthStatus(); + expect(status.status).toBe('degraded'); + }); + }); + + describe('Error Handling', () => { + it('should handle missing required fields', async () => { + const invalidEmailData = { + to: 'test@example.com' + // Missing from, subject, templateId + }; + + await expect(notificationService.sendEmail(invalidEmailData)) + .rejects.toThrow(); + }); + + it('should handle invalid email addresses', async () => { + const invalidEmailData = { + to: 'invalid-email', + from: 'test@example.com', + subject: 'Test', + templateId: 'test' + }; + + await expect(notificationService.sendEmail(invalidEmailData)) + .rejects.toThrow('Invalid recipient email'); + }); + + it('should handle provider initialization failure', () => { + expect(() => { + new NotificationService({ + defaultProvider: 'ses' + // No provider configs + }); + }).toThrow('No email providers configured'); + }); + }); +}); + +describe('Email Providers', () => { + describe('BaseEmailProvider', () => { + const BaseEmailProvider = require('./services/emailProviders/BaseEmailProvider'); + + class TestProvider extends BaseEmailProvider { + async sendEmail() { return { success: true }; } + async sendSimpleEmail() { return { success: true }; } + async getTemplate() { return { name: 'test' }; } + } + + it('should validate email addresses', () => { + const provider = new TestProvider(); + + expect(provider.validateEmail('test@example.com')).toBe(true); + expect(provider.validateEmail('invalid-email')).toBe(false); + expect(provider.validateEmail('')).toBe(false); + }); + + it('should normalize email data', () => { + const provider = new TestProvider({ defaultFrom: 'default@example.com' }); + + const normalized = provider.normalizeEmailData({ + to: 'test@example.com', + subject: 'Test', + templateId: 'test' + }); + + expect(normalized.from).toBe('default@example.com'); + expect(normalized.to).toBe('test@example.com'); + expect(normalized.subject).toBe('Test'); + expect(normalized.templateId).toBe('test'); + }); + + it('should throw error for missing required fields', () => { + const provider = new TestProvider(); + + expect(() => { + provider.normalizeEmailData({}); + }).toThrow('Recipient email (to) is required'); + }); + + it('should create standardized success response', () => { + const provider = new TestProvider(); + const response = provider.createSuccess({ messageId: 'test-id' }); + + expect(response.success).toBe(true); + expect(response.messageId).toBe('test-id'); + expect(response.provider).toBe('TestProvider'); + expect(response.timestamp).toBeDefined(); + }); + + it('should create standardized error response', () => { + const provider = new TestProvider(); + const error = new Error('Test error'); + const response = provider.createError(error); + + expect(response.success).toBe(false); + expect(response.error).toBe('Test error'); + expect(response.provider).toBe('TestProvider'); + expect(response.timestamp).toBeDefined(); + }); + }); + + describe('SES Provider', () => { + const SESProvider = require('./services/emailProviders/SESProvider'); + + it('should initialize with config', () => { + const provider = new SESProvider({ + region: 'us-west-2', + accessKeyId: 'test-key', + secretAccessKey: 'test-secret' + }); + + expect(provider.region).toBe('us-west-2'); + expect(provider.name).toBe('SESProvider'); + }); + + it('should detect rate limit errors', () => { + const provider = new SESProvider(mockConfig.ses); + + const throttlingError = { code: 'ThrottlingException' }; + expect(provider.isRateLimitError(throttlingError)).toBe(true); + + const tooManyRequestsError = { code: 'TooManyRequestsException' }; + expect(provider.isRateLimitError(tooManyRequestsError)).toBe(true); + + const normalError = { code: 'InvalidParameter' }; + expect(provider.isRateLimitError(normalError)).toBe(false); + }); + + it('should extract retry after time', () => { + const provider = new SESProvider(mockConfig.ses); + + const throttlingError = { code: 'ThrottlingException' }; + expect(provider.getRetryAfter(throttlingError)).toBe(30); + + const rateLimitError = { message: 'maximum send rate exceeded' }; + expect(provider.getRetryAfter(rateLimitError)).toBe(60); + }); + + it('should send templated email', async () => { + const provider = new SESProvider(mockConfig.ses); + + const result = await provider.sendEmail({ + to: 'test@example.com', + from: 'sender@example.com', + subject: 'Test', + templateId: 'test-template', + templateData: { name: 'John' } + }); + + expect(result.success).toBe(true); + expect(result.messageId).toBe('aws-ses-message-id'); + expect(result.provider).toBe('SESProvider'); + }); + + it('should send simple email', async () => { + const provider = new SESProvider(mockConfig.ses); + + const result = await provider.sendSimpleEmail({ + to: 'test@example.com', + from: 'sender@example.com', + subject: 'Test', + text: 'Test content' + }); + + expect(result.success).toBe(true); + expect(result.messageId).toBe('aws-ses-message-id'); + }); + + it('should get template', async () => { + const provider = new SESProvider(mockConfig.ses); + + const result = await provider.getTemplate('test-template'); + + expect(result.name).toBe('test-template'); + expect(result.subject).toBe('Test Subject'); + }); + + it('should test connection', async () => { + const provider = new SESProvider(mockConfig.ses); + + const result = await provider.testConnection(); + + expect(result.success).toBe(true); + expect(result.data.Max24HourSend).toBe(1000); + }); + }); + + describe('SendGrid Provider', () => { + const SendGridProvider = require('./services/emailProviders/SendGridProvider'); + + it('should initialize with config', () => { + const provider = new SendGridProvider({ + apiKey: 'test-api-key' + }); + + expect(provider.apiKey).toBe('test-api-key'); + expect(provider.name).toBe('SendGridProvider'); + }); + + it('should throw error without API key', () => { + expect(() => { + new SendGridProvider({}); + }).toThrow('SendGrid API key is required'); + }); + + it('should detect rate limit errors', () => { + const provider = new SendGridProvider({ apiKey: 'test-key' }); + + const rateLimitError = { code: 429 }; + expect(provider.isRateLimitError(rateLimitError)).toBe(true); + + const tooManyRequestsError = { code: 'Too Many Requests' }; + expect(provider.isRateLimitError(tooManyRequestsError)).toBe(true); + + const normalError = { code: 400 }; + expect(provider.isRateLimitError(normalError)).toBe(false); + }); + + it('should send templated email', async () => { + const provider = new SendGridProvider({ apiKey: 'test-key' }); + + const result = await provider.sendEmail({ + to: 'test@example.com', + from: 'sender@example.com', + subject: 'Test', + templateId: 'test-template-id', + templateData: { name: 'John' } + }); + + expect(result.success).toBe(true); + expect(result.messageId).toBe('sendgrid-message-id'); + expect(result.provider).toBe('SendGridProvider'); + }); + + it('should send simple email', async () => { + const provider = new SendGridProvider({ apiKey: 'test-key' }); + + const result = await provider.sendSimpleEmail({ + to: 'test@example.com', + from: 'sender@example.com', + subject: 'Test', + text: 'Test content' + }); + + expect(result.success).toBe(true); + expect(result.messageId).toBe('sendgrid-message-id'); + }); + + it('should get template', async () => { + const provider = new SendGridProvider({ apiKey: 'test-key' }); + + const result = await provider.getTemplate('test-template-id'); + + expect(result.id).toBe('test-template-id'); + expect(result.versions).toBeDefined(); + }); + + it('should test connection', async () => { + const provider = new SendGridProvider({ apiKey: 'test-key' }); + + const result = await provider.testConnection(); + + expect(result.success).toBe(true); + expect(result.data.username).toBe('test-user'); + }); + }); +}); + +describe('Email Queue Service', () => { + let queueService; + + beforeEach(() => { + queueService = new EmailQueueService({ + queueName: 'test-email-queue', + redis: { + host: 'localhost', + port: 6379 + } + }); + }); + + afterEach(() => { + jest.clearAllMocks(); + }); + + describe('Queue Operations', () => { + it('should add email job', async () => { + const emailData = { + to: 'test@example.com', + from: 'sender@example.com', + subject: 'Test', + templateId: 'test-template', + templateData: { name: 'John' } + }; + + const result = await queueService.addEmailJob(emailData); + + expect(result.success).toBe(true); + expect(result.jobId).toBeDefined(); + expect(result.queue).toBe('test-email-queue'); + }); + + it('should add simple email job', async () => { + const emailData = { + to: 'test@example.com', + from: 'sender@example.com', + subject: 'Test', + text: 'Test content' + }; + + const result = await queueService.addSimpleEmailJob(emailData); + + expect(result.success).toBe(true); + expect(result.jobId).toBeDefined(); + expect(result.queue).toBe('test-email-queue'); + }); + + it('should add bulk email job', async () => { + const bulkData = { + recipients: [ + { email: 'user1@example.com' }, + { email: 'user2@example.com' } + ], + from: 'sender@example.com', + subject: 'Bulk Test', + templateId: 'bulk-template', + templateData: { company: 'Test Corp' } + }; + + const result = await queueService.addBulkEmailJob(bulkData); + + expect(result.success).toBe(true); + expect(result.jobId).toBeDefined(); + expect(result.queue).toBe('test-email-queue'); + }); + + it('should get job status', async () => { + const result = await queueService.getJobStatus('test-job-id'); + + expect(result.success).toBe(true); + expect(result.jobId).toBe('test-job-id'); + expect(result.state).toBe('completed'); + }); + + it('should get queue statistics', async () => { + const result = await queueService.getQueueStats(); + + expect(result.success).toBe(true); + expect(result.queueName).toBe('test-email-queue'); + expect(result.stats).toBeDefined(); + }); + + it('should get recent jobs', async () => { + const result = await queueService.getRecentJobs(); + + expect(result.success).toBe(true); + expect(result.state).toBe('completed'); + expect(Array.isArray(result.jobs)).toBe(true); + }); + }); + + describe('Queue Management', () => { + it('should pause queue', async () => { + const result = await queueService.pauseQueue(); + + expect(result.success).toBe(true); + expect(result.queueName).toBe('test-email-queue'); + expect(result.pausedAt).toBeDefined(); + }); + + it('should resume queue', async () => { + const result = await queueService.resumeQueue(); + + expect(result.success).toBe(true); + expect(result.queueName).toBe('test-email-queue'); + expect(result.resumedAt).toBeDefined(); + }); + + it('should clear queue', async () => { + const result = await queueService.clearQueue({ state: 'waiting' }); + + expect(result.success).toBe(true); + expect(result.queueName).toBe('test-email-queue'); + expect(result.clearedState).toBe('waiting'); + }); + }); + + describe('Health Status', () => { + it('should return health status', () => { + const status = queueService.getHealthStatus(); + + expect(status.queueName).toBe('test-email-queue'); + expect(status.redisConnected).toBe(true); + expect(status.config).toBeDefined(); + expect(status.timestamp).toBeDefined(); + }); + }); +}); + +describe('Acceptance Criteria Tests', () => { + let notificationService; + + beforeEach(() => { + notificationService = new NotificationService({ + defaultProvider: 'ses', + ses: { + region: 'us-east-1', + accessKeyId: 'test-key', + secretAccessKey: 'test-secret' + }, + sendgrid: { + apiKey: 'test-api-key' + }, + redis: { + host: 'localhost', + port: 6379 + } + }); + }); + + it('Acceptance 1: Reliable, scalable system for dispatching transactional emails', async () => { + // Test that emails are queued successfully + const emailData = { + to: 'user@example.com', + from: 'noreply@example.com', + subject: 'Welcome to SubStream', + templateId: 'welcome', + templateData: { name: 'John Doe', plan: 'premium' } + }; + + const result = await notificationService.sendEmail(emailData); + + expect(result.success).toBe(true); + expect(result.jobId).toBeDefined(); + expect(result.message).toBe('Email queued for processing'); + + // Test that queue is working + const queueStats = await notificationService.getQueueStats(); + expect(queueStats.success).toBe(true); + expect(queueStats.queueName).toBe('notification-queue'); + + // Test that providers are available and working + const providerStats = notificationService.getProviderStats(); + expect(providerStats).toHaveProperty('ses'); + expect(providerStats).toHaveProperty('sendgrid'); + + const connectionTests = await notificationService.testProviderConnection(); + expect(connectionTests.ses.success).toBe(true); + expect(connectionTests.sendgrid.success).toBe(true); + }); + + it('Acceptance 2: Asynchronous queue protects API performance from provider latency', async () => { + const startTime = Date.now(); + + // Send multiple emails quickly + const emailPromises = []; + for (let i = 0; i < 10; i++) { + emailPromises.push( + notificationService.sendEmail({ + to: `user${i}@example.com`, + from: 'noreply@example.com', + subject: `Email ${i}`, + templateId: 'welcome', + templateData: { name: `User ${i}` } + }) + ); + } + + const results = await Promise.all(emailPromises); + const endTime = Date.now(); + + // All emails should be queued quickly (under 100ms for 10 emails) + expect(endTime - startTime).toBeLessThan(100); + + // All should succeed + results.forEach(result => { + expect(result.success).toBe(true); + expect(result.jobId).toBeDefined(); + }); + + // Queue should have jobs waiting + const queueStats = await notificationService.getQueueStats(); + expect(queueStats.stats.waiting).toBeGreaterThanOrEqual(10); + }); + + it('Acceptance 3: Template variables are mapped correctly for personalized communications', () => { + // Create template mapping with variables + notificationService.addTemplateMapping('personalized_welcome', { + defaultVariables: { + appName: 'SubStream Protocol', + supportEmail: 'support@substream-protocol.com', + currentYear: () => new Date().getFullYear() + } + }); + + // Test template variable processing + const emailData = { + templateId: 'personalized_welcome', + templateData: { + userName: 'John Doe', + planType: 'Premium', + signupDate: '2024-01-15' + } + }; + + const processed = notificationService.processTemplateVariables(emailData); + + expect(processed.templateData.appName).toBe('SubStream Protocol'); + expect(processed.templateData.supportEmail).toBe('support@substream-protocol.com'); + expect(processed.templateData.currentYear).toBe(new Date().getFullYear()); + expect(processed.templateData.userName).toBe('John Doe'); + expect(processed.templateData.planType).toBe('Premium'); + expect(processed.templateData.signupDate).toBe('2024-01-15'); + + // Test function-based variables + notificationService.addTemplateMapping('dynamic', { + defaultVariables: { + currentTime: () => new Date().toISOString(), + randomNumber: () => Math.floor(Math.random() * 1000) + } + }); + + const dynamicData = { + templateId: 'dynamic', + templateData: { name: 'Test User' } + }; + + const processedDynamic = notificationService.processTemplateVariables(dynamicData); + expect(typeof processedDynamic.templateData.currentTime).toBe('string'); + expect(typeof processedDynamic.templateData.randomNumber).toBe('number'); + }); +}); diff --git a/routes/notifications.js b/routes/notifications.js new file mode 100644 index 0000000..99650a5 --- /dev/null +++ b/routes/notifications.js @@ -0,0 +1,731 @@ +const express = require('express'); +const { authenticateToken, getUserId } = require('../middleware/unifiedAuth'); +const NotificationService = require('../services/notificationService'); + +const router = express.Router(); + +/** + * Initialize notification service with middleware + */ +router.use((req, res, next) => { + if (!req.app.get('notificationService')) { + // Initialize notification service if not already initialized + const notificationService = new NotificationService({ + defaultProvider: process.env.DEFAULT_EMAIL_PROVIDER || 'ses', + ses: { + region: process.env.AWS_REGION || 'us-east-1', + accessKeyId: process.env.AWS_ACCESS_KEY_ID, + secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY, + sessionToken: process.env.AWS_SESSION_TOKEN + }, + sendgrid: { + apiKey: process.env.SENDGRID_API_KEY + }, + redis: { + host: process.env.REDIS_HOST || 'localhost', + port: parseInt(process.env.REDIS_PORT) || 6379, + password: process.env.REDIS_PASSWORD, + db: parseInt(process.env.REDIS_DB) || 0 + }, + queueName: process.env.NOTIFICATION_QUEUE_NAME || 'notification-queue', + concurrency: parseInt(process.env.NOTIFICATION_CONCURRENCY) || 5, + rateLimitMax: parseInt(process.env.NOTIFICATION_RATE_LIMIT_MAX) || 100, + rateLimitDuration: parseInt(process.env.NOTIFICATION_RATE_LIMIT_DURATION) || 60000 + }); + + // Create predefined templates + notificationService.createPredefinedTemplates(); + + req.app.set('notificationService', notificationService); + } + + next(); +}); + +/** + * POST /api/v1/notifications/send + * Send email using template + */ +router.post('/send', authenticateToken, async (req, res) => { + try { + const userId = getUserId(req.user); + const { + to, + from, + subject, + templateId, + templateData, + provider, + options = {} + } = req.body; + + // Validate required fields + if (!to || !templateId) { + return res.status(400).json({ + success: false, + error: 'Missing required fields: to, templateId' + }); + } + + const notificationService = req.app.get('notificationService'); + const result = await notificationService.sendEmail({ + to, + from: from || process.env.DEFAULT_FROM_EMAIL, + subject, + templateId, + templateData: templateData || {} + }, { + provider, + attempts: options.attempts, + delay: options.delay, + priority: options.priority + }); + + res.json({ + success: true, + data: result + }); + + } catch (error) { + console.error('Send email error:', error); + res.status(500).json({ + success: false, + error: error.message || 'Failed to send email' + }); + } +}); + +/** + * POST /api/v1/notifications/send-simple + * Send simple text/HTML email + */ +router.post('/send-simple', authenticateToken, async (req, res) => { + try { + const userId = getUserId(req.user); + const { + to, + from, + subject, + text, + html, + provider, + options = {} + } = req.body; + + // Validate required fields + if (!to || !subject || (!text && !html)) { + return res.status(400).json({ + success: false, + error: 'Missing required fields: to, subject, and either text or html' + }); + } + + const notificationService = req.app.get('notificationService'); + const result = await notificationService.sendSimpleEmail({ + to, + from: from || process.env.DEFAULT_FROM_EMAIL, + subject, + text, + html + }, { + provider, + attempts: options.attempts, + delay: options.delay, + priority: options.priority + }); + + res.json({ + success: true, + data: result + }); + + } catch (error) { + console.error('Send simple email error:', error); + res.status(500).json({ + success: false, + error: error.message || 'Failed to send simple email' + }); + } +}); + +/** + * POST /api/v1/notifications/send-bulk + * Send bulk email to multiple recipients + */ +router.post('/send-bulk', authenticateToken, async (req, res) => { + try { + const userId = getUserId(req.user); + const { + recipients, + from, + subject, + templateId, + templateData, + provider, + options = {} + } = req.body; + + // Validate required fields + if (!recipients || !Array.isArray(recipients) || recipients.length === 0) { + return res.status(400).json({ + success: false, + error: 'Missing required field: recipients (must be non-empty array)' + }); + } + + if (!templateId) { + return res.status(400).json({ + success: false, + error: 'Missing required field: templateId' + }); + } + + // Validate recipients + const invalidRecipients = recipients.filter(r => !r.email); + if (invalidRecipients.length > 0) { + return res.status(400).json({ + success: false, + error: 'Invalid recipients: missing email field', + invalidRecipients + }); + } + + const notificationService = req.app.get('notificationService'); + const result = await notificationService.sendBulkEmail({ + recipients, + from: from || process.env.DEFAULT_FROM_EMAIL, + subject, + templateId, + templateData: templateData || {}, + options + }, { + provider, + attempts: options.attempts, + delay: options.delay, + priority: options.priority + }); + + res.json({ + success: true, + data: result + }); + + } catch (error) { + console.error('Send bulk email error:', error); + res.status(500).json({ + success: false, + error: error.message || 'Failed to send bulk email' + }); + } +}); + +/** + * POST /api/v1/notifications/send-template + * Send email using predefined template + */ +router.post('/send-template', authenticateToken, async (req, res) => { + try { + const userId = getUserId(req.user); + const { + templateType, + to, + from, + templateData, + provider, + options = {} + } = req.body; + + // Validate required fields + if (!templateType || !to) { + return res.status(400).json({ + success: false, + error: 'Missing required fields: templateType, to' + }); + } + + const notificationService = req.app.get('notificationService'); + const result = await notificationService.sendTemplateEmail(templateType, { + to, + from: from || process.env.DEFAULT_FROM_EMAIL, + templateData: templateData || {} + }, { + provider, + attempts: options.attempts, + delay: options.delay, + priority: options.priority + }); + + res.json({ + success: true, + data: result + }); + + } catch (error) { + console.error('Send template email error:', error); + res.status(500).json({ + success: false, + error: error.message || 'Failed to send template email' + }); + } +}); + +/** + * GET /api/v1/notifications/job/:jobId + * Get job status + */ +router.get('/job/:jobId', authenticateToken, async (req, res) => { + try { + const userId = getUserId(req.user); + const { jobId } = req.params; + + const notificationService = req.app.get('notificationService'); + const result = await notificationService.getJobStatus(jobId); + + res.json({ + success: true, + data: result + }); + + } catch (error) { + console.error('Get job status error:', error); + res.status(500).json({ + success: false, + error: error.message || 'Failed to get job status' + }); + } +}); + +/** + * GET /api/v1/notifications/queue/stats + * Get queue statistics + */ +router.get('/queue/stats', authenticateToken, async (req, res) => { + try { + const userId = getUserId(req.user); + + const notificationService = req.app.get('notificationService'); + const result = await notificationService.getQueueStats(); + + res.json({ + success: true, + data: result + }); + + } catch (error) { + console.error('Get queue stats error:', error); + res.status(500).json({ + success: false, + error: error.message || 'Failed to get queue statistics' + }); + } +}); + +/** + * GET /api/v1/notifications/queue/jobs + * Get recent jobs + */ +router.get('/queue/jobs', authenticateToken, async (req, res) => { + try { + const userId = getUserId(req.user); + const { + state = 'completed', + start = 0, + end = 50 + } = req.query; + + const notificationService = req.app.get('notificationService'); + const result = await notificationService.getRecentJobs({ + state, + start: parseInt(start), + end: parseInt(end) + }); + + res.json({ + success: true, + data: result + }); + + } catch (error) { + console.error('Get recent jobs error:', error); + res.status(500).json({ + success: false, + error: error.message || 'Failed to get recent jobs' + }); + } +}); + +/** + * GET /api/v1/notifications/providers + * Get available providers + */ +router.get('/providers', authenticateToken, async (req, res) => { + try { + const userId = getUserId(req.user); + + const notificationService = req.app.get('notificationService'); + const providers = notificationService.getAvailableProviders(); + const stats = notificationService.getProviderStats(); + + res.json({ + success: true, + data: { + availableProviders: providers, + defaultProvider: notificationService.defaultProvider, + providerStats: stats + } + }); + + } catch (error) { + console.error('Get providers error:', error); + res.status(500).json({ + success: false, + error: error.message || 'Failed to get providers' + }); + } +}); + +/** + * POST /api/v1/notifications/providers/switch + * Switch default provider + */ +router.post('/providers/switch', authenticateToken, async (req, res) => { + try { + const userId = getUserId(req.user); + const { provider } = req.body; + + if (!provider) { + return res.status(400).json({ + success: false, + error: 'Missing required field: provider' + }); + } + + const notificationService = req.app.get('notificationService'); + const result = notificationService.switchProvider(provider); + + res.json({ + success: true, + data: result + }); + + } catch (error) { + console.error('Switch provider error:', error); + res.status(500).json({ + success: false, + error: error.message || 'Failed to switch provider' + }); + } +}); + +/** + * POST /api/v1/notifications/providers/test + * Test provider connection + */ +router.post('/providers/test', authenticateToken, async (req, res) => { + try { + const userId = getUserId(req.user); + const { provider } = req.body; + + const notificationService = req.app.get('notificationService'); + const results = await notificationService.testProviderConnection(provider); + + res.json({ + success: true, + data: results + }); + + } catch (error) { + console.error('Test provider error:', error); + res.status(500).json({ + success: false, + error: error.message || 'Failed to test provider' + }); + } +}); + +/** + * GET /api/v1/notifications/templates + * Get template mappings + */ +router.get('/templates', authenticateToken, async (req, res) => { + try { + const userId = getUserId(req.user); + + const notificationService = req.app.get('notificationService'); + + // Get all template mappings + const templates = {}; + for (const [templateId, mapping] of notificationService.templateMappings) { + templates[templateId] = mapping; + } + + res.json({ + success: true, + data: { + templates, + count: Object.keys(templates).length + } + }); + + } catch (error) { + console.error('Get templates error:', error); + res.status(500).json({ + success: false, + error: error.message || 'Failed to get templates' + }); + } +}); + +/** + * POST /api/v1/notifications/templates + * Add template mapping + */ +router.post('/templates', authenticateToken, async (req, res) => { + try { + const userId = getUserId(req.user); + const { templateId, mapping } = req.body; + + if (!templateId || !mapping) { + return res.status(400).json({ + success: false, + error: 'Missing required fields: templateId, mapping' + }); + } + + const notificationService = req.app.get('notificationService'); + notificationService.addTemplateMapping(templateId, mapping); + + res.json({ + success: true, + message: 'Template mapping added successfully', + data: { + templateId, + mapping + } + }); + + } catch (error) { + console.error('Add template mapping error:', error); + res.status(500).json({ + success: false, + error: error.message || 'Failed to add template mapping' + }); + } +}); + +/** + * DELETE /api/v1/notifications/templates/:templateId + * Remove template mapping + */ +router.delete('/templates/:templateId', authenticateToken, async (req, res) => { + try { + const userId = getUserId(req.user); + const { templateId } = req.params; + + const notificationService = req.app.get('notificationService'); + notificationService.removeTemplateMapping(templateId); + + res.json({ + success: true, + message: 'Template mapping removed successfully', + data: { + templateId + } + }); + + } catch (error) { + console.error('Remove template mapping error:', error); + res.status(500).json({ + success: false, + error: error.message || 'Failed to remove template mapping' + }); + } +}); + +/** + * GET /api/v1/notifications/health + * Get service health status + */ +router.get('/health', async (req, res) => { + try { + const notificationService = req.app.get('notificationService'); + const health = notificationService.getHealthStatus(); + + res.json({ + success: true, + data: health + }); + + } catch (error) { + console.error('Get health status error:', error); + res.status(500).json({ + success: false, + error: error.message || 'Failed to get health status' + }); + } +}); + +/** + * POST /api/v1/notifications/queue/pause + * Pause queue processing + */ +router.post('/queue/pause', authenticateToken, async (req, res) => { + try { + const userId = getUserId(req.user); + + const notificationService = req.app.get('notificationService'); + const result = await notificationService.queueService.pauseQueue(); + + res.json({ + success: true, + data: result + }); + + } catch (error) { + console.error('Pause queue error:', error); + res.status(500).json({ + success: false, + error: error.message || 'Failed to pause queue' + }); + } +}); + +/** + * POST /api/v1/notifications/queue/resume + * Resume queue processing + */ +router.post('/queue/resume', authenticateToken, async (req, res) => { + try { + const userId = getUserId(req.user); + + const notificationService = req.app.get('notificationService'); + const result = await notificationService.queueService.resumeQueue(); + + res.json({ + success: true, + data: result + }); + + } catch (error) { + console.error('Resume queue error:', error); + res.status(500).json({ + success: false, + error: error.message || 'Failed to resume queue' + }); + } +}); + +/** + * POST /api/v1/notifications/queue/clear + * Clear queue + */ +router.post('/queue/clear', authenticateToken, async (req, res) => { + try { + const userId = getUserId(req.user); + const { state = 'waiting' } = req.body; + + const notificationService = req.app.get('notificationService'); + const result = await notificationService.queueService.clearQueue({ state }); + + res.json({ + success: true, + data: result + }); + + } catch (error) { + console.error('Clear queue error:', error); + res.status(500).json({ + success: false, + error: error.message || 'Failed to clear queue' + }); + } +}); + +/** + * POST /api/v1/notifications/test-email + * Send test email + */ +router.post('/test-email', authenticateToken, async (req, res) => { + try { + const userId = getUserId(req.user); + const { + to, + provider, + templateType = 'welcome' + } = req.body; + + if (!to) { + return res.status(400).json({ + success: false, + error: 'Missing required field: to' + }); + } + + const notificationService = req.app.get('notificationService'); + + // Send test email using predefined template + const result = await notificationService.sendTemplateEmail(templateType, { + to, + templateData: { + name: 'Test User', + testMode: true, + timestamp: new Date().toISOString() + } + }, { + provider, + attempts: 1 + }); + + res.json({ + success: true, + data: result + }); + + } catch (error) { + console.error('Send test email error:', error); + res.status(500).json({ + success: false, + error: error.message || 'Failed to send test email' + }); + } +}); + +/** + * GET /api/v1/notifications/stats + * Get comprehensive statistics + */ +router.get('/stats', authenticateToken, async (req, res) => { + try { + const userId = getUserId(req.user); + + const notificationService = req.app.get('notificationService'); + + // Get queue stats + const queueStats = await notificationService.getQueueStats(); + + // Get provider stats + const providerStats = notificationService.getProviderStats(); + + // Get health status + const health = notificationService.getHealthStatus(); + + res.json({ + success: true, + data: { + queue: queueStats, + providers: providerStats, + health, + timestamp: new Date().toISOString() + } + }); + + } catch (error) { + console.error('Get stats error:', error); + res.status(500).json({ + success: false, + error: error.message || 'Failed to get statistics' + }); + } +}); + +module.exports = router; diff --git a/services/emailProviders/BaseEmailProvider.js b/services/emailProviders/BaseEmailProvider.js new file mode 100644 index 0000000..b78305c --- /dev/null +++ b/services/emailProviders/BaseEmailProvider.js @@ -0,0 +1,206 @@ +/** + * Base Email Provider Interface + * Abstract interface for email providers to ensure consistent API + */ +class BaseEmailProvider { + constructor(config = {}) { + this.config = config; + this.name = this.constructor.name; + } + + /** + * Send an email using the provider + * @param {Object} emailData - Email data + * @param {string} emailData.to - Recipient email + * @param {string} emailData.from - Sender email + * @param {string} emailData.subject - Email subject + * @param {string} emailData.templateId - Template identifier + * @param {Object} emailData.templateData - Template variables + * @param {Object} emailData.options - Additional provider-specific options + * @returns {Promise} Send result + */ + async sendEmail(emailData) { + throw new Error('sendEmail method must be implemented by subclass'); + } + + /** + * Send a simple text/HTML email + * @param {Object} emailData - Email data + * @param {string} emailData.to - Recipient email + * @param {string} emailData.from - Sender email + * @param {string} emailData.subject - Email subject + * @param {string} emailData.text - Plain text content + * @param {string} emailData.html - HTML content + * @param {Object} emailData.options - Additional provider-specific options + * @returns {Promise} Send result + */ + async sendSimpleEmail(emailData) { + throw new Error('sendSimpleEmail method must be implemented by subclass'); + } + + /** + * Get provider-specific template + * @param {string} templateId - Template identifier + * @returns {Promise} Template data + */ + async getTemplate(templateId) { + throw new Error('getTemplate method must be implemented by subclass'); + } + + /** + * Validate email address format + * @param {string} email - Email address to validate + * @returns {boolean} Whether email is valid + */ + validateEmail(email) { + const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/; + return emailRegex.test(email); + } + + /** + * Check if error is a rate limit error + * @param {Error} error - Error to check + * @returns {boolean} Whether error is rate limit related + */ + isRateLimitError(error) { + // Override in subclasses for provider-specific rate limit detection + return false; + } + + /** + * Extract retry-after time from rate limit error + * @param {Error} error - Rate limit error + * @returns {number} Retry after time in seconds + */ + getRetryAfter(error) { + // Override in subclasses for provider-specific retry-after extraction + return 60; // Default 60 seconds + } + + /** + * Get provider statistics + * @returns {Object} Provider statistics + */ + getStats() { + return { + name: this.name, + config: this.getSafeConfig(), + timestamp: new Date().toISOString() + }; + } + + /** + * Get safe configuration (without sensitive data) + * @returns {Object} Safe configuration + */ + getSafeConfig() { + const safeConfig = { ...this.config }; + + // Remove sensitive fields + const sensitiveFields = ['apiKey', 'secretKey', 'password', 'privateKey']; + sensitiveFields.forEach(field => { + if (safeConfig[field]) { + safeConfig[field] = '***'; + } + }); + + return safeConfig; + } + + /** + * Test provider connection + * @returns {Promise} Test result + */ + async testConnection() { + try { + // Default implementation - override in subclasses + return { + success: true, + message: 'Connection test successful', + timestamp: new Date().toISOString() + }; + } catch (error) { + return { + success: false, + error: error.message, + timestamp: new Date().toISOString() + }; + } + } + + /** + * Normalize email data to ensure consistent format + * @param {Object} emailData - Raw email data + * @returns {Object} Normalized email data + */ + normalizeEmailData(emailData) { + const normalized = { + to: emailData.to, + from: emailData.from || this.config.defaultFrom, + subject: emailData.subject, + templateId: emailData.templateId, + templateData: emailData.templateData || {}, + options: emailData.options || {} + }; + + // Validate required fields + if (!normalized.to) { + throw new Error('Recipient email (to) is required'); + } + + if (!normalized.from) { + throw new Error('Sender email (from) is required'); + } + + if (!normalized.subject) { + throw new Error('Email subject is required'); + } + + // Validate email addresses + if (!this.validateEmail(normalized.to)) { + throw new Error(`Invalid recipient email: ${normalized.to}`); + } + + if (!this.validateEmail(normalized.from)) { + throw new Error(`Invalid sender email: ${normalized.from}`); + } + + return normalized; + } + + /** + * Create standardized error response + * @param {Error} error - Original error + * @param {Object} context - Additional context + * @returns {Object} Standardized error + */ + createError(error, context = {}) { + return { + success: false, + provider: this.name, + error: error.message, + code: error.code || 'UNKNOWN_ERROR', + isRateLimit: this.isRateLimitError(error), + retryAfter: this.isRateLimitError(error) ? this.getRetryAfter(error) : null, + timestamp: new Date().toISOString(), + context + }; + } + + /** + * Create standardized success response + * @param {Object} data - Success data + * @returns {Object} Standardized success + */ + createSuccess(data) { + return { + success: true, + provider: this.name, + messageId: data.messageId || data.id, + timestamp: new Date().toISOString(), + ...data + }; + } +} + +module.exports = BaseEmailProvider; diff --git a/services/emailProviders/SESProvider.js b/services/emailProviders/SESProvider.js new file mode 100644 index 0000000..a0c1305 --- /dev/null +++ b/services/emailProviders/SESProvider.js @@ -0,0 +1,456 @@ +const AWS = require('aws-sdk'); +const BaseEmailProvider = require('./BaseEmailProvider'); + +/** + * AWS SES Email Provider + * Implements email sending using AWS Simple Email Service + */ +class SESProvider extends BaseEmailProvider { + constructor(config = {}) { + super(config); + + this.region = config.region || process.env.AWS_REGION || 'us-east-1'; + this.accessKeyId = config.accessKeyId || process.env.AWS_ACCESS_KEY_ID; + this.secretAccessKey = config.secretAccessKey || process.env.AWS_SECRET_ACCESS_KEY; + this.sessionToken = config.sessionToken || process.env.AWS_SESSION_TOKEN; + + this.initializeSES(); + } + + /** + * Initialize AWS SES client + */ + initializeSES() { + try { + AWS.config.update({ + region: this.region, + accessKeyId: this.accessKeyId, + secretAccessKey: this.secretAccessKey, + sessionToken: this.sessionToken + }); + + this.ses = new AWS.SES({ + apiVersion: '2010-12-01', + region: this.region + }); + + console.log(`AWS SES provider initialized for region: ${this.region}`); + } catch (error) { + console.error('Failed to initialize AWS SES:', error); + throw error; + } + } + + /** + * Send email using AWS SES template + * @param {Object} emailData - Email data + * @returns {Promise} Send result + */ + async sendEmail(emailData) { + try { + const normalized = this.normalizeEmailData(emailData); + + const params = { + Destination: { + ToAddresses: [normalized.to] + }, + Source: normalized.from, + Template: normalized.templateId, + TemplateData: JSON.stringify(normalized.templateData), + ConfigurationSetName: normalized.options.configurationSetName, + ReplyToAddresses: normalized.options.replyTo || [], + ReturnPath: normalized.options.returnPath || normalized.from + }; + + // Remove undefined values + Object.keys(params).forEach(key => { + if (params[key] === undefined) { + delete params[key]; + } + }); + + const result = await this.ses.sendTemplatedEmail(params).promise(); + + return this.createSuccess({ + messageId: result.MessageId, + templateId: normalized.templateId, + recipient: normalized.to + }); + + } catch (error) { + console.error('AWS SES sendEmail error:', error); + throw this.createError(error, { + templateId: emailData.templateId, + recipient: emailData.to + }); + } + } + + /** + * Send simple text/HTML email using AWS SES + * @param {Object} emailData - Email data + * @returns {Promise} Send result + */ + async sendSimpleEmail(emailData) { + try { + const normalized = this.normalizeEmailData(emailData); + + const params = { + Destination: { + ToAddresses: [normalized.to] + }, + Source: normalized.from, + Message: { + Subject: { + Data: normalized.subject, + Charset: 'UTF-8' + }, + Body: { + Text: normalized.text ? { + Data: normalized.text, + Charset: 'UTF-8' + } : undefined, + Html: normalized.html ? { + Data: normalized.html, + Charset: 'UTF-8' + } : undefined + } + }, + ConfigurationSetName: normalized.options.configurationSetName, + ReplyToAddresses: normalized.options.replyTo || [], + ReturnPath: normalized.options.returnPath || normalized.from + }; + + // Remove undefined values from Message.Body + Object.keys(params.Message.Body).forEach(key => { + if (params.Message.Body[key] === undefined) { + delete params.Message.Body[key]; + } + }); + + // Remove undefined values from params + Object.keys(params).forEach(key => { + if (params[key] === undefined) { + delete params[key]; + } + }); + + const result = await this.ses.sendEmail(params).promise(); + + return this.createSuccess({ + messageId: result.MessageId, + recipient: normalized.to, + subject: normalized.subject + }); + + } catch (error) { + console.error('AWS SES sendSimpleEmail error:', error); + throw this.createError(error, { + recipient: emailData.to, + subject: emailData.subject + }); + } + } + + /** + * Get template information from AWS SES + * @param {string} templateName - Template name + * @returns {Promise} Template data + */ + async getTemplate(templateName) { + try { + const params = { + TemplateName: templateName + }; + + const result = await this.ses.getTemplate(params).promise(); + + return { + name: result.Template.TemplateName, + subject: result.Template.SubjectPart, + text: result.Template.TextPart, + html: result.Template.HtmlPart, + created_at: result.Template.CreatedAt, + timestamp: new Date().toISOString() + }; + + } catch (error) { + console.error('AWS SES getTemplate error:', error); + throw this.createError(error, { templateName }); + } + } + + /** + * Create or update template in AWS SES + * @param {Object} templateData - Template data + * @returns {Promise} Template result + */ + async createTemplate(templateData) { + try { + const params = { + TemplateName: templateData.name, + SubjectPart: templateData.subject, + HtmlPart: templateData.html, + TextPart: templateData.text || '' + }; + + const result = await this.ses.createTemplate(params).promise(); + + return this.createSuccess({ + templateName: result.Template.TemplateName, + created_at: result.Template.CreatedAt + }); + + } catch (error) { + console.error('AWS SES createTemplate error:', error); + throw this.createError(error, { templateName: templateData.name }); + } + } + + /** + * Update existing template in AWS SES + * @param {Object} templateData - Template data + * @returns {Promise} Template result + */ + async updateTemplate(templateData) { + try { + const params = { + TemplateName: templateData.name, + SubjectPart: templateData.subject, + HtmlPart: templateData.html, + TextPart: templateData.text || '' + }; + + const result = await this.ses.updateTemplate(params).promise(); + + return this.createSuccess({ + templateName: result.Template.TemplateName, + updated_at: new Date().toISOString() + }); + + } catch (error) { + console.error('AWS SES updateTemplate error:', error); + throw this.createError(error, { templateName: templateData.name }); + } + } + + /** + * Delete template from AWS SES + * @param {string} templateName - Template name + * @returns {Promise} Delete result + */ + async deleteTemplate(templateName) { + try { + const params = { + TemplateName: templateName + }; + + await this.ses.deleteTemplate(params).promise(); + + return this.createSuccess({ + templateName, + deleted_at: new Date().toISOString() + }); + + } catch (error) { + console.error('AWS SES deleteTemplate error:', error); + throw this.createError(error, { templateName }); + } + } + + /** + * List all templates in AWS SES + * @param {Object} options - List options + * @returns {Promise} Templates list + */ + async listTemplates(options = {}) { + try { + const params = { + MaxItems: options.maxItems || 100, + NextToken: options.nextToken + }; + + // Remove undefined values + Object.keys(params).forEach(key => { + if (params[key] === undefined) { + delete params[key]; + } + }); + + const result = await this.ses.listTemplates(params).promise(); + + const templates = result.TemplatesMetadata.map(template => ({ + name: template.Name, + created_at: template.CreatedAt + })); + + return this.createSuccess({ + templates, + nextToken: result.NextToken, + count: templates.length + }); + + } catch (error) { + console.error('AWS SES listTemplates error:', error); + throw this.createError(error); + } + } + + /** + * Get sending statistics from AWS SES + * @returns {Promise} Sending statistics + */ + async getSendStatistics() { + try { + const result = await this.ses.getSendStatistics().promise(); + + const stats = result.SendDataPoints.map(point => ({ + timestamp: point.Timestamp, + deliveryAttempts: point.DeliveryAttempts, + bounces: point.Bounces, + complaints: point.Complaints, + rejects: point.Rejects + })); + + return this.createSuccess({ + statistics: stats, + count: stats.length + }); + + } catch (error) { + console.error('AWS SES getSendStatistics error:', error); + throw this.createError(error); + } + } + + /** + * Check if error is a rate limit error + * @param {Error} error - Error to check + * @returns {boolean} Whether error is rate limit related + */ + isRateLimitError(error) { + return error.code === 'ThrottlingException' || + error.code === 'TooManyRequestsException' || + error.code === 'SendingPausedException'; + } + + /** + * Extract retry-after time from rate limit error + * @param {Error} error - Rate limit error + * @returns {number} Retry after time in seconds + */ + getRetryAfter(error) { + // AWS SES rate limits typically reset within seconds to minutes + if (error.message && error.message.includes('maximum send rate')) { + return 60; // 1 minute for send rate limits + } + + if (error.code === 'ThrottlingException') { + return 30; // 30 seconds for general throttling + } + + return 60; // Default 60 seconds + } + + /** + * Test AWS SES connection + * @returns {Promise} Test result + */ + async testConnection() { + try { + // Try to get send quota as a connection test + const result = await this.ses.getSendQuota().promise(); + + return { + success: true, + message: 'AWS SES connection successful', + data: { + max24HourSend: result.Max24HourSend, + maxSendRate: result.MaxSendRate, + sentLast24Hours: result.SentLast24Hours + }, + timestamp: new Date().toISOString() + }; + } catch (error) { + return { + success: false, + error: error.message, + code: error.code, + timestamp: new Date().toISOString() + }; + } + } + + /** + * Get provider-specific statistics + * @returns {Object} Provider statistics + */ + getStats() { + return { + ...super.getStats(), + region: this.region, + service: 'AWS SES' + }; + } + + /** + * Verify email address or domain + * @param {string} email - Email address to verify + * @returns {Promise} Verification result + */ + async verifyEmail(email) { + try { + const params = { + EmailAddress: email + }; + + const result = await this.ses.verifyEmailIdentity(params).promise(); + + return this.createSuccess({ + email, + verificationStatus: 'pending', + requestId: result.ResponseMetadata.RequestId + }); + + } catch (error) { + console.error('AWS SES verifyEmail error:', error); + throw this.createError(error, { email }); + } + } + + /** + * Get verification status for email or domain + * @param {string} identity - Email address or domain + * @returns {Promise} Verification status + */ + async getVerificationStatus(identity) { + try { + const params = { + Identities: [identity] + }; + + const result = await this.ses.getIdentityVerificationAttributes(params).promise(); + + const verificationAttributes = result.VerificationAttributes[identity]; + + return this.createSuccess({ + identity, + verificationStatus: verificationAttributes.VerificationStatus, + verificationToken: verificationAttributes.VerificationToken, + verificationAttributes: { + dkimEnabled: verificationAttributes.DkimEnabled, + dkimVerificationStatus: verificationAttributes.DkimVerificationStatus, + dkimTokens: verificationAttributes.DkimTokens, + mailFromDomain: verificationAttributes.MailFromDomain, + mailFromDomainStatus: verificationAttributes.MailFromDomainStatus + } + }); + + } catch (error) { + console.error('AWS SES getVerificationStatus error:', error); + throw this.createError(error, { identity }); + } + } +} + +module.exports = SESProvider; diff --git a/services/emailProviders/SendGridProvider.js b/services/emailProviders/SendGridProvider.js new file mode 100644 index 0000000..ea753f6 --- /dev/null +++ b/services/emailProviders/SendGridProvider.js @@ -0,0 +1,629 @@ +const axios = require('axios'); +const BaseEmailProvider = require('./BaseEmailProvider'); + +/** + * SendGrid Email Provider + * Implements email sending using SendGrid API + */ +class SendGridProvider extends BaseEmailProvider { + constructor(config = {}) { + super(config); + + this.apiKey = config.apiKey || process.env.SENDGRID_API_KEY; + this.baseUrl = config.baseUrl || 'https://api.sendgrid.com/v3'; + this.version = config.version || 'v3'; + + if (!this.apiKey) { + throw new Error('SendGrid API key is required'); + } + + this.initializeAxios(); + } + + /** + * Initialize Axios client with SendGrid configuration + */ + initializeAxios() { + this.client = axios.create({ + baseURL: this.baseUrl, + headers: { + 'Authorization': `Bearer ${this.apiKey}`, + 'Content-Type': 'application/json' + }, + timeout: 30000 // 30 seconds timeout + }); + + // Add response interceptor for error handling + this.client.interceptors.response.use( + (response) => response, + (error) => { + if (error.response) { + // The request was made and the server responded with a status code + // that falls out of the range of 2xx + error.code = error.response.status; + error.message = error.response.data?.message || error.message; + } else if (error.request) { + // The request was made but no response was received + error.code = 'NETWORK_ERROR'; + error.message = 'Network error occurred'; + } + + throw error; + } + ); + + console.log('SendGrid provider initialized'); + } + + /** + * Send email using SendGrid dynamic template + * @param {Object} emailData - Email data + * @returns {Promise} Send result + */ + async sendEmail(emailData) { + try { + const normalized = this.normalizeEmailData(emailData); + + const payload = { + personalizations: [{ + to: [{ email: normalized.to }], + dynamic_template_data: normalized.templateData + }], + from: { email: normalized.from }, + template_id: normalized.templateId + }; + + // Add optional fields + if (normalized.options.replyTo) { + payload.reply_to = { email: normalized.options.replyTo }; + } + + if (normalized.options.cc) { + payload.personalizations[0].cc = normalized.options.cc.map(email => ({ email })); + } + + if (normalized.options.bcc) { + payload.personalizations[0].bcc = normalized.options.bcc.map(email => ({ email })); + } + + if (normalized.options.categories) { + payload.categories = normalized.options.categories; + } + + if (normalized.options.customArgs) { + payload.custom_args = normalized.options.customArgs; + } + + if (normalized.options.sendAt) { + payload.send_at = normalized.options.sendAt; + } + + const response = await this.client.post('/mail/send', payload); + + return this.createSuccess({ + messageId: response.headers['x-message-id'], + templateId: normalized.templateId, + recipient: normalized.to, + requestId: response.headers['x-request-id'] + }); + + } catch (error) { + console.error('SendGrid sendEmail error:', error); + throw this.createError(error, { + templateId: emailData.templateId, + recipient: emailData.to + }); + } + } + + /** + * Send simple text/HTML email using SendGrid + * @param {Object} emailData - Email data + * @returns {Promise} Send result + */ + async sendSimpleEmail(emailData) { + try { + const normalized = this.normalizeEmailData(emailData); + + const content = []; + + if (normalized.text) { + content.push({ + type: 'text/plain', + value: normalized.text + }); + } + + if (normalized.html) { + content.push({ + type: 'text/html', + value: normalized.html + }); + } + + if (content.length === 0) { + throw new Error('Either text or HTML content is required'); + } + + const payload = { + personalizations: [{ + to: [{ email: normalized.to }] + }], + from: { email: normalized.from }, + subject: normalized.subject, + content + }; + + // Add optional fields + if (normalized.options.replyTo) { + payload.reply_to = { email: normalized.options.replyTo }; + } + + if (normalized.options.cc) { + payload.personalizations[0].cc = normalized.options.cc.map(email => ({ email })); + } + + if (normalized.options.bcc) { + payload.personalizations[0].bcc = normalized.options.bcc.map(email => ({ email })); + } + + if (normalized.options.categories) { + payload.categories = normalized.options.categories; + } + + if (normalized.options.customArgs) { + payload.custom_args = normalized.options.customArgs; + } + + if (normalized.options.sendAt) { + payload.send_at = normalized.options.sendAt; + } + + const response = await this.client.post('/mail/send', payload); + + return this.createSuccess({ + messageId: response.headers['x-message-id'], + recipient: normalized.to, + subject: normalized.subject, + requestId: response.headers['x-request-id'] + }); + + } catch (error) { + console.error('SendGrid sendSimpleEmail error:', error); + throw this.createError(error, { + recipient: emailData.to, + subject: emailData.subject + }); + } + } + + /** + * Get template information from SendGrid + * @param {string} templateId - Template ID + * @returns {Promise} Template data + */ + async getTemplate(templateId) { + try { + const response = await this.client.get(`/templates/${templateId}`); + const template = response.data; + + return { + id: template.id, + name: template.name, + generated_at: template.generated_at, + versions: template.versions.map(version => ({ + id: version.id, + template_id: version.template_id, + active: version.active, + name: version.name, + subject: version.subject, + html_content: version.html_content, + plain_content: version.plain_content, + created_at: version.created_at, + updated_at: version.updated_at + })), + timestamp: new Date().toISOString() + }; + + } catch (error) { + console.error('SendGrid getTemplate error:', error); + throw this.createError(error, { templateId }); + } + } + + /** + * Create template in SendGrid + * @param {Object} templateData - Template data + * @returns {Promise} Template result + */ + async createTemplate(templateData) { + try { + const payload = { + name: templateData.name, + generation: 'dynamic' + }; + + const response = await this.client.post('/templates', payload); + const template = response.data; + + // Create the first version + if (templateData.subject || templateData.html || templateData.text) { + await this.createTemplateVersion(template.id, { + name: 'Version 1', + subject: templateData.subject, + html: templateData.html, + text: templateData.text, + active: true + }); + } + + return this.createSuccess({ + templateId: template.id, + name: template.name, + generated_at: template.generated_at + }); + + } catch (error) { + console.error('SendGrid createTemplate error:', error); + throw this.createError(error, { templateName: templateData.name }); + } + } + + /** + * Create template version in SendGrid + * @param {string} templateId - Template ID + * @param {Object} versionData - Version data + * @returns {Promise} Version result + */ + async createTemplateVersion(templateId, versionData) { + try { + const payload = { + template_id: templateId, + name: versionData.name, + subject: versionData.subject, + html_content: versionData.html, + plain_content: versionData.text, + active: versionData.active || false + }; + + const response = await this.client.post(`/templates/${templateId}/versions`, payload); + const version = response.data; + + return this.createSuccess({ + versionId: version.id, + templateId: templateId, + name: version.name, + active: version.active, + created_at: version.created_at + }); + + } catch (error) { + console.error('SendGrid createTemplateVersion error:', error); + throw this.createError(error, { templateId }); + } + } + + /** + * Update template version in SendGrid + * @param {string} templateId - Template ID + * @param {string} versionId - Version ID + * @param {Object} versionData - Version data + * @returns {Promise} Update result + */ + async updateTemplateVersion(templateId, versionId, versionData) { + try { + const payload = { + name: versionData.name, + subject: versionData.subject, + html_content: versionData.html, + plain_content: versionData.text, + active: versionData.active || false + }; + + const response = await this.client.patch(`/templates/${templateId}/versions/${versionId}`, payload); + const version = response.data; + + return this.createSuccess({ + versionId: version.id, + templateId: templateId, + name: version.name, + active: version.active, + updated_at: version.updated_at + }); + + } catch (error) { + console.error('SendGrid updateTemplateVersion error:', error); + throw this.createError(error, { templateId, versionId }); + } + } + + /** + * Delete template from SendGrid + * @param {string} templateId - Template ID + * @returns {Promise} Delete result + */ + async deleteTemplate(templateId) { + try { + await this.client.delete(`/templates/${templateId}`); + + return this.createSuccess({ + templateId, + deleted_at: new Date().toISOString() + }); + + } catch (error) { + console.error('SendGrid deleteTemplate error:', error); + throw this.createError(error, { templateId }); + } + } + + /** + * List all templates in SendGrid + * @param {Object} options - List options + * @returns {Promise} Templates list + */ + async listTemplates(options = {}) { + try { + const params = { + limit: options.limit || 100, + offset: options.offset || 0 + }; + + // Remove undefined values + Object.keys(params).forEach(key => { + if (params[key] === undefined) { + delete params[key]; + } + }); + + const response = await this.client.get('/templates', { params }); + const templates = response.data; + + return this.createSuccess({ + templates: templates.result.map(template => ({ + id: template.id, + name: template.name, + generated_at: template.generated_at, + versions: template.versions + })), + count: templates.result.length, + timestamp: new Date().toISOString() + }); + + } catch (error) { + console.error('SendGrid listTemplates error:', error); + throw this.createError(error); + } + } + + /** + * Get sending statistics from SendGrid + * @param {Object} options - Statistics options + * @returns {Promise} Sending statistics + */ + async getSendStatistics(options = {}) { + try { + const params = { + start_date: options.startDate || new Date(Date.now() - 30 * 24 * 60 * 60 * 1000).toISOString(), + end_date: options.endDate || new Date().toISOString(), + aggregated_by: options.aggregatedBy || 'day' + }; + + const response = await this.client.get('/stats', { params }); + const stats = response.data; + + return this.createSuccess({ + statistics: stats, + count: stats.length, + timestamp: new Date().toISOString() + }); + + } catch (error) { + console.error('SendGrid getSendStatistics error:', error); + throw this.createError(error); + } + } + + /** + * Check if error is a rate limit error + * @param {Error} error - Error to check + * @returns {boolean} Whether error is rate limit related + */ + isRateLimitError(error) { + return error.code === 429 || + error.code === 'Too Many Requests' || + (error.response && error.response.status === 429); + } + + /** + * Extract retry-after time from rate limit error + * @param {Error} error - Rate limit error + * @returns {number} Retry after time in seconds + */ + getRetryAfter(error) { + // Check for Retry-After header + if (error.response && error.response.headers['retry-after']) { + return parseInt(error.response.headers['retry-after'], 10); + } + + // Check SendGrid rate limit response + if (error.response && error.response.data) { + const data = error.response.data; + if (data.errors && data.errors.length > 0) { + const rateLimitError = data.errors.find(err => + err.message && err.message.includes('rate limit') + ); + if (rateLimitError) { + return 60; // Default 60 seconds for SendGrid rate limits + } + } + } + + return 60; // Default 60 seconds + } + + /** + * Test SendGrid connection + * @returns {Promise} Test result + */ + async testConnection() { + try { + // Try to get API key info as a connection test + const response = await this.client.get('/user/account'); + const account = response.data; + + return { + success: true, + message: 'SendGrid connection successful', + data: { + username: account.username, + email: account.email, + reputation: account.reputation, + plan: account.plan + }, + timestamp: new Date().toISOString() + }; + } catch (error) { + return { + success: false, + error: error.message, + code: error.code, + timestamp: new Date().toISOString() + }; + } + } + + /** + * Get provider-specific statistics + * @returns {Object} Provider statistics + */ + getStats() { + return { + ...super.getStats(), + service: 'SendGrid', + baseUrl: this.baseUrl, + version: this.version + }; + } + + /** + * Validate email address using SendGrid API + * @param {string} email - Email address to validate + * @returns {Promise} Validation result + */ + async validateEmail(email) { + try { + const response = await this.client.post('/validations/email', { + email: email, + source: 'signup' + }); + + const validation = response.data; + + return this.createSuccess({ + email, + verdict: validation.verdict, + score: validation.score, + checks: validation.checks, + ip_address: validation.ip_address, + suggested_correction: validation.suggested_correction + }); + + } catch (error) { + console.error('SendGrid validateEmail error:', error); + throw this.createError(error, { email }); + } + } + + /** + * Get email activity from SendGrid + * @param {Object} options - Activity options + * @returns {Promise} Email activity + */ + async getEmailActivity(options = {}) { + try { + const params = { + limit: options.limit || 100, + offset: options.offset || 0, + query: options.query + }; + + // Remove undefined values + Object.keys(params).forEach(key => { + if (params[key] === undefined) { + delete params[key]; + } + }); + + const response = await this.client.get('/messages', { params }); + const messages = response.data; + + return this.createSuccess({ + messages: messages.messages, + count: messages.messages.length, + timestamp: new Date().toISOString() + }); + + } catch (error) { + console.error('SendGrid getEmailActivity error:', error); + throw this.createError(error); + } + } + + /** + * Suppress email address + * @param {string} email - Email address to suppress + * @param {Object} options - Suppression options + * @returns {Promise} Suppression result + */ + async suppressEmail(email, options = {}) { + try { + const payload = { + recipient_emails: [email] + }; + + if (options.groupIds) { + payload.group_ids = options.groupIds; + } + + const response = await this.client.post('/asm/suppressions', payload); + + return this.createSuccess({ + email, + suppressed: true, + requestId: response.headers['x-request-id'], + timestamp: new Date().toISOString() + }); + + } catch (error) { + console.error('SendGrid suppressEmail error:', error); + throw this.createError(error, { email }); + } + } + + /** + * Unsuppress email address + * @param {string} email - Email address to unsuppress + * @param {number} groupId - Group ID + * @returns {Promise} Unsuppression result + */ + async unsuppressEmail(email, groupId) { + try { + const response = await this.client.delete(`/asm/groups/${groupId}/suppressions/${email}`); + + return this.createSuccess({ + email, + groupId, + unsuppressed: true, + requestId: response.headers['x-request-id'], + timestamp: new Date().toISOString() + }); + + } catch (error) { + console.error('SendGrid unsuppressEmail error:', error); + throw this.createError(error, { email, groupId }); + } + } +} + +module.exports = SendGridProvider; diff --git a/services/emailQueue.js b/services/emailQueue.js new file mode 100644 index 0000000..5398379 --- /dev/null +++ b/services/emailQueue.js @@ -0,0 +1,617 @@ +const { Queue, Worker } = require('bullmq'); +const Redis = require('ioredis'); + +/** + * Email Queue Service using BullMQ + * Handles asynchronous email processing with retry logic and rate limiting + */ +class EmailQueueService { + constructor(config = {}) { + this.config = config; + this.redisConfig = config.redis || { + host: process.env.REDIS_HOST || 'localhost', + port: parseInt(process.env.REDIS_PORT) || 6379, + password: process.env.REDIS_PASSWORD, + db: parseInt(process.env.REDIS_DB) || 0, + maxRetriesPerRequest: 3, + retryDelayOnFailover: 100, + lazyConnect: true + }; + + this.queueName = config.queueName || 'email-queue'; + this.defaultJobOptions = { + removeOnComplete: 100, // Keep last 100 completed jobs + removeOnFail: 50, // Keep last 50 failed jobs + attempts: 3, // Default retry attempts + backoff: { + type: 'exponential', + delay: 2000 // Start with 2 seconds + }, + delay: 0 + }; + + this.queue = null; + this.worker = null; + this.emailProvider = null; + this.processors = new Map(); + + this.initialize(); + } + + /** + * Initialize Redis connection and queue + */ + initialize() { + try { + // Create Redis connection + this.redis = new Redis(this.redisConfig); + + // Create queue + this.queue = new Queue(this.queueName, { + connection: this.redis, + defaultJobOptions: this.defaultJobOptions + }); + + console.log(`Email queue initialized: ${this.queueName}`); + + // Set up error handlers + this.queue.on('error', (error) => { + console.error('Email queue error:', error); + }); + + this.queue.on('waiting', (job) => { + console.log(`Email job waiting: ${job.id}`); + }); + + this.queue.on('active', (job) => { + console.log(`Email job active: ${job.id}`); + }); + + this.queue.on('completed', (job) => { + console.log(`Email job completed: ${job.id}`); + }); + + this.queue.on('failed', (job, error) => { + console.error(`Email job failed: ${job.id}`, error); + }); + + this.queue.on('stalled', (job) => { + console.warn(`Email job stalled: ${job.id}`); + }); + + } catch (error) { + console.error('Failed to initialize email queue:', error); + throw error; + } + } + + /** + * Set email provider for processing + * @param {BaseEmailProvider} provider - Email provider instance + */ + setEmailProvider(provider) { + this.emailProvider = provider; + + if (this.worker) { + this.worker.close(); + } + + this.setupWorker(); + } + + /** + * Setup worker to process email jobs + */ + setupWorker() { + if (!this.emailProvider) { + throw new Error('Email provider must be set before setting up worker'); + } + + try { + this.worker = new Worker( + this.queueName, + async (job) => { + return this.processEmailJob(job); + }, + { + connection: this.redis, + concurrency: this.config.concurrency || 5, + limiter: { + max: this.config.rateLimitMax || 100, + duration: this.config.rateLimitDuration || 60000, // 1 minute + } + } + ); + + // Set up worker error handlers + this.worker.on('error', (error) => { + console.error('Email worker error:', error); + }); + + this.worker.on('completed', (job) => { + console.log(`Email worker completed job: ${job.id}`); + }); + + this.worker.on('failed', (job, error) => { + console.error(`Email worker failed job: ${job.id}`, error); + }); + + console.log('Email worker setup completed'); + + } catch (error) { + console.error('Failed to setup email worker:', error); + throw error; + } + } + + /** + * Process individual email job + * @param {Object} job - BullMQ job object + * @returns {Promise} Processing result + */ + async processEmailJob(job) { + const { type, data } = job.data; + + try { + switch (type) { + case 'sendEmail': + return await this.processSendEmail(job); + case 'sendSimpleEmail': + return await this.processSendSimpleEmail(job); + case 'sendBulkEmail': + return await this.processSendBulkEmail(job); + default: + throw new Error(`Unknown job type: ${type}`); + } + } catch (error) { + console.error(`Failed to process email job ${job.id}:`, error); + + // Check if this is a rate limit error + if (this.emailProvider.isRateLimitError(error)) { + const retryAfter = this.emailProvider.getRetryAfter(error); + console.log(`Rate limit detected, retrying after ${retryAfter} seconds`); + + // Update job options for rate limit retry + job.opts.backoff = { + type: 'fixed', + delay: retryAfter * 1000 // Convert to milliseconds + }; + + throw error; // Re-throw to trigger retry + } + + throw error; + } + } + + /** + * Process sendEmail job + * @param {Object} job - BullMQ job object + * @returns {Promise} Send result + */ + async processSendEmail(job) { + const { to, from, subject, templateId, templateData, options } = job.data.data; + + const result = await this.emailProvider.sendEmail({ + to, + from, + subject, + templateId, + templateData, + options + }); + + return { + success: true, + messageId: result.messageId, + provider: result.provider, + processedAt: new Date().toISOString() + }; + } + + /** + * Process sendSimpleEmail job + * @param {Object} job - BullMQ job object + * @returns {Promise} Send result + */ + async processSendSimpleEmail(job) { + const { to, from, subject, text, html, options } = job.data.data; + + const result = await this.emailProvider.sendSimpleEmail({ + to, + from, + subject, + text, + html, + options + }); + + return { + success: true, + messageId: result.messageId, + provider: result.provider, + processedAt: new Date().toISOString() + }; + } + + /** + * Process sendBulkEmail job + * @param {Object} job - BullMQ job object + * @returns {Promise} Bulk send result + */ + async processSendBulkEmail(job) { + const { recipients, from, subject, templateId, templateData, options } = job.data.data; + + const results = []; + const errors = []; + + for (const recipient of recipients) { + try { + const result = await this.emailProvider.sendEmail({ + to: recipient.email, + from, + subject, + templateId, + templateData: { + ...templateData, + ...recipient.templateData + }, + options + }); + + results.push({ + email: recipient.email, + success: true, + messageId: result.messageId + }); + + } catch (error) { + errors.push({ + email: recipient.email, + error: error.message, + success: false + }); + } + } + + return { + success: true, + results, + errors, + totalRecipients: recipients.length, + successfulSends: results.length, + failedSends: errors.length, + processedAt: new Date().toISOString() + }; + } + + /** + * Add email job to queue + * @param {Object} emailData - Email data + * @param {Object} options - Job options + * @returns {Promise} Job result + */ + async addEmailJob(emailData, options = {}) { + try { + const jobData = { + type: 'sendEmail', + data: emailData + }; + + const jobOptions = { + ...this.defaultJobOptions, + ...options, + // Custom retry logic for rate limits + backoff: { + type: 'exponential', + delay: options.initialDelay || 2000 + } + }; + + const job = await this.queue.add('send-email', jobData, jobOptions); + + return { + success: true, + jobId: job.id, + queue: this.queueName, + addedAt: new Date().toISOString() + }; + + } catch (error) { + console.error('Failed to add email job:', error); + throw error; + } + } + + /** + * Add simple email job to queue + * @param {Object} emailData - Email data + * @param {Object} options - Job options + * @returns {Promise} Job result + */ + async addSimpleEmailJob(emailData, options = {}) { + try { + const jobData = { + type: 'sendSimpleEmail', + data: emailData + }; + + const jobOptions = { + ...this.defaultJobOptions, + ...options + }; + + const job = await this.queue.add('send-simple-email', jobData, jobOptions); + + return { + success: true, + jobId: job.id, + queue: this.queueName, + addedAt: new Date().toISOString() + }; + + } catch (error) { + console.error('Failed to add simple email job:', error); + throw error; + } + } + + /** + * Add bulk email job to queue + * @param {Object} bulkData - Bulk email data + * @param {Object} options - Job options + * @returns {Promise} Job result + */ + async addBulkEmailJob(bulkData, options = {}) { + try { + const jobData = { + type: 'sendBulkEmail', + data: bulkData + }; + + const jobOptions = { + ...this.defaultJobOptions, + ...options, + // Bulk jobs may need more time and retries + attempts: options.attempts || 5, + backoff: { + type: 'exponential', + delay: options.initialDelay || 5000 + } + }; + + const job = await this.queue.add('send-bulk-email', jobData, jobOptions); + + return { + success: true, + jobId: job.id, + queue: this.queueName, + addedAt: new Date().toISOString() + }; + + } catch (error) { + console.error('Failed to add bulk email job:', error); + throw error; + } + } + + /** + * Get job status + * @param {string} jobId - Job ID + * @returns {Promise} Job status + */ + async getJobStatus(jobId) { + try { + const job = await this.queue.getJob(jobId); + + if (!job) { + return { + success: false, + error: 'Job not found' + }; + } + + const state = await job.getState(); + const progress = job.progress; + + return { + success: true, + jobId: job.id, + state, + progress, + data: job.data, + opts: job.opts, + createdAt: new Date(job.timestamp).toISOString(), + processedOn: job.processedOn ? new Date(job.processedOn).toISOString() : null, + finishedOn: job.finishedOn ? new Date(job.finishedOn).toISOString() : null, + failedReason: job.failedReason, + returnvalue: job.returnvalue + }; + + } catch (error) { + console.error('Failed to get job status:', error); + throw error; + } + } + + /** + * Get queue statistics + * @returns {Promise} Queue statistics + */ + async getQueueStats() { + try { + const [waiting, active, completed, failed, delayed] = await Promise.all([ + this.queue.getWaiting(), + this.queue.getActive(), + this.queue.getCompleted(), + this.queue.getFailed(), + this.queue.getDelayed() + ]); + + return { + success: true, + queueName: this.queueName, + stats: { + waiting: waiting.length, + active: active.length, + completed: completed.length, + failed: failed.length, + delayed: delayed.length, + total: waiting.length + active.length + completed.length + failed.length + delayed.length + }, + timestamp: new Date().toISOString() + }; + + } catch (error) { + console.error('Failed to get queue stats:', error); + throw error; + } + } + + /** + * Get recent jobs + * @param {Object} options - Query options + * @returns {Promise} Recent jobs + */ + async getRecentJobs(options = {}) { + try { + const { state = 'completed', start = 0, end = 50 } = options; + + const jobs = await this.queue.getJobs([state], start, end); + + return { + success: true, + state, + jobs: jobs.map(job => ({ + id: job.id, + state: await job.getState(), + progress: job.progress, + data: job.data, + createdAt: new Date(job.timestamp).toISOString(), + processedOn: job.processedOn ? new Date(job.processedOn).toISOString() : null, + finishedOn: job.finishedOn ? new Date(job.finishedOn).toISOString() : null, + failedReason: job.failedReason + })), + count: jobs.length, + timestamp: new Date().toISOString() + }; + + } catch (error) { + console.error('Failed to get recent jobs:', error); + throw error; + } + } + + /** + * Pause queue processing + * @returns {Promise} Pause result + */ + async pauseQueue() { + try { + await this.queue.pause(); + + return { + success: true, + queueName: this.queueName, + pausedAt: new Date().toISOString() + }; + + } catch (error) { + console.error('Failed to pause queue:', error); + throw error; + } + } + + /** + * Resume queue processing + * @returns {Promise} Resume result + */ + async resumeQueue() { + try { + await this.queue.resume(); + + return { + success: true, + queueName: this.queueName, + resumedAt: new Date().toISOString() + }; + + } catch (error) { + console.error('Failed to resume queue:', error); + throw error; + } + } + + /** + * Clear queue + * @param {Object} options - Clear options + * @returns {Promise} Clear result + */ + async clearQueue(options = {}) { + try { + const { state = 'waiting' } = options; + + await this.queue.clean(0, 0, state); + + return { + success: true, + queueName: this.queueName, + clearedState: state, + clearedAt: new Date().toISOString() + }; + + } catch (error) { + console.error('Failed to clear queue:', error); + throw error; + } + } + + /** + * Close queue and worker + */ + async close() { + try { + if (this.worker) { + await this.worker.close(); + this.worker = null; + } + + if (this.queue) { + await this.queue.close(); + this.queue = null; + } + + if (this.redis) { + await this.redis.quit(); + this.redis = null; + } + + console.log('Email queue service closed'); + + } catch (error) { + console.error('Failed to close email queue service:', error); + throw error; + } + } + + /** + * Get service health status + * @returns {Object} Health status + */ + getHealthStatus() { + return { + queueName: this.queueName, + provider: this.emailProvider ? this.emailProvider.name : 'none', + redisConnected: this.redis ? this.redis.status === 'ready' : false, + workerActive: this.worker ? true : false, + config: { + concurrency: this.config.concurrency || 5, + rateLimitMax: this.config.rateLimitMax || 100, + rateLimitDuration: this.config.rateLimitDuration || 60000 + }, + timestamp: new Date().toISOString() + }; + } +} + +module.exports = EmailQueueService; diff --git a/services/notificationService.js b/services/notificationService.js new file mode 100644 index 0000000..2ca2a06 --- /dev/null +++ b/services/notificationService.js @@ -0,0 +1,518 @@ +const EmailQueueService = require('./emailQueue'); +const SESProvider = require('./emailProviders/SESProvider'); +const SendGridProvider = require('./emailProviders/SendGridProvider'); + +/** + * Unified Notification Service + * Manages email notifications with provider abstraction and queueing + */ +class NotificationService { + constructor(config = {}) { + this.config = config; + this.defaultProvider = config.defaultProvider || process.env.DEFAULT_EMAIL_PROVIDER || 'ses'; + this.providers = new Map(); + this.queueService = null; + this.templateMappings = new Map(); + + this.initialize(); + } + + /** + * Initialize notification service + */ + initialize() { + try { + // Initialize email queue + this.queueService = new EmailQueueService({ + queueName: config.queueName || 'notification-queue', + redis: config.redis, + concurrency: config.concurrency || 5, + rateLimitMax: config.rateLimitMax || 100, + rateLimitDuration: config.rateLimitDuration || 60000 + }); + + // Initialize providers + this.initializeProviders(); + + // Set default provider for queue + const defaultProvider = this.providers.get(this.defaultProvider); + if (defaultProvider) { + this.queueService.setEmailProvider(defaultProvider); + } + + console.log(`Notification service initialized with provider: ${this.defaultProvider}`); + + } catch (error) { + console.error('Failed to initialize notification service:', error); + throw error; + } + } + + /** + * Initialize email providers + */ + initializeProviders() { + try { + // Initialize AWS SES provider + if (this.config.ses || process.env.AWS_ACCESS_KEY_ID) { + const sesProvider = new SESProvider(this.config.ses || {}); + this.providers.set('ses', sesProvider); + console.log('AWS SES provider initialized'); + } + + // Initialize SendGrid provider + if (this.config.sendgrid || process.env.SENDGRID_API_KEY) { + const sendGridProvider = new SendGridProvider(this.config.sendgrid || {}); + this.providers.set('sendgrid', sendGridProvider); + console.log('SendGrid provider initialized'); + } + + if (this.providers.size === 0) { + throw new Error('No email providers configured'); + } + + } catch (error) { + console.error('Failed to initialize email providers:', error); + throw error; + } + } + + /** + * Send email using template + * @param {Object} emailData - Email data + * @param {Object} options - Send options + * @returns {Promise} Send result + */ + async sendEmail(emailData, options = {}) { + try { + const provider = options.provider || this.defaultProvider; + const emailProvider = this.providers.get(provider); + + if (!emailProvider) { + throw new Error(`Email provider not found: ${provider}`); + } + + // Process template variables + const processedData = this.processTemplateVariables(emailData); + + // Add to queue for async processing + const jobResult = await this.queueService.addEmailJob(processedData, { + provider, + attempts: options.attempts || 3, + delay: options.delay || 0, + priority: options.priority || 0 + }); + + return { + success: true, + jobId: jobResult.jobId, + queue: jobResult.queue, + provider, + addedAt: jobResult.addedAt, + message: 'Email queued for processing' + }; + + } catch (error) { + console.error('Failed to send email:', error); + throw error; + } + } + + /** + * Send simple text/HTML email + * @param {Object} emailData - Email data + * @param {Object} options - Send options + * @returns {Promise} Send result + */ + async sendSimpleEmail(emailData, options = {}) { + try { + const provider = options.provider || this.defaultProvider; + const emailProvider = this.providers.get(provider); + + if (!emailProvider) { + throw new Error(`Email provider not found: ${provider}`); + } + + // Add to queue for async processing + const jobResult = await this.queueService.addSimpleEmailJob(emailData, { + provider, + attempts: options.attempts || 3, + delay: options.delay || 0, + priority: options.priority || 0 + }); + + return { + success: true, + jobId: jobResult.jobId, + queue: jobResult.queue, + provider, + addedAt: jobResult.addedAt, + message: 'Simple email queued for processing' + }; + + } catch (error) { + console.error('Failed to send simple email:', error); + throw error; + } + } + + /** + * Send bulk email to multiple recipients + * @param {Object} bulkData - Bulk email data + * @param {Object} options - Send options + * @returns {Promise} Send result + */ + async sendBulkEmail(bulkData, options = {}) { + try { + const provider = options.provider || this.defaultProvider; + const emailProvider = this.providers.get(provider); + + if (!emailProvider) { + throw new Error(`Email provider not found: ${provider}`); + } + + // Process recipients with template variables + const processedRecipients = bulkData.recipients.map(recipient => ({ + email: recipient.email, + templateData: this.processTemplateVariables({ + templateData: recipient.templateData || {} + }).templateData + })); + + // Add to queue for async processing + const jobResult = await this.queueService.addBulkEmailJob({ + recipients: processedRecipients, + from: bulkData.from, + subject: bulkData.subject, + templateId: bulkData.templateId, + templateData: this.processTemplateVariables(bulkData).templateData, + options: bulkData.options + }, { + provider, + attempts: options.attempts || 5, + delay: options.delay || 0, + priority: options.priority || 0 + }); + + return { + success: true, + jobId: jobResult.jobId, + queue: jobResult.queue, + provider, + totalRecipients: bulkData.recipients.length, + addedAt: jobResult.addedAt, + message: 'Bulk email queued for processing' + }; + + } catch (error) { + console.error('Failed to send bulk email:', error); + throw error; + } + } + + /** + * Process template variables and apply mappings + * @param {Object} emailData - Email data with template variables + * @returns {Object} Processed email data + */ + processTemplateVariables(emailData) { + const processed = { ...emailData }; + + if (emailData.templateData) { + processed.templateData = this.applyTemplateMappings(emailData.templateData); + } + + // Apply default template mappings + if (processed.templateId && this.templateMappings.has(processed.templateId)) { + const mapping = this.templateMappings.get(processed.templateId); + processed.templateData = { + ...mapping.defaultVariables, + ...processed.templateData + }; + } + + return processed; + } + + /** + * Apply template variable mappings + * @param {Object} variables - Template variables + * @returns {Object} Mapped variables + */ + applyTemplateMappings(variables) { + const mapped = { ...variables }; + + // Apply global mappings + if (this.config.globalTemplateMappings) { + Object.keys(this.config.globalTemplateMappings).forEach(key => { + if (mapped[key] === undefined) { + mapped[key] = this.config.globalTemplateMappings[key]; + } + }); + } + + // Apply function-based mappings + Object.keys(mapped).forEach(key => { + const value = mapped[key]; + if (typeof value === 'function') { + mapped[key] = value(); + } else if (typeof value === 'object' && value !== null) { + mapped[key] = this.applyTemplateMappings(value); + } + }); + + return mapped; + } + + /** + * Add template mapping + * @param {string} templateId - Template ID + * @param {Object} mapping - Template mapping + */ + addTemplateMapping(templateId, mapping) { + this.templateMappings.set(templateId, mapping); + } + + /** + * Remove template mapping + * @param {string} templateId - Template ID + */ + removeTemplateMapping(templateId) { + this.templateMappings.delete(templateId); + } + + /** + * Get template mapping + * @param {string} templateId - Template ID + * @returns {Object} Template mapping + */ + getTemplateMapping(templateId) { + return this.templateMappings.get(templateId); + } + + /** + * Switch email provider + * @param {string} provider - Provider name + * @returns {Object} Switch result + */ + switchProvider(provider) { + try { + const emailProvider = this.providers.get(provider); + + if (!emailProvider) { + throw new Error(`Email provider not found: ${provider}`); + } + + this.defaultProvider = provider; + this.queueService.setEmailProvider(emailProvider); + + return { + success: true, + provider, + switchedAt: new Date().toISOString() + }; + + } catch (error) { + console.error('Failed to switch provider:', error); + throw error; + } + } + + /** + * Get job status + * @param {string} jobId - Job ID + * @returns {Promise} Job status + */ + async getJobStatus(jobId) { + return await this.queueService.getJobStatus(jobId); + } + + /** + * Get queue statistics + * @returns {Promise} Queue statistics + */ + async getQueueStats() { + return await this.queueService.getQueueStats(); + } + + /** + * Get recent jobs + * @param {Object} options - Query options + * @returns {Promise} Recent jobs + */ + async getRecentJobs(options = {}) { + return await this.queueService.getRecentJobs(options); + } + + /** + * Get provider statistics + * @param {string} provider - Provider name (optional) + * @returns {Object} Provider statistics + */ + getProviderStats(provider = null) { + if (provider) { + const emailProvider = this.providers.get(provider); + if (!emailProvider) { + throw new Error(`Email provider not found: ${provider}`); + } + return emailProvider.getStats(); + } + + // Return stats for all providers + const stats = {}; + this.providers.forEach((emailProvider, name) => { + stats[name] = emailProvider.getStats(); + }); + + return stats; + } + + /** + * Test provider connection + * @param {string} provider - Provider name (optional) + * @returns {Promise} Test results + */ + async testProviderConnection(provider = null) { + const providersToTest = provider ? [provider] : Array.from(this.providers.keys()); + const results = {}; + + for (const providerName of providersToTest) { + const emailProvider = this.providers.get(providerName); + if (emailProvider) { + try { + results[providerName] = await emailProvider.testConnection(); + } catch (error) { + results[providerName] = { + success: false, + error: error.message, + timestamp: new Date().toISOString() + }; + } + } + } + + return results; + } + + /** + * Get available providers + * @returns {Array} Available provider names + */ + getAvailableProviders() { + return Array.from(this.providers.keys()); + } + + /** + * Get service health status + * @returns {Object} Health status + */ + getHealthStatus() { + const queueHealth = this.queueService.getHealthStatus(); + const providerStats = this.getProviderStats(); + + return { + status: queueHealth.provider !== 'none' ? 'healthy' : 'degraded', + defaultProvider: this.defaultProvider, + availableProviders: this.getAvailableProviders(), + queue: queueHealth, + providers: providerStats, + templateMappings: this.templateMappings.size, + timestamp: new Date().toISOString() + }; + } + + /** + * Create predefined email templates + */ + createPredefinedTemplates() { + // Welcome email template + this.addTemplateMapping('welcome', { + defaultVariables: { + appName: 'SubStream Protocol', + supportEmail: 'support@substream-protocol.com', + currentYear: new Date().getFullYear() + } + }); + + // Payment failure template + this.addTemplateMapping('payment_failure', { + defaultVariables: { + appName: 'SubStream Protocol', + supportEmail: 'support@substream-protocol.com', + billingUrl: 'https://app.substream-protocol.com/billing' + } + }); + + // Low balance warning template + this.addTemplateMapping('low_balance_warning', { + defaultVariables: { + appName: 'SubStream Protocol', + supportEmail: 'support@substream-protocol.com', + addFundsUrl: 'https://app.substream-protocol.com/wallet/add-funds' + } + }); + + // Subscription expired template + this.addTemplateMapping('subscription_expired', { + defaultVariables: { + appName: 'SubStream Protocol', + supportEmail: 'support@substream-protocol.com', + renewUrl: 'https://app.substream-protocol.com/subscriptions' + } + }); + + // Pre-billing health check template + this.addTemplateMapping('pre_billing_warning', { + defaultVariables: { + appName: 'SubStream Protocol', + supportEmail: 'support@substream-protocol.com', + warningDays: 3 + } + }); + } + + /** + * Send predefined template email + * @param {string} templateType - Template type + * @param {Object} emailData - Email data + * @param {Object} options - Send options + * @returns {Promise} Send result + */ + async sendTemplateEmail(templateType, emailData, options = {}) { + const templateMapping = this.getTemplateMapping(templateType); + + if (!templateMapping) { + throw new Error(`Template mapping not found: ${templateType}`); + } + + // Merge template variables + const mergedData = { + ...emailData, + templateData: { + ...templateMapping.defaultVariables, + ...emailData.templateData + } + }; + + return await this.sendEmail(mergedData, options); + } + + /** + * Close notification service + */ + async close() { + try { + if (this.queueService) { + await this.queueService.close(); + } + + console.log('Notification service closed'); + + } catch (error) { + console.error('Failed to close notification service:', error); + throw error; + } + } +} + +module.exports = NotificationService;