diff --git a/crates/core/migrations/057_account_manager.sql b/crates/core/migrations/057_account_manager.sql index e030a2a8d..9eadb0eea 100644 --- a/crates/core/migrations/057_account_manager.sql +++ b/crates/core/migrations/057_account_manager.sql @@ -73,6 +73,7 @@ CREATE TABLE IF NOT EXISTS app_wallet_ledger_entries ( CREATE INDEX IF NOT EXISTS idx_app_wallet_ledger_wallet_created ON app_wallet_ledger_entries(wallet_id, created_at DESC); CREATE INDEX IF NOT EXISTS idx_app_wallet_ledger_api_key ON app_wallet_ledger_entries(api_key_id); +CREATE INDEX IF NOT EXISTS idx_app_wallet_ledger_request_log_kind ON app_wallet_ledger_entries(request_log_id, entry_kind); CREATE TABLE IF NOT EXISTS api_key_owners ( key_id TEXT PRIMARY KEY REFERENCES api_keys(id) ON DELETE CASCADE, diff --git a/crates/core/src/storage/account_manager.rs b/crates/core/src/storage/account_manager.rs index 23bfcd1c3..fa17d8dc4 100644 --- a/crates/core/src/storage/account_manager.rs +++ b/crates/core/src/storage/account_manager.rs @@ -475,6 +475,34 @@ impl Storage { Ok(out) } + /// 函数 `list_api_key_ids_for_user` + /// + /// + /// 时间: 2026-05-28 + /// + /// # 参数 + /// - self: 参数 self + /// - user_id: 参数 user_id + /// + /// # 返回 + /// 返回函数执行结果 + pub fn list_api_key_ids_for_user(&self, user_id: &str) -> Result> { + let normalized_user_id = user_id.trim(); + if normalized_user_id.is_empty() { + return Ok(Vec::new()); + } + + // 中文注释:这里直接按 owner_user_id 走索引取 key_id,避免把整张 api_key_owners 表拉回 Rust 再过滤。 + let mut stmt = self.conn.prepare( + "SELECT key_id + FROM api_key_owners + WHERE owner_kind = 'user' AND owner_user_id = ?1 + ORDER BY key_id ASC", + )?; + let rows = stmt.query_map([normalized_user_id], |row| row.get(0))?; + rows.collect() + } + pub fn api_key_owner_count(&self) -> Result { self.conn .query_row("SELECT COUNT(*) FROM api_key_owners", [], |row| row.get(0)) diff --git a/crates/core/src/storage/api_key_quota_limits.rs b/crates/core/src/storage/api_key_quota_limits.rs index bb442a3bb..f56c6cb35 100644 --- a/crates/core/src/storage/api_key_quota_limits.rs +++ b/crates/core/src/storage/api_key_quota_limits.rs @@ -1,125 +1,164 @@ -use std::collections::HashMap; - -use rusqlite::{OptionalExtension, Result}; - -use super::{now_ts, Storage}; - -impl Storage { - pub fn upsert_api_key_quota_limit( - &self, - key_id: &str, - quota_limit_tokens: Option, - ) -> Result<()> { - let normalized = quota_limit_tokens.filter(|value| *value > 0); - let Some(limit) = normalized else { - self.conn.execute( - "DELETE FROM api_key_quota_limits WHERE key_id = ?1", - [key_id], - )?; - return Ok(()); - }; - - let now = now_ts(); - self.conn.execute( - "INSERT INTO api_key_quota_limits ( - key_id, quota_limit_tokens, created_at, updated_at - ) VALUES (?1, ?2, ?3, ?3) - ON CONFLICT(key_id) DO UPDATE SET - quota_limit_tokens = excluded.quota_limit_tokens, - updated_at = excluded.updated_at", - (key_id, limit, now), - )?; - Ok(()) - } - - pub fn find_api_key_quota_limit(&self, key_id: &str) -> Result> { - self.conn - .query_row( - "SELECT quota_limit_tokens - FROM api_key_quota_limits - WHERE key_id = ?1 - LIMIT 1", - [key_id], - |row| row.get(0), - ) - .optional() - } - - pub fn list_api_key_quota_limits(&self) -> Result> { - let mut stmt = self.conn.prepare( - "SELECT key_id, quota_limit_tokens - FROM api_key_quota_limits - WHERE quota_limit_tokens > 0", - )?; - let mut rows = stmt.query([])?; - let mut out = HashMap::new(); - while let Some(row) = rows.next()? { - out.insert(row.get(0)?, row.get(1)?); - } - Ok(out) - } - - pub fn api_key_total_token_usage(&self, key_id: &str) -> Result { - let mut stmt = self.conn.prepare( - "WITH all_stats AS ( - SELECT - key_id, - input_tokens, - cached_input_tokens, - output_tokens, - total_tokens - FROM request_token_stats - UNION ALL - SELECT - NULLIF(key_id, '') AS key_id, - input_tokens, - cached_input_tokens, - output_tokens, - total_tokens - FROM request_token_stat_rollups - ) - SELECT - IFNULL( - SUM( - CASE - WHEN total_tokens IS NOT NULL THEN - CASE WHEN total_tokens > 0 THEN total_tokens ELSE 0 END - ELSE - CASE - WHEN IFNULL(input_tokens, 0) - IFNULL(cached_input_tokens, 0) + IFNULL(output_tokens, 0) > 0 - THEN IFNULL(input_tokens, 0) - IFNULL(cached_input_tokens, 0) + IFNULL(output_tokens, 0) - ELSE 0 - END - END - ), - 0 - ) AS total_tokens - FROM all_stats - WHERE key_id = ?1", - )?; - let mut rows = stmt.query([key_id])?; - if let Some(row) = rows.next()? { - let total: i64 = row.get(0)?; - return Ok(total.max(0)); - } - Ok(0) - } - - pub(super) fn ensure_api_key_quota_limits_table(&self) -> Result<()> { - self.conn.execute( - "CREATE TABLE IF NOT EXISTS api_key_quota_limits ( - key_id TEXT PRIMARY KEY REFERENCES api_keys(id) ON DELETE CASCADE, - quota_limit_tokens INTEGER NOT NULL, - created_at INTEGER NOT NULL, - updated_at INTEGER NOT NULL - )", - [], - )?; - self.conn.execute( - "CREATE INDEX IF NOT EXISTS idx_api_key_quota_limits_updated_at - ON api_key_quota_limits(updated_at DESC)", - [], - )?; - Ok(()) - } -} +use std::collections::HashMap; + +use rusqlite::{params_from_iter, OptionalExtension, Result}; + +use super::key_id_filters::{key_id_in_clause, normalize_key_ids, SQLITE_IN_CLAUSE_BATCH_SIZE}; +use super::{now_ts, Storage}; + +impl Storage { + pub fn upsert_api_key_quota_limit( + &self, + key_id: &str, + quota_limit_tokens: Option, + ) -> Result<()> { + let normalized = quota_limit_tokens.filter(|value| *value > 0); + let Some(limit) = normalized else { + self.conn.execute( + "DELETE FROM api_key_quota_limits WHERE key_id = ?1", + [key_id], + )?; + return Ok(()); + }; + + let now = now_ts(); + self.conn.execute( + "INSERT INTO api_key_quota_limits ( + key_id, quota_limit_tokens, created_at, updated_at + ) VALUES (?1, ?2, ?3, ?3) + ON CONFLICT(key_id) DO UPDATE SET + quota_limit_tokens = excluded.quota_limit_tokens, + updated_at = excluded.updated_at", + (key_id, limit, now), + )?; + Ok(()) + } + + pub fn find_api_key_quota_limit(&self, key_id: &str) -> Result> { + self.conn + .query_row( + "SELECT quota_limit_tokens + FROM api_key_quota_limits + WHERE key_id = ?1 + LIMIT 1", + [key_id], + |row| row.get(0), + ) + .optional() + } + + pub fn list_api_key_quota_limits(&self) -> Result> { + let mut stmt = self.conn.prepare( + "SELECT key_id, quota_limit_tokens + FROM api_key_quota_limits + WHERE quota_limit_tokens > 0", + )?; + let mut rows = stmt.query([])?; + let mut out = HashMap::new(); + while let Some(row) = rows.next()? { + out.insert(row.get(0)?, row.get(1)?); + } + Ok(out) + } + + pub fn list_api_key_quota_limits_for_ids( + &self, + key_ids: &[String], + ) -> Result> { + let key_ids = normalize_key_ids(key_ids); + if key_ids.is_empty() { + return Ok(HashMap::new()); + } + + let mut out = HashMap::new(); + for chunk in key_ids.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) { + out.extend(list_api_key_quota_limits_for_ids_chunk(self, chunk)?); + } + Ok(out) + } + + pub fn api_key_total_token_usage(&self, key_id: &str) -> Result { + let mut stmt = self.conn.prepare( + "WITH all_stats AS ( + SELECT + key_id, + input_tokens, + cached_input_tokens, + output_tokens, + total_tokens + FROM request_token_stats + UNION ALL + SELECT + NULLIF(key_id, '') AS key_id, + input_tokens, + cached_input_tokens, + output_tokens, + total_tokens + FROM request_token_stat_rollups + ) + SELECT + IFNULL( + SUM( + CASE + WHEN total_tokens IS NOT NULL THEN + CASE WHEN total_tokens > 0 THEN total_tokens ELSE 0 END + ELSE + CASE + WHEN IFNULL(input_tokens, 0) - IFNULL(cached_input_tokens, 0) + IFNULL(output_tokens, 0) > 0 + THEN IFNULL(input_tokens, 0) - IFNULL(cached_input_tokens, 0) + IFNULL(output_tokens, 0) + ELSE 0 + END + END + ), + 0 + ) AS total_tokens + FROM all_stats + WHERE key_id = ?1", + )?; + let mut rows = stmt.query([key_id])?; + if let Some(row) = rows.next()? { + let total: i64 = row.get(0)?; + return Ok(total.max(0)); + } + Ok(0) + } + + pub(super) fn ensure_api_key_quota_limits_table(&self) -> Result<()> { + self.conn.execute( + "CREATE TABLE IF NOT EXISTS api_key_quota_limits ( + key_id TEXT PRIMARY KEY REFERENCES api_keys(id) ON DELETE CASCADE, + quota_limit_tokens INTEGER NOT NULL, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL + )", + [], + )?; + self.conn.execute( + "CREATE INDEX IF NOT EXISTS idx_api_key_quota_limits_updated_at + ON api_key_quota_limits(updated_at DESC)", + [], + )?; + Ok(()) + } +} + +fn list_api_key_quota_limits_for_ids_chunk( + storage: &Storage, + key_ids: &[String], +) -> Result> { + let Some((clause, params)) = key_id_in_clause("key_id", key_ids) else { + return Ok(HashMap::new()); + }; + let sql = format!( + "SELECT key_id, quota_limit_tokens + FROM api_key_quota_limits + WHERE quota_limit_tokens > 0 + AND {clause}" + ); + let mut stmt = storage.conn.prepare(&sql)?; + let mut rows = stmt.query(params_from_iter(params.iter()))?; + let mut out = HashMap::new(); + while let Some(row) = rows.next()? { + out.insert(row.get(0)?, row.get(1)?); + } + Ok(out) +} diff --git a/crates/core/src/storage/api_keys.rs b/crates/core/src/storage/api_keys.rs index a1b97c049..d16ec5954 100644 --- a/crates/core/src/storage/api_keys.rs +++ b/crates/core/src/storage/api_keys.rs @@ -1,5 +1,6 @@ -use rusqlite::{Result, Row}; +use rusqlite::{params_from_iter, Result, Row}; +use super::key_id_filters::{key_id_in_clause, normalize_key_ids, SQLITE_IN_CLAUSE_BATCH_SIZE}; use super::{now_ts, ApiKey, Storage}; const API_KEY_SELECT_SQL: &str = "SELECT @@ -108,6 +109,36 @@ impl Storage { Ok(out) } + /// 函数 `list_api_keys_for_ids` + /// + /// 作者: gaohongshun + /// + /// 时间: 2026-05-28 + /// + /// # 参数 + /// - self: 参数 self + /// - key_ids: 参数 key_ids + /// + /// # 返回 + /// 返回函数执行结果 + pub fn list_api_keys_for_ids(&self, key_ids: &[String]) -> Result> { + let key_ids = normalize_key_ids(key_ids); + if key_ids.is_empty() { + return Ok(Vec::new()); + } + + let mut out = Vec::new(); + for chunk in key_ids.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) { + out.extend(list_api_keys_for_ids_chunk(self, chunk)?); + } + out.sort_by(|a, b| { + b.created_at + .cmp(&a.created_at) + .then_with(|| a.id.cmp(&b.id)) + }); + Ok(out) + } + /// 函数 `find_api_key_by_hash` /// /// 作者: gaohongshun @@ -674,6 +705,24 @@ impl Storage { } } +fn list_api_keys_for_ids_chunk(storage: &Storage, key_ids: &[String]) -> Result> { + let Some((clause, params)) = key_id_in_clause("k.id", key_ids) else { + return Ok(Vec::new()); + }; + let sql = format!( + "{API_KEY_SELECT_SQL} + WHERE {clause} + ORDER BY k.created_at DESC, k.id ASC" + ); + let mut stmt = storage.conn.prepare(&sql)?; + let mut rows = stmt.query(params_from_iter(params.iter()))?; + let mut out = Vec::new(); + while let Some(row) = rows.next()? { + out.push(map_api_key_row(row)?); + } + Ok(out) +} + /// 函数 `map_api_key_row` /// /// 作者: gaohongshun @@ -707,3 +756,7 @@ fn map_api_key_row(row: &Row<'_>) -> Result { last_used_at: row.get(17)?, }) } + +#[cfg(test)] +#[path = "tests/api_keys_tests.rs"] +mod tests; diff --git a/crates/core/src/storage/key_id_filters.rs b/crates/core/src/storage/key_id_filters.rs new file mode 100644 index 000000000..24d1f113c --- /dev/null +++ b/crates/core/src/storage/key_id_filters.rs @@ -0,0 +1,166 @@ +use std::sync::atomic::{AtomicU64, Ordering}; + +use rusqlite::{types::Value, Result}; + +use super::Storage; + +// SQLite commonly defaults to 999 host parameters. Keep a little room for +// companion predicates when callers still need an IN-list based lookup. +pub(super) const SQLITE_IN_CLAUSE_BATCH_SIZE: usize = 900; + +static NEXT_TEMP_FILTER_ID: AtomicU64 = AtomicU64::new(1); + +pub(super) fn normalize_key_ids(key_ids: &[String]) -> Vec { + let mut normalized = key_ids + .iter() + .map(|key_id| key_id.trim()) + .filter(|key_id| !key_id.is_empty()) + .map(|key_id| key_id.to_string()) + .collect::>(); + normalized.sort(); + normalized.dedup(); + normalized +} + +pub(super) fn key_id_in_clause(column: &str, key_ids: &[String]) -> Option<(String, Vec)> { + let key_ids = normalize_key_ids(key_ids); + if key_ids.is_empty() { + return None; + } + + let placeholders = std::iter::repeat("?") + .take(key_ids.len()) + .collect::>() + .join(", "); + let params = key_ids.into_iter().map(Value::Text).collect::>(); + Some((format!("{column} IN ({placeholders})"), params)) +} + +pub(super) struct KeyIdSqlFilter<'a> { + condition: String, + params: Vec, + // Keep the temporary table alive for as long as the generated SQL can run. + _temp_filter: Option>, +} + +impl<'a> KeyIdSqlFilter<'a> { + pub(super) fn create( + storage: &'a Storage, + column: &str, + key_ids: &[String], + ) -> Result> { + let key_ids = normalize_key_ids(key_ids); + if key_ids.is_empty() { + return Ok(None); + } + + if key_ids.len() <= SQLITE_IN_CLAUSE_BATCH_SIZE { + let Some((condition, params)) = key_id_in_clause(column, &key_ids) else { + return Ok(None); + }; + return Ok(Some(Self { + condition, + params, + _temp_filter: None, + })); + } + + let Some(temp_filter) = TempKeyIdFilter::create(storage, &key_ids)? else { + return Ok(None); + }; + let condition = temp_filter.condition(column); + Ok(Some(Self { + condition, + params: Vec::new(), + _temp_filter: Some(temp_filter), + })) + } + + pub(super) fn condition(&self) -> &str { + &self.condition + } + + pub(super) fn params(&self) -> &[Value] { + &self.params + } +} + +pub(super) struct TempKeyIdFilter<'a> { + storage: &'a Storage, + table_name: String, +} + +impl<'a> TempKeyIdFilter<'a> { + pub(super) fn create(storage: &'a Storage, key_ids: &[String]) -> Result> { + let key_ids = normalize_key_ids(key_ids); + if key_ids.is_empty() { + return Ok(None); + } + + let id = NEXT_TEMP_FILTER_ID.fetch_add(1, Ordering::Relaxed); + let table_name = format!("cm_temp_key_id_filter_{id}"); + storage.conn.execute( + &format!( + "CREATE TEMP TABLE {table_name} ( + key_id TEXT PRIMARY KEY + ) WITHOUT ROWID" + ), + [], + )?; + + let savepoint_name = format!("cm_key_filter_insert_{id}"); + storage + .conn + .execute_batch(&format!("SAVEPOINT {savepoint_name}"))?; + let insert_result = (|| -> Result<()> { + let mut stmt = storage + .conn + .prepare(&format!("INSERT INTO {table_name} (key_id) VALUES (?1)"))?; + for key_id in key_ids { + stmt.execute([key_id])?; + } + Ok(()) + })(); + if let Err(err) = insert_result { + let _ = storage.conn.execute_batch(&format!( + "ROLLBACK TO {savepoint_name}; RELEASE {savepoint_name}" + )); + let _ = storage + .conn + .execute(&format!("DROP TABLE IF EXISTS {table_name}"), []); + return Err(err); + } + storage + .conn + .execute_batch(&format!("RELEASE {savepoint_name}"))?; + + Ok(Some(Self { + storage, + table_name, + })) + } + + pub(super) fn condition(&self, column: &str) -> String { + format!( + "EXISTS ( + SELECT 1 + FROM {} key_filter + WHERE key_filter.key_id = {column} + )", + self.table_name + ) + } + + pub(super) fn exists_clause(&self, column: &str) -> String { + format!(" AND {}", self.condition(column)) + } +} + +impl Drop for TempKeyIdFilter<'_> { + fn drop(&mut self) { + let _ = self + .storage + .conn + .execute(&format!("DROP TABLE IF EXISTS {}", self.table_name), []); + } +} diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs index d0ec5164b..49e05db33 100644 --- a/crates/core/src/storage/mod.rs +++ b/crates/core/src/storage/mod.rs @@ -13,6 +13,7 @@ mod api_keys; mod conversation_bindings; mod events; mod gateway_error_logs; +mod key_id_filters; mod model_groups; mod model_options; mod model_price_rules; diff --git a/crates/core/src/storage/request_logs.rs b/crates/core/src/storage/request_logs.rs index 3c8153450..5fe77bcd3 100644 --- a/crates/core/src/storage/request_logs.rs +++ b/crates/core/src/storage/request_logs.rs @@ -1,5 +1,6 @@ use rusqlite::{params, params_from_iter, types::Value, Result, Row}; +use super::key_id_filters::KeyIdSqlFilter; use super::{ request_log_query, RequestLog, RequestLogQuerySummary, RequestLogTodaySummary, RequestTokenStat, Storage, @@ -307,6 +308,9 @@ impl Storage { limit: i64, key_ids: &[String], ) -> Result> { + let Some(key_filter) = KeyIdSqlFilter::create(self, "r.key_id", key_ids)? else { + return Ok(Vec::new()); + }; let normalized_limit = normalize_request_log_limit(limit); let normalized_offset = offset.max(0); let include_account_lookup = self.has_table("accounts")?; @@ -316,7 +320,7 @@ impl Storage { start_ts, end_ts, include_account_lookup, - Some(key_ids), + Some(&key_filter), false, ); let sql = format!( @@ -401,6 +405,9 @@ impl Storage { end_ts: Option, key_ids: &[String], ) -> Result { + let Some(key_filter) = KeyIdSqlFilter::create(self, "r.key_id", key_ids)? else { + return Ok(0); + }; let include_account_lookup = self.has_table("accounts")?; let filters = build_request_log_filters( query, @@ -408,7 +415,7 @@ impl Storage { start_ts, end_ts, include_account_lookup, - Some(key_ids), + Some(&key_filter), false, ); let sql = format!( @@ -501,6 +508,9 @@ impl Storage { end_ts: Option, key_ids: &[String], ) -> Result { + let Some(key_filter) = KeyIdSqlFilter::create(self, "r.key_id", key_ids)? else { + return Ok(empty_request_log_query_summary()); + }; let include_account_lookup = self.has_table("accounts")?; let filters = build_request_log_filters( query, @@ -508,7 +518,7 @@ impl Storage { start_ts, end_ts, include_account_lookup, - Some(key_ids), + Some(&key_filter), false, ); let sql = format!( @@ -612,35 +622,26 @@ impl Storage { end_ts: i64, key_ids: &[String], ) -> Result { - if key_ids.is_empty() { - return Ok(RequestLogTodaySummary { - input_tokens: 0, - cached_input_tokens: 0, - output_tokens: 0, - reasoning_output_tokens: 0, - estimated_cost_usd: 0.0, - }); - } - let placeholders = std::iter::repeat("?") - .take(key_ids.len()) - .collect::>() - .join(", "); + let Some(key_filter) = KeyIdSqlFilter::create(self, "s.key_id", key_ids)? else { + return Ok(empty_request_log_today_summary()); + }; let sql = format!( "SELECT - IFNULL(SUM(input_tokens), 0), - IFNULL(SUM(cached_input_tokens), 0), - IFNULL(SUM(output_tokens), 0), - IFNULL(SUM(reasoning_output_tokens), 0), - IFNULL(SUM(estimated_cost_usd), 0.0) - FROM request_token_stats - WHERE created_at >= ? - AND created_at < ? - AND IFNULL(key_id, '') IN ({placeholders})" + IFNULL(SUM(s.input_tokens), 0), + IFNULL(SUM(s.cached_input_tokens), 0), + IFNULL(SUM(s.output_tokens), 0), + IFNULL(SUM(s.reasoning_output_tokens), 0), + IFNULL(SUM(s.estimated_cost_usd), 0.0) + FROM request_token_stats s + WHERE s.created_at >= ? + AND s.created_at < ? + AND {}", + key_filter.condition() ); - let mut params = Vec::with_capacity(key_ids.len() + 2); + let mut params = Vec::with_capacity(key_filter.params().len() + 2); params.push(Value::Integer(start_ts)); params.push(Value::Integer(end_ts)); - params.extend(key_ids.iter().cloned().map(Value::Text)); + params.extend_from_slice(key_filter.params()); self.conn .query_row(&sql, params_from_iter(params.iter()), |row| { Ok(RequestLogTodaySummary { @@ -1066,7 +1067,7 @@ fn build_request_log_filters( start_ts: Option, end_ts: Option, include_account_lookup: bool, - key_ids: Option<&[String]>, + key_filter: Option<&KeyIdSqlFilter<'_>>, include_route_detail_fields: bool, ) -> RequestLogSqlFilters { let mut clauses = Vec::new(); @@ -1081,7 +1082,7 @@ fn build_request_log_filters( ); append_status_filter_clause(status_filter, &mut clauses, &mut params); append_time_range_clause(start_ts, end_ts, &mut clauses, &mut params); - append_key_ids_clause(key_ids, &mut clauses, &mut params); + append_key_filter_clause(key_filter, &mut clauses, &mut params); RequestLogSqlFilters { where_clause: if clauses.is_empty() { @@ -1093,33 +1094,36 @@ fn build_request_log_filters( } } -fn append_key_ids_clause( - key_ids: Option<&[String]>, +fn append_key_filter_clause( + key_filter: Option<&KeyIdSqlFilter<'_>>, clauses: &mut Vec, params: &mut Vec, ) { - let Some(key_ids) = key_ids else { + let Some(key_filter) = key_filter else { return; }; - let normalized = key_ids - .iter() - .map(|value| value.trim()) - .filter(|value| !value.is_empty()) - .collect::>(); - if normalized.is_empty() { - clauses.push("1 = 0".to_string()); - return; + clauses.push(key_filter.condition().to_string()); + params.extend_from_slice(key_filter.params()); +} + +fn empty_request_log_today_summary() -> RequestLogTodaySummary { + RequestLogTodaySummary { + input_tokens: 0, + cached_input_tokens: 0, + output_tokens: 0, + reasoning_output_tokens: 0, + estimated_cost_usd: 0.0, + } +} + +fn empty_request_log_query_summary() -> RequestLogQuerySummary { + RequestLogQuerySummary { + count: 0, + success_count: 0, + error_count: 0, + total_tokens: 0, + estimated_cost_usd: 0.0, } - let placeholders = std::iter::repeat("?") - .take(normalized.len()) - .collect::>() - .join(", "); - clauses.push(format!("IFNULL(r.key_id, '') IN ({placeholders})")); - params.extend( - normalized - .into_iter() - .map(|value| Value::Text(value.to_string())), - ); } fn is_route_detail_query_column(column: &str) -> bool { diff --git a/crates/core/src/storage/request_token_stats.rs b/crates/core/src/storage/request_token_stats.rs index 4a213dd05..667aa8f2b 100644 --- a/crates/core/src/storage/request_token_stats.rs +++ b/crates/core/src/storage/request_token_stats.rs @@ -1,6 +1,7 @@ -use rusqlite::{params, Result, Row}; +use rusqlite::{params, params_from_iter, types::Value, Result, Row}; use std::sync::atomic::{AtomicI64, Ordering}; +use super::key_id_filters::TempKeyIdFilter; use super::{ now_ts, ApiKeyModelTokenUsageSummary, ApiKeyTokenUsageSummary, DailyTokenUsageRollup, RequestLogTodaySummary, RequestTokenStat, SourceTokenUsageRollup, Storage, TokenUsageRollup, @@ -71,20 +72,23 @@ const TOKEN_ROLLUP_COLUMNS: &str = " COUNT(DISTINCT CASE WHEN r.status_code >= 200 AND r.status_code <= 299 THEN r.id END) AS success_count, COUNT(DISTINCT CASE WHEN IFNULL(r.status_code, 0) >= 400 OR TRIM(IFNULL(r.error, '')) <> '' THEN r.id END) AS error_count"; -const USER_OWNER_EXPR: &str = - "COALESCE(NULLIF(TRIM(charge.owner_id), ''), NULLIF(TRIM(owner.owner_user_id), ''))"; - -// User attribution prefers the request_charge wallet owner. The api_key_owners -// fallback is current-owner based, so old uncharged logs are approximate. -const USER_OWNER_JOINS: &str = " - LEFT JOIN ( - SELECT l.request_log_id, MIN(w.owner_id) AS owner_id +const USER_OWNER_EXPR: &str = "COALESCE( + ( + SELECT MIN(NULLIF(TRIM(w.owner_id), '')) FROM app_wallet_ledger_entries l JOIN app_wallets w ON w.id = l.wallet_id - WHERE l.entry_kind = 'request_charge' + WHERE l.request_log_id = r.id + AND l.entry_kind = 'request_charge' AND w.owner_kind = 'user' - GROUP BY l.request_log_id - ) charge ON charge.request_log_id = r.id + ), + NULLIF(TRIM(owner.owner_user_id), '') +)"; + +// User attribution prefers the request_charge wallet owner. Use a correlated +// lookup so dashboard range queries do not pre-aggregate the entire ledger table. +// The api_key_owners fallback is current-owner based, so old uncharged logs are +// approximate. +const USER_OWNER_JOINS: &str = " LEFT JOIN api_key_owners owner ON owner.key_id = r.key_id AND owner.owner_kind = 'user'"; fn token_usage_rollup_from_row(row: &Row<'_>, offset: usize) -> Result { @@ -129,6 +133,39 @@ fn source_id_expr(source_kind: &str) -> Option<&'static str> { } } +fn map_api_key_token_usage_summary(row: &Row<'_>) -> Result { + Ok(ApiKeyTokenUsageSummary { + key_id: row.get(0)?, + total_tokens: row.get(1)?, + estimated_cost_usd: row.get(2)?, + }) +} + +fn map_token_usage_summary(row: &Row<'_>) -> Result { + Ok(TokenUsageSummary { + model: row.get(0)?, + input_tokens: row.get::<_, i64>(1)?.max(0), + cached_input_tokens: row.get::<_, i64>(2)?.max(0), + output_tokens: row.get::<_, i64>(3)?.max(0), + reasoning_output_tokens: row.get::<_, i64>(4)?.max(0), + total_tokens: row.get::<_, i64>(5)?.max(0), + estimated_cost_usd: row.get::<_, f64>(6)?.max(0.0), + }) +} + +fn map_api_key_model_token_usage_summary(row: &Row<'_>) -> Result { + Ok(ApiKeyModelTokenUsageSummary { + key_id: row.get(0)?, + model: row.get(1)?, + input_tokens: row.get::<_, i64>(2)?.max(0), + cached_input_tokens: row.get::<_, i64>(3)?.max(0), + output_tokens: row.get::<_, i64>(4)?.max(0), + reasoning_output_tokens: row.get::<_, i64>(5)?.max(0), + total_tokens: row.get::<_, i64>(6)?.max(0), + estimated_cost_usd: row.get::<_, f64>(7)?.max(0.0), + }) +} + impl Storage { /// 函数 `insert_request_token_stat` /// @@ -286,6 +323,36 @@ impl Storage { } pub fn summarize_request_token_stats_by_key(&self) -> Result> { + self.summarize_request_token_stats_by_key_filtered(None) + } + + pub fn summarize_request_token_stats_by_key_for_keys( + &self, + key_ids: &[String], + ) -> Result> { + self.summarize_request_token_stats_by_key_filtered(Some(key_ids)) + } + + fn summarize_request_token_stats_by_key_filtered( + &self, + key_ids: Option<&[String]>, + ) -> Result> { + let Some(key_ids) = key_ids else { + return self.query_request_token_stats_by_key(None); + }; + let Some(key_filter) = TempKeyIdFilter::create(self, key_ids)? else { + return Ok(Vec::new()); + }; + self.query_request_token_stats_by_key(Some(&key_filter)) + } + + fn query_request_token_stats_by_key( + &self, + key_filter: Option<&TempKeyIdFilter<'_>>, + ) -> Result> { + let key_filter_clause = key_filter + .map(|filter| filter.exists_clause("s.key_id")) + .unwrap_or_default(); let mut stmt = self.conn.prepare(&format!( "WITH all_stats AS ( SELECT @@ -307,23 +374,19 @@ impl Storage { FROM request_token_stat_rollups ) SELECT - key_id, + s.key_id, IFNULL(SUM({token_total}), 0) AS total_tokens, - IFNULL(SUM(estimated_cost_usd), 0.0) AS estimated_cost_usd - FROM all_stats - WHERE key_id IS NOT NULL AND TRIM(key_id) <> '' - GROUP BY key_id - ORDER BY total_tokens DESC, key_id ASC", + IFNULL(SUM(s.estimated_cost_usd), 0.0) AS estimated_cost_usd + FROM all_stats s + WHERE s.key_id IS NOT NULL AND TRIM(s.key_id) <> ''{key_filter_clause} + GROUP BY s.key_id + ORDER BY total_tokens DESC, s.key_id ASC", token_total = token_total_sql_expr(), ))?; let mut rows = stmt.query([])?; let mut items = Vec::new(); while let Some(row) = rows.next()? { - items.push(ApiKeyTokenUsageSummary { - key_id: row.get(0)?, - total_tokens: row.get(1)?, - estimated_cost_usd: row.get(2)?, - }); + items.push(map_api_key_token_usage_summary(row)?); } Ok(items) } @@ -332,12 +395,49 @@ impl Storage { &self, start_ts: Option, end_ts: Option, + ) -> Result> { + self.summarize_request_token_stats_by_model_filtered(start_ts, end_ts, None) + } + + pub fn summarize_request_token_stats_by_model_for_keys( + &self, + start_ts: Option, + end_ts: Option, + key_ids: &[String], + ) -> Result> { + self.summarize_request_token_stats_by_model_filtered(start_ts, end_ts, Some(key_ids)) + } + + fn summarize_request_token_stats_by_model_filtered( + &self, + start_ts: Option, + end_ts: Option, + key_ids: Option<&[String]>, + ) -> Result> { + let Some(key_ids) = key_ids else { + return self.query_request_token_stats_by_model(start_ts, end_ts, None); + }; + let Some(key_filter) = TempKeyIdFilter::create(self, key_ids)? else { + return Ok(Vec::new()); + }; + self.query_request_token_stats_by_model(start_ts, end_ts, Some(&key_filter)) + } + + fn query_request_token_stats_by_model( + &self, + start_ts: Option, + end_ts: Option, + key_filter: Option<&TempKeyIdFilter<'_>>, ) -> Result> { let include_rollups = start_ts.is_none() && end_ts.is_none(); + let key_filter_clause = key_filter + .map(|filter| filter.exists_clause("s.key_id")) + .unwrap_or_default(); let sql = if include_rollups { format!( "WITH all_stats AS ( SELECT + key_id, model, input_tokens, cached_input_tokens, @@ -348,6 +448,7 @@ impl Storage { FROM request_token_stats UNION ALL SELECT + NULLIF(key_id, '') AS key_id, NULLIF(model, '') AS model, input_tokens, cached_input_tokens, @@ -358,14 +459,15 @@ impl Storage { FROM request_token_stat_rollups ) SELECT - COALESCE(NULLIF(TRIM(model), ''), 'unknown') AS normalized_model, - IFNULL(SUM(input_tokens), 0) AS input_tokens, - IFNULL(SUM(cached_input_tokens), 0) AS cached_input_tokens, - IFNULL(SUM(output_tokens), 0) AS output_tokens, - IFNULL(SUM(reasoning_output_tokens), 0) AS reasoning_output_tokens, + COALESCE(NULLIF(TRIM(s.model), ''), 'unknown') AS normalized_model, + IFNULL(SUM(s.input_tokens), 0) AS input_tokens, + IFNULL(SUM(s.cached_input_tokens), 0) AS cached_input_tokens, + IFNULL(SUM(s.output_tokens), 0) AS output_tokens, + IFNULL(SUM(s.reasoning_output_tokens), 0) AS reasoning_output_tokens, IFNULL(SUM({token_total}), 0) AS total_tokens, - IFNULL(SUM(estimated_cost_usd), 0.0) AS estimated_cost_usd - FROM all_stats + IFNULL(SUM(s.estimated_cost_usd), 0.0) AS estimated_cost_usd + FROM all_stats s + WHERE 1 = 1{key_filter_clause} GROUP BY normalized_model ORDER BY total_tokens DESC, normalized_model ASC", token_total = token_total_sql_expr(), @@ -373,16 +475,16 @@ impl Storage { } else { format!( "SELECT - COALESCE(NULLIF(TRIM(model), ''), 'unknown') AS normalized_model, - IFNULL(SUM(input_tokens), 0) AS input_tokens, - IFNULL(SUM(cached_input_tokens), 0) AS cached_input_tokens, - IFNULL(SUM(output_tokens), 0) AS output_tokens, - IFNULL(SUM(reasoning_output_tokens), 0) AS reasoning_output_tokens, + COALESCE(NULLIF(TRIM(s.model), ''), 'unknown') AS normalized_model, + IFNULL(SUM(s.input_tokens), 0) AS input_tokens, + IFNULL(SUM(s.cached_input_tokens), 0) AS cached_input_tokens, + IFNULL(SUM(s.output_tokens), 0) AS output_tokens, + IFNULL(SUM(s.reasoning_output_tokens), 0) AS reasoning_output_tokens, IFNULL(SUM({token_total}), 0) AS total_tokens, - IFNULL(SUM(estimated_cost_usd), 0.0) AS estimated_cost_usd - FROM request_token_stats - WHERE (?1 IS NULL OR created_at >= ?1) - AND (?2 IS NULL OR created_at < ?2) + IFNULL(SUM(s.estimated_cost_usd), 0.0) AS estimated_cost_usd + FROM request_token_stats s + WHERE (?1 IS NULL OR s.created_at >= ?1) + AND (?2 IS NULL OR s.created_at < ?2){key_filter_clause} GROUP BY normalized_model ORDER BY total_tokens DESC, normalized_model ASC", token_total = token_total_sql_expr(), @@ -392,19 +494,15 @@ impl Storage { let mut rows = if include_rollups { stmt.query([])? } else { - stmt.query((start_ts, end_ts))? + let params = [ + start_ts.map(Value::Integer).unwrap_or(Value::Null), + end_ts.map(Value::Integer).unwrap_or(Value::Null), + ]; + stmt.query(params_from_iter(params.iter()))? }; let mut items = Vec::new(); while let Some(row) = rows.next()? { - items.push(TokenUsageSummary { - model: row.get(0)?, - input_tokens: row.get::<_, i64>(1)?.max(0), - cached_input_tokens: row.get::<_, i64>(2)?.max(0), - output_tokens: row.get::<_, i64>(3)?.max(0), - reasoning_output_tokens: row.get::<_, i64>(4)?.max(0), - total_tokens: row.get::<_, i64>(5)?.max(0), - estimated_cost_usd: row.get::<_, f64>(6)?.max(0.0), - }); + items.push(map_token_usage_summary(row)?); } Ok(items) } @@ -413,8 +511,48 @@ impl Storage { &self, start_ts: Option, end_ts: Option, + ) -> Result> { + self.summarize_request_token_stats_by_key_and_model_filtered(start_ts, end_ts, None) + } + + pub fn summarize_request_token_stats_by_key_and_model_for_keys( + &self, + start_ts: Option, + end_ts: Option, + key_ids: &[String], + ) -> Result> { + self.summarize_request_token_stats_by_key_and_model_filtered( + start_ts, + end_ts, + Some(key_ids), + ) + } + + fn summarize_request_token_stats_by_key_and_model_filtered( + &self, + start_ts: Option, + end_ts: Option, + key_ids: Option<&[String]>, + ) -> Result> { + let Some(key_ids) = key_ids else { + return self.query_request_token_stats_by_key_and_model(start_ts, end_ts, None); + }; + let Some(key_filter) = TempKeyIdFilter::create(self, key_ids)? else { + return Ok(Vec::new()); + }; + self.query_request_token_stats_by_key_and_model(start_ts, end_ts, Some(&key_filter)) + } + + fn query_request_token_stats_by_key_and_model( + &self, + start_ts: Option, + end_ts: Option, + key_filter: Option<&TempKeyIdFilter<'_>>, ) -> Result> { let include_rollups = start_ts.is_none() && end_ts.is_none(); + let key_filter_clause = key_filter + .map(|filter| filter.exists_clause("s.key_id")) + .unwrap_or_default(); let sql = if include_rollups { format!( "WITH all_stats AS ( @@ -441,37 +579,37 @@ impl Storage { FROM request_token_stat_rollups ) SELECT - key_id, - COALESCE(NULLIF(TRIM(model), ''), 'unknown') AS normalized_model, - IFNULL(SUM(input_tokens), 0) AS input_tokens, - IFNULL(SUM(cached_input_tokens), 0) AS cached_input_tokens, - IFNULL(SUM(output_tokens), 0) AS output_tokens, - IFNULL(SUM(reasoning_output_tokens), 0) AS reasoning_output_tokens, + s.key_id, + COALESCE(NULLIF(TRIM(s.model), ''), 'unknown') AS normalized_model, + IFNULL(SUM(s.input_tokens), 0) AS input_tokens, + IFNULL(SUM(s.cached_input_tokens), 0) AS cached_input_tokens, + IFNULL(SUM(s.output_tokens), 0) AS output_tokens, + IFNULL(SUM(s.reasoning_output_tokens), 0) AS reasoning_output_tokens, IFNULL(SUM({token_total}), 0) AS total_tokens, - IFNULL(SUM(estimated_cost_usd), 0.0) AS estimated_cost_usd - FROM all_stats - WHERE key_id IS NOT NULL AND TRIM(key_id) <> '' - GROUP BY key_id, normalized_model - ORDER BY total_tokens DESC, key_id ASC, normalized_model ASC", + IFNULL(SUM(s.estimated_cost_usd), 0.0) AS estimated_cost_usd + FROM all_stats s + WHERE s.key_id IS NOT NULL AND TRIM(s.key_id) <> ''{key_filter_clause} + GROUP BY s.key_id, normalized_model + ORDER BY total_tokens DESC, s.key_id ASC, normalized_model ASC", token_total = token_total_sql_expr(), ) } else { format!( "SELECT - key_id, - COALESCE(NULLIF(TRIM(model), ''), 'unknown') AS normalized_model, - IFNULL(SUM(input_tokens), 0) AS input_tokens, - IFNULL(SUM(cached_input_tokens), 0) AS cached_input_tokens, - IFNULL(SUM(output_tokens), 0) AS output_tokens, - IFNULL(SUM(reasoning_output_tokens), 0) AS reasoning_output_tokens, + s.key_id, + COALESCE(NULLIF(TRIM(s.model), ''), 'unknown') AS normalized_model, + IFNULL(SUM(s.input_tokens), 0) AS input_tokens, + IFNULL(SUM(s.cached_input_tokens), 0) AS cached_input_tokens, + IFNULL(SUM(s.output_tokens), 0) AS output_tokens, + IFNULL(SUM(s.reasoning_output_tokens), 0) AS reasoning_output_tokens, IFNULL(SUM({token_total}), 0) AS total_tokens, - IFNULL(SUM(estimated_cost_usd), 0.0) AS estimated_cost_usd - FROM request_token_stats - WHERE key_id IS NOT NULL AND TRIM(key_id) <> '' - AND (?1 IS NULL OR created_at >= ?1) - AND (?2 IS NULL OR created_at < ?2) - GROUP BY key_id, normalized_model - ORDER BY total_tokens DESC, key_id ASC, normalized_model ASC", + IFNULL(SUM(s.estimated_cost_usd), 0.0) AS estimated_cost_usd + FROM request_token_stats s + WHERE s.key_id IS NOT NULL AND TRIM(s.key_id) <> '' + AND (?1 IS NULL OR s.created_at >= ?1) + AND (?2 IS NULL OR s.created_at < ?2){key_filter_clause} + GROUP BY s.key_id, normalized_model + ORDER BY total_tokens DESC, s.key_id ASC, normalized_model ASC", token_total = token_total_sql_expr(), ) }; @@ -479,20 +617,15 @@ impl Storage { let mut rows = if include_rollups { stmt.query([])? } else { - stmt.query((start_ts, end_ts))? + let params = [ + start_ts.map(Value::Integer).unwrap_or(Value::Null), + end_ts.map(Value::Integer).unwrap_or(Value::Null), + ]; + stmt.query(params_from_iter(params.iter()))? }; let mut items = Vec::new(); while let Some(row) = rows.next()? { - items.push(ApiKeyModelTokenUsageSummary { - key_id: row.get(0)?, - model: row.get(1)?, - input_tokens: row.get::<_, i64>(2)?.max(0), - cached_input_tokens: row.get::<_, i64>(3)?.max(0), - output_tokens: row.get::<_, i64>(4)?.max(0), - reasoning_output_tokens: row.get::<_, i64>(5)?.max(0), - total_tokens: row.get::<_, i64>(6)?.max(0), - estimated_cost_usd: row.get::<_, f64>(7)?.max(0.0), - }); + items.push(map_api_key_model_token_usage_summary(row)?); } Ok(items) } @@ -751,3 +884,7 @@ impl Storage { Ok(()) } } + +#[cfg(test)] +#[path = "tests/request_token_stats_tests.rs"] +mod tests; diff --git a/crates/core/src/storage/tests/api_keys_tests.rs b/crates/core/src/storage/tests/api_keys_tests.rs new file mode 100644 index 000000000..0a12df417 --- /dev/null +++ b/crates/core/src/storage/tests/api_keys_tests.rs @@ -0,0 +1,77 @@ +use super::{ApiKey, Storage}; + +/// 函数 `make_test_api_key` +/// +/// 作者: gaohongshun +/// +/// 时间: 2026-05-28 +/// +/// # 参数 +/// - index: 参数 index +/// +/// # 返回 +/// 返回函数执行结果 +fn make_test_api_key(index: usize) -> ApiKey { + ApiKey { + id: format!("key-{index:04}"), + name: Some(format!("Key {index}")), + model_slug: Some("gpt-5".to_string()), + reasoning_effort: Some("medium".to_string()), + service_tier: Some("priority".to_string()), + rotation_strategy: "account_rotation".to_string(), + aggregate_api_id: None, + account_plan_filter: None, + aggregate_api_url: None, + client_type: "codex".to_string(), + protocol_type: "openai_compat".to_string(), + auth_scheme: "authorization_bearer".to_string(), + upstream_base_url: None, + static_headers_json: None, + key_hash: format!("hash-{index:04}"), + status: "active".to_string(), + created_at: index as i64, + last_used_at: Some(index as i64), + } +} + +/// 函数 `large_key_sets_are_chunked_for_api_key_and_quota_queries` +/// +/// 作者: gaohongshun +/// +/// 时间: 2026-05-28 +/// +/// # 参数 +/// 无 +/// +/// # 返回 +/// 无 +#[test] +fn large_key_sets_are_chunked_for_api_key_and_quota_queries() { + let storage = Storage::open_in_memory().expect("open"); + storage.init().expect("init"); + + let mut selected = Vec::new(); + for index in 0..901 { + let key = make_test_api_key(index); + selected.push(key.id.clone()); + storage.insert_api_key(&key).expect("insert api key"); + storage + .upsert_api_key_quota_limit(&key.id, Some(1000 + index as i64)) + .expect("insert quota limit"); + } + + let requested = selected.iter().rev().cloned().collect::>(); + let keys = storage + .list_api_keys_for_ids(&requested) + .expect("list api keys"); + assert_eq!(keys.len(), selected.len()); + assert_eq!(keys.first().map(|item| item.id.as_str()), Some("key-0900")); + assert_eq!(keys.last().map(|item| item.id.as_str()), Some("key-0000")); + + let quota_limits = storage + .list_api_key_quota_limits_for_ids(&requested) + .expect("list quota limits"); + assert_eq!(quota_limits.len(), selected.len()); + assert_eq!(quota_limits.get("key-0000"), Some(&1000)); + assert_eq!(quota_limits.get("key-0900"), Some(&1900)); +} diff --git a/crates/core/src/storage/tests/request_logs_tests.rs b/crates/core/src/storage/tests/request_logs_tests.rs index a098fc5d8..5a8931221 100644 --- a/crates/core/src/storage/tests/request_logs_tests.rs +++ b/crates/core/src/storage/tests/request_logs_tests.rs @@ -480,3 +480,90 @@ fn request_logs_support_time_range_filters() { assert_eq!(summary.count, 2); assert_eq!(summary.total_tokens, 20); } + +#[test] +fn request_logs_for_empty_key_sets_return_empty_results() { + let storage = Storage::open_in_memory().expect("open"); + storage.init().expect("init"); + let empty_keys = vec![" ".to_string(), String::new()]; + + let logs = storage + .list_request_logs_paginated_for_keys(None, None, None, None, 0, 20, &empty_keys) + .expect("list logs for empty keys"); + assert!(logs.is_empty()); + + let total = storage + .count_request_logs_for_keys(None, None, None, None, &empty_keys) + .expect("count logs for empty keys"); + assert_eq!(total, 0); + + let filtered = storage + .summarize_request_logs_filtered_for_keys(None, None, None, None, &empty_keys) + .expect("summarize logs for empty keys"); + assert_eq!(filtered.count, 0); + assert_eq!(filtered.total_tokens, 0); + + let today = storage + .summarize_request_logs_between_for_keys(0, 10_000, &empty_keys) + .expect("summarize today for empty keys"); + assert_eq!(today.input_tokens, 0); + assert_eq!(today.estimated_cost_usd, 0.0); +} + +#[test] +fn request_logs_for_large_key_sets_use_temp_filter() { + let storage = Storage::open_in_memory().expect("open"); + storage.init().expect("init"); + + let request_log_id = storage + .insert_request_log(&RequestLog { + trace_id: Some("trc-large-key-filter".to_string()), + key_id: Some("key-0949".to_string()), + account_id: Some("acc-large-key-filter".to_string()), + request_path: "/v1/responses".to_string(), + method: "POST".to_string(), + status_code: Some(200), + created_at: 5_000, + ..Default::default() + }) + .expect("insert request log"); + storage + .insert_request_token_stat(&RequestTokenStat { + request_log_id, + key_id: Some("key-0949".to_string()), + account_id: Some("acc-large-key-filter".to_string()), + model: Some("gpt-5".to_string()), + input_tokens: Some(30), + cached_input_tokens: Some(5), + output_tokens: Some(10), + total_tokens: Some(40), + reasoning_output_tokens: Some(2), + estimated_cost_usd: Some(0.04), + created_at: 5_000, + }) + .expect("insert token stat"); + + // More than SQLITE_IN_CLAUSE_BATCH_SIZE keys forces the shared temp-table + // path, preventing SQLite host-parameter overflows on member dashboards. + let key_ids = (0..950) + .map(|index| format!("key-{index:04}")) + .collect::>(); + + let total = storage + .count_request_logs_for_keys(None, None, None, None, &key_ids) + .expect("count logs for large key set"); + assert_eq!(total, 1); + + let logs = storage + .list_request_logs_paginated_for_keys(None, None, None, None, 0, 20, &key_ids) + .expect("list logs for large key set"); + assert_eq!(logs.len(), 1); + assert_eq!(logs[0].trace_id.as_deref(), Some("trc-large-key-filter")); + + let summary = storage + .summarize_request_logs_between_for_keys(4_000, 6_000, &key_ids) + .expect("summarize today for large key set"); + assert_eq!(summary.input_tokens, 30); + assert_eq!(summary.output_tokens, 10); + assert_eq!(summary.estimated_cost_usd, 0.04); +} diff --git a/crates/core/src/storage/tests/request_token_stats_tests.rs b/crates/core/src/storage/tests/request_token_stats_tests.rs new file mode 100644 index 000000000..4a45c6627 --- /dev/null +++ b/crates/core/src/storage/tests/request_token_stats_tests.rs @@ -0,0 +1,273 @@ +use rusqlite::params; + +use super::{RequestTokenStat, Storage}; + +/// 函数 `insert_rollup_row` +/// +/// 作者: gaohongshun +/// +/// 时间: 2026-05-28 +/// +/// # 参数 +/// - storage: 参数 storage +/// - key_id: 参数 key_id +/// - account_id: 参数 account_id +/// - model: 参数 model +/// - total_tokens: 参数 total_tokens +/// - estimated_cost_usd: 参数 estimated_cost_usd +/// - updated_at: 参数 updated_at +/// +/// # 返回 +/// 无 +fn insert_rollup_row( + storage: &Storage, + key_id: &str, + account_id: &str, + model: &str, + input_tokens: i64, + cached_input_tokens: i64, + output_tokens: i64, + total_tokens: i64, + reasoning_output_tokens: i64, + estimated_cost_usd: f64, + source_rows: i64, + updated_at: i64, +) { + storage + .conn + .execute( + "INSERT INTO request_token_stat_rollups ( + key_id, + account_id, + model, + input_tokens, + cached_input_tokens, + output_tokens, + total_tokens, + reasoning_output_tokens, + estimated_cost_usd, + source_rows, + updated_at + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)", + params![ + key_id, + account_id, + model, + input_tokens, + cached_input_tokens, + output_tokens, + total_tokens, + reasoning_output_tokens, + estimated_cost_usd, + source_rows, + updated_at, + ], + ) + .expect("insert rollup row"); +} + +/// 函数 `assert_float_close` +/// +/// 作者: gaohongshun +/// +/// 时间: 2026-05-28 +/// +/// # 参数 +/// - left: 参数 left +/// - right: 参数 right +/// +/// # 返回 +/// 无 +fn assert_float_close(left: f64, right: f64) { + assert!( + (left - right).abs() < 1e-9, + "expected {left} to be close to {right}" + ); +} + +/// 函数 `summaries_for_selected_keys_include_rollups_and_respect_time_ranges` +/// +/// 作者: gaohongshun +/// +/// 时间: 2026-05-28 +/// +/// # 参数 +/// 无 +/// +/// # 返回 +/// 无 +#[test] +fn summaries_for_selected_keys_include_rollups_and_respect_time_ranges() { + let storage = Storage::open_in_memory().expect("open"); + storage.init().expect("init"); + + // 明细行和 rollup 行分属不同 key,便于验证过滤是否真的落在数据库层。 + storage + .insert_request_token_stat(&RequestTokenStat { + request_log_id: 1, + key_id: Some("key-a".to_string()), + account_id: Some("acc-a".to_string()), + model: Some("gpt-5".to_string()), + input_tokens: Some(10), + cached_input_tokens: Some(1), + output_tokens: Some(2), + total_tokens: Some(12), + reasoning_output_tokens: Some(3), + estimated_cost_usd: Some(0.10), + created_at: 100, + }) + .expect("insert raw key a"); + storage + .insert_request_token_stat(&RequestTokenStat { + request_log_id: 2, + key_id: Some("key-b".to_string()), + account_id: Some("acc-b".to_string()), + model: Some("gpt-5-mini".to_string()), + input_tokens: Some(20), + cached_input_tokens: Some(0), + output_tokens: Some(0), + total_tokens: Some(20), + reasoning_output_tokens: Some(0), + estimated_cost_usd: Some(0.20), + created_at: 110, + }) + .expect("insert raw key b"); + + // Rollup 只写入 key-a,用来验证无时间范围时会把 rollup 一并纳入。 + insert_rollup_row( + &storage, "key-a", "acc-a", "gpt-5", 5, 0, 0, 5, 0, 0.05, 1, 999, + ); + + let selected = vec!["key-a".to_string()]; + let by_key = storage + .summarize_request_token_stats_by_key_for_keys(&selected) + .expect("summarize by key"); + assert_eq!(by_key.len(), 1); + assert_eq!(by_key[0].key_id, "key-a"); + assert_eq!(by_key[0].total_tokens, 17); + assert_float_close(by_key[0].estimated_cost_usd, 0.15); + + let by_model = storage + .summarize_request_token_stats_by_model_for_keys(None, None, &selected) + .expect("summarize by model"); + assert_eq!(by_model.len(), 1); + assert_eq!(by_model[0].model, "gpt-5"); + assert_eq!(by_model[0].total_tokens, 17); + + let by_key_and_model = storage + .summarize_request_token_stats_by_key_and_model_for_keys(Some(90), Some(110), &selected) + .expect("summarize by key and model"); + assert_eq!(by_key_and_model.len(), 1); + assert_eq!(by_key_and_model[0].key_id, "key-a"); + assert_eq!(by_key_and_model[0].model, "gpt-5"); + assert_eq!(by_key_and_model[0].total_tokens, 12); + assert_float_close(by_key_and_model[0].estimated_cost_usd, 0.10); +} + +#[test] +fn summaries_for_empty_key_lists_return_empty_results() { + let storage = Storage::open_in_memory().expect("open"); + storage.init().expect("init"); + + storage + .insert_request_token_stat(&RequestTokenStat { + request_log_id: 1, + key_id: Some("key-a".to_string()), + account_id: Some("acc-a".to_string()), + model: Some("gpt-5".to_string()), + input_tokens: Some(10), + cached_input_tokens: Some(0), + output_tokens: Some(5), + total_tokens: Some(15), + reasoning_output_tokens: Some(0), + estimated_cost_usd: Some(0.10), + created_at: 100, + }) + .expect("insert raw key a"); + + let empty = Vec::::new(); + assert!(storage + .summarize_request_token_stats_by_key_for_keys(&empty) + .expect("summarize by key") + .is_empty()); + assert!(storage + .summarize_request_token_stats_by_model_for_keys(None, None, &empty) + .expect("summarize by model") + .is_empty()); + assert!(storage + .summarize_request_token_stats_by_key_and_model_for_keys(None, None, &empty) + .expect("summarize by key and model") + .is_empty()); +} + +/// 函数 `summaries_for_large_key_lists_use_temp_filter` +/// +/// 作者: gaohongshun +/// +/// 时间: 2026-05-28 +/// +/// # 参数 +/// 无 +/// +/// # 返回 +/// 无 +#[test] +fn summaries_for_large_key_lists_use_temp_filter() { + let storage = Storage::open_in_memory().expect("open"); + storage.init().expect("init"); + + let mut selected = Vec::new(); + for index in 0..901 { + let key_id = format!("key-{index:04}"); + selected.push(key_id.clone()); + storage + .insert_request_token_stat(&RequestTokenStat { + request_log_id: index as i64 + 1, + key_id: Some(key_id), + account_id: Some(format!("acc-{index:04}")), + model: Some("gpt-5".to_string()), + input_tokens: Some(1), + cached_input_tokens: Some(0), + output_tokens: Some(0), + total_tokens: Some(1), + reasoning_output_tokens: Some(0), + estimated_cost_usd: Some(0.01), + created_at: 1_000 + index as i64, + }) + .expect("insert request token stat"); + } + + let by_key = storage + .summarize_request_token_stats_by_key_for_keys(&selected) + .expect("summarize by key"); + assert_eq!(by_key.len(), selected.len()); + assert_eq!( + by_key.first().map(|item| item.key_id.as_str()), + Some("key-0000") + ); + assert_eq!( + by_key.last().map(|item| item.key_id.as_str()), + Some("key-0900") + ); + + let by_model = storage + .summarize_request_token_stats_by_model_for_keys(None, None, &selected) + .expect("summarize by model"); + assert_eq!(by_model.len(), 1); + assert_eq!(by_model[0].model, "gpt-5"); + assert_eq!(by_model[0].total_tokens, selected.len() as i64); + assert_float_close(by_model[0].estimated_cost_usd, 9.01); + + let by_key_and_model = storage + .summarize_request_token_stats_by_key_and_model_for_keys(None, None, &selected) + .expect("summarize by key and model"); + assert_eq!(by_key_and_model.len(), selected.len()); + assert_eq!( + by_key_and_model.first().map(|item| item.key_id.as_str()), + Some("key-0000") + ); + assert_eq!( + by_key_and_model.last().map(|item| item.key_id.as_str()), + Some("key-0900") + ); +} diff --git a/crates/service/src/account/account_list.rs b/crates/service/src/account/account_list.rs index 1821d01e7..d0e02f62a 100644 --- a/crates/service/src/account/account_list.rs +++ b/crates/service/src/account/account_list.rs @@ -20,6 +20,57 @@ enum AccountFilter { Low, } +#[derive(Debug)] +pub(crate) struct AccountSummaryContext { + pub items: Vec, + pub usage_snapshots: Vec, +} + +#[derive(Debug)] +struct AccountSummaryParts { + id: String, + label: String, + group_name: Option, + sort: i64, + status: String, +} + +impl From for AccountSummaryParts { + fn from(account: Account) -> Self { + Self { + id: account.id, + label: account.label, + group_name: account.group_name, + sort: account.sort, + status: account.status, + } + } +} + +#[derive(Debug)] +struct AccountSummarySetup { + preferred_account_id: Option, + status_reasons: HashMap, + tokens: HashMap, + usage_snapshots: Vec, + metadata: HashMap, + subscriptions: HashMap, + model_slugs_by_account: HashMap>, + quota_overrides: HashMap, +} + +impl From<&Account> for AccountSummaryParts { + fn from(account: &Account) -> Self { + Self { + id: account.id.clone(), + label: account.label.clone(), + group_name: account.group_name.clone(), + sort: account.sort, + status: account.status.clone(), + } + } +} + /// 函数 `read_accounts` /// /// 作者: gaohongshun @@ -300,7 +351,7 @@ fn filtered_accounts( /// # 返回 /// 返回函数执行结果 fn to_account_summary_with_reason( - acc: Account, + parts: AccountSummaryParts, preferred: bool, status_reason: Option, has_token: bool, @@ -317,12 +368,12 @@ fn to_account_summary_with_reason( quota_capacity_secondary_window_tokens: Option, ) -> AccountSummary { AccountSummary { - id: acc.id, - label: acc.label, - group_name: acc.group_name, + id: parts.id, + label: parts.label, + group_name: parts.group_name, preferred, - sort: acc.sort, - status: acc.status, + sort: parts.sort, + status: parts.status, status_reason, has_token, plan_type, @@ -351,6 +402,22 @@ fn to_account_summary_with_reason( /// /// # 返回 /// 返回函数执行结果 +pub(crate) fn build_account_summary_context( + storage: &codexmanager_core::storage::Storage, + accounts: &[Account], +) -> Result { + let account_ids = accounts + .iter() + .map(|account| account.id.clone()) + .collect::>(); + let setup = load_account_summary_setup(storage, &account_ids)?; + let items = build_account_summary_items(accounts.iter(), &setup); + Ok(AccountSummaryContext { + items, + usage_snapshots: setup.usage_snapshots, + }) +} + fn to_account_summaries( storage: &codexmanager_core::storage::Storage, accounts: Vec, @@ -359,11 +426,19 @@ fn to_account_summaries( .iter() .map(|account| account.id.clone()) .collect::>(); + let setup = load_account_summary_setup(storage, &account_ids)?; + Ok(build_account_summary_items(accounts, &setup)) +} + +fn load_account_summary_setup( + storage: &codexmanager_core::storage::Storage, + account_ids: &[String], +) -> Result { let preferred_account_id = storage .preferred_account_id() .map_err(|err| format!("load preferred account failed: {err}"))?; let status_reasons = storage - .latest_account_status_reasons(&account_ids) + .latest_account_status_reasons(account_ids) .map_err(|err| format!("load account status reasons failed: {err}"))?; let tokens = storage .list_tokens() @@ -371,12 +446,9 @@ fn to_account_summaries( .into_iter() .map(|token| (token.account_id.clone(), token)) .collect::>(); - let usages = storage + let usage_snapshots = storage .latest_usage_snapshots_by_account() - .map_err(|err| format!("load account usage snapshots failed: {err}"))? - .into_iter() - .map(|snapshot| (snapshot.account_id.clone(), snapshot)) - .collect::>(); + .map_err(|err| format!("load account usage snapshots failed: {err}"))?; let metadata = storage .list_account_metadata() .map_err(|err| format!("load account metadata failed: {err}"))? @@ -407,22 +479,47 @@ fn to_account_summaries( .into_iter() .map(|item| (item.account_id.clone(), item)) .collect::>(); - Ok(accounts + Ok(AccountSummarySetup { + preferred_account_id, + status_reasons, + tokens, + usage_snapshots, + metadata, + subscriptions, + model_slugs_by_account, + quota_overrides, + }) +} + +fn build_account_summary_items( + accounts: I, + setup: &AccountSummarySetup, +) -> Vec +where + I: IntoIterator, + A: Into, +{ + let usages = setup + .usage_snapshots + .iter() + .map(|snapshot| (snapshot.account_id.clone(), snapshot)) + .collect::>(); + accounts .into_iter() .map(|account| { map_account_summary( account, - preferred_account_id.as_deref(), - &status_reasons, - &tokens, + setup.preferred_account_id.as_deref(), + &setup.status_reasons, + &setup.tokens, &usages, - &metadata, - &subscriptions, - &model_slugs_by_account, - "a_overrides, + &setup.metadata, + &setup.subscriptions, + &setup.model_slugs_by_account, + &setup.quota_overrides, ) }) - .collect()) + .collect() } /// 函数 `map_account_summary` @@ -440,24 +537,34 @@ fn to_account_summaries( /// /// # 返回 /// 返回函数执行结果 -fn map_account_summary( - account: Account, +fn map_account_summary( + account: A, preferred_account_id: Option<&str>, status_reasons: &HashMap, tokens: &HashMap, - usages: &HashMap, + usages: &HashMap, metadata: &HashMap, subscriptions: &HashMap, model_slugs_by_account: &HashMap>, quota_overrides: &HashMap, -) -> AccountSummary { - let account_id = account.id.clone(); +) -> AccountSummary +where + A: Into, +{ + let account = account.into(); + let AccountSummaryParts { + id: account_id, + label, + group_name, + sort, + status, + } = account; let status_reason = status_reasons.get(&account_id).cloned(); let preferred = preferred_account_id.is_some_and(|id| id == account_id); let subscription = subscriptions.get(&account_id); let plan = resolve_effective_account_plan( tokens.get(&account_id), - usages.get(&account_id), + usages.get(&account_id).copied(), subscription, ); let has_token = tokens.contains_key(&account_id); @@ -474,7 +581,13 @@ fn map_account_summary( let subscription_plan = subscription.and_then(|value| value.plan_type.clone()); let plan_type = fallback_plan_type; to_account_summary_with_reason( - account, + AccountSummaryParts { + id: account_id, + label, + group_name, + sort, + status, + }, preferred, status_reason, has_token, diff --git a/crates/service/src/apikey/apikey_list.rs b/crates/service/src/apikey/apikey_list.rs index 68b6d18bc..f4172b0a1 100644 --- a/crates/service/src/apikey/apikey_list.rs +++ b/crates/service/src/apikey/apikey_list.rs @@ -1,5 +1,5 @@ use codexmanager_core::rpc::types::ApiKeySummary; -use std::collections::HashSet; +use codexmanager_core::storage::Storage; use crate::storage_helpers::open_storage; use crate::RpcActor; @@ -18,6 +18,27 @@ use crate::RpcActor; pub(crate) fn read_api_keys() -> Result, String> { // 读取平台 Key 列表 let storage = open_storage().ok_or_else(|| "open storage failed".to_string())?; + read_api_keys_with_storage(&storage) +} + +pub(crate) fn read_api_keys_for_ids(key_ids: &[String]) -> Result, String> { + let storage = open_storage().ok_or_else(|| "open storage failed".to_string())?; + read_api_keys_for_ids_with_storage(&storage, key_ids) +} + +pub(crate) fn read_api_keys_for_actor(actor: &RpcActor) -> Result, String> { + if actor.is_admin() { + return read_api_keys(); + } + let user_id = actor + .user_id + .as_deref() + .ok_or_else(|| "permission_denied: apikey requires user session".to_string())?; + let key_ids = crate::list_api_key_ids_for_user(user_id)?; + read_api_keys_for_ids(&key_ids) +} + +fn read_api_keys_with_storage(storage: &Storage) -> Result, String> { let keys = storage .list_api_keys() .map_err(|err| format!("list api keys failed: {err}"))?; @@ -26,43 +47,54 @@ pub(crate) fn read_api_keys() -> Result, String> { .map_err(|err| format!("list api key quota limits failed: {err}"))?; Ok(keys .into_iter() - .map(|key| ApiKeySummary { - quota_limit_tokens: quota_limits.get(&key.id).copied(), - id: key.id, - name: key.name, - model_slug: key.model_slug, - reasoning_effort: key.reasoning_effort, - service_tier: key.service_tier, - rotation_strategy: key.rotation_strategy, - aggregate_api_id: key.aggregate_api_id, - account_plan_filter: key.account_plan_filter, - aggregate_api_url: key.aggregate_api_url, - client_type: key.client_type, - protocol_type: key.protocol_type, - auth_scheme: key.auth_scheme, - upstream_base_url: key.upstream_base_url, - static_headers_json: key.static_headers_json, - status: key.status, - created_at: key.created_at, - last_used_at: key.last_used_at, + .map(|key| { + let quota_limit_tokens = quota_limits.get(&key.id).copied(); + map_api_key_summary(key, quota_limit_tokens) }) .collect()) } -pub(crate) fn read_api_keys_for_actor(actor: &RpcActor) -> Result, String> { - let items = read_api_keys()?; - if actor.is_admin() { - return Ok(items); - } - let user_id = actor - .user_id - .as_deref() - .ok_or_else(|| "permission_denied: apikey requires user session".to_string())?; - let owned_key_ids = crate::list_api_key_ids_for_user(user_id)? - .into_iter() - .collect::>(); - Ok(items +fn read_api_keys_for_ids_with_storage( + storage: &Storage, + key_ids: &[String], +) -> Result, String> { + let keys = storage + .list_api_keys_for_ids(key_ids) + .map_err(|err| format!("list api keys failed: {err}"))?; + let quota_limits = storage + .list_api_key_quota_limits_for_ids(key_ids) + .map_err(|err| format!("list api key quota limits failed: {err}"))?; + Ok(keys .into_iter() - .filter(|item| owned_key_ids.contains(&item.id)) + .map(|key| { + let quota_limit_tokens = quota_limits.get(&key.id).copied(); + map_api_key_summary(key, quota_limit_tokens) + }) .collect()) } + +fn map_api_key_summary( + key: codexmanager_core::storage::ApiKey, + quota_limit_tokens: Option, +) -> ApiKeySummary { + ApiKeySummary { + quota_limit_tokens, + id: key.id, + name: key.name, + model_slug: key.model_slug, + reasoning_effort: key.reasoning_effort, + service_tier: key.service_tier, + rotation_strategy: key.rotation_strategy, + aggregate_api_id: key.aggregate_api_id, + account_plan_filter: key.account_plan_filter, + aggregate_api_url: key.aggregate_api_url, + client_type: key.client_type, + protocol_type: key.protocol_type, + auth_scheme: key.auth_scheme, + upstream_base_url: key.upstream_base_url, + static_headers_json: key.static_headers_json, + status: key.status, + created_at: key.created_at, + last_used_at: key.last_used_at, + } +} diff --git a/crates/service/src/apikey/apikey_usage_stats.rs b/crates/service/src/apikey/apikey_usage_stats.rs index 53850bb1a..c2696f4ed 100644 --- a/crates/service/src/apikey/apikey_usage_stats.rs +++ b/crates/service/src/apikey/apikey_usage_stats.rs @@ -1,5 +1,4 @@ -use codexmanager_core::rpc::types::ApiKeyUsageStatSummary; -use std::collections::HashSet; +use codexmanager_core::{rpc::types::ApiKeyUsageStatSummary, storage::ApiKeyTokenUsageSummary}; use crate::storage_helpers::open_storage; use crate::RpcActor; @@ -21,32 +20,35 @@ pub(crate) fn read_api_key_usage_stats() -> Result, .summarize_request_token_stats_by_key() .map_err(|err| format!("summarize api key token stats failed: {err}"))?; - Ok(items - .into_iter() - .map(|item| ApiKeyUsageStatSummary { - key_id: item.key_id, - total_tokens: item.total_tokens.max(0), - estimated_cost_usd: item.estimated_cost_usd.max(0.0), - }) - .collect()) + Ok(map_api_key_usage_stats(items)) } pub(crate) fn read_api_key_usage_stats_for_actor( actor: &RpcActor, ) -> Result, String> { - let items = read_api_key_usage_stats()?; if actor.is_admin() { - return Ok(items); + return read_api_key_usage_stats(); } let user_id = actor .user_id .as_deref() .ok_or_else(|| "permission_denied: apikey usage requires user session".to_string())?; - let owned_key_ids = crate::list_api_key_ids_for_user(user_id)? - .into_iter() - .collect::>(); - Ok(items + let owned_key_ids = crate::list_api_key_ids_for_user(user_id)?; + let storage = open_storage().ok_or_else(|| "open storage failed".to_string())?; + let items = storage + .summarize_request_token_stats_by_key_for_keys(&owned_key_ids) + .map_err(|err| format!("summarize api key token stats failed: {err}"))?; + + Ok(map_api_key_usage_stats(items)) +} + +fn map_api_key_usage_stats(items: Vec) -> Vec { + items .into_iter() - .filter(|item| owned_key_ids.contains(&item.key_id)) - .collect()) + .map(|item| ApiKeyUsageStatSummary { + key_id: item.key_id, + total_tokens: item.total_tokens.max(0), + estimated_cost_usd: item.estimated_cost_usd.max(0.0), + }) + .collect() } diff --git a/crates/service/src/auth/app_manager.rs b/crates/service/src/auth/app_manager.rs index 44a94ec6c..4d36e4ec5 100644 --- a/crates/service/src/auth/app_manager.rs +++ b/crates/service/src/auth/app_manager.rs @@ -551,18 +551,9 @@ pub fn list_api_key_ids_for_user(user_id: &str) -> Result, String> { } crate::initialize_storage_if_needed()?; let storage = open_storage_or_error()?; - let mut key_ids = storage - .list_api_key_owners() - .map_err(|err| format!("list api key owners failed: {err}"))? - .into_values() - .filter(|owner| { - owner.owner_kind == "user" - && owner.owner_user_id.as_deref().map(str::trim) == Some(user_id) - }) - .map(|owner| owner.key_id) - .collect::>(); - key_ids.sort(); - Ok(key_ids) + storage + .list_api_key_ids_for_user(user_id) + .map_err(|err| format!("list api key ids for user failed: {err}")) } pub fn api_key_belongs_to_user(key_id: &str, user_id: &str) -> Result { diff --git a/crates/service/src/dashboard.rs b/crates/service/src/dashboard.rs index a8c1c3cbd..619c07aa4 100644 --- a/crates/service/src/dashboard.rs +++ b/crates/service/src/dashboard.rs @@ -392,11 +392,7 @@ pub(crate) fn read_member_dashboard_summary( }; let key_ids = crate::list_api_key_ids_for_user(&user_id)?; - let key_id_set = key_ids.iter().cloned().collect::>(); - let api_keys = apikey_list::read_api_keys()? - .into_iter() - .filter(|key| key_id_set.contains(&key.id)) - .collect::>(); + let api_keys = apikey_list::read_api_keys_for_ids(&key_ids)?; let api_key_summary = build_api_key_summary(&api_keys); let wallet = read_member_wallet(&user_id)?; @@ -422,7 +418,7 @@ pub(crate) fn read_member_dashboard_summary( let usage_trend_7d = read_usage_trend_7d(&user_id, day_start, day_end)?; let (top_keys, top_models) = - read_member_usage_breakdown(&api_keys, &key_id_set, day_start, day_end)?; + read_member_usage_breakdown(&api_keys, &key_ids, day_start, day_end)?; let available_models = read_available_models_with_price_summary()?; let recent_logs = requestlog_list::read_request_log_page_for_key_ids( RequestLogListParams { @@ -598,30 +594,32 @@ fn read_usage_trend_7d( fn read_member_usage_breakdown( api_keys: &[ApiKeySummary], - key_id_set: &HashSet, + key_ids: &[String], day_start: i64, day_end: i64, ) -> Result<(Vec, Vec), String> { let storage = storage_helpers::open_storage().ok_or_else(|| "open storage failed".to_string())?; let today_usage = storage - .summarize_request_token_stats_by_key_and_model(Some(day_start), Some(day_end)) + .summarize_request_token_stats_by_key_and_model_for_keys( + Some(day_start), + Some(day_end), + key_ids, + ) .map_err(|err| format!("summarize today key usage failed: {err}"))?; let total_usage = storage - .summarize_request_token_stats_by_key() + .summarize_request_token_stats_by_key_for_keys(key_ids) .map_err(|err| format!("summarize key usage failed: {err}"))?; let seven_day_usage = storage - .summarize_request_token_stats_by_key_and_model( + .summarize_request_token_stats_by_key_and_model_for_keys( Some(day_start.saturating_sub((TREND_DAYS - 1) * (day_end - day_start).max(1))), Some(day_end), + key_ids, ) .map_err(|err| format!("summarize model usage failed: {err}"))?; let mut today_by_key: HashMap = HashMap::new(); - for item in today_usage - .into_iter() - .filter(|item| key_id_set.contains(&item.key_id)) - { + for item in today_usage.into_iter() { let entry = today_by_key.entry(item.key_id).or_insert((0, 0.0)); entry.0 = entry.0.saturating_add(item.total_tokens.max(0)); entry.1 += item.estimated_cost_usd.max(0.0); @@ -629,7 +627,6 @@ fn read_member_usage_breakdown( let total_by_key = total_usage .into_iter() - .filter(|item| key_id_set.contains(&item.key_id)) .map(|item| { ( item.key_id, @@ -667,10 +664,7 @@ fn read_member_usage_breakdown( top_keys.truncate(MEMBER_TOP_KEY_LIMIT); let mut model_usage = BTreeMap::::new(); - for item in seven_day_usage - .into_iter() - .filter(|item| key_id_set.contains(&item.key_id)) - { + for item in seven_day_usage.into_iter() { let entry = model_usage.entry(item.model).or_insert((0, 0.0)); entry.0 = entry.0.saturating_add(item.total_tokens.max(0)); entry.1 += item.estimated_cost_usd.max(0.0); diff --git a/crates/service/src/startup_snapshot.rs b/crates/service/src/startup_snapshot.rs index 2ea1d4b62..29b1af423 100644 --- a/crates/service/src/startup_snapshot.rs +++ b/crates/service/src/startup_snapshot.rs @@ -1,10 +1,8 @@ -use codexmanager_core::rpc::types::{ - AccountListParams, StartupSnapshotResult, UsageAggregateSummaryResult, -}; +use codexmanager_core::rpc::types::{StartupSnapshotResult, UsageAggregateSummaryResult}; use crate::{ account_list, apikey_list, apikey_models, gateway, requestlog_list, requestlog_today_summary, - usage_aggregate, usage_list, RpcActor, + storage_helpers, usage_aggregate, RpcActor, }; /// 函数 `read_startup_snapshot` @@ -23,9 +21,21 @@ pub(crate) fn read_startup_snapshot( day_start_ts: Option, day_end_ts: Option, ) -> Result { - let accounts = account_list::read_accounts(AccountListParams::default(), false)?.items; - let usage_snapshots = usage_list::read_usage_snapshots()?; - let usage_aggregate_summary = usage_aggregate::read_usage_aggregate_summary()?; + let storage = + storage_helpers::open_storage().ok_or_else(|| "open storage failed".to_string())?; + let accounts = storage + .list_accounts() + .map_err(|err| format!("list accounts failed: {err}"))?; + let account_context = account_list::build_account_summary_context(&storage, &accounts)?; + let usage_aggregate_summary = usage_aggregate::compute_usage_aggregate_summary( + &accounts, + &account_context.usage_snapshots, + ); + let usage_snapshots = account_context + .usage_snapshots + .into_iter() + .map(crate::usage_read::usage_snapshot_result_from_record) + .collect(); let api_keys = apikey_list::read_api_keys()?; let api_models = apikey_models::read_model_options(false)?; let manual_preferred_account_id = gateway::manual_preferred_account(); @@ -34,7 +44,7 @@ pub(crate) fn read_startup_snapshot( let request_logs = requestlog_list::read_request_logs(None, request_log_limit)?; Ok(StartupSnapshotResult { - accounts, + accounts: account_context.items, usage_snapshots, usage_aggregate_summary, api_keys, @@ -59,7 +69,7 @@ pub(crate) fn read_startup_snapshot_for_actor( .as_deref() .ok_or_else(|| "permission_denied: startup requires user session".to_string())?; let key_ids = crate::list_api_key_ids_for_user(user_id)?; - let api_keys = apikey_list::read_api_keys_for_actor(actor)?; + let api_keys = apikey_list::read_api_keys_for_ids(&key_ids)?; let api_models = apikey_models::read_model_options(false)?; let request_log_today_summary = requestlog_today_summary::read_requestlog_today_summary_for_key_ids( diff --git a/crates/service/src/tests/lib_tests.rs b/crates/service/src/tests/lib_tests.rs index 3bb5c1f8f..d063ca5d2 100644 --- a/crates/service/src/tests/lib_tests.rs +++ b/crates/service/src/tests/lib_tests.rs @@ -947,6 +947,62 @@ fn member_cannot_read_or_mutate_other_user_api_key() { let _ = std::fs::remove_file(db_path); } +#[test] +fn member_api_key_usage_stats_filter_to_owned_keys() { + let _guard = test_env_guard(); + let db_path = setup_dashboard_test_db("codexmanager-member-apikey-usage-filter"); + let day_start = 1_700_000_000; + let user_one = create_test_member("apikey-usage-one", Some(2_000_000)); + let user_two = create_test_member("apikey-usage-two", Some(2_000_000)); + let key_one = create_owned_test_api_key(&user_one.id, "usage one key", "gpt-5-mini"); + let key_two = create_owned_test_api_key(&user_two.id, "usage two key", "gpt-5"); + + insert_test_request_log( + &key_one, + "trace-usage-one", + "gpt-5-mini", + 200, + 80, + 10, + 40, + 0.08, + day_start + 10, + ); + insert_test_request_log( + &key_two, + "trace-usage-two", + "gpt-5", + 200, + 800, + 0, + 500, + 0.8, + day_start + 20, + ); + + let member_stats = response_result(handle_request_with_actor( + rpc_request("apikey/usageStats", serde_json::json!({})), + RpcActor::from_parts(Some(ROLE_MEMBER), Some(&user_one.id)), + )); + assert!( + member_stats.result.get("error").is_none(), + "{:?}", + member_stats.result + ); + let member_items = member_stats.result["items"].as_array().unwrap(); + assert_eq!(member_items.len(), 1); + assert_eq!(member_items[0]["keyId"], key_one); + assert_eq!(member_items[0]["totalTokens"], 120); + + let admin_stats = response_result(handle_request_with_actor( + rpc_request("apikey/usageStats", serde_json::json!({})), + RpcActor::system_admin(), + )); + assert_eq!(admin_stats.result["items"].as_array().unwrap().len(), 2); + + let _ = std::fs::remove_file(db_path); +} + #[test] fn member_created_api_key_ignores_admin_only_routing_fields() { let _guard = test_env_guard();