diff --git a/app/ai/openai.ts b/app/ai/openai.ts index 131b9ef..9345585 100644 --- a/app/ai/openai.ts +++ b/app/ai/openai.ts @@ -100,7 +100,11 @@ export async function callOpenAiChatStream( mode: AiMode, params: AiRequestBase, onDelta: (text: string) => void | Promise, - opts?: { isCancelled?: () => boolean; getAbortSignal?: () => Promise }, + opts?: { + isCancelled?: () => boolean; + getAbortSignal?: () => Promise; + abortSignal?: AbortSignal; + }, ): Promise { if (!client) { return { @@ -125,31 +129,58 @@ export async function callOpenAiChatStream( mode === "token_info" ? "You are a blockchain and token analyst. Answer clearly and briefly.\n\n" : ""; + let onAbort: (() => void) | null = null; try { + if (opts?.abortSignal?.aborted) { + return { + ok: false, + provider: "openai", + mode, + error: "Generation aborted by newer message.", + }; + } + const stream = client.responses.stream({ model: "gpt-5.2", ...(params.instructions ? { instructions: params.instructions } : {}), input: `${prefix}${trimmed}`, }); + const abortStream = (): void => { + try { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (stream as any)?.abort?.(); + } catch { + /* ignore */ + } + }; + let cancelLogged = false; + const hardAbort = (reason: string): void => { + if (!cancelLogged) { + cancelLogged = true; + console.log("[STREAM] aborted immediately:", reason); + } + abortStream(); + }; + onAbort = (): void => { + abortStream(); + }; + if (opts?.abortSignal) { + opts.abortSignal.addEventListener("abort", onAbort, { once: true }); + } stream.on("response.output_text.delta", async (event: { snapshot?: string }) => { + if (opts?.abortSignal?.aborted) { + hardAbort("abortSignal"); + return; + } if (opts?.isCancelled && opts.isCancelled()) { - try { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (stream as any)?.abort?.(); - } catch { - /* ignore */ - } + hardAbort("isCancelled"); return; } if (opts?.getAbortSignal && (await opts.getAbortSignal())) { - try { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (stream as any)?.abort?.(); - } catch { - /* ignore */ - } + console.log("[STREAM] stale generation"); + hardAbort("getAbortSignal"); return; } const text = event?.snapshot ?? ""; @@ -157,6 +188,14 @@ export async function callOpenAiChatStream( }); const response = await stream.finalResponse(); + if (opts?.abortSignal?.aborted) { + return { + ok: false, + provider: "openai", + mode, + error: "Generation aborted by newer message.", + }; + } const r = response as any; let output_text = r.output_text; if (output_text == null || String(output_text).trim() === "") { @@ -189,6 +228,14 @@ export async function callOpenAiChatStream( usage: r.usage ?? undefined, }; } catch (e: any) { + if (opts?.abortSignal?.aborted) { + return { + ok: false, + provider: "openai", + mode, + error: "Generation aborted by newer message.", + }; + } const message = (e && typeof e === "object" && "message" in e ? (e as Error).message : null) ?? (e != null ? String(e) : "Failed to call OpenAI. Check OPENAI env and network."); @@ -198,5 +245,9 @@ export async function callOpenAiChatStream( mode, error: message, }; + } finally { + if (opts?.abortSignal && onAbort) { + opts.abortSignal.removeEventListener("abort", onAbort); + } } } diff --git a/app/ai/transmitter.ts b/app/ai/transmitter.ts index b5510a3..e773889 100644 --- a/app/ai/transmitter.ts +++ b/app/ai/transmitter.ts @@ -245,7 +245,11 @@ export async function transmit(request: AiRequest): Promise { export async function transmitStream( request: AiRequest, onDelta: (text: string) => void | Promise, - opts?: { isCancelled?: () => boolean; getAbortSignal?: () => Promise }, + opts?: { + isCancelled?: () => boolean; + getAbortSignal?: () => Promise; + abortSignal?: AbortSignal; + }, ): Promise { const mode: AiMode = request.mode ?? "chat"; const thread = request.threadContext; diff --git a/app/api/base.ts b/app/api/base.ts index 6de8da6..2811a5f 100644 --- a/app/api/base.ts +++ b/app/api/base.ts @@ -14,6 +14,29 @@ function normalizeBase(base: string): string { return base.replace(/\/$/, ""); } +function isLocalhostUrl(value: string): boolean { + try { + const u = new URL(value); + return isPrivateOrLocalHost(u.hostname); + } catch { + return false; + } +} + +function isLikelyTelegramMiniApp(): boolean { + if (typeof window === "undefined") return false; + try { + if ((window.location.hash ?? "").includes("tgWebApp")) return true; + } catch { + // ignore + } + try { + return !!(window as unknown as { Telegram?: { WebApp?: unknown } }).Telegram?.WebApp; + } catch { + return false; + } +} + function isPrivateOrLocalHost(hostname: string): boolean { if (hostname === "localhost" || hostname === "127.0.0.1") return true; if (hostname.startsWith("10.")) return true; @@ -92,6 +115,14 @@ function getNodeBaseUrl(): string { export function getApiBaseUrl(): string { const envBase = process.env.EXPO_PUBLIC_API_BASE_URL?.trim(); if (envBase) { + if ( + typeof window !== "undefined" && + isLikelyTelegramMiniApp() && + isLocalhostUrl(envBase) + ) { + const browserBase = getBrowserBaseUrl(); + if (browserBase) return browserBase; + } return normalizeBase(envBase); } diff --git a/app/api/telegram.ts b/app/api/telegram.ts index ee4d0ec..b7bfb1f 100644 --- a/app/api/telegram.ts +++ b/app/api/telegram.ts @@ -22,6 +22,7 @@ async function handler( res?: { status: (n: number) => void; setHeader: (k: string, v: string) => void; end: (s?: string) => void } ): Promise { const method = (request as { method?: string }).method ?? request.method; + console.log('[api/telegram]', method, new Date().toISOString()); if (method === 'GET') { const body = { ok: true, endpoint: 'telegram', use: 'POST with initData' }; if (res) { @@ -41,6 +42,7 @@ async function handler( try { const { handlePost } = await import('../telegram/post.js'); const response = await handlePost(request); + console.log('[api/telegram] POST status', response.status); if (res) { res.status(response.status); response.headers.forEach((v, k) => res.setHeader(k, v)); diff --git a/app/bot/responder.ts b/app/bot/responder.ts index 4999e1f..a854b3a 100644 --- a/app/bot/responder.ts +++ b/app/bot/responder.ts @@ -12,6 +12,10 @@ import { /** Telegram text message length limit. */ const MAX_MESSAGE_TEXT_LENGTH = 4096; +/** Hard cap on continuation chunks to avoid Telegram flood limits in topics/groups. */ +const MAX_LONG_MESSAGE_PARTS = 2; +const TELEGRAM_TRUNCATION_NOTICE = + "\n\n[Truncated in Telegram. Open the Mini App for the full response.]"; /** Instruction passed to AI when the message comes from the bot: keep replies under 4096 chars and mention TMA for long answers. */ const TELEGRAM_BOT_LENGTH_INSTRUCTION = @@ -34,6 +38,17 @@ function chunkText(text: string, maxLen: number): string[] { return chunks; } +function getRetryAfterSeconds(error: unknown): number { + const retryAfter = (error as { parameters?: { retry_after?: number } })?.parameters?.retry_after; + return typeof retryAfter === "number" && Number.isFinite(retryAfter) ? retryAfter : 0; +} + +function isTelegramRateLimit(error: unknown): boolean { + const code = (error as { error_code?: number })?.error_code; + const description = (error as { description?: string })?.description ?? ""; + return code === 429 || description.includes("Too Many Requests"); +} + /** Send long text as multiple messages (each ≤ MAX_MESSAGE_TEXT_LENGTH). First chunk replies to replyToMessageId or uses replyOptions; rest reply to previous sent message. */ async function sendLongMessage( api: Context["api"], @@ -41,15 +56,22 @@ async function sendLongMessage( fullText: string, replyOptions: { message_thread_id?: number; reply_parameters?: { message_id: number } }, replyOptionsWithHtml: { message_thread_id?: number; reply_parameters?: { message_id: number }; parse_mode: "HTML" }, - opts: { replyToMessageId?: number }, + opts: { replyToMessageId?: number; shouldSkipIo?: () => boolean }, ): Promise { const chunks = chunkText(fullText, MAX_MESSAGE_TEXT_LENGTH); if (chunks.length === 0) return; + const limited = chunks.slice(0, MAX_LONG_MESSAGE_PARTS); let lastSentId: number | undefined = opts.replyToMessageId; - for (let i = 0; i < chunks.length; i++) { + for (let i = 0; i < limited.length; i++) { + if (opts.shouldSkipIo?.()) return; + const isLastAllowed = i === limited.length - 1; + const withNotice = + isLastAllowed && chunks.length > MAX_LONG_MESSAGE_PARTS + ? `${limited[i].trimEnd()}${TELEGRAM_TRUNCATION_NOTICE}` + : limited[i]; const formatted = truncateTelegramHtmlSafe( closeOpenTelegramHtml( - stripUnpairedMarkdownDelimiters(mdToTelegramHtml(chunks[i])), + stripUnpairedMarkdownDelimiters(mdToTelegramHtml(withNotice)), ), MAX_MESSAGE_TEXT_LENGTH, ); @@ -62,26 +84,36 @@ async function sendLongMessage( parse_mode: "HTML" as const, }; try { + if (opts.shouldSkipIo?.()) return; const sent = await api.sendMessage(chatId, formatted, partOptions); const id = (sent as { message_id?: number }).message_id; if (typeof id === "number") lastSentId = id; } catch (e) { console.error("[bot][sendLongMessage]", (e as Error)?.message ?? e); + if (isTelegramRateLimit(e) && getRetryAfterSeconds(e) > 15) return; try { - const markdown = toTelegramMarkdown(chunks[i]); + const markdown = toTelegramMarkdown(withNotice); + if (opts.shouldSkipIo?.()) return; const sent = await api.sendMessage(chatId, markdown, { ...partOptions, parse_mode: "Markdown", }); const id = (sent as { message_id?: number }).message_id; if (typeof id === "number") lastSentId = id; - } catch { - const sent = await api.sendMessage(chatId, chunks[i], { - ...(replyOptions.message_thread_id !== undefined ? { message_thread_id: replyOptions.message_thread_id } : {}), - ...(lastSentId !== undefined ? { reply_parameters: { message_id: lastSentId } } : {}), - }); - const id = (sent as { message_id?: number }).message_id; - if (typeof id === "number") lastSentId = id; + } catch (e2) { + if (isTelegramRateLimit(e2) && getRetryAfterSeconds(e2) > 15) return; + try { + if (opts.shouldSkipIo?.()) return; + const sent = await api.sendMessage(chatId, withNotice, { + ...(replyOptions.message_thread_id !== undefined ? { message_thread_id: replyOptions.message_thread_id } : {}), + ...(lastSentId !== undefined ? { reply_parameters: { message_id: lastSentId } } : {}), + }); + const id = (sent as { message_id?: number }).message_id; + if (typeof id === "number") lastSentId = id; + } catch (e3) { + console.error("[bot][sendLongMessage] plain fallback failed", (e3 as Error)?.message ?? e3); + return; + } } } } @@ -92,12 +124,25 @@ function toTelegramMarkdown(s: string): string { return s.replace(/\*\*/g, "*"); } /** Throttle editMessageText to avoid Telegram 429 rate limits. */ -const EDIT_THROTTLE_MS = 500; +const EDIT_THROTTLE_MS = 1200; /** If content grew by more than this many chars, edit immediately so long tail doesn't stick. */ -const EDIT_MIN_CHARS_TO_SEND_NOW = 20; - -/** Track latest generation per chat so newer messages cancel older streams. */ -const chatGenerations = new Map(); +const EDIT_MIN_CHARS_TO_SEND_NOW = 80; + +/** Track latest generation per thread so newer messages cancel older streams immediately. */ +const activeGeneration = new Map(); +/** Single source of truth for hard-cancel per thread ("latest prompt wins"). */ +const threadControllers = new Map(); + +function startNewGeneration(threadKey: string): AbortController { + const existing = threadControllers.get(threadKey); + if (existing) { + existing.abort(); + console.log("[bot][cancel] aborted previous generation", threadKey); + } + const controller = new AbortController(); + threadControllers.set(threadKey, controller); + return controller; +} type BotSourceContext = { source: "bot"; @@ -179,20 +224,28 @@ export async function handleBotAiResponse(ctx: Context): Promise { const chatId = ctx.chat?.id; const isPrivate = ctx.chat?.type === "private"; const canStream = isPrivate && typeof chatId === "number"; + const threadKey = `bot:${typeof chatId === "number" ? chatId : from?.id ?? "unknown"}:${thread_id}`; + const generationController = startNewGeneration(threadKey); + const generationSignal = generationController.signal; + const gen = (activeGeneration.get(threadKey) ?? 0) + 1; + activeGeneration.set(threadKey, gen); + console.log("[START] new generation", threadKey, gen, "update:", update_id ?? "n/a"); + const isStopMessage = text.toLowerCase().includes("stop"); /** When streaming we send one message early then edit it; used to detect streaming path. */ let streamSentMessageId: number | null = null; - - const numericChatId = - typeof chatId === "number" ? chatId : undefined; - let generation = 0; - if (numericChatId !== undefined) { - const prev = chatGenerations.get(numericChatId) ?? 0; - generation = prev + 1; - chatGenerations.set(numericChatId, generation); - } + try { + const isStaleGeneration = (): boolean => + activeGeneration.get(threadKey) !== gen; + const shouldSkipTelegramIo = (label: string): boolean => { + if (activeGeneration.get(threadKey) !== gen) { + console.log("[CANCEL] skip edit/send", threadKey, label); + return true; + } + return false; + }; const isCancelled = (): boolean => - numericChatId !== undefined && - chatGenerations.get(numericChatId) !== generation; + generationSignal.aborted || + isStaleGeneration(); const shouldAbortSend = async (): Promise => { if (!threadContext) return false; @@ -201,7 +254,45 @@ export async function handleBotAiResponse(ctx: Context): Promise { threadContext.thread_id, "bot", ); - return max !== null && max !== threadContext.telegram_update_id; + // Abort only when a newer Telegram update already exists for this thread. + // Using "!=" can kill the current generation before its own claim row is inserted. + return ( + max !== null && + typeof threadContext.telegram_update_id === "number" && + max > threadContext.telegram_update_id + ); + }; + if (isStopMessage) { + if (chatId !== undefined) { + await ctx.api.sendMessage(chatId, "✅ Stopped. Send a new question.", replyOptions); + } else { + await ctx.reply("✅ Stopped. Send a new question.", replyOptions); + } + return; + } + let staleLogged = false; + const markStaleAndAbort = (): boolean => { + if (!isStaleGeneration()) return false; + if (!staleLogged) { + staleLogged = true; + console.log("[CANCEL] stale generation", threadKey); + } + if (!generationSignal.aborted) { + generationController.abort(); + } + return true; + }; + let cancelLogged = false; + const shouldAbortGeneration = async (): Promise => { + const abort = markStaleAndAbort() || (await shouldAbortSend()) || isCancelled(); + if (abort && !cancelLogged) { + cancelLogged = true; + console.log("[CANCEL] aborting stream for thread:", threadKey); + } + if (abort && !generationSignal.aborted) { + generationController.abort(); + } + return abort; }; let result: Awaited>; @@ -220,8 +311,10 @@ export async function handleBotAiResponse(ctx: Context): Promise { /** When turn is interrupted: message already exists (we sent early); optionally final edit, always persist. HTML only (format pipeline is strict). */ const sendInterruptedReply = async (opts: { sendToChat: boolean }): Promise => { + if (markStaleAndAbort()) return; const content = streamedAccumulated.trim(); - if (sentMessageId !== null && content.length > 0) { + const allowChatSend = opts.sendToChat && !generationSignal.aborted; + if (allowChatSend && sentMessageId !== null && content.length > 0) { const toEdit = truncateTelegramHtmlSafe( closeOpenTelegramHtml( stripUnpairedMarkdownDelimiters(mdToTelegramHtml(content)), @@ -229,11 +322,12 @@ export async function handleBotAiResponse(ctx: Context): Promise { MAX_MESSAGE_TEXT_LENGTH, ); try { + if (shouldSkipTelegramIo("interrupted:edit")) return; await ctx.api.editMessageText(chatId, sentMessageId, toEdit, { parse_mode: "HTML" }); } catch (e) { console.error("[bot][edit] interrupted reply", (e as Error)?.message ?? e); } - } else if (opts.sendToChat && content.length > 0) { + } else if (allowChatSend && content.length > 0) { const toSend = truncateTelegramHtmlSafe( closeOpenTelegramHtml( stripUnpairedMarkdownDelimiters(mdToTelegramHtml(content)), @@ -241,12 +335,14 @@ export async function handleBotAiResponse(ctx: Context): Promise { MAX_MESSAGE_TEXT_LENGTH, ); try { + if (shouldSkipTelegramIo("interrupted:reply")) return; await ctx.reply(toSend, replyOptionsWithHtml); } catch (e) { console.error("[bot][reply] interrupted", (e as Error)?.message ?? e); } - } else if (opts.sendToChat && sentMessageId === null) { + } else if (allowChatSend && sentMessageId === null) { try { + if (shouldSkipTelegramIo("interrupted:ellipsis")) return; await ctx.reply("…", replyOptions); } catch (_) {} } @@ -276,11 +372,14 @@ export async function handleBotAiResponse(ctx: Context): Promise { /** First call sends a message (claims message_id); later calls edit that message. HTML only; format pipeline is strict so Telegram accepts it. */ const sendOrEditOnce = (formatted: string, _rawSlice: string): Promise => { const run = async (): Promise => { - if (await shouldAbortSend()) return; + if (markStaleAndAbort()) return; + if (generationSignal.aborted) return; + if (await shouldAbortGeneration()) return; if (isCancelled() || editsDisabled) return; const text = truncateTelegramHtmlSafe(formatted.trim() || "…", MAX_MESSAGE_TEXT_LENGTH); try { if (sentMessageId === null) { + if (shouldSkipTelegramIo("stream:send")) return; const sent = await ctx.api.sendMessage(chatId, text, replyOptionsWithHtml); const id = (sent as { message_id?: number }).message_id; if (typeof id === "number") { @@ -288,15 +387,24 @@ export async function handleBotAiResponse(ctx: Context): Promise { streamSentMessageId = id; } } else { + if (shouldSkipTelegramIo("stream:edit")) return; await ctx.api.editMessageText(chatId, sentMessageId, text, { parse_mode: "HTML" }); } } catch (e: unknown) { const err = e as { error_code?: number; description?: string; parameters?: { retry_after?: number } }; if (err?.description?.includes("not modified")) return; if (err?.error_code === 429) { - await new Promise((r) => setTimeout(r, Math.min((err.parameters?.retry_after ?? 1) * 1000, 2000))); + const retryAfterSec = err.parameters?.retry_after ?? 1; + if (retryAfterSec > 15) { + console.warn("[bot][edit] disabling edits due to long rate limit window", retryAfterSec); + editsDisabled = true; + return; + } + await new Promise((r) => setTimeout(r, Math.min(retryAfterSec * 1000, 5000))); try { + if (markStaleAndAbort()) return; if (sentMessageId === null) { + if (shouldSkipTelegramIo("stream:send:retry")) return; const sent = await ctx.api.sendMessage(chatId, text, replyOptionsWithHtml); const id = (sent as { message_id?: number }).message_id; if (typeof id === "number") { @@ -304,6 +412,7 @@ export async function handleBotAiResponse(ctx: Context): Promise { streamSentMessageId = id; } } else { + if (shouldSkipTelegramIo("stream:edit:retry")) return; await ctx.api.editMessageText(chatId, sentMessageId, text, { parse_mode: "HTML" }); } } catch (e2) { @@ -321,6 +430,8 @@ export async function handleBotAiResponse(ctx: Context): Promise { }; const flushEdit = (awaitSend = false): void | Promise => { + if (markStaleAndAbort()) return; + if (generationSignal.aborted) return; if (isCancelled()) return; if (pending === null) return; const slice = pending; @@ -340,6 +451,8 @@ export async function handleBotAiResponse(ctx: Context): Promise { const sendOrEdit = (accumulated: string): void => { stopTypingSpinner(); streamedAccumulated = accumulated; + if (markStaleAndAbort()) return; + if (generationSignal.aborted) return; if (isCancelled()) return; const slice = accumulated.length > MAX_MESSAGE_TEXT_LENGTH ? accumulated.slice(0, MAX_MESSAGE_TEXT_LENGTH) @@ -382,11 +495,18 @@ export async function handleBotAiResponse(ctx: Context): Promise { interruptedReplyCallback = sendInterruptedReply; - await sendOrEditOnce(typingFrames[typingIndex], typingFrames[typingIndex]); - typingInterval = setInterval(() => { + if (markStaleAndAbort()) { + stopTypingSpinner(); + return; + } + if (generationSignal.aborted) { + stopTypingSpinner(); + return; + } if (sentMessageId === null) return; typingIndex = (typingIndex + 1) % typingFrames.length; + if (shouldSkipTelegramIo("typing:edit")) return; ctx.api .editMessageText(chatId, sentMessageId, typingFrames[typingIndex]) .catch(() => {}); @@ -397,7 +517,8 @@ export async function handleBotAiResponse(ctx: Context): Promise { sendOrEdit, { isCancelled, - getAbortSignal: async () => (await shouldAbortSend()) || isCancelled(), + getAbortSignal: shouldAbortGeneration, + abortSignal: generationSignal, }, ); } catch (e) { @@ -416,6 +537,11 @@ export async function handleBotAiResponse(ctx: Context): Promise { stopTypingSpinner(); return; } + if (generationSignal.aborted) { + stopTypingSpinner(); + await sendInterruptedReply({ sendToChat: false }); + return; + } if (isCancelled()) { stopTypingSpinner(); await sendInterruptedReply({ sendToChat: !(await shouldAbortSend()) }); @@ -451,7 +577,8 @@ export async function handleBotAiResponse(ctx: Context): Promise { sendOrEdit, { isCancelled, - getAbortSignal: async () => (await shouldAbortSend()) || isCancelled(), + getAbortSignal: shouldAbortGeneration, + abortSignal: generationSignal, }, ); } catch (e) { @@ -470,6 +597,11 @@ export async function handleBotAiResponse(ctx: Context): Promise { stopTypingSpinner(); return; } + if (generationSignal.aborted) { + stopTypingSpinner(); + await sendInterruptedReply({ sendToChat: false }); + return; + } if (isCancelled()) { stopTypingSpinner(); await sendInterruptedReply({ sendToChat: !(await shouldAbortSend()) }); @@ -499,13 +631,17 @@ export async function handleBotAiResponse(ctx: Context): Promise { ); if (finalFormatted.trim()) { sendOrEditQueue = sendOrEditQueue.then(async () => { + if (markStaleAndAbort()) return; + if (generationSignal.aborted) return; try { + if (shouldSkipTelegramIo("final:edit")) return; await ctx.api.editMessageText(chatId, streamSentMessageId!, finalFormatted, { parse_mode: "HTML" }); } catch (e: unknown) { const err = e as { description?: string; message?: string }; if (err?.description?.includes("not modified")) return; console.error("[bot][edit] final completion edit", err?.description ?? err?.message ?? e); try { + if (shouldSkipTelegramIo("final:edit:fallback")) return; await ctx.api.editMessageText(chatId, streamSentMessageId!, fullSlice, {}); } catch (e2: unknown) { const d2 = (e2 as { description?: string })?.description; @@ -544,6 +680,7 @@ export async function handleBotAiResponse(ctx: Context): Promise { } if (!result.ok || !result.output_text) { + if (markStaleAndAbort()) return; if (await shouldAbortSend()) return; if (isCancelled()) return; const errMsg = result.error ?? "AI returned no output."; @@ -554,11 +691,14 @@ export async function handleBotAiResponse(ctx: Context): Promise { : "AI is temporarily unavailable. Please try again in a moment."; if (streamSentMessageId !== null && chatId !== undefined) { try { + if (shouldSkipTelegramIo("error:edit")) return; await ctx.api.editMessageText(chatId, streamSentMessageId, message, {}); } catch { + if (shouldSkipTelegramIo("error:reply:fallback")) return; await ctx.reply(message, replyOptions); } } else { + if (shouldSkipTelegramIo("error:reply")) return; await ctx.reply(message, replyOptions); } return; @@ -568,7 +708,13 @@ export async function handleBotAiResponse(ctx: Context): Promise { await interruptedReplyCallback({ sendToChat: false }); return; } + if (markStaleAndAbort()) return; if (await shouldAbortSend()) return; + if (generationSignal.aborted && interruptedReplyCallback) { + await interruptedReplyCallback({ sendToChat: false }); + return; + } + if (generationSignal.aborted) return; if (isCancelled() && interruptedReplyCallback) { await interruptedReplyCallback({ sendToChat: true }); return; @@ -577,20 +723,27 @@ export async function handleBotAiResponse(ctx: Context): Promise { // Streaming path: first message already has up to 4096. Send overflow as continuation if needed; then we're done. if (streamSentMessageId !== null && chatId !== undefined) { + if (markStaleAndAbort()) return; + if (generationSignal.aborted) return; if (result.output_text.length > MAX_MESSAGE_TEXT_LENGTH) { - await sendLongMessage( - ctx.api, - chatId, - result.output_text.slice(MAX_MESSAGE_TEXT_LENGTH), - replyOptions, - replyOptionsWithHtml, - { replyToMessageId: streamSentMessageId }, - ); + try { + await sendLongMessage( + ctx.api, + chatId, + result.output_text.slice(MAX_MESSAGE_TEXT_LENGTH), + replyOptions, + replyOptionsWithHtml, + { replyToMessageId: streamSentMessageId, shouldSkipIo: () => shouldSkipTelegramIo("overflow:sendLongMessage") }, + ); + } catch (e) { + console.error("[bot][overflow] continuation failed", (e as Error)?.message ?? e); + } } return; } if (result.output_text.length <= MAX_MESSAGE_TEXT_LENGTH) { + if (markStaleAndAbort()) return; const textToFormat = result.output_text; const formatted = truncateTelegramHtmlSafe( closeOpenTelegramHtml( @@ -599,12 +752,15 @@ export async function handleBotAiResponse(ctx: Context): Promise { MAX_MESSAGE_TEXT_LENGTH, ); try { + if (shouldSkipTelegramIo("reply:html")) return; await ctx.reply(formatted, replyOptionsWithHtml); } catch (e) { console.error("[bot][reply] HTML reply failed", (e as Error)?.message ?? e); try { + if (shouldSkipTelegramIo("reply:markdown")) return; await ctx.reply(toTelegramMarkdown(textToFormat), { ...replyOptions, parse_mode: "Markdown" }); } catch { + if (shouldSkipTelegramIo("reply:plain")) return; await ctx.reply(textToFormat, replyOptions); } } @@ -612,9 +768,28 @@ export async function handleBotAiResponse(ctx: Context): Promise { } if (chatId !== undefined) { - await sendLongMessage(ctx.api, chatId, result.output_text, replyOptions, replyOptionsWithHtml, {}); + if (markStaleAndAbort()) return; + try { + await sendLongMessage(ctx.api, chatId, result.output_text, replyOptions, replyOptionsWithHtml, { + shouldSkipIo: () => shouldSkipTelegramIo("sendLongMessage"), + }); + } catch (e) { + console.error("[bot][sendLongMessage] failed", (e as Error)?.message ?? e); + try { + if (shouldSkipTelegramIo("sendLongMessage:errorReply")) return; + await ctx.reply("Response was rate-limited by Telegram. Please retry in a moment.", replyOptions); + } catch { + // no-op + } + } } else { const textToFormat = result.output_text.slice(0, MAX_MESSAGE_TEXT_LENGTH); + if (shouldSkipTelegramIo("reply:no-chatId")) return; await ctx.reply(textToFormat, replyOptions); } + } finally { + if (threadControllers.get(threadKey) === generationController) { + threadControllers.delete(threadKey); + } + } } diff --git a/app/bot/webhook.ts b/app/bot/webhook.ts index 55eb581..1ebd316 100644 --- a/app/bot/webhook.ts +++ b/app/bot/webhook.ts @@ -6,9 +6,8 @@ * - We return 200 OK immediately so Telegram does not retry or hide the user's message. * - We process the update in the background (waitUntil). * - * Per-chat serialization: we process one update per chat at a time. When a new update arrives - * for a chat, we wait for the previous handler for that chat to finish, then run the new one. - * So Reply A is always sent before we start processing Prompt B — no reorder flash. + * Updates are processed concurrently. Cancellation/ordering is handled in responder logic + * ("latest prompt wins"), so a newer message can interrupt an in-flight stream quickly. */ import { waitUntil } from '@vercel/functions'; @@ -23,22 +22,6 @@ interface TelegramUpdate { [key: string]: unknown; } -/** Extract chat id from update so we can serialize processing per chat and avoid reply order flash. */ -function getChatIdFromUpdate(update: TelegramUpdate): number | undefined { - const msg = update.message ?? update.edited_message ?? update.channel_post; - if (msg && typeof msg === 'object' && msg.chat && typeof (msg.chat as { id?: number }).id === 'number') { - return (msg.chat as { id: number }).id; - } - const cq = update.callback_query; - if (cq && typeof cq === 'object' && cq.message && typeof (cq.message as { chat?: { id?: number } }).chat?.id === 'number') { - return (cq.message as { chat: { id: number } }).chat.id; - } - return undefined; -} - -/** Per-chat tail promise: next update for this chat waits for the previous handler to finish. */ -const chatQueue = new Map>(); - const BOT_TOKEN = process.env.BOT_TOKEN || process.env.TELEGRAM_BOT_TOKEN; /** Single bot instance for all webhook requests; created once at module load. */ const bot = BOT_TOKEN ? createBot(BOT_TOKEN) : null; @@ -92,6 +75,7 @@ async function getWebhookInfo(): Promise<{ url?: string }> { } export async function handleRequest(request: Request): Promise { + console.log("🔥 LOCAL WEBHOOK HIT", new Date().toISOString()); const method = request.method; console.log('[webhook]', method, new Date().toISOString()); @@ -149,12 +133,8 @@ export async function handleRequest(request: Request): Promise { // Return 200 OK immediately so Telegram applies the message to the chat. Process update // in waitUntil so we don't block the response on AI/DB. - // Serialize per chat so Reply A is always sent before we start processing Prompt B. const updateId = update.update_id; - const chatId = getChatIdFromUpdate(update); - const prev = chatId !== undefined ? chatQueue.get(chatId) : undefined; - const work = (prev ?? Promise.resolve()) - .then(() => ensureBotInit()) + const work = ensureBotInit() .then(() => bot!.handleUpdate(update as Parameters[0])) .then(() => { console.log('[webhook] handled update', updateId); @@ -162,10 +142,6 @@ export async function handleRequest(request: Request): Promise { .catch((err) => { console.error('[bot]', err); }); - const tail = work.then(() => {}, () => {}); - if (chatId !== undefined) { - chatQueue.set(chatId, tail); - } waitUntil(work); return jsonResponse({ ok: true }); } @@ -234,20 +210,14 @@ async function legacyHandler(req: NodeReq, res: NodeRes): Promise { res.status(400).json({ ok: false, error: 'invalid_body' }); return; } - try { - await ensureBotInit(); - const chatIdLegacy = getChatIdFromUpdate(update); - const prevLegacy = chatIdLegacy !== undefined ? chatQueue.get(chatIdLegacy) : undefined; - await (prevLegacy ?? Promise.resolve()); - await bot.handleUpdate(update as Parameters[0]); - if (chatIdLegacy !== undefined) { - chatQueue.set(chatIdLegacy, Promise.resolve()); - } - } catch (err) { - console.error('[bot]', err); - res.status(500).json({ ok: false, error: 'handler_error' }); - return; - } + void ensureBotInit() + .then(() => bot.handleUpdate(update as Parameters[0])) + .then(() => { + console.log('[webhook] handled update', update.update_id); + }) + .catch((err) => { + console.error('[bot]', err); + }); res.status(200).json({ ok: true }); } diff --git a/app/scripts/run-bot-local.ts b/app/scripts/run-bot-local.ts index fe471fe..16eaf59 100644 --- a/app/scripts/run-bot-local.ts +++ b/app/scripts/run-bot-local.ts @@ -17,8 +17,8 @@ if (!token) { async function main() { const bot = createBot(token); await bot.api.deleteWebhook(); - await bot.start(); console.log('Bot running locally (getUpdates). Press Ctrl+C to stop.'); + await bot.start(); } main().catch((err) => {