Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/common/utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
import { monotonicFactory } from "ulidx";
const ulid = monotonicFactory();

/**
* Generates a monotonic ULID (Universally Unique Lexicographically Sortable Identifier).
* Returns the provided id if given, otherwise creates a new monotonic ULID that is guaranteed
* to be greater than the previous one, even if generated in the same millisecond.
* @param id - Optional existing identifier to use instead of generating a new one
* @returns A ULID string
*/
export const createId = (id?: string) => id || ulid();
export const shortId = (id?: string) => id || ulid().toLowerCase().slice(0, 12);

Expand Down
23 changes: 12 additions & 11 deletions src/sorci.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,10 @@ export interface Sorci {
// Projections

/**
* Declare a projection with schema and query
* Create a new projection with schema. Throws if projection already exists.
* @category Projections
*/
declareProjection(declaration: ProjectionDeclaration): Promise<void>;
createProjection(declaration: ProjectionDeclaration): Promise<void>;

/**
* Query a projection
Expand All @@ -220,30 +220,31 @@ export interface Sorci {
): Promise<any[]>;

/**
* Add an event-specific reducer to a projection
* Set an event-specific reducer for a projection (adds new or updates existing)
* @category Projections
*/
addEventReducingToProjection(payload: {
setEventReducingToProjection(payload: {
name: string;
eventType: string;
reducer: EventReducer;
refreshProjection?: boolean;
}): Promise<void>;

/**
* Manually refresh a projection by reprocessing all events
* @category Projections
*/
// refreshProjection(name: string): Promise<void>;
refreshProjection(name: string): Promise<void>;

/**
* Update a projection's configuration
* Update an existing projection's schema with custom SQL alterations.
* Throws if projection doesn't exist.
* @category Projections
*/
// updateProjection(payload: {
// name: string;
// query?: Query;
// resetState?: boolean;
// }): Promise<void>;
updateProjection(payload: {
name: string;
alterationSQL: postgres.PendingQuery<postgres.Row[]>;
}): Promise<void>;

/**
* Drop a projection completely
Expand Down
111 changes: 105 additions & 6 deletions src/sorci.postgres.ts
Original file line number Diff line number Diff line change
Expand Up @@ -749,11 +749,11 @@ export class SorciPostgres implements Sorci {
}
}

async declareProjection(declaration: ProjectionDeclaration) {
async createProjection(declaration: ProjectionDeclaration) {
const { name, schema } = declaration;

if (this._projectionRegistry.has(name)) {
throw new Error(`Projection "${name}" is already declared`);
throw new Error(`Projection "${name}" already exists`);
}

await this.createProjectionTable(name, schema);
Expand Down Expand Up @@ -798,6 +798,9 @@ export class SorciPostgres implements Sorci {
throw new Error(`Projection "${name}" does not exist`);
}

const projection = this._projectionRegistry.get(name);
const eventTypes = Array.from(projection!.reducers.keys());

const tableName = `${this.streamName}_projection_${name.replace(/-/g, "_")}`;

await this.sql`DROP TABLE IF EXISTS ${this.sql(tableName)} CASCADE`;
Expand All @@ -809,14 +812,19 @@ export class SorciPostgres implements Sorci {
`;

this._projectionRegistry.delete(name);

for (const eventType of eventTypes) {
await this.createOrUpdateEventTypeTrigger(eventType);
}
}

async addEventReducingToProjection(payload: {
async setEventReducingToProjection(payload: {
name: string;
eventType: string;
reducer: EventReducer;
refreshProjection?: boolean;
}) {
const { name, eventType, reducer } = payload;
const { name, eventType, reducer, refreshProjection } = payload;

const projection = this._projectionRegistry.get(name);
if (!projection) {
Expand All @@ -826,6 +834,97 @@ export class SorciPostgres implements Sorci {
projection.reducers.set(eventType, reducer);

await this.createOrUpdateEventTypeTrigger(eventType);

if (refreshProjection) {
await this.refreshProjection(name);
}
}

async refreshProjection(name: string) {
const projection = this._projectionRegistry.get(name);
if (!projection) {
throw new Error(`Projection "${name}" does not exist`);
}

const eventTypes = Array.from(projection.reducers.keys());
if (eventTypes.length === 0) {
return;
}

const tableName = `${this.streamName}_projection_${name.replace(/-/g, "_")}`;

await this.sql.begin(async (sql) => {
await sql`TRUNCATE TABLE ${sql(tableName)}`;

const events = await sql`
SELECT * FROM ${sql(this.streamName)}
WHERE type = ANY(${eventTypes})
ORDER BY id
`;

for (const event of events) {
const reducer = projection.reducers.get(event.type);
if (!reducer) {
continue;
}

const mockSql = this.createMockSqlFunction();
const result = reducer(mockSql, tableName) as any;

if (!result || typeof result !== "object" || !result.__mockSQL) {
throw new Error(
`Reducer for event type "${event.type}" in projection "${name}" did not return a valid SQL query`
);
}

const reducerSQL = result.__mockSQL as string;

const eventTypeEscaped = event.type.replace(/'/g, "''");
const eventDataJson = JSON.stringify(event.data).replace(/'/g, "''");
const eventIdentifierJson = JSON.stringify(event.identifier).replace(
/'/g,
"''"
);
const eventIdEscaped = event.id.replace(/'/g, "''");

await sql.unsafe(`
DO $$
DECLARE
NEW RECORD;
BEGIN
SELECT
'${eventTypeEscaped}'::text as type,
'${eventDataJson}'::jsonb as data,
'${eventIdentifierJson}'::jsonb as identifier,
'${eventIdEscaped}'::text as id
INTO NEW;

${reducerSQL};
END $$;
`);
}
});
}

async updateProjection(payload: {
name: string;
alterationSQL: postgres.PendingQuery<postgres.Row[]>;
}) {
const { name, alterationSQL } = payload;

const projection = this._projectionRegistry.get(name);
if (!projection) {
throw new Error(`Projection "${name}" does not exist`);
}

await alterationSQL;

const metaTableName = `${this.streamName}_projections_meta`;
await this.sql`
UPDATE ${this.sql(metaTableName)}
SET updated_at = NOW()
WHERE name = ${name}
`;
}

private createMockSqlFunction(): any {
Expand Down Expand Up @@ -910,8 +1009,8 @@ export class SorciPostgres implements Sorci {
const functionSQL = `
CREATE OR REPLACE FUNCTION ${functionName}()
RETURNS TRIGGER AS $$
BEGIN
${functionBody}
BEGIN
${functionBody}
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
Expand Down
Loading