diff --git a/infrastructure/evault-core/src/config/database.ts b/infrastructure/evault-core/src/config/database.ts index 6e4c55baa..0d64d08c6 100644 --- a/infrastructure/evault-core/src/config/database.ts +++ b/infrastructure/evault-core/src/config/database.ts @@ -21,4 +21,12 @@ export const AppDataSource = new DataSource({ ca: process.env.DB_CA_CERT, } : false, + // Connection pool configuration to prevent exhaustion + extra: { + max: 10, + min: 2, + idleTimeoutMillis: 30000, + connectionTimeoutMillis: 5000, + statement_timeout: 10000, + }, }) \ No newline at end of file diff --git a/platforms/cerberus/src/controllers/WebhookController.ts b/platforms/cerberus/src/controllers/WebhookController.ts index 0657712a8..e2d9aff1e 100644 --- a/platforms/cerberus/src/controllers/WebhookController.ts +++ b/platforms/cerberus/src/controllers/WebhookController.ts @@ -113,21 +113,59 @@ export class WebhookController { Array.isArray(local.data.participants) ) { console.log("Processing participants:", local.data.participants); + + // Use Promise.allSettled with timeout to prevent webhook hang const participantPromises = local.data.participants.map( - async (ref: string) => { - if (ref && typeof ref === "string") { - const userId = ref.split("(")[1].split(")")[0]; - console.log("Extracted userId:", userId); - return await this.userService.getUserById(userId); + async (ref: string, index: number) => { + if (!ref || typeof ref !== "string") { + return null; + } + + try { + const userId = ref.split("(")[1]?.split(")")[0]; + if (!userId) { + console.warn(`⚠️ Could not extract userId from ref: ${ref}`); + return null; + } + + console.log(`Extracted userId [${index}]: ${userId}`); + + // Add 5-second timeout to prevent indefinite hang + const timeoutPromise = new Promise((_, reject) => + setTimeout(() => reject(new Error(`Timeout loading user ${userId}`)), 5000) + ); + + const userPromise = this.userService.userRepository.findOne({ + where: { id: userId }, + // Skip heavy relations in webhook context - only need basic user data + }); + + const user = await Promise.race([userPromise, timeoutPromise]); + + if (user) { + console.log(`✅ Loaded user [${index}]: ${userId}`); + } else { + console.warn(`⚠️ User not found [${index}]: ${userId}`); + } + + return user; + } catch (error) { + console.error(`❌ Error loading participant [${index}]:`, error instanceof Error ? error.message : error); + return null; } - return null; } ); - participants = ( - await Promise.all(participantPromises) - ).filter((user): user is User => user !== null); - console.log("Found participants:", participants.length); + // Use allSettled to handle failures gracefully without blocking + const settledResults = await Promise.allSettled(participantPromises); + + participants = settledResults + .filter((result): result is PromiseFulfilledResult => + result.status === 'fulfilled' && result.value !== null + ) + .map(result => result.value as User); + + console.log(`Found ${participants.length} participants (${settledResults.filter(r => r.status === 'rejected').length} failed)`); } // Process admins - filter out nulls and extract IDs diff --git a/platforms/cerberus/src/database/data-source.ts b/platforms/cerberus/src/database/data-source.ts index 6e3b3c78c..ad94c5875 100644 --- a/platforms/cerberus/src/database/data-source.ts +++ b/platforms/cerberus/src/database/data-source.ts @@ -34,5 +34,18 @@ export const AppDataSource = new DataSource({ ca: process.env.DB_CA_CERT, } : false, + // Connection pool configuration to prevent exhaustion + extra: { + // Maximum number of connections in pool + max: 10, + // Minimum number of connections in pool + min: 2, + // Maximum time (ms) a connection can be idle before being released + idleTimeoutMillis: 30000, + // Maximum time (ms) to wait for a connection from pool + connectionTimeoutMillis: 5000, + // Query timeout (ms) - fail queries that take too long + statement_timeout: 10000, + }, }); diff --git a/platforms/cerberus/src/web3adapter/watchers/subscriber.ts b/platforms/cerberus/src/web3adapter/watchers/subscriber.ts index 144bc3980..c4b569712 100644 --- a/platforms/cerberus/src/web3adapter/watchers/subscriber.ts +++ b/platforms/cerberus/src/web3adapter/watchers/subscriber.ts @@ -109,23 +109,39 @@ export class PostgresSubscriber implements EntitySubscriberInterface { /** * Called after entity update. + * NOTE: We pass metadata to handleChangeWithReload so the entity reload happens + * AFTER the transaction commits (inside setTimeout), avoiding stale/partial reads. */ async afterUpdate(event: UpdateEvent) { - let entity = event.entity; - if (entity) { - entity = (await this.enrichEntity( - entity, - event.metadata.tableName, - event.metadata.target - )) as ObjectLiteral; + // Try different ways to get the entity ID + let entityId = event.entity?.id || event.databaseEntity?.id; + + if (!entityId && event.entity) { + // Look for common ID field names + entityId = event.entity.id || event.entity.Id || event.entity.ID || event.entity._id; } - this.handleChange( - // @ts-ignore - entity ?? event.entityId, - event.metadata.tableName.endsWith("s") - ? event.metadata.tableName - : event.metadata.tableName + "s" - ); + + if (!entityId) { + console.warn(`⚠️ afterUpdate: Could not determine entity ID for ${event.metadata.tableName}`); + return; + } + + const entityName = typeof event.metadata.target === 'function' + ? event.metadata.target.name + : event.metadata.target; + + const tableName = event.metadata.tableName.endsWith("s") + ? event.metadata.tableName + : event.metadata.tableName + "s"; + + // Pass reload metadata instead of entity - actual DB read happens in setTimeout + this.handleChangeWithReload({ + entityId, + tableName, + relations: this.getRelationsForEntity(entityName), + tableTarget: event.metadata.target, + rawTableName: event.metadata.tableName, + }); } /** @@ -152,6 +168,131 @@ export class PostgresSubscriber implements EntitySubscriberInterface { // This prevents the error when trying to access entity.id } + /** + * Handle update changes by reloading entity AFTER transaction commits. + * This avoids stale/partial reads that occur when we use event.entity which only contains changed fields. + */ + private async handleChangeWithReload(params: { + entityId: string; + tableName: string; + relations: string[]; + tableTarget: any; + rawTableName: string; + }): Promise { + const { entityId, tableName, relations, tableTarget, rawTableName } = params; + + console.log(`🔍 handleChangeWithReload called for: ${tableName}, entityId: ${entityId}`); + + // Check if this is a junction table - skip for now + if (tableName === "group_participants") { + return; + } + + // @ts-ignore + const junctionInfo = JUNCTION_TABLE_MAP[tableName]; + if (junctionInfo) { + // Junction tables handled separately + return; + } + + // Small delay to ensure transaction has committed before we read + // Groups and messages sync quickly (50ms), other entities use standard delay + const delayMs = (tableName.toLowerCase() === "groups" || tableName.toLowerCase() === "messages") ? 50 : 3_000; + + setTimeout(async () => { + try { + await this.executeReloadAndSend(params); + } catch (error) { + console.error(`❌ Error in handleChangeWithReload setTimeout for ${tableName}:`, error); + } + }, delayMs); + } + + /** + * Execute the entity reload and send webhook - called from within setTimeout + * when transaction has definitely committed. + */ + private async executeReloadAndSend(params: { + entityId: string; + tableName: string; + relations: string[]; + tableTarget: any; + rawTableName: string; + }): Promise { + const { entityId, tableName, relations, tableTarget, rawTableName } = params; + + // NOW reload entity - transaction has committed, data is fresh and complete + const repository = AppDataSource.getRepository(tableTarget); + let entity = await repository.findOne({ + where: { id: entityId }, + relations: relations.length > 0 ? relations : undefined + }); + + if (!entity) { + console.warn(`⚠️ executeReloadAndSend: Entity ${entityId} not found after reload`); + return; + } + + // Enrich entity with additional data + entity = (await this.enrichEntity( + entity, + rawTableName, + tableTarget + )) as ObjectLiteral; + + // Convert to plain data + const data = this.entityToPlain(entity); + + if (!data.id) { + return; + } + + // For Message entities, only process system messages + if (tableName === "messages") { + const isSystemMessage = data.text && data.text.includes('$$system-message$$'); + if (!isSystemMessage) { + return; + } + } + + let globalId = await this.adapter.mappingDb.getGlobalId(entityId); + globalId = globalId ?? ""; + + if (this.adapter.lockedIds.includes(globalId)) { + console.log("Entity already locked, skipping:", globalId, entityId); + return; + } + + if (this.adapter.lockedIds.includes(entityId)) { + console.log("Local entity locked (webhook created), skipping:", entityId); + return; + } + + console.log( + "sending packet for global Id", + globalId, + entityId, + "table:", + tableName + ); + + // Log the full data being sent for system messages + if (tableName === "messages") { + console.log("📤 [SUBSCRIBER] Sending message data:"); + console.log(" - Data keys:", Object.keys(data)); + console.log(" - Data.sender:", data.sender); + console.log(" - Data.group:", data.group ? `Group ID: ${data.group.id}` : "null"); + console.log(" - Data.text (first 100):", data.text?.substring(0, 100)); + console.log(" - Data.isSystemMessage:", data.isSystemMessage); + } + + const envelope = await this.adapter.handleChange({ + data, + tableName: tableName.toLowerCase(), + }); + console.log("📥 [SUBSCRIBER] Envelope response:", envelope); + } + /** * Handle entity changes and send to web3adapter */ diff --git a/platforms/dreamsync-api/src/database/data-source.ts b/platforms/dreamsync-api/src/database/data-source.ts index 00ebef940..be20862f4 100644 --- a/platforms/dreamsync-api/src/database/data-source.ts +++ b/platforms/dreamsync-api/src/database/data-source.ts @@ -30,6 +30,14 @@ export const dataSourceOptions: DataSourceOptions = { ca: process.env.DB_CA_CERT, } : false, + // Connection pool configuration to prevent exhaustion + extra: { + max: 10, + min: 2, + idleTimeoutMillis: 30000, + connectionTimeoutMillis: 5000, + statement_timeout: 10000, + }, }; export const AppDataSource = new DataSource(dataSourceOptions); diff --git a/platforms/eCurrency-api/src/database/data-source.ts b/platforms/eCurrency-api/src/database/data-source.ts index 49c695d4d..237eb936b 100644 --- a/platforms/eCurrency-api/src/database/data-source.ts +++ b/platforms/eCurrency-api/src/database/data-source.ts @@ -22,6 +22,14 @@ export const dataSourceOptions: DataSourceOptions = { migrations: [path.join(__dirname, "migrations", "*.ts")], logging: process.env.NODE_ENV === "development", subscribers: [PostgresSubscriber], + // Connection pool configuration to prevent exhaustion + extra: { + max: 10, + min: 2, + idleTimeoutMillis: 30000, + connectionTimeoutMillis: 5000, + statement_timeout: 10000, + }, }; export const AppDataSource = new DataSource(dataSourceOptions); diff --git a/platforms/eReputation-api/src/database/data-source.ts b/platforms/eReputation-api/src/database/data-source.ts index 89cf289f6..05500648f 100644 --- a/platforms/eReputation-api/src/database/data-source.ts +++ b/platforms/eReputation-api/src/database/data-source.ts @@ -26,6 +26,14 @@ export const dataSourceOptions: DataSourceOptions = { migrations: [path.join(__dirname, "migrations", "*.ts")], logging: process.env.NODE_ENV === "development", subscribers: [PostgresSubscriber], + // Connection pool configuration to prevent exhaustion + extra: { + max: 10, + min: 2, + idleTimeoutMillis: 30000, + connectionTimeoutMillis: 5000, + statement_timeout: 10000, + }, }; export const AppDataSource = new DataSource(dataSourceOptions); \ No newline at end of file diff --git a/platforms/emover-api/src/database/data-source.ts b/platforms/emover-api/src/database/data-source.ts index 097c4cb93..0be4bbd66 100644 --- a/platforms/emover-api/src/database/data-source.ts +++ b/platforms/emover-api/src/database/data-source.ts @@ -20,6 +20,14 @@ export const dataSourceOptions: DataSourceOptions = { ca: process.env.DB_CA_CERT, } : false, + // Connection pool configuration to prevent exhaustion + extra: { + max: 10, + min: 2, + idleTimeoutMillis: 30000, + connectionTimeoutMillis: 5000, + statement_timeout: 10000, + }, }; export const AppDataSource = new DataSource(dataSourceOptions); diff --git a/platforms/esigner-api/src/database/data-source.ts b/platforms/esigner-api/src/database/data-source.ts index 969c6f85c..c96dae24f 100644 --- a/platforms/esigner-api/src/database/data-source.ts +++ b/platforms/esigner-api/src/database/data-source.ts @@ -27,6 +27,14 @@ export const AppDataSource = new DataSource({ ca: process.env.DB_CA_CERT, } : false, + // Connection pool configuration to prevent exhaustion + extra: { + max: 10, + min: 2, + idleTimeoutMillis: 30000, + connectionTimeoutMillis: 5000, + statement_timeout: 10000, + }, }); diff --git a/platforms/evoting-api/src/database/data-source.ts b/platforms/evoting-api/src/database/data-source.ts index a7904499f..7c2f277af 100644 --- a/platforms/evoting-api/src/database/data-source.ts +++ b/platforms/evoting-api/src/database/data-source.ts @@ -29,6 +29,14 @@ export const dataSourceOptions: DataSourceOptions = { ca: process.env.DB_CA_CERT, } : false, + // Connection pool configuration to prevent exhaustion + extra: { + max: 10, + min: 2, + idleTimeoutMillis: 30000, + connectionTimeoutMillis: 5000, + statement_timeout: 10000, + }, }; export const AppDataSource = new DataSource(dataSourceOptions); diff --git a/platforms/file-manager-api/src/database/data-source.ts b/platforms/file-manager-api/src/database/data-source.ts index 7546efcb6..155d940a3 100644 --- a/platforms/file-manager-api/src/database/data-source.ts +++ b/platforms/file-manager-api/src/database/data-source.ts @@ -41,5 +41,13 @@ export const AppDataSource = new DataSource({ ca: process.env.DB_CA_CERT, } : false, + // Connection pool configuration to prevent exhaustion + extra: { + max: 10, + min: 2, + idleTimeoutMillis: 30000, + connectionTimeoutMillis: 5000, + statement_timeout: 10000, + }, }); diff --git a/platforms/group-charter-manager-api/src/database/data-source.ts b/platforms/group-charter-manager-api/src/database/data-source.ts index 856528ccf..dc57a7f77 100644 --- a/platforms/group-charter-manager-api/src/database/data-source.ts +++ b/platforms/group-charter-manager-api/src/database/data-source.ts @@ -24,4 +24,12 @@ export const AppDataSource = new DataSource({ ca: process.env.DB_CA_CERT, } : false, + // Connection pool configuration to prevent exhaustion + extra: { + max: 10, + min: 2, + idleTimeoutMillis: 30000, + connectionTimeoutMillis: 5000, + statement_timeout: 10000, + }, }); \ No newline at end of file diff --git a/platforms/pictique-api/src/database/data-source.ts b/platforms/pictique-api/src/database/data-source.ts index 4edc40f6a..fd63a0327 100644 --- a/platforms/pictique-api/src/database/data-source.ts +++ b/platforms/pictique-api/src/database/data-source.ts @@ -26,4 +26,12 @@ export const AppDataSource = new DataSource({ ca: process.env.DB_CA_CERT, } : false, + // Connection pool configuration to prevent exhaustion + extra: { + max: 10, + min: 2, + idleTimeoutMillis: 30000, + connectionTimeoutMillis: 5000, + statement_timeout: 10000, + }, }); diff --git a/platforms/registry/src/config/database.ts b/platforms/registry/src/config/database.ts index ae803561c..34d7366f8 100644 --- a/platforms/registry/src/config/database.ts +++ b/platforms/registry/src/config/database.ts @@ -23,4 +23,12 @@ export const AppDataSource = new DataSource({ ca: process.env.DB_CA_CERT, } : false, + // Connection pool configuration to prevent exhaustion + extra: { + max: 10, + min: 2, + idleTimeoutMillis: 30000, + connectionTimeoutMillis: 5000, + statement_timeout: 10000, + }, }) \ No newline at end of file