From e701609858079a29f3ba3be0d5ade60cae4a5af6 Mon Sep 17 00:00:00 2001 From: ARIA Date: Wed, 20 May 2026 21:37:54 +0000 Subject: [PATCH] ARIA self-improvement: discard stale outgoing WhatsApp messages on reconnect MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the WhatsApp socket drops and sendMessage is called while disconnected, the call now queues the message instead of throwing. On 'open', the queue is drained, but messages older than staleMsgThresholdMs (default 5 min, env STALE_MSG_THRESHOLD_MS) are discarded and logged with reason. Scheduled messages can opt out via { scheduled: true }. On 2026-05-20 ~12:15-12:16 UTC, after a brief offline period, sends queued from before the disconnect were flushed and arrived stale (e.g. "How is it going" to Gillis), causing social damage. 5-min threshold matches the think tick interval — anything older than one tick is by definition stale context. Intent-summary: queued WhatsApp sends from before a disconnect were flushed on reconnect without checking their age, delivering messages with stale context to contacts. Intent-tokens: whatsapp,stale,reconnect,queue,offline,flush,context Co-Authored-By: Claude Opus 4.7 --- backend/brain-config.ts | 2 + backend/integrations/whatsapp.ts | 74 +++++++++++++++++++++++++++++--- 2 files changed, 70 insertions(+), 6 deletions(-) diff --git a/backend/brain-config.ts b/backend/brain-config.ts index 94feaa4d..5d7ada57 100644 --- a/backend/brain-config.ts +++ b/backend/brain-config.ts @@ -33,6 +33,7 @@ export interface BrainConfig { maxThinkContextNodes: number; // base budget for think context node selection selfCritiqueEnabled: boolean; // enable pre-send quality gate for proactive messages selfCritiqueThreshold: number; // minimum score (1-10) to allow sending; default 6 + staleMsgThresholdMs: number; // max age (ms) for queued outgoing WhatsApp messages; older are discarded on reconnect /** Per-action model selection. Claude: "sonnet", "haiku", "opus". Grok: "grok", "grok-mini". */ models: { think: string; // brain think ticks @@ -203,6 +204,7 @@ function envDefaults(): BrainConfig { maxThinkContextNodes: 35, selfCritiqueEnabled: process.env.SELF_CRITIQUE_ENABLED !== "false", selfCritiqueThreshold: Number(process.env.SELF_CRITIQUE_THRESHOLD ?? 6), + staleMsgThresholdMs: Number(process.env.STALE_MSG_THRESHOLD_MS ?? 5 * 60 * 1000), models: { think: "sonnet", consolidate: "haiku", diff --git a/backend/integrations/whatsapp.ts b/backend/integrations/whatsapp.ts index 875beb82..e37b9b55 100644 --- a/backend/integrations/whatsapp.ts +++ b/backend/integrations/whatsapp.ts @@ -18,6 +18,7 @@ import { EventEmitter } from "events"; import { createLogger } from "../logger.js"; import { transcribeAudio } from "../utils/transcribe.js"; import { describeImage, isVisionRefusal } from "../utils/vision.js"; +import { getBrainConfig } from "../brain-config.js"; // Emits 'logout' when WhatsApp session is logged out. // Listeners can perform cleanup before the process exits. @@ -79,6 +80,48 @@ setInterval(() => { sweepDedupCache(sentMessages, MESSAGE_DEDUP_TTL, MAX_DEDUP_CACHE_SIZE); }, DEDUP_SWEEP_INTERVAL).unref(); +// Outgoing send queue — populated when sendMessage is called while disconnected. +// Drained on connection 'open' with a stale-age filter so that messages enqueued +// before a long offline period are not sent with obsolete context (see offlinemsgbug). +interface PendingSend { + jid: string; + text: string; + queuedAt: number; + scheduled: boolean; // bypass stale check when true + resolve: () => void; + reject: (err: Error) => void; +} +const pendingSendQueue: PendingSend[] = []; + +async function flushPendingSendQueue(): Promise { + if (pendingSendQueue.length === 0) return; + const threshold = getBrainConfig().staleMsgThresholdMs; + const now = Date.now(); + const drained = pendingSendQueue.splice(0, pendingSendQueue.length); + + const fresh: PendingSend[] = []; + for (const item of drained) { + const age = now - item.queuedAt; + if (!item.scheduled && age > threshold) { + log.warn( + `Discarding stale queued message to ${item.jid} (age=${Math.round(age / 1000)}s, threshold=${Math.round(threshold / 1000)}s): "${item.text.slice(0, 80)}"`, + ); + item.reject(new Error(`Stale outgoing message discarded after reconnect (age ${Math.round(age / 1000)}s > ${Math.round(threshold / 1000)}s)`)); + } else { + fresh.push(item); + } + } + + for (const item of fresh) { + try { + await sock.sendMessage(item.jid, { text: item.text }); + item.resolve(); + } catch (err) { + item.reject(err instanceof Error ? err : new Error(String(err))); + } + } +} + // Group name cache with TTL to prevent unbounded memory growth const GROUP_CACHE_TTL = 60 * 60 * 1000; // 1 hour const MAX_GROUP_CACHE_SIZE = 500; @@ -264,6 +307,11 @@ export async function startWhatsApp( isConnected = true; reconnectAttempt = 0; // Reset backoff on successful connection log.info("Connected!"); + // Drain any messages enqueued while disconnected, discarding ones older + // than staleMsgThresholdMs to avoid sending out-of-context messages. + flushPendingSendQueue().catch((err) => + log.error(`Pending send queue flush error: ${err}`), + ); } }); @@ -465,12 +513,11 @@ export async function syncContacts(): Promise { log.info("Contact sync triggered, waiting for events..."); } -export async function sendMessage(jid: string, text: string): Promise { - if (!isConnected) { - log.warn("Cannot send message — not connected"); - throw new Error("WhatsApp not connected"); - } - +export async function sendMessage( + jid: string, + text: string, + options?: { scheduled?: boolean }, +): Promise { // Outgoing dedup: prevent duplicate sends within 5 minutes (mirrors incoming dedup pattern) const hash = createHash("sha256").update(text).digest("hex").slice(0, 16); const dedupKey = `${jid}|${hash}`; @@ -480,6 +527,21 @@ export async function sendMessage(jid: string, text: string): Promise { } sentMessages.set(dedupKey, Date.now()); + if (!isConnected) { + // Queue for delivery on reconnect; flush will discard if too old. + log.warn(`sendMessage queued (not connected): ${jid} "${text.slice(0, 60)}"`); + return new Promise((resolve, reject) => { + pendingSendQueue.push({ + jid, + text, + queuedAt: Date.now(), + scheduled: options?.scheduled === true, + resolve, + reject, + }); + }); + } + await sock.sendMessage(jid, { text }); }