Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/brain-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export interface BrainConfig {
maxThinkContextNodes: number; // base budget for think context node selection
selfCritiqueEnabled: boolean; // enable pre-send quality gate for proactive messages
selfCritiqueThreshold: number; // minimum score (1-10) to allow sending; default 6
staleMsgThresholdMs: number; // max age (ms) for queued outgoing WhatsApp messages; older are discarded on reconnect
/** Per-action model selection. Claude: "sonnet", "haiku", "opus". Grok: "grok", "grok-mini". */
models: {
think: string; // brain think ticks
Expand Down Expand Up @@ -203,6 +204,7 @@ function envDefaults(): BrainConfig {
maxThinkContextNodes: 35,
selfCritiqueEnabled: process.env.SELF_CRITIQUE_ENABLED !== "false",
selfCritiqueThreshold: Number(process.env.SELF_CRITIQUE_THRESHOLD ?? 6),
staleMsgThresholdMs: Number(process.env.STALE_MSG_THRESHOLD_MS ?? 5 * 60 * 1000),
models: {
think: "sonnet",
consolidate: "haiku",
Expand Down
74 changes: 68 additions & 6 deletions backend/integrations/whatsapp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { EventEmitter } from "events";
import { createLogger } from "../logger.js";
import { transcribeAudio } from "../utils/transcribe.js";
import { describeImage, isVisionRefusal } from "../utils/vision.js";
import { getBrainConfig } from "../brain-config.js";

// Emits 'logout' when WhatsApp session is logged out.
// Listeners can perform cleanup before the process exits.
Expand Down Expand Up @@ -79,6 +80,48 @@ setInterval(() => {
sweepDedupCache(sentMessages, MESSAGE_DEDUP_TTL, MAX_DEDUP_CACHE_SIZE);
}, DEDUP_SWEEP_INTERVAL).unref();

// Outgoing send queue — populated when sendMessage is called while disconnected.
// Drained on connection 'open' with a stale-age filter so that messages enqueued
// before a long offline period are not sent with obsolete context (see offlinemsgbug).
interface PendingSend {
jid: string;
text: string;
queuedAt: number;
scheduled: boolean; // bypass stale check when true
resolve: () => void;
reject: (err: Error) => void;
}
const pendingSendQueue: PendingSend[] = [];

async function flushPendingSendQueue(): Promise<void> {
if (pendingSendQueue.length === 0) return;
const threshold = getBrainConfig().staleMsgThresholdMs;
const now = Date.now();
const drained = pendingSendQueue.splice(0, pendingSendQueue.length);

const fresh: PendingSend[] = [];
for (const item of drained) {
const age = now - item.queuedAt;
if (!item.scheduled && age > threshold) {
log.warn(
`Discarding stale queued message to ${item.jid} (age=${Math.round(age / 1000)}s, threshold=${Math.round(threshold / 1000)}s): "${item.text.slice(0, 80)}"`,
);
item.reject(new Error(`Stale outgoing message discarded after reconnect (age ${Math.round(age / 1000)}s > ${Math.round(threshold / 1000)}s)`));
} else {
fresh.push(item);
}
}

for (const item of fresh) {
try {
await sock.sendMessage(item.jid, { text: item.text });
item.resolve();
} catch (err) {
item.reject(err instanceof Error ? err : new Error(String(err)));
}
}
}

// Group name cache with TTL to prevent unbounded memory growth
const GROUP_CACHE_TTL = 60 * 60 * 1000; // 1 hour
const MAX_GROUP_CACHE_SIZE = 500;
Expand Down Expand Up @@ -264,6 +307,11 @@ export async function startWhatsApp(
isConnected = true;
reconnectAttempt = 0; // Reset backoff on successful connection
log.info("Connected!");
// Drain any messages enqueued while disconnected, discarding ones older
// than staleMsgThresholdMs to avoid sending out-of-context messages.
flushPendingSendQueue().catch((err) =>
log.error(`Pending send queue flush error: ${err}`),
);
}
});

Expand Down Expand Up @@ -465,12 +513,11 @@ export async function syncContacts(): Promise<void> {
log.info("Contact sync triggered, waiting for events...");
}

export async function sendMessage(jid: string, text: string): Promise<void> {
if (!isConnected) {
log.warn("Cannot send message — not connected");
throw new Error("WhatsApp not connected");
}

export async function sendMessage(
jid: string,
text: string,
options?: { scheduled?: boolean },
): Promise<void> {
// Outgoing dedup: prevent duplicate sends within 5 minutes (mirrors incoming dedup pattern)
const hash = createHash("sha256").update(text).digest("hex").slice(0, 16);
const dedupKey = `${jid}|${hash}`;
Expand All @@ -480,6 +527,21 @@ export async function sendMessage(jid: string, text: string): Promise<void> {
}
sentMessages.set(dedupKey, Date.now());

if (!isConnected) {
// Queue for delivery on reconnect; flush will discard if too old.
log.warn(`sendMessage queued (not connected): ${jid} "${text.slice(0, 60)}"`);
return new Promise<void>((resolve, reject) => {
pendingSendQueue.push({
jid,
text,
queuedAt: Date.now(),
scheduled: options?.scheduled === true,
resolve,
reject,
});
});
}

await sock.sendMessage(jid, { text });
}

Expand Down