
Implement Rust QBG Agent #3459

Open
kmrmt wants to merge 69 commits into main from
feature/rust/integrate-agent

Conversation


kmrmt commented Jan 29, 2026

Description

SSIA

Related Issue

Versions

  • Vald Version: v1.7.17
  • Go Version: v1.25.5
  • Rust Version: v1.92.0
  • Docker Version: v29.1.3
  • Kubernetes Version: v1.34.3
  • Helm Version: v4.0.4
  • NGT Version: v2.5.0
  • Faiss Version: v1.13.1

Checklist

Special notes for your reviewer

Summary by CodeRabbit

  • New Features

    • YAML-based agent configuration with tracing/OTEL support.
    • Kubernetes pod-annotation metrics exporter for index visibility.
    • Durable index persistence with copy-on-write, backups and recovery.
    • Background daemon for automatic index creation and periodic saves.
    • New in-memory store and QBG-backed indexing/search service.
    • Expanded gRPC surface including flush and enhanced search/insert/update/remove/object/index endpoints.
  • Tests

    • Large expansion of Rust test coverage and new per-language coverage targets.

@vdaas-ci
Collaborator

[CHATOPS:HELP] ChatOps commands.

  • 🙆‍♀️ /approve - approve
  • 🍱 /format - format codes and add licenses
  • /gen-test - generate test codes
  • 🏷️ /label - add labels
  • /label actions/e2e-deploy - run E2E deploy & integration test


coderabbitai bot commented Jan 29, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Adds a service-driven Rust agent and QBG integration: new AgentConfig loader, tracing/observability, generic Agent<S: ANN> with daemon lifecycle and gRPC server, QBGService implementing ANN with persistence/memstore/vqueue/kvs/k8s metrics, broad library API changes (algorithm, vqueue, kvs, observability, proto), CI/test/coverage and Helm/Makefile updates.

Changes

Cohort / File(s): Summary

  • Build & CI (Makefile.d/test.mk, Makefile.d/dependencies.mk, .github/workflows/coverage.yaml, .deepsource.toml):
    Added Rust test targets and a coverage/rust target; install cargo-llvm-cov; split Go/Rust coverage workflows; toggled the deepsource coverage analyzer.
  • Agent config & entry (rust/bin/agent/src/config.rs, rust/bin/agent/src/main.rs):
    New AgentConfig with YAML/env expansion, bind/validate; new serve(config) runtime path, tracing/otel wiring, and service selection.
  • Agent core & gRPC wiring (rust/bin/agent/src/handler.rs, rust/bin/agent/src/handler/common.rs):
    Agent generalized to Agent<S: ANN> behind Arc<RwLock<S>>, daemon lifecycle (start/stop/shutdown), serve_grpc orchestration, and a global DOMAIN cache.
  • gRPC handlers (rust/bin/agent/src/handler/*.rs: index, insert, object, remove, search, update, upsert, flush, ...):
    All handlers parameterized over S: algorithm::ANN; switched to Arc<RwLock<S>>; centralized error details via build_error_details; streaming/multi APIs tightened with consistent request handling.
  • Service layer (rust/bin/agent/src/service.rs, rust/bin/agent/src/service/*.rs, rust/bin/agent/src/metrics.rs):
    New modular services: daemon (background ticks/handle), k8s metrics exporter, memstore (KVS+VQueue timestamp logic), metadata, persistence manager with CoW, QBGService implementing ANN, and OpenTelemetry metrics registration.
  • Algorithm crate (rust/libs/algorithm/src/error.rs, rust/libs/algorithm/src/lib.rs, rust/libs/algorithm/Cargo.toml):
    Introduced an error module (Error, MultiError), converted to an async ANN trait surface, added thiserror, and bumped some deps.
  • VQueue & KVS libraries (rust/libs/vqueue/src/lib.rs, rust/libs/vqueue/Cargo.toml, rust/libs/kvs/src/*, rust/libs/kvs/Cargo.toml):
    Expanded the Queue trait with many async methods and a NotFound error; sled enabled with compression; wincode Schema bounds updated to DefaultConfig; public re-exports added.
  • Observability / tracing (rust/libs/observability/src/tracing.rs, rust/libs/observability/src/lib.rs, rust/libs/observability/Cargo.toml, rust/libs/observability/src/{config,observability}.rs):
    New TracingConfig, init/shutdown helpers, ObservabilityImpl, OTLP init; added tracing, tracing-subscriber, tracing-opentelemetry deps and re-exports.
  • Proto & generated layout (rust/libs/proto/src/lib.rs and many proto mod.rs removals):
    Consolidated module layout: removed many per-path mod.rs includes and restructured to grouped versioned includes under proto::v1 in the root lib.rs.
  • Algorithms – QBG / C++ FFI (rust/libs/algorithms/qbg/*, rust/libs/algorithms/qbg/src/{input.cpp,input.h}):
    Added the QBG algorithm crate and FFI/type enum conversions; changed Property/Index signatures to enum-backed types; tests updated.
  • Helm & charts (charts/vald/..., .github/workflows/e2e.v2.yaml, charts/vald/values.schema.json, charts/vald/values.yaml, charts/vald/templates/agent/qbg/*):
    Added qbg Helm values/schema, templates, and e2e paths; chart conditionals generalized to algorithm-config; new values-qbg.yaml and e2e/test wiring.
  • Makefile / e2e helpers (Makefile.d/e2e.mk, Makefile.d/functions.mk, Makefile.d/build.mk):
    Added e2e v2/qbg targets and a run macro; use cargo --manifest-path for builds.
  • Other crates & minor bumps (assorted Cargo.toml, go.mod, example/client/go.mod, Dockerfile):
    Multiple dependency bumps across crates; Go module updates; base builder image bumped.
  • Tests (many Rust test additions across agent/service/vqueue/kvs/persistence):
    Extensive unit and integration tests added for config, daemon, QBGService, persistence, memstore, vqueue, k8s exporter, metrics, and handler flows.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Client
  rect rgba(180,200,255,0.5)
    participant GRPC as gRPC
    participant Agent as Agent<S>
  end
  rect rgba(200,255,200,0.5)
    participant Service as QBGService
    participant VQ as VQueue
    participant KVS as KVS
    participant Persist as Persistence
    participant Daemon as Daemon
    participant K8s as K8sExporter
  end

  Client->>GRPC: gRPC request (search/insert/flush/...)
  GRPC->>Agent: dispatch to handler
  Agent->>Service: acquire read/write via Arc/RwLock
  alt write op (insert/update/remove)
    Service->>VQ: enqueue / pop / get_vector
    Service->>KVS: update bidirectional map
    Service->>Persist: prepare/save index (on create/save)
    Service->>K8s: export metrics (optional)
  else read op (search/get)
    Service->>KVS: lookup uuid(s)
    Service->>VQ: check for newer queued data
    Service->>Service: perform index search
  end
  Daemon->>Service: periodic tick -> create/save/regenerate index
  Service-->>Agent: result / Error
  Agent-->>GRPC: respond (with structured error details)

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

Suggested reviewers

  • kpango
  • vankichi
  • datelier

Poem

A config, a daemon, a service so bold,
Queues and KV maps in stories told,
Traces that shimmer, indexes that sing—
Agents awaken; background tasks bring.
🌱🔧🎉


Copilot AI left a comment


Pull request overview

This PR implements a Rust-based QBG (Quantized Ball Graph) Agent for the Vald vector search system. It introduces a comprehensive service implementation including vector queue management, key-value storage, observability features, gRPC handlers, automatic index management, and Kubernetes integration for metrics export.

Changes:

  • Complete QBG service implementation with async operations
  • New vqueue operations (pop, get, exists, range methods)
  • Observability/tracing integration with OpenTelemetry
  • Daemon for automatic indexing and saving
  • Kubernetes metrics export via pod annotations
  • Handler refactoring to use generics and async/await patterns

Reviewed changes

Copilot reviewed 15 out of 36 changed files in this pull request and generated 1 comment.

Summary per file:

  • rust/libs/vqueue/Cargo.toml: Downgraded bincode from 3.0 to 2.0, enabled sled compression
  • rust/libs/vqueue/src/lib.rs: Added new queue methods: pop_insert, pop_delete, get_vector, iv_exists, dv_exists, range
  • rust/libs/observability/*: New tracing module with OpenTelemetry integration
  • rust/libs/algorithm/src/*.rs: Refactored the ANN trait to use async methods and added new error types
  • rust/bin/agent/src/service/*: New service modules for QBG, daemon, k8s, metadata, persistence, memstore
  • rust/bin/agent/src/handler/*.rs: Refactored handlers to use generics and async patterns
  • rust/bin/agent/src/main.rs: Complete refactoring with new service initialization and graceful shutdown
  • rust/bin/agent/src/config.rs: New configuration structure with validation
  • Makefile.d/test.mk: Enhanced test targets for Rust crates


coderabbitai bot left a comment


Actionable comments posted: 11

Note

Due to the large number of review comments, Critical and Major severity comments were prioritized as inline comments.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (9)
rust/bin/agent/src/handler/object.rs (1)

124-139: Fix typo in log message.

✏️ Suggested fix
-        info!("Recieved a request from {:?}", request.remote_addr());
+        info!("Received a request from {:?}", request.remote_addr());
rust/bin/agent/src/handler/common.rs (1)

33-46: Avoid panic on non‑UTF8 hostnames.

to_str().unwrap() panics when the hostname contains non-UTF8 bytes. Use lossy conversion to handle this gracefully.

✅ Suggested fix
-    err_details.set_error_info(err_msg.to_string(), DOMAIN.get_or_init(|| { gethostname::gethostname().to_str().unwrap().to_string() }), metadata);
+    let domain = DOMAIN.get_or_init(|| gethostname::gethostname().to_string_lossy().into_owned());
+    err_details.set_error_info(err_msg.to_string(), domain, metadata);
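A stdlib-only sketch of the suggested pattern (the real code calls gethostname::gethostname(); the hostname() helper here is a hypothetical stand-in returning an OsString, which is where non-UTF8 bytes can appear):

```rust
use std::ffi::OsString;
use std::sync::OnceLock;

static DOMAIN: OnceLock<String> = OnceLock::new();

// Hypothetical stand-in for gethostname::gethostname(), which returns an
// OsString that is not guaranteed to be valid UTF-8 on every platform.
fn hostname() -> OsString {
    OsString::from("vald-agent-0")
}

fn domain() -> &'static str {
    // to_string_lossy never panics: any invalid bytes are replaced with
    // U+FFFD instead of aborting the process like to_str().unwrap() would.
    DOMAIN
        .get_or_init(|| hostname().to_string_lossy().into_owned())
        .as_str()
}

fn main() {
    assert_eq!(domain(), "vald-agent-0");
    // Subsequent calls reuse the cached value.
    assert_eq!(domain(), "vald-agent-0");
}
```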
rust/bin/agent/src/handler/upsert.rs (1)

179-179: Fix typo: "Recieved" → "Received".

📝 Proposed fix
-        info!("Recieved a request from {:?}", request.remote_addr());
+        info!("Received a request from {:?}", request.remote_addr());
rust/bin/agent/src/handler/remove.rs (2)

135-135: Typo: "Recieved" should be "Received".

✏️ Proposed fix
-        info!("Recieved a request from {:?}", request.remote_addr());
+        info!("Received a request from {:?}", request.remote_addr());

285-285: Typo: "Recieved" should be "Received".

✏️ Proposed fix
-        info!("Recieved a request from {:?}", request.remote_addr());
+        info!("Received a request from {:?}", request.remote_addr());
rust/bin/agent/src/handler/update.rs (3)

199-199: Typo: "Recieved" should be "Received".

✏️ Proposed fix
-        info!("Recieved a request from {:?}", request.remote_addr());
+        info!("Received a request from {:?}", request.remote_addr());

255-255: Typo: "Recieved" should be "Received".

✏️ Proposed fix
-        info!("Recieved a request from {:?}", request.remote_addr());
+        info!("Received a request from {:?}", request.remote_addr());

283-284: Typos in error message: "Incombatible" and "detedted".

The error message contains two typos that would be visible to API consumers.

✏️ Proposed fix
                     let status = Status::with_error_details(
                         Code::InvalidArgument,
-                        "MultiUpdate API Incombatible Dimension Size detedted",
+                        "MultiUpdate API Incompatible Dimension Size detected",
                         err_details,
                     );
rust/bin/agent/src/handler/insert.rs (1)

159-159: Typo: "Recieved" should be "Received".

✏️ Proposed fix
-        info!("Recieved a request from {:?}", request.remote_addr());
+        info!("Received a request from {:?}", request.remote_addr());
🤖 Fix all issues with AI agents
In `@rust/bin/agent/Cargo.toml`:
- Around line 36-37: Update the k8s-openapi dependency feature flag in
Cargo.toml so generated API types match the project's minimum Kubernetes
version: change the k8s-openapi entry where features = ["v1_32"] to use features
= ["v1_34"] (i.e., update the k8s-openapi dependency declaration in Cargo.toml
accordingly).

In `@rust/bin/agent/src/config.rs`:
- Around line 245-255: Config field names in config.rs don't match keys used by
QBGService::new (it expects qbg.is_read_replica and
qbg.enable_export_index_info), causing silent misconfiguration; fix by renaming
the struct fields in config.rs from is_readreplica → is_read_replica and
enable_export_index_info_to_k8s → enable_export_index_info (keep
export_index_info_duration as-is or rename if service expects different key),
update any YAML config files to use the same keys, and ensure QBGService::new's
settings.get() calls match the final key names so deserialization and lookups
align.
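If the fix keeps the service-side keys, any YAML consumed by the agent must use the same names. A hypothetical fragment under the assumed qbg section (key names taken from the comment above; the values are illustrative defaults, not from the PR):

```yaml
qbg:
  is_read_replica: false
  enable_export_index_info: true
  export_index_info_duration: 1m
```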

In `@rust/bin/agent/src/handler.rs`:
- Line 135: Replace the hardcoded addr assignment in handler.rs (the let addr =
"0.0.0.0:8081".parse()? line) with a value read from the existing settings
Config (e.g., read a "server.address" or "grpc.address" string via
settings.get_str or equivalent), parse that string into the socket address as
before, and provide a sensible default fallback ("0.0.0.0:8081") if the setting
is missing; keep error handling consistent (propagate parse errors) so the addr
variable is set from configuration rather than a hardcoded literal.
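A stdlib sketch of the suggested lookup-with-fallback (the real lookup goes through the config crate's settings object; here a plain Option models a possibly missing "server.address" key):

```rust
use std::net::SocketAddr;

// `configured` models the result of reading "server.address" from settings;
// None means the key is absent, in which case the previous literal becomes
// the default rather than the only option.
fn bind_addr(configured: Option<&str>) -> Result<SocketAddr, std::net::AddrParseError> {
    configured.unwrap_or("0.0.0.0:8081").parse()
}

fn main() {
    assert_eq!(bind_addr(None).unwrap().port(), 8081);
    assert_eq!(bind_addr(Some("127.0.0.1:9090")).unwrap().port(), 9090);
    // A malformed value still propagates a parse error to the caller.
    assert!(bind_addr(Some("not-an-addr")).is_err());
}
```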

In `@rust/bin/agent/src/handler/remove.rs`:
- Around line 196-199: remove_by_timestamp currently constructs remove::Request
with config: None which triggers remove()'s InvalidArgument path; update
remove_by_timestamp to populate the request's config field with a valid config
(for example reuse the same config passed to/remove used elsewhere or clone the
agent's stored config) so the request becomes remove::Request { id:
Some(object::Id { id: uuid.clone() }), config: Some(config.clone()) } (i.e.,
ensure remove::Request.config is Some(...) instead of None).

In `@rust/bin/agent/src/main.rs`:
- Around line 49-52: The match that creates `service` currently panics on
unsupported values; instead return a proper error: change `main` to return a
Result (e.g., anyhow::Result or your crate's error type), capture the
`settings.get_string("service.type")?` value into a variable, match on it and
for the unsupported branch return Err(anyhow::anyhow!("unsupported service type:
{}", svc)) (or your crate's error constructor) rather than panic, and keep
calling `QBGService::new(settings.clone()).await` in the `"qbg"` arm so the
error propagates to the caller instead of aborting.
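A minimal sketch of the non-panicking branch (the comment suggests anyhow; Box<dyn Error> is used here as a stdlib stand-in, and the enum is illustrative):

```rust
use std::error::Error;

#[derive(Debug, PartialEq)]
enum ServiceKind {
    Qbg,
}

// The unsupported branch returns an Err that propagates to main()
// instead of aborting the process with a panic.
fn select_service(kind: &str) -> Result<ServiceKind, Box<dyn Error>> {
    match kind {
        "qbg" => Ok(ServiceKind::Qbg),
        other => Err(format!("unsupported service type: {}", other).into()),
    }
}

fn main() {
    assert_eq!(select_service("qbg").unwrap(), ServiceKind::Qbg);
    assert!(select_service("ngt").is_err());
}
```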

In `@rust/bin/agent/src/service/memstore.rs`:
- Around line 430-456: The update_timestamp logic incorrectly skips vqueue-only
items when dts == 0; modify the branch in update_timestamp that currently reads
"if vqok && !kvok && dts != 0 && dts < ts && (force || its < ts)" so it also
handles the dts == 0 case (e.g., change to "if vqok && !kvok && dts < ts &&
(force || its < ts)"), and adjust the inner logic to only call
vq.pop_delete(uuid).await? and the subsequent rollback when dts != 0 (i.e., skip
pop/rollback when dts == 0) so vq.push_insert(uuid, v, Some(ts)).await? proceeds
for uncommitted vqueue-only items instead of falling through to
NothingToBeDoneForUpdate; keep calls to vq.push_insert, kv.set, and
vq.push_delete unchanged in the other branch.
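The branch condition can be condensed into a small decision function; this is an illustrative stdlib sketch (dts == 0 meaning "no delete recorded" is taken from the comment, and the rest of the service state is abstracted away):

```rust
// Decision for a vqueue-only entry in update_timestamp:
// dts = delete timestamp (0 = no delete recorded), its = insert timestamp,
// ts = incoming timestamp, force = force-update flag.
fn vqueue_only_action(dts: i64, its: i64, ts: i64, force: bool) -> &'static str {
    if dts < ts && (force || its < ts) {
        if dts != 0 {
            // A real pending delete must be popped (and rolled back on error)
            // before re-inserting.
            "pop_delete then push_insert"
        } else {
            // Previously the dts == 0 case fell through to
            // NothingToBeDoneForUpdate and skipped the insert.
            "push_insert"
        }
    } else {
        "nothing to be done"
    }
}

fn main() {
    assert_eq!(vqueue_only_action(0, 5, 10, false), "push_insert");
    assert_eq!(vqueue_only_action(3, 5, 10, false), "pop_delete then push_insert");
    assert_eq!(vqueue_only_action(0, 20, 10, false), "nothing to be done");
}
```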
- Around line 255-298: The uuids() function only iterates kv.range_stream() and
thus omits vqueue‑only inserts; to fix it iterate the vqueue insert queue after
processing kvs (using the vqueue API available in your codebase — e.g. ivq_len +
ivq_at or an iv_range_stream if present) and for each uuid not already in seen
perform the same delete/insert timestamp checks (call vq.dv_exists(&uuid).await
and vq.iv_exists(&uuid).await) and push the uuid when the insert is newer than
any delete (de‑duplicate via the existing seen HashSet) so the returned Vec
includes both committed (kvs) and uncommitted (vqueue) inserts.
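A sketch of the suggested uuids() shape: merge committed entries (kvs) with uncommitted vqueue inserts, de-duplicating via a seen set and dropping uuids whose pending delete is newer than their insert. All names and the synchronous signature are illustrative; the real code is async and streams from kv/vq:

```rust
use std::collections::{HashMap, HashSet};

fn merged_uuids(
    kvs: &[(&str, i64)],        // (uuid, committed insert ts)
    vq_inserts: &[(&str, i64)], // (uuid, queued insert ts)
    vq_deletes: &[(&str, i64)], // (uuid, queued delete ts)
) -> Vec<String> {
    let deletes: HashMap<&str, i64> = vq_deletes.iter().copied().collect();
    let mut seen = HashSet::new();
    let mut out = Vec::new();
    for &(uuid, ts) in kvs.iter().chain(vq_inserts) {
        if !seen.insert(uuid) {
            continue; // already handled via kvs or an earlier queue entry
        }
        match deletes.get(uuid) {
            Some(dts) if *dts >= ts => {} // delete is newer: skip
            _ => out.push(uuid.to_string()),
        }
    }
    out
}

fn main() {
    let got = merged_uuids(
        &[("committed", 1)],
        &[("queued", 2), ("committed", 3)],
        &[("queued", 5)], // delete at ts 5 outranks the queued insert at ts 2
    );
    assert_eq!(got, vec!["committed".to_string()]);
}
```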

In `@rust/bin/agent/src/service/qbg.rs`:
- Around line 279-295: insert_internal currently enqueues vectors without
checking their dimensionality; before calling self.vq.push_insert in
insert_internal, run the same dimension validation used by ready_for_update
(i.e., compare vector.len() against the index/agent dimension and return the
same error variant used there) and return early with that error if dimensions
mismatch so invalid-length vectors are rejected before enqueueing.
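A minimal sketch of the shared check (the error type and dimension source are placeholders; the real code should reuse whatever ready_for_update returns so both paths report identically):

```rust
// Reject vectors whose length differs from the index dimension before they
// reach the vqueue. `dim` stands in for the agent's configured dimension.
fn validate_dimension(vector: &[f32], dim: usize) -> Result<(), String> {
    if vector.len() != dim {
        return Err(format!(
            "invalid dimension size {}: expected {}",
            vector.len(),
            dim
        ));
    }
    Ok(())
}

fn main() {
    assert!(validate_dimension(&[0.0; 128], 128).is_ok());
    assert!(validate_dimension(&[0.0; 64], 128).is_err());
}
```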
- Around line 641-665: In the search method of qbg.rs (async fn search), remove
the .unwrap() on the call to self.index.search(...) and instead handle the
potential cxx::Exception by mapping it into an Error::Internal (preserving the
exception message for logging/context); e.g., call
self.index.search(...).map_err(|e| Error::Internal(format!("Index::search
failed: {}", e)))? (or use a match to convert the Err variant into
Error::Internal) before proceeding to build the results Vec and returning
search::Response.
- Around line 577-639: The timestamp generation using
Local::now().nanosecond().into() in insert, update, remove, update_multiple (and
their internal calls insert_internal, update_internal, remove_internal,
remove_multiple_internal and ready_for_update) causes non-monotonic,
second-local nanosecond values; replace each occurrence with a true epoch
nanosecond timestamp via Utc::now().timestamp_nanos_opt().unwrap_or(0) so all
ts/ kts/ its comparisons use monotonic epoch nanos consistently across
insert_multiple, update_multiple, remove_multiple and ready_for_update paths.
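The difference matters because Local::now().nanosecond() is only the sub-second fraction: it wraps to 0 every second, so ordering comparisons break across second boundaries. The comment suggests chrono's Utc::now().timestamp_nanos_opt(); an equivalent stdlib sketch:

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Nanoseconds since the Unix epoch, falling back to 0 instead of
// panicking on a pre-epoch system clock.
fn epoch_nanos() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_nanos() as i64)
        .unwrap_or(0)
}

fn main() {
    // Epoch nanos keep ordering across second boundaries, unlike the
    // sub-second nanosecond() field which resets every second.
    assert!(epoch_nanos() > 1_000_000_000_000_000_000); // well past 2001
}
```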

In `@rust/libs/algorithm/src/lib.rs`:
- Around line 20-22: Remove the conflicting `use anyhow::Result;` import and
ensure the file uses the local `Error` type in trait method signatures (i.e.,
keep `Result<T, Error>` as used by the trait methods and the `Error` imported
from the local error module); specifically delete the `anyhow::Result` import so
the compiler can resolve the two-parameter `Result<T, Error>` used by the trait
methods and retain any necessary `use` for the local `Error` symbol.
🟡 Minor comments (18)
rust/libs/kvs/Cargo.toml-23-24 (1)

23-24: Address sled compression feature compatibility across deployments.

Enabling sled's compression feature (compile-time) changes on-disk compatibility. Any existing database written without compression must be opened with a matching sled build; activating compression requires all binaries that access the same data directory to be recompiled consistently. Additionally, sled warns that on-disk format may change pre-1.0 and can require manual migrations.

rust/libs/vqueue/src/lib.rs-148-183 (1)

148-183: Docs don’t match return behavior for iv/dv exists and get_vector.

The comments mention “0 if not found” and an exists flag, but the signatures return Result (with NotFound) and no exists flag for get_vector.

📝 Suggested doc alignment
-    /// The insert timestamp if the UUID exists, or 0 if not found.
+    /// The insert timestamp if the UUID exists, otherwise `QueueError::NotFound`.
@@
-    /// The delete timestamp if the UUID exists, or 0 if not found.
+    /// The delete timestamp if the UUID exists, otherwise `QueueError::NotFound`.
@@
-    /// A tuple of (vector, insert_timestamp, exists).
+    /// A tuple of (vector, insert_timestamp).
rust/libs/algorithm/src/error.rs-45-56 (1)

45-56: Fix thiserror format strings so UUID/limit values render correctly.

The conditional expressions return string literals containing {uuid} and {limit} as plain text. Since these are evaluated at runtime and passed as format arguments, the placeholders won't be interpolated by the format! macro. Wrap them in format!() calls to interpolate the variables before passing them to the error formatter.

🛠️ Suggested fix
-    #[error("object uuid{} not found", if uuid == "0" { "" } else { " {uuid}'s metadata" })]
+    #[error("object uuid{} not found", if uuid == "0" { "" } else { format!(" {}'s metadata", uuid) })]
@@
-    #[error("dimension size {} is invalid, the supporting dimension size must be {}", current, if limit == "0" { "bigger than 2" } else { "between 2 ~ {limit}" })]
+    #[error("dimension size {} is invalid, the supporting dimension size must be {}", current, if limit == "0" { "bigger than 2".to_string() } else { format!("between 2 ~ {}", limit) })]
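The underlying issue: thiserror evaluates each extra argument as an expression and passes its value straight to the formatter, so a literal containing {uuid} is printed verbatim. A hand-rolled Display sketch of the corrected behavior (written against std only so it runs without the thiserror crate):

```rust
use std::fmt;

struct ObjectNotFound {
    uuid: String,
}

impl fmt::Display for ObjectNotFound {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Mirrors the corrected #[error(...)] attribute: the conditional arm
        // must build its string with format!, because the expression's value
        // is passed through as-is, not re-parsed for placeholders.
        let suffix = if self.uuid == "0" {
            String::new()
        } else {
            format!(" {}'s metadata", self.uuid)
        };
        write!(f, "object uuid{} not found", suffix)
    }
}

fn main() {
    let e = ObjectNotFound { uuid: "abc".into() };
    assert_eq!(e.to_string(), "object uuid abc's metadata not found");
    let z = ObjectNotFound { uuid: "0".into() };
    assert_eq!(z.to_string(), "object uuid not found");
}
```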
rust/bin/agent/src/handler/flush.rs-28-33 (1)

28-33: Fix typo in log message.

Minor, but worth cleaning up for operational logs.

✏️ Suggested fix
-        info!("Recieved a request from {:?}", request.remote_addr());
+        info!("Received a request from {:?}", request.remote_addr());
rust/bin/agent/src/main.rs-44-45 (1)

44-45: Replace expect() with proper error propagation.

Using expect() will panic on failure, but serve() returns Result. Propagate the error instead.

🛡️ Proposed fix
-    let tracer_provider = init_tracing(&tracing_config, otel_config.as_ref())
-        .expect("failed to initialize tracing");
+    let tracer_provider = init_tracing(&tracing_config, otel_config.as_ref())?;
rust/bin/agent/src/handler/upsert.rs-153-167 (1)

153-167: Potential panic on missing error details.

st.get_details_error_info().unwrap().reason will panic if the status doesn't contain error info. Use safe access instead.

🛡️ Proposed fix
                 _ => {
                     let resource_type = format!("{}{}", resource_type, rt_name);
                     let resource_name = format!("{}: {}({})", api_name, name, ip);
+                    let reason = st.get_details_error_info()
+                        .map(|info| info.reason)
+                        .unwrap_or_else(|| st.message().to_string());
                     let err_details = build_error_details(
-                        st.get_details_error_info().unwrap().reason,
+                        reason,
                         &uuid,
                         request.encode_to_vec(),
                         &resource_type,
                         &resource_name,
                         None,
                     );
                     Status::with_error_details(st.code(), st.message(), err_details)
                 }
rust/bin/agent/src/main.rs-116-124 (1)

116-124: Configuration loading should handle errors gracefully.

The unwrap() on config build will panic if the config file is missing or malformed. This should return a user-friendly error.

🛡️ Proposed fix
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let settings = ::config::Config::builder()
         .add_source(::config::File::with_name("/etc/server/config.yaml"))
         .build()
-        .unwrap();
+        .map_err(|e| format!("failed to load configuration from /etc/server/config.yaml: {}", e))?;
     
     serve(settings).await
 }
rust/libs/observability/src/tracing.rs-190-211 (1)

190-211: Document that init_tracing should be called only once, or add a guard to global::set_tracer_provider.

The function global::set_tracer_provider at line 208 unconditionally replaces the global provider without any protection. While init_tracing is currently called only once (in rust/bin/agent/src/main.rs:44), the lack of safeguards makes this vulnerable to accidental multiple initializations, especially in test environments. Consider adding documentation explicitly stating this requirement or implementing an initialization guard (e.g., using std::sync::Once or similar).
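A minimal sketch of such a guard, assuming install_tracer_provider() stands in for building the provider and calling global::set_tracer_provider:

```rust
use std::sync::Once;

static INIT: Once = Once::new();

// Stand-in for the real work: building the OTLP provider and installing it
// as the global tracer provider exactly once.
fn install_tracer_provider() {}

// Returns true only on the call that actually installed the provider, so
// repeated init attempts (tests, double wiring) become harmless no-ops.
fn init_tracing_once() -> bool {
    let mut installed = false;
    INIT.call_once(|| {
        install_tracer_provider();
        installed = true;
    });
    installed
}

fn main() {
    assert!(init_tracing_once());
    assert!(!init_tracing_once()); // second call is a no-op
}
```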

rust/bin/agent/src/handler/update.rs-428-428 (1)

428-428: Typo: "Recieved" should be "Received".

✏️ Proposed fix
-        info!("Recieved a request from {:?}", request.remote_addr());
+        info!("Received a request from {:?}", request.remote_addr());
rust/bin/agent/src/handler/insert.rs-214-214 (1)

214-214: Typo: "Recieved" should be "Received".

✏️ Proposed fix
-        info!("Recieved a request from {:?}", request.remote_addr());
+        info!("Received a request from {:?}", request.remote_addr());
rust/bin/agent/src/handler/remove.rs-153-153 (1)

153-153: Typo: "Recieved" should be "Received".

✏️ Proposed fix
-        info!("Recieved a request from {:?}", request.remote_addr());
+        info!("Received a request from {:?}", request.remote_addr());
rust/bin/agent/src/handler/insert.rs-136-141 (1)

136-141: Inconsistent error code: Code::Unknown used instead of Code::Internal.

Other handlers (update.rs, remove.rs) use Code::Internal for the catch-all error case. Using Code::Unknown with the message "failed to parse Insert gRPC error response" is misleading since this is a fallback for unhandled ANN errors, not a parsing failure.

♻️ Proposed fix for consistency
                 _ => {
                     let err_details = build_error_details(
                         err,
                         &vec.id,
                         request_bytes,
                         &resource_type,
                         &resource_name,
                         None,
                     );
                     Status::with_error_details(
-                        Code::Unknown,
-                        "failed to parse Insert gRPC error response",
+                        Code::Internal,
+                        "Insert API failed",
                         err_details,
                     )
                 }
rust/bin/agent/src/handler/insert.rs-241-242 (1)

241-242: Typos in error message: "Incombatible" and "detedted".

✏️ Proposed fix
                 let status = Status::with_error_details(
                     Code::InvalidArgument,
-                    "MultiInsert API Incombatible Dimension Size detedted",
+                    "MultiInsert API Incompatible Dimension Size detected",
                     err_details,
                 );
rust/bin/agent/src/handler/insert.rs-64-65 (1)

64-65: Typos in error message: "Incombatible" and "detedted".

✏️ Proposed fix
         let status = Status::with_error_details(
             Code::InvalidArgument,
-            "Insert API Incombatible Dimension Size detedted",
+            "Insert API Incompatible Dimension Size detected",
             err_details,
         );
rust/bin/agent/src/service/persistence.rs-294-299 (1)

294-299: unwrap() on SystemTime::now().duration_since(UNIX_EPOCH) could panic.

While extremely rare, if the system clock is set before Unix epoch, this will panic. Consider using unwrap_or_default() or handling the error gracefully.

🛡️ Proposed defensive fix
         // Create new backup directory with timestamp
         let timestamp = SystemTime::now()
             .duration_since(UNIX_EPOCH)
-            .unwrap()
+            .unwrap_or_default()
             .as_nanos();
rust/bin/agent/src/service/persistence.rs-383-388 (1)

383-388: Same unwrap() issue with SystemTime in mktmp().

🛡️ Proposed defensive fix
         // Create a unique temp directory using timestamp and random suffix
         let timestamp = SystemTime::now()
             .duration_since(UNIX_EPOCH)
-            .unwrap()
+            .unwrap_or_default()
             .as_nanos();
rust/bin/agent/src/handler/search.rs-55-58 (1)

55-58: Fix typo in client-facing Status message.

The InvalidArgument string has typos (“Incombatible” / “detedted”). This is returned to clients.

✏️ Suggested fix
-            "Search API Incombatible Dimension Size detedted",
+            "Search API incompatible dimension size detected",
rust/bin/agent/src/handler/search.rs-45-47 (1)

45-47: Avoid hard‑coding resource_type to qbg.Search inside the helper.

search() appends /qbg.Search unconditionally. When called by stream_search, the caller already passes a StreamSearch resource type, so error details become mis‑labeled (e.g., .../qbg.StreamSearch/qbg.Search). Consider passing the fully qualified operation name into search() and removing the hard-coded suffix, then update call sites accordingly.

🔧 Directional refactor (helper no longer mutates the operation name)
-        let resource_type = format!("{}/qbg.Search", resource_type);
+        let resource_type = resource_type.to_string();
@@
-            let resource_type = format!("{}/qbg.Search", resource_type);
+            let resource_type = resource_type.to_string();
🧹 Nitpick comments (17)
rust/libs/algorithm/src/error.rs (1)

117-118: Consider trimming/ignoring empty UUIDs in split_uuids.

This avoids empty entries from trailing commas or whitespace-only input.

♻️ Optional tweak
-    fn split_uuids(uuids: String) -> Vec<String> {
-        uuids.split(",").map(|x| x.to_string()).collect()
-    }
+    fn split_uuids(uuids: String) -> Vec<String> {
+        uuids
+            .split(',')
+            .map(|x| x.trim())
+            .filter(|x| !x.is_empty())
+            .map(|x| x.to_string())
+            .collect()
+    }
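The tweak runs standalone as below (with a borrowed &str parameter instead of the original owned String, for illustration):

```rust
// Trim whitespace and drop empty segments so trailing commas and
// whitespace-only input do not produce empty uuid entries.
fn split_uuids(uuids: &str) -> Vec<String> {
    uuids
        .split(',')
        .map(str::trim)
        .filter(|x| !x.is_empty())
        .map(str::to_string)
        .collect()
}

fn main() {
    assert_eq!(split_uuids("a, b,,c,"), vec!["a", "b", "c"]);
    assert!(split_uuids("  ,").is_empty());
}
```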
Makefile.d/test.mk (1)

378-381: Verify CI/tooling installs cargo llvm-cov and LLVM tools.

The new coverage/rust target will fail unless cargo-llvm-cov and llvm-tools-preview are available in the build image.

rust/bin/agent/src/service/k8s.rs (1)

305-318: Assert applied annotations in test_export_on_tick.

Right now the test doesn’t verify any behavior because the boxed mock is inaccessible after construction. Consider wrapping the mock in a shared handle so you can assert on the applied annotations.

✅ Suggested test fix
-    use std::sync::Mutex;
+    use std::sync::{Arc, Mutex};
@@
+    struct SharedPatcher(Arc<MockPatcher>);
+
+    #[async_trait::async_trait]
+    impl Patcher for SharedPatcher {
+        async fn apply_pod_annotations(
+            &self,
+            name: &str,
+            namespace: &str,
+            annotations: HashMap<String, String>,
+        ) -> Result<()> {
+            self.0.apply_pod_annotations(name, namespace, annotations).await
+        }
+    }
+
     #[tokio::test]
     async fn test_export_on_tick() {
-        let patcher = Box::new(MockPatcher::new());
+        let patcher = Arc::new(MockPatcher::new());
         let exporter = MetricsExporter::new(
-            patcher,
+            Box::new(SharedPatcher(patcher.clone())),
             "test-pod".to_string(),
             "default".to_string(),
             true,
         );
 
         exporter.export_on_tick(100, 5).await.unwrap();
 
-        // Note: Can't access mock directly due to Box, would need Arc<dyn Patcher>
+        let applied = patcher.get_applied();
+        assert_eq!(applied.len(), 1);
+        assert_eq!(
+            applied[0].get(annotations::INDEX_COUNT),
+            Some(&"100".to_string())
+        );
     }
rust/bin/agent/src/handler/object.rs (1)

179-221: Release the read lock between items to avoid long writer blocking.

Holding a read lock for the full stream can starve writers on large datasets. Consider narrowing the lock scope to each lookup.

♻️ Suggested refactor
-        tokio::spawn(async move {
-            let s = s.read().await;
-            let uuids = s.uuids().await;
-            
-            for uuid in uuids {
-                let response = match s.get_object(uuid.clone()).await {
+        tokio::spawn(async move {
+            let uuids = {
+                let s = s.read().await;
+                s.uuids().await
+            };
+
+            for uuid in uuids {
+                let response = {
+                    let s = s.read().await;
+                    match s.get_object(uuid.clone()).await {
                         Ok((vec, ts)) => object::list::Response {
                             payload: Some(object::list::response::Payload::Vector(object::Vector {
                                 id: uuid,
                                 vector: vec,
                                 timestamp: ts,
                             })),
                         },
                         Err(_) => {
                             let status = proto::google::rpc::Status {
                                 code: Code::NotFound as i32,
                                 message: format!("failed to get object with uuid: {}", uuid),
                                 details: vec![],
                             };
                             object::list::Response {
                                 payload: Some(object::list::response::Payload::Status(status)),
                             }
                         }
-                };
+                    }
+                };
 
                 if tx.send(Ok(response)).await.is_err() {
                     // Receiver dropped, stop sending
                     break;
                 }
             }
         });
rust/libs/observability/src/tracing.rs (1)

129-184: Consider extracting subscriber initialization to reduce code duplication.

The match arms have significant repetition in the subscriber setup. While the comment acknowledges avoiding "complex type combinations," you could reduce duplication with a helper or by using Option layers.

♻️ Example: Extract common subscriber setup
// Alternative approach using boxed layers (trade-off: some runtime overhead)
fn build_subscriber(
    env_filter: EnvFilter,
    enable_stdout: bool,
    enable_json: bool,
    otel_layer: Option<OpenTelemetryLayer<...>>,
) -> impl tracing::Subscriber { ... }

However, the current explicit approach is acceptable given Rust's type system constraints with layers.

rust/bin/agent/src/service.rs (1)

36-203: Mock service is well-structured for testing dimension error paths.

The _MockService provides a comprehensive ANN trait implementation for testing. The underscore prefix correctly signals it's for internal test use.

However, consider if this mock could be useful in other test files. If so, making it pub(crate) without the underscore and moving to a shared test utilities module would improve reusability.

rust/bin/agent/src/main.rs (1)

53-60: Hardcoded agent configuration values should be externalized.

The agent name, IP, resource type, and stream concurrency are hardcoded. These should come from the configuration file for flexibility.

♻️ Suggested improvement
     let mut agent = Agent::new(
         service,
-        "agent-qbg",
-        "127.0.0.1",
-        "vald/internal/core/algorithm",
-        "vald-agent",
-        10,
+        &settings.get_string("agent.name").unwrap_or_else(|_| "agent-qbg".to_string()),
+        &settings.get_string("agent.ip").unwrap_or_else(|_| "127.0.0.1".to_string()),
+        &settings.get_string("agent.resource_type").unwrap_or_else(|_| "vald/internal/core/algorithm".to_string()),
+        &settings.get_string("agent.api_name").unwrap_or_else(|_| "vald-agent".to_string()),
+        settings.get::<usize>("agent.stream_concurrency").unwrap_or(10),
     );
rust/bin/agent/src/handler/upsert.rs (1)

300-321: Simplify result aggregation in multi_upsert.

The conditional checks on lines 313-318 are redundant since the else branch (lines 306-321) only executes when both ireqs and ureqs are non-empty.

♻️ Simplified aggregation
         } else {
             let ures = self.multi_update(tonic::Request::new(ureqs)).await?;
             let ires = self.multi_insert(tonic::Request::new(ireqs)).await?;

-            let mut locs = object::Locations { locations: vec![] };
             let ilocs = ires.into_inner().locations;
             let ulocs = ures.into_inner().locations;
-            if ulocs.is_empty() {
-                locs.locations = ilocs;
-            } else if ilocs.is_empty() {
-                locs.locations = ulocs;
-            } else {
-                locs.locations = [ilocs, ulocs].concat();
-            }
-            return Ok(tonic::Response::new(locs));
+            // Both are guaranteed non-empty since we're in the else branch
+            return Ok(tonic::Response::new(object::Locations {
+                locations: [ilocs, ulocs].concat(),
+            }));
         }
rust/bin/agent/src/service/metadata.rs (1)

155-171: Consider flushing data to disk for durability.

The store function doesn't explicitly sync the file to disk. On crash, the data could be lost even though File::create and BufWriter succeeded. For critical metadata, consider calling sync_all().

♻️ Add fsync for durability
 pub fn store<P: AsRef<Path>>(path: P, metadata: &Metadata) -> Result<(), MetadataError> {
     let path = path.as_ref();
     
     // Ensure parent directory exists
     if let Some(parent) = path.parent() {
         fs::create_dir_all(parent)?;
     }
     
     // Open file for writing (create or truncate)
     let file = File::create(path)?;
-    let writer = BufWriter::new(file);
+    let mut writer = BufWriter::new(file);
     
     // Write metadata as JSON
     serde_json::to_writer_pretty(&mut writer, metadata)?;
+    
+    // Ensure data is flushed to disk
+    let file = writer.into_inner().map_err(|e| e.into_error())?;
+    file.sync_all()?;
     
     Ok(())
 }
rust/bin/agent/src/handler/remove.rs (1)

49-49: Use is_empty() instead of len() == 0 for idiomatic Rust.

♻️ Proposed fix
-        if uuid.len() == 0 {
+        if uuid.is_empty() {
rust/bin/agent/src/handler/update.rs (1)

71-71: Use is_empty() instead of len() == 0 for idiomatic Rust.

♻️ Proposed fix
-    if uuid.len() == 0 {
+    if uuid.is_empty() {
rust/bin/agent/src/handler.rs (3)

139-144: Empty match arm _ => {} should use continue for clarity.

The empty match arm silently ignores non-grpc servers. Using continue makes the intent clearer.

♻️ Proposed fix
             match name.as_str() {
                 "grpc" => {
                     grpc_key = format!("server_config.servers[{i}]");
+                    break; // Found grpc, no need to continue
                 }
-                _ => {}
+                _ => continue,
             }

261-274: Document why error_rx is not cloned with a comment.

The comment on line 271 explains the behavior, but a more prominent doc comment on the Clone impl would help future maintainers understand this intentional design decision.

📝 Proposed improvement
+/// Note: Cloning an Agent does NOT clone the error receiver (`error_rx`).
+/// Only the original Agent instance can receive daemon errors.
+/// Cloned instances share the same underlying ANN service and daemon handle.
 impl<S: algorithm::ANN + 'static> Clone for Agent<S> {
     fn clone(&self) -> Self {
         Self {
             s: self.s.clone(),
             name: self.name.clone(),
             ip: self.ip.clone(),
             resource_type: self.resource_type.clone(),
             api_name: self.api_name.clone(),
             stream_concurrency: self.stream_concurrency,
             daemon_handle: self.daemon_handle.clone(),
-            error_rx: None, // error_rx is not cloneable, only main instance handles errors
+            error_rx: None,
         }
     }
 }

283-306: parse_duration_from_string doesn't handle milliseconds or nanoseconds.

The function only supports s, m, h suffixes. Consider adding ms for milliseconds which is common in timeout configurations.

♻️ Proposed enhancement
 fn parse_duration_from_string(input: &str) -> Option<Duration> {
     if input.len() < 2 {
         return None;
     }
-    let last_char = match input.chars().last() {
-        Some(c) => c,
-        None => return None,
-    };
-    if last_char.is_numeric() {
-        return None;
-    }
 
-    let (value, unit) = input.split_at(input.len() - 1);
-    let num: u64 = match value.parse() {
-        Ok(n) => n,
-        Err(_) => return None,
-    };
-    match unit {
-        "s" => Some(Duration::from_secs(num)),
-        "m" => Some(Duration::from_secs(num * 60)),
-        "h" => Some(Duration::from_secs(num * 60 * 60)),
-        _ => None,
+    // Check for multi-char suffixes first (ms, ns)
+    if input.ends_with("ms") {
+        let num: u64 = input[..input.len() - 2].parse().ok()?;
+        return Some(Duration::from_millis(num));
+    }
+    if input.ends_with("ns") {
+        let num: u64 = input[..input.len() - 2].parse().ok()?;
+        return Some(Duration::from_nanos(num));
     }
+
+    // Single char suffixes
+    let (value, unit) = input.split_at(input.len() - 1);
+    let num: u64 = value.parse().ok()?;
+    match unit {
+        "s" => Some(Duration::from_secs(num)),
+        "m" => Some(Duration::from_secs(num * 60)),
+        "h" => Some(Duration::from_secs(num * 60 * 60)),
+        _ => None,
+    }
 }
rust/bin/agent/src/service/persistence.rs (2)

401-401: unwrap() on RwLock::write() could panic if the lock is poisoned.

If a thread panics while holding the write lock, subsequent write() calls will return Err. Consider using unwrap_or_else with logging or propagating the error.

🛡️ Proposed fix
-        let mut guard = self.tmp_path.write().unwrap();
+        let mut guard = self.tmp_path.write().map_err(|e| {
+            PersistenceError::SaveFailed(format!("lock poisoned: {}", e))
+        })?;

Note: Similar pattern should be applied to other .unwrap() calls on RwLock guards at lines 413, 442.


545-567: copy_dir function is defined but never used.

This appears to be dead code. If it's intended for future use, consider adding #[allow(dead_code)] with a comment, or remove it.

♻️ Options

Option 1: Remove if not needed:

-/// Copies a directory from source to destination.
-fn copy_dir<P: AsRef<Path>, Q: AsRef<Path>>(src: P, dst: Q) -> Result<(), PersistenceError> {
-    // ... entire function
-}

Option 2: Mark as intentionally unused:

+#[allow(dead_code)] // Reserved for future rollback functionality
 /// Copies a directory from source to destination.
 fn copy_dir<P: AsRef<Path>, Q: AsRef<Path>>(src: P, dst: Q) -> Result<(), PersistenceError> {
rust/bin/agent/src/handler/search.rs (1)

424-438: StreamSearchByID collapses all errors into Internal.

The unary search_by_id maps specific error variants to Aborted, NotFound, and InvalidArgument, but the streaming variant returns Internal for any error. This loses semantics for clients. Consider extracting the unary error‑mapping logic into a shared helper and reuse it here (and in stream_linear_search_by_id) for consistent status codes.

@cloudflare-workers-and-pages

cloudflare-workers-and-pages bot commented Jan 29, 2026

Deploying vald with Cloudflare Pages

Latest commit: 53b86ba
Status: ✅  Deploy successful!
Preview URL: https://85b4e1c3.vald.pages.dev
Branch Preview URL: https://feature-rust-integrate-agent.vald.pages.dev


Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
.github/workflows/coverage.yaml (1)

21-31: Add rust/** to workflow trigger paths.

The workflow now uploads Rust coverage (lines 77-82, 90-95), but the trigger paths only include Go directories (internal/**, pkg/**, cmd/**). Changes to Rust source files under rust/** won't trigger this workflow, resulting in stale or missing Rust coverage reports.

🔧 Proposed fix
     paths:
       - ".github/workflows/coverage.yaml"
       - "internal/**"
       - "pkg/**"
       - "cmd/**"
+      - "rust/**"
   pull_request:
     paths:
       - ".github/workflows/coverage.yaml"
       - "internal/**"
       - "pkg/**"
       - "cmd/**"
+      - "rust/**"
🧹 Nitpick comments (2)
.github/workflows/coverage.yaml (2)

90-95: Redundant DeepSource CLI download.

The DeepSource CLI is already downloaded in the Go coverage step (line 86). Re-downloading it in the Rust step wastes time and bandwidth. Reuse the existing binary instead.

♻️ Proposed fix
       - name: Upload rust coverage report to deepsource
         run: |
-          curl https://deepsource.io/cli | sh
           ./bin/deepsource report --analyzer test-coverage --key rust --value-file ./rust-coverage.out
         env:
           DEEPSOURCE_DSN: ${{ secrets.DEEPSOURCE_DSN }}

77-82: Consider adding a file existence check or continue-on-error for Rust coverage steps.

If the make coverage step fails to generate rust-coverage.out (e.g., Rust tests aren't built or fail), both Rust upload steps will fail and halt the workflow. Consider adding resilience similar to the continue-on-error: true on line 68, or check file existence before upload.

🛡️ Example: conditional upload
       - name: Upload rust coverage report to Codecov
+        if: hashFiles('rust-coverage.out') != ''
         uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5.5.2
         with:
           token: ${{secrets.CODECOV_TOKEN}}
           files: ./rust-coverage.out
           flags: rust

@codecov

codecov bot commented Jan 29, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 25.03%. Comparing base (70de684) to head (53b86ba).
⚠️ Report is 1 commit behind head on main.

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #3459   +/-   ##
=======================================
  Coverage   25.02%   25.03%           
=======================================
  Files         544      544           
  Lines       43800    43800           
=======================================
+ Hits        10963    10966    +3     
+ Misses      32108    32105    -3     
  Partials      729      729           
Flag Coverage Δ
go 25.03% <100.00%> (?)
rust 25.03% <100.00%> (?)

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Fix all issues with AI agents
In @.github/workflows/coverage.yaml:
- Around line 84-98: The workflow's path filters currently only cover
internal/**, pkg/**, cmd/** so Rust-only changes and Makefile updates won't
trigger the "Run Rust coverage" job; update the workflow trigger paths to
include Rust source directories and coverage targets by adding patterns such as
**/*.rs, Cargo.toml, and Makefile (or paths like rust/** and Makefile) so
changes to Rust code or the Makefile will run the Rust coverage steps ("Run Rust
coverage", "Upload rust coverage report to Codecov", "Upload rust coverage
report to deepsource").
- Around line 67-98: The upload steps (Upload go coverage report to Codecov,
Upload go coverage report to deepsource, Upload rust coverage report to Codecov,
Upload rust coverage report to deepsource) run even when the corresponding
report files are missing; guard each upload step by checking for the generated
file (coverage.out for Go, rust-coverage.out for Rust) before running (e.g., use
a workflow conditional that verifies the file exists / hashFiles non-empty), and
remove or keep continue-on-error only on the actual coverage generation steps
(Run Go coverage / Run Rust coverage) so uploads never run when reports weren't
produced.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

🤖 Fix all issues with AI agents
In `@rust/bin/agent/Cargo.toml`:
- Around line 36-37: The k8s-openapi dependency is targeting v1_35 which is
newer than the declared minimum Kubernetes target (v1_34); update the
k8s-openapi feature to use "v1_34" instead of "v1_35" in Cargo.toml (the
dependency line for k8s-openapi) so the crate compiles against the correct
Kubernetes API version and then run cargo update / rebuild to ensure the
lockfile and build reflect the change.

In `@rust/bin/agent/src/handler.rs`:
- Around line 72-83: The stored error_rx from start_daemon is never consumed
which can cause backpressure; update the start method (and related fields
daemon_handle/error_rx) to either spawn a background task that continuously
reads from error_rx and logs/drops errors (so the daemon never stalls) or
provide a public getter/consumer so callers can drain it; specifically modify
the start function that calls start_daemon and the error_rx handling to ensure
messages from error_rx are actively polled (e.g., spawn tokio::spawn to drain
and log) or add an accessor method to expose the Receiver for external
consumption.
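As a rough std-only illustration of the drain option (the agent itself would use `tokio::spawn` over the tokio mpsc receiver), a background consumer that keeps the channel empty might look like the following; `spawn_drain` and the string payload are illustrative stand-ins:

```rust
use std::sync::mpsc;
use std::thread;

// Background consumer that continuously reads from the error channel so the
// sending side never stalls on a full buffer. Returns the drain thread's
// handle; the count is only for demonstration.
fn spawn_drain(rx: mpsc::Receiver<String>) -> thread::JoinHandle<usize> {
    thread::spawn(move || {
        let mut seen = 0;
        for err in rx {
            // Real code would log via tracing; here we just print and count.
            eprintln!("daemon error: {err}");
            seen += 1;
        }
        seen
    })
}

fn main() {
    let (tx, rx) = mpsc::sync_channel(1); // bounded, like the daemon channel
    let handle = spawn_drain(rx);
    for i in 0..10 {
        // Sends proceed because the drain thread keeps emptying the buffer.
        tx.send(format!("err {i}")).unwrap();
    }
    drop(tx); // closing the channel ends the drain loop
    assert_eq!(handle.join().unwrap(), 10);
    println!("drained");
}
```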
- Around line 168-222: Replace the blocking .serve(addr) call with
.serve_with_shutdown(addr, shutdown_future) so the gRPC server listens for the
shutdown signal; create a shutdown_future that awaits the ctrl_c signal (or
existing shutdown trigger used by stop()) and then invokes Agent::shutdown()
(e.g., call self.shutdown().await or Agent::shutdown().await on the cloned
handler) before completing the future; ensure the existing ctrl_c handler’s call
to stop() remains or is integrated but that Agent::shutdown() is always called
on shutdown to perform cleanup and data persistence.

In `@rust/bin/agent/src/service/daemon.rs`:
- Around line 161-266: The daemon spawns a bounded error channel
(error_tx/error_rx) that is never consumed, so using the async send().await can
block the daemon loop; change every occurrence of error_tx.send(e).await inside
the daemon_task (the branches that call svc.create_index().await,
svc.create_and_save_index().await, and svc.save_index().await) to a non-blocking
error_tx.try_send(e) and handle the Result (e.g., ignore Full or log a debug
message) so the daemon never stalls; keep the existing special-case matching of
Error::UncommittedIndexNotFound and only attempt try_send when sending is
desired.
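The non-blocking send can be sketched with std's bounded channel, whose `try_send` mirrors tokio's in failing fast with `Full` instead of waiting for a consumer; `report_error` is a hypothetical helper, not the agent's actual code:

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

// Non-blocking error reporting: a full channel drops the message (optionally
// logging it) rather than stalling the caller's loop.
fn report_error(tx: &SyncSender<String>, err: String) -> bool {
    match tx.try_send(err) {
        Ok(()) => true,
        Err(TrySendError::Full(dropped)) => {
            eprintln!("error channel full, dropping: {dropped}");
            false
        }
        Err(TrySendError::Disconnected(_)) => false,
    }
}

fn main() {
    let (tx, _rx) = sync_channel(1); // nobody is consuming
    assert!(report_error(&tx, "first".into())); // fits in the buffer
    assert!(!report_error(&tx, "second".into())); // buffer full: dropped, no block
    println!("ok");
}
```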

In `@rust/bin/agent/src/service/qbg.rs`:
- Around line 132-160: The code uses several unwraps that panic on failure
(Index::new_prebuilt / Index::new,
vqueue::Builder::new(...).build().await.unwrap(), and
BidirectionalMapBuilder::new(...).build().await.unwrap()); change the enclosing
function to return Result<Self, Error> and replace each unwrap with propagation
into the existing Error::Internal pattern (use the ? operator or map_err(|e|
Error::Internal(format!("...: {}", e))) to convert IO/QBG errors into
Error::Internal with a descriptive message), e.g., propagate errors from
Index::new_prebuilt/Index::new, the vqueue build call, and the
BidirectionalMapBuilder build call instead of panicking so initialization
failures are returned to the caller.
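A minimal sketch of the propagation pattern, with a hypothetical `open_index` standing in for the fallible `Index::new` / builder calls:

```rust
// Errors from construction are converted into a descriptive Internal variant
// and returned, instead of panicking via unwrap().
#[derive(Debug)]
enum Error {
    Internal(String),
}

// Stand-in for Index::new_prebuilt / vqueue builder calls that can fail.
fn open_index(path: &str) -> Result<String, std::io::Error> {
    if path.is_empty() {
        Err(std::io::Error::new(std::io::ErrorKind::NotFound, "empty path"))
    } else {
        Ok(format!("index at {path}"))
    }
}

struct Service {
    index: String,
}

impl Service {
    // Constructor returns Result so initialization failures reach the caller.
    fn new(path: &str) -> Result<Self, Error> {
        let index = open_index(path)
            .map_err(|e| Error::Internal(format!("failed to open index: {e}")))?;
        Ok(Self { index })
    }
}

fn main() {
    assert!(Service::new("/tmp/idx").is_ok());
    assert!(Service::new("").is_err());
    println!("ok");
}
```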
🧹 Nitpick comments (2)
rust/bin/agent/src/main.rs (1)

112-121: Prefer ? over unwrap() for config load/deserialize.

main already returns Result, so propagating errors keeps startup failures graceful and testable.

🔧 Suggested change
-    let settings = ::config::Config::builder()
-        .add_source(::config::File::with_name("/etc/server/config.yaml"))
-        .build()
-        .unwrap();
+    let settings = ::config::Config::builder()
+        .add_source(::config::File::with_name("/etc/server/config.yaml"))
+        .build()?;
     
-    let mut config: AgentConfig = settings.try_deserialize().unwrap();
+    let mut config: AgentConfig = settings.try_deserialize()?;
rust/bin/agent/src/config.rs (1)

43-52: Validate server_config presence in AgentConfig::validate.

serve_grpc expects a grpc server entry; missing config currently fails at runtime. Consider validating early.

🔧 Suggested addition
     pub fn validate(&self) -> Result<(), String> {
         self.qbg.validate()?;
+        if !self.server_config.servers.iter().any(|s| s.name == "grpc") {
+            return Err("grpc server config not found".to_string());
+        }
         Ok(())
     }

@kmrmt kmrmt requested review from Matts966, datelier and kpango February 3, 2026 04:45
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
example/client/go.mod.default (1)

5-11: ⚠️ Potential issue | 🟠 Major

Replace directives use invalid upgrade keyword.

The upgrade keyword is not valid go.mod syntax. Valid replace directives require a specific version (e.g., module => module@v1.0.0) or path. The build process in Makefile.d/dependencies.mk copies this file to go.mod and runs go mod tidy, but the sed step only removes comments—not the invalid keyword. This will cause a parse error:

go: parsing go.mod: expected '[', found 'upgrade'

Either specify actual versions in the replace directives or add a processing step that substitutes upgrade with valid version specifiers before running go mod tidy.

@kmrmt
Copy link
Contributor Author

kmrmt commented Feb 3, 2026

/format

@vdaas-ci
Copy link
Collaborator

vdaas-ci commented Feb 3, 2026

[FORMAT] Updating license headers and formatting go codes triggered by kmrmt.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 11

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
rust/bin/agent/src/handler/upsert.rs (1)

240-297: ⚠️ Potential issue | 🟠 Major

Add UUID validation in multi_upsert to match the single-upsert checks.

multi_upsert accepts empty IDs and routes them to update/insert, which can create invalid entries or yield inconsistent errors. Please reject empty UUIDs before exists checks, consistent with the single upsert path.

🛠️ Suggested fix
             let vec = match req.vector.clone() {
                 Some(v) => v,
                 None => return Err(Status::invalid_argument("Missing vector in request")),
             };
             let config = match req.config.clone() {
                 Some(c) => c,
                 None => return Err(Status::invalid_argument("Missing config in request")),
             };
+            if vec.id.is_empty() {
+                let err = Error::InvalidUUID { uuid: vec.id.clone() };
+                let resource_type = self.resource_type.clone() + "/qbg.MultiUpsert";
+                let resource_name = format!("{}: {}({})", self.api_name, self.name, self.ip);
+                let err_details = build_error_details(
+                    err,
+                    &vec.id,
+                    req.encode_to_vec(),
+                    &resource_type,
+                    &resource_name,
+                    Some("uuid"),
+                );
+                let status = Status::with_error_details(
+                    Code::InvalidArgument,
+                    format!("MultiUpsert API invalid argument for uuid \"{}\" detected", vec.id),
+                    err_details,
+                );
+                warn!("{:?}", status);
+                return Err(status);
+            }
             if vec.vector.len() != s.get_dimension_size() {
🤖 Fix all issues with AI agents
In `@rust/bin/agent/src/handler/common.rs`:
- Around line 45-49: The code calls gethostname::gethostname().to_str().unwrap()
when initializing DOMAIN for err_details.set_error_info which will panic on
non-UTF-8 hostnames; replace that unwrap with a lossy UTF-8 conversion so
non-UTF-8 bytes are handled safely. Update the DOMAIN.get_or_init closure used
in err_details.set_error_info to use
gethostname::gethostname().to_string_lossy().into_owned() (or equivalent lossy
conversion) instead of to_str().unwrap(), ensuring DOMAIN and the call site
(err_details.set_error_info) no longer panic on invalid UTF-8 hostnames.
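The lossy conversion behaves as follows in a std-only sketch; `hostname_string` is a hypothetical helper, and the Unix-only `from_vec` is used here just to fabricate a non-UTF-8 value:

```rust
use std::ffi::OsString;
use std::os::unix::ffi::OsStringExt; // Unix-only: build a non-UTF-8 OsString

// Lossy conversion: invalid bytes become U+FFFD instead of panicking, which
// is what gethostname().to_string_lossy().into_owned() would do for a
// non-UTF-8 hostname.
fn hostname_string(raw: OsString) -> String {
    raw.to_string_lossy().into_owned()
}

fn main() {
    let bad = OsString::from_vec(vec![b'h', b'o', 0xff, b's', b't']);
    let s = hostname_string(bad);
    assert_eq!(s, "ho\u{fffd}st"); // the 0xff byte is replaced, no panic
    println!("{s}");
}
```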

In `@rust/bin/agent/src/handler/flush.rs`:
- Line 32: Fix the typo in the log message: change the info! invocation that
currently logs "Recieved a request from {:?}", request.remote_addr() to use the
correct spelling "Received" (locate the info! macro in flush.rs that references
request.remote_addr()).

In `@rust/bin/agent/src/handler/object.rs`:
- Around line 183-217: The code currently holds the read lock across the entire
spawned task (s.read().await) while iterating and calling s.get_object, blocking
writers; change it to acquire the read lock only to fetch the uuid list (call
s.read().await then call s.uuids().await and drop the guard), then iterate over
the collected uuids and for each item re-acquire the read lock (call
s.read().await) just before calling s.get_object(uuid.clone()).await, build the
object::list::Response as before, and send via tx.send; ensure the outer
s.clone() and tx/ rx usage remain unchanged and that you avoid holding any read
guard across the per-item async get_object call so writers can proceed between
items.

In `@rust/bin/agent/src/handler/search.rs`:
- Around line 379-444: The stream and multi "by id" handlers (the process_fn
closure calling s.search_by_id and analogous multi_*_by_id/stream_*_by_id paths)
currently map all ANN errors to Code::Internal and lose the unary endpoints'
distinctions (NotFound/Aborted). Extract a shared error-to-Status mapper (e.g.,
map_ann_error_to_status(err, request_id, payload_bytes, &resource_type,
&resource_name)) and use it in both the Ok/Err match arms where
build_error_details and Status::with_error_details are created; the mapper
should convert specific ANN errors
(CreateIndexingIsInProgress/FlushingIsInProgress -> Code::Aborted,
EmptySearchResult/ObjectIDNotFound -> Code::NotFound, otherwise ->
Code::Internal) and return a Status with appropriate details so stream_*_by_id
and multi_*_by_id preserve the same error-code semantics as
search_by_id/linear_search_by_id.
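A std-only sketch of the shared mapper described above; `AnnError` and `Code` are simplified stand-ins for the crate's error type and `tonic::Code`, and the real helper would also build the error details from the request:

```rust
// Simplified status codes standing in for tonic::Code.
#[derive(Debug, PartialEq)]
enum Code {
    Aborted,
    NotFound,
    Internal,
}

// Simplified ANN error variants standing in for the crate's Error enum.
#[derive(Debug)]
enum AnnError {
    CreateIndexingIsInProgress,
    FlushingIsInProgress,
    EmptySearchResult,
    ObjectIDNotFound(String),
    Other(String),
}

// One mapping function reused by unary, stream, and multi handlers so every
// path reports the same status code for the same failure.
fn map_ann_error_to_code(err: &AnnError) -> Code {
    match err {
        AnnError::CreateIndexingIsInProgress | AnnError::FlushingIsInProgress => Code::Aborted,
        AnnError::EmptySearchResult | AnnError::ObjectIDNotFound(_) => Code::NotFound,
        AnnError::Other(_) => Code::Internal,
    }
}

fn main() {
    assert_eq!(map_ann_error_to_code(&AnnError::FlushingIsInProgress), Code::Aborted);
    assert_eq!(
        map_ann_error_to_code(&AnnError::ObjectIDNotFound("x".into())),
        Code::NotFound
    );
    println!("ok");
}
```

Both the streaming and multi handlers would call this one function in their error arms, keeping status semantics aligned with the unary endpoints.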

In `@rust/bin/agent/src/service/metadata.rs`:
- Around line 157-171: In store, avoid losing write errors by making the
BufWriter mutable, call serde_json::to_writer_pretty with &mut writer (so it
doesn't consume the buffer), then explicitly call writer.flush()? and propagate
any error (using ?). Update the sequence around BufWriter creation and the
serde_json::to_writer_pretty call so you have a mutable writer you can flush
(referencing function store, BufWriter, serde_json::to_writer_pretty, and
writer.flush()).
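A self-contained sketch of the flush-then-fsync sequence, with `write_all` standing in for `serde_json::to_writer_pretty(&mut writer, ..)` and an illustrative path:

```rust
use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::Path;

// Durable store pattern: keep the writer mutable, flush it explicitly, then
// recover the File and fsync so buffered bytes and write errors are not
// silently lost.
fn store(path: &Path, payload: &str) -> std::io::Result<()> {
    let file = File::create(path)?;
    let mut writer = BufWriter::new(file);
    writer.write_all(payload.as_bytes())?; // stand-in for to_writer_pretty(&mut writer, ..)
    writer.flush()?; // surface buffered write errors
    let file = writer.into_inner().map_err(|e| e.into_error())?;
    file.sync_all()?; // fsync: data survives a crash after Ok(())
    Ok(())
}

fn main() {
    let dir = std::env::temp_dir().join("metadata_store_demo");
    std::fs::create_dir_all(&dir).unwrap();
    let path = dir.join("metadata.json");
    store(&path, "{\"index_count\":100}").unwrap();
    assert_eq!(
        std::fs::read_to_string(&path).unwrap(),
        "{\"index_count\":100}"
    );
    println!("stored");
}
```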

In `@rust/bin/agent/src/service/persistence.rs`:
- Around line 218-259: The needs_backup function currently treats metadata::load
errors as non-fatal and returns false, which skips backups for corrupt metadata;
change the Err(_) branch to return true so load failures trigger backups (leave
the existing logic for Ok(meta) that returns meta.is_invalid ||
meta.index_count() > 0), referencing the needs_backup function,
AGENT_METADATA_FILENAME, and metadata::load/meta.is_invalid/meta.index_count to
locate the code to update.
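The fail-safe branch can be sketched with a simplified `Metadata` stand-in (the real function loads from `AGENT_METADATA_FILENAME`; here the load result is passed in directly):

```rust
// Simplified stand-in for the agent's metadata type.
struct Metadata {
    is_invalid: bool,
    index_count: u64,
}

// A load error now *triggers* a backup rather than skipping it: corrupt or
// unreadable metadata is exactly when a backup matters most.
fn needs_backup(loaded: Result<Metadata, String>) -> bool {
    match loaded {
        Ok(meta) => meta.is_invalid || meta.index_count > 0,
        Err(_) => true, // assume the worst on corrupt metadata
    }
}

fn main() {
    assert!(needs_backup(Err("corrupt json".into())));
    assert!(needs_backup(Ok(Metadata { is_invalid: false, index_count: 3 })));
    assert!(!needs_backup(Ok(Metadata { is_invalid: false, index_count: 0 })));
    println!("ok");
}
```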

In `@rust/bin/meta/Cargo.toml`:
- Line 27: The workspace Cargo.lock is out of sync with the declared tonic
version bump (Cargo.toml now lists tonic = "0.14.3" but rust/Cargo.lock still
pins tonic 0.14.2); regenerate the lockfile so it records tonic 0.14.3 by
running a Cargo command (e.g., cargo update -p tonic --manifest-path
rust/Cargo.toml or a full cargo build/update in the rust workspace) and commit
the updated rust/Cargo.lock so the lockfile matches the tonic version declared
in Cargo.toml.

In `@rust/libs/algorithm/Cargo.toml`:
- Around line 27-28: Update the thiserror dependency in the Cargo.toml for this
crate: replace the pinned version string "2.0.18" with the broader
workspace-aligned constraint "2.0" so it matches other crates (e.g., those using
thiserror = "2.0") and avoids duplicate versions in the dependency tree; locate
the thiserror entry in the dependency list (near tonic = "0.14.3") and change
its version accordingly.

In `@rust/libs/algorithm/src/error.rs`:
- Around line 33-44: The #[error] attributes for the UUIDNotFound and
InvalidDimensionSize variants embed conditional string literals so placeholders
like {uuid} and {limit} are not interpolated; instead either (A) change the
attributes to reference fields directly (e.g. #[error("object uuid {uuid} not
found")] for UUIDNotFound and #[error("dimension size {current} is invalid, the
supporting dimension size must be {limit}")] for InvalidDimensionSize) or (B) if
you must keep conditional phrasing, precompute the final message with format!()
before constructing the variant and expose that as the field the error macro
uses (e.g. add a message: String field and use #[error("{message}")],
constructing message via format!("...") when creating UUIDNotFound or
InvalidDimensionSize). Ensure changes are applied to the UUIDNotFound and
InvalidDimensionSize enum variants and any call sites that construct them.
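Option (B) can be sketched without thiserror using a plain `Display` impl; with thiserror, the `message` field would pair with `#[error("{message}")]`. The constructor name is illustrative:

```rust
use std::fmt;

// Precompute the conditional message when the error is constructed, so no
// placeholder is left uninterpolated in a string literal.
#[derive(Debug)]
enum Error {
    UUIDNotFound { message: String },
}

impl Error {
    fn uuid_not_found(uuid: &str) -> Self {
        let message = if uuid.is_empty() {
            "object uuid is empty".to_string()
        } else {
            format!("object uuid {uuid} not found")
        };
        Error::UUIDNotFound { message }
    }
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Error::UUIDNotFound { message } => f.write_str(message),
        }
    }
}

fn main() {
    assert_eq!(
        Error::uuid_not_found("abc").to_string(),
        "object uuid abc not found"
    );
    assert_eq!(Error::uuid_not_found("").to_string(), "object uuid is empty");
    println!("ok");
}
```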

In `@rust/libs/vqueue/src/lib.rs`:
- Around line 149-171: The docs for iv_exists and dv_exists are incorrect: they
state "return 0 if not found" but the actual signature returns Result<i64,
QueueError> and the implementation/tests return Err(QueueError::NotFound);
update the method docs for iv_exists and dv_exists to describe that they return
Ok(timestamp) when present and Err(QueueError::NotFound) when absent (mentioning
the Result<i64, QueueError> return and the QueueError::NotFound variant) so
callers won't expect a 0 sentinel.
- Around line 746-789: The loop inside the tokio::task::spawn_blocking silently
ignores parse/decode failures (calls to Self::parse_key and
bincode::decode_from_slice) and thus can drop corrupted entries; change the
closure to return a Result<Vec<(UuidType, Vec<f32>, i64)>, Box<dyn
std::error::Error + Send + Sync>> (or another error type you already use), and
on the first parse or decode error return Err(...) instead of continuing; then
after .await, reuse the existing match: on Ok(items) send each item over tx as
before, and on Err(e) send Err(QueueError::Internal(e)) (or map to an
appropriate QueueError variant) so the stream surfaces the failure rather than
silently dropping entries (update references: the spawn_blocking closure, calls
to Self::parse_key and bincode::decode_from_slice, and the tx.send(Err(...))
branch).
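A std-only sketch of the fail-fast loop, using a plain thread in place of `tokio::task::spawn_blocking` and a hypothetical `parse_entry` in place of `parse_key` plus bincode decoding:

```rust
use std::thread;

// Stand-in for parse_key + bincode::decode_from_slice: "uuid:timestamp".
fn parse_entry(raw: &str) -> Result<(String, i64), String> {
    let (id, ts) = raw
        .split_once(':')
        .ok_or_else(|| format!("corrupt entry: {raw}"))?;
    let ts: i64 = ts.parse().map_err(|_| format!("corrupt timestamp: {raw}"))?;
    Ok((id.to_string(), ts))
}

// The worker returns Err on the first bad entry instead of silently skipping
// it, so the caller can surface the failure on the stream.
fn load_all(raw: Vec<String>) -> Result<Vec<(String, i64)>, String> {
    thread::spawn(move || {
        let mut items = Vec::new();
        for entry in raw {
            items.push(parse_entry(&entry)?); // first failure aborts the scan
        }
        Ok(items)
    })
    .join()
    .expect("worker panicked")
}

fn main() {
    let ok = load_all(vec!["a:1".into(), "b:2".into()]).unwrap();
    assert_eq!(ok.len(), 2);
    assert!(load_all(vec!["a:1".into(), "broken".into()]).is_err());
    println!("ok");
}
```

In the agent, the `Err` from the blocking task would then be forwarded as `tx.send(Err(QueueError::Internal(..)))` so consumers see the corruption.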
🧹 Nitpick comments (2)
rust/libs/vqueue/src/lib.rs (1)

741-777: blocking_send is feasible, but the current streaming approach is already reasonable.

The Vec buffering concern is valid for large queues, but blocking_send can be used safely within the spawn_blocking closure since it runs on a separate worker thread outside the async context. However, the current implementation (collecting in the blocking task, then streaming asynchronously) is an acceptable trade-off: it allows the worker thread to complete independently while still streaming results to the channel. A more significant improvement would be refactoring to iterate and send items as they're collected rather than buffering the full Vec, but this requires architectural changes to integrate async iteration with the sled database.

rust/bin/agent/src/service/k8s.rs (1)

286-329: Add an assertion so the tick export test validates behavior.
Right now the test exercises the call path but doesn’t assert that annotations were applied. Keeping shared state in an Arc lets you assert the mock was invoked.

🧪 Suggested test tweak
-    use std::sync::Mutex;
+    use std::sync::{Arc, Mutex};
@@
-    struct MockPatcher {
-        applied: Mutex<Vec<HashMap<String, String>>>,
-    }
+    struct MockPatcher {
+        applied: Arc<Mutex<Vec<HashMap<String, String>>>>,
+    }
@@
-    impl MockPatcher {
-        fn new() -> Self {
-            Self {
-                applied: Mutex::new(Vec::new()),
-            }
-        }
-
-        fn get_applied(&self) -> Vec<HashMap<String, String>> {
-            self.applied.lock().unwrap().clone()
-        }
-    }
+    impl MockPatcher {
+        fn new(applied: Arc<Mutex<Vec<HashMap<String, String>>>>) -> Self {
+            Self { applied }
+        }
+    }
@@
-        let patcher = Box::new(MockPatcher::new());
+        let applied = Arc::new(Mutex::new(Vec::new()));
+        let patcher = Box::new(MockPatcher::new(applied.clone()));
         let exporter =
             MetricsExporter::new(patcher, "test-pod".to_string(), "default".to_string(), true);
 
         exporter.export_on_tick(100, 5).await.unwrap();
-
-        // Note: Can't access mock directly due to Box, would need Arc<dyn Patcher>
+        assert_eq!(applied.lock().unwrap().len(), 1);

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@rust/bin/agent/src/main.rs`:
- Around line 66-71: The metrics gating block in serve references an undefined
variable `settings`; replace uses of `settings.get::<bool>("observability...")`
with the in-scope config (e.g., `config.observability`) so the condition
compiles. Specifically, update the if condition that checks observability and
meter enabled to read from `config.observability` (or the appropriate field on
`config`) before calling `metrics::register_metrics(agent.service())`, keeping
the existing error handling via `error!` intact.
🧹 Nitpick comments (2)
rust/bin/agent/src/metrics.rs (2)

84-89: Use observable counters for `_total` metrics (and avoid constant 0).
COMPLETED_CREATE_INDEX_TOTAL and EXECUTED_PROACTIVE_GC_TOTAL are described as cumulative counts but are registered as gauges; OTel backends generally expect monotonic totals to be counters. Also, EXECUTED_PROACTIVE_GC_TOTAL is always observed as 0, which is misleading—wire a real count or drop the metric.

Proposed adjustment for the create-index total (apply similarly for GC when a real count is available)
-    let completed_create_index_total = meter.i64_observable_gauge(COMPLETED_CREATE_INDEX_TOTAL)
+    let completed_create_index_total = meter.u64_observable_counter(COMPLETED_CREATE_INDEX_TOTAL)
         .with_description("The cumulative count of completed create index execution")
         .build();
...
-                    observer.observe_i64(&completed_create_index_total_c, s.number_of_create_index_executions() as i64, &[]);
+                    observer.observe_u64(&completed_create_index_total_c, s.number_of_create_index_executions(), &[]);

Also applies to: 226-227


365-377: Isolate the global meter provider in tests.
global::set_meter_provider mutates process-wide state; running tests in parallel can cause interference or flakiness. Consider serializing this test or using a shared global initialization strategy.

@kmrmt kmrmt force-pushed the feature/rust/integrate-agent branch from 2b8fb31 to 28eba88 Compare February 10, 2026 04:45
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 13

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (8)
rust/libs/algorithm/src/lib.rs (1)

22-272: ⚠️ Potential issue | 🔴 Critical

Unresolved git merge conflict markers — file will not compile.

Lines 22–272 contain <<<<<<< HEAD, |||||||, =======, and >>>>>>> markers from an unresolved merge. This is a hard compilation blocker.

rust/bin/agent/src/handler/upsert.rs (2)

240-298: ⚠️ Potential issue | 🟠 Major

Read lock held across all exists() calls in multi_upsert blocks writers.

The s.read().await at line 242 is held for the entire loop (lines 243–297), including async exists() calls. On large batches, this starves write operations. Consider collecting UUIDs and their existence status, then dropping the lock before routing to multi_insert/multi_update — similar to the pattern used in the single upsert (lines 100-104).
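
The lock-scoping pattern suggested above can be sketched as follows; `Store` and `partition` are hypothetical, and std's `RwLock` stands in for the `tokio::sync::RwLock` used by the real handler:

```rust
use std::collections::HashSet;
use std::sync::RwLock;

// Hypothetical store guarding the set of known UUIDs.
struct Store {
    ids: RwLock<HashSet<String>>,
}

// Probe all UUIDs while holding the read lock, then drop it before the
// results are routed to multi_update / multi_insert.
fn partition(store: &Store, uuids: &[String]) -> (Vec<String>, Vec<String>) {
    let statuses: Vec<bool> = {
        let ids = store.ids.read().unwrap(); // read lock lives only here
        uuids.iter().map(|u| ids.contains(u)).collect()
    }; // lock released before any routing work

    let (mut updates, mut inserts) = (Vec::new(), Vec::new());
    for (uuid, exists) in uuids.iter().zip(statuses) {
        if exists {
            updates.push(uuid.clone());
        } else {
            inserts.push(uuid.clone());
        }
    }
    (updates, inserts)
}
```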


179-179: ⚠️ Potential issue | 🟡 Minor

Typo: "Recieved" → "Received".

rust/bin/agent/src/handler/object.rs (1)

128-128: ⚠️ Potential issue | 🟡 Minor

Typo: "Recieved" → "Received".

This typo appears consistently across multiple handler files (upsert.rs, update.rs, object.rs). Consider a bulk fix.

rust/bin/agent/src/handler/update.rs (2)

283-284: ⚠️ Potential issue | 🟡 Minor

Two typos in user-facing error message: "Incombatible" and "detedted".

Should be "Incompatible" and "detected" to match the error messages in other handlers.

Suggested fix
-                        "MultiUpdate API Incombatible Dimension Size detedted",
+                        "MultiUpdate API Incompatible Dimension Size detected",

199-199: ⚠️ Potential issue | 🟡 Minor

Typo: "Recieved" → "Received".

Same typo as in other handler files. Consider a project-wide search-and-replace.

Also applies to: 255-255, 432-432

rust/bin/agent/src/handler/remove.rs (1)

71-100: ⚠️ Potential issue | 🔴 Critical

Fix undefined request_bytes in error path.

request_bytes is referenced but never defined, which will not compile.

🐛 Proposed fix
-                let err_msg = err.to_string();
+                let err_msg = err.to_string();
+                let request_bytes = request.encode_to_vec();
                 let mut err_details = build_error_details(
                     err_msg.clone(),
                     &uuid,
-                    request.encode_to_vec(),
+                    request_bytes.clone(),
                     &resource_type,
                     &resource_name,
                     None,
                 );
...
                     Error::FlushingIsInProgress {} => {
                         let err_details = build_error_details(
                             err,
                             &uuid,
-                            request_bytes,
+                            request_bytes,
                             &resource_type,
                             &resource_name,
                             None,
                         );
rust/bin/agent/src/handler/search.rs (1)

818-1040: ⚠️ Potential issue | 🟠 Major

Align stream and multi linear-search error mapping with unary behavior.

The unary linear_search and linear_search_by_id handlers map specific error types to appropriate status codes:

  • CreateIndexingIsInProgress/FlushingIsInProgress → Code::Aborted
  • EmptySearchResult/ObjectIDNotFound → Code::NotFound
  • Unsupported → Code::Unimplemented
  • Other errors → Code::Internal

However, stream_linear_search, stream_linear_search_by_id, and multi_linear_search return Code::Internal for all ANN errors without distinguishing these cases, losing important error type information needed for proper client-side error handling.

🤖 Fix all issues with AI agents
In `@rust/bin/agent/Cargo.toml`:
- Line 53: Replace the unmaintained serde_yaml dependency in Cargo.toml with a
maintained alternative: update the serde_yaml entry to one of serde_norway,
serde_yaml_ng, or serde-saphyr depending on compatibility needs (serde_norway
for maintained libyaml-backed behavior, serde_yaml_ng for closest API
compatibility, or serde-saphyr for pure-Rust parsing); adjust any feature flags
or serde integration settings as needed in Cargo.toml and run cargo build/test
to confirm no API breakages, then update any serde_yaml imports/usages in the
codebase to the chosen crate name (e.g., replace serde_yaml::... with
serde_norway::... or serde_yaml_ng::... or serde_saphyr::... where applicable).

In `@rust/bin/agent/src/config.rs`:
- Around line 738-750: get_actual_value currently returns the literal "${VAR}"
when an env var is missing and no default is provided; change it to detect that
unresolved case (after extracting env_var from value) and return an empty string
(or a Result/Option if you prefer) instead of value.to_string(), and emit a
warning via the project's logger (e.g., log::warn or tracing::warn) including
the env_var name so callers can see unresolved placeholders; update the branch
that calls env::var(env_var).unwrap_or_else(|_| value.to_string()) to instead
log the warning and return an empty string (or Err/None) so unresolved
placeholders are not used as real paths.
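
A minimal sketch of that behavior, assuming a hypothetical `resolve_placeholder` helper mirroring `get_actual_value`: `${VAR}` expands to the env value, `${VAR:-default}` falls back to the default, and an unset variable with no default yields an empty string plus a warning rather than leaking the literal `${VAR}` into a path:

```rust
use std::env;

fn resolve_placeholder(value: &str) -> String {
    let inner = match value.strip_prefix("${").and_then(|v| v.strip_suffix('}')) {
        Some(inner) => inner,
        None => return value.to_string(), // not a placeholder
    };
    // Split "${VAR:-default}" into the variable name and its fallback.
    let (name, default) = match inner.split_once(":-") {
        Some((n, d)) => (n, Some(d)),
        None => (inner, None),
    };
    match (env::var(name), default) {
        (Ok(v), _) => v,
        (Err(_), Some(d)) => d.to_string(),
        (Err(_), None) => {
            eprintln!("warning: env var {name} is unset and has no default");
            String::new() // never return the raw "${VAR}" literal
        }
    }
}
```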

In `@rust/bin/agent/src/handler/flush.rs`:
- Line 38: The resource_type string in flush.rs currently hardcodes "/qbg.Flush"
which mislabels non-qbg ANN implementations; update the construction of
resource_type (the line creating self.resource_type.clone() + "/qbg.Flush") to
derive or accept the algorithm name instead—e.g., add a parameter or method on
Agent or S (bound S: algorithm::ANN) that returns the algorithm identifier and
use it to compose resource_type (algorithm_name + "/Flush"), or expose a
function like Agent::algorithm_name() and call that when building resource_type
so the metadata reflects the actual ANN implementation.

In `@rust/bin/agent/src/handler/insert.rs`:
- Around line 47-66: The user-facing error message in the Insert handler uses
misspelled words; update the Status::with_error_details call that constructs the
InvalidArgument message (currently "Insert API Incombatible Dimension Size
detedted") to a normalized, correct string such as "Insert API incompatible
dimension size detected", and make the same correction for the duplicate message
occurrence later in the file (around the other Status::with_error_details /
Code::InvalidArgument usage); keep the surrounding context
(Error::IncompatibleDimensionSize, build_error_details, resource_type,
resource_name) unchanged.

In `@rust/bin/agent/src/handler/upsert.rs`:
- Around line 155-158: The code currently calls
st.get_details_error_info().unwrap() when building err_details, which will panic
if the Status lacks structured error details; change this to handle the None
case explicitly: call st.get_details_error_info().map(|d|
d.reason).unwrap_or_default() (or match on get_details_error_info()) and pass
that safe reason into build_error_details instead of unwrapping, so that
err_details construction in the upsert handler (where insert_fn/update_fn return
a Status) cannot panic.

In `@rust/bin/agent/src/main.rs`:
- Around line 45-46: The call to init_tracing currently uses expect and will
panic on failure; change it to propagate the error instead (use the ? operator)
so initialization failures return an Err from main/serve rather than aborting.
Replace the expect usage on init_tracing (the tracer_provider binding) with the
? operator and ensure the surrounding function signature (main/serve) already
returns a Result so the error can be propagated from init_tracing up the call
stack.
- Around line 125-137: The two .unwrap() calls in main cause panics; replace
them with the ? operator so errors propagate through main's Result: use
settings.build()? instead of .build().unwrap() and let mut config: AgentConfig =
settings.try_deserialize()? instead of .try_deserialize().unwrap(); keep calling
config.bind() and config.validate()? as before so errors are returned rather
than panicking.

In `@rust/bin/agent/src/metrics.rs`:
- Line 227: The metric executed_proactive_gc_total is being hardcoded to 0
because the ANN trait doesn't expose a GC execution count; either add a
reporting method to ANN or remove the metric to avoid misleading data. To fix,
choose one: (A) add a method like fn executed_proactive_gc_total(&self) -> u64
to the ANN trait and implement it in all ANN implementations, then replace
observer.observe_i64(&executed_proactive_gc_total_c, 0, &[]) with
observer.observe_i64(&executed_proactive_gc_total_c,
ann.executed_proactive_gc_total() as i64, &[]); or (B) remove the
executed_proactive_gc_total_c metric and its observation call from metrics.rs so
no zero-valued metric is published. Ensure changes reference the ANN trait and
executed_proactive_gc_total_c/observer.observe_i64 locations.
- Around line 365-416: The test test_metrics_integration currently calls
global::set_meter_provider which mutates global state and can leak into parallel
tests; change the test to avoid setting the global provider by either (A)
obtaining a Meter from the local SdkMeterProvider (built via
SdkMeterProvider::builder() and reader) and passing that Meter into
register_metrics (refactor register_metrics to accept a &Meter or Meter) so
registration uses the local provider, or (B) mark the test serial with
#[serial_test::serial] to enforce sequential execution; update references to
ManualReader and SdkMeterProvider usage to create and use a local Meter instead
of global::set_meter_provider if choosing option A.

In `@rust/bin/agent/src/service/memstore.rs`:
- Around line 94-97: Timestamps are being unsafely cast between i64 and u128
(e.g., in the Ok((oid, kts)) block where (kts as i64) and its as u128 are used,
and similarly in exists, get_object, update_timestamp), which can truncate or
wrap values; fix by introducing a safe conversion strategy: convert the vqueue
i64 timestamp (its) to u128 only after validating it is non-negative (use
TryFrom/checked conversion) or treat negative values as a defined sentinel
(e.g., 0 or return an error), and compare timestamps in the same unsigned type
(convert kts to i128 only if you intend signed comparisons but prefer converting
its -> u128 safely), then call kv.set with the validated u128 timestamp; update
all occurrences (exists, get_object, update_timestamp and the kv.set call) to
use these safe conversions and avoid direct as-casts that can overflow or wrap.
- Around line 477-496: The branch guarded by `!vqok && its != 0` is unreachable;
remove the entire Case 4 block (the code doing `kv.set(uuid.to_string(), oid, ts
as u128).await?;` the `get_vector_fn`/`vq.push_insert` path and the
`vq.pop_insert` rollback) from memstore.rs so execution falls through to the
existing Case 3/NothingToBeDoneForUpdate logic; keep other cases unchanged and
ensure no references to the removed variables are left dangling (check `kv.set`,
`vq.push_insert`, `vq.pop_insert`, `get_vector_fn`, `vqok`, and `its` usage).
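
The checked-conversion approach for the timestamp item above can be sketched with two hypothetical helpers: a fallible `i64 → u128` conversion instead of `as` casts that wrap on negatives, and a comparison done in one signed, wide type:

```rust
// Reject negative timestamps instead of letting `as u128` wrap them
// into huge values.
fn ts_to_u128(ts: i64) -> Result<u128, String> {
    u128::try_from(ts).map_err(|_| format!("negative timestamp: {ts}"))
}

// Compare in i128 so both sides fit without lossy casts; a stored value
// too large for i128 is treated as not older than the candidate.
fn is_newer(candidate: i64, stored: u128) -> bool {
    i128::try_from(stored)
        .map(|s| i128::from(candidate) > s)
        .unwrap_or(false)
}
```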

In `@rust/libs/vqueue/src/lib.rs`:
- Around line 600-655: The counter decrement (c.fetch_sub) must be moved out of
the sled transaction closure to avoid multiple decrements when the transaction
retries: inside pop_internal, remove c.fetch_sub(1, Ordering::Relaxed) from the
closure passed to (&q, &i).transaction(|(q_tx, i_tx)| { ... }) and instead
perform the decrement only after the transaction call has returned successfully
(i.e., after the transaction(...) returns Ok((value, ts)) inside the
tokio::task::spawn_blocking closure), so that the counter is decremented exactly
once per committed pop and not on transactional retries.
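
The retry hazard above can be demonstrated without sled; `run_transaction` is a hypothetical mock of a sled-style transaction runner whose body may execute several times on conflict before the final attempt commits:

```rust
use std::sync::atomic::{AtomicI64, Ordering};

// Mock transaction runner: the body runs once per aborted attempt and
// once more for the committing attempt.
fn run_transaction<T>(mut body: impl FnMut() -> T, aborted_attempts: usize) -> T {
    for _ in 0..aborted_attempts {
        let _ = body(); // retried attempt: body ran, nothing committed
    }
    body() // this attempt commits
}

// Decrement exactly once per committed pop, regardless of retries.
// Returns how many times the body ran, to make the hazard visible.
fn pop_with_counter(counter: &AtomicI64, aborted_attempts: usize) -> usize {
    let mut body_runs = 0;
    run_transaction(|| body_runs += 1, aborted_attempts);
    counter.fetch_sub(1, Ordering::Relaxed); // outside the transaction body
    body_runs
}
```

Had the `fetch_sub` lived inside the body, the counter would have moved once per attempt instead of once per commit.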
🧹 Nitpick comments (21)
rust/libs/algorithm/src/error.rs (1)

17-23: MultiError trait with only associated functions (no &self) is unconventional.

All methods on MultiError are static constructors that return Error. Since there's no self receiver, this trait can never be used polymorphically. Consider making these plain associated functions on Error directly (e.g., Error::new_uuid_already_exists(...)) instead of a trait, which would be more idiomatic Rust.
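
A sketch of that refactor, with illustrative variant names: plain associated functions on `Error` replace a trait that had no `self` receiver and so could never be used polymorphically:

```rust
#[derive(Debug, PartialEq)]
enum Error {
    UuidAlreadyExists(String),
    UuidNotFound(String),
}

impl Error {
    // Constructors live directly on the type instead of a receiver-less trait.
    fn new_uuid_already_exists(uuid: impl Into<String>) -> Self {
        Error::UuidAlreadyExists(uuid.into())
    }

    fn new_uuid_not_found(uuid: impl Into<String>) -> Self {
        Error::UuidNotFound(uuid.into())
    }
}
```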

rust/libs/vqueue/src/lib.rs (2)

746-777: range() collects all items into a Vec in the blocking task before streaming.

The entire insert queue is materialized in memory inside spawn_blocking before any items are sent through the channel. For large queues, this negates the benefit of streaming and may cause significant memory pressure.

Consider processing in batches (similar to drain_batch) or sending items through a channel directly from within the blocking task to maintain backpressure.


527-543: dv_exists duplicates load_dvq almost entirely.

dv_exists (lines 850–866) and load_dvq (lines 527–543) contain nearly identical logic for looking up a timestamp from the delete index. Consider having dv_exists delegate to load_dvq:

♻️ Proposed refactor
     async fn dv_exists(&self, uuid: impl AsRef<str> + Send) -> Result<i64, QueueError> {
-        let uuid_bytes = uuid.as_ref().as_bytes().to_vec();
-        let uuid_string = uuid.as_ref().to_string();
-        let index = self.delete_index.clone();
-
-        tokio::task::spawn_blocking(move || match index.get(&uuid_bytes)? {
-            Some(ts_bytes) => {
-                let ts_bytes_arr: [u8; 8] = ts_bytes
-                    .as_ref()
-                    .try_into()
-                    .map_err(|_| QueueError::KeyParse("Invalid timestamp in index".to_string()))?;
-                Ok(i64::from_be_bytes(ts_bytes_arr))
-            }
-            None => Err(QueueError::NotFound(uuid_string)),
-        })
-        .await?
+        self.load_dvq(uuid.as_ref()).await
     }

Also applies to: 850-866

rust/bin/agent/src/handler/flush.rs (1)

79-87: Flush response returns zeroed Count instead of actual post-flush state.

After a successful regenerate_indexes(), the response hardcodes stored: 0, uncommitted: 0, indexing: false, saving: false. The caller has no way to verify the actual post-flush state. Consider populating these from s.len(), s.insert_vqueue_buffer_len(), s.is_indexing(), etc. while the write lock is still held.

♻️ Suggested improvement
         Ok(()) => {
             let res = info::index::Count {
-                stored: 0,
-                uncommitted: 0,
-                indexing: false,
-                saving: false,
+                stored: s.len(),
+                uncommitted: s.insert_vqueue_buffer_len() + s.delete_vqueue_buffer_len(),
+                indexing: s.is_indexing(),
+                saving: s.is_saving(),
             };
             Ok(tonic::Response::new(res))
rust/bin/agent/src/metrics.rs (1)

133-173: Consider grouping gauge handles into a struct to reduce cloning boilerplate.

The 38 individual clone() calls (lines 133–173) are correct but verbose. Grouping them into a MetricsInstruments struct would make the callback capture a single move and simplify maintenance.

rust/bin/agent/src/service.rs (1)

50-51: Redundant as usize cast.

Vec::len() already returns usize, so the cast is unnecessary.

Suggested fix
-            Err(Error::IncompatibleDimensionSize {
-                got: vector.len() as usize,
-                want: self.dim,
-            })
+            Err(Error::IncompatibleDimensionSize {
+                got: vector.len(),
+                want: self.dim,
+            })
rust/bin/agent/src/main.rs (2)

54-61: Hardcoded Agent metadata should come from configuration.

"agent-qbg", "127.0.0.1", "vald/internal/core/algorithm", "vald-agent", and 10 (stream concurrency) are all hardcoded. These should be derived from config for flexibility across deployment environments.


203-223: test_unsupported_service_type only checks deserialization — doesn't verify runtime behavior.

The test confirms the config parses with type: "unsupported" but never calls serve() to verify it's properly rejected. This gives a false sense of coverage for the unsupported-service path.

rust/bin/agent/src/service/metadata.rs (2)

126-147: TOCTOU between path.exists() and File::open() — prefer attempting open directly.

The file could be deleted or created between the exists() check (line 130) and File::open() (line 141). It's more idiomatic and race-free to attempt File::open() directly and map io::ErrorKind::NotFound to MetadataError::FileNotFound. The empty-file check could similarly use the file handle's metadata.

Suggested approach
 pub fn load<P: AsRef<Path>>(path: P) -> Result<Metadata, MetadataError> {
     let path = path.as_ref();
-
-    if !path.exists() {
-        return Err(MetadataError::FileNotFound(path.display().to_string()));
-    }
-
-    let file_metadata = fs::metadata(path)?;
-    if file_metadata.len() == 0 {
-        return Err(MetadataError::FileEmpty(path.display().to_string()));
-    }
-
-    let file = File::open(path)?;
+    let file = File::open(path).map_err(|e| {
+        if e.kind() == std::io::ErrorKind::NotFound {
+            MetadataError::FileNotFound(path.display().to_string())
+        } else {
+            MetadataError::ReadError(e)
+        }
+    })?;
+    if file.metadata()?.len() == 0 {
+        return Err(MetadataError::FileEmpty(path.display().to_string()));
+    }
     let reader = BufReader::new(file);

240-248: JSON field format test may be fragile across serde versions.

The test asserts specific JSON whitespace formatting ("is_invalid": false). If serde_json's pretty-print formatting changes across versions, this test breaks. Consider deserializing back and asserting field values instead.

rust/bin/agent/src/handler/upsert.rs (1)

300-321: Simplify result aggregation — concat handles empty slices.

The three branches checking ulocs.is_empty() / ilocs.is_empty() are redundant since [ilocs, ulocs].concat() returns the correct result even when one side is empty.

Suggested simplification
-        if ireqs.requests.is_empty() {
-            let res = self.multi_update(tonic::Request::new(ureqs)).await?;
-            return Ok(res);
-        } else if ureqs.requests.is_empty() {
-            let res = self.multi_insert(tonic::Request::new(ireqs)).await?;
-            return Ok(res);
-        } else {
-            let ures = self.multi_update(tonic::Request::new(ureqs)).await?;
-            let ires = self.multi_insert(tonic::Request::new(ireqs)).await?;
-
-            let mut locs = object::Locations { locations: vec![] };
-            let ilocs = ires.into_inner().locations;
-            let ulocs = ures.into_inner().locations;
-            if ulocs.is_empty() {
-                locs.locations = ilocs;
-            } else if ilocs.is_empty() {
-                locs.locations = ulocs;
-            } else {
-                locs.locations = [ilocs, ulocs].concat();
-            }
-            return Ok(tonic::Response::new(locs));
-        }
+        let mut locations = Vec::new();
+        if !ureqs.requests.is_empty() {
+            let ures = self.multi_update(tonic::Request::new(ureqs)).await?;
+            locations.extend(ures.into_inner().locations);
+        }
+        if !ireqs.requests.is_empty() {
+            let ires = self.multi_insert(tonic::Request::new(ireqs)).await?;
+            locations.extend(ires.into_inner().locations);
+        }
+        Ok(tonic::Response::new(object::Locations { locations }))
rust/bin/agent/src/handler/object.rs (2)

43-43: Prefer uuid.is_empty() over uuid.len() == 0.

Idiomatic Rust and what Clippy recommends.

Suggested fix
-        if uuid.len() == 0 {
+        if uuid.is_empty() {

84-123: exists method holds the read lock while returning — consider scoping.

The read lock at line 112 is held through the response construction at line 122. For consistency with the pattern used in upsert.rs (where the lock is scoped to just the exists call), scope the lock to just the check.

Suggested approach
-        let s = self.s.read().await;
-        let (_, exists) = s.exists(uuid.clone()).await;
-
-        if !exists {
+        let exists = {
+            let s = self.s.read().await;
+            let (_, exists) = s.exists(uuid.clone()).await;
+            exists
+        };
+        if !exists {
rust/bin/agent/src/handler/update.rs (2)

71-71: Prefer uuid.is_empty() over uuid.len() == 0.

Same as in object.rs — idiomatic Rust per Clippy.


30-42: Write lock acquired before validation — holds lock during input parsing.

The write lock at line 42 is acquired before dimension/UUID validation (lines 48–90). If validation fails, the lock was held needlessly. Consider doing read-only validation first (dimension check requires read access), then upgrading to write for the actual update.

However, since update fundamentally needs write access and there's no read-to-write upgrade in tokio::RwLock, this is a minor optimization trade-off.

rust/bin/agent/src/service/k8s.rs (1)

320-329: Consider asserting applied annotations in tests.

test_export_on_tick doesn’t verify what was applied because the mock is boxed. Using Arc<dyn Patcher> or exposing the mock would allow meaningful assertions.

rust/bin/agent/src/config.rs (3)

43-52: AgentConfig::bind() only delegates to qbg, leaving other sub-configs unbound.

If any string fields in logging, observability (e.g., endpoint), server_config (e.g., host), or daemon are configured with ${ENV_VAR} placeholders in YAML, they won't be expanded. Similarly, validate() only validates QBG. Consider either documenting that env expansion is QBG-only, or propagating bind()/validate() to all sub-configs that may contain user-facing string fields.


752-759: load_config_from_file loads only a QBG config, but its name suggests a general config loader.

This could confuse callers expecting it to return an AgentConfig. Consider renaming to load_qbg_config_from_file for clarity.


311-349: VQueue::bind() and KVSDB::bind() are no-ops.

These methods exist purely for API symmetry but do nothing. If there's no plan to add env-var expansion for numeric fields, consider removing them to avoid confusion, or adding a comment explaining they're intentional extension points.

Also applies to: 351-379

rust/bin/agent/src/service/qbg.rs (2)

657-676: update_multiple clones the entire HashMap for iteration.

Line 663 clones the full vectors map (including all vector data) just to iterate and selectively remove entries from the original. This is an O(n × d) allocation where d is the vector dimension.

Consider collecting keys to remove instead
     async fn update_multiple(
         &mut self,
-        mut vectors: HashMap<String, Vec<f32>>,
+        mut vectors: HashMap<String, Vec<f32>>,
     ) -> Result<(), Error> {
         let mut uuids: Vec<String> = vec![];
-        for (uuid, vec) in vectors.clone() {
-            let result = self
-                .ready_for_update(uuid.clone(), vec, Local::now().nanosecond().into())
+        let mut to_remove: Vec<String> = vec![];
+        let keys: Vec<String> = vectors.keys().cloned().collect();
+        for uuid in keys {
+            let vec = vectors.get(&uuid).unwrap().clone();
+            let result = self
+                .ready_for_update(uuid.clone(), vec, Utc::now().timestamp_nanos_opt().unwrap_or(0))
                 .await;
             match result {
                 Ok(()) => uuids.push(uuid),
                 Err(_err) => {
-                    let _ = vectors.remove(&uuid);
+                    to_remove.push(uuid);
                 }
             }
         }
+        for key in to_remove {
+            vectors.remove(&key);
+        }
         self.remove_multiple(uuids.clone()).await?;
         self.insert_multiple(vectors).await
     }

234-234: Use uuid.is_empty() instead of uuid.len() == 0.

Idiomatic Rust prefers is_empty() over length comparison. This appears at lines 234, 275, and 331.

Also applies to: 275-275, 331-331

Signed-off-by: Vdaas CI <vald@vdaas.org>
@kmrmt
Contributor Author

kmrmt commented Feb 27, 2026

/format

@vdaas-ci
Collaborator

[FORMAT] Updating license headers and formatting go codes triggered by kmrmt.

tests/v2/e2e/assets/rollout.yaml
tests/v2/e2e/assets/stream_crud.yaml
tests/v2/e2e/assets/unary_crud.yaml
tests/v2/e2e/assets/unary_crud_qbg.yaml

[file name cspell] reported by reviewdog 🐶
Unknown word (readablestream) Suggestions: [reenablestream, reEnablestream, reenableStream, reEnableStream, Reenablestream]

@kmrmt
Contributor Author

kmrmt commented Mar 5, 2026

/format

@vdaas-ci
Collaborator

vdaas-ci commented Mar 5, 2026

[FORMAT] Updating license headers and formatting go codes triggered by kmrmt.

Signed-off-by: Vdaas CI <vald@vdaas.org>
@Matts966
Member

Matts966 commented Mar 9, 2026

/format

@vdaas-ci
Collaborator

vdaas-ci commented Mar 9, 2026

[FORMAT] Updating license headers and formatting go codes triggered by Matts966.

let mut agent = Agent::new(
service,
"agent-qbg",
"127.0.0.1",
Member

@kmrmt Is "127.0.0.1" always OK?

vdaas-ci and others added 3 commits March 9, 2026 08:01
Signed-off-by: Vdaas CI <vald@vdaas.org>
Signed-off-by: Kosuke Morimoto <ksk@vdaas.org>

4 participants