diff --git a/backend/brain-config.ts b/backend/brain-config.ts index 94feaa4d..5d7ada57 100644 --- a/backend/brain-config.ts +++ b/backend/brain-config.ts @@ -33,6 +33,7 @@ export interface BrainConfig { maxThinkContextNodes: number; // base budget for think context node selection selfCritiqueEnabled: boolean; // enable pre-send quality gate for proactive messages selfCritiqueThreshold: number; // minimum score (1-10) to allow sending; default 6 + staleMsgThresholdMs: number; // max age (ms) for queued outgoing WhatsApp messages; older are discarded on reconnect /** Per-action model selection. Claude: "sonnet", "haiku", "opus". Grok: "grok", "grok-mini". */ models: { think: string; // brain think ticks @@ -203,6 +204,7 @@ function envDefaults(): BrainConfig { maxThinkContextNodes: 35, selfCritiqueEnabled: process.env.SELF_CRITIQUE_ENABLED !== "false", selfCritiqueThreshold: Number(process.env.SELF_CRITIQUE_THRESHOLD ?? 6), + staleMsgThresholdMs: Number(process.env.STALE_MSG_THRESHOLD_MS ?? 5 * 60 * 1000), models: { think: "sonnet", consolidate: "haiku", diff --git a/backend/integrations/whatsapp.ts b/backend/integrations/whatsapp.ts index 875beb82..e37b9b55 100644 --- a/backend/integrations/whatsapp.ts +++ b/backend/integrations/whatsapp.ts @@ -18,6 +18,7 @@ import { EventEmitter } from "events"; import { createLogger } from "../logger.js"; import { transcribeAudio } from "../utils/transcribe.js"; import { describeImage, isVisionRefusal } from "../utils/vision.js"; +import { getBrainConfig } from "../brain-config.js"; // Emits 'logout' when WhatsApp session is logged out. // Listeners can perform cleanup before the process exits. @@ -79,6 +80,48 @@ setInterval(() => { sweepDedupCache(sentMessages, MESSAGE_DEDUP_TTL, MAX_DEDUP_CACHE_SIZE); }, DEDUP_SWEEP_INTERVAL).unref(); +// Outgoing send queue — populated when sendMessage is called while disconnected. +// Drained on connection 'open' with a stale-age filter so that messages enqueued +// before a long offline period are not sent with obsolete context (see offlinemsgbug). +interface PendingSend { + jid: string; + text: string; + queuedAt: number; + scheduled: boolean; // bypass stale check when true + resolve: () => void; + reject: (err: Error) => void; +} +const pendingSendQueue: PendingSend[] = []; + +async function flushPendingSendQueue(): Promise { + if (pendingSendQueue.length === 0) return; + const threshold = getBrainConfig().staleMsgThresholdMs; + const now = Date.now(); + const drained = pendingSendQueue.splice(0, pendingSendQueue.length); + + const fresh: PendingSend[] = []; + for (const item of drained) { + const age = now - item.queuedAt; + if (!item.scheduled && age > threshold) { + log.warn( + `Discarding stale queued message to ${item.jid} (age=${Math.round(age / 1000)}s, threshold=${Math.round(threshold / 1000)}s): "${item.text.slice(0, 80)}"`, + ); + item.reject(new Error(`Stale outgoing message discarded after reconnect (age ${Math.round(age / 1000)}s > ${Math.round(threshold / 1000)}s)`)); + } else { + fresh.push(item); + } + } + + for (const item of fresh) { + try { + await sock.sendMessage(item.jid, { text: item.text }); + item.resolve(); + } catch (err) { + item.reject(err instanceof Error ? err : new Error(String(err))); + } + } +} + // Group name cache with TTL to prevent unbounded memory growth const GROUP_CACHE_TTL = 60 * 60 * 1000; // 1 hour const MAX_GROUP_CACHE_SIZE = 500; @@ -264,6 +307,11 @@ export async function startWhatsApp( isConnected = true; reconnectAttempt = 0; // Reset backoff on successful connection log.info("Connected!"); + // Drain any messages enqueued while disconnected, discarding ones older + // than staleMsgThresholdMs to avoid sending out-of-context messages. + flushPendingSendQueue().catch((err) => + log.error(`Pending send queue flush error: ${err}`), + ); } }); @@ -465,12 +513,11 @@ export async function syncContacts(): Promise { log.info("Contact sync triggered, waiting for events..."); } -export async function sendMessage(jid: string, text: string): Promise { - if (!isConnected) { - log.warn("Cannot send message — not connected"); - throw new Error("WhatsApp not connected"); - } - +export async function sendMessage( + jid: string, + text: string, + options?: { scheduled?: boolean }, +): Promise { // Outgoing dedup: prevent duplicate sends within 5 minutes (mirrors incoming dedup pattern) const hash = createHash("sha256").update(text).digest("hex").slice(0, 16); const dedupKey = `${jid}|${hash}`; @@ -480,6 +527,21 @@ export async function sendMessage(jid: string, text: string): Promise { } sentMessages.set(dedupKey, Date.now()); + if (!isConnected) { + // Queue for delivery on reconnect; flush will discard if too old. + log.warn(`sendMessage queued (not connected): ${jid} "${text.slice(0, 60)}"`); + return new Promise((resolve, reject) => { + pendingSendQueue.push({ + jid, + text, + queuedAt: Date.now(), + scheduled: options?.scheduled === true, + resolve, + reject, + }); + }); + } + await sock.sendMessage(jid, { text }); }