From 967871f05629d6c57768ca75cfe80c57cc511539 Mon Sep 17 00:00:00 2001 From: Luke Yue Date: Sun, 30 Nov 2025 12:21:57 +0800 Subject: [PATCH 1/2] WIP: a usable CQ completion channel Signed-off-by: Luke Yue --- src/ibverbs/completion.rs | 125 +++++++++++++++++++++++++++++++++++--- 1 file changed, 116 insertions(+), 9 deletions(-) diff --git a/src/ibverbs/completion.rs b/src/ibverbs/completion.rs index 3d9c7d6..6df1965 100644 --- a/src/ibverbs/completion.rs +++ b/src/ibverbs/completion.rs @@ -3,7 +3,7 @@ use std::num::NonZeroU32; use std::os::fd::{AsRawFd, RawFd}; use std::os::raw::c_void; use std::ptr::NonNull; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use std::{io, ptr}; use std::{marker::PhantomData, mem::MaybeUninit}; @@ -11,10 +11,11 @@ use bitmask_enum::bitmask; use super::device_context::DeviceContext; use rdma_mummy_sys::{ - ibv_comp_channel, ibv_cq, ibv_cq_ex, ibv_cq_init_attr_ex, ibv_create_comp_channel, ibv_create_cq, ibv_create_cq_ex, - ibv_create_cq_wc_flags, ibv_destroy_comp_channel, ibv_destroy_cq, ibv_end_poll, ibv_next_poll, ibv_pd, ibv_poll_cq, - ibv_poll_cq_attr, ibv_start_poll, ibv_wc, ibv_wc_opcode, ibv_wc_read_byte_len, ibv_wc_read_completion_ts, - ibv_wc_read_imm_data, ibv_wc_read_opcode, ibv_wc_read_vendor_err, ibv_wc_status, + ibv_ack_cq_events, ibv_comp_channel, ibv_cq, ibv_cq_ex, ibv_cq_init_attr_ex, ibv_create_comp_channel, + ibv_create_cq, ibv_create_cq_ex, ibv_create_cq_wc_flags, ibv_destroy_comp_channel, ibv_destroy_cq, ibv_end_poll, + ibv_get_cq_event, ibv_next_poll, ibv_pd, ibv_poll_cq, ibv_poll_cq_attr, ibv_req_notify_cq, ibv_start_poll, ibv_wc, + ibv_wc_opcode, ibv_wc_read_byte_len, ibv_wc_read_completion_ts, ibv_wc_read_imm_data, ibv_wc_read_opcode, + ibv_wc_read_vendor_err, ibv_wc_status, }; /// Error returned by [`DeviceContext::create_comp_channel`] for creating a new completion channel. 
@@ -46,6 +47,34 @@ pub enum CreateCompletionQueueErrorKind { Ibverbs(#[from] io::Error), } +#[derive(Debug, thiserror::Error)] +#[error("failed to get completion event")] +#[non_exhaustive] +pub struct GetCompletionEventError(#[from] pub GetCompletionEventErrorKind); + +/// The enum type for [`GetCompletionEventError`]. +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +#[non_exhaustive] +pub enum GetCompletionEventErrorKind { + Ibverbs(#[from] io::Error), +} + +/// Error returned by [`CompletionChannel::req_notify_cq`] for requesting notification of completion queue. +#[derive(Debug, thiserror::Error)] +#[error("failed to request notification of completion queue")] +#[non_exhaustive] +pub struct RequestNotifyCompletionQueueError(#[from] pub RequestNotifyCompletionQueueErrorKind); + +/// The enum type for [`RequestNotifyCompletionQueueError`]. + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +#[non_exhaustive] +pub enum RequestNotifyCompletionQueueErrorKind { + Ibverbs(#[from] io::Error), +} + /// Error returned by [`BasicCompletionQueue::start_poll`] and /// [`ExtendedCompletionQueue::start_poll`] for polling Work Completions from RDMA CQ. #[derive(Debug, thiserror::Error)] @@ -254,6 +283,22 @@ impl CompletionChannel { } } + pub fn get_cq_event(&self) -> Result<GenericCompletionQueue, GetCompletionEventError> { + let mut cq_ptr = MaybeUninit::<*mut ibv_cq>::uninit(); + let mut cq_wrapper = MaybeUninit::<*mut WeakGenericCompletionQueue>::uninit(); + + let ret = unsafe { ibv_get_cq_event(self.channel.as_ptr(), cq_ptr.as_mut_ptr(), cq_wrapper.as_mut_ptr() as _) }; + if ret < 0 { + return Err(GetCompletionEventErrorKind::Ibverbs(io::Error::last_os_error()).into()); + } + let _cq = unsafe { NonNull::new(cq_ptr.assume_init()).unwrap() }; + let cq_wrapper = unsafe { NonNull::new(cq_wrapper.assume_init()).unwrap() }; + + let weak_cq = unsafe { (*cq_wrapper.as_ptr()).upgrade() }; + + Ok(weak_cq.unwrap()) + } + /// # Safety /// /// Return the handle of completion channel. 
@@ -281,6 +326,20 @@ pub trait CompletionQueue { /// We mark this method unsafe because the lifetime of `ibv_cq` is not /// associated with the return value. unsafe fn cq(&self) -> NonNull<ibv_cq>; + + fn ack_events(&self, num_events: u32) { + unsafe { ibv_ack_cq_events(self.cq().as_ptr(), num_events) }; + } + + fn req_notify_cq(&self, solicited_only: bool) -> Result<(), RequestNotifyCompletionQueueError> { + let ret = unsafe { ibv_req_notify_cq(self.cq().as_ptr(), if solicited_only { 1 } else { 0 }) }; + + if ret != 0 { + return Err(RequestNotifyCompletionQueueErrorKind::Ibverbs(io::Error::from_raw_os_error(ret)).into()); + } + + Ok(()) + } } /// The legacy [`CompletionQueue`] created with [`CompletionQueueBuilder::build`] @@ -490,11 +549,21 @@ impl CompletionQueueBuilder { if cq_ex.is_null() { Err(CreateCompletionQueueErrorKind::Ibverbs(io::Error::last_os_error()).into()) } else { - Ok(Arc::new(ExtendedCompletionQueue { + let cq_wrapper = Arc::new(ExtendedCompletionQueue { cq_ex: unsafe { NonNull::new_unchecked(cq_ex) }, _dev_ctx: Arc::clone(&self.dev_ctx), _comp_channel: self.comp_channel.clone(), - })) + }); + + let weak_cq = Arc::downgrade(&cq_wrapper.clone()); + let boxed = Box::new(WeakGenericCompletionQueue::Extended(weak_cq)); + let raw_box = Box::into_raw(boxed); + + unsafe { + (*cq_ex).cq_context = raw_box as *mut std::ffi::c_void; + } + + Ok(cq_wrapper) } } @@ -516,12 +585,22 @@ impl CompletionQueueBuilder { if cq.is_null() { Err(CreateCompletionQueueErrorKind::Ibverbs(io::Error::last_os_error()).into()) } else { - Ok(Arc::new(BasicCompletionQueue { + let cq_wrapper = Arc::new(BasicCompletionQueue { cq: unsafe { NonNull::new_unchecked(cq) }, poll_batch: unsafe { NonZeroU32::new(32).unwrap_unchecked() }, _dev_ctx: Arc::clone(&self.dev_ctx), _comp_channel: self.comp_channel.clone(), - })) + }); + + let weak_cq = Arc::downgrade(&cq_wrapper.clone()); + let boxed = Box::new(WeakGenericCompletionQueue::Basic(weak_cq)); + let raw_box = Box::into_raw(boxed); + + unsafe 
{ + (*cq).cq_context = raw_box as *mut std::ffi::c_void; + } + + Ok(cq_wrapper) } } } @@ -746,6 +825,20 @@ impl<'cq> Iterator for ExtendedPoller<'cq> { } } +enum WeakGenericCompletionQueue { + Basic(Weak<BasicCompletionQueue>), + Extended(Weak<ExtendedCompletionQueue>), +} + +impl WeakGenericCompletionQueue { + pub fn upgrade(&self) -> Option<GenericCompletionQueue> { + match self { + WeakGenericCompletionQueue::Basic(cq) => cq.upgrade().map(GenericCompletionQueue::Basic), + WeakGenericCompletionQueue::Extended(cq) => cq.upgrade().map(GenericCompletionQueue::Extended), + } + } +} + /// A unified interface for [`BasicCompletionQueue`] and [`ExtendedCompletionQueue`], implemented /// with enum dispatching. #[derive(Debug)] @@ -788,6 +881,20 @@ impl GenericCompletionQueue { GenericCompletionQueue::Extended(cq) => cq.start_poll().map(GenericPoller::Extended), } } + + pub fn ack_events(&self, num_events: u32) { + match self { + GenericCompletionQueue::Basic(cq) => cq.ack_events(num_events), + GenericCompletionQueue::Extended(cq) => cq.ack_events(num_events), + } + } + + pub fn req_notify_cq(&self, solicited_only: bool) -> Result<(), RequestNotifyCompletionQueueError> { + match self { + GenericCompletionQueue::Basic(cq) => cq.req_notify_cq(solicited_only), + GenericCompletionQueue::Extended(cq) => cq.req_notify_cq(solicited_only), + } + } } impl<'cq> Iterator for GenericPoller<'cq> { From a2fa8833a1e2913dc45b00bd73c7a4767f6903ff Mon Sep 17 00:00:00 2001 From: Luke Yue Date: Sun, 30 Nov 2025 16:50:24 +0800 Subject: [PATCH 2/2] feat(examples): adapt CQ events for rc_pingpong Signed-off-by: Luke Yue --- .cirrus.yml | 1 + Justfile | 5 + examples/rc_pingpong.rs | 210 ++++++++++++++++++++-------------- examples/rc_pingpong_split.rs | 154 ++++++++++++++++--------- 4 files changed, 234 insertions(+), 136 deletions(-) diff --git a/.cirrus.yml b/.cirrus.yml index 685a697..eb7c136 100644 --- a/.cirrus.yml +++ b/.cirrus.yml @@ -27,6 +27,7 @@ task: - export LD_LIBRARY_PATH=./rdma-core/build/lib - just test-basic-with-cov - just test-rc-pingpong-with-cov 
+ - just test-rc-pingpong-with-events-with-cov - just test-cmtime-with-cov - just generate-cov - sed -i 's#/tmp/cirrus-ci-build/##g' lcov.info diff --git a/Justfile b/Justfile index 781c653..17ccc3c 100644 --- a/Justfile +++ b/Justfile @@ -20,6 +20,11 @@ test-rc-pingpong-with-cov: sleep 2 cargo llvm-cov --no-report run --example rc_pingpong_split -- -d {{rdma_dev}} -g 1 127.0.0.1 +test-rc-pingpong-with-events-with-cov: + cargo llvm-cov --no-report run --example rc_pingpong_split -- -d {{rdma_dev}} -g 1 -e & + sleep 2 + cargo llvm-cov --no-report run --example rc_pingpong_split -- -d {{rdma_dev}} -g 1 -e 127.0.0.1 + test-cmtime-with-cov: cargo llvm-cov --no-report run --example cmtime -- -b {{ip}} & sleep 2 diff --git a/examples/rc_pingpong.rs b/examples/rc_pingpong.rs index 524dbf7..0614b4b 100644 --- a/examples/rc_pingpong.rs +++ b/examples/rc_pingpong.rs @@ -22,7 +22,8 @@ use postcard::{from_bytes, to_allocvec}; use serde::{Deserialize, Serialize}; use sideway::ibverbs::address::{AddressHandleAttribute, Gid}; use sideway::ibverbs::completion::{ - CreateCompletionQueueWorkCompletionFlags, GenericCompletionQueue, WorkCompletionStatus, + CompletionChannel, CreateCompletionQueueWorkCompletionFlags, GenericCompletionQueue, PollCompletionQueueError, + WorkCompletionStatus, }; use sideway::ibverbs::device::{DeviceInfo, DeviceList}; use sideway::ibverbs::device_context::Mtu; @@ -69,6 +70,9 @@ pub struct Args { /// Get CQE with timestamp #[arg(long, short = 't', default_value_t = false)] ts: bool, + /// Use CQ events instead of busy polling + #[arg(long, short = 'e', default_value_t = false)] + use_events: bool, /// If no value provided, start a server and wait for connection, otherwise, connect to server at [host] #[arg(name = "host")] server_ip: Option<String>, @@ -156,6 +160,13 @@ fn main() -> anyhow::Result<()> { } } + // Create completion channel if using events + let comp_channel = if args.use_events { + Some(CompletionChannel::new(&context).expect("Couldn't create 
completion channel")) + } else { + None + }; + let pd = context.alloc_pd().unwrap_or_else(|_| panic!("Couldn't allocate PD")); let send_data: Vec<u8> = vec![0; args.size as _]; let send_mr = unsafe { @@ -189,16 +200,28 @@ fn main() -> anyhow::Result<()> { ); } + // Associate completion channel with CQ if using events + if let Some(ref channel) = comp_channel { + cq_builder.setup_comp_channel(channel, 0); + } + let cq = cq_builder.setup_cqe(rx_depth + 1).build_ex().unwrap(); let cq_handle = GenericCompletionQueue::from(Arc::clone(&cq)); + // Request initial notification if using events + if args.use_events { + cq_handle + .req_notify_cq(false) + .expect("Couldn't request CQ notification"); + } + let mut builder = pd.create_qp_builder(); let mut qp = builder .setup_max_inline_data(128) .setup_send_cq(cq_handle.clone()) - .setup_recv_cq(cq_handle) + .setup_recv_cq(cq_handle.clone()) .setup_max_send_wr(1) .setup_max_recv_wr(rx_depth) .build_ex() @@ -324,100 +347,119 @@ fn main() -> anyhow::Result<()> { outstanding_send = true; } // poll for the completion - { - loop { - match cq.start_poll() { - Ok(mut poller) => { - while let Some(wc) = poller.next() { - if wc.status() != WorkCompletionStatus::Success as u32 { - panic!( - "Failed status {:#?} ({}) for wr_id {}", - Into::<WorkCompletionStatus>::into(wc.status()), - wc.status(), - wc.wr_id() - ); - } - match wc.wr_id() { - SEND_WR_ID => { - scnt += 1; - outstanding_send = false; - }, - RECV_WR_ID => { - rcnt += 1; - rout -= 1; - - // Post more receives if the receive side credit is low - if rout <= rx_depth / 2 { - let to_post = rx_depth - rout; - for _ in 0..to_post { - let mut guard = qp.start_post_recv(); - let recv_handle = guard.construct_wr(RECV_WR_ID); - unsafe { - recv_handle.setup_sge( - recv_mr.lkey(), - recv_data.as_mut_ptr() as _, - args.size, - ); - }; - guard.post().unwrap(); - } - rout += to_post; + let mut num_cq_events: u32 = 0; + loop { + // If using events, wait for CQ event before polling + if args.use_events { + if let 
Some(ref channel) = comp_channel { + // Get the CQ event (this blocks until an event arrives) + let _event_cq = channel.get_cq_event().expect("Failed to get CQ event"); + num_cq_events += 1; + + // Re-arm the notification BEFORE polling to avoid missing events + cq_handle + .req_notify_cq(false) + .expect("Couldn't request CQ notification"); + } + } + + // Poll for completions + match cq.start_poll() { + Ok(mut poller) => { + while let Some(wc) = poller.next() { + if wc.status() != WorkCompletionStatus::Success as u32 { + panic!( + "Failed status {:#?} ({}) for wr_id {}", + Into::<WorkCompletionStatus>::into(wc.status()), + wc.status(), + wc.wr_id() + ); + } + match wc.wr_id() { + SEND_WR_ID => { + scnt += 1; + outstanding_send = false; + }, + RECV_WR_ID => { + rcnt += 1; + rout -= 1; + + // Post more receives if the receive side credit is low + if rout <= rx_depth / 2 { + let to_post = rx_depth - rout; + for _ in 0..to_post { + let mut guard = qp.start_post_recv(); + let recv_handle = guard.construct_wr(RECV_WR_ID); + unsafe { + recv_handle.setup_sge(recv_mr.lkey(), recv_data.as_mut_ptr() as _, args.size); + }; + guard.post().unwrap(); } + rout += to_post; + } - if args.ts { - let timestamp = wc.completion_timestamp(); - if ts_param.last_completion_with_timestamp != 0 { - let delta: u64 = if timestamp >= ts_param.completion_recv_prev_time { - timestamp - ts_param.completion_recv_prev_time - } else { - completion_timestamp_mask - ts_param.completion_recv_prev_time - + timestamp - + 1 - }; - - ts_param.completion_recv_max_time_delta = - ts_param.completion_recv_max_time_delta.max(delta); - ts_param.completion_recv_min_time_delta = - ts_param.completion_recv_min_time_delta.min(delta); - ts_param.completion_recv_total_time_delta += delta; - ts_param.completion_with_time_iters += 1; - } - - ts_param.completion_recv_prev_time = timestamp; - ts_param.last_completion_with_timestamp = 1; - } else { - ts_param.last_completion_with_timestamp = 0; + if args.ts { + let timestamp = 
wc.completion_timestamp(); + if ts_param.last_completion_with_timestamp != 0 { + let delta: u64 = if timestamp >= ts_param.completion_recv_prev_time { + timestamp - ts_param.completion_recv_prev_time + } else { + completion_timestamp_mask - ts_param.completion_recv_prev_time + timestamp + 1 + }; + + ts_param.completion_recv_max_time_delta = + ts_param.completion_recv_max_time_delta.max(delta); + ts_param.completion_recv_min_time_delta = + ts_param.completion_recv_min_time_delta.min(delta); + ts_param.completion_recv_total_time_delta += delta; + ts_param.completion_with_time_iters += 1; } - }, - _ => { - panic!("Unknown error!"); - }, - } - if scnt < args.iter && !outstanding_send { - // Post another send if we haven't reached the iteration limit - let mut guard = qp.start_post_send(); - let send_handle = guard.construct_wr(SEND_WR_ID, WorkRequestFlags::Signaled).setup_send(); - unsafe { - send_handle.setup_sge(send_mr.lkey(), send_data.as_ptr() as _, args.size); + ts_param.completion_recv_prev_time = timestamp; + ts_param.last_completion_with_timestamp = 1; + } else { + ts_param.last_completion_with_timestamp = 0; } - guard.post()?; - outstanding_send = true; + }, + _ => { + panic!("Unknown error!"); + }, + } + + if scnt < args.iter && !outstanding_send { + // Post another send if we haven't reached the iteration limit + let mut guard = qp.start_post_send(); + let send_handle = guard.construct_wr(SEND_WR_ID, WorkRequestFlags::Signaled).setup_send(); + unsafe { + send_handle.setup_sge(send_mr.lkey(), send_data.as_ptr() as _, args.size); } + guard.post()?; + outstanding_send = true; } - }, - Err(_) => { + } + }, + Err(PollCompletionQueueError::CompletionQueueEmpty) => { + // CQ is empty - if not using events, continue busy polling + if !args.use_events { continue; - }, - } + } + }, + Err(e) => { + panic!("Failed to poll CQ: {:?}", e); + }, + } - // Check if we're done - if scnt >= args.iter && rcnt >= args.iter { - break; - } + // Check if we're done + if scnt >= 
args.iter && rcnt >= args.iter { + break; } } + // Acknowledge all CQ events before cleanup + if num_cq_events > 0 { + cq_handle.ack_events(num_cq_events); + } + let end_time = clock.now(); let time = end_time.duration_since(start_time); let bytes = args.size as u64 * args.iter as u64 * 2; diff --git a/examples/rc_pingpong_split.rs b/examples/rc_pingpong_split.rs index c0d53ed..e951683 100644 --- a/examples/rc_pingpong_split.rs +++ b/examples/rc_pingpong_split.rs @@ -32,8 +32,8 @@ use postcard::{from_bytes, to_allocvec}; use serde::{Deserialize, Serialize}; use sideway::ibverbs::address::{AddressHandleAttribute, Gid}; use sideway::ibverbs::completion::{ - CreateCompletionQueueWorkCompletionFlags, ExtendedCompletionQueue, ExtendedWorkCompletion, GenericCompletionQueue, - WorkCompletionStatus, + CompletionChannel, CreateCompletionQueueWorkCompletionFlags, ExtendedCompletionQueue, ExtendedWorkCompletion, + GenericCompletionQueue, PollCompletionQueueError, WorkCompletionStatus, }; use sideway::ibverbs::device::{Device, DeviceInfo, DeviceList}; use sideway::ibverbs::device_context::{DeviceContext, Mtu}; @@ -83,6 +83,9 @@ pub struct Args { /// Get CQE with timestamp #[arg(long, short = 't', default_value_t = false)] ts: bool, + /// Use CQ events instead of busy polling + #[arg(long, short = 'e', default_value_t = false)] + use_events: bool, /// If no value provided, start a server and wait for connection, otherwise, connect to server at [host] #[arg(name = "host")] server_ip: Option, @@ -121,13 +124,17 @@ struct PingPongContext { _recv_buf: Arc>, recv_mr: Arc, cq: Arc, + cq_handle: GenericCompletionQueue, + comp_channel: Option>, qp: ExtendedQueuePair, size: u32, completion_timestamp_mask: u64, } impl PingPongContext { - fn build(device: &Device, size: u32, rx_depth: u32, ib_port: u8, use_ts: bool) -> anyhow::Result { + fn build( + device: &Device, size: u32, rx_depth: u32, ib_port: u8, use_ts: bool, use_events: bool, + ) -> anyhow::Result { let context: Arc = device 
.open() .unwrap_or_else(|_| panic!("Couldn't get context for {}", device.name())); @@ -143,6 +150,13 @@ impl PingPongContext { 0 }; + // Create completion channel if using events + let comp_channel = if use_events { + Some(CompletionChannel::new(&context).expect("Couldn't create completion channel")) + } else { + None + }; + let pd = context.alloc_pd().unwrap_or_else(|_| panic!("Couldn't allocate PD")); let send_buf = Arc::new(vec![0; size as usize]); @@ -172,16 +186,27 @@ impl PingPongContext { | CreateCompletionQueueWorkCompletionFlags::CompletionTimestamp, ); } + // Associate completion channel with CQ if using events + if let Some(ref channel) = comp_channel { + cq_builder.setup_comp_channel(channel, 0); + } let cq = cq_builder.setup_cqe(rx_depth + 1).build_ex().unwrap(); - let cq_for_qp = GenericCompletionQueue::from(Arc::clone(&cq)); + let cq_handle = GenericCompletionQueue::from(Arc::clone(&cq)); + + // Request initial notification if using events + if use_events { + cq_handle + .req_notify_cq(false) + .expect("Couldn't request CQ notification"); + } let mut builder = pd.create_qp_builder(); let mut qp = builder .setup_max_inline_data(128) - .setup_send_cq(cq_for_qp.clone()) - .setup_recv_cq(cq_for_qp) + .setup_send_cq(cq_handle.clone()) + .setup_recv_cq(cq_handle.clone()) .setup_max_send_wr(1) .setup_max_recv_wr(rx_depth) .build_ex() @@ -202,6 +227,8 @@ impl PingPongContext { _recv_buf: recv_buf, recv_mr, cq, + cq_handle, + comp_channel, qp, size, completion_timestamp_mask, @@ -377,7 +404,7 @@ fn main() -> anyhow::Result<()> { None => device_list.iter().next().expect("No IB device found"), }; - let mut ctx = PingPongContext::build(&device, args.size, rx_depth, args.ib_port, args.ts)?; + let mut ctx = PingPongContext::build(&device, args.size, rx_depth, args.ib_port, args.ts, args.use_events)?; let gid = ctx.ctx.query_gid(args.ib_port, args.gid_idx.into()).unwrap(); let psn = rand::random::<u32>() & 0xFFFFFF; @@ -450,58 +477,81 @@ fn main() -> anyhow::Result<()> 
{ outstanding_send = true; } // poll for the completion - { - loop { - let mut need_post_recv = false; - let mut to_post_recv = 0; - let mut need_post_send = false; - - { - match ctx.cq.start_poll() { - Ok(mut poller) => { - while let Some(wc) = poller.next() { - ctx.parse_single_work_completion( - &wc, - &mut ts_param, - &mut scnt, - &mut rcnt, - &mut outstanding_send, - &mut rout, - rx_depth, - &mut need_post_recv, - &mut to_post_recv, - args.ts, - ); - - // Record that we need to post a send later - if scnt < args.iter && !outstanding_send { - need_post_send = true; - outstanding_send = true; - } - } - }, - Err(_) => { - continue; - }, - } + let mut num_cq_events: u32 = 0; + loop { + let mut need_post_recv = false; + let mut to_post_recv = 0; + let mut need_post_send = false; + + // If using events, wait for CQ event before polling + if args.use_events { + if let Some(ref channel) = ctx.comp_channel { + // Get the CQ event (this blocks until an event arrives) + let _event_cq = channel.get_cq_event().expect("Failed to get CQ event"); + num_cq_events += 1; + + // Re-arm the notification BEFORE polling to avoid missing events + ctx.cq_handle + .req_notify_cq(false) + .expect("Couldn't request CQ notification"); } + } - if need_post_recv { - ctx.post_recv(to_post_recv).unwrap(); - rout += to_post_recv; - } + // Poll for completions + match ctx.cq.start_poll() { + Ok(mut poller) => { + while let Some(wc) = poller.next() { + ctx.parse_single_work_completion( + &wc, + &mut ts_param, + &mut scnt, + &mut rcnt, + &mut outstanding_send, + &mut rout, + rx_depth, + &mut need_post_recv, + &mut to_post_recv, + args.ts, + ); + + // Record that we need to post a send later + if scnt < args.iter && !outstanding_send { + need_post_send = true; + outstanding_send = true; + } + } + }, + Err(PollCompletionQueueError::CompletionQueueEmpty) => { + // CQ is empty - if not using events, continue busy polling + if !args.use_events { + continue; + } + }, + Err(e) => { + panic!("Failed to 
poll CQ: {:?}", e); + }, + } - if need_post_send { - ctx.post_send()?; - } + if need_post_recv { + ctx.post_recv(to_post_recv).unwrap(); + rout += to_post_recv; + } - // Check if we're done - if scnt >= args.iter && rcnt >= args.iter { - break; - } + if need_post_send { + ctx.post_send()?; + } + + // Check if we're done + if scnt >= args.iter && rcnt >= args.iter { + break; } } + // Acknowledge all CQ events before cleanup + if num_cq_events > 0 { + ctx.cq_handle.ack_events(num_cq_events); + } + let end_time = clock.now(); let time = end_time.duration_since(start_time); let bytes = args.size as u64 * args.iter as u64 * 2;