|
| 1 | +use std::collections::HashSet; |
| 2 | +use std::thread; |
| 3 | +use std::time::Duration; |
| 4 | + |
| 5 | +use serde::Deserialize; |
| 6 | +use tracing::{debug, warn}; |
| 7 | + |
| 8 | +use crate::error::ControllerError; |
| 9 | + |
| 10 | +const MAX_RETRIES: u32 = 15; |
| 11 | +const RETRY_DELAY: Duration = Duration::from_secs(2); |
| 12 | + |
| 13 | +#[derive(Deserialize)] |
| 14 | +struct ColumnRow { |
| 15 | + name: String, |
| 16 | + #[serde(rename = "type")] |
| 17 | + column_type: String, |
| 18 | +} |
| 19 | + |
| 20 | +/// Fetch `(name, type)` pairs for all columns in `database.table` via the |
| 21 | +/// ClickHouse HTTP API (`system.columns`). |
| 22 | +fn fetch_columns_for_table( |
| 23 | + clickhouse_url: &str, |
| 24 | + database: &str, |
| 25 | + table: &str, |
| 26 | +) -> Result<Vec<(String, String)>, ControllerError> { |
| 27 | + let base_url = clickhouse_url.trim_end_matches('/'); |
| 28 | + let sql = format!( |
| 29 | + "SELECT name, type FROM system.columns WHERE database = '{}' AND table = '{}'", |
| 30 | + database, table |
| 31 | + ); |
| 32 | + let client = reqwest::blocking::Client::new(); |
| 33 | + |
| 34 | + for attempt in 1..=MAX_RETRIES { |
| 35 | + let response = client |
| 36 | + .get(base_url) |
| 37 | + .query(&[("query", sql.as_str()), ("default_format", "JSONEachRow")]) |
| 38 | + .send() |
| 39 | + .map_err(|e| { |
| 40 | + ControllerError::ClickHouseClient(format!( |
| 41 | + "HTTP request failed for table '{}.{}': {}", |
| 42 | + database, table, e |
| 43 | + )) |
| 44 | + })?; |
| 45 | + |
| 46 | + let status = response.status(); |
| 47 | + |
| 48 | + if status == reqwest::StatusCode::SERVICE_UNAVAILABLE { |
| 49 | + warn!( |
| 50 | + "ClickHouse returned 503 for table '{}.{}' (attempt {}/{}); retrying in {}s", |
| 51 | + database, |
| 52 | + table, |
| 53 | + attempt, |
| 54 | + MAX_RETRIES, |
| 55 | + RETRY_DELAY.as_secs(), |
| 56 | + ); |
| 57 | + thread::sleep(RETRY_DELAY); |
| 58 | + continue; |
| 59 | + } |
| 60 | + |
| 61 | + if !status.is_success() { |
| 62 | + return Err(ControllerError::ClickHouseClient(format!( |
| 63 | + "ClickHouse returned HTTP {} for table '{}.{}'", |
| 64 | + status, database, table |
| 65 | + ))); |
| 66 | + } |
| 67 | + |
| 68 | + let body = response.text().map_err(|e| { |
| 69 | + ControllerError::ClickHouseClient(format!( |
| 70 | + "Failed to read ClickHouse response for table '{}.{}': {}", |
| 71 | + database, table, e |
| 72 | + )) |
| 73 | + })?; |
| 74 | + |
| 75 | + let mut columns = Vec::new(); |
| 76 | + for line in body.lines() { |
| 77 | + let row: ColumnRow = serde_json::from_str(line).map_err(|e| { |
| 78 | + ControllerError::ClickHouseClient(format!( |
| 79 | + "Failed to parse ClickHouse column row {:?}: {}", |
| 80 | + line, e |
| 81 | + )) |
| 82 | + })?; |
| 83 | + columns.push((row.name, row.column_type)); |
| 84 | + } |
| 85 | + |
| 86 | + debug!( |
| 87 | + "Fetched {} columns for table '{}.{}'", |
| 88 | + columns.len(), |
| 89 | + database, |
| 90 | + table |
| 91 | + ); |
| 92 | + return Ok(columns); |
| 93 | + } |
| 94 | + |
| 95 | + Err(ControllerError::ClickHouseClient(format!( |
| 96 | + "ClickHouse returned 503 for table '{}.{}' after {} attempts; giving up", |
| 97 | + database, table, MAX_RETRIES |
| 98 | + ))) |
| 99 | +} |
| 100 | + |
| 101 | +/// Query `system.columns` and return all column names that are not the time |
| 102 | +/// column or one of the value columns, sorted alphabetically. |
| 103 | +/// |
| 104 | +/// These are the metadata (dimension) columns the planner uses for rollup, |
| 105 | +/// analogous to PromQL label sets discovered from Prometheus. |
| 106 | +pub fn infer_metadata_columns( |
| 107 | + clickhouse_url: &str, |
| 108 | + database: &str, |
| 109 | + table_name: &str, |
| 110 | + time_column: &str, |
| 111 | + value_columns: &[String], |
| 112 | +) -> Result<Vec<String>, ControllerError> { |
| 113 | + let all_columns = fetch_columns_for_table(clickhouse_url, database, table_name)?; |
| 114 | + |
| 115 | + let exclude: HashSet<&str> = std::iter::once(time_column) |
| 116 | + .chain(value_columns.iter().map(String::as_str)) |
| 117 | + .collect(); |
| 118 | + |
| 119 | + let mut metadata: Vec<String> = all_columns |
| 120 | + .into_iter() |
| 121 | + .map(|(name, _)| name) |
| 122 | + .filter(|name| !exclude.contains(name.as_str())) |
| 123 | + .collect(); |
| 124 | + metadata.sort(); |
| 125 | + |
| 126 | + debug!( |
| 127 | + "Inferred metadata columns for table '{}': {:?}", |
| 128 | + table_name, metadata |
| 129 | + ); |
| 130 | + Ok(metadata) |
| 131 | +} |
0 commit comments