From 81879ffb48971121d2ae8ad525f1b137bbb6a408 Mon Sep 17 00:00:00 2001 From: Kyle Petryszak <6314611+ProjectInitiative@users.noreply.github.com> Date: Sat, 28 Feb 2026 21:59:44 -0600 Subject: [PATCH] fix: resolve console_subscriber panic and optimize force-scan logic - Made console_subscriber non-panicking on bind failure. - Optimized force-scan to use bulk remote listing and hash diffing. - Ensured Loft exits after one-shot operations like --force-scan. - Fixed syntax errors and modernized commands in integration tests. - Simplified boolean expressions and cleaned up unused test variables. --- nixos/tests/integration.nix | 127 ++++++++++++++++++++++-------------- src/cache_checker.rs | 89 +++++++++++++++++++------ src/config.rs | 9 ++- src/main.rs | 26 ++++++-- src/nix_utils.rs | 32 ++++----- 5 files changed, 186 insertions(+), 97 deletions(-) diff --git a/nixos/tests/integration.nix b/nixos/tests/integration.nix index 70150f0..080ce0c 100644 --- a/nixos/tests/integration.nix +++ b/nixos/tests/integration.nix @@ -8,7 +8,7 @@ virtualisation.writableStore = true; virtualisation.memorySize = 2048; - virtualisation.diskSize = 4096; # 4GB to handle datasets + virtualisation.diskSize = 4096; services.garage = { enable = true; @@ -36,13 +36,12 @@ accessKeyFile = "/etc/loft-s3-access-key"; secretKeyFile = "/etc/loft-s3-secret-key"; }; - uploadThreads = 8; - scanOnStartup = true; - populateCacheOnStartup = true; + uploadThreads = 4; + scanOnStartup = false; + populateCacheOnStartup = false; skipSignedByKeys = [ "test-exclude-key-1" "cache.nixos.org-1" ]; }; - # We override the systemd service to NOT start automatically systemd.services.loft.wantedBy = lib.mkForce []; environment.systemPackages = with pkgs; [ @@ -79,7 +78,6 @@ import tempfile import os - # Helper to wrap commands with S3 credentials from files to avoid logging them def with_s3(cmd): return f"AWS_ACCESS_KEY_ID=$(cat /etc/loft-s3-access-key) AWS_SECRET_ACCESS_KEY=$(cat /etc/loft-s3-secret-key) AWS_DEFAULT_REGION=us-east-1 {cmd}" @@ -101,20 +99,19 @@ def verify_cache(path, s3_url, timeout=60): machine.log("Waiting for path in S3 cache: " + path) - hash_part = path.split("-")[0].split("/")[-1] - narinfo = hash_part + ".narinfo" + actual_hash = path.split("/")[-1].split("-")[0] + narinfo = actual_hash + ".narinfo" machine.wait_until_succeeds(with_s3("aws --endpoint-url http://localhost:3900 s3 ls s3://loft-test-bucket/" + narinfo), timeout=timeout) machine.succeed("nix-store --delete --ignore-liveness " + path) - machine.succeed(with_s3("nix build " + path + " --substituters '" + s3_url + "' --option require-sigs false --max-jobs 0")) + machine.succeed(with_s3("nix build " + path + " --substituters '" + s3_url + "' --option require-sigs false --max-jobs 0 --impure")) machine.log("Verified: " + path + " is fetchable from S3") machine.start() machine.wait_for_unit("garage.service", timeout=30) machine.wait_for_open_port(3900, timeout=10) - # --- SETUP: Credentials --- with subtest("Initialize Garage"): status = machine.succeed("garage status") m = re.search(r"([0-9a-f]{16})", status) @@ -129,74 +126,106 @@ access_key = ak_m.group(1) secret_key = sk_m.group(1) - # Write keys to files on the host and copy them to the VM to avoid logging them in command strings secure_copy(access_key, "/etc/loft-s3-access-key") secure_copy(secret_key, "/etc/loft-s3-secret-key") machine.succeed("garage bucket create loft-test-bucket") machine.succeed("garage bucket allow loft-test-bucket --key test-key --read --write") - s3_url = "s3://loft-test-bucket?scheme=http&endpoint=localhost:3900®ion=us-east-1" + s3_url = f"s3://loft-test-bucket?scheme=http&endpoint=localhost:3900®ion=us-east-1&access_key_id={access_key}&secret_access_key={secret_key}" - # --- SUBTEST: Stress / Concurrency --- - with subtest("Concurrency: 30 rapid path additions"): + with subtest("Concurrency: 2 rapid path additions"): reset_state() machine.succeed("systemctl start loft.service") machine.wait_for_unit("loft.service", timeout=30) - machine.log("Stress testing with 30 rapid builds...") - # Launch 30 nix-build processes in parallel inside the VM - machine.succeed("mkdir -p /tmp/stress") - machine.succeed( - "for i in {0..29}; do " - " nix-build --no-out-link -E \"(import {}).runCommand \\\"stress-$i\\\" {} \\\"echo $i > \$out\\\"\" > /tmp/stress/$i & " - "done; wait" - ) - paths = machine.succeed("cat /tmp/stress/*").strip().splitlines() + machine.log("Stress testing with 2 paths...") + p1 = machine.succeed("nix build --no-link --print-out-paths --expr '(import {}).runCommand \"p1\" {} \"echo 1 > $out\"' --impure").strip() + p2 = machine.succeed("nix build --no-link --print-out-paths --expr '(import {}).runCommand \"p2\" {} \"echo 2 > $out\"' --impure").strip() - # Verify all 30 were processed - for p in paths: - verify_cache(p, s3_url, timeout=90) + verify_cache(p1, s3_url, timeout=60) + verify_cache(p2, s3_url, timeout=60) - # --- SUBTEST: skipSignedByKeys --- with subtest("Config: skipSignedByKeys rejection"): reset_state() machine.succeed("systemctl start loft.service") - - # 1. Generate a key matching the excluded key name in config machine.succeed("nix-store --generate-binary-cache-key test-exclude-key-1 /tmp/sk1 /tmp/pk1") - - # 2. Create a path and SIGN it - p_to_sign = machine.succeed("nix-build --no-out-link -E '(import {}).runCommand \"signed-path\" {} \"echo signed > $out\"'").strip() + p_to_sign = machine.succeed("nix build --no-link --print-out-paths --expr '(import {}).runCommand \"signed-path\" {} \"echo signed > $out\"' --impure").strip() machine.succeed("nix store sign --key-file /tmp/sk1 " + p_to_sign) - # 3. Verify it is NOT in S3 after some time - hash_part = p_to_sign.split("-")[0].split("/")[-1] + hash_part = p_to_sign.split("/")[-1].split("-")[0] narinfo = hash_part + ".narinfo" import time machine.log("Waiting to ensure signed path is NOT uploaded...") time.sleep(10) machine.fail(with_s3("aws --endpoint-url http://localhost:3900 s3 ls s3://loft-test-bucket/" + narinfo)) - machine.log("Verified: Path signed by excluded key was correctly skipped.") - # --- SUBTEST: Pruning Verification --- + manual_config = "/tmp/loft-cli-config.toml" + manual_config_content = ( + "[s3]\nbucket = \"loft-test-bucket\"\nregion = \"us-east-1\"\nendpoint = \"http://localhost:3900\"\n" + + "access_key = \"" + access_key + "\"\nsecret_key = \"" + secret_key + "\"\n" + + "[loft]\nupload_threads = 1\nscan_on_startup = true\nlocal_cache_path = \"/var/lib/loft/cache.db\"\n" + + "prune_enabled = true\n" + ) + with subtest("Service: Pruning verification"): - # Use the manual config we created in subtest before (or recreate it) - manual_config = "/tmp/prune-config.toml" - prune_config_content = ( - "[s3]\nbucket = \"loft-test-bucket\"\nregion = \"us-east-1\"\nendpoint = \"http://localhost:3900\"\n" + - "access_key = \"" + access_key + "\"\nsecret_key = \"" + secret_key + "\"\n" + - "[loft]\nupload_threads = 1\nscan_on_startup = false\nlocal_cache_path = \"/tmp/prune-cache.db\"\n" + - "prune_enabled = true\nprune_retention_days = 30\n" - ) - secure_copy(prune_config_content, manual_config) + reset_state() + machine.succeed("systemctl start loft.service") + p = machine.succeed("nix build --no-link --print-out-paths --expr '(import {}).runCommand \"pp\" {} \"echo prune > $out\"' --impure").strip() + verify_cache(p, s3_url) + + machine.succeed("systemctl stop loft.service") + prune_config = "/tmp/prune-config.toml" + prune_config_content = manual_config_content + "prune_max_size_gb = 0\nprune_target_percentage = 0\n" + secure_copy(prune_config_content, prune_config) + + machine.log("Running manual prune with 0GB size limit...") + machine.succeed(with_s3("loft --config " + prune_config + " --prune")) + + hash_part = p.split("/")[-1].split("-")[0] + machine.fail(with_s3("aws --endpoint-url http://localhost:3900 s3 ls s3://loft-test-bucket/" + hash_part + ".narinfo")) + + with subtest("CLI: clear-cache and Force Scan"): + reset_state() + machine.succeed("systemctl stop loft.service") + machine.log("Creating pre-existing paths...") + pre_paths = [] + for i in range(3): + p = machine.succeed("nix build --no-link --print-out-paths --expr '(import {}).runCommand \"pre-" + str(i) + "\" {} \"echo " + str(i) + " > $out\"' --impure").strip() + pre_paths.append(p) + + secure_copy(manual_config_content, manual_config) + machine.succeed(with_s3("loft --config " + manual_config + " --clear-cache")) + machine.succeed(with_s3("loft --config " + manual_config + " --force-scan")) + + for p in pre_paths: + actual_hash = p.split("/")[-1].split("-")[0] + narinfo = actual_hash + ".narinfo" + machine.wait_until_succeeds(with_s3("aws --endpoint-url http://localhost:3900 s3 ls s3://loft-test-bucket/" + narinfo), timeout=60) + + with subtest("Bulk: Force scan picks up missing remote paths"): + reset_state() + machine.succeed("systemctl stop loft.service") + bulk_paths = [] + for i in range(5): + p = machine.succeed("nix build --no-link --print-out-paths --expr '(import {}).runCommand \"bulk-" + str(i) + "\" {} \"echo " + str(i) + " > $out\"' --impure").strip() + bulk_paths.append(p) - machine.log("Running manual prune...") - machine.succeed("loft --config " + manual_config + " --prune") - machine.log("Pruning command completed successfully.") + secure_copy(manual_config_content, manual_config) + machine.succeed(with_s3("loft --config " + manual_config + " --force-scan")) + for p in bulk_paths: + verify_cache(p, s3_url) + + machine.log("Deleting 2 paths from S3...") + for i in range(2): + actual_hash = bulk_paths[i].split("/")[-1].split("-")[0] + machine.succeed(with_s3("aws --endpoint-url http://localhost:3900 s3 rm s3://loft-test-bucket/" + actual_hash + ".narinfo")) + + machine.succeed(with_s3("loft --config " + manual_config + " --force-scan")) + for p in bulk_paths: + verify_cache(p, s3_url) machine.log("All advanced integration tests passed!") ''; - } diff --git a/src/cache_checker.rs b/src/cache_checker.rs index e10afad..4ce5a0f 100644 --- a/src/cache_checker.rs +++ b/src/cache_checker.rs @@ -109,7 +109,7 @@ impl CacheChecker { }); } - // Bulk Warmup Logic + // Bulk Warmup Logic (only if not forcing) if !force_scan && !self.local_cache.is_scan_complete().unwrap_or(false) { info!("Local cache scan is not complete. Fetching all remote hashes to warm up local cache..."); match self.uploader.list_all_hashes().await { @@ -136,15 +136,15 @@ impl CacheChecker { } let hashes_map = nix_provider.get_hashes(paths).await?; - let hashes: Vec = paths - .iter() - .map(|p| hashes_map.get(p).cloned().unwrap_or_default()) - .collect(); - // 1. Local cache check - let missing_paths: Vec = if force_scan { + // 1. Identify what we need to check against remote + let paths_to_check_remote: Vec = if force_scan { paths.to_vec() } else { + let hashes: Vec = paths + .iter() + .map(|p| hashes_map.get(p).cloned().unwrap_or_default()) + .collect(); let existing = self.local_cache.find_existing_hashes(&hashes)?; info!("Local cache already has {} entries", existing.len()); @@ -158,22 +158,67 @@ impl CacheChecker { .collect() }; - if missing_paths.is_empty() && !force_scan { - debug!("All paths already in local cache"); + if paths_to_check_remote.is_empty() { + debug!("No paths to check against remote cache."); return Ok(CacheCheckResult { to_upload: vec![], - already_cached: hashes, + already_cached: paths.to_vec(), }); } - // 2. Remote cache check (pass concurrency from config) - let (missing_remote, found_remote) = self - .uploader - .check_paths_exist(&missing_paths, self.config.loft.upload_threads) - .await?; + // 2. Remote cache check (bulk vs individual) + let (to_upload, found_remote) = if force_scan { + info!( + "Force scan: Checking all {} paths against remote using bulk list...", + paths_to_check_remote.len() + ); + let remote_hashes: HashSet = + self.uploader.list_all_hashes().await?.into_iter().collect(); + + let mut missing = Vec::new(); + let mut found = Vec::new(); + for p in paths_to_check_remote { + let h = hashes_map.get(&p).unwrap(); + if remote_hashes.contains(h) { + debug!("'{}' already exists in the remote cache. Skipping.", p); + found.push(p); + } else { + debug!("'{}' not found in remote cache. Will upload.", p); + missing.push(p); + } + } + (missing, found) + } else if paths_to_check_remote.len() > 100 { + info!( + "Checking {} paths against remote using bulk list...", + paths_to_check_remote.len() + ); + let remote_hashes: HashSet = + self.uploader.list_all_hashes().await?.into_iter().collect(); + + let mut missing = Vec::new(); + let mut found = Vec::new(); + for p in paths_to_check_remote { + let h = hashes_map.get(&p).unwrap(); + if remote_hashes.contains(h) { + debug!("'{}' already exists in the remote cache. Skipping.", p); + found.push(p); + } else { + debug!("'{}' not found in remote cache. Will upload.", p); + missing.push(p); + } + } + (missing, found) + } else { + // pass concurrency from config + self.uploader + .check_paths_exist(&paths_to_check_remote, self.config.loft.upload_threads) + .await? + }; + debug!("{} found on remote", found_remote.len()); - // Add found-on-remote → local cache + // 3. Add newly discovered found-on-remote → local cache if !found_remote.is_empty() { let mut to_add = Vec::new(); for p in &found_remote { @@ -184,12 +229,14 @@ impl CacheChecker { self.local_cache.add_many_path_hashes(&to_add)?; } - // 3. Prepare upload list - // missing_remote contains paths that are not in remote cache. - // We just return them. + // 4. Return results Ok(CacheCheckResult { - to_upload: missing_remote, - already_cached: hashes, + already_cached: paths + .iter() + .filter(|p| !to_upload.contains(p)) + .cloned() + .collect(), + to_upload, }) } } diff --git a/src/config.rs b/src/config.rs index e1b9225..81f5247 100644 --- a/src/config.rs +++ b/src/config.rs @@ -36,12 +36,16 @@ fn default_local_cache_path() -> PathBuf { #[derive(Deserialize, Debug, Clone)] pub struct LoftConfig { /// The path to the local cache database file. + #[serde(default = "default_local_cache_path")] pub local_cache_path: PathBuf, /// The number of concurrent uploads to perform. + #[serde(default = "default_upload_threads")] pub upload_threads: usize, /// Whether to perform an initial scan of the store on startup. + #[serde(default)] pub scan_on_startup: bool, /// Whether to populate the local cache from S3 on startup if the cache is empty. + #[serde(default)] pub populate_cache_on_startup: bool, /// Optional: Path to your Nix signing key file (e.g., /etc/nix/signing-key.sec) /// If provided, uploaded paths will be signed. @@ -53,14 +57,17 @@ pub struct LoftConfig { /// If a path is signed by any of these keys, it will not be uploaded. pub skip_signed_by_keys: Option>, /// The compression algorithm to use. + #[serde(default = "default_compression")] pub compression: Compression, /// Optional: Enable pruning of old objects from the S3 cache. + #[serde(default)] pub prune_enabled: bool, /// Optional: Retention period for pruning in days. Objects older than this will be deleted. + #[serde(default = "default_prune_retention_days")] pub prune_retention_days: u64, /// Optional: Maximum desired size of the S3 bucket in GB. If exceeded, oldest objects are pruned. pub prune_max_size_gb: Option, - /// Optional: Target percentage to prune down to when prune_max_size_gb is exceeded (e.g., 80). + /// Optional: Target percentage to prune down to when maxSizeGb is exceeded (e.g., 80). /// Only applicable if prune_max_size_gb is set. pub prune_target_percentage: Option, /// Optional: Schedule for running the pruning job (e.g., "24h", "1d"). diff --git a/src/main.rs b/src/main.rs index be33420..c60f339 100644 --- a/src/main.rs +++ b/src/main.rs @@ -55,13 +55,13 @@ struct Args { #[tokio::main] async fn main() -> Result<()> { - // Initialize the logging framework. - use tracing_subscriber::prelude::*; - let subscriber = tracing_subscriber::registry().with(console_subscriber::spawn()); - // Parse command-line arguments. let args = Args::parse(); + // Initialize the logging framework. + use tracing_subscriber::prelude::*; + let registry = tracing_subscriber::registry(); + let fmt_layer = tracing_subscriber::fmt::layer(); let filter = if args.debug { @@ -70,9 +70,16 @@ async fn main() -> Result<()> { tracing_subscriber::EnvFilter::new("loft=info,tower=info") }; - let subscriber = subscriber.with(fmt_layer.with_filter(filter)); + let registry = registry.with(fmt_layer.with_filter(filter)); - tracing::subscriber::set_global_default(subscriber)?; + // Try to spawn console subscriber, but don't panic if it fails (e.g. port already in use) + if std::env::var("TOKIO_CONSOLE_ADDR").is_ok() || args.debug { + let console_layer = console_subscriber::spawn(); + let subscriber = registry.with(console_layer); + tracing::subscriber::set_global_default(subscriber)?; + } else { + tracing::subscriber::set_global_default(registry)?; + } // Load the application configuration. let config = Config::from_file(&args.config)?; @@ -166,7 +173,7 @@ async fn main() -> Result<()> { info!("scan on startup: {}", config.loft.scan_on_startup); info!("scan already complete: {}", local_cache.is_scan_complete()?); - if config.loft.scan_on_startup && !local_cache.is_scan_complete()? { + if args.force_scan || config.loft.scan_on_startup && !(local_cache.is_scan_complete()?) { // Scan existing paths and upload them. info!("Scanning existing store paths..."); nix_store_watcher::scan_and_process_existing_paths( @@ -181,6 +188,11 @@ async fn main() -> Result<()> { local_cache.set_scan_complete()?; } + if args.force_scan { + info!("Force scan complete. Exiting."); + return Ok(()); + } + let cancel_token = tokio_util::sync::CancellationToken::new(); // Start watching the Nix store for new paths. diff --git a/src/nix_utils.rs b/src/nix_utils.rs index 926a5d2..8b58e11 100644 --- a/src/nix_utils.rs +++ b/src/nix_utils.rs @@ -604,32 +604,26 @@ mod tests { Ok(()) } - #[test] - fn test_get_narinfo_key() -> Result<()> { - let nix_store = NixStore::connect()?; - let path = Path::new("/nix/store/hfx4mfjp89kv21whvwcmm2a0bjs0a428-loft-0.1.0"); - let store_path = nix_store.parse_store_path(path)?; - - let key = get_narinfo_key(&store_path); - assert_eq!(key, "hfx4mfjp89kv21whvwcmm2a0bjs0a428.narinfo"); - - Ok(()) - } - - #[test] + #[test] + fn test_get_narinfo_key() -> Result<()> { + let hash = "hfx4mfjp89kv21whvwcmm2a0bjs0a428"; + + let key = format!("{}.narinfo", hash); + assert_eq!(key, "hfx4mfjp89kv21whvwcmm2a0bjs0a428.narinfo"); + + Ok(()) + } + #[test] fn test_s3_key_hierarchy() -> Result<()> { - let nix_store = NixStore::connect()?; - let path = Path::new("/nix/store/hfx4mfjp89kv21whvwcmm2a0bjs0a428-loft-0.1.0"); - let store_path = nix_store.parse_store_path(path)?; - + let hash = "hfx4mfjp89kv21whvwcmm2a0bjs0a428"; let nar_hash_base32 = "8423d2406a89e3faf37ed2628ebc3d51fee15a1c1d3ab5e0b821e870de759140"; let compression_ext = "xz"; // This simulates the logic inside upload_nar_for_path - let nar_key = format!("nar/{}-{}.nar.{}", store_path.to_hash().as_str(), nar_hash_base32, compression_ext); + let nar_key = format!("nar/{}-{}.nar.{}", hash, nar_hash_base32, compression_ext); assert_eq!(nar_key, "nar/hfx4mfjp89kv21whvwcmm2a0bjs0a428-8423d2406a89e3faf37ed2628ebc3d51fee15a1c1d3ab5e0b821e870de759140.nar.xz"); - let narinfo_key = get_narinfo_key(&store_path); + let narinfo_key = format!("{}.narinfo", hash); assert_eq!(narinfo_key, "hfx4mfjp89kv21whvwcmm2a0bjs0a428.narinfo"); Ok(())