Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 156 additions & 37 deletions platforms/dreamsync-api/src/web3adapter/watchers/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,18 +177,14 @@ export class PostgresSubscriber implements EntitySubscriberInterface {

/**
* Called after entity update.
* NOTE: We pass metadata to handleChangeWithReload so the entity reload happens
* AFTER the transaction commits (inside setTimeout), avoiding stale reads.
*/
async afterUpdate(event: UpdateEvent<any>) {
// For updates, we need to reload the full entity since event.entity only contains changed fields
let entity = event.entity;

// Try different ways to get the entity ID
let entityId = event.entity?.id || event.databaseEntity?.id;

if (!entityId && event.entity) {
// If we have the entity but no ID, try to extract it from the entity object
const entityKeys = Object.keys(event.entity);

// Look for common ID field names
entityId = event.entity.id || event.entity.Id || event.entity.ID || event.entity._id;
}
Expand All @@ -215,39 +211,27 @@ export class PostgresSubscriber implements EntitySubscriberInterface {
}
}

if (entityId) {
// Reload the full entity from the database
const repository = AppDataSource.getRepository(event.metadata.target);
const entityName = typeof event.metadata.target === 'function'
? event.metadata.target.name
: event.metadata.target;

const fullEntity = await repository.findOne({
where: { id: entityId },
relations: this.getRelationsForEntity(entityName)
});

if (fullEntity) {
entity = (await this.enrichEntity(
fullEntity,
event.metadata.tableName,
event.metadata.target
)) as ObjectLiteral;

// Special handling for Message entities to ensure complete data
if (event.metadata.tableName === "messages" && entity) {
entity = await this.enrichMessageEntity(entity);
}
}
if (!entityId) {
console.warn(`⚠️ afterUpdate: Could not determine entity ID for ${event.metadata.tableName}`);
return;
}

this.handleChange(
// @ts-ignore
entity ?? event.entityId,
event.metadata.tableName.endsWith("s")
? event.metadata.tableName
: event.metadata.tableName + "s"
);
const entityName = typeof event.metadata.target === 'function'
? event.metadata.target.name
: event.metadata.target;

const tableName = event.metadata.tableName.endsWith("s")
? event.metadata.tableName
: event.metadata.tableName + "s";

// Pass reload metadata instead of entity - actual DB read happens in setTimeout
this.handleChangeWithReload({
entityId,
tableName,
relations: this.getRelationsForEntity(entityName),
tableTarget: event.metadata.target,
rawTableName: event.metadata.tableName,
});
}

/**
Expand All @@ -270,6 +254,139 @@ export class PostgresSubscriber implements EntitySubscriberInterface {
);
}

/**
* Handle update changes by reloading entity AFTER transaction commits.
* This avoids stale reads that occur when findOne runs inside the same transaction.
*/
private async handleChangeWithReload(params: {
entityId: string;
tableName: string;
relations: string[];
tableTarget: any;
rawTableName: string;
}): Promise<void> {
const { entityId, tableName, relations, tableTarget, rawTableName } = params;

console.log(`🔍 handleChangeWithReload called for: ${tableName}, entityId: ${entityId}`);

// Check if this operation should be processed
if (!shouldProcessWebhook(entityId, tableName)) {
console.log(`⏭️ Skipping webhook for ${tableName}:${entityId} - not from ConsentService (protected entity)`);
return;
}

// Handle junction table changes
// @ts-ignore
const junctionInfo = JUNCTION_TABLE_MAP[tableName];
if (junctionInfo) {
// Junction tables need to load the parent entity, not the junction record
// This is handled separately in handleJunctionTableChange
return;
}

// Add debouncing for group entities
if (tableName === "groups") {
const debounceKey = `group-reload:${entityId}`;

if (this.junctionTableDebounceMap.has(debounceKey)) {
clearTimeout(this.junctionTableDebounceMap.get(debounceKey)!);
}

const timeoutId = setTimeout(async () => {
try {
await this.executeReloadAndSend(params);
this.junctionTableDebounceMap.delete(debounceKey);
} catch (error) {
console.error("Error in group reload timeout:", error);
this.junctionTableDebounceMap.delete(debounceKey);
}
}, 3_000);

this.junctionTableDebounceMap.set(debounceKey, timeoutId);
return;
}

// For other entities (including wishlists), use a small delay to ensure transaction commit
// Wishlists sync quickly (50ms), other tables use standard delay
const delayMs = tableName.toLowerCase() === "wishlists" ? 50 : 3_000;

setTimeout(async () => {
try {
await this.executeReloadAndSend(params);
} catch (error) {
console.error(`❌ Error in handleChangeWithReload setTimeout for ${tableName}:`, error);
}
}, delayMs);
}

/**
* Execute the entity reload and send webhook - called from within setTimeout
* when transaction has definitely committed.
*/
private async executeReloadAndSend(params: {
entityId: string;
tableName: string;
relations: string[];
tableTarget: any;
rawTableName: string;
}): Promise<void> {
const { entityId, tableName, relations, tableTarget, rawTableName } = params;

// NOW reload entity - transaction has committed, data is fresh
const repository = AppDataSource.getRepository(tableTarget);
let entity = await repository.findOne({
where: { id: entityId },
relations: relations.length > 0 ? relations : undefined
});

if (!entity) {
console.warn(`⚠️ executeReloadAndSend: Entity ${entityId} not found after reload`);
return;
}

// Enrich entity with additional relations
entity = (await this.enrichEntity(
entity,
rawTableName,
tableTarget
)) as ObjectLiteral;

// Special handling for Message entities
if (rawTableName === "messages" && entity) {
entity = await this.enrichMessageEntity(entity);
}

// For Message entities, only process system messages
const data = this.entityToPlain(entity);
if (tableName === "messages") {
const isSystemMessage = data.text && data.text.includes('$$system-message$$');
if (!isSystemMessage) {
return;
}
}

if (!data.id) {
return;
}

let globalId = await this.adapter.mappingDb.getGlobalId(entityId);
globalId = globalId ?? "";

if (this.adapter.lockedIds.includes(globalId)) {
return;
}

if (this.adapter.lockedIds.includes(entityId)) {
return;
}

console.log(`📤 Sending webhook for ${tableName}:${entityId}`);
await this.adapter.handleChange({
data,
tableName: tableName.toLowerCase(),
});
}

/**
* Handle entity changes and send to web3adapter
*/
Expand Down Expand Up @@ -490,6 +607,8 @@ export class PostgresSubscriber implements EntitySubscriberInterface {
return ["participants", "admins", "members"];
case "Message":
return ["group", "sender"];
case "Wishlist":
return ["user"];
default:
return [];
}
Expand Down
9 changes: 7 additions & 2 deletions platforms/esigner-api/src/controllers/FileController.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Request, Response } from "express";
import { FileService } from "../services/FileService";
import { FileService, ReservedFileNameError } from "../services/FileService";
import multer from "multer";

const upload = multer({
Expand Down Expand Up @@ -49,6 +49,9 @@ export class FileController {
createdAt: file.createdAt,
});
} catch (error) {
if (error instanceof ReservedFileNameError) {
return res.status(400).json({ error: error.message });
}
console.error("Error uploading file:", error);
res.status(500).json({ error: "Failed to upload file" });
}
Expand All @@ -61,7 +64,9 @@ export class FileController {
return res.status(401).json({ error: "Authentication required" });
}

const documents = await this.fileService.getDocumentsWithStatus(req.user.id);
const list = req.query.list as string | undefined;
const listMode = list === "all" ? "all" : "containers";
const documents = await this.fileService.getDocumentsWithStatus(req.user.id, listMode);
res.json(documents);
} catch (error) {
console.error("Error getting documents:", error);
Expand Down
26 changes: 17 additions & 9 deletions platforms/esigner-api/src/controllers/WebhookController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,21 +276,29 @@ export class WebhookController {
}

if (localId) {
// Update existing file
const file = await this.fileService.getFileById(localId);
// Update existing file – apply name/displayName so renames in File Manager sync to eSigner
const file = await this.fileRepository.findOne({
where: { id: localId },
});
if (!file) {
console.error("File not found for localId:", localId);
return res.status(500).send();
}

file.name = local.data.name as string;
file.displayName = local.data.displayName as string | null;
file.description = local.data.description as string | null;
file.mimeType = local.data.mimeType as string;
file.size = local.data.size as number;
file.md5Hash = local.data.md5Hash as string;
if (local.data.name !== undefined)
file.name = local.data.name as string;
if (local.data.displayName !== undefined)
file.displayName = local.data.displayName as string | null;
if (local.data.description !== undefined)
file.description = local.data.description as string | null;
if (local.data.mimeType !== undefined)
file.mimeType = local.data.mimeType as string;
if (local.data.size !== undefined)
file.size = local.data.size as number;
if (local.data.md5Hash !== undefined)
file.md5Hash = local.data.md5Hash as string;
file.ownerId = owner.id;

// Decode base64 data if provided
if (local.data.data && typeof local.data.data === "string") {
file.data = Buffer.from(local.data.data, "base64");
Expand Down
Loading