Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions poc-sync-engine/src/SyncManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { SyncQueue, SyncJob } from './SyncQueue';

export interface SyncHandler<T = unknown> {
handle(job: SyncJob<T>): Promise<void>;
}

export class SyncManager<T = unknown> {
private queue: SyncQueue<T>;
private handler: SyncHandler<T>;
private isOnline: boolean = false;
private isProcessing: boolean = false;

constructor(queue: SyncQueue<T>, handler: SyncHandler<T>) {
this.queue = queue;
this.handler = handler;
}

public updateNetworkStatus(online: boolean) {
console.log(`[Network] Status changed to ${online ? 'ONLINE' : 'OFFLINE'}`);
this.isOnline = online;
if (this.isOnline) {
this.drainQueue();
}
}

public getQueueLength(): number {
return this.queue.getLength();
}

public async drainQueue(): Promise<void> {
if (this.isProcessing || !this.isOnline || this.queue.getLength() === 0) {
return;
}

this.isProcessing = true;

while (this.queue.getLength() > 0 && this.isOnline) {
const job = this.queue.peek();
if (!job) break;

try {
console.log(`[SyncManager] Attempting to sync job ${job.id} (${job.method} ${job.url})`);

await this.handler.handle(job);

console.log(`[SyncManager] Successfully synced job ${job.id}`);
await this.queue.dequeue();
} catch (error) {
console.error(`[SyncManager] Sync failed for job ${job.id}`);

if (!this.isOnline) {
break;
}

if (job.retryCount >= 5) {
console.error(`[SyncManager] Max retries reached for job ${job.id}. Dropping or moving to DLQ.`);
await this.queue.dequeue();
} else {
await this.queue.requeue(job);
const backoffTime = Math.pow(2, job.retryCount) * 1000;
console.log(`[SyncManager] Backing off for ${backoffTime}ms before next attempt`);
await new Promise(resolve => setTimeout(resolve, backoffTime));
}
}
}

this.isProcessing = false;
}
}
87 changes: 87 additions & 0 deletions poc-sync-engine/src/SyncQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
export interface SyncJob<T = unknown> {
id: string;
url: string;
method: 'POST' | 'PUT' | 'PATCH' | 'DELETE';
payload: T;
timestamp: number;
retryCount: number;
}

export interface IStorageProvider {
getItem(key: string): string | null | Promise<string | null>;
setItem(key: string, value: string): void | Promise<void>;
}

export class SyncQueue<T = unknown> {
private queue: SyncJob<T>[] = [];
private storage: IStorageProvider;
private readonly storageKey: string;
private writePromise: Promise<void> = Promise.resolve();

constructor(storage: IStorageProvider, storageKey: string = 'sync_queue') {
this.storage = storage;
this.storageKey = storageKey;
}

public async initialize(): Promise<void> {
await this.loadFromStorage();
}

public async enqueue(job: Omit<SyncJob<T>, 'id' | 'timestamp' | 'retryCount'>): Promise<void> {
const newJob: SyncJob<T> = {
...job,
id: Math.random().toString(36).substring(2, 9),
timestamp: Date.now(),
retryCount: 0,
};
this.queue.push(newJob);
await this.persistToStorage();
}

public peek(): SyncJob<T> | undefined {
return this.queue[0];
}

public async dequeue(): Promise<SyncJob<T> | undefined> {
const job = this.queue.shift();
await this.persistToStorage();
return job;
}

public async requeue(job: SyncJob<T>): Promise<void> {
job.retryCount += 1;
await this.persistToStorage();
}

public getLength(): number {
return this.queue.length;
}

private async persistToStorage(): Promise<void> {
this.writePromise = this.writePromise.then(async () => {
try {
const data = JSON.stringify(this.queue);
await this.storage.setItem(this.storageKey, data);
} catch (error) {
console.error('[SyncQueue] Failed to persist queue:', error);
}
});
return this.writePromise;
}

private async loadFromStorage(): Promise<void> {
try {
const data = await this.storage.getItem(this.storageKey);
if (data) {
const parsed = JSON.parse(data);
if (Array.isArray(parsed)) {
this.queue = parsed;
return;
}
}
} catch (error) {
console.error('[SyncQueue] Failed to load queue or data was corrupted:', error);
}
this.queue = [];
}
}
167 changes: 167 additions & 0 deletions poc-sync-engine/tests/SyncManager.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import { SyncQueue, IStorageProvider } from '../src/SyncQueue';
import { SyncManager, SyncHandler } from '../src/SyncManager';

class SyncMockStorage implements IStorageProvider {
public store: { [key: string]: string } = {};

getItem(key: string): string | null {
return this.store[key] || null;
}

setItem(key: string, value: string): void {
this.store[key] = value;
}
}

class AsyncMockStorage implements IStorageProvider {
public store: { [key: string]: string } = {};

async getItem(key: string): Promise<string | null> {
return this.store[key] || null;
}

async setItem(key: string, value: string): Promise<void> {
this.store[key] = value;
}
}

describe('Offline-First Sync Engine', () => {
let queue: SyncQueue;
let manager: SyncManager;
let mockHandler: jest.Mocked<SyncHandler>;
let storage: SyncMockStorage;

beforeEach(async () => {
storage = new SyncMockStorage();
queue = new SyncQueue(storage);
await queue.initialize();
mockHandler = {
handle: jest.fn().mockResolvedValue(undefined),
};
manager = new SyncManager(queue, mockHandler);
jest.spyOn(console, 'log').mockImplementation(() => {});
jest.spyOn(console, 'error').mockImplementation(() => {});
});

afterEach(() => {
jest.restoreAllMocks();
});

test('should enqueue mutations successfully', async () => {
await queue.enqueue({
url: '/api/resource/LMS Assignment Submission',
method: 'POST',
payload: { answer: 'A', assignment_id: '123' },
});

expect(queue.getLength()).toBe(1);
expect(queue.peek()?.url).toBe('/api/resource/LMS Assignment Submission');

const stored = JSON.parse(storage.store['sync_queue'] || '[]');
expect(stored.length).toBe(1);
expect(stored[0].url).toBe('/api/resource/LMS Assignment Submission');
});

test('should not process queue when offline', async () => {
await queue.enqueue({ url: '/api/resource/Course', method: 'POST', payload: {} });

manager.updateNetworkStatus(false);
await manager.drainQueue();

expect(queue.getLength()).toBe(1);
expect(mockHandler.handle).not.toHaveBeenCalled();
});

test('should drain queue when online', async () => {
mockHandler.handle.mockResolvedValue(undefined);

await queue.enqueue({ url: '/api/resource/A', method: 'POST', payload: {} });
await queue.enqueue({ url: '/api/resource/B', method: 'POST', payload: {} });

manager.updateNetworkStatus(true);

await new Promise(process.nextTick);
await new Promise(process.nextTick);

expect(queue.getLength()).toBe(0);
expect(mockHandler.handle).toHaveBeenCalledTimes(2);
});

test('should support and preserve typed payloads', async () => {
interface AssignmentPayload {
answer: string;
assignment_id: string;
}
const typedQueue = new SyncQueue<AssignmentPayload>(storage);
await typedQueue.initialize();
await typedQueue.enqueue({
url: '/api/resource/LMS Assignment Submission',
method: 'POST',
payload: { answer: 'B', assignment_id: '456' },
});

const job = typedQueue.peek();
expect(job).toBeDefined();
if (job) {
const payload: AssignmentPayload = job.payload;
expect(payload.answer).toBe('B');
expect(payload.assignment_id).toBe('456');
}
});

test('should retry on handler failure', async () => {
jest.useFakeTimers();
mockHandler.handle
.mockRejectedValueOnce(new Error('Network error'))
.mockResolvedValue(undefined);

await queue.enqueue({ url: '/api/resource/A', method: 'POST', payload: {} });

manager.updateNetworkStatus(true);

await Promise.resolve(); // trigger initial attempt and failure
await jest.advanceTimersByTimeAsync(2000); // trigger retry attempt

expect(queue.getLength()).toBe(0);
expect(mockHandler.handle).toHaveBeenCalledTimes(2);
jest.useRealTimers();
});

test('should hydrate queue from storage correctly', async () => {
const freshStorage = new SyncMockStorage();
freshStorage.setItem('sync_queue', JSON.stringify([
{ id: '1', url: '/api/resource/A', method: 'POST', payload: {}, timestamp: Date.now(), retryCount: 0 }
]));

const freshQueue = new SyncQueue(freshStorage);
await freshQueue.initialize();
expect(freshQueue.getLength()).toBe(1);
expect(freshQueue.peek()?.id).toBe('1');
});

test('should handle corrupted JSON data gracefully on initialize', async () => {
const freshStorage = new SyncMockStorage();
freshStorage.setItem('sync_queue', '{invalid_json}');

const freshQueue = new SyncQueue(freshStorage);
await freshQueue.initialize();
expect(freshQueue.getLength()).toBe(0);
});

test('should prevent write race conditions under concurrent mutations', async () => {
const asyncStorage = new AsyncMockStorage();
const freshQueue = new SyncQueue(asyncStorage);
await freshQueue.initialize();

const p1 = freshQueue.enqueue({ url: '/api/A', method: 'POST', payload: {} });
const p2 = freshQueue.enqueue({ url: '/api/B', method: 'POST', payload: {} });

await Promise.all([p1, p2]);

expect(freshQueue.getLength()).toBe(2);
const stored = JSON.parse(await asyncStorage.getItem('sync_queue') || '[]');
expect(stored.length).toBe(2);
expect(stored[0].url).toBe('/api/A');
expect(stored[1].url).toBe('/api/B');
});
});