From b2b5e60dad17568a74ae8f1ec6d16bdf4f16a95c Mon Sep 17 00:00:00 2001 From: Dhereal1 Date: Sat, 28 Mar 2026 16:58:54 +0300 Subject: [PATCH 1/4] fix(bot): hard-stop streaming on new message per thread using AbortController - Introduce thread-scoped AbortController map - Abort previous generation when new message arrives for same thread - Propagate abort signal to AI streaming layer - Stop Telegram edit loop immediately on abort - Preserve existing HTML + chunking behavior Implements G1 (hard cancellation). Multi-thread concurrency intentionally deferred. --- app/ai/openai.ts | 68 ++++++++++++++++++++++++++++++++++--------- app/ai/transmitter.ts | 6 +++- app/bot/responder.ts | 57 ++++++++++++++++++++++++++++++++---- 3 files changed, 112 insertions(+), 19 deletions(-) diff --git a/app/ai/openai.ts b/app/ai/openai.ts index 131b9ef..2967c80 100644 --- a/app/ai/openai.ts +++ b/app/ai/openai.ts @@ -100,7 +100,11 @@ export async function callOpenAiChatStream( mode: AiMode, params: AiRequestBase, onDelta: (text: string) => void | Promise, - opts?: { isCancelled?: () => boolean; getAbortSignal?: () => Promise }, + opts?: { + isCancelled?: () => boolean; + getAbortSignal?: () => Promise; + abortSignal?: AbortSignal; + }, ): Promise { if (!client) { return { @@ -125,31 +129,49 @@ export async function callOpenAiChatStream( mode === "token_info" ? "You are a blockchain and token analyst. Answer clearly and briefly.\n\n" : ""; + let onAbort: (() => void) | null = null; try { + if (opts?.abortSignal?.aborted) { + return { + ok: false, + provider: "openai", + mode, + error: "Generation aborted by newer message.", + }; + } + const stream = client.responses.stream({ model: "gpt-5.2", ...(params.instructions ? { instructions: params.instructions } : {}), input: `${prefix}${trimmed}`, }); + const abortStream = (): void => { + try { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (stream as any)?.abort?.(); + } catch { + /* ignore */ + } + }; + onAbort = (): void => { + abortStream(); + }; + if (opts?.abortSignal) { + opts.abortSignal.addEventListener("abort", onAbort, { once: true }); + } stream.on("response.output_text.delta", async (event: { snapshot?: string }) => { + if (opts?.abortSignal?.aborted) { + abortStream(); + return; + } if (opts?.isCancelled && opts.isCancelled()) { - try { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (stream as any)?.abort?.(); - } catch { - /* ignore */ - } + abortStream(); return; } if (opts?.getAbortSignal && (await opts.getAbortSignal())) { - try { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (stream as any)?.abort?.(); - } catch { - /* ignore */ - } + abortStream(); return; } const text = event?.snapshot ?? ""; @@ -157,6 +179,14 @@ export async function callOpenAiChatStream( }); const response = await stream.finalResponse(); + if (opts?.abortSignal?.aborted) { + return { + ok: false, + provider: "openai", + mode, + error: "Generation aborted by newer message.", + }; + } const r = response as any; let output_text = r.output_text; if (output_text == null || String(output_text).trim() === "") { @@ -189,6 +219,14 @@ export async function callOpenAiChatStream( usage: r.usage ?? undefined, }; } catch (e: any) { + if (opts?.abortSignal?.aborted) { + return { + ok: false, + provider: "openai", + mode, + error: "Generation aborted by newer message.", + }; + } const message = (e && typeof e === "object" && "message" in e ? (e as Error).message : null) ?? (e != null ? String(e) : "Failed to call OpenAI. Check OPENAI env and network."); @@ -198,5 +236,9 @@ export async function callOpenAiChatStream( mode, error: message, }; + } finally { + if (opts?.abortSignal && onAbort) { + opts.abortSignal.removeEventListener("abort", onAbort); + } } } diff --git a/app/ai/transmitter.ts b/app/ai/transmitter.ts index b5510a3..e773889 100644 --- a/app/ai/transmitter.ts +++ b/app/ai/transmitter.ts @@ -245,7 +245,11 @@ export async function transmit(request: AiRequest): Promise { export async function transmitStream( request: AiRequest, onDelta: (text: string) => void | Promise, - opts?: { isCancelled?: () => boolean; getAbortSignal?: () => Promise }, + opts?: { + isCancelled?: () => boolean; + getAbortSignal?: () => Promise; + abortSignal?: AbortSignal; + }, ): Promise { const mode: AiMode = request.mode ?? "chat"; const thread = request.threadContext; diff --git a/app/bot/responder.ts b/app/bot/responder.ts index 4999e1f..6b77342 100644 --- a/app/bot/responder.ts +++ b/app/bot/responder.ts @@ -98,6 +98,16 @@ const EDIT_MIN_CHARS_TO_SEND_NOW = 20; /** Track latest generation per chat so newer messages cancel older streams. */ const chatGenerations = new Map(); +/** Single source of truth for hard-cancel per thread ("latest prompt wins"). */ +const threadControllers = new Map(); + +function startNewGeneration(threadKey: string): AbortController { + const existing = threadControllers.get(threadKey); + if (existing) existing.abort(); + const controller = new AbortController(); + threadControllers.set(threadKey, controller); + return controller; +} type BotSourceContext = { source: "bot"; @@ -179,8 +189,12 @@ export async function handleBotAiResponse(ctx: Context): Promise { const chatId = ctx.chat?.id; const isPrivate = ctx.chat?.type === "private"; const canStream = isPrivate && typeof chatId === "number"; + const threadKey = `bot:${typeof chatId === "number" ? chatId : from?.id ?? "unknown"}:${thread_id}`; + const generationController = startNewGeneration(threadKey); + const generationSignal = generationController.signal; /** When streaming we send one message early then edit it; used to detect streaming path. */ let streamSentMessageId: number | null = null; + try { const numericChatId = typeof chatId === "number" ? chatId : undefined; @@ -191,8 +205,9 @@ export async function handleBotAiResponse(ctx: Context): Promise { chatGenerations.set(numericChatId, generation); } const isCancelled = (): boolean => - numericChatId !== undefined && - chatGenerations.get(numericChatId) !== generation; + generationSignal.aborted || + (numericChatId !== undefined && + chatGenerations.get(numericChatId) !== generation); const shouldAbortSend = async (): Promise => { if (!threadContext) return false; @@ -221,7 +236,8 @@ export async function handleBotAiResponse(ctx: Context): Promise { /** When turn is interrupted: message already exists (we sent early); optionally final edit, always persist. HTML only (format pipeline is strict). */ const sendInterruptedReply = async (opts: { sendToChat: boolean }): Promise => { const content = streamedAccumulated.trim(); - if (sentMessageId !== null && content.length > 0) { + const allowChatSend = opts.sendToChat && !generationSignal.aborted; + if (allowChatSend && sentMessageId !== null && content.length > 0) { const toEdit = truncateTelegramHtmlSafe( closeOpenTelegramHtml( stripUnpairedMarkdownDelimiters(mdToTelegramHtml(content)), @@ -233,7 +249,7 @@ export async function handleBotAiResponse(ctx: Context): Promise { } catch (e) { console.error("[bot][edit] interrupted reply", (e as Error)?.message ?? e); } - } else if (opts.sendToChat && content.length > 0) { + } else if (allowChatSend && content.length > 0) { const toSend = truncateTelegramHtmlSafe( closeOpenTelegramHtml( stripUnpairedMarkdownDelimiters(mdToTelegramHtml(content)), @@ -245,7 +261,7 @@ export async function handleBotAiResponse(ctx: Context): Promise { } catch (e) { console.error("[bot][reply] interrupted", (e as Error)?.message ?? e); } - } else if (opts.sendToChat && sentMessageId === null) { + } else if (allowChatSend && sentMessageId === null) { try { await ctx.reply("…", replyOptions); } catch (_) {} @@ -276,6 +292,7 @@ export async function handleBotAiResponse(ctx: Context): Promise { /** First call sends a message (claims message_id); later calls edit that message. HTML only; format pipeline is strict so Telegram accepts it. */ const sendOrEditOnce = (formatted: string, _rawSlice: string): Promise => { const run = async (): Promise => { + if (generationSignal.aborted) return; if (await shouldAbortSend()) return; if (isCancelled() || editsDisabled) return; const text = truncateTelegramHtmlSafe(formatted.trim() || "…", MAX_MESSAGE_TEXT_LENGTH); @@ -321,6 +338,7 @@ export async function handleBotAiResponse(ctx: Context): Promise { }; const flushEdit = (awaitSend = false): void | Promise => { + if (generationSignal.aborted) return; if (isCancelled()) return; if (pending === null) return; const slice = pending; @@ -340,6 +358,7 @@ export async function handleBotAiResponse(ctx: Context): Promise { const sendOrEdit = (accumulated: string): void => { stopTypingSpinner(); streamedAccumulated = accumulated; + if (generationSignal.aborted) return; if (isCancelled()) return; const slice = accumulated.length > MAX_MESSAGE_TEXT_LENGTH ? accumulated.slice(0, MAX_MESSAGE_TEXT_LENGTH) @@ -385,6 +404,10 @@ export async function handleBotAiResponse(ctx: Context): Promise { await sendOrEditOnce(typingFrames[typingIndex], typingFrames[typingIndex]); typingInterval = setInterval(() => { + if (generationSignal.aborted) { + stopTypingSpinner(); + return; + } if (sentMessageId === null) return; typingIndex = (typingIndex + 1) % typingFrames.length; ctx.api @@ -398,6 +421,7 @@ export async function handleBotAiResponse(ctx: Context): Promise { { isCancelled, getAbortSignal: async () => (await shouldAbortSend()) || isCancelled(), + abortSignal: generationSignal, }, ); } catch (e) { @@ -416,6 +440,11 @@ export async function handleBotAiResponse(ctx: Context): Promise { stopTypingSpinner(); return; } + if (generationSignal.aborted) { + stopTypingSpinner(); + await sendInterruptedReply({ sendToChat: false }); + return; + } if (isCancelled()) { stopTypingSpinner(); await sendInterruptedReply({ sendToChat: !(await shouldAbortSend()) }); @@ -452,6 +481,7 @@ export async function handleBotAiResponse(ctx: Context): Promise { { isCancelled, getAbortSignal: async () => (await shouldAbortSend()) || isCancelled(), + abortSignal: generationSignal, }, ); } catch (e) { @@ -470,6 +500,11 @@ export async function handleBotAiResponse(ctx: Context): Promise { stopTypingSpinner(); return; } + if (generationSignal.aborted) { + stopTypingSpinner(); + await sendInterruptedReply({ sendToChat: false }); + return; + } if (isCancelled()) { stopTypingSpinner(); await sendInterruptedReply({ sendToChat: !(await shouldAbortSend()) }); @@ -499,6 +534,7 @@ export async function handleBotAiResponse(ctx: Context): Promise { ); if (finalFormatted.trim()) { sendOrEditQueue = sendOrEditQueue.then(async () => { + if (generationSignal.aborted) return; try { await ctx.api.editMessageText(chatId, streamSentMessageId!, finalFormatted, { parse_mode: "HTML" }); } catch (e: unknown) { @@ -569,6 +605,11 @@ export async function handleBotAiResponse(ctx: Context): Promise { return; } if (await shouldAbortSend()) return; + if (generationSignal.aborted && interruptedReplyCallback) { + await interruptedReplyCallback({ sendToChat: false }); + return; + } + if (generationSignal.aborted) return; if (isCancelled() && interruptedReplyCallback) { await interruptedReplyCallback({ sendToChat: true }); return; @@ -577,6 +618,7 @@ export async function handleBotAiResponse(ctx: Context): Promise { // Streaming path: first message already has up to 4096. Send overflow as continuation if needed; then we're done. if (streamSentMessageId !== null && chatId !== undefined) { + if (generationSignal.aborted) return; if (result.output_text.length > MAX_MESSAGE_TEXT_LENGTH) { await sendLongMessage( ctx.api, @@ -617,4 +659,9 @@ export async function handleBotAiResponse(ctx: Context): Promise { const textToFormat = result.output_text.slice(0, MAX_MESSAGE_TEXT_LENGTH); await ctx.reply(textToFormat, replyOptions); } + } finally { + if (threadControllers.get(threadKey) === generationController) { + threadControllers.delete(threadKey); + } + } } From ededf395e5872e010c958c6bd5a1414a40741ab4 Mon Sep 17 00:00:00 2001 From: Dhereal1 Date: Sat, 28 Mar 2026 21:56:27 +0300 Subject: [PATCH 2/4] fix(bot): hard cancellation + webhook concurrency + logging improvements --- app/api/base.ts | 31 ++++++++++++ app/api/telegram.ts | 2 + app/bot/responder.ts | 96 +++++++++++++++++++++++++++--------- app/bot/webhook.ts | 37 ++------------ app/scripts/run-bot-local.ts | 2 +- 5 files changed, 110 insertions(+), 58 deletions(-) diff --git a/app/api/base.ts b/app/api/base.ts index 6de8da6..2811a5f 100644 --- a/app/api/base.ts +++ b/app/api/base.ts @@ -14,6 +14,29 @@ function normalizeBase(base: string): string { return base.replace(/\/$/, ""); } +function isLocalhostUrl(value: string): boolean { + try { + const u = new URL(value); + return isPrivateOrLocalHost(u.hostname); + } catch { + return false; + } +} + +function isLikelyTelegramMiniApp(): boolean { + if (typeof window === "undefined") return false; + try { + if ((window.location.hash ?? "").includes("tgWebApp")) return true; + } catch { + // ignore + } + try { + return !!(window as unknown as { Telegram?: { WebApp?: unknown } }).Telegram?.WebApp; + } catch { + return false; + } +} + function isPrivateOrLocalHost(hostname: string): boolean { if (hostname === "localhost" || hostname === "127.0.0.1") return true; if (hostname.startsWith("10.")) return true; @@ -92,6 +115,14 @@ function getNodeBaseUrl(): string { export function getApiBaseUrl(): string { const envBase = process.env.EXPO_PUBLIC_API_BASE_URL?.trim(); if (envBase) { + if ( + typeof window !== "undefined" && + isLikelyTelegramMiniApp() && + isLocalhostUrl(envBase) + ) { + const browserBase = getBrowserBaseUrl(); + if (browserBase) return browserBase; + } return normalizeBase(envBase); } diff --git a/app/api/telegram.ts b/app/api/telegram.ts index ee4d0ec..b7bfb1f 100644 --- a/app/api/telegram.ts +++ b/app/api/telegram.ts @@ -22,6 +22,7 @@ async function handler( res?: { status: (n: number) => void; setHeader: (k: string, v: string) => void; end: (s?: string) => void } ): Promise { const method = (request as { method?: string }).method ?? request.method; + console.log('[api/telegram]', method, new Date().toISOString()); if (method === 'GET') { const body = { ok: true, endpoint: 'telegram', use: 'POST with initData' }; if (res) { @@ -41,6 +42,7 @@ async function handler( try { const { handlePost } = await import('../telegram/post.js'); const response = await handlePost(request); + console.log('[api/telegram] POST status', response.status); if (res) { res.status(response.status); response.headers.forEach((v, k) => res.setHeader(k, v)); diff --git a/app/bot/responder.ts b/app/bot/responder.ts index 6b77342..4fbf2b9 100644 --- a/app/bot/responder.ts +++ b/app/bot/responder.ts @@ -12,6 +12,10 @@ import { /** Telegram text message length limit. */ const MAX_MESSAGE_TEXT_LENGTH = 4096; +/** Hard cap on continuation chunks to avoid Telegram flood limits in topics/groups. */ +const MAX_LONG_MESSAGE_PARTS = 2; +const TELEGRAM_TRUNCATION_NOTICE = + "\n\n[Truncated in Telegram. Open the Mini App for the full response.]"; /** Instruction passed to AI when the message comes from the bot: keep replies under 4096 chars and mention TMA for long answers. */ const TELEGRAM_BOT_LENGTH_INSTRUCTION = @@ -34,6 +38,17 @@ function chunkText(text: string, maxLen: number): string[] { return chunks; } +function getRetryAfterSeconds(error: unknown): number { + const retryAfter = (error as { parameters?: { retry_after?: number } })?.parameters?.retry_after; + return typeof retryAfter === "number" && Number.isFinite(retryAfter) ? retryAfter : 0; +} + +function isTelegramRateLimit(error: unknown): boolean { + const code = (error as { error_code?: number })?.error_code; + const description = (error as { description?: string })?.description ?? ""; + return code === 429 || description.includes("Too Many Requests"); +} + /** Send long text as multiple messages (each ≤ MAX_MESSAGE_TEXT_LENGTH). First chunk replies to replyToMessageId or uses replyOptions; rest reply to previous sent message. */ async function sendLongMessage( api: Context["api"], @@ -45,11 +60,17 @@ async function sendLongMessage( ): Promise { const chunks = chunkText(fullText, MAX_MESSAGE_TEXT_LENGTH); if (chunks.length === 0) return; + const limited = chunks.slice(0, MAX_LONG_MESSAGE_PARTS); let lastSentId: number | undefined = opts.replyToMessageId; - for (let i = 0; i < chunks.length; i++) { + for (let i = 0; i < limited.length; i++) { + const isLastAllowed = i === limited.length - 1; + const withNotice = + isLastAllowed && chunks.length > MAX_LONG_MESSAGE_PARTS + ? `${limited[i].trimEnd()}${TELEGRAM_TRUNCATION_NOTICE}` + : limited[i]; const formatted = truncateTelegramHtmlSafe( closeOpenTelegramHtml( - stripUnpairedMarkdownDelimiters(mdToTelegramHtml(chunks[i])), + stripUnpairedMarkdownDelimiters(mdToTelegramHtml(withNotice)), ), MAX_MESSAGE_TEXT_LENGTH, ); @@ -67,21 +88,28 @@ async function sendLongMessage( if (typeof id === "number") lastSentId = id; } catch (e) { console.error("[bot][sendLongMessage]", (e as Error)?.message ?? e); + if (isTelegramRateLimit(e) && getRetryAfterSeconds(e) > 15) return; try { - const markdown = toTelegramMarkdown(chunks[i]); + const markdown = toTelegramMarkdown(withNotice); const sent = await api.sendMessage(chatId, markdown, { ...partOptions, parse_mode: "Markdown", }); const id = (sent as { message_id?: number }).message_id; if (typeof id === "number") lastSentId = id; - } catch { - const sent = await api.sendMessage(chatId, chunks[i], { - ...(replyOptions.message_thread_id !== undefined ? { message_thread_id: replyOptions.message_thread_id } : {}), - ...(lastSentId !== undefined ? { reply_parameters: { message_id: lastSentId } } : {}), - }); - const id = (sent as { message_id?: number }).message_id; - if (typeof id === "number") lastSentId = id; + } catch (e2) { + if (isTelegramRateLimit(e2) && getRetryAfterSeconds(e2) > 15) return; + try { + const sent = await api.sendMessage(chatId, withNotice, { + ...(replyOptions.message_thread_id !== undefined ? { message_thread_id: replyOptions.message_thread_id } : {}), + ...(lastSentId !== undefined ? { reply_parameters: { message_id: lastSentId } } : {}), + }); + const id = (sent as { message_id?: number }).message_id; + if (typeof id === "number") lastSentId = id; + } catch (e3) { + console.error("[bot][sendLongMessage] plain fallback failed", (e3 as Error)?.message ?? e3); + return; + } } } } @@ -92,9 +120,9 @@ function toTelegramMarkdown(s: string): string { return s.replace(/\*\*/g, "*"); } /** Throttle editMessageText to avoid Telegram 429 rate limits. */ -const EDIT_THROTTLE_MS = 500; +const EDIT_THROTTLE_MS = 1200; /** If content grew by more than this many chars, edit immediately so long tail doesn't stick. */ -const EDIT_MIN_CHARS_TO_SEND_NOW = 20; +const EDIT_MIN_CHARS_TO_SEND_NOW = 80; /** Track latest generation per chat so newer messages cancel older streams. */ const chatGenerations = new Map(); @@ -103,7 +131,10 @@ const threadControllers = new Map(); function startNewGeneration(threadKey: string): AbortController { const existing = threadControllers.get(threadKey); - if (existing) existing.abort(); + if (existing) { + existing.abort(); + console.log("[bot][cancel] aborted previous generation", threadKey); + } const controller = new AbortController(); threadControllers.set(threadKey, controller); return controller; @@ -311,7 +342,13 @@ export async function handleBotAiResponse(ctx: Context): Promise { const err = e as { error_code?: number; description?: string; parameters?: { retry_after?: number } }; if (err?.description?.includes("not modified")) return; if (err?.error_code === 429) { - await new Promise((r) => setTimeout(r, Math.min((err.parameters?.retry_after ?? 1) * 1000, 2000))); + const retryAfterSec = err.parameters?.retry_after ?? 1; + if (retryAfterSec > 15) { + console.warn("[bot][edit] disabling edits due to long rate limit window", retryAfterSec); + editsDisabled = true; + return; + } + await new Promise((r) => setTimeout(r, Math.min(retryAfterSec * 1000, 5000))); try { if (sentMessageId === null) { const sent = await ctx.api.sendMessage(chatId, text, replyOptionsWithHtml); @@ -620,14 +657,18 @@ export async function handleBotAiResponse(ctx: Context): Promise { if (streamSentMessageId !== null && chatId !== undefined) { if (generationSignal.aborted) return; if (result.output_text.length > MAX_MESSAGE_TEXT_LENGTH) { - await sendLongMessage( - ctx.api, - chatId, - result.output_text.slice(MAX_MESSAGE_TEXT_LENGTH), - replyOptions, - replyOptionsWithHtml, - { replyToMessageId: streamSentMessageId }, - ); + try { + await sendLongMessage( + ctx.api, + chatId, + result.output_text.slice(MAX_MESSAGE_TEXT_LENGTH), + replyOptions, + replyOptionsWithHtml, + { replyToMessageId: streamSentMessageId }, + ); + } catch (e) { + console.error("[bot][overflow] continuation failed", (e as Error)?.message ?? e); + } } return; } @@ -654,7 +695,16 @@ export async function handleBotAiResponse(ctx: Context): Promise { } if (chatId !== undefined) { - await sendLongMessage(ctx.api, chatId, result.output_text, replyOptions, replyOptionsWithHtml, {}); + try { + await sendLongMessage(ctx.api, chatId, result.output_text, replyOptions, replyOptionsWithHtml, {}); + } catch (e) { + console.error("[bot][sendLongMessage] failed", (e as Error)?.message ?? e); + try { + await ctx.reply("Response was rate-limited by Telegram. Please retry in a moment.", replyOptions); + } catch { + // no-op + } + } } else { const textToFormat = result.output_text.slice(0, MAX_MESSAGE_TEXT_LENGTH); await ctx.reply(textToFormat, replyOptions); diff --git a/app/bot/webhook.ts b/app/bot/webhook.ts index 55eb581..4a8973c 100644 --- a/app/bot/webhook.ts +++ b/app/bot/webhook.ts @@ -6,9 +6,8 @@ * - We return 200 OK immediately so Telegram does not retry or hide the user's message. * - We process the update in the background (waitUntil). * - * Per-chat serialization: we process one update per chat at a time. When a new update arrives - * for a chat, we wait for the previous handler for that chat to finish, then run the new one. - * So Reply A is always sent before we start processing Prompt B — no reorder flash. + * Updates are processed concurrently. Cancellation/ordering is handled in responder logic + * ("latest prompt wins"), so a newer message can interrupt an in-flight stream quickly. */ import { waitUntil } from '@vercel/functions'; @@ -23,22 +22,6 @@ interface TelegramUpdate { [key: string]: unknown; } -/** Extract chat id from update so we can serialize processing per chat and avoid reply order flash. */ -function getChatIdFromUpdate(update: TelegramUpdate): number | undefined { - const msg = update.message ?? update.edited_message ?? update.channel_post; - if (msg && typeof msg === 'object' && msg.chat && typeof (msg.chat as { id?: number }).id === 'number') { - return (msg.chat as { id: number }).id; - } - const cq = update.callback_query; - if (cq && typeof cq === 'object' && cq.message && typeof (cq.message as { chat?: { id?: number } }).chat?.id === 'number') { - return (cq.message as { chat: { id: number } }).chat.id; - } - return undefined; -} - -/** Per-chat tail promise: next update for this chat waits for the previous handler to finish. */ -const chatQueue = new Map>(); - const BOT_TOKEN = process.env.BOT_TOKEN || process.env.TELEGRAM_BOT_TOKEN; /** Single bot instance for all webhook requests; created once at module load. */ const bot = BOT_TOKEN ? createBot(BOT_TOKEN) : null; @@ -149,12 +132,8 @@ export async function handleRequest(request: Request): Promise { // Return 200 OK immediately so Telegram applies the message to the chat. Process update // in waitUntil so we don't block the response on AI/DB. - // Serialize per chat so Reply A is always sent before we start processing Prompt B. const updateId = update.update_id; - const chatId = getChatIdFromUpdate(update); - const prev = chatId !== undefined ? chatQueue.get(chatId) : undefined; - const work = (prev ?? Promise.resolve()) - .then(() => ensureBotInit()) + const work = ensureBotInit() .then(() => bot!.handleUpdate(update as Parameters[0])) .then(() => { console.log('[webhook] handled update', updateId); @@ -162,10 +141,6 @@ export async function handleRequest(request: Request): Promise { .catch((err) => { console.error('[bot]', err); }); - const tail = work.then(() => {}, () => {}); - if (chatId !== undefined) { - chatQueue.set(chatId, tail); - } waitUntil(work); return jsonResponse({ ok: true }); } @@ -236,13 +211,7 @@ async function legacyHandler(req: NodeReq, res: NodeRes): Promise { } try { await ensureBotInit(); - const chatIdLegacy = getChatIdFromUpdate(update); - const prevLegacy = chatIdLegacy !== undefined ? chatQueue.get(chatIdLegacy) : undefined; - await (prevLegacy ?? Promise.resolve()); await bot.handleUpdate(update as Parameters[0]); - if (chatIdLegacy !== undefined) { - chatQueue.set(chatIdLegacy, Promise.resolve()); - } } catch (err) { console.error('[bot]', err); res.status(500).json({ ok: false, error: 'handler_error' }); diff --git a/app/scripts/run-bot-local.ts b/app/scripts/run-bot-local.ts index fe471fe..16eaf59 100644 --- a/app/scripts/run-bot-local.ts +++ b/app/scripts/run-bot-local.ts @@ -17,8 +17,8 @@ if (!token) { async function main() { const bot = createBot(token); await bot.api.deleteWebhook(); - await bot.start(); console.log('Bot running locally (getUpdates). Press Ctrl+C to stop.'); + await bot.start(); } main().catch((err) => { From 0f7f1b506f6c93ceb20187c66c5b27f5f1424fd0 Mon Sep 17 00:00:00 2001 From: Dhereal1 Date: Sun, 29 Mar 2026 18:33:27 +0300 Subject: [PATCH 3/4] fix(bot): stabilize streaming cancellation and webhook concurrency - fix incorrect abort logic (max > current update_id) - ensure streams stop immediately on cancel - prevent stale generations from sending updates - add generation-based cancellation guard - make streaming non-blocking (no awaited edits) - improve logging with update_id for tracing --- app/ai/openai.ts | 15 ++++-- app/bot/responder.ts | 122 +++++++++++++++++++++++++++++++++++-------- app/bot/webhook.ts | 17 +++--- 3 files changed, 121 insertions(+), 33 deletions(-) diff --git a/app/ai/openai.ts b/app/ai/openai.ts index 2967c80..9345585 100644 --- a/app/ai/openai.ts +++ b/app/ai/openai.ts @@ -154,6 +154,14 @@ export async function callOpenAiChatStream( /* ignore */ } }; + let cancelLogged = false; + const hardAbort = (reason: string): void => { + if (!cancelLogged) { + cancelLogged = true; + console.log("[STREAM] aborted immediately:", reason); + } + abortStream(); + }; onAbort = (): void => { abortStream(); }; @@ -163,15 +171,16 @@ export async function callOpenAiChatStream( stream.on("response.output_text.delta", async (event: { snapshot?: string }) => { if (opts?.abortSignal?.aborted) { - abortStream(); + hardAbort("abortSignal"); return; } if (opts?.isCancelled && opts.isCancelled()) { - abortStream(); + hardAbort("isCancelled"); return; } if (opts?.getAbortSignal && (await opts.getAbortSignal())) { - abortStream(); + console.log("[STREAM] stale generation"); + hardAbort("getAbortSignal"); return; } const text = event?.snapshot ?? ""; diff --git a/app/bot/responder.ts b/app/bot/responder.ts index 4fbf2b9..a854b3a 100644 --- a/app/bot/responder.ts +++ b/app/bot/responder.ts @@ -56,13 +56,14 @@ async function sendLongMessage( fullText: string, replyOptions: { message_thread_id?: number; reply_parameters?: { message_id: number } }, replyOptionsWithHtml: { message_thread_id?: number; reply_parameters?: { message_id: number }; parse_mode: "HTML" }, - opts: { replyToMessageId?: number }, + opts: { replyToMessageId?: number; shouldSkipIo?: () => boolean }, ): Promise { const chunks = chunkText(fullText, MAX_MESSAGE_TEXT_LENGTH); if (chunks.length === 0) return; const limited = chunks.slice(0, MAX_LONG_MESSAGE_PARTS); let lastSentId: number | undefined = opts.replyToMessageId; for (let i = 0; i < limited.length; i++) { + if (opts.shouldSkipIo?.()) return; const isLastAllowed = i === limited.length - 1; const withNotice = isLastAllowed && chunks.length > MAX_LONG_MESSAGE_PARTS @@ -83,6 +84,7 @@ async function sendLongMessage( parse_mode: "HTML" as const, }; try { + if (opts.shouldSkipIo?.()) return; const sent = await api.sendMessage(chatId, formatted, partOptions); const id = (sent as { message_id?: number }).message_id; if (typeof id === "number") lastSentId = id; @@ -91,6 +93,7 @@ async function sendLongMessage( if (isTelegramRateLimit(e) && getRetryAfterSeconds(e) > 15) return; try { const markdown = toTelegramMarkdown(withNotice); + if (opts.shouldSkipIo?.()) return; const sent = await api.sendMessage(chatId, markdown, { ...partOptions, parse_mode: "Markdown", @@ -100,6 +103,7 @@ async function sendLongMessage( } catch (e2) { if (isTelegramRateLimit(e2) && getRetryAfterSeconds(e2) > 15) return; try { + if (opts.shouldSkipIo?.()) return; const sent = await api.sendMessage(chatId, withNotice, { ...(replyOptions.message_thread_id !== undefined ? { message_thread_id: replyOptions.message_thread_id } : {}), ...(lastSentId !== undefined ? { reply_parameters: { message_id: lastSentId } } : {}), @@ -124,8 +128,8 @@ const EDIT_THROTTLE_MS = 1200; /** If content grew by more than this many chars, edit immediately so long tail doesn't stick. */ const EDIT_MIN_CHARS_TO_SEND_NOW = 80; -/** Track latest generation per chat so newer messages cancel older streams. */ -const chatGenerations = new Map(); +/** Track latest generation per thread so newer messages cancel older streams immediately. */ +const activeGeneration = new Map(); /** Single source of truth for hard-cancel per thread ("latest prompt wins"). */ const threadControllers = new Map(); @@ -223,22 +227,25 @@ export async function handleBotAiResponse(ctx: Context): Promise { const threadKey = `bot:${typeof chatId === "number" ? chatId : from?.id ?? "unknown"}:${thread_id}`; const generationController = startNewGeneration(threadKey); const generationSignal = generationController.signal; + const gen = (activeGeneration.get(threadKey) ?? 0) + 1; + activeGeneration.set(threadKey, gen); + console.log("[START] new generation", threadKey, gen, "update:", update_id ?? "n/a"); + const isStopMessage = text.toLowerCase().includes("stop"); /** When streaming we send one message early then edit it; used to detect streaming path. */ let streamSentMessageId: number | null = null; try { - - const numericChatId = - typeof chatId === "number" ? chatId : undefined; - let generation = 0; - if (numericChatId !== undefined) { - const prev = chatGenerations.get(numericChatId) ?? 0; - generation = prev + 1; - chatGenerations.set(numericChatId, generation); - } + const isStaleGeneration = (): boolean => + activeGeneration.get(threadKey) !== gen; + const shouldSkipTelegramIo = (label: string): boolean => { + if (activeGeneration.get(threadKey) !== gen) { + console.log("[CANCEL] skip edit/send", threadKey, label); + return true; + } + return false; + }; const isCancelled = (): boolean => generationSignal.aborted || - (numericChatId !== undefined && - chatGenerations.get(numericChatId) !== generation); + isStaleGeneration(); const shouldAbortSend = async (): Promise => { if (!threadContext) return false; @@ -247,7 +254,45 @@ export async function handleBotAiResponse(ctx: Context): Promise { threadContext.thread_id, "bot", ); - return max !== null && max !== threadContext.telegram_update_id; + // Abort only when a newer Telegram update already exists for this thread. + // Using "!=" can kill the current generation before its own claim row is inserted. + return ( + max !== null && + typeof threadContext.telegram_update_id === "number" && + max > threadContext.telegram_update_id + ); + }; + if (isStopMessage) { + if (chatId !== undefined) { + await ctx.api.sendMessage(chatId, "✅ Stopped. Send a new question.", replyOptions); + } else { + await ctx.reply("✅ Stopped. Send a new question.", replyOptions); + } + return; + } + let staleLogged = false; + const markStaleAndAbort = (): boolean => { + if (!isStaleGeneration()) return false; + if (!staleLogged) { + staleLogged = true; + console.log("[CANCEL] stale generation", threadKey); + } + if (!generationSignal.aborted) { + generationController.abort(); + } + return true; + }; + let cancelLogged = false; + const shouldAbortGeneration = async (): Promise => { + const abort = markStaleAndAbort() || (await shouldAbortSend()) || isCancelled(); + if (abort && !cancelLogged) { + cancelLogged = true; + console.log("[CANCEL] aborting stream for thread:", threadKey); + } + if (abort && !generationSignal.aborted) { + generationController.abort(); + } + return abort; }; let result: Awaited>; @@ -266,6 +311,7 @@ export async function handleBotAiResponse(ctx: Context): Promise { /** When turn is interrupted: message already exists (we sent early); optionally final edit, always persist. HTML only (format pipeline is strict). */ const sendInterruptedReply = async (opts: { sendToChat: boolean }): Promise => { + if (markStaleAndAbort()) return; const content = streamedAccumulated.trim(); const allowChatSend = opts.sendToChat && !generationSignal.aborted; if (allowChatSend && sentMessageId !== null && content.length > 0) { @@ -276,6 +322,7 @@ export async function handleBotAiResponse(ctx: Context): Promise { MAX_MESSAGE_TEXT_LENGTH, ); try { + if (shouldSkipTelegramIo("interrupted:edit")) return; await ctx.api.editMessageText(chatId, sentMessageId, toEdit, { parse_mode: "HTML" }); } catch (e) { console.error("[bot][edit] interrupted reply", (e as Error)?.message ?? e); @@ -288,12 +335,14 @@ export async function handleBotAiResponse(ctx: Context): Promise { MAX_MESSAGE_TEXT_LENGTH, ); try { + if (shouldSkipTelegramIo("interrupted:reply")) return; await ctx.reply(toSend, replyOptionsWithHtml); } catch (e) { console.error("[bot][reply] interrupted", (e as Error)?.message ?? e); } } else if (allowChatSend && sentMessageId === null) { try { + if (shouldSkipTelegramIo("interrupted:ellipsis")) return; await ctx.reply("…", replyOptions); } catch (_) {} } @@ -323,12 +372,14 @@ export async function handleBotAiResponse(ctx: Context): Promise { /** First call sends a message (claims message_id); later calls edit that message. HTML only; format pipeline is strict so Telegram accepts it. */ const sendOrEditOnce = (formatted: string, _rawSlice: string): Promise => { const run = async (): Promise => { + if (markStaleAndAbort()) return; if (generationSignal.aborted) return; - if (await shouldAbortSend()) return; + if (await shouldAbortGeneration()) return; if (isCancelled() || editsDisabled) return; const text = truncateTelegramHtmlSafe(formatted.trim() || "…", MAX_MESSAGE_TEXT_LENGTH); try { if (sentMessageId === null) { + if (shouldSkipTelegramIo("stream:send")) return; const sent = await ctx.api.sendMessage(chatId, text, replyOptionsWithHtml); const id = (sent as { message_id?: number }).message_id; if (typeof id === "number") { @@ -336,6 +387,7 @@ export async function handleBotAiResponse(ctx: Context): Promise { streamSentMessageId = id; } } else { + if (shouldSkipTelegramIo("stream:edit")) return; await ctx.api.editMessageText(chatId, sentMessageId, text, { parse_mode: "HTML" }); } } catch (e: unknown) { @@ -350,7 +402,9 @@ export async function handleBotAiResponse(ctx: Context): Promise { } await new Promise((r) => setTimeout(r, Math.min(retryAfterSec * 1000, 5000))); try { + if (markStaleAndAbort()) return; if (sentMessageId === null) { + if (shouldSkipTelegramIo("stream:send:retry")) return; const sent = await ctx.api.sendMessage(chatId, text, replyOptionsWithHtml); const id = (sent as { message_id?: number }).message_id; if (typeof id === "number") { @@ -358,6 +412,7 @@ export async function handleBotAiResponse(ctx: Context): Promise { streamSentMessageId = id; } } else { + if (shouldSkipTelegramIo("stream:edit:retry")) return; await ctx.api.editMessageText(chatId, sentMessageId, text, { parse_mode: "HTML" }); } } catch (e2) { @@ -375,6 +430,7 @@ export async function handleBotAiResponse(ctx: Context): Promise { }; const flushEdit = (awaitSend = false): void | Promise => { + if (markStaleAndAbort()) return; if (generationSignal.aborted) return; if (isCancelled()) return; if (pending === null) return; @@ -395,6 +451,7 @@ export async function handleBotAiResponse(ctx: Context): Promise { const sendOrEdit = (accumulated: string): void => { stopTypingSpinner(); streamedAccumulated = accumulated; + if (markStaleAndAbort()) return; if (generationSignal.aborted) return; if (isCancelled()) return; const slice = accumulated.length > MAX_MESSAGE_TEXT_LENGTH @@ -438,15 +495,18 @@ export async function handleBotAiResponse(ctx: Context): Promise { interruptedReplyCallback = sendInterruptedReply; - await sendOrEditOnce(typingFrames[typingIndex], typingFrames[typingIndex]); - typingInterval = setInterval(() => { + if (markStaleAndAbort()) { + stopTypingSpinner(); + return; + } if (generationSignal.aborted) { stopTypingSpinner(); return; } if (sentMessageId === null) return; typingIndex = (typingIndex + 1) % typingFrames.length; + if (shouldSkipTelegramIo("typing:edit")) return; ctx.api .editMessageText(chatId, sentMessageId, typingFrames[typingIndex]) .catch(() => {}); @@ -457,7 +517,7 @@ export async function handleBotAiResponse(ctx: Context): Promise { sendOrEdit, { isCancelled, - getAbortSignal: async () => (await shouldAbortSend()) || isCancelled(), + getAbortSignal: shouldAbortGeneration, abortSignal: generationSignal, }, ); @@ -517,7 +577,7 @@ export async function handleBotAiResponse(ctx: Context): Promise { sendOrEdit, { isCancelled, - getAbortSignal: async () => (await shouldAbortSend()) || isCancelled(), + getAbortSignal: shouldAbortGeneration, abortSignal: generationSignal, }, ); @@ -571,14 +631,17 @@ export async function handleBotAiResponse(ctx: Context): Promise { ); if (finalFormatted.trim()) { sendOrEditQueue = sendOrEditQueue.then(async () => { + if (markStaleAndAbort()) return; if (generationSignal.aborted) return; try { + if (shouldSkipTelegramIo("final:edit")) return; await ctx.api.editMessageText(chatId, streamSentMessageId!, finalFormatted, { parse_mode: "HTML" }); } catch (e: unknown) { const err = e as { description?: string; message?: string }; if (err?.description?.includes("not modified")) return; console.error("[bot][edit] final completion edit", err?.description ?? err?.message ?? e); try { + if (shouldSkipTelegramIo("final:edit:fallback")) return; await ctx.api.editMessageText(chatId, streamSentMessageId!, fullSlice, {}); } catch (e2: unknown) { const d2 = (e2 as { description?: string })?.description; @@ -617,6 +680,7 @@ export async function handleBotAiResponse(ctx: Context): Promise { } if (!result.ok || !result.output_text) { + if (markStaleAndAbort()) return; if (await shouldAbortSend()) return; if (isCancelled()) return; const errMsg = result.error ?? "AI returned no output."; @@ -627,11 +691,14 @@ export async function handleBotAiResponse(ctx: Context): Promise { : "AI is temporarily unavailable. Please try again in a moment."; if (streamSentMessageId !== null && chatId !== undefined) { try { + if (shouldSkipTelegramIo("error:edit")) return; await ctx.api.editMessageText(chatId, streamSentMessageId, message, {}); } catch { + if (shouldSkipTelegramIo("error:reply:fallback")) return; await ctx.reply(message, replyOptions); } } else { + if (shouldSkipTelegramIo("error:reply")) return; await ctx.reply(message, replyOptions); } return; @@ -641,6 +708,7 @@ export async function handleBotAiResponse(ctx: Context): Promise { await interruptedReplyCallback({ sendToChat: false }); return; } + if (markStaleAndAbort()) return; if (await shouldAbortSend()) return; if (generationSignal.aborted && interruptedReplyCallback) { await interruptedReplyCallback({ sendToChat: false }); @@ -655,6 +723,7 @@ export async function handleBotAiResponse(ctx: Context): Promise { // Streaming path: first message already has up to 4096. Send overflow as continuation if needed; then we're done. if (streamSentMessageId !== null && chatId !== undefined) { + if (markStaleAndAbort()) return; if (generationSignal.aborted) return; if (result.output_text.length > MAX_MESSAGE_TEXT_LENGTH) { try { @@ -664,7 +733,7 @@ export async function handleBotAiResponse(ctx: Context): Promise { result.output_text.slice(MAX_MESSAGE_TEXT_LENGTH), replyOptions, replyOptionsWithHtml, - { replyToMessageId: streamSentMessageId }, + { replyToMessageId: streamSentMessageId, shouldSkipIo: () => shouldSkipTelegramIo("overflow:sendLongMessage") }, ); } catch (e) { console.error("[bot][overflow] continuation failed", (e as Error)?.message ?? e); @@ -674,6 +743,7 @@ export async function handleBotAiResponse(ctx: Context): Promise { } if (result.output_text.length <= MAX_MESSAGE_TEXT_LENGTH) { + if (markStaleAndAbort()) return; const textToFormat = result.output_text; const formatted = truncateTelegramHtmlSafe( closeOpenTelegramHtml( @@ -682,12 +752,15 @@ export async function handleBotAiResponse(ctx: Context): Promise { MAX_MESSAGE_TEXT_LENGTH, ); try { + if (shouldSkipTelegramIo("reply:html")) return; await ctx.reply(formatted, replyOptionsWithHtml); } catch (e) { console.error("[bot][reply] HTML reply failed", (e as Error)?.message ?? e); try { + if (shouldSkipTelegramIo("reply:markdown")) return; await ctx.reply(toTelegramMarkdown(textToFormat), { ...replyOptions, parse_mode: "Markdown" }); } catch { + if (shouldSkipTelegramIo("reply:plain")) return; await ctx.reply(textToFormat, replyOptions); } } @@ -695,11 +768,15 @@ export async function handleBotAiResponse(ctx: Context): Promise { } if (chatId !== undefined) { + if (markStaleAndAbort()) return; try { - await sendLongMessage(ctx.api, chatId, result.output_text, replyOptions, replyOptionsWithHtml, {}); + await sendLongMessage(ctx.api, chatId, result.output_text, replyOptions, replyOptionsWithHtml, { + shouldSkipIo: () => shouldSkipTelegramIo("sendLongMessage"), + }); } catch (e) { console.error("[bot][sendLongMessage] failed", (e as Error)?.message ?? e); try { + if (shouldSkipTelegramIo("sendLongMessage:errorReply")) return; await ctx.reply("Response was rate-limited by Telegram. Please retry in a moment.", replyOptions); } catch { // no-op @@ -707,6 +784,7 @@ export async function handleBotAiResponse(ctx: Context): Promise { } } else { const textToFormat = result.output_text.slice(0, MAX_MESSAGE_TEXT_LENGTH); + if (shouldSkipTelegramIo("reply:no-chatId")) return; await ctx.reply(textToFormat, replyOptions); } } finally { diff --git a/app/bot/webhook.ts b/app/bot/webhook.ts index 4a8973c..1ebd316 100644 --- a/app/bot/webhook.ts +++ b/app/bot/webhook.ts @@ -75,6 +75,7 @@ async function getWebhookInfo(): Promise<{ url?: string }> { } export async function handleRequest(request: Request): Promise { + console.log("🔥 LOCAL WEBHOOK HIT", new Date().toISOString()); const method = request.method; console.log('[webhook]', method, new Date().toISOString()); @@ -209,14 +210,14 @@ async function legacyHandler(req: NodeReq, res: NodeRes): Promise { res.status(400).json({ ok: false, error: 'invalid_body' }); return; } - try { - await ensureBotInit(); - await bot.handleUpdate(update as Parameters[0]); - } catch (err) { - console.error('[bot]', err); - res.status(500).json({ ok: false, error: 'handler_error' }); - return; - } + void ensureBotInit() + .then(() => bot.handleUpdate(update as Parameters[0])) + .then(() => { + console.log('[webhook] handled update', update.update_id); + }) + .catch((err) => { + console.error('[bot]', err); + }); res.status(200).json({ ok: true }); } From d366851b40212771261d003353786e9558eba7df Mon Sep 17 00:00:00 2001 From: Dhereal1 Date: Thu, 2 Apr 2026 19:12:04 +0100 Subject: [PATCH 4/4] chore(ai): centralize prompts + versioned /start --- .github/workflows/app-bot-tests.yml | 32 +++++++++++++++++++++ .github/workflows/bot-python-tests.yml | 32 +++++++++++++++++++++ app/ai/instructions.ts | 13 +++++++++ app/ai/openai.ts | 9 ++---- app/bot/__tests__/start.test.ts | 37 +++++++++++++++++++++++++ app/bot/grammy.ts | 3 +- app/bot/responder.ts | 5 +--- app/bot/start.ts | 22 +++++++++++++++ app/package.json | 1 + bot/ENV_VARIABLES.md | 6 ++++ bot/README.md | 2 ++ bot/bot.py | 28 ++++++++++++++++++- bot/handler.ts | 12 +++----- bot/instructions.ts | 15 ++++++++++ bot/tests/test_start_message_version.py | 34 +++++++++++++++++++++++ packages/bot/src/ai.ts | 12 +++----- packages/bot/src/instructions.ts | 15 ++++++++++ 17 files changed, 250 insertions(+), 28 deletions(-) create mode 100644 .github/workflows/app-bot-tests.yml create mode 100644 .github/workflows/bot-python-tests.yml create mode 100644 app/ai/instructions.ts create mode 100644 app/bot/__tests__/start.test.ts create mode 100644 app/bot/start.ts create mode 100644 bot/instructions.ts create mode 100644 bot/tests/test_start_message_version.py create mode 100644 packages/bot/src/instructions.ts diff --git a/.github/workflows/app-bot-tests.yml b/.github/workflows/app-bot-tests.yml new file mode 100644 index 0000000..f1b1979 --- /dev/null +++ b/.github/workflows/app-bot-tests.yml @@ -0,0 +1,32 @@ +name: app-bot-tests + +on: + pull_request: + paths: + - "app/bot/**" + - "app/package.json" + - "app/package-lock.json" + push: + branches: + - main + paths: + - "app/bot/**" + - "app/package.json" + - "app/package-lock.json" + +jobs: + test-bot-start-message: + runs-on: ubuntu-latest + defaults: + run: + working-directory: app + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-node@v4 + with: + node-version: 22 + cache: npm + cache-dependency-path: app/package-lock.json + - run: npm ci + - run: npm run test:bot + diff --git a/.github/workflows/bot-python-tests.yml b/.github/workflows/bot-python-tests.yml new file mode 100644 index 0000000..2925e1f --- /dev/null +++ b/.github/workflows/bot-python-tests.yml @@ -0,0 +1,32 @@ +name: bot-python-tests + +on: + pull_request: + paths: + - "bot/**" + - ".github/workflows/bot-python-tests.yml" + push: + branches: + - main + paths: + - "bot/**" + - ".github/workflows/bot-python-tests.yml" + +jobs: + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: "3.11" + cache: pip + cache-dependency-path: bot/requirements.txt + - name: Install deps + run: | + python -m pip install --upgrade pip + pip install -r bot/requirements.txt + pip install pytest + - name: Run tests + run: python -m pytest -q bot/tests + diff --git a/app/ai/instructions.ts b/app/ai/instructions.ts new file mode 100644 index 0000000..3713ffa --- /dev/null +++ b/app/ai/instructions.ts @@ -0,0 +1,13 @@ +import type { AiMode } from "./openai.js"; + +export const TOKEN_INFO_INPUT_PREFIX = + "You are a blockchain and token analyst. Answer clearly and briefly.\n\n"; + +export function getInputPrefixForMode(mode: AiMode): string { + return mode === "token_info" ? TOKEN_INFO_INPUT_PREFIX : ""; +} + +/** Instruction passed to AI for Telegram bot messages (HTML replies must fit Telegram limits). */ +export const TELEGRAM_BOT_LENGTH_INSTRUCTION = + "Please give an answer in less than 4096 chars. If user asks for a long message or a message with more than 4096 chars add a sentence that full responses are available only in TMA and your bot you can give just a short answer that follows."; + diff --git a/app/ai/openai.ts b/app/ai/openai.ts index 9345585..64b9e1a 100644 --- a/app/ai/openai.ts +++ b/app/ai/openai.ts @@ -1,4 +1,5 @@ import OpenAI from "openai"; +import { getInputPrefixForMode } from "./instructions.js"; export type AiMode = "chat" | "token_info"; @@ -65,9 +66,7 @@ export async function callOpenAiChat( } const prefix = - mode === "token_info" - ? "You are a blockchain and token analyst. Answer clearly and briefly.\n\n" - : ""; + getInputPrefixForMode(mode); try { const response = await client.responses.create({ @@ -126,9 +125,7 @@ export async function callOpenAiChatStream( } const prefix = - mode === "token_info" - ? "You are a blockchain and token analyst. Answer clearly and briefly.\n\n" - : ""; + getInputPrefixForMode(mode); let onAbort: (() => void) | null = null; try { diff --git a/app/bot/__tests__/start.test.ts b/app/bot/__tests__/start.test.ts new file mode 100644 index 0000000..fa91432 --- /dev/null +++ b/app/bot/__tests__/start.test.ts @@ -0,0 +1,37 @@ +import { test } from "node:test"; +import assert from "node:assert/strict"; +import { buildStartMessage, getBotVersion } from "../start.js"; + +test("getBotVersion prefers BOT_VERSION", () => { + const prev = process.env.BOT_VERSION; + process.env.BOT_VERSION = "123"; + try { + assert.equal(getBotVersion(), "123"); + } finally { + if (prev === undefined) delete process.env.BOT_VERSION; + else process.env.BOT_VERSION = prev; + } +}); + +test("getBotVersion shortens a SHA to 7 chars", () => { + const prev = process.env.BOT_VERSION; + process.env.BOT_VERSION = "0f7f1b5abcdef1234567890"; + try { + assert.equal(getBotVersion(), "0f7f1b5"); + } finally { + if (prev === undefined) delete process.env.BOT_VERSION; + else process.env.BOT_VERSION = prev; + } +}); + +test("buildStartMessage includes version tag", () => { + const prev = process.env.BOT_VERSION; + process.env.BOT_VERSION = "123"; + try { + assert.match(buildStartMessage(), /@HyperlinksSpaceBot v\.123/); + } finally { + if (prev === undefined) delete process.env.BOT_VERSION; + else process.env.BOT_VERSION = prev; + } +}); + diff --git a/app/bot/grammy.ts b/app/bot/grammy.ts index ecd4408..268391e 100644 --- a/app/bot/grammy.ts +++ b/app/bot/grammy.ts @@ -8,6 +8,7 @@ import { upsertUserFromBot, } from '../database/users.js'; import { handleBotAiResponse } from './responder.js'; +import { buildStartMessage } from './start.js'; export function createBot(token: string): Bot { const bot = new Bot(token); @@ -31,7 +32,7 @@ export function createBot(token: string): Bot { bot.command('start', async (ctx: Context) => { await handleUserUpsert(ctx); - await ctx.reply("That's @HyperlinksSpaceBot, you can use AI in bot and explore the app for more features"); + await ctx.reply(buildStartMessage()); }); bot.on('message:text', async (ctx: Context) => { diff --git a/app/bot/responder.ts b/app/bot/responder.ts index a854b3a..9696d63 100644 --- a/app/bot/responder.ts +++ b/app/bot/responder.ts @@ -1,6 +1,7 @@ import type { Context } from "grammy"; import { normalizeSymbol } from "../blockchain/coffee.js"; import { transmit, transmitStream } from "../ai/transmitter.js"; +import { TELEGRAM_BOT_LENGTH_INSTRUCTION } from "../ai/instructions.js"; import { normalizeUsername } from "../database/users.js"; import { getMaxTelegramUpdateIdForThread, insertMessage } from "../database/messages.js"; import { @@ -17,10 +18,6 @@ const MAX_LONG_MESSAGE_PARTS = 2; const TELEGRAM_TRUNCATION_NOTICE = "\n\n[Truncated in Telegram. Open the Mini App for the full response.]"; -/** Instruction passed to AI when the message comes from the bot: keep replies under 4096 chars and mention TMA for long answers. */ -const TELEGRAM_BOT_LENGTH_INSTRUCTION = - "Please give an answer in less than 4096 chars. If user asks for a long message or a message with more than 4096 chars add a sentence that full responses are available only in TMA and your bot you can give just a short answer that follows."; - /** Split text into chunks of at most maxLen, preferring to break at newlines. */ function chunkText(text: string, maxLen: number): string[] { if (text.length <= maxLen) return [text]; diff --git a/app/bot/start.ts b/app/bot/start.ts new file mode 100644 index 0000000..e2df996 --- /dev/null +++ b/app/bot/start.ts @@ -0,0 +1,22 @@ +function normalizeVersion(raw: string): string { + const v = raw.trim(); + if (!v) return "dev"; + if (/^[0-9a-f]{7,40}$/i.test(v)) return v.slice(0, 7); + return v; +} + +export function getBotVersion(): string { + const raw = + process.env.BOT_VERSION ?? + process.env.VERCEL_GIT_COMMIT_SHA ?? + process.env.GIT_COMMIT_SHA ?? + process.env.npm_package_version ?? + "dev"; + return normalizeVersion(raw); +} + +export function buildStartMessage(): string { + const version = getBotVersion(); + return `That's @HyperlinksSpaceBot v.${version}, you can use AI in bot and explore the app for more features`; +} + diff --git a/app/package.json b/app/package.json index 2dcc0f0..f40507d 100644 --- a/app/package.json +++ b/app/package.json @@ -24,6 +24,7 @@ "web": "expo start --web", "dev:vercel": "cross-env TS_NODE_PROJECT=api/tsconfig.json npx vercel dev", "lint": "expo lint", + "test:bot": "node --import tsx --test bot/**/*.test.ts", "draft": "npx eas-cli@latest workflow:run create-draft.yml", "development-builds": "npx eas-cli@latest workflow:run create-development-builds.yml", "deploy": "npx eas-cli@latest workflow:run deploy-to-production.yml", diff --git a/bot/ENV_VARIABLES.md b/bot/ENV_VARIABLES.md index df07855..2ff704b 100644 --- a/bot/ENV_VARIABLES.md +++ b/bot/ENV_VARIABLES.md @@ -38,6 +38,12 @@ - **Usage**: Used in `/start` command to create inline button linking to the app - **Note**: If not set, button will have `None?mode=fullscreen` as URL +### `BOT_VERSION` +- **Description**: Human-friendly version tag shown in the `/start` message (helps avoid mixing up deployments) +- **Required**: No +- **Example**: `123` (shown as `v.123`) +- **Usage**: Included in the `/start` message so deployers can verify the running version + ## Setup Instructions ### Local Development diff --git a/bot/README.md b/bot/README.md index 68ce33b..86ad1d8 100644 --- a/bot/README.md +++ b/bot/README.md @@ -22,6 +22,7 @@ Required: Optional: - `APP_URL` (used in `/start` button) +- `BOT_VERSION` (shown in `/start` message) Example: @@ -31,6 +32,7 @@ DATABASE_URL=postgresql://user:pass@host:5432/db AI_BACKEND_URL=http://127.0.0.1:8000 INNER_CALLS_KEY=change-me-shared-secret APP_URL=https://your-frontend-domain +BOT_VERSION=123 ``` ## Railway diff --git a/bot/bot.py b/bot/bot.py index 1b2969c..d0cf2be 100644 --- a/bot/bot.py +++ b/bot/bot.py @@ -142,6 +142,32 @@ def build_app_launch_url() -> str | None: return result +def resolve_bot_version() -> str: + """Resolve a human-friendly bot version string for /start and logs.""" + candidates = [ + "BOT_VERSION", + "RELEASE", + "RAILWAY_GIT_COMMIT_SHA", + "VERCEL_GIT_COMMIT_SHA", + "GIT_SHA", + "GIT_COMMIT", + "HEROKU_SLUG_COMMIT", + ] + for key in candidates: + value = (os.getenv(key) or "").strip() + if value: + return value + return "dev" + + +def build_start_message_text() -> str: + version = resolve_bot_version() + return ( + f"That's @HyperlinksSpaceBot (https://t.me/HyperlinksSpaceBot) v.{version}, " + "you can use AI in bot and explore the app for more features" + ) + + async def cancel_stream(chat_id: int, message_id: int) -> None: """Signal cancellation only. Do not await old task cleanup.""" key = (chat_id, message_id) @@ -812,7 +838,7 @@ async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> N async def hello(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle /start: always send a reply; use Run app button only when APP_URL is valid.""" app_launch_url = build_app_launch_url() - message_text = "That's @HyperlinksSpaceBot, you can use AI in bot and explore the app for more features" + message_text = build_start_message_text() if app_launch_url: try: diff --git a/bot/handler.ts b/bot/handler.ts index a82f7cf..1b64741 100644 --- a/bot/handler.ts +++ b/bot/handler.ts @@ -1,5 +1,6 @@ import { callOpenAi, getOpenAiGuardState, type ChatMessage } from "./openapi.js"; import { extractTickerFromText, fetchCoffeeContext } from "./coffee.js"; +import { buildVerifiedTokenContextSystemMessage } from "./instructions.js"; export type HandleChatInput = { messages: ChatMessage[]; @@ -51,14 +52,10 @@ function buildCoffeeOnlySummary(symbol: string, facts: string[]): string { function withContextMessages(messages: ChatMessage[], facts: string[], sourceUrls: string[]): ChatMessage[] { if (facts.length === 0) return messages; - - const contextLines = [ - "Use this verified token context if relevant to the user question.", - ...facts.map((fact) => `${fact}`), + return [ + { role: "system", content: buildVerifiedTokenContextSystemMessage(facts, sourceUrls) }, + ...messages, ]; - if (sourceUrls.length > 0) contextLines.push(`Sources: ${sourceUrls.join(", ")}`); - - return [{ role: "system", content: contextLines.join("\n") }, ...messages]; } export async function handleChat(input: HandleChatInput): Promise { @@ -141,4 +138,3 @@ export async function handleChat(input: HandleChatInput): Promise `${fact}`), + ]; + if (sourceUrls.length > 0) contextLines.push(`Sources: ${sourceUrls.join(", ")}`); + return contextLines.join("\n"); +} + diff --git a/bot/tests/test_start_message_version.py b/bot/tests/test_start_message_version.py new file mode 100644 index 0000000..aa6c713 --- /dev/null +++ b/bot/tests/test_start_message_version.py @@ -0,0 +1,34 @@ +import asyncio + +import pytest + +pytest.importorskip("telegram") +pytest.importorskip("aiohttp") + +from bot import bot as bot_module + + +class _DummyMessage: + def __init__(self): + self.calls = [] + + async def reply_text(self, text, reply_markup=None): + self.calls.append({"text": text, "reply_markup": reply_markup}) + + +class _DummyUpdate: + def __init__(self, message): + self.message = message + + +def test_start_message_includes_version(monkeypatch): + monkeypatch.setenv("BOT_VERSION", "123") + monkeypatch.setenv("APP_URL", "https://example.com") + + msg = _DummyMessage() + update = _DummyUpdate(msg) + + asyncio.run(bot_module.hello(update, None)) + assert msg.calls, "Expected /start to call reply_text" + assert "v.123" in msg.calls[0]["text"] + diff --git a/packages/bot/src/ai.ts b/packages/bot/src/ai.ts index e8f7198..fc6b824 100644 --- a/packages/bot/src/ai.ts +++ b/packages/bot/src/ai.ts @@ -1,6 +1,7 @@ import { detectRequestedLanguage, fallbackNarrative, hasGenericFallbackText } from "./fallback.js"; import type { LlmClient } from "./llm.js"; import { extractTickerFromText, RagContextBuilder } from "./rag.js"; +import { buildVerifiedTokenContextSystemMessage } from "./instructions.js"; import type { ChatMessage, GenerateAnswerInput, @@ -108,14 +109,10 @@ function withContextMessages( sourceUrls: string[] ): ChatMessage[] { if (contextBlocks.length === 0) return messages; - - const contextLines = [ - "Use this verified token context if relevant to the user question.", - ...contextBlocks, + return [ + { role: "system", content: buildVerifiedTokenContextSystemMessage(contextBlocks, sourceUrls) }, + ...messages, ]; - if (sourceUrls.length > 0) contextLines.push(`Sources: ${sourceUrls.join(", ")}`); - - return [{ role: "system", content: contextLines.join("\n") }, ...messages]; } function normalizeSymbol(value: string | undefined): string | undefined { @@ -123,4 +120,3 @@ function normalizeSymbol(value: string | undefined): string | undefined { const normalized = value.replace("$", "").trim().toUpperCase(); return normalized || undefined; } - diff --git a/packages/bot/src/instructions.ts b/packages/bot/src/instructions.ts new file mode 100644 index 0000000..80cae3f --- /dev/null +++ b/packages/bot/src/instructions.ts @@ -0,0 +1,15 @@ +export const VERIFIED_TOKEN_CONTEXT_SYSTEM_INSTRUCTION = + "Use this verified token context if relevant to the user question."; + +export function buildVerifiedTokenContextSystemMessage( + contextBlocks: string[], + sourceUrls: string[], +): string { + const contextLines = [ + VERIFIED_TOKEN_CONTEXT_SYSTEM_INSTRUCTION, + ...contextBlocks, + ]; + if (sourceUrls.length > 0) contextLines.push(`Sources: ${sourceUrls.join(", ")}`); + return contextLines.join("\n"); +} +