Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import { IdempotencyModule } from './common/modules/idempotency.module';
import { IdempotencyInterceptor } from './common/interceptors/idempotency.interceptor';
import { DeepLinkModule } from './deep-link/deep-link.module';
import { InvoicesModule } from './payments/invoices/invoices.module';
import { PaymentMethodsModule } from './payments/payment-methods/payment-methods.module';
import { ReportingModule } from './payments/reporting/reporting.module';
import { NotificationsModule } from './notifications/notifications.module';
import { HealthModule } from './health/health.module';
import { AuditLogModule } from './audit-log/audit-log.module';

Expand Down Expand Up @@ -63,6 +65,8 @@ const featureFlags = loadFeatureFlags();
IdempotencyModule,
DeepLinkModule,
InvoicesModule,
PaymentMethodsModule,
NotificationsModule,
ReportingModule,
HealthModule,
AuditLogModule,
Expand Down
13 changes: 13 additions & 0 deletions src/notifications/notifications.module.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { ScheduleModule } from '@nestjs/schedule';
import { Notification } from './entities/notification.entity';
import { NotificationsQueueService } from './notifications.queue';
import { NotificationsService } from './notifications.service';

@Module({
imports: [TypeOrmModule.forFeature([Notification]), ScheduleModule],
providers: [NotificationsQueueService, NotificationsService],
exports: [NotificationsService],
})
export class NotificationsModule {}
import { Module, OnModuleInit } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { Notification } from './entities/notification.entity';
Expand Down
20 changes: 12 additions & 8 deletions src/notifications/notifications.queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ export class NotificationsQueueService {
/**
* Publish notification to SNS topic
*/
async publishToTopic(notification: Notification, options?: { bypassBatch?: boolean }): Promise<void> {
async publishToTopic(notification: Notification): Promise<void> {
if (!this.snsTopicArn || !this.queueUrl) {
this.logger.warn(
Expand All @@ -46,19 +47,22 @@ export class NotificationsQueueService {
return;
}
try {
const payload = {
id: notification.id,
userId: notification.userId,
title: notification.title,
content: notification.content,
type: notification.type,
metadata: notification.metadata,
bypassBatch: options?.bypassBatch ?? false,
};
const command = new PublishCommand({
TopicArn: this.snsTopicArn,
Message: JSON.stringify({
id: notification.id,
userId: notification.userId,
title: notification.title,
content: notification.content,
type: notification.type,
metadata: notification.metadata,
}),
Message: JSON.stringify(payload),
MessageAttributes: {
type: { DataType: 'String', StringValue: notification.type },
priority: { DataType: 'String', StringValue: notification.priority },
batch: { DataType: 'String', StringValue: String(payload.bypassBatch ? 'false' : Boolean(notification.metadata?.batched)) },
},
});

Expand Down
78 changes: 78 additions & 0 deletions src/notifications/notifications.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,82 @@
import { Test, TestingModule } from '@nestjs/testing';
import { ConfigService } from '@nestjs/config';
import { getRepositoryToken } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { NotificationsService } from './notifications.service';
import { NotificationsQueueService } from './notifications.queue';
import { Notification, NotificationPriority, NotificationStatus, NotificationType } from './entities/notification.entity';

const mockRepository = {
findOne: jest.fn(),
create: jest.fn((dto) => dto),
save: jest.fn(),
find: jest.fn(),
update: jest.fn(),
};

const mockQueue = {
publishToTopic: jest.fn(),
};

const mockConfig = {
get: jest.fn((key: string, defaultValue?: string) => {
if (key === 'NOTIFICATION_BATCH_WINDOW_MS') {
return defaultValue ?? `${5 * 60 * 1000}`;
}
return defaultValue ?? null;
}),
};

describe('NotificationsService', () => {
let service: NotificationsService;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [
NotificationsService,
{ provide: ConfigService, useValue: mockConfig },
{ provide: getRepositoryToken(Notification), useValue: mockRepository },
{ provide: NotificationsQueueService, useValue: mockQueue },
],
}).compile();

service = module.get<NotificationsService>(NotificationsService);
});

afterEach(() => jest.clearAllMocks());

it('should deduplicate identical pending notifications within the batch window', async () => {
const existing = { id: 'n1', userId: 'user1', title: 'New course', content: 'New content', type: NotificationType.EMAIL, status: NotificationStatus.PENDING, createdAt: new Date() };
mockRepository.findOne.mockResolvedValue(existing);

const result = await service.send({
userId: 'user1',
title: 'New course',
content: 'New content',
type: NotificationType.EMAIL,
priority: NotificationPriority.MEDIUM,
});

expect(result).toBe(existing);
expect(mockRepository.save).not.toHaveBeenCalled();
});

it('should publish urgent notifications immediately', async () => {
mockRepository.findOne.mockResolvedValue(null);
const saved = { id: 'n2', userId: 'user1', title: 'Urgent', content: 'Please respond', type: NotificationType.SMS, priority: NotificationPriority.URGENT, status: NotificationStatus.SENT, deliveryAttempts: 0, createdAt: new Date() };
mockRepository.save.mockResolvedValue(saved);

const result = await service.send({
userId: 'user1',
title: 'Urgent',
content: 'Please respond',
type: NotificationType.SMS,
priority: NotificationPriority.URGENT,
});

expect(mockQueue.publishToTopic).toHaveBeenCalledWith(saved, { bypassBatch: true });
expect(mockRepository.update).toHaveBeenCalledWith(saved.id, expect.any(Object));
expect(result).toEqual(saved);
import { getRepositoryToken } from '@nestjs/typeorm';
import { Notification } from './entities/notification.entity';
import { NotificationsService } from './notifications.service';
Expand Down
154 changes: 154 additions & 0 deletions src/notifications/notifications.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,33 @@
import { Injectable, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { InjectRepository } from '@nestjs/typeorm';
import { Cron, CronExpression } from '@nestjs/schedule';
import { In, Repository } from 'typeorm';
import {
Notification,
NotificationPriority,
NotificationStatus,
NotificationType,
} from './entities/notification.entity';
import { NotificationsQueueService } from './notifications.queue';

interface QueueNotificationPayload {
userId: string;
title: string;
content: string;
type: NotificationType;
priority?: NotificationPriority;
metadata?: Record<string, any>;
}

const DEFAULT_BATCH_WINDOW_MS = 5 * 60 * 1000;

const BATCH_CONFIG: Record<NotificationType, { intervalMs: number; batchLabel: string }> = {
[NotificationType.EMAIL]: { intervalMs: DEFAULT_BATCH_WINDOW_MS, batchLabel: 'Email Digest' },
[NotificationType.PUSH]: { intervalMs: 2 * 60 * 1000, batchLabel: 'Push Summary' },
[NotificationType.IN_APP]: { intervalMs: DEFAULT_BATCH_WINDOW_MS, batchLabel: 'In-App Summary' },
[NotificationType.SMS]: { intervalMs: DEFAULT_BATCH_WINDOW_MS, batchLabel: 'SMS Digest' },
};
import { InjectRepository } from '@nestjs/typeorm';
import { Repository, In } from 'typeorm';
import {
Expand All @@ -15,6 +44,131 @@ import { SendTemplatedNotificationDto } from './dto/preferences.dto';
@Injectable()
export class NotificationsService {
private readonly logger = new Logger(NotificationsService.name);
private readonly batchWindowMs: number;

constructor(
private readonly configService: ConfigService,
@InjectRepository(Notification)
private readonly notificationRepository: Repository<Notification>,
private readonly notificationsQueue: NotificationsQueueService,
) {
const batchWindowSetting = this.configService.get<string | number>('NOTIFICATION_BATCH_WINDOW_MS', `${DEFAULT_BATCH_WINDOW_MS}`);
this.batchWindowMs = Number(batchWindowSetting) || DEFAULT_BATCH_WINDOW_MS;
}

async send(notification: QueueNotificationPayload): Promise<Notification> {
const priority = notification.priority ?? NotificationPriority.MEDIUM;
const isUrgent = priority === NotificationPriority.URGENT;

const duplicate = await this.findDuplicate(notification);
if (duplicate) {
this.logger.log(`Deduplicated notification for user ${notification.userId}`);
return duplicate;
}

const record = this.notificationRepository.create({
...notification,
priority,
status: isUrgent ? NotificationStatus.SENT : NotificationStatus.PENDING,
deliveryAttempts: 0,
});

const saved = await this.notificationRepository.save(record);

if (isUrgent) {
await this.publish(saved, true);
}

return saved;
}

@Cron(CronExpression.EVERY_5_MINUTES)
async flushBatches(): Promise<void> {
const pendingNotifications = await this.notificationRepository.find({
where: { status: NotificationStatus.PENDING },
order: { createdAt: 'ASC' },
});

if (pendingNotifications.length === 0) {
return;
}

const now = Date.now();
const groups = new Map<string, Notification[]>();

pendingNotifications.forEach((notification) => {
const key = `${notification.userId}:${notification.type}`;
const group = groups.get(key) ?? [];
group.push(notification);
groups.set(key, group);
});

for (const [key, notifications] of groups) {
const type = notifications[0].type;
const { intervalMs } = BATCH_CONFIG[type];
const oldest = notifications[0];

if (now - oldest.createdAt.getTime() < intervalMs) {
continue;
}

await this.publishBatch(notifications);
}
}

private async publishBatch(notifications: Notification[]): Promise<void> {
const first = notifications[0];
const batchTitle = `${BATCH_CONFIG[first.type].batchLabel}`;
const body = notifications
.map((notification, index) => `${index + 1}. ${notification.title}: ${notification.content}`)
.join('\n');

const batchNotification = this.notificationRepository.create({
userId: first.userId,
title: batchTitle,
content: body,
type: first.type,
priority: NotificationPriority.MEDIUM,
status: NotificationStatus.SENT,
metadata: { batched: true, count: notifications.length },
deliveryAttempts: 0,
});

await this.notificationsQueue.publishToTopic(batchNotification);
const ids = notifications.map((notification) => notification.id);
await this.notificationRepository.update({ id: In(ids) }, { status: NotificationStatus.SENT, lastAttemptAt: new Date() });
await this.notificationRepository.save(batchNotification);

this.logger.log(`Flushed ${notifications.length} notifications into a batch for user ${first.userId}`);
}

private async publish(notification: Notification, bypassBatch = false): Promise<void> {
await this.notificationsQueue.publishToTopic(notification, { bypassBatch });
await this.notificationRepository.update(notification.id, {
status: NotificationStatus.SENT,
lastAttemptAt: new Date(),
deliveryAttempts: notification.deliveryAttempts + 1,
});
}

private async findDuplicate(payload: QueueNotificationPayload): Promise<Notification | null> {
const existing = await this.notificationRepository.findOne({
where: {
userId: payload.userId,
title: payload.title,
content: payload.content,
type: payload.type,
status: NotificationStatus.PENDING,
},
order: { createdAt: 'DESC' },
});

if (!existing) {
return null;
}

const age = Date.now() - existing.createdAt.getTime();
return age <= this.batchWindowMs ? existing : null;

constructor(
@InjectRepository(Notification)
Expand Down
65 changes: 65 additions & 0 deletions src/payments/entities/payment-method.entity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import {
Entity,
PrimaryGeneratedColumn,
Column,
CreateDateColumn,
UpdateDateColumn,
DeleteDateColumn,
ManyToOne,
JoinColumn,
Index,
VersionColumn,
} from 'typeorm';
import { User } from '../../users/entities/user.entity';
import { PaymentMethod as PaymentMethodType } from './payment.entity';

@Entity('payment_methods')
@Index(['userId', 'isDefault'])
export class PaymentMethod {
@PrimaryGeneratedColumn('uuid')
id: string;

@VersionColumn()
version: number;

@Column({ type: 'enum', enum: PaymentMethodType })
method: PaymentMethodType;

@Column({ type: 'varchar', nullable: true })
provider?: string;

@Column({ type: 'varchar', length: 64, nullable: true })
displayName?: string;

@Column({ type: 'varchar', length: 4, nullable: true })
last4?: string;

@Column({ type: 'int', nullable: true })
expiryMonth?: number;

@Column({ type: 'int', nullable: true })
expiryYear?: number;

@Column({ type: 'boolean', default: false })
isDefault: boolean;

@Column({ type: 'jsonb', nullable: true })
metadata?: Record<string, any>;

@Column()
@Index()
userId: string;

@ManyToOne(() => User, { onDelete: 'CASCADE' })
@JoinColumn({ name: 'user_id' })
user: User;

@CreateDateColumn()
createdAt: Date;

@UpdateDateColumn()
updatedAt: Date;

@DeleteDateColumn()
deletedAt?: Date;
}
Loading
Loading