From 110168de2a7b529a7c4839ca1e19c4c42f68be12 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Fri, 11 Jul 2014 14:29:15 -0700 Subject: [PATCH] native: Implement clone/close_accept for unix This commits implements {Tcp,Unix}Acceptor::{clone,close_accept} methods for unix. A windows implementation is coming in a later commit. The clone implementation is based on atomic reference counting (as with all other clones), and the close_accept implementation is based on selecting on a self-pipe which signals that a close has been seen. --- src/libnative/io/net.rs | 95 ++++++++++++++---- src/libnative/io/pipe_unix.rs | 88 +++++++++++++---- src/libnative/io/pipe_windows.rs | 6 +- src/libnative/io/process.rs | 2 +- src/libnative/io/util.rs | 13 ++- src/librustrt/rtio.rs | 4 + src/libstd/io/net/tcp.rs | 132 +++++++++++++++++++++++++ src/libstd/io/net/unix.rs | 95 ++++++++++++++++++ src/test/run-pass/tcp-accept-stress.rs | 94 ++++++++++++++++++ 9 files changed, 482 insertions(+), 47 deletions(-) create mode 100644 src/test/run-pass/tcp-accept-stress.rs diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index 2255578ba80..7a8a363a0a3 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -14,10 +14,13 @@ use std::mem; use std::rt::mutex; use std::rt::rtio; use std::rt::rtio::{IoResult, IoError}; +use std::sync::atomics; use super::{retry, keep_going}; use super::c; use super::util; +use super::file::FileDesc; +use super::process; //////////////////////////////////////////////////////////////////////////////// // sockaddr and misc bindings @@ -479,9 +482,26 @@ impl TcpListener { pub fn fd(&self) -> sock_t { self.inner.fd } pub fn native_listen(self, backlog: int) -> IoResult { + try!(util::set_nonblocking(self.fd(), true)); match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } { -1 => Err(last_error()), - _ => Ok(TcpAcceptor { listener: self, deadline: 0 }) + + #[cfg(unix)] + _ => { + let (reader, writer) = try!(process::pipe()); + try!(util::set_nonblocking(reader.fd(), true)); + try!(util::set_nonblocking(writer.fd(), true)); + try!(util::set_nonblocking(self.fd(), true)); + Ok(TcpAcceptor { + inner: Arc::new(AcceptorInner { + listener: self, + reader: reader, + writer: writer, + closed: atomics::AtomicBool::new(false), + }), + deadline: 0, + }) + } } } } @@ -502,31 +522,46 @@ impl rtio::RtioSocket for TcpListener { } pub struct TcpAcceptor { - listener: TcpListener, + inner: Arc, deadline: u64, } -impl TcpAcceptor { - pub fn fd(&self) -> sock_t { self.listener.fd() } +#[cfg(unix)] +struct AcceptorInner { + listener: TcpListener, + reader: FileDesc, + writer: FileDesc, + closed: atomics::AtomicBool, +} +impl TcpAcceptor { + pub fn fd(&self) -> sock_t { self.inner.listener.fd() } + + #[cfg(unix)] pub fn native_accept(&mut self) -> IoResult { - if self.deadline != 0 { - try!(util::await(self.fd(), Some(self.deadline), util::Readable)); - } - unsafe { - let mut storage: libc::sockaddr_storage = mem::zeroed(); - let storagep = &mut storage as *mut libc::sockaddr_storage; - let size = mem::size_of::(); - let mut size = size as libc::socklen_t; - match retry(|| { - libc::accept(self.fd(), - storagep as *mut libc::sockaddr, - &mut size as *mut libc::socklen_t) as libc::c_int - }) as sock_t { - -1 => Err(last_error()), - fd => Ok(TcpStream::new(Inner::new(fd))), + let deadline = if self.deadline == 0 {None} else {Some(self.deadline)}; + + while !self.inner.closed.load(atomics::SeqCst) { + unsafe { + let mut storage: libc::sockaddr_storage = mem::zeroed(); + let storagep = &mut storage as *mut libc::sockaddr_storage; + let size = mem::size_of::(); + let mut size = size as libc::socklen_t; + match retry(|| { + libc::accept(self.fd(), + storagep as *mut libc::sockaddr, + &mut size as *mut libc::socklen_t) as libc::c_int + }) as sock_t { + -1 if util::wouldblock() => {} + -1 => return Err(last_error()), + fd => return Ok(TcpStream::new(Inner::new(fd))), + } } + try!(util::await([self.fd(), self.inner.reader.fd()], + deadline, util::Readable)); } + + Err(util::eof()) } } @@ -546,6 +581,24 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor { fn set_timeout(&mut self, timeout: Option) { self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); } + + fn clone(&self) -> Box { + box TcpAcceptor { + inner: self.inner.clone(), + deadline: 0, + } as Box + } + + #[cfg(unix)] + fn close_accept(&mut self) -> IoResult<()> { + self.inner.closed.store(true, atomics::SeqCst); + let mut fd = FileDesc::new(self.inner.writer.fd(), false); + match fd.inner_write([0]) { + Ok(..) => Ok(()), + Err(..) if util::wouldblock() => Ok(()), + Err(e) => Err(e), + } + } } //////////////////////////////////////////////////////////////////////////////// @@ -817,7 +870,7 @@ pub fn read(fd: sock_t, // With a timeout, first we wait for the socket to become // readable using select(), specifying the relevant timeout for // our previously set deadline. - try!(util::await(fd, deadline, util::Readable)); + try!(util::await([fd], deadline, util::Readable)); // At this point, we're still within the timeout, and we've // determined that the socket is readable (as returned by @@ -871,7 +924,7 @@ pub fn write(fd: sock_t, while written < buf.len() && (write_everything || written == 0) { // As with read(), first wait for the socket to be ready for // the I/O operation. - match util::await(fd, deadline, util::Writable) { + match util::await([fd], deadline, util::Writable) { Err(ref e) if e.code == libc::EOF as uint && written > 0 => { assert!(deadline.is_some()); return Err(util::short_write(written, "short write")) diff --git a/src/libnative/io/pipe_unix.rs b/src/libnative/io/pipe_unix.rs index 895b8b5929c..4ad8383e6f8 100644 --- a/src/libnative/io/pipe_unix.rs +++ b/src/libnative/io/pipe_unix.rs @@ -15,12 +15,14 @@ use std::mem; use std::rt::mutex; use std::rt::rtio; use std::rt::rtio::{IoResult, IoError}; +use std::sync::atomics; use super::retry; use super::net; use super::util; use super::c; -use super::file::fd_t; +use super::process; +use super::file::{fd_t, FileDesc}; fn unix_socket(ty: libc::c_int) -> IoResult { match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } { @@ -225,7 +227,23 @@ impl UnixListener { pub fn native_listen(self, backlog: int) -> IoResult { match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } { -1 => Err(super::last_error()), - _ => Ok(UnixAcceptor { listener: self, deadline: 0 }) + + #[cfg(unix)] + _ => { + let (reader, writer) = try!(process::pipe()); + try!(util::set_nonblocking(reader.fd(), true)); + try!(util::set_nonblocking(writer.fd(), true)); + try!(util::set_nonblocking(self.fd(), true)); + Ok(UnixAcceptor { + inner: Arc::new(AcceptorInner { + listener: self, + reader: reader, + writer: writer, + closed: atomics::AtomicBool::new(false), + }), + deadline: 0, + }) + } } } } @@ -240,29 +258,45 @@ impl rtio::RtioUnixListener for UnixListener { } pub struct UnixAcceptor { - listener: UnixListener, + inner: Arc, deadline: u64, } +#[cfg(unix)] +struct AcceptorInner { + listener: UnixListener, + reader: FileDesc, + writer: FileDesc, + closed: atomics::AtomicBool, +} + impl UnixAcceptor { - fn fd(&self) -> fd_t { self.listener.fd() } + fn fd(&self) -> fd_t { self.inner.listener.fd() } pub fn native_accept(&mut self) -> IoResult { - if self.deadline != 0 { - try!(util::await(self.fd(), Some(self.deadline), util::Readable)); - } - let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() }; - let storagep = &mut storage as *mut libc::sockaddr_storage; - let size = mem::size_of::(); - let mut size = size as libc::socklen_t; - match retry(|| unsafe { - libc::accept(self.fd(), - storagep as *mut libc::sockaddr, - &mut size as *mut libc::socklen_t) as libc::c_int - }) { - -1 => Err(super::last_error()), - fd => Ok(UnixStream::new(Arc::new(Inner::new(fd)))) + let deadline = if self.deadline == 0 {None} else {Some(self.deadline)}; + + while !self.inner.closed.load(atomics::SeqCst) { + unsafe { + let mut storage: libc::sockaddr_storage = mem::zeroed(); + let storagep = &mut storage as *mut libc::sockaddr_storage; + let size = mem::size_of::(); + let mut size = size as libc::socklen_t; + match retry(|| { + libc::accept(self.fd(), + storagep as *mut libc::sockaddr, + &mut size as *mut libc::socklen_t) as libc::c_int + }) { + -1 if util::wouldblock() => {} + -1 => return Err(super::last_error()), + fd => return Ok(UnixStream::new(Arc::new(Inner::new(fd)))), + } + } + try!(util::await([self.fd(), self.inner.reader.fd()], + deadline, util::Readable)); } + + Err(util::eof()) } } @@ -273,6 +307,24 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor { fn set_timeout(&mut self, timeout: Option) { self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); } + + fn clone(&self) -> Box { + box UnixAcceptor { + inner: self.inner.clone(), + deadline: 0, + } as Box + } + + #[cfg(unix)] + fn close_accept(&mut self) -> IoResult<()> { + self.inner.closed.store(true, atomics::SeqCst); + let mut fd = FileDesc::new(self.inner.writer.fd(), false); + match fd.inner_write([0]) { + Ok(..) => Ok(()), + Err(..) if util::wouldblock() => Ok(()), + Err(e) => Err(e), + } + } } impl Drop for UnixListener { diff --git a/src/libnative/io/pipe_windows.rs b/src/libnative/io/pipe_windows.rs index 717915e5d23..6ad51ee586f 100644 --- a/src/libnative/io/pipe_windows.rs +++ b/src/libnative/io/pipe_windows.rs @@ -99,10 +99,10 @@ use super::c; use super::util; use super::file::to_utf16; -struct Event(libc::HANDLE); +pub struct Event(libc::HANDLE); impl Event { - fn new(manual_reset: bool, initial_state: bool) -> IoResult { + pub fn new(manual_reset: bool, initial_state: bool) -> IoResult { let event = unsafe { libc::CreateEventW(ptr::mut_null(), manual_reset as libc::BOOL, @@ -116,7 +116,7 @@ impl Event { } } - fn handle(&self) -> libc::HANDLE { let Event(handle) = *self; handle } + pub fn handle(&self) -> libc::HANDLE { let Event(handle) = *self; handle } } impl Drop for Event { diff --git a/src/libnative/io/process.rs b/src/libnative/io/process.rs index d1b28854157..b8ec0cd5496 100644 --- a/src/libnative/io/process.rs +++ b/src/libnative/io/process.rs @@ -191,7 +191,7 @@ impl Drop for Process { } } -fn pipe() -> IoResult<(file::FileDesc, file::FileDesc)> { +pub fn pipe() -> IoResult<(file::FileDesc, file::FileDesc)> { #[cfg(unix)] use libc::EMFILE as ERROR; #[cfg(windows)] use libc::WSAEMFILE as ERROR; struct Closer { fd: libc::c_int } diff --git a/src/libnative/io/util.rs b/src/libnative/io/util.rs index 356805d91de..aec29bc2d03 100644 --- a/src/libnative/io/util.rs +++ b/src/libnative/io/util.rs @@ -9,6 +9,7 @@ // except according to those terms. use libc; +use std::cmp; use std::mem; use std::os; use std::ptr; @@ -166,10 +167,15 @@ pub fn connect_timeout(fd: net::sock_t, } } -pub fn await(fd: net::sock_t, deadline: Option, +pub fn await(fds: &[net::sock_t], deadline: Option, status: SocketStatus) -> IoResult<()> { let mut set: c::fd_set = unsafe { mem::zeroed() }; - c::fd_set(&mut set, fd); + let mut max = 0; + for &fd in fds.iter() { + c::fd_set(&mut set, fd); + max = cmp::max(max, fd + 1); + } + let (read, write) = match status { Readable => (&mut set as *mut _, ptr::mut_null()), Writable => (ptr::mut_null(), &mut set as *mut _), @@ -188,8 +194,7 @@ pub fn await(fd: net::sock_t, deadline: Option, &mut tv as *mut _ } }; - let n = if cfg!(windows) {1} else {fd as libc::c_int + 1}; - let r = unsafe { c::select(n, read, write, ptr::mut_null(), tvp) }; + let r = unsafe { c::select(max, read, write, ptr::mut_null(), tvp) }; r }) { -1 => Err(last_error()), diff --git a/src/librustrt/rtio.rs b/src/librustrt/rtio.rs index 6525adf07f7..261d544a241 100644 --- a/src/librustrt/rtio.rs +++ b/src/librustrt/rtio.rs @@ -246,6 +246,8 @@ pub trait RtioTcpAcceptor : RtioSocket { fn accept_simultaneously(&mut self) -> IoResult<()>; fn dont_accept_simultaneously(&mut self) -> IoResult<()>; fn set_timeout(&mut self, timeout: Option); + fn clone(&self) -> Box; + fn close_accept(&mut self) -> IoResult<()>; } pub trait RtioTcpStream : RtioSocket { @@ -335,6 +337,8 @@ pub trait RtioUnixListener { pub trait RtioUnixAcceptor { fn accept(&mut self) -> IoResult>; fn set_timeout(&mut self, timeout: Option); + fn clone(&self) -> Box; + fn close_accept(&mut self) -> IoResult<()>; } pub trait RtioTTY { diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs index 7055b9d7a47..ebc3940c16f 100644 --- a/src/libstd/io/net/tcp.rs +++ b/src/libstd/io/net/tcp.rs @@ -442,6 +442,54 @@ impl TcpAcceptor { #[experimental = "the type of the argument and name of this function are \ subject to change"] pub fn set_timeout(&mut self, ms: Option) { self.obj.set_timeout(ms); } + + /// Closes the accepting capabilities of this acceptor. + /// + /// This function is similar to `TcpStream`'s `close_{read,write}` methods + /// in that it will affect *all* cloned handles of this acceptor's original + /// handle. + /// + /// Once this function succeeds, all future calls to `accept` will return + /// immediately with an error, preventing all future calls to accept. The + /// underlying socket will not be relinquished back to the OS until all + /// acceptors have been deallocated. + /// + /// This is useful for waking up a thread in an accept loop to indicate that + /// it should exit. + /// + /// # Example + /// + /// ``` + /// # #![allow(experimental)] + /// use std::io::TcpListener; + /// use std::io::{Listener, Acceptor, TimedOut}; + /// + /// let mut a = TcpListener::bind("127.0.0.1", 8482).listen().unwrap(); + /// let a2 = a.clone(); + /// + /// spawn(proc() { + /// let mut a2 = a2; + /// for socket in a2.incoming() { + /// match socket { + /// Ok(s) => { /* handle s */ } + /// Err(ref e) if e.kind == EndOfFile => break, // closed + /// Err(e) => fail!("unexpected error: {}", e), + /// } + /// } + /// }); + /// + /// # fn wait_for_sigint() {} + /// // Now that our accept loop is running, wait for the program to be + /// // requested to exit. + /// wait_for_sigint(); + /// + /// // Signal our accept loop to exit + /// assert!(a.close_accept().is_ok()); + /// ``` + #[experimental] + pub fn close_accept(&mut self) -> IoResult<()> { + self.obj.close_accept().map_err(IoError::from_rtio_error) + } } impl Acceptor for TcpAcceptor { @@ -453,6 +501,25 @@ impl Acceptor for TcpAcceptor { } } +impl Clone for TcpAcceptor { + /// Creates a new handle to this TCP acceptor, allowing for simultaneous + /// accepts. + /// + /// The underlying TCP acceptor will not be closed until all handles to the + /// acceptor have been deallocated. Incoming connections will be received on + /// at most once acceptor, the same connection will not be accepted twice. + /// + /// The `close_accept` method will shut down *all* acceptors cloned from the + /// same original acceptor, whereas the `set_timeout` method only affects + /// the selector that it is called on. + /// + /// This function is useful for creating a handle to invoke `close_accept` + /// on to wake up any other task blocked in `accept`. + fn clone(&self) -> TcpAcceptor { + TcpAcceptor { obj: self.obj.clone() } + } +} + #[cfg(test)] #[allow(experimental)] mod test { @@ -1411,4 +1478,69 @@ mod test { rxdone.recv(); rxdone.recv(); }) + + iotest!(fn clone_accept_smoke() { + let addr = next_test_ip4(); + let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port); + let mut a = l.listen().unwrap(); + let mut a2 = a.clone(); + + spawn(proc() { + let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port); + }); + spawn(proc() { + let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port); + }); + + assert!(a.accept().is_ok()); + assert!(a2.accept().is_ok()); + }) + + iotest!(fn clone_accept_concurrent() { + let addr = next_test_ip4(); + let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port); + let a = l.listen().unwrap(); + let a2 = a.clone(); + + let (tx, rx) = channel(); + let tx2 = tx.clone(); + + spawn(proc() { let mut a = a; tx.send(a.accept()) }); + spawn(proc() { let mut a = a2; tx2.send(a.accept()) }); + + spawn(proc() { + let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port); + }); + spawn(proc() { + let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port); + }); + + assert!(rx.recv().is_ok()); + assert!(rx.recv().is_ok()); + }) + + iotest!(fn close_accept_smoke() { + let addr = next_test_ip4(); + let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port); + let mut a = l.listen().unwrap(); + + a.close_accept().unwrap(); + assert_eq!(a.accept().err().unwrap().kind, EndOfFile); + }) + + iotest!(fn close_accept_concurrent() { + let addr = next_test_ip4(); + let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port); + let a = l.listen().unwrap(); + let mut a2 = a.clone(); + + let (tx, rx) = channel(); + spawn(proc() { + let mut a = a; + tx.send(a.accept()); + }); + a2.close_accept().unwrap(); + + assert_eq!(rx.recv().err().unwrap().kind, EndOfFile); + }) } diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs index eb251075418..74f024a844e 100644 --- a/src/libstd/io/net/unix.rs +++ b/src/libstd/io/net/unix.rs @@ -212,6 +212,15 @@ impl UnixAcceptor { pub fn set_timeout(&mut self, timeout_ms: Option) { self.obj.set_timeout(timeout_ms) } + + /// Closes the accepting capabilities of this acceptor. + /// + /// This function has the same semantics as `TcpAcceptor::close_accept`, and + /// more information can be found in that documentation. + #[experimental] + pub fn close_accept(&mut self) -> IoResult<()> { + self.obj.close_accept().map_err(IoError::from_rtio_error) + } } impl Acceptor for UnixAcceptor { @@ -222,6 +231,25 @@ impl Acceptor for UnixAcceptor { } } +impl Clone for UnixAcceptor { + /// Creates a new handle to this unix acceptor, allowing for simultaneous + /// accepts. + /// + /// The underlying unix acceptor will not be closed until all handles to the + /// acceptor have been deallocated. Incoming connections will be received on + /// at most once acceptor, the same connection will not be accepted twice. + /// + /// The `close_accept` method will shut down *all* acceptors cloned from the + /// same original acceptor, whereas the `set_timeout` method only affects + /// the selector that it is called on. + /// + /// This function is useful for creating a handle to invoke `close_accept` + /// on to wake up any other task blocked in `accept`. + fn clone(&self) -> UnixAcceptor { + UnixAcceptor { obj: self.obj.clone() } + } +} + #[cfg(test)] #[allow(experimental)] mod tests { @@ -702,4 +730,71 @@ mod tests { rx2.recv(); }) + + iotest!(fn clone_accept_smoke() { + let addr = next_test_unix(); + let l = UnixListener::bind(&addr); + let mut a = l.listen().unwrap(); + let mut a2 = a.clone(); + + let addr2 = addr.clone(); + spawn(proc() { + let _ = UnixStream::connect(&addr2); + }); + spawn(proc() { + let _ = UnixStream::connect(&addr); + }); + + assert!(a.accept().is_ok()); + assert!(a2.accept().is_ok()); + }) + + iotest!(fn clone_accept_concurrent() { + let addr = next_test_unix(); + let l = UnixListener::bind(&addr); + let a = l.listen().unwrap(); + let a2 = a.clone(); + + let (tx, rx) = channel(); + let tx2 = tx.clone(); + + spawn(proc() { let mut a = a; tx.send(a.accept()) }); + spawn(proc() { let mut a = a2; tx2.send(a.accept()) }); + + let addr2 = addr.clone(); + spawn(proc() { + let _ = UnixStream::connect(&addr2); + }); + spawn(proc() { + let _ = UnixStream::connect(&addr); + }); + + assert!(rx.recv().is_ok()); + assert!(rx.recv().is_ok()); + }) + + iotest!(fn close_accept_smoke() { + let addr = next_test_unix(); + let l = UnixListener::bind(&addr); + let mut a = l.listen().unwrap(); + + a.close_accept().unwrap(); + assert_eq!(a.accept().err().unwrap().kind, EndOfFile); + }) + + iotest!(fn close_accept_concurrent() { + let addr = next_test_unix(); + let l = UnixListener::bind(&addr); + let a = l.listen().unwrap(); + let mut a2 = a.clone(); + + let (tx, rx) = channel(); + spawn(proc() { + let mut a = a; + tx.send(a.accept()); + }); + a2.close_accept().unwrap(); + + assert_eq!(rx.recv().err().unwrap().kind, EndOfFile); + }) } diff --git a/src/test/run-pass/tcp-accept-stress.rs b/src/test/run-pass/tcp-accept-stress.rs new file mode 100644 index 00000000000..3e420e45cfc --- /dev/null +++ b/src/test/run-pass/tcp-accept-stress.rs @@ -0,0 +1,94 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +#![feature(phase)] + +#[phase(plugin)] +extern crate green; +extern crate native; + +use std::io::{TcpListener, Listener, Acceptor, EndOfFile, TcpStream}; +use std::sync::{atomics, Arc}; +use std::task::TaskBuilder; +use native::NativeTaskBuilder; + +static N: uint = 8; +static M: uint = 100; + +green_start!(main) + +fn main() { + test(); + + let (tx, rx) = channel(); + TaskBuilder::new().native().spawn(proc() { + tx.send(test()); + }); + rx.recv(); +} + +fn test() { + let mut l = TcpListener::bind("127.0.0.1", 0).unwrap(); + let addr = l.socket_name().unwrap(); + let mut a = l.listen().unwrap(); + let cnt = Arc::new(atomics::AtomicUint::new(0)); + + let (tx, rx) = channel(); + for _ in range(0, N) { + let a = a.clone(); + let cnt = cnt.clone(); + let tx = tx.clone(); + spawn(proc() { + let mut a = a; + let mut mycnt = 0u; + loop { + match a.accept() { + Ok(..) => { + mycnt += 1; + if cnt.fetch_add(1, atomics::SeqCst) == N * M - 1 { + break + } + } + Err(ref e) if e.kind == EndOfFile => break, + Err(e) => fail!("{}", e), + } + } + assert!(mycnt > 0); + tx.send(()); + }); + } + + for _ in range(0, N) { + let tx = tx.clone(); + spawn(proc() { + for _ in range(0, M) { + let _s = TcpStream::connect(addr.ip.to_string().as_slice(), + addr.port).unwrap(); + } + tx.send(()); + }); + } + + // wait for senders + assert_eq!(rx.iter().take(N).count(), N); + + // wait for one acceptor to die + let _ = rx.recv(); + + // Notify other receivers should die + a.close_accept().unwrap(); + + // wait for receivers + assert_eq!(rx.iter().take(N - 1).count(), N - 1); + + // Everything should have been accepted. + assert_eq!(cnt.load(atomics::SeqCst), N * M); +} +