diff --git a/src/libgreen/sched.rs b/src/libgreen/sched.rs index 9971dfee828..e214797d4f8 100644 --- a/src/libgreen/sched.rs +++ b/src/libgreen/sched.rs @@ -1011,7 +1011,6 @@ fn new_sched_rng() -> XorShiftRng { mod test { use rustuv; - use std::comm; use std::task::TaskOpts; use std::rt::task::Task; use std::rt::local::Local; @@ -1428,7 +1427,7 @@ mod test { // This task should not be able to starve the sender; // The sender should get stolen to another thread. spawn(proc() { - while rx.try_recv() != comm::Data(()) { } + while rx.try_recv().is_err() { } }); tx.send(()); @@ -1445,7 +1444,7 @@ mod test { // This task should not be able to starve the other task. // The sends should eventually yield. spawn(proc() { - while rx1.try_recv() != comm::Data(()) { + while rx1.try_recv().is_err() { tx2.send(()); } }); @@ -1499,7 +1498,7 @@ mod test { let mut val = 20; while val > 0 { val = po.recv(); - ch.try_send(val - 1); + let _ = ch.send_opt(val - 1); } } diff --git a/src/libgreen/task.rs b/src/libgreen/task.rs index 6fa40c0e42b..534e9f8401e 100644 --- a/src/libgreen/task.rs +++ b/src/libgreen/task.rs @@ -515,7 +515,7 @@ mod tests { let _tx = tx; fail!() }); - assert_eq!(rx.recv_opt(), None); + assert_eq!(rx.recv_opt(), Err(())); } #[test] diff --git a/src/libnative/io/timer_other.rs b/src/libnative/io/timer_other.rs index 569b4cbb258..0bf97d58ffd 100644 --- a/src/libnative/io/timer_other.rs +++ b/src/libnative/io/timer_other.rs @@ -46,7 +46,6 @@ //! //! Note that all time units in this file are in *milliseconds*. -use std::comm::Data; use libc; use std::mem; use std::os; @@ -119,7 +118,7 @@ fn helper(input: libc::c_int, messages: Receiver) { Some(timer) => timer, None => return }; let tx = timer.tx.take_unwrap(); - if tx.try_send(()) && timer.repeat { + if tx.send_opt(()).is_ok() && timer.repeat { timer.tx = Some(tx); timer.target += timer.interval; insert(timer, active); @@ -162,14 +161,14 @@ fn helper(input: libc::c_int, messages: Receiver) { 1 => { loop { match messages.try_recv() { - Data(Shutdown) => { + Ok(Shutdown) => { assert!(active.len() == 0); break 'outer; } - Data(NewTimer(timer)) => insert(timer, &mut active), + Ok(NewTimer(timer)) => insert(timer, &mut active), - Data(RemoveTimer(id, ack)) => { + Ok(RemoveTimer(id, ack)) => { match dead.iter().position(|&(i, _)| id == i) { Some(i) => { let (_, i) = dead.remove(i).unwrap(); diff --git a/src/libnative/io/timer_timerfd.rs b/src/libnative/io/timer_timerfd.rs index d37a39fc30e..3fd61dc1da5 100644 --- a/src/libnative/io/timer_timerfd.rs +++ b/src/libnative/io/timer_timerfd.rs @@ -28,7 +28,6 @@ //! //! As with timer_other, all units in this file are in units of millseconds. -use std::comm::Data; use libc; use std::ptr; use std::os; @@ -107,7 +106,7 @@ fn helper(input: libc::c_int, messages: Receiver) { match list.as_slice().bsearch(|&(f, _, _)| f.cmp(&fd)) { Some(i) => { let (_, ref c, oneshot) = *list.get(i); - (!c.try_send(()) || oneshot, i) + (c.send_opt(()).is_err() || oneshot, i) } None => fail!("fd not active: {}", fd), } @@ -121,7 +120,7 @@ fn helper(input: libc::c_int, messages: Receiver) { while incoming { match messages.try_recv() { - Data(NewTimer(fd, chan, one, timeval)) => { + Ok(NewTimer(fd, chan, one, timeval)) => { // acknowledge we have the new channel, we will never send // another message to the old channel chan.send(()); @@ -149,7 +148,7 @@ fn helper(input: libc::c_int, messages: Receiver) { assert_eq!(ret, 0); } - Data(RemoveTimer(fd, chan)) => { + Ok(RemoveTimer(fd, chan)) => { match list.as_slice().bsearch(|&(f, _, _)| f.cmp(&fd)) { Some(i) => { drop(list.remove(i)); @@ -160,7 +159,7 @@ fn helper(input: libc::c_int, messages: Receiver) { chan.send(()); } - Data(Shutdown) => { + Ok(Shutdown) => { assert!(list.len() == 0); break 'outer; } diff --git a/src/libnative/io/timer_win32.rs b/src/libnative/io/timer_win32.rs index 8b7592783da..a15898feb92 100644 --- a/src/libnative/io/timer_win32.rs +++ b/src/libnative/io/timer_win32.rs @@ -20,7 +20,6 @@ //! Other than that, the implementation is pretty straightforward in terms of //! the other two implementations of timers with nothing *that* new showing up. -use std::comm::Data; use libc; use std::ptr; use std::rt::rtio; @@ -54,11 +53,11 @@ fn helper(input: libc::HANDLE, messages: Receiver) { if idx == 0 { loop { match messages.try_recv() { - Data(NewTimer(obj, c, one)) => { + Ok(NewTimer(obj, c, one)) => { objs.push(obj); chans.push((c, one)); } - Data(RemoveTimer(obj, c)) => { + Ok(RemoveTimer(obj, c)) => { c.send(()); match objs.iter().position(|&o| o == obj) { Some(i) => { @@ -68,7 +67,7 @@ fn helper(input: libc::HANDLE, messages: Receiver) { None => {} } } - Data(Shutdown) => { + Ok(Shutdown) => { assert_eq!(objs.len(), 1); assert_eq!(chans.len(), 0); break 'outer; @@ -79,7 +78,7 @@ fn helper(input: libc::HANDLE, messages: Receiver) { } else { let remove = { match chans.get(idx as uint - 1) { - &(ref c, oneshot) => !c.try_send(()) || oneshot + &(ref c, oneshot) => c.send_opt(()).is_err() || oneshot } }; if remove { diff --git a/src/libnative/task.rs b/src/libnative/task.rs index 871fc94bde4..ddfd46ecad9 100644 --- a/src/libnative/task.rs +++ b/src/libnative/task.rs @@ -274,7 +274,7 @@ mod tests { let _tx = tx; fail!() }); - assert_eq!(rx.recv_opt(), None); + assert_eq!(rx.recv_opt(), Err(())); } #[test] diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs index 4d4b62dddd4..b893f5f693f 100644 --- a/src/librustuv/net.rs +++ b/src/librustuv/net.rs @@ -1065,7 +1065,7 @@ mod test { } reads += 1; - tx2.try_send(()); + let _ = tx2.send_opt(()); } // Make sure we had multiple reads diff --git a/src/librustuv/signal.rs b/src/librustuv/signal.rs index c38b4fdd96f..2dcf2de681c 100644 --- a/src/librustuv/signal.rs +++ b/src/librustuv/signal.rs @@ -51,7 +51,7 @@ impl SignalWatcher { extern fn signal_cb(handle: *uvll::uv_signal_t, signum: c_int) { let s: &mut SignalWatcher = unsafe { UvHandle::from_uv_handle(&handle) }; assert_eq!(signum as int, s.signal as int); - s.channel.try_send(s.signal); + let _ = s.channel.send_opt(s.signal); } impl HomingIO for SignalWatcher { diff --git a/src/librustuv/timer.rs b/src/librustuv/timer.rs index 3d323382ad5..58008002837 100644 --- a/src/librustuv/timer.rs +++ b/src/librustuv/timer.rs @@ -140,9 +140,9 @@ extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) { let task = timer.blocker.take_unwrap(); let _ = task.wake().map(|t| t.reawaken()); } - SendOnce(chan) => { let _ = chan.try_send(()); } + SendOnce(chan) => { let _ = chan.send_opt(()); } SendMany(chan, id) => { - let _ = chan.try_send(()); + let _ = chan.send_opt(()); // Note that the above operation could have performed some form of // scheduling. This means that the timer may have decided to insert @@ -196,8 +196,8 @@ mod test { let oport = timer.oneshot(1); let pport = timer.period(1); timer.sleep(1); - assert_eq!(oport.recv_opt(), None); - assert_eq!(pport.recv_opt(), None); + assert_eq!(oport.recv_opt(), Err(())); + assert_eq!(pport.recv_opt(), Err(())); timer.oneshot(1).recv(); } @@ -284,7 +284,7 @@ mod test { let mut timer = TimerWatcher::new(local_loop()); timer.oneshot(1000) }; - assert_eq!(port.recv_opt(), None); + assert_eq!(port.recv_opt(), Err(())); } #[test] @@ -293,7 +293,7 @@ mod test { let mut timer = TimerWatcher::new(local_loop()); timer.period(1000) }; - assert_eq!(port.recv_opt(), None); + assert_eq!(port.recv_opt(), Err(())); } #[test] diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index f210bfc88bc..58781c01d66 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -322,25 +322,19 @@ pub struct SyncSender { /// This enumeration is the list of the possible reasons that try_recv could not /// return data when called. #[deriving(Eq, Clone, Show)] -pub enum TryRecvResult { +pub enum TryRecvError { /// This channel is currently empty, but the sender(s) have not yet /// disconnected, so data may yet become available. Empty, /// This channel's sending half has become disconnected, and there will /// never be any more data received on this channel Disconnected, - /// The channel had some data and we successfully popped it - Data(T), } -/// This enumeration is the list of the possible outcomes for the +/// This enumeration is the list of the possible error outcomes for the /// `SyncSender::try_send` method. #[deriving(Eq, Clone, Show)] -pub enum TrySendResult { - /// The data was successfully sent along the channel. This either means that - /// it was buffered in the channel, or handed off to a receiver. In either - /// case, the callee no longer has ownership of the data. - Sent, +pub enum TrySendError { /// The data could not be sent on the channel because it would require that /// the callee block to send the data. /// @@ -365,7 +359,7 @@ enum Flavor { /// of `Receiver` and `Sender` to see what's possible with them. pub fn channel() -> (Sender, Receiver) { let (a, b) = UnsafeArc::new2(oneshot::Packet::new()); - (Sender::my_new(Oneshot(b)), Receiver::my_new(Oneshot(a))) + (Sender::new(Oneshot(b)), Receiver::new(Oneshot(a))) } /// Creates a new synchronous, bounded channel. @@ -401,7 +395,7 @@ pub fn channel() -> (Sender, Receiver) { /// ``` pub fn sync_channel(bound: uint) -> (SyncSender, Receiver) { let (a, b) = UnsafeArc::new2(sync::Packet::new(bound)); - (SyncSender::new(a), Receiver::my_new(Sync(b))) + (SyncSender::new(a), Receiver::new(Sync(b))) } //////////////////////////////////////////////////////////////////////////////// @@ -409,7 +403,7 @@ pub fn sync_channel(bound: uint) -> (SyncSender, Receiver) { //////////////////////////////////////////////////////////////////////////////// impl Sender { - fn my_new(inner: Flavor) -> Sender { + fn new(inner: Flavor) -> Sender { Sender { inner: inner, sends: Cell::new(0), marker: marker::NoShare } } @@ -433,25 +427,42 @@ impl Sender { /// The purpose of this functionality is to propagate failure among tasks. /// If failure is not desired, then consider using the `try_send` method pub fn send(&self, t: T) { - if !self.try_send(t) { + if self.send_opt(t).is_err() { fail!("sending on a closed channel"); } } - /// Attempts to send a value on this channel, returning whether it was - /// successfully sent. + /// Attempts to send a value on this channel, returning it back if it could + /// not be sent. /// /// A successful send occurs when it is determined that the other end of /// the channel has not hung up already. An unsuccessful send would be one /// where the corresponding receiver has already been deallocated. Note - /// that a return value of `false` means that the data will never be - /// received, but a return value of `true` does *not* mean that the data + /// that a return value of `Err` means that the data will never be + /// received, but a return value of `Ok` does *not* mean that the data /// will be received. It is possible for the corresponding receiver to - /// hang up immediately after this function returns `true`. + /// hang up immediately after this function returns `Ok`. /// - /// Like `send`, this method will never block. If the failure of send cannot - /// be tolerated, then this method should be used instead. - pub fn try_send(&self, t: T) -> bool { + /// Like `send`, this method will never block. + /// + /// # Failure + /// + /// This method will never fail, it will return the message back to the + /// caller if the other end is disconnected + /// + /// # Example + /// + /// ``` + /// let (tx, rx) = channel(); + /// + /// // This send is always successful + /// assert_eq!(tx.send_opt(1), Ok(())); + /// + /// // This send will fail because the receiver is gone + /// drop(rx); + /// assert_eq!(tx.send_opt(1), Err(1)); + /// ``` + pub fn send_opt(&self, t: T) -> Result<(), T> { // In order to prevent starvation of other tasks in situations where // a task sends repeatedly without ever receiving, we occassionally // yield instead of doing a send immediately. @@ -475,16 +486,19 @@ impl Sender { return (*p).send(t); } else { let (a, b) = UnsafeArc::new2(stream::Packet::new()); - match (*p).upgrade(Receiver::my_new(Stream(b))) { + match (*p).upgrade(Receiver::new(Stream(b))) { oneshot::UpSuccess => { - (*a.get()).send(t); - (a, true) + let ret = (*a.get()).send(t); + (a, ret) } - oneshot::UpDisconnected => (a, false), + oneshot::UpDisconnected => (a, Err(t)), oneshot::UpWoke(task) => { - (*a.get()).send(t); + // This send cannot fail because the task is + // asleep (we're looking at it), so the receiver + // can't go away. + (*a.get()).send(t).unwrap(); task.wake().map(|t| t.reawaken()); - (a, true) + (a, Ok(())) } } } @@ -496,7 +510,7 @@ impl Sender { }; unsafe { - let mut tmp = Sender::my_new(Stream(new_inner)); + let mut tmp = Sender::new(Stream(new_inner)); mem::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner); } return ret; @@ -508,21 +522,21 @@ impl Clone for Sender { let (packet, sleeper) = match self.inner { Oneshot(ref p) => { let (a, b) = UnsafeArc::new2(shared::Packet::new()); - match unsafe { (*p.get()).upgrade(Receiver::my_new(Shared(a))) } { + match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } { oneshot::UpSuccess | oneshot::UpDisconnected => (b, None), oneshot::UpWoke(task) => (b, Some(task)) } } Stream(ref p) => { let (a, b) = UnsafeArc::new2(shared::Packet::new()); - match unsafe { (*p.get()).upgrade(Receiver::my_new(Shared(a))) } { + match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } { stream::UpSuccess | stream::UpDisconnected => (b, None), stream::UpWoke(task) => (b, Some(task)), } } Shared(ref p) => { unsafe { (*p.get()).clone_chan(); } - return Sender::my_new(Shared(p.clone())); + return Sender::new(Shared(p.clone())); } Sync(..) => unreachable!(), }; @@ -530,10 +544,10 @@ impl Clone for Sender { unsafe { (*packet.get()).inherit_blocker(sleeper); - let mut tmp = Sender::my_new(Shared(packet.clone())); + let mut tmp = Sender::new(Shared(packet.clone())); mem::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner); } - Sender::my_new(Shared(packet)) + Sender::new(Shared(packet)) } } @@ -579,7 +593,7 @@ impl SyncSender { /// `SyncSender::send_opt` method which will not fail if the receiver /// disconnects. pub fn send(&self, t: T) { - if self.send_opt(t).is_some() { + if self.send_opt(t).is_err() { fail!("sending on a closed channel"); } } @@ -595,11 +609,8 @@ impl SyncSender { /// # Failure /// /// This function cannot fail. - pub fn send_opt(&self, t: T) -> Option { - match unsafe { (*self.inner.get()).send(t) } { - Ok(()) => None, - Err(t) => Some(t), - } + pub fn send_opt(&self, t: T) -> Result<(), T> { + unsafe { (*self.inner.get()).send(t) } } /// Attempts to send a value on this channel without blocking. @@ -615,7 +626,7 @@ impl SyncSender { /// # Failure /// /// This function cannot fail - pub fn try_send(&self, t: T) -> TrySendResult { + pub fn try_send(&self, t: T) -> Result<(), TrySendError> { unsafe { (*self.inner.get()).try_send(t) } } } @@ -639,7 +650,7 @@ impl Drop for SyncSender { //////////////////////////////////////////////////////////////////////////////// impl Receiver { - fn my_new(inner: Flavor) -> Receiver { + fn new(inner: Flavor) -> Receiver { Receiver { inner: inner, receives: Cell::new(0), marker: marker::NoShare } } @@ -664,8 +675,8 @@ impl Receiver { /// peek at a value on this receiver. pub fn recv(&self) -> T { match self.recv_opt() { - Some(t) => t, - None => fail!("receiving on a closed channel"), + Ok(t) => t, + Err(()) => fail!("receiving on a closed channel"), } } @@ -679,7 +690,7 @@ impl Receiver { /// block on a receiver. /// /// This function cannot fail. - pub fn try_recv(&self) -> TryRecvResult { + pub fn try_recv(&self) -> Result { // If a thread is spinning in try_recv, we should take the opportunity // to reschedule things occasionally. See notes above in scheduling on // sends for why this doesn't always hit TLS, and also for why this uses @@ -695,32 +706,32 @@ impl Receiver { let mut new_port = match self.inner { Oneshot(ref p) => { match unsafe { (*p.get()).try_recv() } { - Ok(t) => return Data(t), - Err(oneshot::Empty) => return Empty, - Err(oneshot::Disconnected) => return Disconnected, + Ok(t) => return Ok(t), + Err(oneshot::Empty) => return Err(Empty), + Err(oneshot::Disconnected) => return Err(Disconnected), Err(oneshot::Upgraded(rx)) => rx, } } Stream(ref p) => { match unsafe { (*p.get()).try_recv() } { - Ok(t) => return Data(t), - Err(stream::Empty) => return Empty, - Err(stream::Disconnected) => return Disconnected, + Ok(t) => return Ok(t), + Err(stream::Empty) => return Err(Empty), + Err(stream::Disconnected) => return Err(Disconnected), Err(stream::Upgraded(rx)) => rx, } } Shared(ref p) => { match unsafe { (*p.get()).try_recv() } { - Ok(t) => return Data(t), - Err(shared::Empty) => return Empty, - Err(shared::Disconnected) => return Disconnected, + Ok(t) => return Ok(t), + Err(shared::Empty) => return Err(Empty), + Err(shared::Disconnected) => return Err(Disconnected), } } Sync(ref p) => { match unsafe { (*p.get()).try_recv() } { - Ok(t) => return Data(t), - Err(sync::Empty) => return Empty, - Err(sync::Disconnected) => return Disconnected, + Ok(t) => return Ok(t), + Err(sync::Empty) => return Err(Empty), + Err(sync::Disconnected) => return Err(Disconnected), } } }; @@ -741,32 +752,32 @@ impl Receiver { /// In other words, this function has the same semantics as the `recv` /// method except for the failure aspect. /// - /// If the channel has hung up, then `None` is returned. Otherwise `Some` of + /// If the channel has hung up, then `Err` is returned. Otherwise `Ok` of /// the value found on the receiver is returned. - pub fn recv_opt(&self) -> Option { + pub fn recv_opt(&self) -> Result { loop { let mut new_port = match self.inner { Oneshot(ref p) => { match unsafe { (*p.get()).recv() } { - Ok(t) => return Some(t), + Ok(t) => return Ok(t), Err(oneshot::Empty) => return unreachable!(), - Err(oneshot::Disconnected) => return None, + Err(oneshot::Disconnected) => return Err(()), Err(oneshot::Upgraded(rx)) => rx, } } Stream(ref p) => { match unsafe { (*p.get()).recv() } { - Ok(t) => return Some(t), + Ok(t) => return Ok(t), Err(stream::Empty) => return unreachable!(), - Err(stream::Disconnected) => return None, + Err(stream::Disconnected) => return Err(()), Err(stream::Upgraded(rx)) => rx, } } Shared(ref p) => { match unsafe { (*p.get()).recv() } { - Ok(t) => return Some(t), + Ok(t) => return Ok(t), Err(shared::Empty) => return unreachable!(), - Err(shared::Disconnected) => return None, + Err(shared::Disconnected) => return Err(()), } } Sync(ref p) => return unsafe { (*p.get()).recv() } @@ -873,7 +884,7 @@ impl select::Packet for Receiver { } impl<'a, T: Send> Iterator for Messages<'a, T> { - fn next(&mut self) -> Option { self.rx.recv_opt() } + fn next(&mut self) -> Option { self.rx.recv_opt().ok() } } #[unsafe_destructor] @@ -1022,7 +1033,7 @@ mod test { assert_eq!(rx.recv(), 1); } match rx.try_recv() { - Data(..) => fail!(), + Ok(..) => fail!(), _ => {} } dtx.send(()); @@ -1136,45 +1147,45 @@ mod test { test!(fn oneshot_single_thread_try_send_open() { let (tx, rx) = channel::(); - assert!(tx.try_send(10)); + assert!(tx.send_opt(10).is_ok()); assert!(rx.recv() == 10); }) test!(fn oneshot_single_thread_try_send_closed() { let (tx, rx) = channel::(); drop(rx); - assert!(!tx.try_send(10)); + assert!(tx.send_opt(10).is_err()); }) test!(fn oneshot_single_thread_try_recv_open() { let (tx, rx) = channel::(); tx.send(10); - assert!(rx.recv_opt() == Some(10)); + assert!(rx.recv_opt() == Ok(10)); }) test!(fn oneshot_single_thread_try_recv_closed() { let (tx, rx) = channel::(); drop(tx); - assert!(rx.recv_opt() == None); + assert!(rx.recv_opt() == Err(())); }) test!(fn oneshot_single_thread_peek_data() { let (tx, rx) = channel::(); - assert_eq!(rx.try_recv(), Empty) + assert_eq!(rx.try_recv(), Err(Empty)) tx.send(10); - assert_eq!(rx.try_recv(), Data(10)); + assert_eq!(rx.try_recv(), Ok(10)); }) test!(fn oneshot_single_thread_peek_close() { let (tx, rx) = channel::(); drop(tx); - assert_eq!(rx.try_recv(), Disconnected); - assert_eq!(rx.try_recv(), Disconnected); + assert_eq!(rx.try_recv(), Err(Disconnected)); + assert_eq!(rx.try_recv(), Err(Disconnected)); }) test!(fn oneshot_single_thread_peek_open() { let (_tx, rx) = channel::(); - assert_eq!(rx.try_recv(), Empty); + assert_eq!(rx.try_recv(), Err(Empty)); }) test!(fn oneshot_multi_task_recv_then_send() { @@ -1335,7 +1346,7 @@ mod test { tx.send(2); tx.send(2); tx.send(2); - tx.try_send(2); + let _ = tx.send_opt(2); drop(tx); assert_eq!(count_rx.recv(), 4); }) @@ -1353,14 +1364,14 @@ mod test { tx3.send(()); }); - assert_eq!(rx1.try_recv(), Empty); + assert_eq!(rx1.try_recv(), Err(Empty)); tx2.send(()); rx3.recv(); - assert_eq!(rx1.try_recv(), Data(1)); - assert_eq!(rx1.try_recv(), Empty); + assert_eq!(rx1.try_recv(), Ok(1)); + assert_eq!(rx1.try_recv(), Err(Empty)); tx2.send(()); rx3.recv(); - assert_eq!(rx1.try_recv(), Disconnected); + assert_eq!(rx1.try_recv(), Err(Disconnected)); }) // This bug used to end up in a livelock inside of the Receiver destructor @@ -1409,9 +1420,9 @@ mod test { let mut hits = 0; while hits < 10 { match rx.try_recv() { - Data(()) => { hits += 1; } - Empty => { Thread::yield_now(); } - Disconnected => return, + Ok(()) => { hits += 1; } + Err(Empty) => { Thread::yield_now(); } + Err(Disconnected) => return, } } cdone.send(()); @@ -1542,7 +1553,7 @@ mod sync_tests { assert_eq!(rx.recv(), 1); } match rx.try_recv() { - Data(..) => fail!(), + Ok(..) => fail!(), _ => {} } dtx.send(()); @@ -1596,50 +1607,50 @@ mod sync_tests { test!(fn oneshot_single_thread_try_send_open() { let (tx, rx) = sync_channel::(1); - assert_eq!(tx.try_send(10), Sent); + assert_eq!(tx.try_send(10), Ok(())); assert!(rx.recv() == 10); }) test!(fn oneshot_single_thread_try_send_closed() { let (tx, rx) = sync_channel::(0); drop(rx); - assert_eq!(tx.try_send(10), RecvDisconnected(10)); + assert_eq!(tx.try_send(10), Err(RecvDisconnected(10))); }) test!(fn oneshot_single_thread_try_send_closed2() { let (tx, _rx) = sync_channel::(0); - assert_eq!(tx.try_send(10), Full(10)); + assert_eq!(tx.try_send(10), Err(Full(10))); }) test!(fn oneshot_single_thread_try_recv_open() { let (tx, rx) = sync_channel::(1); tx.send(10); - assert!(rx.recv_opt() == Some(10)); + assert!(rx.recv_opt() == Ok(10)); }) test!(fn oneshot_single_thread_try_recv_closed() { let (tx, rx) = sync_channel::(0); drop(tx); - assert!(rx.recv_opt() == None); + assert!(rx.recv_opt() == Err(())); }) test!(fn oneshot_single_thread_peek_data() { let (tx, rx) = sync_channel::(1); - assert_eq!(rx.try_recv(), Empty) + assert_eq!(rx.try_recv(), Err(Empty)) tx.send(10); - assert_eq!(rx.try_recv(), Data(10)); + assert_eq!(rx.try_recv(), Ok(10)); }) test!(fn oneshot_single_thread_peek_close() { let (tx, rx) = sync_channel::(0); drop(tx); - assert_eq!(rx.try_recv(), Disconnected); - assert_eq!(rx.try_recv(), Disconnected); + assert_eq!(rx.try_recv(), Err(Disconnected)); + assert_eq!(rx.try_recv(), Err(Disconnected)); }) test!(fn oneshot_single_thread_peek_open() { let (_tx, rx) = sync_channel::(0); - assert_eq!(rx.try_recv(), Empty); + assert_eq!(rx.try_recv(), Err(Empty)); }) test!(fn oneshot_multi_task_recv_then_send() { @@ -1800,7 +1811,7 @@ mod sync_tests { tx.send(2); tx.send(2); tx.send(2); - tx.try_send(2); + let _ = tx.try_send(2); drop(tx); assert_eq!(count_rx.recv(), 4); }) @@ -1818,14 +1829,14 @@ mod sync_tests { tx3.send(()); }); - assert_eq!(rx1.try_recv(), Empty); + assert_eq!(rx1.try_recv(), Err(Empty)); tx2.send(()); rx3.recv(); - assert_eq!(rx1.try_recv(), Data(1)); - assert_eq!(rx1.try_recv(), Empty); + assert_eq!(rx1.try_recv(), Ok(1)); + assert_eq!(rx1.try_recv(), Err(Empty)); tx2.send(()); rx3.recv(); - assert_eq!(rx1.try_recv(), Disconnected); + assert_eq!(rx1.try_recv(), Err(Disconnected)); }) // This bug used to end up in a livelock inside of the Receiver destructor @@ -1859,9 +1870,9 @@ mod sync_tests { let mut hits = 0; while hits < 10 { match rx.try_recv() { - Data(()) => { hits += 1; } - Empty => { Thread::yield_now(); } - Disconnected => return, + Ok(()) => { hits += 1; } + Err(Empty) => { Thread::yield_now(); } + Err(Disconnected) => return, } } cdone.send(()); @@ -1876,20 +1887,20 @@ mod sync_tests { test!(fn send_opt1() { let (tx, rx) = sync_channel(0); spawn(proc() { rx.recv(); }); - assert_eq!(tx.send_opt(1), None); + assert_eq!(tx.send_opt(1), Ok(())); }) test!(fn send_opt2() { let (tx, rx) = sync_channel(0); spawn(proc() { drop(rx); }); - assert_eq!(tx.send_opt(1), Some(1)); + assert_eq!(tx.send_opt(1), Err(1)); }) test!(fn send_opt3() { let (tx, rx) = sync_channel(1); - assert_eq!(tx.send_opt(1), None); + assert_eq!(tx.send_opt(1), Ok(())); spawn(proc() { drop(rx); }); - assert_eq!(tx.send_opt(1), Some(1)); + assert_eq!(tx.send_opt(1), Err(1)); }) test!(fn send_opt4() { @@ -1898,11 +1909,11 @@ mod sync_tests { let (done, donerx) = channel(); let done2 = done.clone(); spawn(proc() { - assert_eq!(tx.send_opt(1), Some(1)); + assert_eq!(tx.send_opt(1), Err(1)); done.send(()); }); spawn(proc() { - assert_eq!(tx2.send_opt(2), Some(2)); + assert_eq!(tx2.send_opt(2), Err(2)); done2.send(()); }); drop(rx); @@ -1912,27 +1923,27 @@ mod sync_tests { test!(fn try_send1() { let (tx, _rx) = sync_channel(0); - assert_eq!(tx.try_send(1), Full(1)); + assert_eq!(tx.try_send(1), Err(Full(1))); }) test!(fn try_send2() { let (tx, _rx) = sync_channel(1); - assert_eq!(tx.try_send(1), Sent); - assert_eq!(tx.try_send(1), Full(1)); + assert_eq!(tx.try_send(1), Ok(())); + assert_eq!(tx.try_send(1), Err(Full(1))); }) test!(fn try_send3() { let (tx, rx) = sync_channel(1); - assert_eq!(tx.try_send(1), Sent); + assert_eq!(tx.try_send(1), Ok(())); drop(rx); - assert_eq!(tx.try_send(1), RecvDisconnected(1)); + assert_eq!(tx.try_send(1), Err(RecvDisconnected(1))); }) test!(fn try_send4() { let (tx, rx) = sync_channel(0); spawn(proc() { for _ in range(0, 1000) { task::deschedule(); } - assert_eq!(tx.try_send(1), Sent); + assert_eq!(tx.try_send(1), Ok(())); }); assert_eq!(rx.recv(), 1); } #[ignore(reason = "flaky on libnative")]) diff --git a/src/libstd/comm/oneshot.rs b/src/libstd/comm/oneshot.rs index 1bc7349a70d..e92b5cb272a 100644 --- a/src/libstd/comm/oneshot.rs +++ b/src/libstd/comm/oneshot.rs @@ -90,7 +90,7 @@ impl Packet { } } - pub fn send(&mut self, t: T) -> bool { + pub fn send(&mut self, t: T) -> Result<(), T> { // Sanity check match self.upgrade { NothingSent => {} @@ -102,14 +102,12 @@ impl Packet { match self.state.swap(DATA, atomics::SeqCst) { // Sent the data, no one was waiting - EMPTY => true, + EMPTY => Ok(()), - // Couldn't send the data, the port hung up first. We need to be - // sure to deallocate the sent data (to not leave it stuck in the - // queue) + // Couldn't send the data, the port hung up first. Return the data + // back up the stack. DISCONNECTED => { - self.data.take_unwrap(); - false + Err(self.data.take_unwrap()) } // Not possible, these are one-use channels @@ -121,7 +119,7 @@ impl Packet { n => unsafe { let t = BlockedTask::cast_from_uint(n); t.wake().map(|t| t.reawaken()); - true + Ok(()) } } } diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs index 84191ed6b28..c286fd84849 100644 --- a/src/libstd/comm/select.rs +++ b/src/libstd/comm/select.rs @@ -236,7 +236,7 @@ impl<'rx, T: Send> Handle<'rx, T> { /// Block to receive a value on the underlying receiver, returning `Some` on /// success or `None` if the channel disconnects. This function has the same /// semantics as `Receiver.recv_opt` - pub fn recv_opt(&mut self) -> Option { self.rx.recv_opt() } + pub fn recv_opt(&mut self) -> Result { self.rx.recv_opt() } /// Adds this handle to the receiver set that the handle was created from. This /// method can be called multiple times, but it has no effect if `add` was @@ -338,12 +338,12 @@ mod test { ) drop(tx1); select! ( - foo = rx1.recv_opt() => { assert_eq!(foo, None); }, + foo = rx1.recv_opt() => { assert_eq!(foo, Err(())); }, _bar = rx2.recv() => { fail!() } ) drop(tx2); select! ( - bar = rx2.recv_opt() => { assert_eq!(bar, None); } + bar = rx2.recv_opt() => { assert_eq!(bar, Err(())); } ) }) @@ -370,7 +370,7 @@ mod test { select! ( _a1 = rx1.recv_opt() => { fail!() }, - a2 = rx2.recv_opt() => { assert_eq!(a2, None); } + a2 = rx2.recv_opt() => { assert_eq!(a2, Err(())); } ) }) @@ -392,7 +392,7 @@ mod test { ) tx3.send(1); select! ( - a = rx1.recv_opt() => { assert_eq!(a, None); }, + a = rx1.recv_opt() => { assert_eq!(a, Err(())); }, _b = rx2.recv() => { fail!() } ) }) @@ -417,8 +417,8 @@ mod test { a = rx1.recv() => { assert_eq!(a, 1); }, a = rx2.recv() => { assert_eq!(a, 2); } ) - assert_eq!(rx1.try_recv(), Empty); - assert_eq!(rx2.try_recv(), Empty); + assert_eq!(rx1.try_recv(), Err(Empty)); + assert_eq!(rx2.try_recv(), Err(Empty)); tx3.send(()); }) @@ -456,7 +456,7 @@ mod test { spawn(proc() { rx3.recv(); tx1.clone(); - assert_eq!(rx3.try_recv(), Empty); + assert_eq!(rx3.try_recv(), Err(Empty)); tx1.send(2); rx3.recv(); }); @@ -477,7 +477,7 @@ mod test { spawn(proc() { rx3.recv(); tx1.clone(); - assert_eq!(rx3.try_recv(), Empty); + assert_eq!(rx3.try_recv(), Err(Empty)); tx1.send(2); rx3.recv(); }); diff --git a/src/libstd/comm/shared.rs b/src/libstd/comm/shared.rs index e8ba9d6e628..525786f5d1e 100644 --- a/src/libstd/comm/shared.rs +++ b/src/libstd/comm/shared.rs @@ -131,9 +131,9 @@ impl Packet { unsafe { self.select_lock.unlock_noguard() } } - pub fn send(&mut self, t: T) -> bool { + pub fn send(&mut self, t: T) -> Result<(), T> { // See Port::drop for what's going on - if self.port_dropped.load(atomics::SeqCst) { return false } + if self.port_dropped.load(atomics::SeqCst) { return Err(t) } // Note that the multiple sender case is a little tricker // semantically than the single sender case. The logic for @@ -161,7 +161,7 @@ impl Packet { // received". Once we get beyond this check, we have permanently // entered the realm of "this may be received" if self.cnt.load(atomics::SeqCst) < DISCONNECTED + FUDGE { - return false + return Err(t) } self.queue.push(t); @@ -213,7 +213,7 @@ impl Packet { _ => {} } - true + Ok(()) } pub fn recv(&mut self) -> Result { diff --git a/src/libstd/comm/stream.rs b/src/libstd/comm/stream.rs index 5820b13a35f..6c9280e0abc 100644 --- a/src/libstd/comm/stream.rs +++ b/src/libstd/comm/stream.rs @@ -87,25 +87,27 @@ impl Packet { } - pub fn send(&mut self, t: T) -> bool { + pub fn send(&mut self, t: T) -> Result<(), T> { + // If the other port has deterministically gone away, then definitely + // must return the data back up the stack. Otherwise, the data is + // considered as being sent. + if self.port_dropped.load(atomics::SeqCst) { return Err(t) } + match self.do_send(Data(t)) { - UpSuccess => true, - UpDisconnected => false, - UpWoke(task) => { - task.wake().map(|t| t.reawaken()); - true - } + UpSuccess | UpDisconnected => {}, + UpWoke(task) => { task.wake().map(|t| t.reawaken()); } } + Ok(()) } pub fn upgrade(&mut self, up: Receiver) -> UpgradeResult { + // If the port has gone away, then there's no need to proceed any + // further. + if self.port_dropped.load(atomics::SeqCst) { return UpDisconnected } + self.do_send(GoUp(up)) } fn do_send(&mut self, t: Message) -> UpgradeResult { - // Use an acquire/release ordering to maintain the same position with - // respect to the atomic loads below - if self.port_dropped.load(atomics::SeqCst) { return UpDisconnected } - self.queue.push(t); match self.cnt.fetch_add(1, atomics::SeqCst) { // As described in the mod's doc comment, -1 == wakeup diff --git a/src/libstd/comm/sync.rs b/src/libstd/comm/sync.rs index b3591dad274..6228c4c682b 100644 --- a/src/libstd/comm/sync.rs +++ b/src/libstd/comm/sync.rs @@ -201,22 +201,22 @@ impl Packet { } } - pub fn try_send(&self, t: T) -> super::TrySendResult { + pub fn try_send(&self, t: T) -> Result<(), super::TrySendError> { let (guard, state) = self.lock(); if state.disconnected { - super::RecvDisconnected(t) + Err(super::RecvDisconnected(t)) } else if state.buf.size() == state.buf.cap() { - super::Full(t) + Err(super::Full(t)) } else if state.cap == 0 { // With capacity 0, even though we have buffer space we can't // transfer the data unless there's a receiver waiting. match mem::replace(&mut state.blocker, NoneBlocked) { - NoneBlocked => super::Full(t), + NoneBlocked => Err(super::Full(t)), BlockedSender(..) => unreachable!(), BlockedReceiver(task) => { state.buf.enqueue(t); wakeup(task, guard); - super::Sent + Ok(()) } } } else { @@ -224,7 +224,7 @@ impl Packet { // just enqueue the data for later retrieval. assert!(state.buf.size() < state.buf.cap()); state.buf.enqueue(t); - super::Sent + Ok(()) } } @@ -232,7 +232,7 @@ impl Packet { // // When reading this, remember that there can only ever be one receiver at // time. - pub fn recv(&self) -> Option { + pub fn recv(&self) -> Result { let (guard, state) = self.lock(); // Wait for the buffer to have something in it. No need for a while loop @@ -242,13 +242,13 @@ impl Packet { wait(&mut state.blocker, BlockedReceiver, &self.lock); waited = true; } - if state.disconnected && state.buf.size() == 0 { return None } + if state.disconnected && state.buf.size() == 0 { return Err(()) } // Pick up the data, wake up our neighbors, and carry on assert!(state.buf.size() > 0); let ret = state.buf.dequeue(); self.wakeup_senders(waited, guard, state); - return Some(ret); + return Ok(ret); } pub fn try_recv(&self) -> Result { diff --git a/src/libstd/io/comm_adapters.rs b/src/libstd/io/comm_adapters.rs index 06e02072135..aa7371944da 100644 --- a/src/libstd/io/comm_adapters.rs +++ b/src/libstd/io/comm_adapters.rs @@ -73,7 +73,7 @@ impl Reader for ChanReader { break; } self.pos = 0; - self.buf = self.rx.recv_opt(); + self.buf = self.rx.recv_opt().ok(); self.closed = self.buf.is_none(); } if self.closed && num_read == 0 { @@ -116,15 +116,13 @@ impl Clone for ChanWriter { impl Writer for ChanWriter { fn write(&mut self, buf: &[u8]) -> IoResult<()> { - if !self.tx.try_send(buf.to_owned()) { - Err(io::IoError { + self.tx.send_opt(buf.to_owned()).map_err(|_| { + io::IoError { kind: io::BrokenPipe, desc: "Pipe closed", detail: None - }) - } else { - Ok(()) - } + } + }) } } diff --git a/src/libstd/io/net/udp.rs b/src/libstd/io/net/udp.rs index 8dd59e859b8..cd2c81d284a 100644 --- a/src/libstd/io/net/udp.rs +++ b/src/libstd/io/net/udp.rs @@ -422,13 +422,13 @@ mod test { spawn(proc() { let mut sock3 = sock3; match sock3.sendto([1], addr2) { - Ok(..) => { let _ = tx2.try_send(()); } + Ok(..) => { let _ = tx2.send_opt(()); } Err(..) => {} } done.send(()); }); match sock1.sendto([2], addr2) { - Ok(..) => { let _ = tx.try_send(()); } + Ok(..) => { let _ = tx.send_opt(()); } Err(..) => {} } drop(tx); diff --git a/src/libstd/io/signal.rs b/src/libstd/io/signal.rs index 00b2e4f2307..d3f3d888b87 100644 --- a/src/libstd/io/signal.rs +++ b/src/libstd/io/signal.rs @@ -149,6 +149,7 @@ impl Listener { #[cfg(test, unix)] mod test_unix { + use prelude::*; use libc; use comm::Empty; use io::timer; @@ -199,7 +200,7 @@ mod test_unix { s2.unregister(Interrupt); sigint(); timer::sleep(10); - assert_eq!(s2.rx.try_recv(), Empty); + assert_eq!(s2.rx.try_recv(), Err(Empty)); } } diff --git a/src/libstd/io/timer.rs b/src/libstd/io/timer.rs index 839fcab8f86..1ca36df968c 100644 --- a/src/libstd/io/timer.rs +++ b/src/libstd/io/timer.rs @@ -137,7 +137,7 @@ mod test { let rx1 = timer.oneshot(10000); let rx = timer.oneshot(1); rx.recv(); - assert_eq!(rx1.recv_opt(), None); + assert_eq!(rx1.recv_opt(), Err(())); }) iotest!(fn test_io_timer_oneshot_then_sleep() { @@ -145,7 +145,7 @@ mod test { let rx = timer.oneshot(100000000000); timer.sleep(1); // this should inalidate rx - assert_eq!(rx.recv_opt(), None); + assert_eq!(rx.recv_opt(), Err(())); }) iotest!(fn test_io_timer_sleep_periodic() { @@ -170,11 +170,11 @@ mod test { let rx = timer.oneshot(1); rx.recv(); - assert!(rx.recv_opt().is_none()); + assert!(rx.recv_opt().is_err()); let rx = timer.oneshot(1); rx.recv(); - assert!(rx.recv_opt().is_none()); + assert!(rx.recv_opt().is_err()); }) iotest!(fn override() { @@ -182,8 +182,8 @@ mod test { let orx = timer.oneshot(100); let prx = timer.periodic(100); timer.sleep(1); - assert_eq!(orx.recv_opt(), None); - assert_eq!(prx.recv_opt(), None); + assert_eq!(orx.recv_opt(), Err(())); + assert_eq!(prx.recv_opt(), Err(())); timer.oneshot(1).recv(); }) @@ -226,7 +226,7 @@ mod test { let timer_rx = timer.periodic(1000); spawn(proc() { - timer_rx.recv_opt(); + let _ = timer_rx.recv_opt(); }); // when we drop the TimerWatcher we're going to destroy the channel, @@ -239,7 +239,7 @@ mod test { let timer_rx = timer.periodic(1000); spawn(proc() { - timer_rx.recv_opt(); + let _ = timer_rx.recv_opt(); }); timer.oneshot(1); @@ -251,7 +251,7 @@ mod test { let timer_rx = timer.periodic(1000); spawn(proc() { - timer_rx.recv_opt(); + let _ = timer_rx.recv_opt(); }); timer.sleep(1); @@ -262,7 +262,7 @@ mod test { let mut timer = Timer::new().unwrap(); timer.oneshot(1000) }; - assert_eq!(rx.recv_opt(), None); + assert_eq!(rx.recv_opt(), Err(())); }) iotest!(fn sender_goes_away_period() { @@ -270,7 +270,7 @@ mod test { let mut timer = Timer::new().unwrap(); timer.periodic(1000) }; - assert_eq!(rx.recv_opt(), None); + assert_eq!(rx.recv_opt(), Err(())); }) iotest!(fn receiver_goes_away_oneshot() { diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index bae20d3bb9b..a112ed77f09 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -385,7 +385,7 @@ impl Death { pub fn collect_failure(&mut self, result: TaskResult) { match self.on_exit.take() { Some(Execute(f)) => f(result), - Some(SendMessage(ch)) => { ch.try_send(result); } + Some(SendMessage(ch)) => { let _ = ch.send_opt(result); } None => {} } } diff --git a/src/libsync/comm.rs b/src/libsync/comm.rs index 9e01b16ee9b..13e075501d9 100644 --- a/src/libsync/comm.rs +++ b/src/libsync/comm.rs @@ -37,16 +37,16 @@ impl DuplexStream { pub fn send(&self, x: S) { self.tx.send(x) } - pub fn try_send(&self, x: S) -> bool { - self.tx.try_send(x) + pub fn send_opt(&self, x: S) -> Result<(), S> { + self.tx.send_opt(x) } pub fn recv(&self) -> R { self.rx.recv() } - pub fn try_recv(&self) -> comm::TryRecvResult { + pub fn try_recv(&self) -> Result { self.rx.try_recv() } - pub fn recv_opt(&self) -> Option { + pub fn recv_opt(&self) -> Result { self.rx.recv_opt() } } diff --git a/src/libsync/lock.rs b/src/libsync/lock.rs index b83bdf9df29..911cd1d2eb1 100644 --- a/src/libsync/lock.rs +++ b/src/libsync/lock.rs @@ -800,7 +800,7 @@ mod tests { // At this point, all spawned tasks should be blocked, // so we shouldn't get anything from the port assert!(match rx.try_recv() { - Empty => true, + Err(Empty) => true, _ => false, }); diff --git a/src/libsync/raw.rs b/src/libsync/raw.rs index 9bb7a81a2ff..eb90797395e 100644 --- a/src/libsync/raw.rs +++ b/src/libsync/raw.rs @@ -16,7 +16,6 @@ //! containing data. use std::cast; -use std::comm; use std::kinds::marker; use std::mem::replace; use std::sync::atomics; @@ -46,10 +45,10 @@ impl WaitQueue { // Signals one live task from the queue. fn signal(&self) -> bool { match self.head.try_recv() { - comm::Data(ch) => { + Ok(ch) => { // Send a wakeup signal. If the waiter was killed, its port will // have closed. Keep trying until we get a live task. - if ch.try_send(()) { + if ch.send_opt(()).is_ok() { true } else { self.signal() @@ -63,8 +62,8 @@ impl WaitQueue { let mut count = 0; loop { match self.head.try_recv() { - comm::Data(ch) => { - if ch.try_send(()) { + Ok(ch) => { + if ch.send_opt(()).is_ok() { count += 1; } } @@ -76,7 +75,7 @@ impl WaitQueue { fn wait_end(&self) -> WaitEnd { let (signal_end, wait_end) = channel(); - assert!(self.tail.try_send(signal_end)); + self.tail.send(signal_end); wait_end } } diff --git a/src/test/bench/msgsend-pipes-shared.rs b/src/test/bench/msgsend-pipes-shared.rs index 629b4cbfeea..d2e08cfccf8 100644 --- a/src/test/bench/msgsend-pipes-shared.rs +++ b/src/test/bench/msgsend-pipes-shared.rs @@ -38,12 +38,12 @@ fn server(requests: &Receiver, responses: &Sender) { let mut done = false; while !done { match requests.recv_opt() { - Some(get_count) => { responses.send(count.clone()); } - Some(bytes(b)) => { + Ok(get_count) => { responses.send(count.clone()); } + Ok(bytes(b)) => { //println!("server: received {:?} bytes", b); count += b; } - None => { done = true; } + Err(..) => { done = true; } _ => { } } } diff --git a/src/test/bench/msgsend-pipes.rs b/src/test/bench/msgsend-pipes.rs index 49d9c5d3a2e..dc9b3561bb1 100644 --- a/src/test/bench/msgsend-pipes.rs +++ b/src/test/bench/msgsend-pipes.rs @@ -33,12 +33,12 @@ fn server(requests: &Receiver, responses: &Sender) { let mut done = false; while !done { match requests.recv_opt() { - Some(get_count) => { responses.send(count.clone()); } - Some(bytes(b)) => { + Ok(get_count) => { responses.send(count.clone()); } + Ok(bytes(b)) => { //println!("server: received {:?} bytes", b); count += b; } - None => { done = true; } + Err(..) => { done = true; } _ => { } } } diff --git a/src/test/bench/task-perf-jargon-metal-smoke.rs b/src/test/bench/task-perf-jargon-metal-smoke.rs index f5711d91447..a45f8c61be5 100644 --- a/src/test/bench/task-perf-jargon-metal-smoke.rs +++ b/src/test/bench/task-perf-jargon-metal-smoke.rs @@ -50,7 +50,7 @@ fn main() { let (tx, rx) = channel(); child_generation(from_str::(*args.get(1)).unwrap(), tx); - if rx.recv_opt().is_none() { + if rx.recv_opt().is_err() { fail!("it happened when we slumbered"); } } diff --git a/src/test/run-pass/issue-9396.rs b/src/test/run-pass/issue-9396.rs index 2630057c988..9f08f1db410 100644 --- a/src/test/run-pass/issue-9396.rs +++ b/src/test/run-pass/issue-9396.rs @@ -20,9 +20,9 @@ pub fn main() { }); loop { match rx.try_recv() { - comm::Data(()) => break, - comm::Empty => {} - comm::Disconnected => unreachable!() + Ok(()) => break, + Err(comm::Empty) => {} + Err(comm::Disconnected) => unreachable!() } } }