From ff34a2a0e2b4262af77c31ddeae87924caa4d342 Mon Sep 17 00:00:00 2001 From: Sraleik Date: Wed, 29 Oct 2025 12:04:59 +0100 Subject: [PATCH 1/3] feat: add refreshProjection method and support for optional refresh flag in event reducer registration, enhancing projection management --- src/sorci.interface.ts | 3 +- src/sorci.postgres.ts | 80 +++++++- src/sorci.projections.test.ts | 333 +++++++++++++++++++++++++++++++++- 3 files changed, 413 insertions(+), 3 deletions(-) diff --git a/src/sorci.interface.ts b/src/sorci.interface.ts index db60e37..d69fd32 100644 --- a/src/sorci.interface.ts +++ b/src/sorci.interface.ts @@ -227,13 +227,14 @@ export interface Sorci { name: string; eventType: string; reducer: EventReducer; + refreshProjection?: boolean; }): Promise; /** * Manually refresh a projection by reprocessing all events * @category Projections */ - // refreshProjection(name: string): Promise; + refreshProjection(name: string): Promise; /** * Update a projection's configuration diff --git a/src/sorci.postgres.ts b/src/sorci.postgres.ts index 53f12d2..53508c0 100644 --- a/src/sorci.postgres.ts +++ b/src/sorci.postgres.ts @@ -798,6 +798,9 @@ export class SorciPostgres implements Sorci { throw new Error(`Projection "${name}" does not exist`); } + const projection = this._projectionRegistry.get(name); + const eventTypes = Array.from(projection!.reducers.keys()); + const tableName = `${this.streamName}_projection_${name.replace(/-/g, "_")}`; await this.sql`DROP TABLE IF EXISTS ${this.sql(tableName)} CASCADE`; @@ -809,14 +812,19 @@ export class SorciPostgres implements Sorci { `; this._projectionRegistry.delete(name); + + for (const eventType of eventTypes) { + await this.createOrUpdateEventTypeTrigger(eventType); + } } async addEventReducingToProjection(payload: { name: string; eventType: string; reducer: EventReducer; + refreshProjection?: boolean; }) { - const { name, eventType, reducer } = payload; + const { name, eventType, reducer, refreshProjection } = payload; const projection = this._projectionRegistry.get(name); if (!projection) { @@ -826,6 +834,76 @@ export class SorciPostgres implements Sorci { projection.reducers.set(eventType, reducer); await this.createOrUpdateEventTypeTrigger(eventType); + + if (refreshProjection) { + await this.refreshProjection(name); + } + } + + async refreshProjection(name: string) { + const projection = this._projectionRegistry.get(name); + if (!projection) { + throw new Error(`Projection "${name}" does not exist`); + } + + const eventTypes = Array.from(projection.reducers.keys()); + if (eventTypes.length === 0) { + return; + } + + const tableName = `${this.streamName}_projection_${name.replace(/-/g, "_")}`; + + await this.sql.begin(async (sql) => { + await sql`TRUNCATE TABLE ${sql(tableName)}`; + + const events = await sql` + SELECT * FROM ${sql(this.streamName)} + WHERE type = ANY(${eventTypes}) + ORDER BY id + `; + + for (const event of events) { + const reducer = projection.reducers.get(event.type); + if (!reducer) { + continue; + } + + const mockSql = this.createMockSqlFunction(); + const result = reducer(mockSql, tableName) as any; + + if (!result || typeof result !== "object" || !result.__mockSQL) { + throw new Error( + `Reducer for event type "${event.type}" in projection "${name}" did not return a valid SQL query` + ); + } + + const reducerSQL = result.__mockSQL as string; + + const eventTypeEscaped = event.type.replace(/'/g, "''"); + const eventDataJson = JSON.stringify(event.data).replace(/'/g, "''"); + const eventIdentifierJson = JSON.stringify(event.identifier).replace( + /'/g, + "''" + ); + const eventIdEscaped = event.id.replace(/'/g, "''"); + + await sql.unsafe(` + DO $$ + DECLARE + NEW RECORD; + BEGIN + SELECT + '${eventTypeEscaped}'::text as type, + '${eventDataJson}'::jsonb as data, + '${eventIdentifierJson}'::jsonb as identifier, + '${eventIdEscaped}'::text as id + INTO NEW; + + ${reducerSQL}; + END $$; + `); + } + }); } private createMockSqlFunction(): any { diff --git a/src/sorci.projections.test.ts b/src/sorci.projections.test.ts index e231fcf..f62bee7 100644 --- a/src/sorci.projections.test.ts +++ b/src/sorci.projections.test.ts @@ -4,6 +4,8 @@ import { createId } from "./common/utils"; afterEach(async () => { const projections = [ "user-profile", + "user-profile-refresh-test", + "user-profile-rebuild-test", "task-tracking", "empty-projection", "sourcing-dashboard", @@ -347,6 +349,199 @@ describe("Projections", () => { }); describe("addEventReducingToProjection", () => { + test("registers a new reducer for an event type", async () => { + await sorciTestClient.declareProjection({ + name: "user-profile", + schema: { + userId: { type: "text", primaryKey: true }, + email: { type: "text" }, + displayName: { type: "text" } + } + }); + + await sorciTestClient.addEventReducingToProjection({ + name: "user-profile", + eventType: "user-created", + reducer: (sql, tableName) => sql` + INSERT INTO ${sql(tableName)} ("userId", "email", "displayName") + VALUES (NEW.data->>'userId', NEW.data->>'email', NEW.data->>'displayName') + ON CONFLICT ("userId") DO UPDATE SET + "email" = EXCLUDED."email", + "displayName" = EXCLUDED."displayName" + ` + }); + + const userId = createId(); + + await sorciTestClient.insertEvents([ + { + id: createId(), + type: "user-created", + data: { + userId, + email: "alice@example.com", + displayName: "Alice" + }, + identifier: { userId } + }, + { + id: createId(), + type: "user-renamed", + data: { + userId, + displayName: "Bob" + }, + identifier: { userId } + } + ]); + + const rows = await sorciTestClient.queryProjection("user-profile"); + expect(rows).toHaveLength(1); + expect(rows[0]).toEqual({ + userId, + email: "alice@example.com", + displayName: "Alice" + }); + + await sorciTestClient.addEventReducingToProjection({ + name: "user-profile", + eventType: "user-renamed", + reducer: (sql, tableName) => sql` + UPDATE ${sql(tableName)} + SET "displayName" = NEW.data->>'displayName' + WHERE "userId" = NEW.data->>'userId' + ` + }); + + const rowsBis = await sorciTestClient.queryProjection("user-profile"); + expect(rowsBis).toHaveLength(1); + expect(rowsBis[0]).toEqual({ + userId, + email: "alice@example.com", + displayName: "Alice" + }); + + await sorciTestClient.insertEvents([ + { + id: createId(), + type: "user-renamed", + data: { + userId, + displayName: "Charlie" + }, + identifier: { userId } + } + ]); + + const rowsTer = await sorciTestClient.queryProjection("user-profile"); + expect(rowsTer).toHaveLength(1); + expect(rowsTer[0]).toEqual({ + userId, + email: "alice@example.com", + displayName: "Charlie" + }); + }); + + test("registers a new reducer for an event type with projection refresh", async () => { + await sorciTestClient.declareProjection({ + name: "user-profile-refresh-test", + schema: { + userId: { type: "text", primaryKey: true }, + email: { type: "text" }, + displayName: { type: "text" } + } + }); + + await sorciTestClient.addEventReducingToProjection({ + name: "user-profile-refresh-test", + eventType: "user-created-refresh", + reducer: (sql, tableName) => sql` + INSERT INTO ${sql(tableName)} ("userId", "email", "displayName") + VALUES (NEW.data->>'userId', NEW.data->>'email', NEW.data->>'displayName') + ON CONFLICT ("userId") DO UPDATE SET + "email" = EXCLUDED."email", + "displayName" = EXCLUDED."displayName" + ` + }); + + const userId = createId(); + + await sorciTestClient.insertEvents([ + { + id: createId(), + type: "user-created-refresh", + data: { + userId, + email: "alice@example.com", + displayName: "Alice" + }, + identifier: { userId } + }, + { + id: createId(), + type: "user-renamed-refresh", + data: { + userId, + displayName: "Bob" + }, + identifier: { userId } + } + ]); + + const rows = await sorciTestClient.queryProjection( + "user-profile-refresh-test" + ); + expect(rows).toHaveLength(1); + expect(rows[0]).toEqual({ + userId, + email: "alice@example.com", + displayName: "Alice" + }); + + await sorciTestClient.addEventReducingToProjection({ + name: "user-profile-refresh-test", + eventType: "user-renamed-refresh", + reducer: (sql, tableName) => sql` + UPDATE ${sql(tableName)} + SET "displayName" = NEW.data->>'displayName' + WHERE "userId" = NEW.data->>'userId' + `, + refreshProjection: true + }); + + const rowsBis = await sorciTestClient.queryProjection( + "user-profile-refresh-test" + ); + expect(rowsBis).toHaveLength(1); + expect(rowsBis[0]).toEqual({ + userId, + email: "alice@example.com", + displayName: "Bob" + }); + + await sorciTestClient.insertEvents([ + { + id: createId(), + type: "user-renamed-refresh", + data: { + userId, + displayName: "Charlie" + }, + identifier: { userId } + } + ]); + + const rowsTer = await sorciTestClient.queryProjection( + "user-profile-refresh-test" + ); + expect(rowsTer).toHaveLength(1); + expect(rowsTer[0]).toEqual({ + userId, + email: "alice@example.com", + displayName: "Charlie" + }); + }); + test("registers a reducer for an event type", async () => { await sorciTestClient.declareProjection({ name: "user-profile", @@ -377,7 +572,7 @@ describe("Projections", () => { expect(projection.reducers.has("user-created")).toBe(true); }); - test("silly reduction", async () => { + test("mutiple projection on same event", async () => { await sorciTestClient.declareProjection({ name: "account", schema: { @@ -779,4 +974,140 @@ describe("Projections", () => { }); }); }); + + describe("refreshProjection", () => { + test("rebuilds projection from scratch with updated reducer logic", async () => { + await sorciTestClient.declareProjection({ + name: "user-profile-rebuild-test", + schema: { + userId: { type: "text", primaryKey: true }, + email: { type: "text" }, + displayName: { type: "text" }, + updateCount: { type: "integer", default: 0 } + } + }); + + await sorciTestClient.addEventReducingToProjection({ + name: "user-profile-rebuild-test", + eventType: "user-created-rebuild", + reducer: (sql, tableName) => sql` + INSERT INTO ${sql(tableName)} ("userId", "email", "displayName") + VALUES (NEW.data->>'userId', NEW.data->>'email', NEW.data->>'displayName') + ON CONFLICT ("userId") DO UPDATE SET + "email" = EXCLUDED."email", + "displayName" = EXCLUDED."displayName" + ` + }); + + await sorciTestClient.addEventReducingToProjection({ + name: "user-profile-rebuild-test", + eventType: "user-renamed-rebuild", + reducer: (sql, tableName) => sql` + UPDATE ${sql(tableName)} + SET "displayName" = NEW.data->>'displayName' + WHERE "userId" = NEW.data->>'userId' + ` + }); + + const userId1 = createId(); + const userId2 = createId(); + + await sorciTestClient.insertEvents([ + { + id: createId(), + type: "user-created-rebuild", + data: { + userId: userId1, + email: "alice@example.com", + displayName: "Alice" + }, + identifier: { userId: userId1 } + }, + { + id: createId(), + type: "user-created-rebuild", + data: { + userId: userId2, + email: "bob@example.com", + displayName: "Bob" + }, + identifier: { userId: userId2 } + }, + { + id: createId(), + type: "user-renamed-rebuild", + data: { + userId: userId1, + displayName: "Alice Smith" + }, + identifier: { userId: userId1 } + }, + { + id: createId(), + type: "user-renamed-rebuild", + data: { + userId: userId2, + displayName: "Bob Jones" + }, + identifier: { userId: userId2 } + } + ]); + + const initialRows = await sorciTestClient.queryProjection( + "user-profile-rebuild-test" + ); + expect(initialRows).toHaveLength(2); + expect(initialRows).toEqual( + expect.arrayContaining([ + { + userId: userId1, + email: "alice@example.com", + displayName: "Alice Smith", + updateCount: 0 + }, + { + userId: userId2, + email: "bob@example.com", + displayName: "Bob Jones", + updateCount: 0 + } + ]) + ); + + await sorciTestClient.addEventReducingToProjection({ + name: "user-profile-rebuild-test", + eventType: "user-renamed-rebuild", + reducer: (sql, tableName) => sql` + UPDATE ${sql(tableName)} + SET + "displayName" = NEW.data->>'displayName', + "updateCount" = "updateCount" + 1 + WHERE "userId" = NEW.data->>'userId' + ` + }); + + await sorciTestClient.refreshProjection("user-profile-rebuild-test"); + + const refreshedRows = await sorciTestClient.queryProjection( + "user-profile-rebuild-test" + ); + expect(refreshedRows).toHaveLength(2); + expect(refreshedRows).toEqual( + expect.arrayContaining([ + { + userId: userId1, + email: "alice@example.com", + displayName: "Alice Smith", + updateCount: 1 + }, + { + userId: userId2, + email: "bob@example.com", + displayName: "Bob Jones", + updateCount: 1 + } + ]) + ); + }); + }); }); From 07b9160dfc3f0276318be5660529c04a95283005 Mon Sep 17 00:00:00 2001 From: Sraleik Date: Wed, 29 Oct 2025 14:34:54 +0100 Subject: [PATCH 2/3] feat: add test for processing events during projection refresh, ensuring correct balance updates after event insertion --- src/sorci.projections.test.ts | 66 +++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/src/sorci.projections.test.ts b/src/sorci.projections.test.ts index f62bee7..6a367f2 100644 --- a/src/sorci.projections.test.ts +++ b/src/sorci.projections.test.ts @@ -1109,5 +1109,71 @@ describe("Projections", () => { ]) ); }); + + test("processes events added during refresh after refresh completes", async () => { + await sorciTestClient.declareProjection({ + name: "account", + schema: { + accountId: { type: "text", primaryKey: true }, + balance: { type: "integer", default: 0 } + } + }); + + await sorciTestClient.addEventReducingToProjection({ + name: "account", + eventType: "account-created", + reducer: (sql, tableName) => sql` + INSERT INTO ${sql(tableName)} ("accountId", "balance") + VALUES (NEW.data->>'accountId', (NEW.data->>'balance')::integer) + ON CONFLICT ("accountId") DO NOTHING + ` + }); + + await sorciTestClient.addEventReducingToProjection({ + name: "account", + eventType: "account-deposited", + reducer: (sql, tableName) => sql` + UPDATE ${sql(tableName)} + SET "balance" = "balance" + (NEW.data->>'amount')::integer + WHERE "accountId" = NEW.data->>'accountId' + ` + }); + + const accountId = createId(); + + await sorciTestClient.insertEvents([ + { + id: createId(), + type: "account-created", + data: { + accountId, + balance: 100 + }, + identifier: { accountId } + } + ]); + + const refreshPromise = sorciTestClient.refreshProjection("account"); + await sorciTestClient.insertEvents([ + { + id: createId(), + type: "account-deposited", + data: { + accountId, + amount: 50 + }, + identifier: { accountId } + } + ]); + + await refreshPromise; + + const rows = await sorciTestClient.queryProjection("account"); + expect(rows).toHaveLength(1); + expect(rows[0]).toEqual({ + accountId, + balance: 150 + }); + }); }); }); From 7c4304db704464f4d181ac2241127bf7b474705c Mon Sep 17 00:00:00 2001 From: Sraleik Date: Wed, 29 Oct 2025 19:54:35 +0100 Subject: [PATCH 3/3] Document createId function in jsdoc --- src/common/utils.ts | 7 + src/sorci.interface.ts | 20 +-- src/sorci.postgres.ts | 31 +++- src/sorci.projections.test.ts | 269 ++++++++++++++++++++++++++++------ 4 files changed, 266 insertions(+), 61 deletions(-) diff --git a/src/common/utils.ts b/src/common/utils.ts index 12cb618..9165c9b 100644 --- a/src/common/utils.ts +++ b/src/common/utils.ts @@ -1,6 +1,13 @@ import { monotonicFactory } from "ulidx"; const ulid = monotonicFactory(); +/** + * Generates a monotonic ULID (Universally Unique Lexicographically Sortable Identifier). + * Returns the provided id if given, otherwise creates a new monotonic ULID that is guaranteed + * to be greater than the previous one, even if generated in the same millisecond. + * @param id - Optional existing identifier to use instead of generating a new one + * @returns A ULID string + */ export const createId = (id?: string) => id || ulid(); export const shortId = (id?: string) => id || ulid().toLowerCase().slice(0, 12); diff --git a/src/sorci.interface.ts b/src/sorci.interface.ts index d69fd32..53fe0fc 100644 --- a/src/sorci.interface.ts +++ b/src/sorci.interface.ts @@ -205,10 +205,10 @@ export interface Sorci { // Projections /** - * Declare a projection with schema and query + * Create a new projection with schema. Throws if projection already exists. * @category Projections */ - declareProjection(declaration: ProjectionDeclaration): Promise; + createProjection(declaration: ProjectionDeclaration): Promise; /** * Query a projection @@ -220,10 +220,10 @@ export interface Sorci { ): Promise; /** - * Add an event-specific reducer to a projection + * Set an event-specific reducer for a projection (adds new or updates existing) * @category Projections */ - addEventReducingToProjection(payload: { + setEventReducingToProjection(payload: { name: string; eventType: string; reducer: EventReducer; @@ -237,14 +237,14 @@ export interface Sorci { refreshProjection(name: string): Promise; /** - * Update a projection's configuration + * Update an existing projection's schema with custom SQL alterations. + * Throws if projection doesn't exist. * @category Projections */ - // updateProjection(payload: { - // name: string; - // query?: Query; - // resetState?: boolean; - // }): Promise; + updateProjection(payload: { + name: string; + alterationSQL: postgres.PendingQuery; + }): Promise; /** * Drop a projection completely diff --git a/src/sorci.postgres.ts b/src/sorci.postgres.ts index 53508c0..55e33f7 100644 --- a/src/sorci.postgres.ts +++ b/src/sorci.postgres.ts @@ -749,11 +749,11 @@ export class SorciPostgres implements Sorci { } } - async declareProjection(declaration: ProjectionDeclaration) { + async createProjection(declaration: ProjectionDeclaration) { const { name, schema } = declaration; if (this._projectionRegistry.has(name)) { - throw new Error(`Projection "${name}" is already declared`); + throw new Error(`Projection "${name}" already exists`); } await this.createProjectionTable(name, schema); @@ -818,7 +818,7 @@ export class SorciPostgres implements Sorci { } } - async addEventReducingToProjection(payload: { + async setEventReducingToProjection(payload: { name: string; eventType: string; reducer: EventReducer; @@ -906,6 +906,27 @@ export class SorciPostgres implements Sorci { }); } + async updateProjection(payload: { + name: string; + alterationSQL: postgres.PendingQuery; + }) { + const { name, alterationSQL } = payload; + + const projection = this._projectionRegistry.get(name); + if (!projection) { + throw new Error(`Projection "${name}" does not exist`); + } + + await alterationSQL; + + const metaTableName = `${this.streamName}_projections_meta`; + await this.sql` + UPDATE ${this.sql(metaTableName)} + SET updated_at = NOW() + WHERE name = ${name} + `; + } + private createMockSqlFunction(): any { const mockSql: any = (strings: TemplateStringsArray, ...values: any[]) => { let sql = ""; @@ -988,8 +1009,8 @@ export class SorciPostgres implements Sorci { const functionSQL = ` CREATE OR REPLACE FUNCTION ${functionName}() RETURNS TRIGGER AS $$ - BEGIN -${functionBody} + BEGIN + ${functionBody} RETURN NEW; END; $$ LANGUAGE plpgsql; diff --git a/src/sorci.projections.test.ts b/src/sorci.projections.test.ts index 6a367f2..d81c86c 100644 --- a/src/sorci.projections.test.ts +++ b/src/sorci.projections.test.ts @@ -18,9 +18,9 @@ afterEach(async () => { }); describe("Projections", () => { - describe("declareProjection", () => { + describe("createProjection", () => { test("creates table with correct schema, columns, primary keys and indexes", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "user-profile", schema: { userId: { type: "text", primaryKey: true }, @@ -113,7 +113,7 @@ describe("Projections", () => { }); test("supports ulid type and default values", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "task-tracking", schema: { taskId: { type: "ulid", primaryKey: true }, @@ -159,7 +159,7 @@ describe("Projections", () => { }); test("Check default values are applied", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "task-tracking", schema: { taskId: { type: "ulid", primaryKey: true }, @@ -170,7 +170,7 @@ describe("Projections", () => { } }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "task-tracking", eventType: "task-created", reducer: (sql, tableName) => sql` @@ -207,7 +207,7 @@ describe("Projections", () => { describe("dropProjection", () => { test("removes table, meta entry, and registry", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "user-profile", schema: { userId: { type: "text", primaryKey: true }, @@ -254,9 +254,186 @@ describe("Projections", () => { }); }); + describe("updateProjection", () => { + test("adds column to existing projection", async () => { + await sorciTestClient.createProjection({ + name: "user-profile", + schema: { + userId: { type: "text", primaryKey: true }, + email: { type: "text" } + } + }); + + const sorciPostgres = sorciTestClient as SorciPostgres; + const sql = (sorciPostgres as any).sql; + const streamName = (sorciPostgres as any).streamName; + const tableName = `${streamName}_projection_user_profile`; + + await sorciTestClient.updateProjection({ + name: "user-profile", + alterationSQL: sql` + ALTER TABLE ${sql(tableName)} + ADD COLUMN display_name text + ` + }); + + const columns = await sql` + SELECT column_name, data_type + FROM information_schema.columns + WHERE table_name = ${tableName} + ORDER BY ordinal_position + `; + + expect(columns.map((col: any) => col.column_name)).toEqual([ + "userId", + "email", + "display_name" + ]); + + const metaTableName = `${streamName}_projections_meta`; + const metaRows = await sql` + SELECT updated_at FROM ${sql(metaTableName)} + WHERE name = 'user-profile' + `; + + expect(metaRows).toHaveLength(1); + expect(metaRows[0].updated_at).toBeDefined(); + }); + + test("adds index to existing projection", async () => { + await sorciTestClient.createProjection({ + name: "user-profile", + schema: { + userId: { type: "text", primaryKey: true }, + email: { type: "text" } + } + }); + + const sorciPostgres = sorciTestClient as SorciPostgres; + const sql = (sorciPostgres as any).sql; + const streamName = (sorciPostgres as any).streamName; + const tableName = `${streamName}_projection_user_profile`; + + await sorciTestClient.updateProjection({ + name: "user-profile", + alterationSQL: sql` + CREATE INDEX idx_user_profile_email + ON ${sql(tableName)} USING btree (email) + ` + }); + + const indexes = await sql` + SELECT + i.relname as index_name, + a.attname as column_name, + am.amname as index_type + FROM pg_class t + JOIN pg_index ix ON t.oid = ix.indrelid + JOIN pg_class i ON i.oid = ix.indexrelid + JOIN pg_am am ON i.relam = am.oid + JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey) + WHERE t.relname = ${tableName} + AND NOT ix.indisprimary + `; + + const emailIndex = indexes.find( + (idx: any) => idx.column_name === "email" + ); + expect(emailIndex).toBeDefined(); + expect(emailIndex.index_type).toBe("btree"); + }); + + test("throws error if projection doesn't exist", async () => { + const sorciPostgres = sorciTestClient as SorciPostgres; + const sql = (sorciPostgres as any).sql; + + await expect( + sorciTestClient.updateProjection({ + name: "non-existent", + alterationSQL: sql`ALTER TABLE fake_table ADD COLUMN test text` + }) + ).rejects.toThrow('Projection "non-existent" does not exist'); + }); + + test("works with setEventReducingToProjection and refreshProjection", async () => { + await sorciTestClient.createProjection({ + name: "user-profile", + schema: { + userId: { type: "text", primaryKey: true }, + email: { type: "text" } + } + }); + + await sorciTestClient.setEventReducingToProjection({ + name: "user-profile", + eventType: "user-created", + reducer: (sql, tableName) => sql` + INSERT INTO ${sql(tableName)} ("userId", "email") + VALUES (NEW.data->>'userId', NEW.data->>'email') + ON CONFLICT ("userId") DO UPDATE SET + "email" = EXCLUDED."email" + ` + }); + + const userId = createId(); + await sorciTestClient.insertEvents([ + { + id: createId(), + type: "user-created", + data: { + userId, + email: "alice@example.com" + }, + identifier: { userId } + } + ]); + + const rowsBefore = await sorciTestClient.queryProjection("user-profile"); + expect(rowsBefore).toHaveLength(1); + expect(rowsBefore[0]).toEqual({ + userId, + email: "alice@example.com" + }); + + const sorciPostgres = sorciTestClient as SorciPostgres; + const sql = (sorciPostgres as any).sql; + const streamName = (sorciPostgres as any).streamName; + const tableName = `${streamName}_projection_user_profile`; + + await sorciTestClient.updateProjection({ + name: "user-profile", + alterationSQL: sql` + ALTER TABLE ${sql(tableName)} + ADD COLUMN display_name text DEFAULT 'Unknown' + ` + }); + + await sorciTestClient.setEventReducingToProjection({ + name: "user-profile", + eventType: "user-created", + reducer: (sql, tableName) => sql` + INSERT INTO ${sql(tableName)} ("userId", "email", "display_name") + VALUES (NEW.data->>'userId', NEW.data->>'email', NEW.data->>'displayName') + ON CONFLICT ("userId") DO UPDATE SET + "email" = EXCLUDED."email", + "display_name" = EXCLUDED."display_name" + `, + refreshProjection: true + }); + + const rowsAfter = await sorciTestClient.queryProjection("user-profile"); + expect(rowsAfter).toHaveLength(1); + expect(rowsAfter[0]).toEqual({ + userId, + email: "alice@example.com", + display_name: null + }); + }); + }); + describe("queryProjection", () => { test("retrieves data from projection table", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "user-profile", schema: { userId: { type: "text", primaryKey: true }, @@ -298,7 +475,7 @@ describe("Projections", () => { }); test("queries projection with where clause", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "user-profile", schema: { userId: { type: "text", primaryKey: true }, @@ -336,7 +513,7 @@ describe("Projections", () => { }); test("returns empty array for projection with no data", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "empty-projection", schema: { id: { type: "text", primaryKey: true } @@ -348,9 +525,9 @@ describe("Projections", () => { }); }); - describe("addEventReducingToProjection", () => { + describe("setEventReducingToProjection", () => { test("registers a new reducer for an event type", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "user-profile", schema: { userId: { type: "text", primaryKey: true }, @@ -359,7 +536,7 @@ describe("Projections", () => { } }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "user-profile", eventType: "user-created", reducer: (sql, tableName) => sql` @@ -403,7 +580,7 @@ describe("Projections", () => { displayName: "Alice" }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "user-profile", eventType: "user-renamed", reducer: (sql, tableName) => sql` @@ -443,7 +620,7 @@ describe("Projections", () => { }); test("registers a new reducer for an event type with projection refresh", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "user-profile-refresh-test", schema: { userId: { type: "text", primaryKey: true }, @@ -452,7 +629,7 @@ describe("Projections", () => { } }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "user-profile-refresh-test", eventType: "user-created-refresh", reducer: (sql, tableName) => sql` @@ -498,7 +675,7 @@ describe("Projections", () => { displayName: "Alice" }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "user-profile-refresh-test", eventType: "user-renamed-refresh", reducer: (sql, tableName) => sql` @@ -543,7 +720,7 @@ describe("Projections", () => { }); test("registers a reducer for an event type", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "user-profile", schema: { userId: { type: "text", primaryKey: true }, @@ -552,7 +729,7 @@ describe("Projections", () => { } }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "user-profile", eventType: "user-created", reducer: (sql, tableName) => sql` @@ -573,7 +750,7 @@ describe("Projections", () => { }); test("mutiple projection on same event", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "account", schema: { accountId: { type: "ulid", primaryKey: true }, @@ -582,7 +759,7 @@ describe("Projections", () => { } }); - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "user", schema: { userId: { type: "ulid", primaryKey: true }, @@ -591,7 +768,7 @@ describe("Projections", () => { } }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "account", eventType: "user-created", reducer: (sql, tableName) => sql` @@ -601,7 +778,7 @@ describe("Projections", () => { ` }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "user", eventType: "user-created", reducer: (sql, tableName) => sql` @@ -611,7 +788,7 @@ describe("Projections", () => { ` }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "account", eventType: "user-renamed", reducer: (sql, tableName) => sql` @@ -621,7 +798,7 @@ describe("Projections", () => { ` }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "account", eventType: "user-deleted", reducer: (sql, tableName) => sql` @@ -671,7 +848,7 @@ describe("Projections", () => { test("throws error when projection does not exist", async () => { await expect( - sorciTestClient.addEventReducingToProjection({ + sorciTestClient.setEventReducingToProjection({ name: "non-existent", eventType: "some-event", reducer: (sql, tableName) => sql` @@ -682,7 +859,7 @@ describe("Projections", () => { }); test("allows multiple reducers for different event types on same projection", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "user-profile", schema: { userId: { type: "text", primaryKey: true }, @@ -690,7 +867,7 @@ describe("Projections", () => { } }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "user-profile", eventType: "user-created", reducer: (sql, tableName) => sql` @@ -699,7 +876,7 @@ describe("Projections", () => { ` }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "user-profile", eventType: "user-updated", reducer: (sql, tableName) => sql` @@ -721,7 +898,7 @@ describe("Projections", () => { describe("End-to-End: Automatic projection updates", () => { test("projection is automatically updated when event is inserted", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "user-profile", schema: { userId: { type: "text", primaryKey: true }, @@ -730,7 +907,7 @@ describe("Projections", () => { } }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "user-profile", eventType: "user-created", reducer: (sql, tableName) => sql` @@ -765,7 +942,7 @@ describe("Projections", () => { }); test("projection is updated by multiple events", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "user-profile", schema: { userId: { type: "text", primaryKey: true }, @@ -774,7 +951,7 @@ describe("Projections", () => { } }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "user-profile", eventType: "user-created", reducer: (sql, tableName) => sql` @@ -843,7 +1020,7 @@ describe("Projections", () => { }); test("multiple event on same projection are processed in order", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "sourcing-dashboard", schema: { sourcingId: { type: "text", primaryKey: true }, @@ -852,7 +1029,7 @@ describe("Projections", () => { } }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "sourcing-dashboard", eventType: "sourcing-created", reducer: (sql, tableName) => sql` @@ -863,7 +1040,7 @@ describe("Projections", () => { ` }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "sourcing-dashboard", eventType: "sourcing-deleted", reducer: (sql, tableName) => sql` @@ -908,7 +1085,7 @@ describe("Projections", () => { }); test("same event type is processed properly for multiple projections", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "user", schema: { userId: { type: "text", primaryKey: true }, @@ -917,14 +1094,14 @@ describe("Projections", () => { } }); - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "account", schema: { userId: { type: "text", primaryKey: true } } }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "user", eventType: "user-created", reducer: (sql, tableName) => sql` @@ -936,7 +1113,7 @@ describe("Projections", () => { ` }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "account", eventType: "user-created", reducer: (sql, tableName) => sql` @@ -977,7 +1154,7 @@ describe("Projections", () => { describe("refreshProjection", () => { test("rebuilds projection from scratch with updated reducer logic", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "user-profile-rebuild-test", schema: { userId: { type: "text", primaryKey: true }, @@ -987,7 +1164,7 @@ describe("Projections", () => { } }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "user-profile-rebuild-test", eventType: "user-created-rebuild", reducer: (sql, tableName) => sql` @@ -999,7 +1176,7 @@ describe("Projections", () => { ` }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "user-profile-rebuild-test", eventType: "user-renamed-rebuild", reducer: (sql, tableName) => sql` @@ -1074,7 +1251,7 @@ describe("Projections", () => { ]) ); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "user-profile-rebuild-test", eventType: "user-renamed-rebuild", reducer: (sql, tableName) => sql` @@ -1111,7 +1288,7 @@ describe("Projections", () => { }); test("processes events added during refresh after refresh completes", async () => { - await sorciTestClient.declareProjection({ + await sorciTestClient.createProjection({ name: "account", schema: { accountId: { type: "text", primaryKey: true }, @@ -1119,7 +1296,7 @@ describe("Projections", () => { } }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "account", eventType: "account-created", reducer: (sql, tableName) => sql` @@ -1129,7 +1306,7 @@ describe("Projections", () => { ` }); - await sorciTestClient.addEventReducingToProjection({ + await sorciTestClient.setEventReducingToProjection({ name: "account", eventType: "account-deposited", reducer: (sql, tableName) => sql`