
Conversation

@gonzalezzfelipe (Contributor) commented Dec 9, 2025

Summary by CodeRabbit

  • Refactor

    • Improved runtime concurrency with finer-grained per-worker locking, read/write map access, and batched update handling to reduce contention and improve responsiveness.
  • Chores

    • Added an async utility dependency to support batched operations and related runtime improvements.


@coderabbitai bot commented Dec 9, 2025

Walkthrough

Reworked runtime concurrency: the worker map stores Mutex<LoadedWorker> and the runtime holds Arc<RwLock<WorkerMap>>. Map access sites were updated to use .read().await / .write().await and to lock individual workers for per-worker mutations; batch updates use join_all/Itertools.
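
Roughly, the resulting shape is the following (a minimal sketch: the WorkerMap alias and the loaded field come from the walkthrough, while LoadedWorker's contents and the rest of the struct are assumptions, not the actual code):

use std::{collections::HashMap, sync::Arc};
use tokio::sync::{Mutex, RwLock};

// Hypothetical stand-in for the runtime's per-worker state.
struct LoadedWorker {
    // wasm store, router, cursor, ...
}

// Each worker now sits behind its own Mutex; the map itself is behind an RwLock.
type WorkerMap = HashMap<String, Mutex<LoadedWorker>>;

struct Runtime {
    loaded: Arc<RwLock<WorkerMap>>,
    // ...
}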

Changes

Per-worker & RwLock refactor (balius-runtime/src/lib.rs)
Changed type WorkerMap = HashMap<String, LoadedWorker> to HashMap<String, Mutex<LoadedWorker>>, and loaded: Arc<Mutex<WorkerMap>> to loaded: Arc<RwLock<WorkerMap>>; updated chain_cursor, register_worker/register_worker_from_url/register_worker_from_file, remove_worker, handle_chain, and handle_request to use .read().await / .write().await on the map and to acquire per-worker Mutex locks; introduced batched chain application using futures::future::join_all and itertools::Itertools.

Dependency addition (balius-runtime/Cargo.toml)
Added futures = "0.3.31" to dependencies.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

  • Check correct use of RwLock read vs write (.read().await vs .write().await) at every access site.
  • Inspect lock acquisition order (map RwLock vs per-worker Mutex) to detect possible deadlocks.
  • Verify batch join_all handling, error propagation, and that per-worker state updates are applied atomically where intended (a rough sketch of this pattern follows the list).
  • Confirm imports (futures::future::join_all, itertools::Itertools) are used correctly and needed versions match Cargo changes.
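
To make the join_all item concrete, here is a rough sketch of the batching pattern under review (LoadedWorker, apply_block, and the error type are placeholders, not the real API):

use std::{collections::HashMap, sync::Arc};

use futures::future::join_all;
use tokio::sync::{Mutex, RwLock};

struct LoadedWorker;

impl LoadedWorker {
    async fn apply_block(&mut self, _block: &[u8]) -> Result<(), String> {
        Ok(())
    }
}

type WorkerMap = HashMap<String, Mutex<LoadedWorker>>;

async fn handle_chain(loaded: &Arc<RwLock<WorkerMap>>, block: &[u8]) -> Result<(), String> {
    // The read lock keeps the worker set stable for the whole batch.
    let workers = loaded.read().await;

    // One future per worker; each takes only its own per-worker lock.
    let updates = workers
        .values()
        .map(|worker| async move { worker.lock().await.apply_block(block).await });

    // join_all drives the per-worker futures concurrently; collect surfaces the first error.
    join_all(updates)
        .await
        .into_iter()
        .collect::<Result<Vec<_>, _>>()?;

    Ok(())
}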

Poem

🐇 I nibbled at locks with a careful hop,
Turned global grasps into per-worker crop,
Readers peek, writers queue in line,
Async hops sync each cursor fine,
A jubilant rabbit hums—concurrency's on top! 🥕

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage ⚠️ Warning: Docstring coverage is 0.00%, below the required threshold of 80.00%. Run @coderabbitai generate docstrings to improve coverage.
✅ Passed checks (2 passed)
  • Description Check ✅ Passed: Skipped because CodeRabbit's high-level summary is enabled.
  • Title check ✅ Passed: The title directly describes the main change: refactoring lock mechanisms from a global Mutex to per-worker locking with RwLock for improved concurrency.


@coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (1)
balius-runtime/src/lib.rs (1)

668-673: Consider releasing the read lock earlier (optional optimization).

The implementation is correct. The read lock on the map is held until the end of the function. Since the map is only needed to look up the worker, you could potentially release the read lock after line 673 to improve concurrency.

However, this is a minor optimization and the current code is clear and correct. The lock is held for a brief duration (single request handling), so the impact is minimal.

Example refactor if desired:

 pub async fn handle_request(
     &self,
     worker_id: &str,
     method: &str,
     params: Vec<u8>,
 ) -> Result<wit::Response, Error> {
-    let workers = self.loaded.read().await;
-    let mut worker = workers
-        .get(worker_id)
-        .ok_or(Error::WorkerNotFound(worker_id.to_string()))?
-        .lock()
-        .await;
+    let worker_mutex = {
+        let workers = self.loaded.read().await;
+        workers
+            .get(worker_id)
+            .ok_or(Error::WorkerNotFound(worker_id.to_string()))?
+            .clone()
+    }; // read lock released here
+    let mut worker = worker_mutex.lock().await;

     let channel = worker
         .wasm_store
         .data()
         .router
         .find_request_target(method)?;

     let evt = wit::Event::Request(params);

     let result = worker.dispatch_event(channel, &evt).await;
     self.metrics.request(worker_id, method, result.is_ok());
     result
 }

Note: This requires cloning an Arc rather than the Mutex itself, so the WorkerMap values would need to be Arc<Mutex<LoadedWorker>> instead of just Mutex<LoadedWorker> for this optimization.
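
At the type level, that variant would look roughly like this (get_worker is a hypothetical helper for illustration):

use std::{collections::HashMap, sync::Arc};
use tokio::sync::{Mutex, RwLock};

struct LoadedWorker;

// Arc-wrapped values let a handle be cloned out of the map...
type WorkerMap = HashMap<String, Arc<Mutex<LoadedWorker>>>;

async fn get_worker(
    loaded: &Arc<RwLock<WorkerMap>>,
    worker_id: &str,
) -> Option<Arc<Mutex<LoadedWorker>>> {
    // ...so the map's read guard can be dropped before the per-worker lock is taken.
    loaded.read().await.get(worker_id).cloned()
}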

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 25e1e67 and ee4dc39.

📒 Files selected for processing (1)
  • balius-runtime/src/lib.rs (7 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: lint
  • GitHub Check: build
🔇 Additional comments (6)
balius-runtime/src/lib.rs (6)

1-2: LGTM: Imports support the new concurrency model.

The new imports correctly support parallel worker updates (join_all), collection utilities (Itertools), and the read-write lock pattern (RwLock).

Also applies to: 10-10
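
Presumably something along these lines (an assumption based on the walkthrough, not a quote from the diff):

use futures::future::join_all;
use itertools::Itertools;
use tokio::sync::{Mutex, RwLock};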


490-490: LGTM: Two-level locking improves concurrency.

The change from Arc<Mutex<WorkerMap>> to Arc<RwLock<HashMap<String, Mutex<LoadedWorker>>>> is a well-established pattern that enables:

  • Multiple concurrent readers of the worker map
  • Fine-grained per-worker locking for mutations
  • Better parallelism during chain processing

Also applies to: 496-496


515-523: LGTM: Cursor computation is correct.

The read lock ensures map stability during iteration, while per-worker locks safely access individual cursors. The logic correctly computes the minimum cursor across all workers.
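
A rough sketch of that shape (the cursor field, its type, and the treatment of workers without a cursor are assumptions):

use std::{collections::HashMap, sync::Arc};
use tokio::sync::{Mutex, RwLock};

// Hypothetical worker state: `cursor` stands in for the last chain point processed.
struct LoadedWorker {
    cursor: Option<u64>,
}

type WorkerMap = HashMap<String, Mutex<LoadedWorker>>;

async fn chain_cursor(loaded: &Arc<RwLock<WorkerMap>>) -> Option<u64> {
    // The read lock keeps the map stable while each worker's cursor is inspected.
    let workers = loaded.read().await;

    let mut min = None;
    for worker in workers.values() {
        // Each per-worker lock is held only long enough to read the cursor.
        let cursor = worker.lock().await.cursor;
        // Ignores workers without a cursor; the real semantics may differ.
        min = match (min, cursor) {
            (Some(a), Some(b)) => Some(a.min(b)),
            (Some(a), None) => Some(a),
            (None, c) => c,
        };
    }
    min
}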


576-584: LGTM: Worker registration correctly uses write lock.

The write lock appropriately guards the structural modification of the map, and the new worker is correctly wrapped in a Mutex consistent with the updated WorkerMap type.


617-617: LGTM: Worker removal correctly uses write lock.

The write lock appropriately guards the removal operation.
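
Both registration and removal presumably reduce to a write-locked map mutation, roughly (a sketch with placeholder types):

use std::{collections::HashMap, sync::Arc};
use tokio::sync::{Mutex, RwLock};

struct LoadedWorker;

type WorkerMap = HashMap<String, Mutex<LoadedWorker>>;

async fn register_worker(loaded: &Arc<RwLock<WorkerMap>>, id: &str, worker: LoadedWorker) {
    // The write lock guards the structural change; the worker is wrapped in its
    // own Mutex so later mutations only need a read lock on the map.
    loaded.write().await.insert(id.to_string(), Mutex::new(worker));
}

async fn remove_worker(loaded: &Arc<RwLock<WorkerMap>>, id: &str) {
    // Removal is likewise a write-locked map mutation.
    loaded.write().await.remove(id);
}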


638-656: Verify read lock duration during chain processing.

The implementation correctly uses parallel worker updates via join_all, which is excellent for performance. However, the read lock on the worker map is held for the entire chain processing duration (including block application and cursor updates). This will block worker registration and removal operations during chain processing.

This is likely intentional to ensure atomicity—you probably want a stable worker set while applying a chain update. However, if chain processing takes significant time and you need to dynamically manage workers during processing, this could become a bottleneck.

Consider whether:

  1. This blocking behavior is acceptable for your use case
  2. If worker management during chain processing is needed, you might need to restructure the locking (e.g., snapshot worker IDs with a read lock, then release it before processing; a rough sketch follows)
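
If that restructuring ever becomes necessary, a rough sketch of option 2 (placeholder types; this variant again assumes Arc-wrapped workers, as in the earlier nitpick):

use std::{collections::HashMap, sync::Arc};
use tokio::sync::{Mutex, RwLock};

struct LoadedWorker;

impl LoadedWorker {
    async fn apply_block(&mut self, _block: &[u8]) { /* ... */ }
}

type WorkerMap = HashMap<String, Arc<Mutex<LoadedWorker>>>;

async fn handle_chain_snapshot(loaded: &Arc<RwLock<WorkerMap>>, block: &[u8]) {
    // Take the worker handles under a short-lived read lock...
    let snapshot: Vec<Arc<Mutex<LoadedWorker>>> = {
        let workers = loaded.read().await;
        workers.values().cloned().collect()
    }; // ...and release it here, before the potentially long processing step.

    // Registration and removal can now proceed concurrently with block application,
    // at the cost of possibly missing workers added after the snapshot.
    for worker in &snapshot {
        worker.lock().await.apply_block(block).await;
    }
}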

@scarmuega merged commit 51c82b4 into txpipe:main on Dec 10, 2025 (6 of 8 checks passed).
