Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions infrastructure/evault-core/src/config/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,12 @@ export const AppDataSource = new DataSource({
ca: process.env.DB_CA_CERT,
}
: false,
// Connection pool configuration to prevent exhaustion
extra: {
max: 10,
min: 2,
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 5000,
statement_timeout: 10000,
},
})
58 changes: 48 additions & 10 deletions platforms/cerberus/src/controllers/WebhookController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,21 +113,59 @@ export class WebhookController {
Array.isArray(local.data.participants)
) {
console.log("Processing participants:", local.data.participants);

// Use Promise.allSettled with timeout to prevent webhook hang
const participantPromises = local.data.participants.map(
async (ref: string) => {
if (ref && typeof ref === "string") {
const userId = ref.split("(")[1].split(")")[0];
console.log("Extracted userId:", userId);
return await this.userService.getUserById(userId);
async (ref: string, index: number) => {
if (!ref || typeof ref !== "string") {
return null;
}

try {
const userId = ref.split("(")[1]?.split(")")[0];
if (!userId) {
console.warn(`⚠️ Could not extract userId from ref: ${ref}`);
return null;
}

console.log(`Extracted userId [${index}]: ${userId}`);

// Add 5-second timeout to prevent indefinite hang
const timeoutPromise = new Promise<null>((_, reject) =>
setTimeout(() => reject(new Error(`Timeout loading user ${userId}`)), 5000)
);

const userPromise = this.userService.userRepository.findOne({
where: { id: userId },
// Skip heavy relations in webhook context - only need basic user data
});

const user = await Promise.race([userPromise, timeoutPromise]);

if (user) {
console.log(`✅ Loaded user [${index}]: ${userId}`);
} else {
console.warn(`⚠️ User not found [${index}]: ${userId}`);
}

return user;
} catch (error) {
console.error(`❌ Error loading participant [${index}]:`, error instanceof Error ? error.message : error);
return null;
}
return null;
}
);

participants = (
await Promise.all(participantPromises)
).filter((user): user is User => user !== null);
console.log("Found participants:", participants.length);
// Use allSettled to handle failures gracefully without blocking
const settledResults = await Promise.allSettled(participantPromises);

participants = settledResults
.filter((result): result is PromiseFulfilledResult<User | null> =>
result.status === 'fulfilled' && result.value !== null
)
.map(result => result.value as User);

console.log(`Found ${participants.length} participants (${settledResults.filter(r => r.status === 'rejected').length} failed)`);
}

// Process admins - filter out nulls and extract IDs
Expand Down
13 changes: 13 additions & 0 deletions platforms/cerberus/src/database/data-source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,18 @@ export const AppDataSource = new DataSource({
ca: process.env.DB_CA_CERT,
}
: false,
// Connection pool configuration to prevent exhaustion
extra: {
// Maximum number of connections in pool
max: 10,
// Minimum number of connections in pool
min: 2,
// Maximum time (ms) a connection can be idle before being released
idleTimeoutMillis: 30000,
// Maximum time (ms) to wait for a connection from pool
connectionTimeoutMillis: 5000,
// Query timeout (ms) - fail queries that take too long
statement_timeout: 10000,
},
});

169 changes: 155 additions & 14 deletions platforms/cerberus/src/web3adapter/watchers/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,23 +109,39 @@ export class PostgresSubscriber implements EntitySubscriberInterface {

/**
* Called after entity update.
* NOTE: We pass metadata to handleChangeWithReload so the entity reload happens
* AFTER the transaction commits (inside setTimeout), avoiding stale/partial reads.
*/
async afterUpdate(event: UpdateEvent<any>) {
let entity = event.entity;
if (entity) {
entity = (await this.enrichEntity(
entity,
event.metadata.tableName,
event.metadata.target
)) as ObjectLiteral;
// Try different ways to get the entity ID
let entityId = event.entity?.id || event.databaseEntity?.id;

if (!entityId && event.entity) {
// Look for common ID field names
entityId = event.entity.id || event.entity.Id || event.entity.ID || event.entity._id;
}
this.handleChange(
// @ts-ignore
entity ?? event.entityId,
event.metadata.tableName.endsWith("s")
? event.metadata.tableName
: event.metadata.tableName + "s"
);

if (!entityId) {
console.warn(`⚠️ afterUpdate: Could not determine entity ID for ${event.metadata.tableName}`);
return;
}

const entityName = typeof event.metadata.target === 'function'
? event.metadata.target.name
: event.metadata.target;

const tableName = event.metadata.tableName.endsWith("s")
? event.metadata.tableName
: event.metadata.tableName + "s";

// Pass reload metadata instead of entity - actual DB read happens in setTimeout
this.handleChangeWithReload({
entityId,
tableName,
relations: this.getRelationsForEntity(entityName),
tableTarget: event.metadata.target,
rawTableName: event.metadata.tableName,
});
}

/**
Expand All @@ -152,6 +168,131 @@ export class PostgresSubscriber implements EntitySubscriberInterface {
// This prevents the error when trying to access entity.id
}

/**
* Handle update changes by reloading entity AFTER transaction commits.
* This avoids stale/partial reads that occur when we use event.entity which only contains changed fields.
*/
private async handleChangeWithReload(params: {
entityId: string;
tableName: string;
relations: string[];
tableTarget: any;
rawTableName: string;
}): Promise<void> {
const { entityId, tableName, relations, tableTarget, rawTableName } = params;

console.log(`🔍 handleChangeWithReload called for: ${tableName}, entityId: ${entityId}`);

// Check if this is a junction table - skip for now
if (tableName === "group_participants") {
return;
}

// @ts-ignore
const junctionInfo = JUNCTION_TABLE_MAP[tableName];
if (junctionInfo) {
// Junction tables handled separately
return;
}

// Small delay to ensure transaction has committed before we read
// Groups and messages sync quickly (50ms), other entities use standard delay
const delayMs = (tableName.toLowerCase() === "groups" || tableName.toLowerCase() === "messages") ? 50 : 3_000;

setTimeout(async () => {
try {
await this.executeReloadAndSend(params);
} catch (error) {
console.error(`❌ Error in handleChangeWithReload setTimeout for ${tableName}:`, error);
}
}, delayMs);
}

/**
* Execute the entity reload and send webhook - called from within setTimeout
* when transaction has definitely committed.
*/
private async executeReloadAndSend(params: {
entityId: string;
tableName: string;
relations: string[];
tableTarget: any;
rawTableName: string;
}): Promise<void> {
const { entityId, tableName, relations, tableTarget, rawTableName } = params;

// NOW reload entity - transaction has committed, data is fresh and complete
const repository = AppDataSource.getRepository(tableTarget);
let entity = await repository.findOne({
where: { id: entityId },
relations: relations.length > 0 ? relations : undefined
});

if (!entity) {
console.warn(`⚠️ executeReloadAndSend: Entity ${entityId} not found after reload`);
return;
}

// Enrich entity with additional data
entity = (await this.enrichEntity(
entity,
rawTableName,
tableTarget
)) as ObjectLiteral;

// Convert to plain data
const data = this.entityToPlain(entity);

if (!data.id) {
return;
}

// For Message entities, only process system messages
if (tableName === "messages") {
const isSystemMessage = data.text && data.text.includes('$$system-message$$');
if (!isSystemMessage) {
return;
}
}

let globalId = await this.adapter.mappingDb.getGlobalId(entityId);
globalId = globalId ?? "";

if (this.adapter.lockedIds.includes(globalId)) {
console.log("Entity already locked, skipping:", globalId, entityId);
return;
}

if (this.adapter.lockedIds.includes(entityId)) {
console.log("Local entity locked (webhook created), skipping:", entityId);
return;
}

console.log(
"sending packet for global Id",
globalId,
entityId,
"table:",
tableName
);

// Log the full data being sent for system messages
if (tableName === "messages") {
console.log("📤 [SUBSCRIBER] Sending message data:");
console.log(" - Data keys:", Object.keys(data));
console.log(" - Data.sender:", data.sender);
console.log(" - Data.group:", data.group ? `Group ID: ${data.group.id}` : "null");
console.log(" - Data.text (first 100):", data.text?.substring(0, 100));
console.log(" - Data.isSystemMessage:", data.isSystemMessage);
}

const envelope = await this.adapter.handleChange({
data,
tableName: tableName.toLowerCase(),
});
console.log("📥 [SUBSCRIBER] Envelope response:", envelope);
}

/**
* Handle entity changes and send to web3adapter
*/
Expand Down
8 changes: 8 additions & 0 deletions platforms/dreamsync-api/src/database/data-source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ export const dataSourceOptions: DataSourceOptions = {
ca: process.env.DB_CA_CERT,
}
: false,
// Connection pool configuration to prevent exhaustion
extra: {
max: 10,
min: 2,
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 5000,
statement_timeout: 10000,
},
};

export const AppDataSource = new DataSource(dataSourceOptions);
8 changes: 8 additions & 0 deletions platforms/eCurrency-api/src/database/data-source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ export const dataSourceOptions: DataSourceOptions = {
migrations: [path.join(__dirname, "migrations", "*.ts")],
logging: process.env.NODE_ENV === "development",
subscribers: [PostgresSubscriber],
// Connection pool configuration to prevent exhaustion
extra: {
max: 10,
min: 2,
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 5000,
statement_timeout: 10000,
},
};

export const AppDataSource = new DataSource(dataSourceOptions);
Expand Down
8 changes: 8 additions & 0 deletions platforms/eReputation-api/src/database/data-source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ export const dataSourceOptions: DataSourceOptions = {
migrations: [path.join(__dirname, "migrations", "*.ts")],
logging: process.env.NODE_ENV === "development",
subscribers: [PostgresSubscriber],
// Connection pool configuration to prevent exhaustion
extra: {
max: 10,
min: 2,
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 5000,
statement_timeout: 10000,
},
};

export const AppDataSource = new DataSource(dataSourceOptions);
8 changes: 8 additions & 0 deletions platforms/emover-api/src/database/data-source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ export const dataSourceOptions: DataSourceOptions = {
ca: process.env.DB_CA_CERT,
}
: false,
// Connection pool configuration to prevent exhaustion
extra: {
max: 10,
min: 2,
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 5000,
statement_timeout: 10000,
},
};

export const AppDataSource = new DataSource(dataSourceOptions);
8 changes: 8 additions & 0 deletions platforms/esigner-api/src/database/data-source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ export const AppDataSource = new DataSource({
ca: process.env.DB_CA_CERT,
}
: false,
// Connection pool configuration to prevent exhaustion
extra: {
max: 10,
min: 2,
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 5000,
statement_timeout: 10000,
},
});


8 changes: 8 additions & 0 deletions platforms/evoting-api/src/database/data-source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ export const dataSourceOptions: DataSourceOptions = {
ca: process.env.DB_CA_CERT,
}
: false,
// Connection pool configuration to prevent exhaustion
extra: {
max: 10,
min: 2,
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 5000,
statement_timeout: 10000,
},
};

export const AppDataSource = new DataSource(dataSourceOptions);
8 changes: 8 additions & 0 deletions platforms/file-manager-api/src/database/data-source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,13 @@ export const AppDataSource = new DataSource({
ca: process.env.DB_CA_CERT,
}
: false,
// Connection pool configuration to prevent exhaustion
extra: {
max: 10,
min: 2,
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 5000,
statement_timeout: 10000,
},
});

Loading