Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,15 @@ impl<T> std::error::Error for SendError<T> {}
/// The receive operation can only fail if the corresponding [`Sender`](crate::Sender) was dropped
/// before sending any message, or if a message has already been received on the channel.
#[cfg(any(feature = "std", feature = "async"))]
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct RecvError;
#[derive(Debug, Eq, PartialEq)]
pub struct RecvError(());

#[cfg(any(feature = "std", feature = "async"))]
impl RecvError {
pub(crate) const fn new() -> Self {
RecvError(())
}
}

#[cfg(any(feature = "std", feature = "async"))]
impl fmt::Display for RecvError {
Expand Down
18 changes: 9 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ impl<T> Receiver<T> {
// sender's final write of the DISCONNECTED state.
unsafe { dealloc(channel_ptr) };

break Err(RecvError);
break Err(RecvError::new());
}
// State did not change, spurious wakeup, park again.
RECEIVING | UNPARKING => (),
Expand Down Expand Up @@ -654,7 +654,7 @@ impl<T> Receiver<T> {
// write of the channel state.
unsafe { dealloc(channel_ptr) };

Err(RecvError)
Err(RecvError::new())
}
_ => unreachable!(),
}
Expand Down Expand Up @@ -684,7 +684,7 @@ impl<T> Receiver<T> {
// final write of the channel state.
unsafe { dealloc(channel_ptr) };

Err(RecvError)
Err(RecvError::new())
}
// The receiver must have been `Future::poll`ed prior to this call.
#[cfg(feature = "async")]
Expand All @@ -705,7 +705,7 @@ impl<T> Receiver<T> {
/// Panics if called after this receiver has been polled asynchronously.
#[cfg(feature = "std")]
pub fn recv_ref(&self) -> Result<T, RecvError> {
self.start_recv_ref(RecvError, |channel| {
self.start_recv_ref(RecvError::new(), |channel| {
loop {
thread::park();

Expand All @@ -723,7 +723,7 @@ impl<T> Receiver<T> {
break Ok(unsafe { channel.take_message() });
}
// The sender was dropped while we were parked.
DISCONNECTED => break Err(RecvError),
DISCONNECTED => break Err(RecvError::new()),
// State did not change, spurious wakeup, park again.
RECEIVING | UNPARKING => (),
_ => unreachable!(),
Expand Down Expand Up @@ -1077,7 +1077,7 @@ impl<T> core::future::Future for Receiver<T> {
}
// The sender was dropped before sending anything while we prepared to park.
// The sender has taken the waker already.
Err(DISCONNECTED) => Poll::Ready(Err(RecvError)),
Err(DISCONNECTED) => Poll::Ready(Err(RecvError::new())),
// The sender is currently waking us up.
Err(UNPARKING) => {
// We can't trust that the old waker that the sender has access to
Expand Down Expand Up @@ -1105,7 +1105,7 @@ impl<T> core::future::Future for Receiver<T> {
Poll::Ready(Ok(unsafe { channel.take_message() }))
}
// The sender was dropped before sending anything, or we already received the message.
DISCONNECTED => Poll::Ready(Err(RecvError)),
DISCONNECTED => Poll::Ready(Err(RecvError::new())),
// The sender has observed the RECEIVING state and is currently reading the waker from
// a previous poll. We need to loop here until we observe the MESSAGE or DISCONNECTED
// state. We busy loop here since we know the sender is done very soon.
Expand All @@ -1129,7 +1129,7 @@ impl<T> core::future::Future for Receiver<T> {
}
// The sender was dropped. Our drop impl will synchronize with the sender's
// final write of the channel state and deallocate the channel.
DISCONNECTED => break Poll::Ready(Err(RecvError)),
DISCONNECTED => break Poll::Ready(Err(RecvError::new())),
// Sender is still unparking us... Spin more
UNPARKING => (),
_ => unreachable!(),
Expand Down Expand Up @@ -1403,7 +1403,7 @@ impl<T> Channel<T> {
// RECEIVING state, so it has not accessed the waker. We must drop it.
self.drop_waker();

Poll::Ready(Err(RecvError))
Poll::Ready(Err(RecvError::new()))
}
_ => unreachable!(),
}
Expand Down
2 changes: 1 addition & 1 deletion tests/miri.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ fn tx_drop_rx_poll_to_completion() {
let mut cx = task::Context::from_waker(Waker::noop());
loop {
match rx.as_mut().poll(&mut cx) {
Poll::Ready(Err(oneshot::RecvError)) => break,
Poll::Ready(Err(_)) => break,
Poll::Ready(result) => panic!("Unexpected result: {:?}", result),
Poll::Pending => spin_loop(),
}
Expand Down
6 changes: 3 additions & 3 deletions tests/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use core::mem;
use oneshot::TryRecvError;

#[cfg(feature = "std")]
use oneshot::{RecvError, RecvTimeoutError};
use oneshot::RecvTimeoutError;
#[cfg(feature = "std")]
use std::time::{Duration, Instant};

Expand Down Expand Up @@ -35,7 +35,7 @@ fn send_before_try_recv() {
assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
#[cfg(feature = "std")]
{
assert_eq!(receiver.recv_ref(), Err(RecvError));
assert!(receiver.recv_ref().is_err());
assert!(receiver.recv_timeout(Duration::from_secs(1)).is_err());
}
})
Expand Down Expand Up @@ -78,7 +78,7 @@ fn send_before_recv_ref() {
assert!(sender.send(19i128).is_ok());

assert_eq!(receiver.recv_ref(), Ok(19i128));
assert_eq!(receiver.recv_ref(), Err(RecvError));
assert!(receiver.recv_ref().is_err());
assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
assert!(receiver.recv_timeout(Duration::from_secs(1)).is_err());
})
Expand Down
Loading