diff --git a/poc-sync-engine/src/SyncManager.ts b/poc-sync-engine/src/SyncManager.ts new file mode 100644 index 0000000..d4b0825 --- /dev/null +++ b/poc-sync-engine/src/SyncManager.ts @@ -0,0 +1,69 @@ +import { SyncQueue, SyncJob } from './SyncQueue'; + +export interface SyncHandler { + handle(job: SyncJob): Promise; +} + +export class SyncManager { + private queue: SyncQueue; + private handler: SyncHandler; + private isOnline: boolean = false; + private isProcessing: boolean = false; + + constructor(queue: SyncQueue, handler: SyncHandler) { + this.queue = queue; + this.handler = handler; + } + + public updateNetworkStatus(online: boolean) { + console.log(`[Network] Status changed to ${online ? 'ONLINE' : 'OFFLINE'}`); + this.isOnline = online; + if (this.isOnline) { + this.drainQueue(); + } + } + + public getQueueLength(): number { + return this.queue.getLength(); + } + + public async drainQueue(): Promise { + if (this.isProcessing || !this.isOnline || this.queue.getLength() === 0) { + return; + } + + this.isProcessing = true; + + while (this.queue.getLength() > 0 && this.isOnline) { + const job = this.queue.peek(); + if (!job) break; + + try { + console.log(`[SyncManager] Attempting to sync job ${job.id} (${job.method} ${job.url})`); + + await this.handler.handle(job); + + console.log(`[SyncManager] Successfully synced job ${job.id}`); + await this.queue.dequeue(); + } catch (error) { + console.error(`[SyncManager] Sync failed for job ${job.id}`); + + if (!this.isOnline) { + break; + } + + if (job.retryCount >= 5) { + console.error(`[SyncManager] Max retries reached for job ${job.id}. Dropping or moving to DLQ.`); + await this.queue.dequeue(); + } else { + await this.queue.requeue(job); + const backoffTime = Math.pow(2, job.retryCount) * 1000; + console.log(`[SyncManager] Backing off for ${backoffTime}ms before next attempt`); + await new Promise(resolve => setTimeout(resolve, backoffTime)); + } + } + } + + this.isProcessing = false; + } +} diff --git a/poc-sync-engine/src/SyncQueue.ts b/poc-sync-engine/src/SyncQueue.ts new file mode 100644 index 0000000..0c059b8 --- /dev/null +++ b/poc-sync-engine/src/SyncQueue.ts @@ -0,0 +1,87 @@ +export interface SyncJob { + id: string; + url: string; + method: 'POST' | 'PUT' | 'PATCH' | 'DELETE'; + payload: T; + timestamp: number; + retryCount: number; +} + +export interface IStorageProvider { + getItem(key: string): string | null | Promise; + setItem(key: string, value: string): void | Promise; +} + +export class SyncQueue { + private queue: SyncJob[] = []; + private storage: IStorageProvider; + private readonly storageKey: string; + private writePromise: Promise = Promise.resolve(); + + constructor(storage: IStorageProvider, storageKey: string = 'sync_queue') { + this.storage = storage; + this.storageKey = storageKey; + } + + public async initialize(): Promise { + await this.loadFromStorage(); + } + + public async enqueue(job: Omit, 'id' | 'timestamp' | 'retryCount'>): Promise { + const newJob: SyncJob = { + ...job, + id: Math.random().toString(36).substring(2, 9), + timestamp: Date.now(), + retryCount: 0, + }; + this.queue.push(newJob); + await this.persistToStorage(); + } + + public peek(): SyncJob | undefined { + return this.queue[0]; + } + + public async dequeue(): Promise | undefined> { + const job = this.queue.shift(); + await this.persistToStorage(); + return job; + } + + public async requeue(job: SyncJob): Promise { + job.retryCount += 1; + await this.persistToStorage(); + } + + public getLength(): number { + return this.queue.length; + } + + private async persistToStorage(): Promise { + this.writePromise = this.writePromise.then(async () => { + try { + const data = JSON.stringify(this.queue); + await this.storage.setItem(this.storageKey, data); + } catch (error) { + console.error('[SyncQueue] Failed to persist queue:', error); + } + }); + return this.writePromise; + } + + private async loadFromStorage(): Promise { + try { + const data = await this.storage.getItem(this.storageKey); + if (data) { + const parsed = JSON.parse(data); + if (Array.isArray(parsed)) { + this.queue = parsed; + return; + } + } + } catch (error) { + console.error('[SyncQueue] Failed to load queue or data was corrupted:', error); + } + this.queue = []; + } +} diff --git a/poc-sync-engine/tests/SyncManager.test.ts b/poc-sync-engine/tests/SyncManager.test.ts new file mode 100644 index 0000000..548ae4a --- /dev/null +++ b/poc-sync-engine/tests/SyncManager.test.ts @@ -0,0 +1,167 @@ +import { SyncQueue, IStorageProvider } from '../src/SyncQueue'; +import { SyncManager, SyncHandler } from '../src/SyncManager'; + +class SyncMockStorage implements IStorageProvider { + public store: { [key: string]: string } = {}; + + getItem(key: string): string | null { + return this.store[key] || null; + } + + setItem(key: string, value: string): void { + this.store[key] = value; + } +} + +class AsyncMockStorage implements IStorageProvider { + public store: { [key: string]: string } = {}; + + async getItem(key: string): Promise { + return this.store[key] || null; + } + + async setItem(key: string, value: string): Promise { + this.store[key] = value; + } +} + +describe('Offline-First Sync Engine', () => { + let queue: SyncQueue; + let manager: SyncManager; + let mockHandler: jest.Mocked; + let storage: SyncMockStorage; + + beforeEach(async () => { + storage = new SyncMockStorage(); + queue = new SyncQueue(storage); + await queue.initialize(); + mockHandler = { + handle: jest.fn().mockResolvedValue(undefined), + }; + manager = new SyncManager(queue, mockHandler); + jest.spyOn(console, 'log').mockImplementation(() => {}); + jest.spyOn(console, 'error').mockImplementation(() => {}); + }); + + afterEach(() => { + jest.restoreAllMocks(); + }); + + test('should enqueue mutations successfully', async () => { + await queue.enqueue({ + url: '/api/resource/LMS Assignment Submission', + method: 'POST', + payload: { answer: 'A', assignment_id: '123' }, + }); + + expect(queue.getLength()).toBe(1); + expect(queue.peek()?.url).toBe('/api/resource/LMS Assignment Submission'); + + const stored = JSON.parse(storage.store['sync_queue'] || '[]'); + expect(stored.length).toBe(1); + expect(stored[0].url).toBe('/api/resource/LMS Assignment Submission'); + }); + + test('should not process queue when offline', async () => { + await queue.enqueue({ url: '/api/resource/Course', method: 'POST', payload: {} }); + + manager.updateNetworkStatus(false); + await manager.drainQueue(); + + expect(queue.getLength()).toBe(1); + expect(mockHandler.handle).not.toHaveBeenCalled(); + }); + + test('should drain queue when online', async () => { + mockHandler.handle.mockResolvedValue(undefined); + + await queue.enqueue({ url: '/api/resource/A', method: 'POST', payload: {} }); + await queue.enqueue({ url: '/api/resource/B', method: 'POST', payload: {} }); + + manager.updateNetworkStatus(true); + + await new Promise(process.nextTick); + await new Promise(process.nextTick); + + expect(queue.getLength()).toBe(0); + expect(mockHandler.handle).toHaveBeenCalledTimes(2); + }); + + test('should support and preserve typed payloads', async () => { + interface AssignmentPayload { + answer: string; + assignment_id: string; + } + const typedQueue = new SyncQueue(storage); + await typedQueue.initialize(); + await typedQueue.enqueue({ + url: '/api/resource/LMS Assignment Submission', + method: 'POST', + payload: { answer: 'B', assignment_id: '456' }, + }); + + const job = typedQueue.peek(); + expect(job).toBeDefined(); + if (job) { + const payload: AssignmentPayload = job.payload; + expect(payload.answer).toBe('B'); + expect(payload.assignment_id).toBe('456'); + } + }); + + test('should retry on handler failure', async () => { + jest.useFakeTimers(); + mockHandler.handle + .mockRejectedValueOnce(new Error('Network error')) + .mockResolvedValue(undefined); + + await queue.enqueue({ url: '/api/resource/A', method: 'POST', payload: {} }); + + manager.updateNetworkStatus(true); + + await Promise.resolve(); // trigger initial attempt and failure + await jest.advanceTimersByTimeAsync(2000); // trigger retry attempt + + expect(queue.getLength()).toBe(0); + expect(mockHandler.handle).toHaveBeenCalledTimes(2); + jest.useRealTimers(); + }); + + test('should hydrate queue from storage correctly', async () => { + const freshStorage = new SyncMockStorage(); + freshStorage.setItem('sync_queue', JSON.stringify([ + { id: '1', url: '/api/resource/A', method: 'POST', payload: {}, timestamp: Date.now(), retryCount: 0 } + ])); + + const freshQueue = new SyncQueue(freshStorage); + await freshQueue.initialize(); + expect(freshQueue.getLength()).toBe(1); + expect(freshQueue.peek()?.id).toBe('1'); + }); + + test('should handle corrupted JSON data gracefully on initialize', async () => { + const freshStorage = new SyncMockStorage(); + freshStorage.setItem('sync_queue', '{invalid_json}'); + + const freshQueue = new SyncQueue(freshStorage); + await freshQueue.initialize(); + expect(freshQueue.getLength()).toBe(0); + }); + + test('should prevent write race conditions under concurrent mutations', async () => { + const asyncStorage = new AsyncMockStorage(); + const freshQueue = new SyncQueue(asyncStorage); + await freshQueue.initialize(); + + const p1 = freshQueue.enqueue({ url: '/api/A', method: 'POST', payload: {} }); + const p2 = freshQueue.enqueue({ url: '/api/B', method: 'POST', payload: {} }); + + await Promise.all([p1, p2]); + + expect(freshQueue.getLength()).toBe(2); + const stored = JSON.parse(await asyncStorage.getItem('sync_queue') || '[]'); + expect(stored.length).toBe(2); + expect(stored[0].url).toBe('/api/A'); + expect(stored[1].url).toBe('/api/B'); + }); +});