From 4421323510e3b59d14df1de88e363744adc9bba9 Mon Sep 17 00:00:00 2001 From: dankarization <26330178+dankarization@users.noreply.github.com> Date: Sat, 13 Jun 2026 20:28:12 +0400 Subject: [PATCH 1/3] fix getFile fallback on local 400 --- src/telegram-bot-api-proxy.mjs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/telegram-bot-api-proxy.mjs b/src/telegram-bot-api-proxy.mjs index 010a346..d041c7a 100755 --- a/src/telegram-bot-api-proxy.mjs +++ b/src/telegram-bot-api-proxy.mjs @@ -178,6 +178,14 @@ function isSafeMethodForStatusFallback(method) { return safeCloudFallbackMethods.has(method); } +// Cloud getUpdates может вернуть file_id, который local Bot API ещё не видел. +// В этом случае local getFile отвечает 400, но cloud API всё ещё может его разрешить. +function shouldRetryCloudAfterLocalStatus(method, statusCode) { + if (statusCode === 401 || statusCode === 404) return true; + if (method === "getFile" && statusCode === 400) return true; + return false; +} + // Ключ кэша размера файла привязан к botId, потому что file_path уникален в рамках бота. function fileInfoKey(token, filePath) { return `${botIdFromToken(token)}:${filePath}`; @@ -991,7 +999,7 @@ async function handleBuffered(req, res, method, token, startedAt) { log(`method=getUpdates target=local action=dropped-local-update dropped=${localGuard.dropped} floor=${localGuard.floor ?? "none"} ackOffset=${localGuard.ackOffset ?? 
"none"}`); await acknowledgeDroppedLocalUpdates(localReq, token, localBody, localGuard.ackOffset); } - if (cloudFallbackAllowed && (local.statusCode === 401 || local.statusCode === 404) && body.length <= bufferLimitBytes) { + if (cloudFallbackAllowed && shouldRetryCloudAfterLocalStatus(method, local.statusCode) && body.length <= bufferLimitBytes) { const cloudRequest = cloudRequestForGetUpdates(localReq, method, token, localBody); const cloudRaw = await forwardBuffered(req, cloudRoot, cloudRequest.body, cloudRequest.reqUrl); const cloudProcessed = processGetFileResult(method, token, cloudRaw, "cloud"); From 6e00a326f258bf0b8ed87546d9bac898ea0b1a93 Mon Sep 17 00:00:00 2001 From: dankarization <26330178+dankarization@users.noreply.github.com> Date: Sat, 13 Jun 2026 22:04:44 +0400 Subject: [PATCH 2/3] harden cloud getUpdates fallback --- src/telegram-bot-api-proxy.mjs | 152 +++++++++++++++++++++++++++++---- 1 file changed, 135 insertions(+), 17 deletions(-) diff --git a/src/telegram-bot-api-proxy.mjs b/src/telegram-bot-api-proxy.mjs index d041c7a..a2b6e90 100755 --- a/src/telegram-bot-api-proxy.mjs +++ b/src/telegram-bot-api-proxy.mjs @@ -47,8 +47,12 @@ const localGetUpdatesUpstreamTimeoutMs = parsePositiveInteger( ); // TTL проверки cloud pending updates, чтобы не дергать getWebhookInfo на каждом long poll. const cloudPendingProbeTtlMs = Number.parseInt(process.env.CLOUD_PENDING_PROBE_TTL_MS || "5000", 10); +// Минимальный возраст cloud pending backlog перед fallback, если local API здоров и просто пуст. +const cloudPendingFallbackDelayMs = Number.parseInt(process.env.CLOUD_PENDING_FALLBACK_DELAY_MS || "60000", 10); // Максимальный возраст cloud update, который можно виртуально поднять над local offset. const cloudFreshUpdateMaxAgeMs = Number.parseInt(process.env.CLOUD_FRESH_UPDATE_MAX_AGE_MS || String(6 * 60 * 60 * 1000), 10); +// Минимальная разница между OpenClaw offset и local update_id, при которой считаем, что id из разных пространств. 
+const localVirtualOffsetSkewMin = parsePositiveInteger(process.env.LOCAL_VIRTUAL_OFFSET_SKEW_MIN, 1000000);
 
 // Служебные методы local/cloud Bot API, которые не отправляют пользовательский контент.
 const localAdminMethods = new Set(["getMe", "getUpdates", "getWebhookInfo", "deleteWebhook"]);
@@ -88,6 +92,8 @@ let localUnhealthyUntil = 0;
 let lastHealthLogState = "";
 // Отдельный cloud cursor по каждому botId для безопасного fallback getUpdates.
 const cloudUpdateStateByBotId = new Map();
+// Per-bot { localFloor, virtualFloor } pair used to map local update_id values onto the virtual scale.
+const localUpdateStateByBotId = new Map();
 // Кэш file_path -> file_size из getFile, чтобы решать, можно ли фолбечить /file.
 const fileInfoByBotIdAndPath = new Map();
 // Кэш pending_update_count из cloud getWebhookInfo по каждому botId.
@@ -491,6 +497,29 @@ function cloudRequestForGetUpdates(req, method, token, body) {
   return { ...bodyWithOffset(req, body, cloudOffset), translated: true };
 }
 
+// Maps the requested getUpdates offset from the virtual scale (virtualFloor) back to the local
+// scale (localFloor). Non-getUpdates calls and bots without bridge state pass through unchanged.
+function localRequestForGetUpdates(req, method, token, body) {
+  if (method !== "getUpdates") return { req: requestWithUrl(req, req.url), body, translated: false };
+  const botId = botIdFromToken(token);
+  const state = botId ? localUpdateStateByBotId.get(botId) : null;
+  if (!state || state.localFloor == null || state.virtualFloor == null) {
+    return { req: requestWithUrl(req, req.url), body, translated: false };
+  }
+
+  const requestedOffset = requestOffsetValue(req, body);
+  let localOffset = state.localFloor + 1;
+  if (requestedOffset != null && requestedOffset > state.virtualFloor) {
+    localOffset = state.localFloor + (requestedOffset - state.virtualFloor);
+  }
+  const translated = bodyWithOffset(req, body, localOffset);
+  return {
+    req: requestWithUrl(req, translated.reqUrl),
+    body: translated.body,
+    translated: true,
+  };
+}
+
 function jsonCloudResponse(upstream, payload) {
   return {
     ...upstream,
@@ -623,16 +652,97 @@ function emptySuccessfulGetUpdates(method, upstream) {
   }
 }
 
+// Bridging is warranted only when the floor exceeds the local update_id by at least
+// localVirtualOffsetSkewMin and at least one of the updates passes isFreshCloudUpdate.
+function shouldBridgeLocalUpdateIds(floor, localUpdateId, updates) {
+  if (floor == null || localUpdateId == null || floor <= localUpdateId) return false;
+  if (floor - localUpdateId < localVirtualOffsetSkewMin) return false;
+  return updates.some(isFreshCloudUpdate);
+}
+
+// Stores the localFloor/virtualFloor pair for the bot and logs it; returns true only when the
+// stored pair actually changed.
+function bridgeLocalUpdateIds(botId, localFloor, virtualFloor) {
+  if (!botId || localFloor == null || virtualFloor == null) return false;
+  const previous = localUpdateStateByBotId.get(botId);
+  if (previous && previous.localFloor === localFloor && previous.virtualFloor === virtualFloor) return false;
+  localUpdateStateByBotId.set(botId, { localFloor, virtualFloor });
+  log(`method=getUpdates target=local action=bridge-local-update-ids localFloor=${localFloor} virtualFloor=${virtualFloor}`);
+  return true;
+}
+
+// Shifts local update_id values onto the virtual scale using the stored bridge state and advances
+// both floors. Returns null when the bot has no bridge state. Updates whose update_id is unreadable
+// or not above localFloor are dropped; `translated` tells the caller to rebuild the response body.
+function translateLocalUpdatesWithBridge(token, payload) {
+  const botId = botIdFromToken(token);
+  const state = botId ? localUpdateStateByBotId.get(botId) : null;
+  if (!state || state.localFloor == null || state.virtualFloor == null) return null;
+
+  const result = [];
+  let dropped = 0;
+  let maxDroppedUpdateId = null;
+  let nextLocalFloor = state.localFloor;
+  let nextVirtualFloor = state.virtualFloor;
+  for (const update of payload.result) {
+    const localUpdateId = numericOffset(update?.update_id);
+    if (localUpdateId == null) {
+      dropped += 1;
+      continue;
+    }
+    if (localUpdateId <= state.localFloor) {
+      dropped += 1;
+      maxDroppedUpdateId = Math.max(maxDroppedUpdateId ?? localUpdateId, localUpdateId);
+      continue;
+    }
+    const virtualUpdateId = state.virtualFloor + (localUpdateId - state.localFloor);
+    result.push({ ...update, update_id: virtualUpdateId });
+    nextLocalFloor = Math.max(nextLocalFloor, localUpdateId);
+    nextVirtualFloor = Math.max(nextVirtualFloor, virtualUpdateId);
+  }
+
+  if (nextLocalFloor !== state.localFloor || nextVirtualFloor !== state.virtualFloor) {
+    localUpdateStateByBotId.set(botId, { localFloor: nextLocalFloor, virtualFloor: nextVirtualFloor });
+    log(`method=getUpdates target=local action=virtualized-local-update-id count=${result.length} dropped=${dropped} localFloor=${nextLocalFloor} virtualFloor=${nextVirtualFloor}`);
+  }
+
+  return {
+    result,
+    dropped,
+    floor: state.virtualFloor,
+    ackOffset: maxDroppedUpdateId == null ? null : maxDroppedUpdateId + 1,
+    // Also request a rebuild when updates were only dropped: otherwise an all-dropped batch would
+    // be forwarded unchanged, stale local update_id values included.
+    translated: result.length > 0 || dropped > 0,
+  };
+}
+
 // Отбрасываем local updates ниже сохраненного OpenClaw offset, чтобы Docker Bot API не оживлял старые сессии.
function guardedLocalGetUpdates(req, method, token, body, upstream) { if (method !== "getUpdates" || upstream.statusCode !== 200 || !upstream.body?.length) { - return { upstream, dropped: 0, floor: null, ackOffset: null }; + return { upstream, dropped: 0, floor: null, ackOffset: null, translated: false, bridged: false }; } try { const payload = JSON.parse(upstream.body.toString("utf8")); - if (!payload?.ok || !Array.isArray(payload.result)) return { upstream, dropped: 0, floor: null, ackOffset: null }; + if (!payload?.ok || !Array.isArray(payload.result)) { + return { upstream, dropped: 0, floor: null, ackOffset: null, translated: false, bridged: false }; + } const floor = localOffsetFloor(req, token, body); - if (floor == null) return { upstream, dropped: 0, floor: null, ackOffset: null }; + if (floor == null) return { upstream, dropped: 0, floor: null, ackOffset: null, translated: false, bridged: false }; + + const translated = translateLocalUpdatesWithBridge(token, payload); + if (translated) { + return { + upstream: translated.translated ? jsonCloudResponse(upstream, { ...payload, result: translated.result }) : upstream, + dropped: translated.dropped, + floor: translated.floor, + ackOffset: translated.ackOffset, + translated: translated.translated, + bridged: false, + }; + } + + const botId = botIdFromToken(token); let maxDroppedUpdateId = null; const result = payload.result.filter((update) => { const updateId = numericOffset(update?.update_id); @@ -641,14 +740,19 @@ function guardedLocalGetUpdates(req, method, token, body, upstream) { return false; }); const dropped = payload.result.length - result.length; + const bridged = result.length === 0 && shouldBridgeLocalUpdateIds(floor, maxDroppedUpdateId, payload.result) + ? bridgeLocalUpdateIds(botId, maxDroppedUpdateId, floor) + : false; return { upstream: dropped > 0 ? jsonCloudResponse(upstream, { ...payload, result }) : upstream, dropped, floor, ackOffset: maxDroppedUpdateId == null ? 
null : maxDroppedUpdateId + 1, + translated: false, + bridged, }; } catch { - return { upstream, dropped: 0, floor: null, ackOffset: null }; + return { upstream, dropped: 0, floor: null, ackOffset: null, translated: false, bridged: false }; } } @@ -673,12 +777,13 @@ async function acknowledgeDroppedLocalUpdates(req, token, body, ackOffset) { // Для getUpdates fallback проверяем cloud backlog отдельно: local API может быть здоровым, но пустым. async function probeCloudPendingUpdates(token) { const botId = botIdFromToken(token); - if (!botId) return { pending: 0, cached: false }; + if (!botId) return { pending: 0, cached: false, pendingAgeMs: 0 }; const now = Date.now(); const cached = cloudPendingProbeByBotId.get(botId); if (cached && now - cached.checkedAt < cloudPendingProbeTtlMs) { - log(`method=getWebhookInfo target=cloud action=cloud-pending-probe pending=${cached.pending} cached=yes`); - return { pending: cached.pending, cached: true }; + const pendingAgeMs = cached.pending > 0 && cached.firstPendingAt ? now - cached.firstPendingAt : 0; + log(`method=getWebhookInfo target=cloud action=cloud-pending-probe pending=${cached.pending} cached=yes pendingAgeMs=${pendingAgeMs}`); + return { pending: cached.pending, cached: true, pendingAgeMs }; } const raw = await forwardBuffered( @@ -693,9 +798,13 @@ async function probeCloudPendingUpdates(token) { } catch { pending = 0; } - cloudPendingProbeByBotId.set(botId, { pending, checkedAt: now }); - log(`method=getWebhookInfo target=cloud action=cloud-pending-probe pending=${pending} cached=no`); - return { pending, cached: false }; + const firstPendingAt = pending > 0 + ? (cached?.pending > 0 && cached.firstPendingAt ? cached.firstPendingAt : now) + : null; + const pendingAgeMs = pending > 0 && firstPendingAt ? 
now - firstPendingAt : 0; + cloudPendingProbeByBotId.set(botId, { pending, checkedAt: now, firstPendingAt }); + log(`method=getWebhookInfo target=cloud action=cloud-pending-probe pending=${pending} cached=no pendingAgeMs=${pendingAgeMs}`); + return { pending, cached: false, pendingAgeMs }; } function errorChain(error) { @@ -881,12 +990,13 @@ async function forwardBuffered(req, root, body, reqUrl = req.url, options = {}) } } -async function forwardLocalBuffered(req, method, body) { +async function forwardLocalBuffered(req, method, token, body) { if (method !== "getUpdates") { return { upstream: await forwardBuffered(req, localRoot, body), req, body, attempts: 1, timeoutCapped: false, timeout: null }; } - const localRequest = applyGetUpdatesTimeoutCap(req, method, body); + const translatedRequest = localRequestForGetUpdates(req, method, token, body); + const localRequest = applyGetUpdatesTimeoutCap(translatedRequest.req, method, translatedRequest.body); let lastHttpUpstream = null; for (let attempt = 1; attempt <= localGetUpdatesMaxAttempts; attempt += 1) { try { @@ -906,6 +1016,7 @@ async function forwardLocalBuffered(req, method, body) { attempts: attempt, timeoutCapped: localRequest.capped, timeout: localRequest.timeout, + translatedOffset: translatedRequest.translated, }; } lastHttpUpstream = upstream; @@ -927,6 +1038,7 @@ async function forwardLocalBuffered(req, method, body) { attempts: localGetUpdatesMaxAttempts, timeoutCapped: localRequest.capped, timeout: localRequest.timeout, + translatedOffset: translatedRequest.translated, }; } @@ -988,7 +1100,7 @@ async function handleBuffered(req, res, method, token, startedAt) { } try { - const localAttempt = await forwardLocalBuffered(req, method, body); + const localAttempt = await forwardLocalBuffered(req, method, token, body); const localReq = localAttempt.req; const localBody = localAttempt.body; const localRaw = localAttempt.upstream; @@ -1019,7 +1131,7 @@ async function handleBuffered(req, res, method, token, 
startedAt) { log(`method=${method} target=cloud reason=local-${local.statusCode} localAttempts=${localAttempt.attempts} status=${cloud.statusCode} dropped=${cloudResult.dropped} floor=${cloudResult.floor ?? "none"} translated=${cloudRequest.translated || cloudResult.translated ? "yes" : "no"} ms=${Date.now() - startedAt}`); return; } - if (cloudFallbackAllowed && cloudGetUpdatesFallbackEnabled && emptySuccessfulGetUpdates(method, local)) { + if (cloudFallbackAllowed && cloudGetUpdatesFallbackEnabled && localGuard.dropped === 0 && emptySuccessfulGetUpdates(method, local)) { let pendingProbe = null; try { pendingProbe = await probeCloudPendingUpdates(token); @@ -1030,18 +1142,24 @@ async function handleBuffered(req, res, method, token, startedAt) { return; } if (pendingProbe.pending > 0) { + if (pendingProbe.pendingAgeMs < cloudPendingFallbackDelayMs) { + writeBufferedResponse(res, local); + log(`method=getUpdates target=cloud action=cloud-pending-deferred pending=${pendingProbe.pending} cached=${pendingProbe.cached ? "yes" : "no"} pendingAgeMs=${pendingProbe.pendingAgeMs} minAgeMs=${cloudPendingFallbackDelayMs}`); + log(`method=${method} target=local status=${local.statusCode}${localAttempt.attempts > 1 ? ` attempts=${localAttempt.attempts}` : ""}${localAttempt.timeoutCapped ? ` timeoutCapped=${localAttempt.timeout}` : ""} cloudProbe=deferred ms=${Date.now() - startedAt}`); + return; + } const cloudRequest = cloudRequestForGetUpdates(localReq, method, token, localBody); const cloudRaw = await forwardBuffered(req, cloudRoot, cloudRequest.body, cloudRequest.reqUrl); const cloudProcessed = processGetFileResult(method, token, cloudRaw, "cloud"); const cloudResult = guardedCloudGetUpdates(localReq, method, token, localBody, cloudProcessed, { virtualizeLowerIds: true }); const cloud = cloudResult.upstream; writeBufferedResponse(res, cloud); - log(`method=${method} target=cloud reason=local-empty-cloud-pending pending=${pendingProbe.pending} cached=${pendingProbe.cached ? 
"yes" : "no"} status=${cloud.statusCode} dropped=${cloudResult.dropped} floor=${cloudResult.floor ?? "none"} translated=${cloudRequest.translated || cloudResult.translated ? "yes" : "no"} ms=${Date.now() - startedAt}`); + log(`method=${method} target=cloud reason=local-empty-cloud-pending pending=${pendingProbe.pending} cached=${pendingProbe.cached ? "yes" : "no"} pendingAgeMs=${pendingProbe.pendingAgeMs} status=${cloud.statusCode} dropped=${cloudResult.dropped} floor=${cloudResult.floor ?? "none"} translated=${cloudRequest.translated || cloudResult.translated ? "yes" : "no"} ms=${Date.now() - startedAt}`); return; } } writeBufferedResponse(res, local); - log(`method=${method} target=local status=${local.statusCode}${localAttempt.attempts > 1 ? ` attempts=${localAttempt.attempts}` : ""}${localAttempt.timeoutCapped ? ` timeoutCapped=${localAttempt.timeout}` : ""}${localGuard.dropped ? ` dropped=${localGuard.dropped} floor=${localGuard.floor ?? "none"}` : ""} ms=${Date.now() - startedAt}`); + log(`method=${method} target=local status=${local.statusCode}${localAttempt.attempts > 1 ? ` attempts=${localAttempt.attempts}` : ""}${localAttempt.timeoutCapped ? ` timeoutCapped=${localAttempt.timeout}` : ""}${localAttempt.translatedOffset ? " translatedOffset=yes" : ""}${localGuard.dropped ? ` dropped=${localGuard.dropped} floor=${localGuard.floor ?? "none"}` : ""}${localGuard.translated ? " translatedLocal=yes" : ""}${localGuard.bridged ? " bridgedLocal=yes" : ""} ms=${Date.now() - startedAt}`); } catch (error) { if (cloudFallbackAllowed && isClearlyLocalUnavailable(error)) { markLocalUnhealthy(errorReason(error)); @@ -1117,7 +1235,7 @@ const server = http.createServer(async (req, res) => { // Запускаем listener только после полной инициализации правил fallback и in-memory state. server.listen(listenPort, listenHost, () => { - log(`listening=${listenHost}:${listenPort} local=${localRoot} cloud=${cloudRoot} cloudFallback=${cloudFallbackEnabled ? 
"enabled" : "disabled"} cloudGetUpdatesFallback=${cloudGetUpdatesFallbackEnabled ? "enabled" : "disabled"} localGetUpdatesTimeout=${localGetUpdatesTimeoutSeconds} localGetUpdatesAttempts=${localGetUpdatesMaxAttempts} cloudFileMaxBytes=${cloudFileFallbackMaxBytes}`); + log(`listening=${listenHost}:${listenPort} local=${localRoot} cloud=${cloudRoot} cloudFallback=${cloudFallbackEnabled ? "enabled" : "disabled"} cloudGetUpdatesFallback=${cloudGetUpdatesFallbackEnabled ? "enabled" : "disabled"} cloudPendingFallbackDelayMs=${cloudPendingFallbackDelayMs} localGetUpdatesTimeout=${localGetUpdatesTimeoutSeconds} localGetUpdatesAttempts=${localGetUpdatesMaxAttempts} localVirtualOffsetSkewMin=${localVirtualOffsetSkewMin} cloudFileMaxBytes=${cloudFileFallbackMaxBytes}`); }); // При остановке systemd закрываем listener штатно, но не зависаем дольше пяти секунд. From 9f7053f336a09d581f62f1f3aa18370a6f8f79cb Mon Sep 17 00:00:00 2001 From: dankarization <26330178+dankarization@users.noreply.github.com> Date: Mon, 15 Jun 2026 19:19:35 +0400 Subject: [PATCH 3/3] docs: document Telegram proxy fallback hardening --- CHANGELOG.md | 14 ++++++++++++++ README.md | 13 +++++++++++++ 2 files changed, 27 insertions(+) create mode 100644 CHANGELOG.md diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..bb1396b --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,14 @@ +# Changelog + +## Unreleased + +- Fallback `getFile` to cloud when local Bot API returns `400`, covering cloud + fallback updates whose `file_id` is not known by the local API yet. +- Bridge/virtualize local `update_id` values after cloud `getUpdates` fallback + when local and cloud update spaces diverge. +- Defer cloud pending fallback for 60 seconds when local `getUpdates` is healthy + but temporarily empty. +- Log `pendingAgeMs`, `translatedLocal`, and `bridgedLocal` for fallback and + update-id translation diagnostics. 
+- Reduce the risk of selecting the wrong voice/media file caused by mixing + local and cloud update spaces. diff --git a/README.md b/README.md index d2c67de..b511151 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,9 @@ # OpenClaw Telegram Bot API Proxy Proxy для OpenClaw: локальный Telegram Bot API в приоритете, cloud Bot API только как аварийный fallback. +Он защищает pipeline от расхождения local/cloud Bot API `update_id`, чтобы +fallback не смешивал разные пространства updates и не ломал выбор voice/media +файлов. ## Схема @@ -23,12 +26,20 @@ OpenClaw Gateway превращает его в short polling. - Cloud fallback включается при ошибке local API после retry или когда local `getUpdates` пустой, но в cloud есть свежие pending updates. +- Если local API здоров и просто возвращает пустой `getUpdates`, cloud pending + fallback откладывается минимум на `CLOUD_PENDING_FALLBACK_DELAY_MS`: краткий + лаг local Bot API не должен сразу переводить pipeline в cloud. - `getUpdates` защищён от старых cloud updates: - proxy читает локальный OpenClaw offset; - ведёт отдельный cloud cursor; - при необходимости поднимает cloud `update_id` выше локального offset. +- После cloud fallback local `update_id` мостится в виртуальную шкалу над + OpenClaw offset, если local и cloud id явно разъехались. Это снижает риск + смешать local/cloud update spaces и выбрать не тот voice/media file. - Старые local updates ниже OpenClaw offset отбрасываются и подтверждаются в local Bot API, чтобы они не возвращались снова. +- `getFile` при local `400` повторяется через cloud: это покрывает file_id, + полученные из cloud fallback, которые локальный Bot API ещё не знает. - `/file/...` уходит в cloud только если размер известен из `getFile` и не больше `CLOUD_FILE_FALLBACK_MAX_BYTES`. - Файлы неизвестного размера и тяжёлые файлы остаются только на local API. 
@@ -82,7 +93,9 @@ systemd/openclaw-telegram-api-proxy.service.example | `LOCAL_GETUPDATES_RETRY_BASE_MS` | `300` | Базовая пауза между retry; растёт экспоненциально. | | `LOCAL_GETUPDATES_UPSTREAM_TIMEOUT_MS` | `15000` | Сетевой timeout одного local `getUpdates` запроса. | | `CLOUD_PENDING_PROBE_TTL_MS` | `5000` | TTL проверки cloud pending updates. | +| `CLOUD_PENDING_FALLBACK_DELAY_MS` | `60000` | Минимальный возраст cloud pending backlog перед fallback, когда local API здоров и возвращает пустой `getUpdates`. | | `CLOUD_FRESH_UPDATE_MAX_AGE_MS` | `21600000` | Максимальный возраст cloud update для виртуального подъёма id. | +| `LOCAL_VIRTUAL_OFFSET_SKEW_MIN` | `1000000` | Минимальный разрыв между OpenClaw offset и local `update_id`, при котором proxy считает id разными пространствами и мостит local updates в виртуальную шкалу. | ## OpenClaw