Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pentaract/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pentaract/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ tower-http = { version = "0.4.4", features = ["fs", "trace", "cors"], default_fe

# serialization/deserialization
serde = { version = "1.0.189", features = ["derive"] }
serde_json = "1.0"

# auth
pwhash = "1.0.0"
Expand Down
177 changes: 161 additions & 16 deletions pentaract/src/common/telegram_api/bot_api.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::time::Instant;

use reqwest::multipart;
use serde_json::json;
use uuid::Uuid;

use crate::{
Expand All @@ -21,12 +24,29 @@ impl<'t> TelegramBotApi<'t> {
}
}

/// Masks the bot token in URL for safe logging
fn mask_url(&self, url: &str) -> String {
// Replace bot token with ***
if let Some(bot_idx) = url.find("/bot") {
if let Some(slash_idx) = url[bot_idx + 4..].find('/') {
let masked = format!(
"{}/bot***{}",
&url[..bot_idx],
&url[bot_idx + 4 + slash_idx..]
);
return masked;
}
}
url.to_string()
}

pub async fn upload(
&self,
file: &[u8],
chat_id: ChatId,
storage_id: Uuid,
) -> PentaractResult<UploadSchema> {
let original_chat_id = chat_id;
let chat_id = {
// inserting 100 between minus sign and chat id
// cause telegram devs are complete retards and it works this way only
Expand All @@ -39,50 +59,175 @@ impl<'t> TelegramBotApi<'t> {

let token = self.scheduler.get_token(storage_id).await?;
let url = self.build_url("", "sendDocument", token);
let masked_url = self.mask_url(&url);

let file_part = multipart::Part::bytes(file.to_vec()).file_name("pentaract_chunk.bin");
let form = multipart::Form::new()
.text("chat_id", chat_id.to_string())
.part("document", file_part);

let start = Instant::now();
let response = reqwest::Client::new()
.post(url)
.post(&url)
.multipart(form)
.send()
.await?;
let elapsed_ms = start.elapsed().as_millis() as u64;

let status = response.status();

match response.error_for_status() {
// https://stackoverflow.com/a/32679930/12255756
Ok(r) => Ok(r.json::<UploadBodySchema>().await?.result.document),
Err(e) => Err(e.into()),
if !status.is_success() {
let error_body = response.text().await.unwrap_or_default();
tracing::error!(
target: "http_outbound",
"{}",
json!({
"status": status.as_u16(),
"method": "POST",
"url": masked_url,
"body": {
"chat_id": chat_id,
"original_chat_id": original_chat_id,
"file_size_bytes": file.len(),
"storage_id": storage_id.to_string()
},
"response": error_body,
"elapsed_ms": elapsed_ms
})
);
return Err(crate::errors::PentaractError::TelegramAPIError(
format!("{}: {}", status, error_body)
));
}

let result = response.json::<UploadBodySchema>().await?;

tracing::info!(
target: "http_outbound",
"{}",
json!({
"status": status.as_u16(),
"method": "POST",
"url": masked_url,
"body": {
"chat_id": chat_id,
"original_chat_id": original_chat_id,
"file_size_bytes": file.len(),
"storage_id": storage_id.to_string()
},
"response": {
"telegram_file_id": result.result.document.file_id
},
"elapsed_ms": elapsed_ms
})
);

Ok(result.result.document)
}

pub async fn download(
&self,
telegram_file_id: &str,
storage_id: Uuid,
) -> PentaractResult<Vec<u8>> {
// getting file path
// Step 1: Get file path from Telegram
let token = self.scheduler.get_token(storage_id).await?;
let url = self.build_url("", "getFile", token);
let masked_url = self.mask_url(&url);

// TODO: add retries with their number taking from env
let body: DownloadBodySchema = reqwest::Client::new()
.get(url)
let start = Instant::now();
let response = reqwest::Client::new()
.get(&url)
.query(&[("file_id", telegram_file_id)])
.send()
.await?
.json()
.await?;
let elapsed_ms = start.elapsed().as_millis() as u64;

// downloading the file itself
let status = response.status();

if !status.is_success() {
let error_body = response.text().await.unwrap_or_default();
tracing::error!(
target: "http_outbound",
"{}",
json!({
"status": status.as_u16(),
"method": "GET",
"url": format!("{}?file_id={}", masked_url, telegram_file_id),
"body": null,
"response": error_body,
"elapsed_ms": elapsed_ms
})
);
return Err(crate::errors::PentaractError::TelegramAPIError(
format!("{}: {}", status, error_body)
));
}

let body: DownloadBodySchema = response.json().await?;

tracing::info!(
target: "http_outbound",
"{}",
json!({
"status": status.as_u16(),
"method": "GET",
"url": format!("{}?file_id={}", masked_url, telegram_file_id),
"body": null,
"response": {
"file_path": body.result.file_path,
"file_size": body.result.file_size
},
"elapsed_ms": elapsed_ms
})
);

// Step 2: Download the file itself
let token = self.scheduler.get_token(storage_id).await?;
let url = self.build_url("file/", &body.result.file_path, token);
let file = reqwest::get(url)
.await?
.bytes()
.await
.map(|file| file.to_vec())?;
let masked_url = self.mask_url(&url);

let start = Instant::now();
let response = reqwest::get(&url).await?;
let elapsed_ms = start.elapsed().as_millis() as u64;

let status = response.status();
if !status.is_success() {
let error_body = response.text().await.unwrap_or_default();
tracing::error!(
target: "http_outbound",
"{}",
json!({
"status": status.as_u16(),
"method": "GET",
"url": masked_url,
"body": null,
"response": error_body,
"elapsed_ms": elapsed_ms
})
);
return Err(crate::errors::PentaractError::TelegramAPIError(
format!("{}: {}", status, error_body)
));
}

let file = response.bytes().await.map(|file| file.to_vec())?;

tracing::info!(
target: "http_outbound",
"{}",
json!({
"status": status.as_u16(),
"method": "GET",
"url": masked_url,
"body": null,
"response": {
"downloaded_bytes": file.len()
},
"elapsed_ms": elapsed_ms
})
);

Ok(file)
}
Expand Down
1 change: 1 addition & 0 deletions pentaract/src/common/telegram_api/schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ pub struct DownloadBodySchema {
#[derive(Deserialize)]
pub struct DownloadSchema {
pub file_path: String,
pub file_size: Option<u64>,
}