diff --git a/src/common/utils.ts b/src/common/utils.ts index 12cb618..9165c9b 100644 --- a/src/common/utils.ts +++ b/src/common/utils.ts @@ -1,6 +1,13 @@ import { monotonicFactory } from "ulidx"; const ulid = monotonicFactory(); +/** + * Generates a monotonic ULID (Universally Unique Lexicographically Sortable Identifier). + * Returns the provided id if given, otherwise creates a new monotonic ULID that is guaranteed + * to be greater than the previous one, even if generated in the same millisecond. + * @param id - Optional existing identifier to use instead of generating a new one + * @returns A ULID string + */ export const createId = (id?: string) => id || ulid(); export const shortId = (id?: string) => id || ulid().toLowerCase().slice(0, 12); diff --git a/src/sorci.interface.ts b/src/sorci.interface.ts index db60e37..53fe0fc 100644 --- a/src/sorci.interface.ts +++ b/src/sorci.interface.ts @@ -205,10 +205,10 @@ export interface Sorci { // Projections /** - * Declare a projection with schema and query + * Create a new projection with schema. Throws if projection already exists. * @category Projections */ - declareProjection(declaration: ProjectionDeclaration): Promise; + createProjection(declaration: ProjectionDeclaration): Promise; /** * Query a projection @@ -220,30 +220,31 @@ export interface Sorci { ): Promise; /** - * Add an event-specific reducer to a projection + * Set an event-specific reducer for a projection (adds new or updates existing) * @category Projections */ - addEventReducingToProjection(payload: { + setEventReducingToProjection(payload: { name: string; eventType: string; reducer: EventReducer; + refreshProjection?: boolean; }): Promise; /** * Manually refresh a projection by reprocessing all events * @category Projections */ - // refreshProjection(name: string): Promise; + refreshProjection(name: string): Promise; /** - * Update a projection's configuration + * Update an existing projection's schema with custom SQL alterations. + * Throws if projection doesn't exist. * @category Projections */ - // updateProjection(payload: { - // name: string; - // query?: Query; - // resetState?: boolean; - // }): Promise; + updateProjection(payload: { + name: string; + alterationSQL: postgres.PendingQuery; + }): Promise; /** * Drop a projection completely diff --git a/src/sorci.postgres.ts b/src/sorci.postgres.ts index 53f12d2..55e33f7 100644 --- a/src/sorci.postgres.ts +++ b/src/sorci.postgres.ts @@ -749,11 +749,11 @@ export class SorciPostgres implements Sorci { } } - async declareProjection(declaration: ProjectionDeclaration) { + async createProjection(declaration: ProjectionDeclaration) { const { name, schema } = declaration; if (this._projectionRegistry.has(name)) { - throw new Error(`Projection "${name}" is already declared`); + throw new Error(`Projection "${name}" already exists`); } await this.createProjectionTable(name, schema); @@ -798,6 +798,9 @@ export class SorciPostgres implements Sorci { throw new Error(`Projection "${name}" does not exist`); } + const projection = this._projectionRegistry.get(name); + const eventTypes = Array.from(projection!.reducers.keys()); + const tableName = `${this.streamName}_projection_${name.replace(/-/g, "_")}`; await this.sql`DROP TABLE IF EXISTS ${this.sql(tableName)} CASCADE`; @@ -809,14 +812,19 @@ export class SorciPostgres implements Sorci { `; this._projectionRegistry.delete(name); + + for (const eventType of eventTypes) { + await this.createOrUpdateEventTypeTrigger(eventType); + } } - async addEventReducingToProjection(payload: { + async setEventReducingToProjection(payload: { name: string; eventType: string; reducer: EventReducer; + refreshProjection?: boolean; }) { - const { name, eventType, reducer } = payload; + const { name, eventType, reducer, refreshProjection } = payload; const projection = this._projectionRegistry.get(name); if (!projection) { @@ -826,6 +834,97 @@ export class SorciPostgres implements Sorci { projection.reducers.set(eventType, reducer); await this.createOrUpdateEventTypeTrigger(eventType); + + if (refreshProjection) { + await this.refreshProjection(name); + } + } + + async refreshProjection(name: string) { + const projection = this._projectionRegistry.get(name); + if (!projection) { + throw new Error(`Projection "${name}" does not exist`); + } + + const eventTypes = Array.from(projection.reducers.keys()); + if (eventTypes.length === 0) { + return; + } + + const tableName = `${this.streamName}_projection_${name.replace(/-/g, "_")}`; + + await this.sql.begin(async (sql) => { + await sql`TRUNCATE TABLE ${sql(tableName)}`; + + const events = await sql` + SELECT * FROM ${sql(this.streamName)} + WHERE type = ANY(${eventTypes}) + ORDER BY id + `; + + for (const event of events) { + const reducer = projection.reducers.get(event.type); + if (!reducer) { + continue; + } + + const mockSql = this.createMockSqlFunction(); + const result = reducer(mockSql, tableName) as any; + + if (!result || typeof result !== "object" || !result.__mockSQL) { + throw new Error( + `Reducer for event type "${event.type}" in projection "${name}" did not return a valid SQL query` + ); + } + + const reducerSQL = result.__mockSQL as string; + + const eventTypeEscaped = event.type.replace(/'/g, "''"); + const eventDataJson = JSON.stringify(event.data).replace(/'/g, "''"); + const eventIdentifierJson = JSON.stringify(event.identifier).replace( + /'/g, + "''" + ); + const eventIdEscaped = event.id.replace(/'/g, "''"); + + await sql.unsafe(` + DO $$ + DECLARE + NEW RECORD; + BEGIN + SELECT + '${eventTypeEscaped}'::text as type, + '${eventDataJson}'::jsonb as data, + '${eventIdentifierJson}'::jsonb as identifier, + '${eventIdEscaped}'::text as id + INTO NEW; + + ${reducerSQL}; + END $$; + `); + } + }); + } + + async updateProjection(payload: { + name: string; + alterationSQL: postgres.PendingQuery; + }) { + const { name, alterationSQL } = payload; + + const projection = this._projectionRegistry.get(name); + if (!projection) { + throw new Error(`Projection "${name}" does not exist`); + } + + await alterationSQL; + + const metaTableName = `${this.streamName}_projections_meta`; + await this.sql` + UPDATE ${this.sql(metaTableName)} + SET updated_at = NOW() + WHERE name = ${name} + `; } private createMockSqlFunction(): any { @@ -910,8 +1009,8 @@ export class SorciPostgres implements Sorci { const functionSQL = ` CREATE OR REPLACE FUNCTION ${functionName}() RETURNS TRIGGER AS $$ - BEGIN -${functionBody} + BEGIN + ${functionBody} RETURN NEW; END; $$ LANGUAGE plpgsql; diff --git a/src/sorci.projections.test.ts b/src/sorci.projections.test.ts index e231fcf..d81c86c 100644 --- a/src/sorci.projections.test.ts +++ b/src/sorci.projections.test.ts @@ -4,6 +4,8 @@ import { createId } from "./common/utils"; afterEach(async () => { const projections = [ "user-profile", + "user-profile-refresh-test", + "user-profile-rebuild-test", "task-tracking", "empty-projection", "sourcing-dashboard", @@ -16,9 +18,9 @@ afterEach(async () => { }); describe("Projections", () => { - describe("declareProjection", () => { + describe("createProjection", () => { test("creates table with correct schema, columns, primary keys and indexes", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "user-profile", schema: { userId: { type: "text", primaryKey: true }, @@ -111,7 +113,7 @@ describe("Projections", () => { }); test("supports ulid type and default values", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "task-tracking", schema: { taskId: { type: "ulid", primaryKey: true }, @@ -157,7 +159,7 @@ describe("Projections", () => { }); test("Check default values are applied", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "task-tracking", schema: { taskId: { type: "ulid", primaryKey: true }, @@ -168,7 +170,7 @@ describe("Projections", () => { } }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "task-tracking", eventType: "task-created", reducer: (sql, tableName) => sql` @@ -205,7 +207,7 @@ describe("Projections", () => { describe("dropProjection", () => { test("removes table, meta entry, and registry", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "user-profile", schema: { userId: { type: "text", primaryKey: true }, @@ -252,9 +254,186 @@ describe("Projections", () => { }); }); + describe("updateProjection", () => { + test("adds column to existing projection", async () => { + await sorciTestClient.createProjection({ + name: "user-profile", + schema: { + userId: { type: "text", primaryKey: true }, + email: { type: "text" } + } + }); + + const sorciPostgres = sorciTestClient as SorciPostgres; + const sql = (sorciPostgres as any).sql; + const streamName = (sorciPostgres as any).streamName; + const tableName = `${streamName}_projection_user_profile`; + + await sorciTestClient.updateProjection({ + name: "user-profile", + alterationSQL: sql` + ALTER TABLE ${sql(tableName)} + ADD COLUMN display_name text + ` + }); + + const columns = await sql` + SELECT column_name, data_type + FROM information_schema.columns + WHERE table_name = ${tableName} + ORDER BY ordinal_position + `; + + expect(columns.map((col: any) => col.column_name)).toEqual([ + "userId", + "email", + "display_name" + ]); + + const metaTableName = `${streamName}_projections_meta`; + const metaRows = await sql` + SELECT updated_at FROM ${sql(metaTableName)} + WHERE name = 'user-profile' + `; + + expect(metaRows).toHaveLength(1); + expect(metaRows[0].updated_at).toBeDefined(); + }); + + test("adds index to existing projection", async () => { + await sorciTestClient.createProjection({ + name: "user-profile", + schema: { + userId: { type: "text", primaryKey: true }, + email: { type: "text" } + } + }); + + const sorciPostgres = sorciTestClient as SorciPostgres; + const sql = (sorciPostgres as any).sql; + const streamName = (sorciPostgres as any).streamName; + const tableName = `${streamName}_projection_user_profile`; + + await sorciTestClient.updateProjection({ + name: "user-profile", + alterationSQL: sql` + CREATE INDEX idx_user_profile_email + ON ${sql(tableName)} USING btree (email) + ` + }); + + const indexes = await sql` + SELECT + i.relname as index_name, + a.attname as column_name, + am.amname as index_type + FROM pg_class t + JOIN pg_index ix ON t.oid = ix.indrelid + JOIN pg_class i ON i.oid = ix.indexrelid + JOIN pg_am am ON i.relam = am.oid + JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey) + WHERE t.relname = ${tableName} + AND NOT ix.indisprimary + `; + + const emailIndex = indexes.find( + (idx: any) => idx.column_name === "email" + ); + expect(emailIndex).toBeDefined(); + expect(emailIndex.index_type).toBe("btree"); + }); + + test("throws error if projection doesn't exist", async () => { + const sorciPostgres = sorciTestClient as SorciPostgres; + const sql = (sorciPostgres as any).sql; + + await expect( + sorciTestClient.updateProjection({ + name: "non-existent", + alterationSQL: sql`ALTER TABLE fake_table ADD COLUMN test text` + }) + ).rejects.toThrow('Projection "non-existent" does not exist'); + }); + + test("works with setEventReducingToProjection and refreshProjection", async () => { + await sorciTestClient.createProjection({ + name: "user-profile", + schema: { + userId: { type: "text", primaryKey: true }, + email: { type: "text" } + } + }); + + await sorciTestClient.setEventReducingToProjection({ + name: "user-profile", + eventType: "user-created", + reducer: (sql, tableName) => sql` + INSERT INTO ${sql(tableName)} ("userId", "email") + VALUES (NEW.data->>'userId', NEW.data->>'email') + ON CONFLICT ("userId") DO UPDATE SET + "email" = EXCLUDED."email" + ` + }); + + const userId = createId(); + await sorciTestClient.insertEvents([ + { + id: createId(), + type: "user-created", + data: { + userId, + email: "alice@example.com" + }, + identifier: { userId } + } + ]); + + const rowsBefore = await sorciTestClient.queryProjection("user-profile"); + expect(rowsBefore).toHaveLength(1); + expect(rowsBefore[0]).toEqual({ + userId, + email: "alice@example.com" + }); + + const sorciPostgres = sorciTestClient as SorciPostgres; + const sql = (sorciPostgres as any).sql; + const streamName = (sorciPostgres as any).streamName; + const tableName = `${streamName}_projection_user_profile`; + + await sorciTestClient.updateProjection({ + name: "user-profile", + alterationSQL: sql` + ALTER TABLE ${sql(tableName)} + ADD COLUMN display_name text DEFAULT 'Unknown' + ` + }); + + await sorciTestClient.setEventReducingToProjection({ + name: "user-profile", + eventType: "user-created", + reducer: (sql, tableName) => sql` + INSERT INTO ${sql(tableName)} ("userId", "email", "display_name") + VALUES (NEW.data->>'userId', NEW.data->>'email', NEW.data->>'displayName') + ON CONFLICT ("userId") DO UPDATE SET + "email" = EXCLUDED."email", + "display_name" = EXCLUDED."display_name" + `, + refreshProjection: true + }); + + const rowsAfter = await sorciTestClient.queryProjection("user-profile"); + expect(rowsAfter).toHaveLength(1); + expect(rowsAfter[0]).toEqual({ + userId, + email: "alice@example.com", + display_name: null + }); + }); + }); + describe("queryProjection", () => { test("retrieves data from projection table", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "user-profile", schema: { userId: { type: "text", primaryKey: true }, @@ -296,7 +475,7 @@ describe("Projections", () => { }); test("queries projection with where clause", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "user-profile", schema: { userId: { type: "text", primaryKey: true }, @@ -334,7 +513,7 @@ describe("Projections", () => { }); test("returns empty array for projection with no data", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "empty-projection", schema: { id: { type: "text", primaryKey: true } @@ -346,9 +525,202 @@ describe("Projections", () => { }); }); - describe("addEventReducingToProjection", () => { + describe("setEventReducingToProjection", () => { + test("registers a new reducer for an event type", async () => { + await sorciTestClient.createProjection({ + name: "user-profile", + schema: { + userId: { type: "text", primaryKey: true }, + email: { type: "text" }, + displayName: { type: "text" } + } + }); + + await sorciTestClient.setEventReducingToProjection({ + name: "user-profile", + eventType: "user-created", + reducer: (sql, tableName) => sql` + INSERT INTO ${sql(tableName)} ("userId", "email", "displayName") + VALUES (NEW.data->>'userId', NEW.data->>'email', NEW.data->>'displayName') + ON CONFLICT ("userId") DO UPDATE SET + "email" = EXCLUDED."email", + "displayName" = EXCLUDED."displayName" + ` + }); + + const userId = createId(); + + await sorciTestClient.insertEvents([ + { + id: createId(), + type: "user-created", + data: { + userId, + email: "alice@example.com", + displayName: "Alice" + }, + identifier: { userId } + }, + { + id: createId(), + type: "user-renamed", + data: { + userId, + displayName: "Bob" + }, + identifier: { userId } + } + ]); + + const rows = await sorciTestClient.queryProjection("user-profile"); + expect(rows).toHaveLength(1); + expect(rows[0]).toEqual({ + userId, + email: "alice@example.com", + displayName: "Alice" + }); + + await sorciTestClient.setEventReducingToProjection({ + name: "user-profile", + eventType: "user-renamed", + reducer: (sql, tableName) => sql` + UPDATE ${sql(tableName)} + SET "displayName" = NEW.data->>'displayName' + WHERE "userId" = NEW.data->>'userId' + ` + }); + + const rowsBis = await sorciTestClient.queryProjection("user-profile"); + expect(rowsBis).toHaveLength(1); + expect(rowsBis[0]).toEqual({ + userId, + email: "alice@example.com", + displayName: "Alice" + }); + + await sorciTestClient.insertEvents([ + { + id: createId(), + type: "user-renamed", + data: { + userId, + displayName: "Charlie" + }, + identifier: { userId } + } + ]); + + const rowsTer = await sorciTestClient.queryProjection("user-profile"); + expect(rowsTer).toHaveLength(1); + expect(rowsTer[0]).toEqual({ + userId, + email: "alice@example.com", + displayName: "Charlie" + }); + }); + + test("registers a new reducer for an event type with projection refresh", async () => { + await sorciTestClient.createProjection({ + name: "user-profile-refresh-test", + schema: { + userId: { type: "text", primaryKey: true }, + email: { type: "text" }, + displayName: { type: "text" } + } + }); + + await sorciTestClient.setEventReducingToProjection({ + name: "user-profile-refresh-test", + eventType: "user-created-refresh", + reducer: (sql, tableName) => sql` + INSERT INTO ${sql(tableName)} ("userId", "email", "displayName") + VALUES (NEW.data->>'userId', NEW.data->>'email', NEW.data->>'displayName') + ON CONFLICT ("userId") DO UPDATE SET + "email" = EXCLUDED."email", + "displayName" = EXCLUDED."displayName" + ` + }); + + const userId = createId(); + + await sorciTestClient.insertEvents([ + { + id: createId(), + type: "user-created-refresh", + data: { + userId, + email: "alice@example.com", + displayName: "Alice" + }, + identifier: { userId } + }, + { + id: createId(), + type: "user-renamed-refresh", + data: { + userId, + displayName: "Bob" + }, + identifier: { userId } + } + ]); + + const rows = await sorciTestClient.queryProjection( + "user-profile-refresh-test" + ); + expect(rows).toHaveLength(1); + expect(rows[0]).toEqual({ + userId, + email: "alice@example.com", + displayName: "Alice" + }); + + await sorciTestClient.setEventReducingToProjection({ + name: "user-profile-refresh-test", + eventType: "user-renamed-refresh", + reducer: (sql, tableName) => sql` + UPDATE ${sql(tableName)} + SET "displayName" = NEW.data->>'displayName' + WHERE "userId" = NEW.data->>'userId' + `, + refreshProjection: true + }); + + const rowsBis = await sorciTestClient.queryProjection( + "user-profile-refresh-test" + ); + expect(rowsBis).toHaveLength(1); + expect(rowsBis[0]).toEqual({ + userId, + email: "alice@example.com", + displayName: "Bob" + }); + + await sorciTestClient.insertEvents([ + { + id: createId(), + type: "user-renamed-refresh", + data: { + userId, + displayName: "Charlie" + }, + identifier: { userId } + } + ]); + + const rowsTer = await sorciTestClient.queryProjection( + "user-profile-refresh-test" + ); + expect(rowsTer).toHaveLength(1); + expect(rowsTer[0]).toEqual({ + userId, + email: "alice@example.com", + displayName: "Charlie" + }); + }); + test("registers a reducer for an event type", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "user-profile", schema: { userId: { type: "text", primaryKey: true }, @@ -357,7 +729,7 @@ describe("Projections", () => { } }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "user-profile", eventType: "user-created", reducer: (sql, tableName) => sql` @@ -377,8 +749,8 @@ describe("Projections", () => { expect(projection.reducers.has("user-created")).toBe(true); }); - test("silly reduction", async () => { - await sorciTestClient.declareProjection({ + test("mutiple projection on same event", async () => { + await sorciTestClient.createProjection({ name: "account", schema: { accountId: { type: "ulid", primaryKey: true }, @@ -387,7 +759,7 @@ describe("Projections", () => { } }); - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "user", schema: { userId: { type: "ulid", primaryKey: true }, @@ -396,7 +768,7 @@ describe("Projections", () => { } }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "account", eventType: "user-created", reducer: (sql, tableName) => sql` @@ -406,7 +778,7 @@ describe("Projections", () => { ` }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "user", eventType: "user-created", reducer: (sql, tableName) => sql` @@ -416,7 +788,7 @@ describe("Projections", () => { ` }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "account", eventType: "user-renamed", reducer: (sql, tableName) => sql` @@ -426,7 +798,7 @@ describe("Projections", () => { ` }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "account", eventType: "user-deleted", reducer: (sql, tableName) => sql` @@ -476,7 +848,7 @@ describe("Projections", () => { test("throws error when projection does not exist", async () => { await expect( - sorciTestClient.addEventReducingToProjection({ + sorciTestClient.setEventReducingToProjection({ name: "non-existent", eventType: "some-event", reducer: (sql, tableName) => sql` @@ -487,7 +859,7 @@ describe("Projections", () => { }); test("allows multiple reducers for different event types on same projection", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "user-profile", schema: { userId: { type: "text", primaryKey: true }, @@ -495,7 +867,7 @@ describe("Projections", () => { } }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "user-profile", eventType: "user-created", reducer: (sql, tableName) => sql` @@ -504,7 +876,7 @@ describe("Projections", () => { ` }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "user-profile", eventType: "user-updated", reducer: (sql, tableName) => sql` @@ -526,7 +898,7 @@ describe("Projections", () => { describe("End-to-End: Automatic projection updates", () => { test("projection is automatically updated when event is inserted", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "user-profile", schema: { userId: { type: "text", primaryKey: true }, @@ -535,7 +907,7 @@ describe("Projections", () => { } }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "user-profile", eventType: "user-created", reducer: (sql, tableName) => sql` @@ -570,7 +942,7 @@ describe("Projections", () => { }); test("projection is updated by multiple events", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "user-profile", schema: { userId: { type: "text", primaryKey: true }, @@ -579,7 +951,7 @@ describe("Projections", () => { } }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "user-profile", eventType: "user-created", reducer: (sql, tableName) => sql` @@ -648,7 +1020,7 @@ describe("Projections", () => { }); test("multiple event on same projection are processed in order", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "sourcing-dashboard", schema: { sourcingId: { type: "text", primaryKey: true }, @@ -657,7 +1029,7 @@ describe("Projections", () => { } }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "sourcing-dashboard", eventType: "sourcing-created", reducer: (sql, tableName) => sql` @@ -668,7 +1040,7 @@ describe("Projections", () => { ` }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "sourcing-dashboard", eventType: "sourcing-deleted", reducer: (sql, tableName) => sql` @@ -713,7 +1085,7 @@ describe("Projections", () => { }); test("same event type is processed properly for multiple projections", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "user", schema: { userId: { type: "text", primaryKey: true }, @@ -722,14 +1094,14 @@ describe("Projections", () => { } }); - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "account", schema: { userId: { type: "text", primaryKey: true } } }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "user", eventType: "user-created", reducer: (sql, tableName) => sql` @@ -741,7 +1113,7 @@ describe("Projections", () => { ` }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "account", eventType: "user-created", reducer: (sql, tableName) => sql` @@ -779,4 +1151,206 @@ describe("Projections", () => { }); }); }); + + describe("refreshProjection", () => { + test("rebuilds projection from scratch with updated reducer logic", async () => { + await sorciTestClient.createProjection({ + name: "user-profile-rebuild-test", + schema: { + userId: { type: "text", primaryKey: true }, + email: { type: "text" }, + displayName: { type: "text" }, + updateCount: { type: "integer", default: 0 } + } + }); + + await sorciTestClient.setEventReducingToProjection({ + name: "user-profile-rebuild-test", + eventType: "user-created-rebuild", + reducer: (sql, tableName) => sql` + INSERT INTO ${sql(tableName)} ("userId", "email", "displayName") + VALUES (NEW.data->>'userId', NEW.data->>'email', NEW.data->>'displayName') + ON CONFLICT ("userId") DO UPDATE SET + "email" = EXCLUDED."email", + "displayName" = EXCLUDED."displayName" + ` + }); + + await sorciTestClient.setEventReducingToProjection({ + name: "user-profile-rebuild-test", + eventType: "user-renamed-rebuild", + reducer: (sql, tableName) => sql` + UPDATE ${sql(tableName)} + SET "displayName" = NEW.data->>'displayName' + WHERE "userId" = NEW.data->>'userId' + ` + }); + + const userId1 = createId(); + const userId2 = createId(); + + await sorciTestClient.insertEvents([ + { + id: createId(), + type: "user-created-rebuild", + data: { + userId: userId1, + email: "alice@example.com", + displayName: "Alice" + }, + identifier: { userId: userId1 } + }, + { + id: createId(), + type: "user-created-rebuild", + data: { + userId: userId2, + email: "bob@example.com", + displayName: "Bob" + }, + identifier: { userId: userId2 } + }, + { + id: createId(), + type: "user-renamed-rebuild", + data: { + userId: userId1, + displayName: "Alice Smith" + }, + identifier: { userId: userId1 } + }, + { + id: createId(), + type: "user-renamed-rebuild", + data: { + userId: userId2, + displayName: "Bob Jones" + }, + identifier: { userId: userId2 } + } + ]); + + const initialRows = await sorciTestClient.queryProjection( + "user-profile-rebuild-test" + ); + expect(initialRows).toHaveLength(2); + expect(initialRows).toEqual( + expect.arrayContaining([ + { + userId: userId1, + email: "alice@example.com", + displayName: "Alice Smith", + updateCount: 0 + }, + { + userId: userId2, + email: "bob@example.com", + displayName: "Bob Jones", + updateCount: 0 + } + ]) + ); + + await sorciTestClient.setEventReducingToProjection({ + name: "user-profile-rebuild-test", + eventType: "user-renamed-rebuild", + reducer: (sql, tableName) => sql` + UPDATE ${sql(tableName)} + SET + "displayName" = NEW.data->>'displayName', + "updateCount" = "updateCount" + 1 + WHERE "userId" = NEW.data->>'userId' + ` + }); + + await sorciTestClient.refreshProjection("user-profile-rebuild-test"); + + const refreshedRows = await sorciTestClient.queryProjection( + "user-profile-rebuild-test" + ); + expect(refreshedRows).toHaveLength(2); + expect(refreshedRows).toEqual( + expect.arrayContaining([ + { + userId: userId1, + email: "alice@example.com", + displayName: "Alice Smith", + updateCount: 1 + }, + { + userId: userId2, + email: "bob@example.com", + displayName: "Bob Jones", + updateCount: 1 + } + ]) + ); + }); + + test("processes events added during refresh after refresh completes", async () => { + await sorciTestClient.createProjection({ + name: "account", + schema: { + accountId: { type: "text", primaryKey: true }, + balance: { type: "integer", default: 0 } + } + }); + + await sorciTestClient.setEventReducingToProjection({ + name: "account", + eventType: "account-created", + reducer: (sql, tableName) => sql` + INSERT INTO ${sql(tableName)} ("accountId", "balance") + VALUES (NEW.data->>'accountId', (NEW.data->>'balance')::integer) + ON CONFLICT ("accountId") DO NOTHING + ` + }); + + await sorciTestClient.setEventReducingToProjection({ + name: "account", + eventType: "account-deposited", + reducer: (sql, tableName) => sql` + UPDATE ${sql(tableName)} + SET "balance" = "balance" + (NEW.data->>'amount')::integer + WHERE "accountId" = NEW.data->>'accountId' + ` + }); + + const accountId = createId(); + + await sorciTestClient.insertEvents([ + { + id: createId(), + type: "account-created", + data: { + accountId, + balance: 100 + }, + identifier: { accountId } + } + ]); + + const refreshPromise = sorciTestClient.refreshProjection("account"); + await sorciTestClient.insertEvents([ + { + id: createId(), + type: "account-deposited", + data: { + accountId, + amount: 50 + }, + identifier: { accountId } + } + ]); + + await refreshPromise; + + const rows = await sorciTestClient.queryProjection("account"); + expect(rows).toHaveLength(1); + expect(rows[0]).toEqual({ + accountId, + balance: 150 + }); + }); + }); });