From 31da6b76986f337483a971884113a043c835102b Mon Sep 17 00:00:00 2001 From: Brendan Zabarauskas Date: Wed, 27 Nov 2013 15:10:12 +1000 Subject: [PATCH] Add an iterator for receiving messages from GenericPorts --- src/libstd/comm.rs | 87 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 84 insertions(+), 3 deletions(-) diff --git a/src/libstd/comm.rs b/src/libstd/comm.rs index 038598a29b8..fcba4a6bbdb 100644 --- a/src/libstd/comm.rs +++ b/src/libstd/comm.rs @@ -15,6 +15,7 @@ Message passing #[allow(missing_doc)]; use clone::Clone; +use iter::Iterator; use kinds::Send; use option::Option; use rtcomm = rt::comm; @@ -43,10 +44,35 @@ pub trait GenericPort { /// Receives a message, or fails if the connection closes. fn recv(&self) -> T; - /** Receives a message, or returns `none` if - the connection is closed or closes. - */ + /// Receives a message, or returns `none` if + /// the connection is closed or closes. fn try_recv(&self) -> Option; + + /// Returns an iterator that breaks once the connection closes. + /// + /// # Example + /// + /// ~~~rust + /// do spawn { + /// for x in port.recv_iter() { + /// if pred(x) { break; } + /// println!("{}", x); + /// } + /// } + /// ~~~ + fn recv_iter<'a>(&'a self) -> RecvIterator<'a, Self> { + RecvIterator { port: self } + } +} + +pub struct RecvIterator<'a, P> { + priv port: &'a P, +} + +impl<'a, T, P: GenericPort> Iterator for RecvIterator<'a, P> { + fn next(&mut self) -> Option { + self.port.try_recv() + } } /// Ports that can `peek` @@ -227,3 +253,58 @@ impl Clone for SharedPort { SharedPort { x: p.clone() } } } + +#[cfg(test)] +mod tests { + use comm::*; + use prelude::*; + + #[test] + fn test_nested_recv_iter() { + let (port, chan) = stream::(); + let (total_port, total_chan) = oneshot::(); + + do spawn { + let mut acc = 0; + for x in port.recv_iter() { + acc += x; + for x in port.recv_iter() { + acc += x; + for x in port.try_recv().move_iter() { + acc += x; + total_chan.send(acc); + } + } + } + } + + chan.send(3); + chan.send(1); + chan.send(2); + assert_eq!(total_port.recv(), 6); + } + + #[test] + fn test_recv_iter_break() { + let (port, chan) = stream::(); + let (count_port, count_chan) = oneshot::(); + + do spawn { + let mut count = 0; + for x in port.recv_iter() { + if count >= 3 { + count_chan.send(count); + break; + } else { + count += x; + } + } + } + + chan.send(2); + chan.send(2); + chan.send(2); + chan.send(2); + assert_eq!(count_port.recv(), 4); + } +}