Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 90 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@
//! | GET | /audit/export | Tenant-scoped audit log export |

use std::env;
use std::time::{SystemTime, UNIX_EPOCH};
use std::sync::{Arc, Mutex};
use std::time::{Instant, SystemTime, UNIX_EPOCH};

use axum::body::Bytes;
use axum::extract::{Query, State};
use axum::extract::{Query, Request, State};
use axum::http::{HeaderMap, StatusCode};
use axum::response::Json;
use axum::middleware::{self, Next};
use axum::response::{IntoResponse, Json, Response};
use axum::routing::{get, post};
use axum::Router;
use serde::{Deserialize, Serialize};
Expand All @@ -60,6 +62,13 @@ mod nonce;
/// timestamps caused by client clock skew or deliberate manipulation.
const REPLAY_WINDOW_SECONDS: i64 = 30;

/// Maximum requests served per fixed one-second window across the whole
/// proxy. The window is fixed, not rolling: the counter resets once a full
/// second has passed since the window opened. Generous enough not to
/// interfere with a real incident burst, low enough to shed an
/// unauthenticated flood before it reaches HMAC verification (a CPU-DoS
/// vector even though forgery is infeasible). Global rather than per-IP so it
/// needs no client-IP plumbing through the two serve paths.
const RATE_LIMIT_PER_SECOND: u32 = 100;

/// Application state. Cloned into every request handler, so anything
/// inside must be cheap to clone (Arc-wrapped data, primitives, etc.).
#[derive(Clone)]
Expand All @@ -86,6 +95,35 @@ struct AppState {
/// the last entry on disk so restarts do not break the chain.
/// See `audit::ChainState`.
audit_chain: audit::ChainState,

/// Fixed-window global rate-limit counter: (window start, count in
/// window). Shared across handlers to shed request floods. See
/// `rate_limit` / `rate_check`.
rate: Arc<Mutex<(Instant, u32)>>,
Comment on lines +99 to +102
}

/// Pure rate-limit decision over a fixed one-second window. Extracted from
/// the middleware so it is unit-testable without spinning up the server.
///
/// * `window` - `(window start, requests counted in that window)`; mutated
///   in place.
/// * `now` - the instant at which the request is being evaluated.
/// * `limit` - maximum number of requests allowed per window.
///
/// Returns `true` if the request is allowed, `false` once the count for the
/// current window exceeds `limit`. Never panics: an out-of-order `now` is
/// treated as zero elapsed time and the counter saturates at `u32::MAX`.
fn rate_check(window: &mut (Instant, u32), now: Instant, limit: u32) -> bool {
    // Saturating subtraction: if `now` is earlier than the window start the
    // elapsed time is zero, so the window is simply kept.
    if now.saturating_duration_since(window.0).as_secs() >= 1 {
        *window = (now, 0);
    }
    // Saturate rather than overflow: a wrapped counter would fall back under
    // `limit` and re-admit traffic (or panic in overflow-checked builds).
    window.1 = window.1.saturating_add(1);
    window.1 <= limit
}

/// Axum middleware applying the global fixed-window rate limit.
///
/// Counts the incoming request against the shared window in `state.rate`
/// and either forwards it to `next` or short-circuits with
/// 429 Too Many Requests once `RATE_LIMIT_PER_SECOND` is exceeded for the
/// current window.
async fn rate_limit(State(state): State<AppState>, req: Request, next: Next) -> Response {
    let allowed = {
        // The guarded value is a plain `(Instant, u32)` that cannot be left
        // half-updated, so a poisoned lock is safe to recover. Panicking here
        // instead would fail every subsequent request through this middleware.
        let mut window = state
            .rate
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        rate_check(&mut window, Instant::now(), RATE_LIMIT_PER_SECOND)
        // The guard is dropped at the end of this block, before the `.await`
        // below, so the std mutex is never held across a suspension point.
    };
    if !allowed {
        return (StatusCode::TOO_MANY_REQUESTS, "rate limit exceeded").into_response();
    }
    next.run(req).await
}

/// Request payload for `POST /execute`.
Expand Down Expand Up @@ -456,12 +494,14 @@ async fn main() {
nonces: nonce::NonceStore::new(),
edr,
audit_chain,
rate: Arc::new(Mutex::new((Instant::now(), 0))),
};

let app = Router::new()
.route("/health", get(health))
.route("/execute", post(execute))
.route("/audit/export", get(export_audit))
.layer(middleware::from_fn_with_state(state.clone(), rate_limit))
Comment on lines 500 to +504
.with_state(state);

// Bind address is configurable so the container/host can override
Expand Down Expand Up @@ -490,7 +530,25 @@ async fn main() {
(None, None) => {
// Plain HTTP. Intended for deployment behind a TLS-terminating
// reverse proxy (Cloudflare Tunnel, Caddy, nginx). NOT for
// direct internet exposure.
// direct internet exposure. Warn loudly if we're binding a
// non-loopback address in cleartext without an explicit
// acknowledgement — that exposes signed-but-cleartext containment
// traffic if there is no proxy in front. (Warn, not panic: plain
// HTTP on 0.0.0.0 behind a reverse proxy is a supported setup.)
let allow_insecure = parse_bool_env("ALLOW_INSECURE", false);
let is_loopback = bind_addr
.parse::<std::net::SocketAddr>()
.map(|a| a.ip().is_loopback())
.unwrap_or(false);
Comment on lines +539 to +542
if !is_loopback && !allow_insecure {
warn!(
addr = %bind_addr,
"binding plain HTTP to a non-loopback address: containment \
traffic is cleartext unless a TLS-terminating reverse proxy \
sits in front. Set TLS_CERT_PATH/TLS_KEY_PATH, bind 127.0.0.1, \
or set ALLOW_INSECURE=true to acknowledge."
);
}
info!(addr = %bind_addr, tls = false, dry_run, "vyrox proxy starting (plain HTTP)");
let listener = tokio::net::TcpListener::bind(&bind_addr)
.await
Expand All @@ -500,3 +558,31 @@ async fn main() {
_ => panic!("TLS_CERT_PATH and TLS_KEY_PATH must both be set, or both unset"),
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Within a single window, exactly `LIMIT` requests pass and the next
    /// one is refused.
    #[test]
    fn rate_check_allows_up_to_limit_then_blocks() {
        const LIMIT: u32 = 3;
        let opened = Instant::now();
        let mut state = (opened, 0u32);

        for _ in 0..LIMIT {
            assert!(rate_check(&mut state, opened, LIMIT));
        }
        // One past the budget, still inside the same one-second window.
        assert!(!rate_check(&mut state, opened, LIMIT));
    }

    /// Once more than a second has elapsed, the budget is available again.
    #[test]
    fn rate_check_resets_after_one_second_window() {
        let opened = Instant::now();
        let mut state = (opened, 0u32);

        let first = rate_check(&mut state, opened, 1);
        let second = rate_check(&mut state, opened, 1);
        assert!(first);
        assert!(!second); // budget of one already spent in this window

        // Two seconds on, the request lands in a fresh window.
        let next_window = opened + Duration::from_secs(2);
        assert!(rate_check(&mut state, next_window, 1));
    }
}