Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions migrations/1774715200000_audit-and-webhook-outbox.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Migration: Add audit_logs and webhook_outbox tables
*
* audit_logs — written atomically with stream operations so audit rows are
* always in sync with stream rows (transactional write path).
*
* webhook_outbox — transactional outbox pattern; a row is inserted here
* atomically with the stream write so the dispatcher can
* pick it up without risk of the stream being written but
* the webhook being lost (or vice-versa).
*/

import { ColumnDefinitions, MigrationBuilder } from 'node-pg-migrate';

export const shorthands: ColumnDefinitions | undefined = undefined;

export async function up(pgm: MigrationBuilder): Promise<void> {
// ── audit_logs ────────────────────────────────────────────────────────────
pgm.createTable('audit_logs', {
id: { type: 'bigserial', primaryKey: true },
seq: { type: 'bigint', notNull: true },
timestamp: { type: 'text', notNull: true },
action: { type: 'text', notNull: true },
resource_type: { type: 'text', notNull: true },
resource_id: { type: 'text', notNull: true },
correlation_id: { type: 'text' },
meta: { type: 'jsonb' }, // NULL when no metadata
});

pgm.createIndex('audit_logs', 'resource_id');
pgm.createIndex('audit_logs', 'action');
pgm.createIndex('audit_logs', 'timestamp');

// ── webhook_outbox ────────────────────────────────────────────────────────
pgm.createTable('webhook_outbox', {
id: { type: 'bigserial', primaryKey: true },
stream_id: { type: 'text', notNull: true },
event_type: { type: 'text', notNull: true },
payload: { type: 'jsonb', notNull: true }, // amounts are decimal strings
created_at: { type: 'timestamp with time zone', notNull: true, default: pgm.func('current_timestamp') },
processed: { type: 'boolean', notNull: true, default: false },
});

pgm.createIndex('webhook_outbox', 'stream_id');
pgm.createIndex('webhook_outbox', 'processed');
pgm.createIndex('webhook_outbox', 'created_at');
}

export async function down(pgm: MigrationBuilder): Promise<void> {
pgm.dropTable('webhook_outbox');
pgm.dropTable('audit_logs');
}
45 changes: 45 additions & 0 deletions src/db/migrations/001_create_streams_table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,51 @@ CREATE INDEX IF NOT EXISTS idx_streams_start_time ON streams (start_time);
CREATE INDEX IF NOT EXISTS idx_streams_end_time ON streams (end_time);
`;

/**
* Audit log table — stores immutable records of privileged state changes.
* Written atomically with stream operations via transactionalUpsertStream /
* transactionalUpdateStream so audit rows are always in sync with stream rows.
*/
export const upAuditLogs = `
CREATE TABLE IF NOT EXISTS audit_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
seq INTEGER NOT NULL,
timestamp TEXT NOT NULL,
action TEXT NOT NULL,
resource_type TEXT NOT NULL,
resource_id TEXT NOT NULL,
correlation_id TEXT,
meta TEXT -- JSON string or NULL
);

CREATE INDEX IF NOT EXISTS idx_audit_logs_resource_id ON audit_logs(resource_id);
CREATE INDEX IF NOT EXISTS idx_audit_logs_action ON audit_logs(action);
CREATE INDEX IF NOT EXISTS idx_audit_logs_timestamp ON audit_logs(timestamp);
`;

/**
* Webhook outbox table — transactional outbox pattern.
* A row is inserted here atomically with the stream write so the webhook
* dispatcher can pick it up without risk of the stream being written but
* the webhook being lost (or vice-versa).
*/
export const upWebhookOutbox = `
CREATE TABLE IF NOT EXISTS webhook_outbox (
id INTEGER PRIMARY KEY AUTOINCREMENT,
stream_id TEXT NOT NULL,
event_type TEXT NOT NULL,
payload TEXT NOT NULL, -- JSON string; amounts are decimal strings
created_at TEXT NOT NULL,
processed INTEGER NOT NULL DEFAULT 0 -- 0 = pending, 1 = dispatched
);

CREATE INDEX IF NOT EXISTS idx_webhook_outbox_stream_id ON webhook_outbox(stream_id);
CREATE INDEX IF NOT EXISTS idx_webhook_outbox_processed ON webhook_outbox(processed);
CREATE INDEX IF NOT EXISTS idx_webhook_outbox_created_at ON webhook_outbox(created_at);
`;

export const down = `
DROP TABLE IF EXISTS webhook_outbox;
DROP TABLE IF EXISTS audit_logs;
DROP TABLE IF EXISTS streams;
`;
260 changes: 260 additions & 0 deletions src/db/repositories/streamRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,20 @@
* Amount columns are stored and returned as TEXT. No numeric coercion
* is performed here — callers own that responsibility.
*
* Transactional operations
* ------------------------
* `transactionalUpsertStream` and `transactionalUpdateStream` wrap the stream
* write, an audit_logs row, and an optional webhook_outbox row inside a single
* SQLite transaction. If any step fails the entire transaction is rolled back,
* guaranteeing that the three tables are always in sync.
*
* Decimal-string amounts
* ----------------------
* All monetary fields (amount, streamed_amount, remaining_amount,
* rate_per_second) are stored and returned as TEXT. The repository never
* converts them to numbers, preserving full precision across the
* chain → DB → API boundary.
*
* @module db/repositories/streamRepository
*/

Expand Down Expand Up @@ -374,4 +388,250 @@ export const streamRepository = {
}
return counts;
},

// ── Transactional operations ──────────────────────────────────────────────

/**
* Atomically create-or-update a stream, write an audit_logs row, and
* optionally enqueue a webhook_outbox row — all in one SQLite transaction.
*
* On any failure the entire transaction is rolled back so the three tables
* remain in sync. Decimal-string amounts are preserved as-is.
*
* @param input Stream data from blockchain event
* @param auditAction Audit action to record (e.g. 'STREAM_CREATED')
* @param opts Correlation ID and optional webhook payload
*/
transactionalUpsertStream(
input: CreateStreamInput,
auditAction: AuditAction,
opts: TransactionOptions = {},
): TransactionalUpsertResult {
const db = getDatabase();

// Validate before opening the transaction to fail fast.
const validation = validateStreamInput(input);
if (!validation.valid) {
throw new Error(`Invalid stream input: ${validation.errors.join(", ")}`);
}

const txn = db.transaction((): TransactionalUpsertResult => {
const now = new Date().toISOString();

// ── 1. Stream upsert (idempotent) ──────────────────────────────────
const existing = db
.prepare(
`SELECT * FROM streams WHERE transaction_hash = ? AND event_index = ?`,
)
.get(input.transaction_hash, input.event_index) as StreamRecord | undefined;

if (existing) {
debug("Stream already exists (idempotent)", {
id: existing.id,
txHash: input.transaction_hash,
correlationId: opts.correlationId,
});
// Still write audit + webhook so callers get consistent results.
const auditEntry = buildAuditEntry(
auditAction,
"stream",
existing.id,
opts.correlationId,
buildStreamMeta(input),
);
writeAuditEntryToDb(db, auditEntry);
maybeWriteWebhookOutbox(db, existing.id, opts.webhookEvent);
return { created: false, updated: false, stream: existing, auditSeq: auditEntry.seq };
}

const existingById = db
.prepare("SELECT * FROM streams WHERE id = ?")
.get(input.id) as StreamRecord | undefined;

let stream: StreamRecord;
let created: boolean;
let updated: boolean;

if (existingById) {
// Out-of-order event: update existing record.
info("Updating existing stream with new event data (transactional)", {
id: input.id,
correlationId: opts.correlationId,
});

db.prepare(`
UPDATE streams SET
sender_address = ?, recipient_address = ?,
amount = ?, streamed_amount = ?, remaining_amount = ?,
rate_per_second = ?, start_time = ?, end_time = ?,
status = ?, contract_id = ?,
transaction_hash = ?, event_index = ?, updated_at = ?
WHERE id = ?
`).run(
input.sender_address, input.recipient_address,
input.amount, input.streamed_amount, input.remaining_amount,
input.rate_per_second, input.start_time, input.end_time,
"active", input.contract_id,
input.transaction_hash, input.event_index, now,
input.id,
);

stream = db.prepare("SELECT * FROM streams WHERE id = ?").get(input.id) as StreamRecord;
created = false;
updated = true;
} else {
// New stream.
info("Creating new stream from event (transactional)", {
id: input.id,
correlationId: opts.correlationId,
});

db.prepare(`
INSERT INTO streams (
id, sender_address, recipient_address, amount, streamed_amount,
remaining_amount, rate_per_second, start_time, end_time, status,
contract_id, transaction_hash, event_index, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`).run(
input.id,
input.sender_address, input.recipient_address,
input.amount, input.streamed_amount, input.remaining_amount,
input.rate_per_second, input.start_time, input.end_time,
"active", input.contract_id,
input.transaction_hash, input.event_index,
now, now,
);

stream = db.prepare("SELECT * FROM streams WHERE id = ?").get(input.id) as StreamRecord;
created = true;
updated = false;
}

// ── 2. Audit log row ───────────────────────────────────────────────
const auditEntry = buildAuditEntry(
auditAction,
"stream",
stream.id,
opts.correlationId,
buildStreamMeta(input),
);
writeAuditEntryToDb(db, auditEntry);

// ── 3. Webhook outbox row (optional) ──────────────────────────────
maybeWriteWebhookOutbox(db, stream.id, opts.webhookEvent);

return { created, updated, stream, auditSeq: auditEntry.seq };
});

try {
return txn();
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
logError("Transaction rolled back (upsertStream)", {
id: input.id,
error: message,
correlationId: opts.correlationId,
});
throw err;
}
},

/**
* Atomically update a stream's status/amounts, write an audit_logs row, and
* optionally enqueue a webhook_outbox row — all in one SQLite transaction.
*
* Validates the status-machine transition before opening the transaction.
* On any failure the entire transaction is rolled back.
*
* @param id Stream ID
* @param input Fields to update
* @param auditAction Audit action to record (e.g. 'STREAM_CANCELLED')
* @param opts Correlation ID and optional webhook payload
*/
transactionalUpdateStream(
id: string,
input: UpdateStreamInput,
auditAction: AuditAction,
opts: TransactionOptions = {},
): TransactionalUpdateResult {
const db = getDatabase();

const txn = db.transaction((): TransactionalUpdateResult => {
const now = new Date().toISOString();

// ── 1. Fetch current record (inside txn for consistent read) ───────
const current = db
.prepare("SELECT * FROM streams WHERE id = ?")
.get(id) as StreamRecord | undefined;

if (!current) {
throw new Error(`Stream not found: ${id}`);
}

// ── 2. Validate status transition ──────────────────────────────────
if (input.status && !isValidStatusTransition(current.status, input.status)) {
throw new Error(
`Invalid status transition: ${current.status} -> ${input.status}. ` +
`Valid transitions: ${STREAM_INVARIANTS.validTransitions[current.status].join(", ")}`,
);
}

// ── 3. Build and execute UPDATE ────────────────────────────────────
const updates: string[] = ["updated_at = ?"];
const values: (string | number)[] = [now];

if (input.status !== undefined) {
updates.push("status = ?");
values.push(input.status);
}
if (input.streamed_amount !== undefined) {
updates.push("streamed_amount = ?");
values.push(input.streamed_amount);
}
if (input.remaining_amount !== undefined) {
updates.push("remaining_amount = ?");
values.push(input.remaining_amount);
}
if (input.end_time !== undefined) {
updates.push("end_time = ?");
values.push(input.end_time);
}

values.push(id);
db.prepare(`UPDATE streams SET ${updates.join(", ")} WHERE id = ?`).run(...values);

const stream = db
.prepare("SELECT * FROM streams WHERE id = ?")
.get(id) as StreamRecord;

info("Stream updated (transactional)", { id, input, correlationId: opts.correlationId });

// ── 4. Audit log row ───────────────────────────────────────────────
const auditEntry = buildAuditEntry(
auditAction,
"stream",
id,
opts.correlationId,
{ previousStatus: current.status, ...input } as Record<string, unknown>,
);
writeAuditEntryToDb(db, auditEntry);

// ── 5. Webhook outbox row (optional) ──────────────────────────────
maybeWriteWebhookOutbox(db, id, opts.webhookEvent);

return { stream, auditSeq: auditEntry.seq };
});

try {
return txn();
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
logError("Transaction rolled back (updateStream)", {
id,
error: message,
correlationId: opts.correlationId,
});
throw err;
}
},
};
Loading
Loading