Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions backend/brain-prompt.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { Observation } from "./observer.js";
import { isPromoOrAutomatedSender } from "./observer.js";
import type { MemoryNode, WorkingMemory } from "./memory/types.js";
import type { MemoryGraph } from "./memory/graph.js";
import { serializeNodesForPrompt, collectRelevantRejectedEdges, formatRejectedEdgesForPrompt } from "./memory/activation.js";
Expand Down Expand Up @@ -318,10 +319,23 @@ function formatWorkingMemory(wm: WorkingMemory): string {

// Active conversation threads
if (wm.conversationThreads && wm.conversationThreads.length > 0) {
const activeThreads = wm.conversationThreads.filter(t => t.status === "active").slice(0, 5);
// Defense-in-depth: even if a promo/automated email slipped past intake,
// drop threads whose only participant looks promotional before rendering.
// (See updateConversationThreads + isPromoOrAutomatedSender.)
const activeThreads = wm.conversationThreads
.filter(t => t.status === "active")
.filter(t => {
if (!t.id.startsWith("email:")) return true;
const ps = Array.isArray(t.participants) ? t.participants : [];
if (ps.length === 0) return true;
return !ps.every(p => isPromoOrAutomatedSender(p));
})
.slice(0, 5);
if (activeThreads.length > 0) {
const threadLines = activeThreads.map(t => {
const who = Array.isArray(t.participants) ? t.participants.join(", ") : (t.participants || "unknown");
const ps = Array.isArray(t.participants) ? t.participants : [t.participants || "unknown"];
// Cap participant list so a noisy thread can't blow past a single line.
const who = ps.length > 3 ? `${ps.slice(0, 3).join(", ")} +${ps.length - 3}` : ps.join(", ");
return ` - ${who}: "${t.topic}" (${t.messageCount || 0} msgs, last ${timeAgo(t.lastMessageAt)})`;
});
parts.push(`Active threads:\n${threadLines.join("\n")}`);
Expand Down
36 changes: 29 additions & 7 deletions backend/memory/working-memory.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { safeReadJSON, atomicWriteJSON, ensureDir } from "../utils/file-store.js";
import type { WorkingMemory, PendingFollowUp, ConversationThread, TemporalContext, TemporalSummaries } from "./types.js";
import type { Observation } from "../observer.js";
import { extractEmailAddress, isPromoOrAutomatedSender } from "../observer.js";
import { getBrainConfig, getOwnerLocalTime, getOwnerLocalDate } from "../brain-config.js";
import { extractKeywordsFromText } from "./activation.js";
import { createLogger } from "../logger.js";
Expand Down Expand Up @@ -188,15 +189,32 @@ export function updateConversationThreads(wm: WorkingMemory, observations: Obser
for (const obs of observations) {
if (!obs.sender) continue;

// For DMs, key by the chat counterpart (chatJid), not the sender — so both
// incoming and outgoing messages map to the same thread.
const key = obs.isGroup ? `group:${obs.groupName || obs.senderJid}` : `dm:${obs.chatJid || obs.senderJid}`;
let key: string;
let participantLabel = obs.sender;

if (obs.source === "gmail" && obs.emailMeta) {
// All emails share senderJid=gmail:<accountId>, which previously collapsed
// every promo/notification mail into one giant "thread". Key per-sender
// address instead so genuine correspondents each get their own thread.
const fromHeader = obs.emailMeta.from || obs.sender;
if (isPromoOrAutomatedSender(fromHeader)) continue; // skip pure-promo senders entirely
const addr = extractEmailAddress(fromHeader);
key = `email:${obs.emailMeta.accountId}:${addr}`;
participantLabel = fromHeader;
} else if (obs.isGroup) {
key = `group:${obs.groupName || obs.senderJid}`;
} else {
// For DMs, key by the chat counterpart (chatJid), not the sender — so both
// incoming and outgoing messages map to the same thread.
key = `dm:${obs.chatJid || obs.senderJid}`;
}

let thread = wm.conversationThreads.find(t => t.id === key);

if (!thread) {
thread = {
id: key,
participants: [obs.sender],
participants: [participantLabel],
topic: obs.text.slice(0, 60),
lastMessageAt: obs.timestamp,
messageCount: 0,
Expand All @@ -209,18 +227,22 @@ export function updateConversationThreads(wm: WorkingMemory, observations: Obser
thread.messageCount++;
thread.status = "active";

if (!thread.participants.includes(obs.sender)) {
thread.participants.push(obs.sender);
if (!thread.participants.includes(participantLabel)) {
thread.participants.push(participantLabel);
}
}

// Thread lifecycle: active → stale (48h) → closed (7d) → removed (14d)
const CLOSED_THRESHOLD = 7 * 24 * 60 * 60 * 1000; // 7 days since last message
const REMOVE_THRESHOLD = 14 * 24 * 60 * 60 * 1000; // 14 days since last message

// Remove closed threads older than 14 days
// Remove closed threads older than 14 days; also evict legacy bundled email
// threads keyed by gmail account (pre per-sender split) — they collected
// dozens of unrelated promo senders and will be rebuilt per-sender from
// future observations.
wm.conversationThreads = wm.conversationThreads.filter(thread => {
if (thread.status === "closed" && (now - thread.lastMessageAt) > REMOVE_THRESHOLD) return false;
if (thread.id.startsWith("dm:gmail:")) return false;
return true;
});

Expand Down
57 changes: 57 additions & 0 deletions backend/observer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,63 @@ function sanitizeImageCaption(text: string): string {
const OBS_FILE = `${BRAIN_DIR}/observations.jsonl`;
const RETENTION_DAYS = Number(process.env.BRAIN_OBSERVATION_DAYS ?? 7);

/**
 * Extracts the bare, lowercased address from a From-style header value.
 *
 * "Display Name <User@Example.com>" → "user@example.com". When the value has
 * no angle-bracketed part, the whole value is used instead.
 *
 * @param from Raw header value, with or without a display name.
 * @returns The address, lowercased and with surrounding whitespace removed.
 */
export function extractEmailAddress(from: string): string {
  const match = /<([^>]+)>/.exec(from);
  const raw = match?.[1] ?? from;
  // Trim in BOTH branches: "Name < user@example.com >" used to come back with
  // the padding intact, which produced a different address string than the
  // same sender written without the padding.
  return raw.trim().toLowerCase();
}

// Local-part markers of automated/promotional mail. Each entry is compared
// against the lowercased text before the "@": it counts as a hit when the local
// part equals the entry or starts with the entry followed by "-", "." or "_".
const AUTOMATED_LOCAL_PREFIXES = [
  // "do not answer this mailbox" variants
  "no-reply",
  "noreply",
  "no_reply",
  "do-not-reply",
  "donotreply",
  // notifications and alerts
  "notification",
  "notifications",
  "alert",
  "alerts",
  "jobalerts",
  // newsletters and marketing
  "newsletter",
  "news",
  "info",
  "marketing",
  "promo",
  "promotions",
  // billing
  "billing",
  "receipts",
  "invoice",
  "invoices",
  // support desks and generic service mailboxes
  "support",
  "help",
  "service",
  "services",
  // mail robots
  "automated",
  "auto-confirm",
  "auto",
  "mailer",
  "mailbot",
  "bounce",
  "bounces",
];

// Domain substrings whose entire mail flow is promotional/automated for our purposes.
// isPromoOrAutomatedSender tests each entry with `domain.includes(entry)` against
// the lowercased text after the "@", so an entry matches anywhere inside the
// domain (subdomains and longer names included), not only as an exact domain.
const PROMO_DOMAIN_SUBSTRINGS = [
"aliexpress.com",
"linkedin.com",
"quora.com",
"autoscout24",
"marktplaats.nl",
// Ends with a dot and names no TLD: hits any domain containing "paypal.".
"paypal.",
"schoolkassa",
"123accu",
// NOTE(review): this entry and "googlemail.com/webmasters" contain "/" and look
// like URL paths rather than mail domains; because only the part after "@" is
// searched, they presumably never match an ordinary address — confirm intent.
"google.com/webmasters",
"search-console",
"googlemail.com/webmasters",
];

/**
 * Decides whether a From header looks like promotional or automated mail that
 * should NOT be surfaced as an "active thread" in brain context.
 *
 * The header is reduced to its address via extractEmailAddress and split on
 * "@"; only the first two pieces (local part, domain) are examined. A sender
 * is flagged when either:
 *   - the local part equals an AUTOMATED_LOCAL_PREFIXES entry, or starts with
 *     one followed by "-", "." or "_"; or
 *   - the domain contains any PROMO_DOMAIN_SUBSTRINGS entry.
 *
 * @param from Raw From header value (display name optional).
 * @returns true for a flagged sender; false otherwise, including when `from`
 *          is empty or the address lacks a local part or a domain.
 */
export function isPromoOrAutomatedSender(from: string): boolean {
  if (!from) return false;

  const pieces = extractEmailAddress(from).split("@");
  const localPart = pieces[0];
  const domainPart = pieces[1];
  // Not shaped like local@domain: nothing to classify.
  if (!localPart || !domainPart) return false;

  // A prefix only counts when it is the whole local part or is followed by one
  // of these separators, so e.g. "information@" is not caught by "info".
  const separators = ["-", ".", "_"];
  for (const prefix of AUTOMATED_LOCAL_PREFIXES) {
    if (localPart === prefix) return true;
    if (separators.some(sep => localPart.startsWith(prefix + sep))) return true;
  }

  for (const fragment of PROMO_DOMAIN_SUBSTRINGS) {
    if (domainPart.includes(fragment)) return true;
  }
  return false;
}

// Guard against concurrent append + prune on the observations file.
// When pruneObservations() is active, appends are buffered and flushed after prune completes.
let pruneInProgress = false;
Expand Down