From 3eb440970207428e520a006a18132f28231c31f0 Mon Sep 17 00:00:00 2001 From: Qiuyang Mang Date: Tue, 2 Jun 2026 17:10:11 -0400 Subject: [PATCH] Add relaxed Vector DB ANN task --- 2.0/README.md | 7 + .../vector_db_ann_relaxed/config.yaml | 45 ++ .../vector_db_ann_relaxed/evaluate.sh | 38 + .../vector_db_ann_relaxed/evaluator.py | 711 ++++++++++++++++++ .../harbor/app/Cargo.toml | 14 + .../harbor/app/LICENSE.KCORES | 21 + .../harbor/app/README.md | 36 + .../harbor/app/src/api.rs | 46 ++ .../harbor/app/src/db.rs | 23 + .../harbor/app/src/distance.rs | 9 + .../harbor/app/src/main.rs | 54 ++ 2.0/problems/vector_db_ann_relaxed/readme | 161 ++++ .../vector_db_ann_relaxed/reference.py | 6 + .../vector_db_ann_relaxed/reference.rs | 41 + README.md | 2 +- 15 files changed, 1213 insertions(+), 1 deletion(-) create mode 100644 2.0/problems/vector_db_ann_relaxed/config.yaml create mode 100755 2.0/problems/vector_db_ann_relaxed/evaluate.sh create mode 100644 2.0/problems/vector_db_ann_relaxed/evaluator.py create mode 100644 2.0/problems/vector_db_ann_relaxed/harbor/app/Cargo.toml create mode 100644 2.0/problems/vector_db_ann_relaxed/harbor/app/LICENSE.KCORES create mode 100644 2.0/problems/vector_db_ann_relaxed/harbor/app/README.md create mode 100644 2.0/problems/vector_db_ann_relaxed/harbor/app/src/api.rs create mode 100644 2.0/problems/vector_db_ann_relaxed/harbor/app/src/db.rs create mode 100644 2.0/problems/vector_db_ann_relaxed/harbor/app/src/distance.rs create mode 100644 2.0/problems/vector_db_ann_relaxed/harbor/app/src/main.rs create mode 100644 2.0/problems/vector_db_ann_relaxed/readme create mode 100644 2.0/problems/vector_db_ann_relaxed/reference.py create mode 100644 2.0/problems/vector_db_ann_relaxed/reference.rs diff --git a/2.0/README.md b/2.0/README.md index 665b5cf6..e0f650c0 100644 --- a/2.0/README.md +++ b/2.0/README.md @@ -27,3 +27,10 @@ vector search service for a hidden SIFT1M-scale benchmark. Its problem ID is `vector_db_ann`. Submissions are whole `/app` projects served through Harbor, and the objective is to maximize effective QPS subject to `recall@10 >= 0.95`; the score includes query throughput plus a small load/index-build penalty. + +## Vector DB ANN Relaxed + +This variant keeps the same SIFT1M-scale service contract and recall target as +`vector_db_ann`, but reduces the load/index-build penalty by 10x so stronger +offline indexing strategies are more viable. Its problem ID is +`vector_db_ann_relaxed`. diff --git a/2.0/problems/vector_db_ann_relaxed/config.yaml b/2.0/problems/vector_db_ann_relaxed/config.yaml new file mode 100644 index 00000000..e8f36adb --- /dev/null +++ b/2.0/problems/vector_db_ann_relaxed/config.yaml @@ -0,0 +1,45 @@ +tag: systems +runtime: + language: rust + timeout_seconds: 10800 + environment: "Rust project; hidden ANN benchmark; Python/NumPy judge" + apt_packages: + - build-essential + - cargo + - git + - rustc + judge_apt_packages: + - build-essential + - cargo + - rustc + - python3-pip + - python3-numpy + judge_pip_packages: + - faiss-cpu + docker: + image: ubuntu:24.04 +environment: + # If these resource limits change, also update the resource budget text in + # readme and harbor/app/README.md so agents can design parallel algorithms + # for the actual CPU and memory budget. + cpus: 8 + memory_mb: 16384 + storage_mb: 8192 + build_timeout_seconds: 3600 +evaluation: + # The judge drives the search service with this many concurrent workers. + # Keep this aligned with the CPU budget unless the task is intentionally + # changed into a higher-concurrency service benchmark. + query_concurrency: 8 + queries_per_worker: 64 + # Weight applied to load/index-build time when computing effective QPS. + # This relaxed variant rewards stronger indexes by penalizing build time + # 10x less than the standard Vector DB ANN task. + load_penalty_weight: 0.001 +submission: + kind: directory + path: /app + exclude: + - target + - .git + - .frontier-cs diff --git a/2.0/problems/vector_db_ann_relaxed/evaluate.sh b/2.0/problems/vector_db_ann_relaxed/evaluate.sh new file mode 100755 index 00000000..256e9d7f --- /dev/null +++ b/2.0/problems/vector_db_ann_relaxed/evaluate.sh @@ -0,0 +1,38 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) + +if [[ $# -gt 0 ]]; then + exec python3 "$SCRIPT_DIR/evaluator.py" "$@" +fi + +SOLUTION="/work/execution_env/solution_env/solution.rs" +if [[ ! -f "$SOLUTION" ]]; then + echo "Error: Missing $SOLUTION" >&2 + exit 1 +fi + +if ! command -v cargo >/dev/null 2>&1 || ! python3 -c 'import numpy, faiss' >/dev/null 2>&1; then + export DEBIAN_FRONTEND=noninteractive + apt-get update -qq + apt-get install -y -qq --no-install-recommends \ + build-essential cargo rustc python3-pip python3-numpy >/dev/null + python3 -c 'import faiss' >/dev/null 2>&1 || \ + pip3 install --break-system-packages -q faiss-cpu +fi + +WORKDIR=$(mktemp -d) +trap 'rm -rf "$WORKDIR"' EXIT +cp -R "$SCRIPT_DIR/harbor/app/." "$WORKDIR/" +cp "$SOLUTION" "$WORKDIR/src/db.rs" + +# The repository validator checks that the evaluator path works; the full +# 1M-vector benchmark is exercised through Harbor. +export FRONTIER_VECTOR_DB_N="${FRONTIER_VECTOR_DB_N:-5000}" +export FRONTIER_VECTOR_DB_Q="${FRONTIER_VECTOR_DB_Q:-16}" +export FRONTIER_VECTOR_DB_WARMUP="${FRONTIER_VECTOR_DB_WARMUP:-4}" +export FRONTIER_VECTOR_DB_BATCH_SIZE="${FRONTIER_VECTOR_DB_BATCH_SIZE:-500}" +export FRONTIER_VECTOR_DB_CACHE="${FRONTIER_VECTOR_DB_CACHE:-/tmp/frontier_vector_db_ann_relaxed_ci}" + +python3 "$SCRIPT_DIR/evaluator.py" "$WORKDIR" diff --git a/2.0/problems/vector_db_ann_relaxed/evaluator.py b/2.0/problems/vector_db_ann_relaxed/evaluator.py new file mode 100644 index 00000000..89181746 --- /dev/null +++ b/2.0/problems/vector_db_ann_relaxed/evaluator.py @@ -0,0 +1,711 @@ +#!/usr/bin/env python3 +"""Evaluator for the Frontier-CS 2.0 Vector DB ANN Relaxed task.""" + +from __future__ import annotations + +import json +import os +import shutil +import socket +import subprocess +import tempfile +import time +import math +from dataclasses import dataclass +from concurrent.futures import ThreadPoolExecutor, as_completed +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from pathlib import Path +from urllib import request + +import numpy as np + + +def _read_evaluation_config() -> dict[str, float]: + config_path = Path(__file__).with_name("config.yaml") + if not config_path.exists(): + return {} + + values: dict[str, float] = {} + in_evaluation = False + for raw_line in config_path.read_text(encoding="utf-8").splitlines(): + line = raw_line.split("#", 1)[0].rstrip() + if not line: + continue + if not raw_line.startswith((" ", "\t")): + in_evaluation = line == "evaluation:" + continue + if not in_evaluation: + continue + stripped = line.strip() + if ":" not in stripped: + continue + key, value = stripped.split(":", 1) + key = key.strip() + value = value.strip() + if ( + key in {"query_concurrency", "queries_per_worker", "load_penalty_weight"} + and value + ): + values[key] = float(value) + return values + + +def _config_int(name: str, default: int) -> int: + return int(os.environ.get(name, str(default))) + + +_EVALUATION_CONFIG = _read_evaluation_config() +CONFIG_CONCURRENCY = int(_EVALUATION_CONFIG.get("query_concurrency", 8)) +CONFIG_QUERIES_PER_WORKER = int(_EVALUATION_CONFIG.get("queries_per_worker", 64)) +CONFIG_LOAD_PENALTY_WEIGHT = float( + _EVALUATION_CONFIG.get("load_penalty_weight", 0.001) +) + + +DIM = 128 +N_BASE = _config_int("FRONTIER_VECTOR_DB_N", 1000000) +CONCURRENCY = _config_int("FRONTIER_VECTOR_DB_CONCURRENCY", CONFIG_CONCURRENCY) +QUERIES_PER_WORKER = _config_int( + "FRONTIER_VECTOR_DB_QUERIES_PER_WORKER", CONFIG_QUERIES_PER_WORKER +) +N_QUERIES = _config_int( + "FRONTIER_VECTOR_DB_Q", CONCURRENCY * QUERIES_PER_WORKER +) +TOP_K = _config_int("FRONTIER_VECTOR_DB_TOP_K", 10) +SEED = _config_int("FRONTIER_VECTOR_DB_SEED", 20260528) +TARGET_RECALL = float(os.environ.get("FRONTIER_VECTOR_DB_TARGET_RECALL", "0.95")) +QUERY_NOISE = float(os.environ.get("FRONTIER_VECTOR_DB_QUERY_NOISE", "0.02")) +BUILD_TIMEOUT_SECONDS = _config_int("FRONTIER_VECTOR_DB_BUILD_TIMEOUT", 600) +LOAD_TIMEOUT_SECONDS = _config_int("FRONTIER_VECTOR_DB_LOAD_TIMEOUT", 900) +LOAD_PENALTY_WEIGHT = float( + os.environ.get("FRONTIER_VECTOR_DB_LOAD_PENALTY", str(CONFIG_LOAD_PENALTY_WEIGHT)) +) +BATCH_SIZE = _config_int("FRONTIER_VECTOR_DB_BATCH_SIZE", 1000) +WARMUP = _config_int("FRONTIER_VECTOR_DB_WARMUP", 32) +CACHE_DIR = Path( + os.environ.get("FRONTIER_VECTOR_DB_CACHE", "/tmp/frontier_vector_db_ann_relaxed") +) + +_BENCHMARK: "Benchmark | None" = None + + +@dataclass +class Benchmark: + base_path: Path + queries_path: Path + truth: np.ndarray + baseline_qps: float + baseline_effective_qps: float + baseline_seconds: float + baseline_load_seconds: float + + +class QueryTooSlow(RuntimeError): + def __init__( + self, + elapsed_seconds: float, + optimistic_qps: float, + optimistic_effective_qps: float, + completed_queries: int, + ): + super().__init__( + "query phase stopped early because even an optimistic final " + "effective_qps cannot beat the baseline" + ) + self.elapsed_seconds = elapsed_seconds + self.optimistic_qps = optimistic_qps + self.optimistic_effective_qps = optimistic_effective_qps + self.completed_queries = completed_queries + + +class LoadTooSlow(RuntimeError): + def __init__( + self, + load_seconds: float, + optimistic_effective_qps: float, + inserted: int, + ): + super().__init__( + "load/index phase stopped early because even zero query time " + "cannot beat the baseline effective_qps" + ) + self.load_seconds = load_seconds + self.optimistic_effective_qps = optimistic_effective_qps + self.inserted = inserted + + +def prepare() -> dict: + print( + f"[vector-db-ann] preparing benchmark n_base={N_BASE} " + f"n_queries={N_QUERIES} top_k={TOP_K} cache={CACHE_DIR}", + flush=True, + ) + benchmark = _ensure_benchmark() + print( + f"[vector-db-ann] benchmark ready baseline_qps={benchmark.baseline_qps:.6f} " + f"baseline_effective_qps={benchmark.baseline_effective_qps:.6f} " + f"baseline_seconds={benchmark.baseline_seconds:.6f} " + f"baseline_load_seconds={benchmark.baseline_load_seconds:.6f}", + flush=True, + ) + return { + "n_base": N_BASE, + "n_queries": N_QUERIES, + "top_k": TOP_K, + "baseline_qps": benchmark.baseline_qps, + "baseline_effective_qps": benchmark.baseline_effective_qps, + "baseline_seconds": benchmark.baseline_seconds, + "baseline_load_seconds": benchmark.baseline_load_seconds, + } + + +def _write_vectors(path: Path, values: np.ndarray) -> None: + values.astype(np.float32, copy=False).tofile(path) + + +def _load_vectors(path: Path, rows: int) -> np.memmap: + return np.memmap(path, dtype=np.float32, mode="r", shape=(rows, DIM)) + + +def _generate_data(base_path: Path, queries_path: Path) -> None: + rng = np.random.default_rng(SEED) + chunk = 50_000 + base = np.memmap(base_path, dtype=np.float32, mode="w+", shape=(N_BASE, DIM)) + for start in range(0, N_BASE, chunk): + end = min(start + chunk, N_BASE) + base[start:end] = rng.standard_normal((end - start, DIM), dtype=np.float32) + base.flush() + + ids = rng.integers(0, N_BASE, size=N_QUERIES) + selected = np.asarray(base[ids], dtype=np.float32) + noise = rng.standard_normal((N_QUERIES, DIM), dtype=np.float32) * QUERY_NOISE + _write_vectors(queries_path, selected + noise) + + +def _run_reference_server(port: int, base_path: str) -> None: + import faiss + + _ = base_path + index = faiss.IndexIDMap(faiss.IndexFlatL2(DIM)) + + class ReferenceHandler(BaseHTTPRequestHandler): + server_version = "FrontierVectorFaissReference/1.0" + + def _write_json(self, status: int, payload: dict) -> None: + body = json.dumps(payload).encode("utf-8") + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def do_GET(self) -> None: + if self.path == "/health": + self._write_json(200, {"status": "ok"}) + return + self._write_json(404, {"status": "error", "error": "not found"}) + + def do_POST(self) -> None: + if self.path == "/insert": + self._write_json(200, {"status": "ok"}) + return + if self.path == "/bulk_insert": + try: + length = int(self.headers.get("Content-Length", "0")) + payload = json.loads(self.rfile.read(length).decode("utf-8")) + items = payload.get("vectors", []) + inserted = len(items) + if inserted: + vectors = np.asarray( + [item["vector"] for item in items], dtype=np.float32 + ) + ids = np.asarray( + [int(item["id"]) for item in items], dtype=np.int64 + ) + if vectors.shape != (inserted, DIM): + raise ValueError("bulk_insert vector shape mismatch") + index.add_with_ids(vectors, ids) + self._write_json(200, {"status": "ok", "inserted": inserted}) + except Exception as exc: + self._write_json(400, {"status": "error", "error": str(exc)}) + return + if self.path != "/search": + self._write_json(404, {"status": "error", "error": "not found"}) + return + try: + length = int(self.headers.get("Content-Length", "0")) + payload = json.loads(self.rfile.read(length).decode("utf-8")) + vector = np.asarray(payload["vector"], dtype=np.float32) + top_k = int(payload.get("top_k", TOP_K)) + if vector.shape != (DIM,): + raise ValueError("query vector has wrong dimension") + if top_k != TOP_K: + raise ValueError("unexpected top_k") + distances, ids = index.search(vector.reshape(1, DIM), top_k) + results = [ + {"id": int(id_), "distance": float(distance)} + for id_, distance in zip(ids[0], distances[0]) + ] + self._write_json(200, {"results": results}) + except Exception as exc: + self._write_json(400, {"status": "error", "error": str(exc)}) + + def log_message(self, fmt: str, *args: object) -> None: + return + + ThreadingHTTPServer(("127.0.0.1", port), ReferenceHandler).serve_forever() + + +def _measure_reference_baseline(base_path: Path, queries: np.ndarray): + port = _free_port() + process = subprocess.Popen( + [ + "python3", + str(Path(__file__).resolve()), + "--reference-server", + str(port), + str(base_path), + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + try: + _wait_for_server(port, time.time() + 120) + base = _load_vectors(base_path, N_BASE) + inserted, load_seconds = _bulk_insert(f"http://127.0.0.1:{port}", base) + if inserted != N_BASE: + raise RuntimeError(f"reference inserted {inserted}; expected {N_BASE}") + results, latencies, baseline_seconds = _run_queries( + f"http://127.0.0.1:{port}", queries + ) + return results, latencies, baseline_seconds, load_seconds + finally: + process.terminate() + try: + process.wait(timeout=5) + except subprocess.TimeoutExpired: + process.kill() + + +def _ensure_benchmark() -> Benchmark: + global _BENCHMARK + if _BENCHMARK is not None: + return _BENCHMARK + + CACHE_DIR.mkdir(parents=True, exist_ok=True) + base_path = CACHE_DIR / f"base_n{N_BASE}_d{DIM}_seed{SEED}.f32" + queries_path = CACHE_DIR / f"queries_q{N_QUERIES}_d{DIM}_seed{SEED}.f32" + truth_path = CACHE_DIR / ( + f"truth_n{N_BASE}_q{N_QUERIES}_k{TOP_K}_seed{SEED}.u32" + ) + penalty_tag = f"{LOAD_PENALTY_WEIGHT:.6g}".replace(".", "p") + meta_path = CACHE_DIR / ( + f"baseline_faiss_http_bulk_load_v3_n{N_BASE}_q{N_QUERIES}_" + f"k{TOP_K}_seed{SEED}_lp{penalty_tag}.json" + ) + + expected_base_bytes = N_BASE * DIM * 4 + expected_query_bytes = N_QUERIES * DIM * 4 + if ( + not base_path.exists() + or base_path.stat().st_size != expected_base_bytes + or not queries_path.exists() + or queries_path.stat().st_size != expected_query_bytes + ): + print("[vector-db-ann] generating hidden vectors", flush=True) + _generate_data(base_path, queries_path) + truth_path.unlink(missing_ok=True) + meta_path.unlink(missing_ok=True) + + base = _load_vectors(base_path, N_BASE) + queries = _load_vectors(queries_path, N_QUERIES) + + if truth_path.exists() and meta_path.exists(): + truth = np.fromfile(truth_path, dtype=np.uint32).reshape(N_QUERIES, TOP_K) + meta = json.loads(meta_path.read_text(encoding="utf-8")) + baseline_seconds = float(meta["baseline_seconds"]) + baseline_qps = float(meta["baseline_qps"]) + baseline_load_seconds = float(meta["baseline_load_seconds"]) + baseline_effective_qps = float(meta["baseline_effective_qps"]) + else: + print("[vector-db-ann] running Faiss HTTP exact baseline", flush=True) + truth, _, baseline_seconds, baseline_load_seconds = _measure_reference_baseline( + base_path, queries + ) + truth.astype(np.uint32, copy=False).tofile(truth_path) + baseline_qps = N_QUERIES / baseline_seconds + baseline_effective_seconds = baseline_seconds + ( + LOAD_PENALTY_WEIGHT * baseline_load_seconds + ) + baseline_effective_qps = N_QUERIES / max(baseline_effective_seconds, 1e-9) + meta_path.write_text( + json.dumps( + { + "baseline_seconds": baseline_seconds, + "baseline_qps": baseline_qps, + "baseline_load_seconds": baseline_load_seconds, + "baseline_effective_qps": baseline_effective_qps, + }, + indent=2, + ), + encoding="utf-8", + ) + + _BENCHMARK = Benchmark( + base_path=base_path, + queries_path=queries_path, + truth=truth, + baseline_qps=baseline_qps, + baseline_effective_qps=baseline_effective_qps, + baseline_seconds=baseline_seconds, + baseline_load_seconds=baseline_load_seconds, + ) + return _BENCHMARK + + +def _invalid(message: str, metrics: dict | None = None): + return 0.0, 0.0, message, metrics or {} + + +def _copy_project(src: Path, dst: Path) -> None: + ignore = shutil.ignore_patterns("target", ".git", ".frontier-cs") + shutil.copytree(src, dst, ignore=ignore) + + +def _free_port() -> int: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.bind(("127.0.0.1", 0)) + return int(sock.getsockname()[1]) + + +def _post_json(url: str, payload: dict, timeout: float = 60.0) -> dict: + body = json.dumps(payload).encode("utf-8") + req = request.Request( + url, + data=body, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with request.urlopen(req, timeout=timeout) as response: + return json.loads(response.read().decode("utf-8")) + + +def _wait_for_server(port: int, deadline: float) -> None: + last_error: Exception | None = None + while time.time() < deadline: + try: + with socket.create_connection(("127.0.0.1", port), timeout=1.0): + pass + return + except Exception as exc: + last_error = exc + time.sleep(0.25) + raise RuntimeError(f"server did not become ready: {last_error}") + + +def _bulk_insert( + base_url: str, + base: np.ndarray, + *, + min_effective_qps: float | None = None, +) -> tuple[int, float]: + inserted = 0 + start_time = time.perf_counter() + deadline = start_time + LOAD_TIMEOUT_SECONDS + early_stop_deadline: float | None = None + if min_effective_qps is not None and LOAD_PENALTY_WEIGHT > 0.0: + early_stop_deadline = start_time + ( + N_QUERIES / (min_effective_qps * LOAD_PENALTY_WEIGHT) + ) + + def check_load_budget() -> None: + if early_stop_deadline is None: + return + load_seconds = max(time.perf_counter() - start_time, 1e-9) + optimistic_effective_seconds = LOAD_PENALTY_WEIGHT * load_seconds + optimistic_effective_qps = N_QUERIES / max( + optimistic_effective_seconds, 1e-9 + ) + if optimistic_effective_qps <= min_effective_qps: + raise LoadTooSlow( + load_seconds, + optimistic_effective_qps, + inserted, + ) + + for start in range(0, N_BASE, BATCH_SIZE): + check_load_budget() + remaining = deadline - time.perf_counter() + if remaining <= 0: + raise TimeoutError(f"bulk_insert timed out after {LOAD_TIMEOUT_SECONDS}s") + if early_stop_deadline is not None: + remaining = min(remaining, early_stop_deadline - time.perf_counter()) + if remaining <= 0: + check_load_budget() + raise TimeoutError("bulk_insert early-stop budget expired") + end = min(start + BATCH_SIZE, N_BASE) + vectors = [ + {"id": int(i), "vector": base[i].astype(float).tolist()} + for i in range(start, end) + ] + payload = {"vectors": vectors} + try: + response = _post_json( + f"{base_url}/bulk_insert", + payload, + timeout=max(0.05, min(300.0, remaining)), + ) + except Exception: + check_load_budget() + raise + inserted += int(response.get("inserted", 0)) + check_load_budget() + load_seconds = max(time.perf_counter() - start_time, 1e-9) + return inserted, load_seconds + + +def _search_one(base_url: str, query_index: int, vector: np.ndarray) -> tuple[int, list[int], float]: + start = time.perf_counter() + response = _post_json( + f"{base_url}/search", + {"vector": vector.astype(float).tolist(), "top_k": TOP_K}, + timeout=120.0, + ) + latency_ms = (time.perf_counter() - start) * 1000.0 + ids = [int(item.get("id", -1)) for item in response.get("results", [])[:TOP_K]] + return query_index, ids, latency_ms + + +def _run_queries( + base_url: str, + queries: np.ndarray, + *, + load_seconds: float = 0.0, + min_effective_qps: float | None = None, +) -> tuple[np.ndarray, list[float], float]: + for i in range(min(WARMUP, N_QUERIES)): + try: + _search_one(base_url, i, queries[i]) + except Exception: + pass + + results = np.zeros((N_QUERIES, TOP_K), dtype=np.uint32) + latencies: list[float] = [] + start = time.perf_counter() + pool = ThreadPoolExecutor(max_workers=CONCURRENCY) + futures = [ + pool.submit(_search_one, base_url, i, queries[i]) + for i in range(N_QUERIES) + ] + should_wait = True + try: + for future in as_completed(futures): + query_index, ids, latency_ms = future.result() + if len(ids) != TOP_K: + raise ValueError("search response did not contain top_k results") + if len(set(ids)) != len(ids): + raise ValueError("search response contains duplicate vector ids") + if any(id_ < 0 or id_ >= N_BASE for id_ in ids): + raise ValueError("search response contains an out-of-range vector id") + results[query_index] = np.asarray(ids, dtype=np.uint32) + latencies.append(latency_ms) + + if min_effective_qps is not None: + elapsed = max(time.perf_counter() - start, 1e-9) + optimistic_qps = N_QUERIES / elapsed + optimistic_effective_seconds = elapsed + ( + LOAD_PENALTY_WEIGHT * load_seconds + ) + optimistic_effective_qps = N_QUERIES / max( + optimistic_effective_seconds, 1e-9 + ) + if optimistic_effective_qps <= min_effective_qps: + should_wait = False + raise QueryTooSlow( + elapsed, + optimistic_qps, + optimistic_effective_qps, + len(latencies), + ) + finally: + pool.shutdown(wait=should_wait, cancel_futures=not should_wait) + duration = max(time.perf_counter() - start, 1e-9) + return results, latencies, duration + + +def _recall_at_k(results: np.ndarray, truth: np.ndarray) -> float: + hits = 0 + for got, expected in zip(results, truth): + hits += len(set(int(x) for x in got) & set(int(x) for x in expected)) + return hits / float(N_QUERIES * TOP_K) + + +def evaluate(solution_path: str): + root = Path(solution_path) + if not root.is_dir(): + return _invalid("submission path must be a Rust project directory") + if not (root / "Cargo.toml").exists(): + return _invalid("Cargo.toml not found in submission directory") + + benchmark = _ensure_benchmark() + + with tempfile.TemporaryDirectory(prefix="frontier_vector_db_ann_relaxed_") as tmp: + workdir = Path(tmp) / "project" + _copy_project(root, workdir) + try: + subprocess.run( + ["cargo", "build", "--release", "--quiet"], + cwd=workdir, + check=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + timeout=BUILD_TIMEOUT_SECONDS, + ) + except subprocess.TimeoutExpired: + return _invalid("cargo build timed out") + except subprocess.CalledProcessError as exc: + stderr = exc.stderr.decode("utf-8", errors="replace") + return _invalid(f"cargo build failed: {stderr[-800:]}") + + base = _load_vectors(benchmark.base_path, N_BASE) + queries = _load_vectors(benchmark.queries_path, N_QUERIES) + port = _free_port() + base_url = f"http://127.0.0.1:{port}" + process = subprocess.Popen( + ["cargo", "run", "--release", "--quiet"], + cwd=workdir, + env={**os.environ, "PORT": str(port)}, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + load_seconds = 0.0 + try: + _wait_for_server(port, time.time() + 30) + inserted, load_seconds = _bulk_insert( + base_url, + base, + min_effective_qps=benchmark.baseline_effective_qps, + ) + if inserted != N_BASE: + return _invalid(f"bulk_insert inserted {inserted}; expected {N_BASE}") + results, latencies, candidate_seconds = _run_queries( + base_url, + queries, + load_seconds=load_seconds, + min_effective_qps=benchmark.baseline_effective_qps, + ) + except QueryTooSlow as exc: + metrics = { + "baseline_qps": benchmark.baseline_qps, + "baseline_effective_qps": benchmark.baseline_effective_qps, + "baseline_seconds": benchmark.baseline_seconds, + "baseline_load_seconds": benchmark.baseline_load_seconds, + "qps": 0.0, + "qps_upper_bound": exc.optimistic_qps, + "effective_qps": 0.0, + "effective_qps_upper_bound": exc.optimistic_effective_qps, + "candidate_seconds": exc.elapsed_seconds, + "load_seconds": load_seconds, + "load_penalty_weight": LOAD_PENALTY_WEIGHT, + "recall_at_10": 0.0, + "completed_queries": float(exc.completed_queries), + "early_stopped": 1.0, + } + return _invalid(str(exc), metrics) + except LoadTooSlow as exc: + metrics = { + "baseline_qps": benchmark.baseline_qps, + "baseline_effective_qps": benchmark.baseline_effective_qps, + "baseline_seconds": benchmark.baseline_seconds, + "baseline_load_seconds": benchmark.baseline_load_seconds, + "qps": 0.0, + "effective_qps": 0.0, + "effective_qps_upper_bound": exc.optimistic_effective_qps, + "candidate_seconds": 0.0, + "load_seconds": exc.load_seconds, + "load_penalty_weight": LOAD_PENALTY_WEIGHT, + "recall_at_10": 0.0, + "inserted_vectors": float(exc.inserted), + "early_stopped": 1.0, + } + return _invalid(str(exc), metrics) + except Exception as exc: + stderr = b"" + if process.poll() is not None and process.stderr is not None: + stderr = process.stderr.read()[-800:] + metrics = { + "baseline_qps": benchmark.baseline_qps, + "baseline_effective_qps": benchmark.baseline_effective_qps, + "baseline_seconds": benchmark.baseline_seconds, + "baseline_load_seconds": benchmark.baseline_load_seconds, + "qps": 0.0, + "effective_qps": 0.0, + "candidate_seconds": 0.0, + "load_seconds": 0.0, + "recall_at_10": 0.0, + } + detail = stderr.decode("utf-8", errors="replace") + suffix = f": {detail}" if detail else "" + return _invalid(f"candidate benchmark failed: {exc}{suffix}", metrics) + finally: + process.terminate() + try: + process.wait(timeout=5) + except subprocess.TimeoutExpired: + process.kill() + + recall = _recall_at_k(results, benchmark.truth) + qps = N_QUERIES / candidate_seconds + effective_seconds = candidate_seconds + (LOAD_PENALTY_WEIGHT * load_seconds) + effective_qps = N_QUERIES / max(effective_seconds, 1e-9) + if recall < TARGET_RECALL or effective_qps <= benchmark.baseline_effective_qps: + score = 0.0 + else: + score = 100.0 * ( + 1.0 - math.sqrt(benchmark.baseline_effective_qps) / math.sqrt(effective_qps) + ) + + metrics = { + "qps": qps, + "effective_qps": effective_qps, + "baseline_qps": benchmark.baseline_qps, + "baseline_effective_qps": benchmark.baseline_effective_qps, + "recall_at_10": recall, + "candidate_seconds": candidate_seconds, + "load_seconds": load_seconds, + "baseline_seconds": benchmark.baseline_seconds, + "baseline_load_seconds": benchmark.baseline_load_seconds, + "load_penalty_weight": LOAD_PENALTY_WEIGHT, + "avg_latency_ms": float(np.mean(latencies)) if latencies else 0.0, + "p50_latency_ms": float(np.percentile(latencies, 50)) if latencies else 0.0, + "p95_latency_ms": float(np.percentile(latencies, 95)) if latencies else 0.0, + "p99_latency_ms": float(np.percentile(latencies, 99)) if latencies else 0.0, + "concurrency": float(CONCURRENCY), + "n_base": float(N_BASE), + "n_queries": float(N_QUERIES), + "top_k": float(TOP_K), + } + message = ( + f"N={N_BASE}; Q={N_QUERIES}; top_k={TOP_K}; " + f"recall_at_10={recall:.6f}; qps={qps:.6f}; " + f"effective_qps={effective_qps:.6f}; " + f"baseline_effective_qps={benchmark.baseline_effective_qps:.6f}; " + f"load_seconds={load_seconds:.6f}; score={score:.6f}" + ) + return score, score, message, metrics + + +if __name__ == "__main__": + import sys + + if len(sys.argv) == 4 and sys.argv[1] == "--reference-server": + _run_reference_server(int(sys.argv[2]), sys.argv[3]) + raise SystemExit(0) + + if len(sys.argv) != 2: + print("usage: evaluator.py /path/to/rust/project", file=sys.stderr) + raise SystemExit(2) + bounded, unbounded, detail, metrics = evaluate(sys.argv[1]) + print(detail) + print(json.dumps(metrics, indent=2)) + print(f"{bounded:.12f} {unbounded:.12f}") diff --git a/2.0/problems/vector_db_ann_relaxed/harbor/app/Cargo.toml b/2.0/problems/vector_db_ann_relaxed/harbor/app/Cargo.toml new file mode 100644 index 00000000..53c3a883 --- /dev/null +++ b/2.0/problems/vector_db_ann_relaxed/harbor/app/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "vector-db-skeleton" +version = "0.1.0" +edition = "2021" + +[dependencies] +axum = "0.7" +tokio = { version = "1", features = ["full"] } +serde = { version = "1", features = ["derive"] } + +[profile.release] +lto = true +codegen-units = 1 +debug = true diff --git a/2.0/problems/vector_db_ann_relaxed/harbor/app/LICENSE.KCORES b/2.0/problems/vector_db_ann_relaxed/harbor/app/LICENSE.KCORES new file mode 100644 index 00000000..c8393e37 --- /dev/null +++ b/2.0/problems/vector_db_ann_relaxed/harbor/app/LICENSE.KCORES @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2026 karminski + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/2.0/problems/vector_db_ann_relaxed/harbor/app/README.md b/2.0/problems/vector_db_ann_relaxed/harbor/app/README.md new file mode 100644 index 00000000..c9cda9a3 --- /dev/null +++ b/2.0/problems/vector_db_ann_relaxed/harbor/app/README.md @@ -0,0 +1,36 @@ +# Vector DB ANN Relaxed Skeleton + +This is a starter project for the Vector DB ANN Relaxed task. You may use it, +modify it, or replace it entirely. The judge only requires that `/app` builds +with: + +```bash +cargo build --release +PORT= cargo run --release --quiet +``` + +and serves the required `/bulk_insert` and `/search` HTTP endpoints. + +The Harbor environment uses the Ubuntu `apt` Rust toolchain: + +```text +rustc 1.75 +cargo 1.75 +``` + +Pin crate versions if newer transitive dependencies require a newer Rust +compiler. + +The Harbor task provides the following resource budget: + +```text +vCPUs: 8 +memory: 16 GiB +query concurrency: 8 +timed queries per worker: 64 +``` + +## Attribution + +This starter skeleton is adapted from KCORES/vector-db-bench, licensed under +the MIT License. See `LICENSE.KCORES` for the upstream notice. diff --git a/2.0/problems/vector_db_ann_relaxed/harbor/app/src/api.rs b/2.0/problems/vector_db_ann_relaxed/harbor/app/src/api.rs new file mode 100644 index 00000000..3bb70396 --- /dev/null +++ b/2.0/problems/vector_db_ann_relaxed/harbor/app/src/api.rs @@ -0,0 +1,46 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Deserialize)] +pub struct InsertRequest { + pub id: u64, + pub vector: Vec, +} + +#[derive(Serialize)] +pub struct InsertResponse { + pub status: String, +} + +#[derive(Deserialize)] +pub struct BulkInsertItem { + pub id: u64, + pub vector: Vec, +} + +#[derive(Deserialize)] +pub struct BulkInsertRequest { + pub vectors: Vec, +} + +#[derive(Serialize)] +pub struct BulkInsertResponse { + pub status: String, + pub inserted: usize, +} + +#[derive(Deserialize)] +pub struct SearchRequest { + pub vector: Vec, + pub top_k: u32, +} + +#[derive(Serialize)] +pub struct SearchResult { + pub id: u64, + pub distance: f64, +} + +#[derive(Serialize)] +pub struct SearchResponse { + pub results: Vec, +} diff --git a/2.0/problems/vector_db_ann_relaxed/harbor/app/src/db.rs b/2.0/problems/vector_db_ann_relaxed/harbor/app/src/db.rs new file mode 100644 index 00000000..e10305ff --- /dev/null +++ b/2.0/problems/vector_db_ann_relaxed/harbor/app/src/db.rs @@ -0,0 +1,23 @@ +use crate::api::SearchResult; + +pub struct VectorDB { + // Implement your index here. +} + +impl VectorDB { + pub fn new() -> Self { + todo!("initialize your vector database") + } + + pub fn insert(&self, id: u64, vector: Vec) { + todo!("insert one vector") + } + + pub fn bulk_insert(&self, vectors: Vec<(u64, Vec)>) -> usize { + todo!("insert a batch of vectors") + } + + pub fn search(&self, vector: &[f32], top_k: u32) -> Vec { + todo!("return approximate nearest neighbors") + } +} diff --git a/2.0/problems/vector_db_ann_relaxed/harbor/app/src/distance.rs b/2.0/problems/vector_db_ann_relaxed/harbor/app/src/distance.rs new file mode 100644 index 00000000..c67b9092 --- /dev/null +++ b/2.0/problems/vector_db_ann_relaxed/harbor/app/src/distance.rs @@ -0,0 +1,9 @@ +pub fn l2_distance(a: &[f32], b: &[f32]) -> f64 { + a.iter() + .zip(b) + .map(|(x, y)| { + let d = (*x as f64) - (*y as f64); + d * d + }) + .sum() +} diff --git a/2.0/problems/vector_db_ann_relaxed/harbor/app/src/main.rs b/2.0/problems/vector_db_ann_relaxed/harbor/app/src/main.rs new file mode 100644 index 00000000..2c4e6ec7 --- /dev/null +++ b/2.0/problems/vector_db_ann_relaxed/harbor/app/src/main.rs @@ -0,0 +1,54 @@ +use axum::{extract::State, routing::post, Json, Router}; +use std::sync::Arc; +use tokio::net::TcpListener; + +mod api; +mod db; +mod distance; + +use api::*; +use db::VectorDB; + +#[tokio::main] +async fn main() { + let db = Arc::new(VectorDB::new()); + let app = Router::new() + .route("/insert", post(handle_insert)) + .route("/bulk_insert", post(handle_bulk_insert)) + .route("/search", post(handle_search)) + .with_state(db); + + let port = std::env::var("PORT").unwrap_or_else(|_| "8080".to_string()); + let listener = TcpListener::bind(format!("0.0.0.0:{port}")).await.unwrap(); + axum::serve(listener, app).await.unwrap(); +} + +async fn handle_insert( + State(db): State>, + Json(req): Json, +) -> Json { + db.insert(req.id, req.vector); + Json(InsertResponse { + status: "ok".to_string(), + }) +} + +async fn handle_bulk_insert( + State(db): State>, + Json(req): Json, +) -> Json { + let vectors = req.vectors.into_iter().map(|v| (v.id, v.vector)).collect(); + let inserted = db.bulk_insert(vectors); + Json(BulkInsertResponse { + status: "ok".to_string(), + inserted, + }) +} + +async fn handle_search( + State(db): State>, + Json(req): Json, +) -> Json { + let results = db.search(&req.vector, req.top_k); + Json(SearchResponse { results }) +} diff --git a/2.0/problems/vector_db_ann_relaxed/readme b/2.0/problems/vector_db_ann_relaxed/readme new file mode 100644 index 00000000..fefc3ee1 --- /dev/null +++ b/2.0/problems/vector_db_ann_relaxed/readme @@ -0,0 +1,161 @@ +# Vector DB ANN Relaxed + +## Problem + +Build a fast approximate nearest-neighbor vector search engine for a +SIFT1M-scale benchmark. This relaxed variant uses the same data, API, resource +budget, and recall target as Vector DB ANN, but penalizes load/index-build time +10x less heavily. + +The hidden benchmark contains exactly `1,000,000` base vectors with dimension +`128`. Queries use the same dimension, distance is squared Euclidean distance, +and each query asks for the top `10` nearest vector ids. + +Your objective is to maximize serving throughput while preserving search +quality: submissions must reach `recall@10 >= 0.95`, and valid submissions are +ranked by an effective QPS that includes query time plus a small +load/index-build penalty. + +The Harbor agent container starts with a small Rust skeleton project in +`/app`. You may use it, modify it, or replace it entirely. You may also use any +Rust crates, internal harness, data structures, and build layout you want, as +long as the final project satisfies the judge contract below. + +The judge builds and runs your service with: + +```bash +cargo build --release +PORT= cargo run --release --quiet +``` + +The Harbor environment uses the Ubuntu `apt` Rust toolchain: + +```text +rustc 1.75 +cargo 1.75 +``` + +If you add crates, choose versions compatible with this toolchain or pin +transitive dependencies accordingly. + +The service and judge run with the task resource limits below. Design your +parallel search and indexing strategy for this budget: + +```text +vCPUs: 8 +memory: 16 GiB +query concurrency: 8 +timed queries per worker: 64 +``` + +The service must listen on `PORT` and implement these endpoints: + +```text +POST /insert +POST /bulk_insert +POST /search +``` + +`/bulk_insert` receives: + +```json +{"vectors":[{"id":0,"vector":[0.1,0.2,...]}]} +``` + +and returns: + +```json +{"status":"ok","inserted":1} +``` + +`/search` receives: + +```json +{"vector":[0.1,0.2,...],"top_k":10} +``` + +and returns: + +```json +{"results":[{"id":0,"distance":0.0}]} +``` + +## Local Harness + +The official evaluator uses hidden data and a black-box judge. You may call: + +```bash +bash /app/submit.sh +``` + +at any time to submit the current `/app` project to the official judge and get +score feedback. + +## Validity + +A submission is valid if: + +1. It builds successfully with `cargo build --release`. +2. `cargo run --release --quiet` starts the service and implements the required HTTP + endpoints. +3. Every returned id is in `[0, 1_000_000)`. +4. Its `recall@10` is at least `0.95` against the hidden exact top-10 ground + truth. + +## Scoring + +At trial startup, the Harbor judge sidecar prepares the hidden benchmark and +runs an exact-search reference HTTP service through the same `/bulk_insert` +and `/search` client harnesses to produce ground-truth nearest neighbors and +the trial-local scoring baseline: + +```text +baseline_qps +baseline_effective_qps +baseline_load_seconds +``` + +Interactive submissions and the final verifier both score through this same +judge sidecar, so the baseline and runtime environment are shared within a +trial while still letting different machines measure their own local baseline. + +Each submission is then timed independently. The load phase includes all +`/bulk_insert` calls and any index construction performed by the service before +queries begin. The query phase uses 8 concurrent workers, each issuing 64 +queries, and measures only `/search` throughput: + +```text +candidate_qps +candidate_load_seconds +``` + +The reported `qps` is the raw query-only QPS. Scoring uses an effective QPS +that includes a small index-build/load penalty: + +```text +effective_qps = Q / (query_seconds + 0.001 * load_seconds) +``` + +The load phase has a default `900s` timeout. This keeps the benchmark focused +on serving performance while still making very expensive offline indexing pay a +bounded, explicit cost. This relaxed variant makes the load/index-build penalty +small enough that spending additional time on a stronger index can be worthwhile +when it materially improves query throughput or recall. + +During evaluation, the load and query phases may stop early and return `0` once +the elapsed time plus the load penalty makes it impossible for the final +effective QPS to beat the baseline. During load, this assumes a best-case query +time of zero. + +If the submission is invalid, if `recall@10 < 0.95`, or if +`candidate_effective_qps <= baseline_effective_qps`, the score is `0`. +Otherwise: + +```text +score = 100 * (1 - sqrt(baseline_effective_qps) / sqrt(candidate_effective_qps)) +``` + +The bounded and unbounded score fields both report this score. Harbor JSON +results include the measured `qps`, `effective_qps`, `baseline_qps`, +`baseline_effective_qps`, `recall_at_10`, load time, and runtime metrics under +the `metrics` field. diff --git a/2.0/problems/vector_db_ann_relaxed/reference.py b/2.0/problems/vector_db_ann_relaxed/reference.py new file mode 100644 index 00000000..ef1b7bf5 --- /dev/null +++ b/2.0/problems/vector_db_ann_relaxed/reference.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 +"""Reference note for Vector DB ANN Relaxed. + +This is a project-style Rust service task. Agents provide their own harness in +/app; the official Python evaluator is the hidden judge control plane. +""" diff --git a/2.0/problems/vector_db_ann_relaxed/reference.rs b/2.0/problems/vector_db_ann_relaxed/reference.rs new file mode 100644 index 00000000..fbd5b693 --- /dev/null +++ b/2.0/problems/vector_db_ann_relaxed/reference.rs @@ -0,0 +1,41 @@ +use crate::api::SearchResult; +use crate::distance::l2_distance; +use std::sync::RwLock; + +pub struct VectorDB { + vectors: RwLock)>>, +} + +impl VectorDB { + pub fn new() -> Self { + Self { + vectors: RwLock::new(Vec::new()), + } + } + + pub fn insert(&self, id: u64, vector: Vec) { + self.vectors.write().unwrap().push((id, vector)); + } + + pub fn bulk_insert(&self, vectors: Vec<(u64, Vec)>) -> usize { + let inserted = vectors.len(); + self.vectors.write().unwrap().extend(vectors); + inserted + } + + pub fn search(&self, vector: &[f32], top_k: u32) -> Vec { + let mut scored: Vec = self + .vectors + .read() + .unwrap() + .iter() + .map(|(id, candidate)| SearchResult { + id: *id, + distance: l2_distance(vector, candidate), + }) + .collect(); + scored.sort_by(|a, b| a.distance.total_cmp(&b.distance)); + scored.truncate(top_k as usize); + scored + } +} diff --git a/README.md b/README.md index 6ab5fa37..8ba254e2 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ Research Problems Algorithmic Problems - 2.0 Problems + 2.0 Problems

## News