From d1448f35688cbca04d40865a12fea30d57f25310 Mon Sep 17 00:00:00 2001 From: Linares Date: Fri, 9 Jan 2026 21:51:32 +0300 Subject: [PATCH 1/2] fix docker and no displaying empty storages --- Dockerfile | 6 ++- pentaract/Cargo.toml | 19 ++++---- pentaract/src/common/telegram_api/bot_api.rs | 48 ++++++++++++++------ pentaract/src/repositories/access.rs | 22 +++++++++ pentaract/src/repositories/storages.rs | 21 +++++++-- pentaract/src/routers/files.rs | 17 ++++++- pentaract/src/routers/storages.rs | 4 ++ pentaract/src/services/storages.rs | 37 +++++++++++++-- ui/src/pages/Storages/StorageCreateForm.jsx | 11 +++-- ui/src/pages/Storages/index.jsx | 2 + 10 files changed, 150 insertions(+), 37 deletions(-) diff --git a/Dockerfile b/Dockerfile index d1cfb8f0..a08bb47a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,7 +10,11 @@ RUN cargo install cargo-chef WORKDIR /app FROM chef AS planner -COPY ./pentaract . +WORKDIR /app +COPY pentaract/Cargo.toml . +COPY pentaract/Cargo.lock . +COPY pentaract/src ./src + RUN cargo chef prepare --recipe-path recipe.json FROM chef AS builder diff --git a/pentaract/Cargo.toml b/pentaract/Cargo.toml index 13eed2df..56fd093b 100644 --- a/pentaract/Cargo.toml +++ b/pentaract/Cargo.toml @@ -3,23 +3,19 @@ name = "pentaract" version = "0.1.0" edition = "2021" -[profile.release] -strip = true -codegen-units = 1 - [dependencies] # routing -axum = { version = "0.6.20", features = ["headers", "tracing", "multipart"]} +axum = { version = "0.6.20", features = ["headers", "tracing", "multipart"] } mime_guess = "2.0.4" -tower = { version = "0.4.13", features = ["limit"], default-features = false} -tower-http = { version = "0.4.4", features = ["fs", "trace", "cors"], default_features = false } +tower = { version = "0.4.13", features = ["limit"], default-features = false } +tower-http = { version = "0.4.4", features = ["fs", "trace", "cors"], default-features = false } # serialization/deserialization serde = { version = "1.0.189", features = ["derive"] } # auth pwhash = "1.0.0" -jsonwebtoken = { version = "9", default-features = false } +jsonwebtoken = { version = "9", default-features = false, features = ["use_pem"] } # async tokio = { version = "1.33.0", features = ["full"] } @@ -28,10 +24,13 @@ futures = "0.3.29" # logging tracing = "0.1.40" -tracing-subscriber = { version = "0.3.17", features = ["env-filter"]} +tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } # others sqlx = { version = "0.7", features = ["runtime-tokio-rustls", "postgres", "uuid"] } thiserror = "1.0.50" uuid = { version = "1.5.0", features = ["serde", "v4"] } -reqwest = { version = "0.11.22", features = ["multipart", "json"] } + +# HTTP client (без native-tls — ок для musl) +reqwest = { version = "0.11.22", default-features = false, features = ["multipart", "json", "rustls-tls"] } +percent-encoding = "2.3" diff --git a/pentaract/src/common/telegram_api/bot_api.rs b/pentaract/src/common/telegram_api/bot_api.rs index 793b3591..f6145a68 100644 --- a/pentaract/src/common/telegram_api/bot_api.rs +++ b/pentaract/src/common/telegram_api/bot_api.rs @@ -2,7 +2,8 @@ use reqwest::multipart; use uuid::Uuid; use crate::{ - common::types::ChatId, errors::PentaractResult, + common::types::ChatId, + errors::{PentaractError, PentaractResult}, services::storage_workers_scheduler::StorageWorkersScheduler, }; @@ -27,15 +28,19 @@ impl<'t> TelegramBotApi<'t> { chat_id: ChatId, storage_id: Uuid, ) -> PentaractResult { - let chat_id = { - // inserting 100 between minus sign and chat id - // cause telegram devs are complete retards and it works this way only - // - // https://stackoverflow.com/a/65965402/12255756 + tracing::debug!( + "[TELEGRAM API] Uploading chunk: chat_id={}, file_size={}", + chat_id, + file.len() + ); - let n = chat_id.abs().checked_ilog10().unwrap_or(0) + 1; - chat_id - (100 * ChatId::from(10).pow(n)) - }; + if chat_id < 0 && chat_id > -10000000000 { + tracing::info!( + "[TELEGRAM API] Using regular group (chat_id={}). If bot can't find the chat, \ + make sure the bot is added and has permissions.", + chat_id + ); + } let token = self.scheduler.get_token(storage_id).await?; let url = self.build_url("", "sendDocument", token); @@ -51,10 +56,27 @@ impl<'t> TelegramBotApi<'t> { .send() .await?; - match response.error_for_status() { - // https://stackoverflow.com/a/32679930/12255756 - Ok(r) => Ok(r.json::().await?.result.document), - Err(e) => Err(e.into()), + let status = response.status(); + if !status.is_success() { + let error_text = response.text().await.unwrap_or_else(|_| "Unable to read error body".to_string()); + tracing::error!( + "[TELEGRAM API] Upload failed: status={}, response={}", + status, + error_text + ); + return Err(PentaractError::TelegramAPIError(format!( + "Status {}: {}", + status, + error_text + ))); + } + + match response.json::().await { + Ok(body) => Ok(body.result.document), + Err(e) => { + tracing::error!("[TELEGRAM API] Failed to parse response: {}", e); + Err(e.into()) + } } } diff --git a/pentaract/src/repositories/access.rs b/pentaract/src/repositories/access.rs index 63dbfe04..af065046 100644 --- a/pentaract/src/repositories/access.rs +++ b/pentaract/src/repositories/access.rs @@ -24,6 +24,13 @@ impl<'d> AccessRepository<'d> { ) -> PentaractResult<()> { let id = Uuid::new_v4(); + tracing::debug!( + "[ACCESS REPO] Attempting to grant access: storage_id={}, user_email={}, access_type={:?}", + storage_id, + grant_access.user_email, + grant_access.access_type + ); + let result = sqlx::query( format!( " @@ -54,13 +61,28 @@ impl<'d> AccessRepository<'d> { } })?; + tracing::debug!( + "[ACCESS REPO] Query affected {} rows", + result.rows_affected() + ); + if result.rows_affected() == 0 { + tracing::error!( + "[ACCESS REPO] User with email \"{}\" not found in users table", + grant_access.user_email + ); return Err(PentaractError::DoesNotExist(format!( "user with email \"{}\"", grant_access.user_email ))); } + tracing::debug!( + "[ACCESS REPO] Successfully granted access to user {} for storage {}", + grant_access.user_email, + storage_id + ); + Ok(()) } diff --git a/pentaract/src/repositories/storages.rs b/pentaract/src/repositories/storages.rs index c2161715..dbdf5996 100644 --- a/pentaract/src/repositories/storages.rs +++ b/pentaract/src/repositories/storages.rs @@ -46,14 +46,19 @@ impl<'d> StoragesRepository<'d> { } pub async fn list_by_user_id(&self, user_id: Uuid) -> PentaractResult> { - sqlx::query_as( + tracing::debug!( + "[STORAGES REPO] Fetching storages for user_id={}", + user_id + ); + + let result = sqlx::query_as( format!( " SELECT s.*, COUNT(f.id) AS files_amount, COALESCE(SUM(f.size), 0)::BigInt as size FROM {TABLE} s JOIN {ACCESS_TABLE} a ON s.id = a.storage_id - LEFT JOIN {FILES_TABLE} f ON s.id = f.storage_id - WHERE a.user_id = $1 AND (f.path NOT LIKE '%/' OR f.path IS NULL) + LEFT JOIN {FILES_TABLE} f ON s.id = f.storage_id AND f.path NOT LIKE '%/' + WHERE a.user_id = $1 GROUP by s.id " ) @@ -62,7 +67,15 @@ impl<'d> StoragesRepository<'d> { .bind(user_id) .fetch_all(self.db) .await - .map_err(|e| map_not_found(e, "storages")) + .map_err(|e| map_not_found(e, "storages"))?; + + tracing::debug!( + "[STORAGES REPO] Found {} storages for user_id={}", + result.len(), + user_id + ); + + Ok(result) } pub async fn get_by_id(&self, id: Uuid) -> PentaractResult { diff --git a/pentaract/src/routers/files.rs b/pentaract/src/routers/files.rs index 07d88150..442e6fce 100644 --- a/pentaract/src/routers/files.rs +++ b/pentaract/src/routers/files.rs @@ -9,6 +9,7 @@ use axum::{ routing::{get, post}, Extension, Json, Router, }; +use percent_encoding::percent_decode_str; use reqwest::header; use tokio_util::bytes::Bytes; use uuid::Uuid; @@ -100,7 +101,13 @@ impl FilesRouter { file = Some(data); filename = Some(field_filename); } - "path" => path = Some(String::from_utf8(data.to_vec()).unwrap()), + "path" => { + let raw_path = String::from_utf8(data.to_vec()).unwrap(); + let decoded = percent_decode_str(&raw_path) + .decode_utf8() + .unwrap_or(std::borrow::Cow::Borrowed(&raw_path)); + path = Some(decoded.to_string()); + } // don't give a fuck about other fields _ => (), } @@ -143,7 +150,13 @@ impl FilesRouter { .get("path") .map(|path| String::from_utf8(path.to_vec()).map_err(|_| "Path cannot be parsed")) .unwrap_or(Err("Path is required")) - .map_err(|e| (StatusCode::BAD_REQUEST, e.to_owned()))?; + .map_err(|e| (StatusCode::BAD_REQUEST, e.to_owned())) + .map(|raw_path| { + percent_decode_str(&raw_path) + .decode_utf8() + .unwrap_or(std::borrow::Cow::Borrowed(&raw_path)) + .to_string() + })?; let file = body_parts .get("file") diff --git a/pentaract/src/routers/storages.rs b/pentaract/src/routers/storages.rs index 7d12b6fc..3ef25992 100644 --- a/pentaract/src/routers/storages.rs +++ b/pentaract/src/routers/storages.rs @@ -66,6 +66,10 @@ impl StoragesRouter { .list(&user) .await .map(|s| StoragesListSchema::new(s))?; + tracing::debug!( + "[STORAGES ROUTER] Returning {} storages to client", + storages.storages.len() + ); Ok::<_, (StatusCode, String)>(Json(storages)) } diff --git a/pentaract/src/services/storages.rs b/pentaract/src/services/storages.rs index bbcca3b5..7bd84c32 100644 --- a/pentaract/src/services/storages.rs +++ b/pentaract/src/services/storages.rs @@ -44,6 +44,13 @@ impl<'d> StoragesService<'d> { // creating storage let in_model = InStorage::new(in_schema.name, in_schema.chat_id); let storage = self.repo.create(in_model).await?; + + tracing::debug!( + "[STORAGES SERVICE] Created storage id={}, name={}, chat_id={}", + storage.id, + storage.name, + storage.chat_id + ); // setting user as the storage admin let access_schema = GrantAccess::new(user.email.clone(), AccessType::A); @@ -51,15 +58,37 @@ impl<'d> StoragesService<'d> { .access_repo .create_or_update(storage.id, access_schema) .await; - if result.is_err() { - // fallback - self.repo.delete_storage(storage.id).await? + + match &result { + Ok(_) => { + tracing::debug!( + "[STORAGES SERVICE] Successfully granted access to user {} for storage {}", + user.email, + storage.id + ); + } + Err(e) => { + tracing::error!( + "[STORAGES SERVICE] Failed to grant access to user {} for storage {}: {:?}. Rolling back storage creation.", + user.email, + storage.id, + e + ); + // fallback + let _ = self.repo.delete_storage(storage.id).await; + } } result.map(|_| storage) } pub async fn list(&self, user: &AuthUser) -> PentaractResult> { - self.repo.list_by_user_id(user.id).await + let storages = self.repo.list_by_user_id(user.id).await?; + tracing::debug!( + "[STORAGES SERVICE] Listed {} storages for user_id={}", + storages.len(), + user.id + ); + Ok(storages) } pub async fn get(&self, id: Uuid, user: &AuthUser) -> PentaractResult { diff --git a/ui/src/pages/Storages/StorageCreateForm.jsx b/ui/src/pages/Storages/StorageCreateForm.jsx index 4ad1f6a3..4752695c 100644 --- a/ui/src/pages/Storages/StorageCreateForm.jsx +++ b/ui/src/pages/Storages/StorageCreateForm.jsx @@ -48,10 +48,12 @@ const StorageCreateForm = () => { let err = null if (value > 0) { - err = 'Chat id must be a valid negative integer' + err = 'Chat id must be a negative integer' } else if (value === '') { - err = 'Chat id is required and must be a valid negative integer' + err = 'Chat id is required' } + // No additional validation - accept any negative number + // Both regular groups (-XXXXXXXXX) and supergroups (-100XXXXXXXXXX) are valid setChatIdErr(err) } @@ -108,7 +110,10 @@ const StorageCreateForm = () => { type="number" variant="standard" onChange={validateChatId} - helperText={chatIdErr} + helperText={ + chatIdErr() || + 'Get chat ID via @userinfobot or @getidsbot. Use the ID exactly as provided.' + } error={typeof chatIdErr() === 'string'} fullWidth required diff --git a/ui/src/pages/Storages/index.jsx b/ui/src/pages/Storages/index.jsx index c3980524..2b43169c 100644 --- a/ui/src/pages/Storages/index.jsx +++ b/ui/src/pages/Storages/index.jsx @@ -24,6 +24,8 @@ const Storages = () => { onMount(async () => { const storagesSchema = await API.storages.listStorages() + console.log('[STORAGES] Received from API:', storagesSchema) + console.log('[STORAGES] Number of storages:', storagesSchema.storages?.length || 0) setStorages(storagesSchema.storages) }) From 5776d45b0643aec8bbaa3cdbe712916e12318199 Mon Sep 17 00:00:00 2001 From: midden2004 Date: Tue, 13 Jan 2026 02:33:58 +0200 Subject: [PATCH 2/2] feat: local telegram-bot-api + streaming chunked transfers --- .env.example | 20 +- README.md | 15 + docker-compose.yml | 25 +- pentaract/Cargo.toml | 5 +- pentaract/src/common/channels.rs | 4 +- pentaract/src/common/telegram_api/bot_api.rs | 169 ++++++++++- pentaract/src/config.rs | 30 +- pentaract/src/main.rs | 5 + pentaract/src/routers/files.rs | 293 +++++++++++++------ pentaract/src/schemas/files.rs | 13 +- pentaract/src/services/files.rs | 35 ++- pentaract/src/services/storage_manager.rs | 68 +++-- pentaract/src/storage_manager.rs | 2 + 13 files changed, 543 insertions(+), 141 deletions(-) diff --git a/.env.example b/.env.example index 6aa1e07a..3f15137e 100644 --- a/.env.example +++ b/.env.example @@ -6,7 +6,25 @@ SUPERUSER_PASS=pentaract ACCESS_TOKEN_EXPIRE_IN_SECS=1800 REFRESH_TOKEN_EXPIRE_IN_DAYS=14 SECRET_KEY=XXX -TELEGRAM_API_BASE_URL=https://api.telegram.org + +# --- Telegram integration --- +# If TELEGRAM_LOCAL_API=true, the server will talk to the Local Telegram Bot API +# (telegram-bot-api container) and can upload/download much larger chunks. +TELEGRAM_LOCAL_API=true +TELEGRAM_API_BASE_URL=http://telegram-bot-api:8081 + +# Requests per minute per bot (scheduler enforces this via DB) +TELEGRAM_RATE_LIMIT=18 + +# Chunk size (MB). For local Bot API you can go up to ~1950. +TELEGRAM_CHUNK_SIZE_MB=1950 + +# Where Pentaract spools uploads before sending to Telegram +WORK_DIR=/work + +# Needed ONLY for the telegram-bot-api container +TELEGRAM_API_ID=00000000 +TELEGRAM_API_HASH=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx DATABASE_USER=pentaract DATABASE_PASSWORD=pentaract diff --git a/README.md b/README.md index ffa1153a..f63f119d 100644 --- a/README.md +++ b/README.md @@ -260,6 +260,21 @@ Currently Telegram API limits file download to 20 MB, hence we can't upload file Pentaract divides uploaded files into chunks and save them to Telegram separately and on downloading a file it fetches all the file chunks from the Telegram API and combine them into one in the order it was divided in. That grants ability to upload and download files with almost unlimited size (it's like you've ever downloaded a file with size >10 GB). +#### Using Local Telegram Bot API (recommended for large files) + +If you run Telegram's **Local Bot API server** (aka `telegram-bot-api` with `--local`), Telegram allows much larger uploads (up to ~2GB per document). + +This repo now supports running it automatically via `docker-compose.yml`. + +Set these variables in `.env`: + +- `TELEGRAM_LOCAL_API=true` +- `TELEGRAM_API_BASE_URL=http://telegram-bot-api:8081` +- `TELEGRAM_API_ID` and `TELEGRAM_API_HASH` (your Telegram app credentials) +- `TELEGRAM_CHUNK_SIZE_MB=1950` (safe default under the 2GB limit) + +With Local Bot API enabled, Pentaract will chunk and stream uploads/downloads so big files do not require loading everything into RAM. + ## Current in storage features - [x] Upload file diff --git a/docker-compose.yml b/docker-compose.yml index abcf614e..c1bc1adb 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,8 +1,10 @@ -version: "3.9" - volumes: pentaract-db-volume: name: pentaract-db-volume + pentaract-work-volume: + name: pentaract-work-volume + telegram-bot-api-data: + name: telegram-bot-api-data networks: pentaract-network: @@ -20,6 +22,25 @@ services: restart: unless-stopped depends_on: - db + - telegram-bot-api + volumes: + - pentaract-work-volume:/work + - telegram-bot-api-data:/var/lib/telegram-bot-api:ro + networks: + - pentaract-network + + telegram-bot-api: + container_name: telegram-bot-api + image: aiogram/telegram-bot-api:latest + environment: + TELEGRAM_API_ID: ${TELEGRAM_API_ID} + TELEGRAM_API_HASH: ${TELEGRAM_API_HASH} + TELEGRAM_LOCAL: 1 + restart: unless-stopped + ports: + - ${TELEGRAM_BOT_API_PORT:-8081}:8081 + volumes: + - telegram-bot-api-data:/var/lib/telegram-bot-api networks: - pentaract-network diff --git a/pentaract/Cargo.toml b/pentaract/Cargo.toml index 56fd093b..7dc5e980 100644 --- a/pentaract/Cargo.toml +++ b/pentaract/Cargo.toml @@ -19,8 +19,9 @@ jsonwebtoken = { version = "9", default-features = false, features = ["use_pem"] # async tokio = { version = "1.33.0", features = ["full"] } -tokio-util = "0.7.10" +tokio-util = { version = "0.7.10", features = ["io"] } futures = "0.3.29" +async-stream = "0.3.5" # logging tracing = "0.1.40" @@ -32,5 +33,5 @@ thiserror = "1.0.50" uuid = { version = "1.5.0", features = ["serde", "v4"] } # HTTP client (без native-tls — ок для musl) -reqwest = { version = "0.11.22", default-features = false, features = ["multipart", "json", "rustls-tls"] } +reqwest = { version = "0.11.22", default-features = false, features = ["multipart", "json", "stream", "rustls-tls"] } percent-encoding = "2.3" diff --git a/pentaract/src/common/channels.rs b/pentaract/src/common/channels.rs index fe5c4871..5414dec0 100644 --- a/pentaract/src/common/channels.rs +++ b/pentaract/src/common/channels.rs @@ -1,4 +1,5 @@ use tokio::sync::{mpsc, oneshot}; +use std::path::PathBuf; use uuid::Uuid; use crate::errors::PentaractResult; @@ -20,7 +21,8 @@ pub enum ClientData { pub struct UploadFileData { pub file_id: Uuid, pub user_id: Uuid, - pub file_data: Box<[u8]>, + pub file_path: PathBuf, + pub file_size: i64, } pub struct DownloadFileData { diff --git a/pentaract/src/common/telegram_api/bot_api.rs b/pentaract/src/common/telegram_api/bot_api.rs index f6145a68..339cc9db 100644 --- a/pentaract/src/common/telegram_api/bot_api.rs +++ b/pentaract/src/common/telegram_api/bot_api.rs @@ -1,4 +1,10 @@ +use std::{path::Path, pin::Pin}; + +use futures::{Stream, StreamExt}; use reqwest::multipart; +use tokio::io::AsyncReadExt; +use tokio::io::{AsyncSeekExt, SeekFrom}; +use tokio_util::io::ReaderStream; use uuid::Uuid; use crate::{ @@ -80,6 +86,76 @@ impl<'t> TelegramBotApi<'t> { } } + /// Upload a part of a file from disk without buffering it fully in RAM. + /// + /// `offset` and `len` define the slice of the file to upload. + pub async fn upload_file_part( + &self, + file_path: &Path, + offset: u64, + len: u64, + chat_id: ChatId, + storage_id: Uuid, + ) -> PentaractResult { + tracing::debug!( + "[TELEGRAM API] Uploading chunk from disk: path={:?}, offset={}, len={}, chat_id={}", + file_path, + offset, + len, + chat_id + ); + + let token = self.scheduler.get_token(storage_id).await?; + let url = self.build_url("", "sendDocument", token); + + let mut file = tokio::fs::File::open(file_path).await.map_err(|_| PentaractError::Unknown)?; + file.seek(SeekFrom::Start(offset)) + .await + .map_err(|_| PentaractError::Unknown)?; + let reader = file.take(len); + let stream = ReaderStream::new(reader); + let body = reqwest::Body::wrap_stream(stream); + + let part = multipart::Part::stream_with_length(body, len) + .file_name("pentaract_chunk.bin"); + + let form = multipart::Form::new() + .text("chat_id", chat_id.to_string()) + .part("document", part); + + let response = reqwest::Client::new() + .post(url) + .multipart(form) + .send() + .await?; + + let status = response.status(); + if !status.is_success() { + let error_text = response + .text() + .await + .unwrap_or_else(|_| "Unable to read error body".to_string()); + tracing::error!( + "[TELEGRAM API] Upload failed: status={}, response={}", + status, + error_text + ); + return Err(PentaractError::TelegramAPIError(format!( + "Status {}: {}", + status, error_text + ))); + } + + response + .json::() + .await + .map(|body| body.result.document) + .map_err(|e| { + tracing::error!("[TELEGRAM API] Failed to parse response: {}", e); + e.into() + }) + } + pub async fn download( &self, telegram_file_id: &str, @@ -94,14 +170,40 @@ impl<'t> TelegramBotApi<'t> { .query(&[("file_id", telegram_file_id)]) .send() .await? + .error_for_status()? .json() .await?; + // Local Bot API returns an absolute filesystem path. + if body.result.file_path.starts_with('/') { + // Security: only allow reading from the expected local-bot-api directory. + if !body + .result + .file_path + .starts_with("/var/lib/telegram-bot-api/") + { + return Err(PentaractError::TelegramAPIError( + "Unexpected local file_path from telegram-bot-api".to_string(), + )); + } + + let bytes = tokio::fs::read(&body.result.file_path).await.map_err(|e| { + PentaractError::TelegramAPIError(format!( + "Failed to read local bot api file: {}", + e + )) + })?; + return Ok(bytes); + } + // downloading the file itself let token = self.scheduler.get_token(storage_id).await?; let url = self.build_url("file/", &body.result.file_path, token); - let file = reqwest::get(url) + let file = reqwest::Client::new() + .get(url) + .send() .await? + .error_for_status()? .bytes() .await .map(|file| file.to_vec())?; @@ -109,6 +211,71 @@ impl<'t> TelegramBotApi<'t> { Ok(file) } + /// Download file bytes as a stream (does not buffer whole chunk in RAM). + pub async fn download_stream( + &self, + telegram_file_id: &str, + storage_id: Uuid, + ) -> PentaractResult> + Send>>> { + // getting file path + let token = self.scheduler.get_token(storage_id).await?; + let url = self.build_url("", "getFile", token); + + let body: DownloadBodySchema = reqwest::Client::new() + .get(url) + .query(&[("file_id", telegram_file_id)]) + .send() + .await? + .error_for_status()? + .json() + .await?; + + // Local Bot API returns an absolute filesystem path. + if body.result.file_path.starts_with('/') { + if !body + .result + .file_path + .starts_with("/var/lib/telegram-bot-api/") + { + return Err(PentaractError::TelegramAPIError( + "Unexpected local file_path from telegram-bot-api".to_string(), + )); + } + + let file = tokio::fs::File::open(&body.result.file_path).await.map_err(|e| { + PentaractError::TelegramAPIError(format!( + "Failed to open local bot api file: {}", + e + )) + })?; + let stream = ReaderStream::new(file).map(|res| { + res.map_err(|e| { + PentaractError::TelegramAPIError(format!( + "Failed to read local bot api file: {}", + e + )) + }) + }); + return Ok(Box::pin(stream)); + } + + // downloading the file itself + let token = self.scheduler.get_token(storage_id).await?; + let url = self.build_url("file/", &body.result.file_path, token); + + let response = reqwest::Client::new() + .get(url) + .send() + .await? + .error_for_status()?; + + let stream = response + .bytes_stream() + .map(|res| res.map_err(PentaractError::from)); + + Ok(Box::pin(stream)) + } + /// Taking token by a value to force dropping it so it can be used only once #[inline] fn build_url(&self, pre: &str, relative: &str, token: String) -> String { diff --git a/pentaract/src/config.rs b/pentaract/src/config.rs index 717d8cdb..ee6cd120 100644 --- a/pentaract/src/config.rs +++ b/pentaract/src/config.rs @@ -19,6 +19,15 @@ pub struct Config { pub telegram_api_base_url: String, pub telegram_rate_limit: u8, + + /// Where to spool uploads and other temporary data. + pub work_dir: String, + + /// Max size of a single Telegram document chunk. + /// + /// - Official Bot API has a practical 20MB download limitation via `getFile`. + /// - Local Bot API can handle up to ~2GB per upload, so chunk size can be much larger. + pub telegram_chunk_size_mb: u32, } impl Config { @@ -40,9 +49,26 @@ impl Config { let access_token_expire_in_secs = Self::get_env_var("ACCESS_TOKEN_EXPIRE_IN_SECS")?; let refresh_token_expire_in_days = Self::get_env_var("REFRESH_TOKEN_EXPIRE_IN_DAYS")?; let secret_key = Self::get_env_var("SECRET_KEY")?; - let telegram_api_base_url = Self::get_env_var("TELEGRAM_API_BASE_URL")?; + let telegram_local_api: bool = + Self::get_env_var_with_default("TELEGRAM_LOCAL_API", false)?; + let telegram_api_base_url: String = if telegram_local_api { + Self::get_env_var_with_default("TELEGRAM_API_BASE_URL", "http://127.0.0.1:8081".to_owned())? + } else { + Self::get_env_var_with_default("TELEGRAM_API_BASE_URL", "https://api.telegram.org".to_owned())? + }; let telegram_rate_limit = Self::get_env_var_with_default("TELEGRAM_RATE_LIMIT", 18)?; + let work_dir = Self::get_env_var_with_default("WORK_DIR", "work".to_owned())?; + + let default_chunk_mb = if telegram_api_base_url.contains("api.telegram.org") { + 20 + } else { + // stay under the 2GB limit with some headroom + 1950 + }; + let telegram_chunk_size_mb = + Self::get_env_var_with_default("TELEGRAM_CHUNK_SIZE_MB", default_chunk_mb)?; + Ok(Self { db_uri, db_uri_without_dbname, @@ -57,6 +83,8 @@ impl Config { secret_key, telegram_api_base_url, telegram_rate_limit, + work_dir, + telegram_chunk_size_mb, }) } diff --git a/pentaract/src/main.rs b/pentaract/src/main.rs index 3af8615a..4202531a 100644 --- a/pentaract/src/main.rs +++ b/pentaract/src/main.rs @@ -30,6 +30,11 @@ mod storage_manager; async fn main() { let config = Config::new().unwrap(); + // Make sure filesystem work dir exists early. + tokio::fs::create_dir_all(&config.work_dir) + .await + .expect("failed to create WORK_DIR"); + tracing_subscriber::registry() .with( tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| { diff --git a/pentaract/src/routers/files.rs b/pentaract/src/routers/files.rs index 442e6fce..15c5cc04 100644 --- a/pentaract/src/routers/files.rs +++ b/pentaract/src/routers/files.rs @@ -1,7 +1,7 @@ -use std::{collections::HashMap, path::Path, sync::Arc}; +use std::{io, path::Path, pin::Pin, sync::Arc}; use axum::{ - body::Full, + body::{Bytes, StreamBody}, extract::{DefaultBodyLimit, Multipart, Path as RoutePath, Query, State}, http::StatusCode, middleware, @@ -9,22 +9,27 @@ use axum::{ routing::{get, post}, Extension, Json, Router, }; +use async_stream::try_stream; +use futures::{Stream, StreamExt}; use percent_encoding::percent_decode_str; use reqwest::header; -use tokio_util::bytes::Bytes; +use tokio::io::AsyncWriteExt; use uuid::Uuid; use crate::{ common::{ + access::check_access, jwt_manager::AuthUser, routing::{app_state::AppState, middlewares::auth::logged_in_required}, + telegram_api::bot_api::TelegramBotApi, }, errors::{PentaractError, PentaractResult}, + models::access::AccessType, models::files::InFile, - schemas::files::{ - InFileSchema, InFolderSchema, SearchQuery, UploadParams, IN_FILE_SCHEMA_FIELDS_AMOUNT, - }, + repositories::{access::AccessRepository, files::FilesRepository}, + schemas::files::{InFileSchema, InFolderSchema, SearchQuery, UploadParams}, services::files::FilesService, + services::storage_workers_scheduler::StorageWorkersScheduler, }; pub struct FilesRouter; @@ -86,45 +91,74 @@ impl FilesRouter { RoutePath(storage_id): RoutePath, mut multipart: Multipart, ) -> Result { - // parsing - let (file, path) = { - let (mut file, mut filename, mut path) = (None, None, None); - - // parsing - while let Some(field) = multipart.next_field().await.unwrap() { - let name = field.name().unwrap().to_owned(); - let field_filename = field.file_name().unwrap_or("unnamed").to_owned(); - let data = field.bytes().await.unwrap(); - - match name.as_str() { - "file" => { - file = Some(data); - filename = Some(field_filename); - } - "path" => { - let raw_path = String::from_utf8(data.to_vec()).unwrap(); - let decoded = percent_decode_str(&raw_path) - .decode_utf8() - .unwrap_or(std::borrow::Cow::Borrowed(&raw_path)); - path = Some(decoded.to_string()); + // stream multipart to disk + let upload_dir = Path::new(&state.config.work_dir).join("uploads"); + tokio::fs::create_dir_all(&upload_dir) + .await + .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Can't create upload dir".to_owned()))?; + + let tmp_path = upload_dir.join(format!("{}.upload", Uuid::new_v4())); + let mut tmp_file = tokio::fs::File::create(&tmp_path) + .await + .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Can't create temp file".to_owned()))?; + + let (mut filename, mut parent_path, mut file_size) = (None::, None::, 0i64); + + while let Some(mut field) = multipart + .next_field() + .await + .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid multipart".to_owned()))? + { + let name = field.name().unwrap_or("").to_owned(); + + match name.as_str() { + "file" => { + filename = Some(field.file_name().unwrap_or("unnamed").to_owned()); + while let Some(chunk) = field + .chunk() + .await + .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid file stream".to_owned()))? + { + file_size += chunk.len() as i64; + tmp_file + .write_all(&chunk) + .await + .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Can't write temp file".to_owned()))?; } - // don't give a fuck about other fields - _ => (), } + "path" => { + let raw_path = field + .text() + .await + .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid path".to_owned()))?; + let decoded = percent_decode_str(&raw_path) + .decode_utf8() + .unwrap_or(std::borrow::Cow::Borrowed(&raw_path)); + parent_path = Some(decoded.to_string()); + } + _ => (), } + } - let file = file.ok_or((StatusCode::BAD_REQUEST, "file file is required".to_owned()))?; - let path = path - .ok_or((StatusCode::BAD_REQUEST, "path file is required".to_owned())) - .map(|path| Self::construct_path(&path, &filename.unwrap()))??; - (file, path) - }; - let size = file.len() as i64; - let in_file = InFile::new(path, size, storage_id); + tmp_file + .flush() + .await + .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Can't flush temp file".to_owned()))?; - FilesService::new(&state.db, state.tx.clone()) - .upload_anyway(in_file, file, &user) - .await?; + let parent_path = parent_path.ok_or((StatusCode::BAD_REQUEST, "path field is required".to_owned()))?; + let filename = filename.ok_or((StatusCode::BAD_REQUEST, "file field is required".to_owned()))?; + let path = Self::construct_path(&parent_path, &filename)?; + + let in_file = InFile::new(path, file_size, storage_id); + + let result = FilesService::new(&state.db, state.tx.clone()) + .upload_anyway_from_path(in_file, tmp_path.clone(), file_size, &user) + .await; + + if let Err(e) = result { + let _ = tokio::fs::remove_file(&tmp_path).await; + return Err(<(StatusCode, String)>::from(e)); + } Ok(StatusCode::CREATED) } @@ -134,41 +168,68 @@ impl FilesRouter { RoutePath(storage_id): RoutePath, mut multipart: Multipart, ) -> Result { - // parsing and validating schema - let in_schema = { - let mut body_parts = HashMap::with_capacity(IN_FILE_SCHEMA_FIELDS_AMOUNT); - - // parsing - while let Some(field) = multipart.next_field().await.unwrap() { - let name = field.name().unwrap().to_string(); - let data = field.bytes().await.unwrap(); - body_parts.insert(name, data); - } + let upload_dir = Path::new(&state.config.work_dir).join("uploads"); + tokio::fs::create_dir_all(&upload_dir) + .await + .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Can't create upload dir".to_owned()))?; - // validating - let path = body_parts - .get("path") - .map(|path| String::from_utf8(path.to_vec()).map_err(|_| "Path cannot be parsed")) - .unwrap_or(Err("Path is required")) - .map_err(|e| (StatusCode::BAD_REQUEST, e.to_owned())) - .map(|raw_path| { - percent_decode_str(&raw_path) + let tmp_path = upload_dir.join(format!("{}.upload", Uuid::new_v4())); + let mut tmp_file = tokio::fs::File::create(&tmp_path) + .await + .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Can't create temp file".to_owned()))?; + + let (mut path, mut file_size) = (None::, 0i64); + + while let Some(mut field) = multipart + .next_field() + .await + .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid multipart".to_owned()))? + { + let name = field.name().unwrap_or("").to_owned(); + match name.as_str() { + "path" => { + let raw_path = field + .text() + .await + .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid path".to_owned()))?; + let decoded = percent_decode_str(&raw_path) .decode_utf8() - .unwrap_or(std::borrow::Cow::Borrowed(&raw_path)) - .to_string() - })?; + .unwrap_or(std::borrow::Cow::Borrowed(&raw_path)); + path = Some(decoded.to_string()); + } + "file" => { + while let Some(chunk) = field + .chunk() + .await + .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid file stream".to_owned()))? + { + file_size += chunk.len() as i64; + tmp_file + .write_all(&chunk) + .await + .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Can't write temp file".to_owned()))?; + } + } + _ => (), + } + } - let file = body_parts - .get("file") - .ok_or((StatusCode::BAD_REQUEST, "File is required".to_owned()))?; + tmp_file + .flush() + .await + .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Can't flush temp file".to_owned()))?; - InFileSchema::new(storage_id, path, file.clone()) - }; + let path = path.ok_or((StatusCode::BAD_REQUEST, "Path is required".to_owned()))?; + let in_schema = InFileSchema::new(storage_id, path, tmp_path.clone(), file_size); - // do all other stuff - FilesService::new(&state.db, state.tx.clone()) + let result = FilesService::new(&state.db, state.tx.clone()) .upload_to(in_schema, &user) - .await?; + .await; + + if let Err(e) = result { + let _ = tokio::fs::remove_file(&tmp_path).await; + return Err(<(StatusCode, String)>::from(e)); + } Ok(StatusCode::CREATED) } @@ -202,31 +263,77 @@ impl FilesRouter { storage_id: Uuid, path: &str, ) -> Result { - FilesService::new(&state.db, state.tx.clone()) - .download(path, storage_id, &user) + // 0) checking access + check_access( + &AccessRepository::new(&state.db), + user.id, + storage_id, + &AccessType::R, + ) + .await + .map_err(|e| <(StatusCode, String)>::from(e))?; + + // 1) validation + if path.starts_with('/') || path.contains("//") { + return Err((StatusCode::BAD_REQUEST, PentaractError::InvalidPath.to_string())); + } + + // 2) locate file + chunks + let files_repo = FilesRepository::new(&state.db); + let file = files_repo + .get_file_by_path(path, storage_id) .await - .map(|data| { - let filename = Path::new(&path) - .file_name() - .map(|name| name.to_str().unwrap_or_default()) - .unwrap_or("unnamed.bin"); - let content_type = mime_guess::from_path(filename) - .first_or_octet_stream() - .to_string(); - let bytes = Bytes::from(data); - let body = Full::new(bytes); - - let headers = AppendHeaders([ - (header::CONTENT_TYPE, content_type), - ( - header::CONTENT_DISPOSITION, - format!("attachment; filename=\"{filename}\""), - ), - ]); - - (headers, body).into_response() - }) - .map_err(|e| <(StatusCode, String)>::from(e)) + .map_err(|e| <(StatusCode, String)>::from(e))?; + + let mut chunks = files_repo + .list_chunks_of_file(file.id) + .await + .map_err(|e| <(StatusCode, String)>::from(e))?; + chunks.sort_by_key(|c| c.position); + + let base_url = state.config.telegram_api_base_url.clone(); + let rate = state.config.telegram_rate_limit; + let db = state.db.clone(); + + // 3) stream each telegram chunk sequentially to the client + let stream = try_stream! { + for chunk in chunks { + let scheduler = StorageWorkersScheduler::new(&db, rate); + let api = TelegramBotApi::new(&base_url, scheduler); + + let mut s = api + .download_stream(&chunk.telegram_file_id, storage_id) + .await + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + + while let Some(item) = s.next().await { + let bytes = item + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + yield bytes; + } + } + }; + + let stream: Pin> + Send>> = Box::pin(stream); + let body = StreamBody::new(stream); + + let filename = Path::new(&path) + .file_name() + .map(|name| name.to_str().unwrap_or_default()) + .unwrap_or("unnamed.bin"); + let content_type = mime_guess::from_path(filename) + .first_or_octet_stream() + .to_string(); + + let headers = AppendHeaders([ + (header::CONTENT_TYPE, content_type), + ( + header::CONTENT_DISPOSITION, + format!("attachment; filename=\"{filename}\""), + ), + ]); + + Ok((headers, body).into_response()) } /// diff --git a/pentaract/src/schemas/files.rs b/pentaract/src/schemas/files.rs index e718cbbb..28172e2b 100644 --- a/pentaract/src/schemas/files.rs +++ b/pentaract/src/schemas/files.rs @@ -1,8 +1,8 @@ -use axum::body::Bytes; -use serde::{Deserialize, Serialize}; +use std::path::PathBuf; +use serde::Deserialize; use uuid::Uuid; -use crate::{common::types::Position, models::files::File}; +use crate::common::types::Position; #[derive(Deserialize)] pub struct UploadParams { @@ -14,17 +14,16 @@ pub struct InFileSchema { pub storage_id: Uuid, pub path: String, pub size: i64, - pub file: Bytes, + pub file_path: PathBuf, } impl InFileSchema { - pub fn new(storage_id: Uuid, path: String, file: Bytes) -> Self { - let size = file.len() as i64; + pub fn new(storage_id: Uuid, path: String, file_path: PathBuf, size: i64) -> Self { Self { storage_id, path, size, - file, + file_path, } } } diff --git a/pentaract/src/services/files.rs b/pentaract/src/services/files.rs index 39f96c2a..29c98041 100644 --- a/pentaract/src/services/files.rs +++ b/pentaract/src/services/files.rs @@ -1,5 +1,5 @@ -use axum::body::Bytes; use sqlx::PgPool; +use std::path::PathBuf; use tokio::sync::oneshot; use uuid::Uuid; @@ -100,13 +100,15 @@ impl<'d> FilesService<'d> { // 3. saving file to db let file = self.repo.create_file(in_file).await?; - self._upload(file, in_schema.file, user).await + self._upload_from_path(file, in_schema.file_path, in_schema.size, user) + .await } - pub async fn upload_anyway( + pub async fn upload_anyway_from_path( &self, in_file: InFile, - file_data: Bytes, + file_path: PathBuf, + file_size: i64, user: &AuthUser, ) -> PentaractResult<()> { // 0. checking access @@ -124,23 +126,26 @@ impl<'d> FilesService<'d> { // 2. saving file in db let file = self.repo.create_file_anyway(in_file).await?; - self._upload(file, file_data, user).await + self._upload_from_path(file, file_path, file_size, user).await } - async fn _upload(&self, file: File, file_data: Bytes, user: &AuthUser) -> PentaractResult<()> { - // 2. sending file to storage manager + async fn _upload_from_path( + &self, + file: File, + file_path: PathBuf, + file_size: i64, + user: &AuthUser, + ) -> PentaractResult<()> { let (resp_tx, resp_rx) = oneshot::channel(); - let message = { - let upload_file_data = UploadFileData { + let message = ClientMessage { + data: ClientData::UploadFile(UploadFileData { file_id: file.id, user_id: user.id, - file_data: file_data.as_ref().into(), - }; - ClientMessage { - data: ClientData::UploadFile(upload_file_data), - tx: resp_tx, - } + file_path, + file_size, + }), + tx: resp_tx, }; tracing::debug!("sending task to manager"); diff --git a/pentaract/src/services/storage_manager.rs b/pentaract/src/services/storage_manager.rs index f7cfa9f7..efe66ac3 100644 --- a/pentaract/src/services/storage_manager.rs +++ b/pentaract/src/services/storage_manager.rs @@ -26,10 +26,9 @@ pub struct StorageManagerService<'d> { } impl<'d> StorageManagerService<'d> { - pub fn new(db: &'d PgPool, telegram_baseurl: &'d str, rate_limit: u8) -> Self { + pub fn new(db: &'d PgPool, telegram_baseurl: &'d str, rate_limit: u8, chunk_size: usize) -> Self { let files_repo = FilesRepository::new(db); let storages_repo = StoragesRepository::new(db); - let chunk_size = 20 * 1024 * 1024; Self { storages_repo, files_repo, @@ -44,30 +43,63 @@ impl<'d> StorageManagerService<'d> { // 1. getting storage let storage = self.storages_repo.get_by_file_id(data.file_id).await?; - // 2. dividing file into chunks - let bytes_chunks = data.file_data.chunks(self.chunk_size); + let mut position: usize = 0; + let mut chunks: Vec = Vec::new(); - // 3. uploading by chunks - let futures_: Vec<_> = bytes_chunks - .enumerate() - .map(|(position, bytes_chunk)| { - self.upload_chunk( + let mut offset: u64 = 0; + let total: u64 = data.file_size.max(0) as u64; + + while offset < total { + let len = std::cmp::min(self.chunk_size as u64, total - offset); + let chunk = self + .upload_chunk_from_file( storage.id, storage.chat_id, data.file_id, position, - bytes_chunk, + &data.file_path, + offset, + len, ) - }) - .collect(); + .await?; + chunks.push(chunk); + offset += len; + position += 1; + } - let chunks = join_all(futures_) - .await - .into_iter() - .collect::>>()?; + let result = self.files_repo.create_chunks_batch(chunks).await; + let _ = tokio::fs::remove_file(&data.file_path).await; + result + } - // 4. saving chunks to db - self.files_repo.create_chunks_batch(chunks).await + async fn upload_chunk_from_file( + &self, + storage_id: Uuid, + chat_id: ChatId, + file_id: Uuid, + position: usize, + file_path: &std::path::Path, + offset: u64, + len: u64, + ) -> PentaractResult { + let scheduler = StorageWorkersScheduler::new(self.db, self.rate_limit); + + let document = TelegramBotApi::new(self.telegram_baseurl, scheduler) + .upload_file_part(file_path, offset, len, chat_id, storage_id) + .await?; + + tracing::debug!( + "[TELEGRAM API] uploaded chunk with file_id \"{}\" and position \"{}\"", + document.file_id, + position + ); + + Ok(FileChunk::new( + Uuid::new_v4(), + file_id, + document.file_id, + position as i16, + )) } async fn upload_chunk( diff --git a/pentaract/src/storage_manager.rs b/pentaract/src/storage_manager.rs index 318c2ea5..91db2d17 100644 --- a/pentaract/src/storage_manager.rs +++ b/pentaract/src/storage_manager.rs @@ -44,6 +44,7 @@ impl StorageManager { &self.db, &self.config.telegram_api_base_url, self.config.telegram_rate_limit, + (self.config.telegram_chunk_size_mb as usize) * 1024 * 1024, ) .upload(data) .await; @@ -56,6 +57,7 @@ impl StorageManager { &self.db, &self.config.telegram_api_base_url, self.config.telegram_rate_limit, + (self.config.telegram_chunk_size_mb as usize) * 1024 * 1024, ) .download(data) .await;