native: Implement clone/close_accept for unix

This commits implements {Tcp,Unix}Acceptor::{clone,close_accept} methods for
unix. A windows implementation is coming in a later commit.

The clone implementation is based on atomic reference counting (as with all
other clones), and the close_accept implementation is based on selecting on a
self-pipe which signals that a close has been seen.
This commit is contained in:
Alex Crichton 2014-07-11 14:29:15 -07:00
parent 6d9b219e6f
commit 110168de2a
9 changed files with 482 additions and 47 deletions

View File

@ -14,10 +14,13 @@ use std::mem;
use std::rt::mutex;
use std::rt::rtio;
use std::rt::rtio::{IoResult, IoError};
use std::sync::atomics;
use super::{retry, keep_going};
use super::c;
use super::util;
use super::file::FileDesc;
use super::process;
////////////////////////////////////////////////////////////////////////////////
// sockaddr and misc bindings
@ -479,9 +482,26 @@ impl TcpListener {
pub fn fd(&self) -> sock_t { self.inner.fd }
pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> {
try!(util::set_nonblocking(self.fd(), true));
match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
-1 => Err(last_error()),
_ => Ok(TcpAcceptor { listener: self, deadline: 0 })
#[cfg(unix)]
_ => {
let (reader, writer) = try!(process::pipe());
try!(util::set_nonblocking(reader.fd(), true));
try!(util::set_nonblocking(writer.fd(), true));
try!(util::set_nonblocking(self.fd(), true));
Ok(TcpAcceptor {
inner: Arc::new(AcceptorInner {
listener: self,
reader: reader,
writer: writer,
closed: atomics::AtomicBool::new(false),
}),
deadline: 0,
})
}
}
}
}
@ -502,31 +522,46 @@ impl rtio::RtioSocket for TcpListener {
}
pub struct TcpAcceptor {
listener: TcpListener,
inner: Arc<AcceptorInner>,
deadline: u64,
}
impl TcpAcceptor {
pub fn fd(&self) -> sock_t { self.listener.fd() }
#[cfg(unix)]
struct AcceptorInner {
listener: TcpListener,
reader: FileDesc,
writer: FileDesc,
closed: atomics::AtomicBool,
}
impl TcpAcceptor {
pub fn fd(&self) -> sock_t { self.inner.listener.fd() }
#[cfg(unix)]
pub fn native_accept(&mut self) -> IoResult<TcpStream> {
if self.deadline != 0 {
try!(util::await(self.fd(), Some(self.deadline), util::Readable));
}
unsafe {
let mut storage: libc::sockaddr_storage = mem::zeroed();
let storagep = &mut storage as *mut libc::sockaddr_storage;
let size = mem::size_of::<libc::sockaddr_storage>();
let mut size = size as libc::socklen_t;
match retry(|| {
libc::accept(self.fd(),
storagep as *mut libc::sockaddr,
&mut size as *mut libc::socklen_t) as libc::c_int
}) as sock_t {
-1 => Err(last_error()),
fd => Ok(TcpStream::new(Inner::new(fd))),
let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
while !self.inner.closed.load(atomics::SeqCst) {
unsafe {
let mut storage: libc::sockaddr_storage = mem::zeroed();
let storagep = &mut storage as *mut libc::sockaddr_storage;
let size = mem::size_of::<libc::sockaddr_storage>();
let mut size = size as libc::socklen_t;
match retry(|| {
libc::accept(self.fd(),
storagep as *mut libc::sockaddr,
&mut size as *mut libc::socklen_t) as libc::c_int
}) as sock_t {
-1 if util::wouldblock() => {}
-1 => return Err(last_error()),
fd => return Ok(TcpStream::new(Inner::new(fd))),
}
}
try!(util::await([self.fd(), self.inner.reader.fd()],
deadline, util::Readable));
}
Err(util::eof())
}
}
@ -546,6 +581,24 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
fn set_timeout(&mut self, timeout: Option<u64>) {
self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
}
fn clone(&self) -> Box<rtio::RtioTcpAcceptor + Send> {
box TcpAcceptor {
inner: self.inner.clone(),
deadline: 0,
} as Box<rtio::RtioTcpAcceptor + Send>
}
#[cfg(unix)]
fn close_accept(&mut self) -> IoResult<()> {
self.inner.closed.store(true, atomics::SeqCst);
let mut fd = FileDesc::new(self.inner.writer.fd(), false);
match fd.inner_write([0]) {
Ok(..) => Ok(()),
Err(..) if util::wouldblock() => Ok(()),
Err(e) => Err(e),
}
}
}
////////////////////////////////////////////////////////////////////////////////
@ -817,7 +870,7 @@ pub fn read<T>(fd: sock_t,
// With a timeout, first we wait for the socket to become
// readable using select(), specifying the relevant timeout for
// our previously set deadline.
try!(util::await(fd, deadline, util::Readable));
try!(util::await([fd], deadline, util::Readable));
// At this point, we're still within the timeout, and we've
// determined that the socket is readable (as returned by
@ -871,7 +924,7 @@ pub fn write<T>(fd: sock_t,
while written < buf.len() && (write_everything || written == 0) {
// As with read(), first wait for the socket to be ready for
// the I/O operation.
match util::await(fd, deadline, util::Writable) {
match util::await([fd], deadline, util::Writable) {
Err(ref e) if e.code == libc::EOF as uint && written > 0 => {
assert!(deadline.is_some());
return Err(util::short_write(written, "short write"))

View File

@ -15,12 +15,14 @@ use std::mem;
use std::rt::mutex;
use std::rt::rtio;
use std::rt::rtio::{IoResult, IoError};
use std::sync::atomics;
use super::retry;
use super::net;
use super::util;
use super::c;
use super::file::fd_t;
use super::process;
use super::file::{fd_t, FileDesc};
fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> {
match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } {
@ -225,7 +227,23 @@ impl UnixListener {
pub fn native_listen(self, backlog: int) -> IoResult<UnixAcceptor> {
match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
-1 => Err(super::last_error()),
_ => Ok(UnixAcceptor { listener: self, deadline: 0 })
#[cfg(unix)]
_ => {
let (reader, writer) = try!(process::pipe());
try!(util::set_nonblocking(reader.fd(), true));
try!(util::set_nonblocking(writer.fd(), true));
try!(util::set_nonblocking(self.fd(), true));
Ok(UnixAcceptor {
inner: Arc::new(AcceptorInner {
listener: self,
reader: reader,
writer: writer,
closed: atomics::AtomicBool::new(false),
}),
deadline: 0,
})
}
}
}
}
@ -240,29 +258,45 @@ impl rtio::RtioUnixListener for UnixListener {
}
pub struct UnixAcceptor {
listener: UnixListener,
inner: Arc<AcceptorInner>,
deadline: u64,
}
#[cfg(unix)]
struct AcceptorInner {
listener: UnixListener,
reader: FileDesc,
writer: FileDesc,
closed: atomics::AtomicBool,
}
impl UnixAcceptor {
fn fd(&self) -> fd_t { self.listener.fd() }
fn fd(&self) -> fd_t { self.inner.listener.fd() }
pub fn native_accept(&mut self) -> IoResult<UnixStream> {
if self.deadline != 0 {
try!(util::await(self.fd(), Some(self.deadline), util::Readable));
}
let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
let storagep = &mut storage as *mut libc::sockaddr_storage;
let size = mem::size_of::<libc::sockaddr_storage>();
let mut size = size as libc::socklen_t;
match retry(|| unsafe {
libc::accept(self.fd(),
storagep as *mut libc::sockaddr,
&mut size as *mut libc::socklen_t) as libc::c_int
}) {
-1 => Err(super::last_error()),
fd => Ok(UnixStream::new(Arc::new(Inner::new(fd))))
let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
while !self.inner.closed.load(atomics::SeqCst) {
unsafe {
let mut storage: libc::sockaddr_storage = mem::zeroed();
let storagep = &mut storage as *mut libc::sockaddr_storage;
let size = mem::size_of::<libc::sockaddr_storage>();
let mut size = size as libc::socklen_t;
match retry(|| {
libc::accept(self.fd(),
storagep as *mut libc::sockaddr,
&mut size as *mut libc::socklen_t) as libc::c_int
}) {
-1 if util::wouldblock() => {}
-1 => return Err(super::last_error()),
fd => return Ok(UnixStream::new(Arc::new(Inner::new(fd)))),
}
}
try!(util::await([self.fd(), self.inner.reader.fd()],
deadline, util::Readable));
}
Err(util::eof())
}
}
@ -273,6 +307,24 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor {
fn set_timeout(&mut self, timeout: Option<u64>) {
self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
}
fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> {
box UnixAcceptor {
inner: self.inner.clone(),
deadline: 0,
} as Box<rtio::RtioUnixAcceptor + Send>
}
#[cfg(unix)]
fn close_accept(&mut self) -> IoResult<()> {
self.inner.closed.store(true, atomics::SeqCst);
let mut fd = FileDesc::new(self.inner.writer.fd(), false);
match fd.inner_write([0]) {
Ok(..) => Ok(()),
Err(..) if util::wouldblock() => Ok(()),
Err(e) => Err(e),
}
}
}
impl Drop for UnixListener {

View File

@ -99,10 +99,10 @@ use super::c;
use super::util;
use super::file::to_utf16;
struct Event(libc::HANDLE);
pub struct Event(libc::HANDLE);
impl Event {
fn new(manual_reset: bool, initial_state: bool) -> IoResult<Event> {
pub fn new(manual_reset: bool, initial_state: bool) -> IoResult<Event> {
let event = unsafe {
libc::CreateEventW(ptr::mut_null(),
manual_reset as libc::BOOL,
@ -116,7 +116,7 @@ impl Event {
}
}
fn handle(&self) -> libc::HANDLE { let Event(handle) = *self; handle }
pub fn handle(&self) -> libc::HANDLE { let Event(handle) = *self; handle }
}
impl Drop for Event {

View File

@ -191,7 +191,7 @@ impl Drop for Process {
}
}
fn pipe() -> IoResult<(file::FileDesc, file::FileDesc)> {
pub fn pipe() -> IoResult<(file::FileDesc, file::FileDesc)> {
#[cfg(unix)] use libc::EMFILE as ERROR;
#[cfg(windows)] use libc::WSAEMFILE as ERROR;
struct Closer { fd: libc::c_int }

View File

@ -9,6 +9,7 @@
// except according to those terms.
use libc;
use std::cmp;
use std::mem;
use std::os;
use std::ptr;
@ -166,10 +167,15 @@ pub fn connect_timeout(fd: net::sock_t,
}
}
pub fn await(fd: net::sock_t, deadline: Option<u64>,
pub fn await(fds: &[net::sock_t], deadline: Option<u64>,
status: SocketStatus) -> IoResult<()> {
let mut set: c::fd_set = unsafe { mem::zeroed() };
c::fd_set(&mut set, fd);
let mut max = 0;
for &fd in fds.iter() {
c::fd_set(&mut set, fd);
max = cmp::max(max, fd + 1);
}
let (read, write) = match status {
Readable => (&mut set as *mut _, ptr::mut_null()),
Writable => (ptr::mut_null(), &mut set as *mut _),
@ -188,8 +194,7 @@ pub fn await(fd: net::sock_t, deadline: Option<u64>,
&mut tv as *mut _
}
};
let n = if cfg!(windows) {1} else {fd as libc::c_int + 1};
let r = unsafe { c::select(n, read, write, ptr::mut_null(), tvp) };
let r = unsafe { c::select(max, read, write, ptr::mut_null(), tvp) };
r
}) {
-1 => Err(last_error()),

View File

@ -246,6 +246,8 @@ pub trait RtioTcpAcceptor : RtioSocket {
fn accept_simultaneously(&mut self) -> IoResult<()>;
fn dont_accept_simultaneously(&mut self) -> IoResult<()>;
fn set_timeout(&mut self, timeout: Option<u64>);
fn clone(&self) -> Box<RtioTcpAcceptor + Send>;
fn close_accept(&mut self) -> IoResult<()>;
}
pub trait RtioTcpStream : RtioSocket {
@ -335,6 +337,8 @@ pub trait RtioUnixListener {
pub trait RtioUnixAcceptor {
fn accept(&mut self) -> IoResult<Box<RtioPipe + Send>>;
fn set_timeout(&mut self, timeout: Option<u64>);
fn clone(&self) -> Box<RtioUnixAcceptor + Send>;
fn close_accept(&mut self) -> IoResult<()>;
}
pub trait RtioTTY {

View File

@ -442,6 +442,54 @@ impl TcpAcceptor {
#[experimental = "the type of the argument and name of this function are \
subject to change"]
pub fn set_timeout(&mut self, ms: Option<u64>) { self.obj.set_timeout(ms); }
/// Closes the accepting capabilities of this acceptor.
///
/// This function is similar to `TcpStream`'s `close_{read,write}` methods
/// in that it will affect *all* cloned handles of this acceptor's original
/// handle.
///
/// Once this function succeeds, all future calls to `accept` will return
/// immediately with an error, preventing all future calls to accept. The
/// underlying socket will not be relinquished back to the OS until all
/// acceptors have been deallocated.
///
/// This is useful for waking up a thread in an accept loop to indicate that
/// it should exit.
///
/// # Example
///
/// ```
/// # #![allow(experimental)]
/// use std::io::TcpListener;
/// use std::io::{Listener, Acceptor, TimedOut};
///
/// let mut a = TcpListener::bind("127.0.0.1", 8482).listen().unwrap();
/// let a2 = a.clone();
///
/// spawn(proc() {
/// let mut a2 = a2;
/// for socket in a2.incoming() {
/// match socket {
/// Ok(s) => { /* handle s */ }
/// Err(ref e) if e.kind == EndOfFile => break, // closed
/// Err(e) => fail!("unexpected error: {}", e),
/// }
/// }
/// });
///
/// # fn wait_for_sigint() {}
/// // Now that our accept loop is running, wait for the program to be
/// // requested to exit.
/// wait_for_sigint();
///
/// // Signal our accept loop to exit
/// assert!(a.close_accept().is_ok());
/// ```
#[experimental]
pub fn close_accept(&mut self) -> IoResult<()> {
self.obj.close_accept().map_err(IoError::from_rtio_error)
}
}
impl Acceptor<TcpStream> for TcpAcceptor {
@ -453,6 +501,25 @@ impl Acceptor<TcpStream> for TcpAcceptor {
}
}
impl Clone for TcpAcceptor {
/// Creates a new handle to this TCP acceptor, allowing for simultaneous
/// accepts.
///
/// The underlying TCP acceptor will not be closed until all handles to the
/// acceptor have been deallocated. Incoming connections will be received on
/// at most once acceptor, the same connection will not be accepted twice.
///
/// The `close_accept` method will shut down *all* acceptors cloned from the
/// same original acceptor, whereas the `set_timeout` method only affects
/// the selector that it is called on.
///
/// This function is useful for creating a handle to invoke `close_accept`
/// on to wake up any other task blocked in `accept`.
fn clone(&self) -> TcpAcceptor {
TcpAcceptor { obj: self.obj.clone() }
}
}
#[cfg(test)]
#[allow(experimental)]
mod test {
@ -1411,4 +1478,69 @@ mod test {
rxdone.recv();
rxdone.recv();
})
iotest!(fn clone_accept_smoke() {
let addr = next_test_ip4();
let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
let mut a = l.listen().unwrap();
let mut a2 = a.clone();
spawn(proc() {
let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port);
});
spawn(proc() {
let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port);
});
assert!(a.accept().is_ok());
assert!(a2.accept().is_ok());
})
iotest!(fn clone_accept_concurrent() {
let addr = next_test_ip4();
let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
let a = l.listen().unwrap();
let a2 = a.clone();
let (tx, rx) = channel();
let tx2 = tx.clone();
spawn(proc() { let mut a = a; tx.send(a.accept()) });
spawn(proc() { let mut a = a2; tx2.send(a.accept()) });
spawn(proc() {
let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port);
});
spawn(proc() {
let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port);
});
assert!(rx.recv().is_ok());
assert!(rx.recv().is_ok());
})
iotest!(fn close_accept_smoke() {
let addr = next_test_ip4();
let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
let mut a = l.listen().unwrap();
a.close_accept().unwrap();
assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
})
iotest!(fn close_accept_concurrent() {
let addr = next_test_ip4();
let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
let a = l.listen().unwrap();
let mut a2 = a.clone();
let (tx, rx) = channel();
spawn(proc() {
let mut a = a;
tx.send(a.accept());
});
a2.close_accept().unwrap();
assert_eq!(rx.recv().err().unwrap().kind, EndOfFile);
})
}

View File

@ -212,6 +212,15 @@ impl UnixAcceptor {
pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
self.obj.set_timeout(timeout_ms)
}
/// Closes the accepting capabilities of this acceptor.
///
/// This function has the same semantics as `TcpAcceptor::close_accept`, and
/// more information can be found in that documentation.
#[experimental]
pub fn close_accept(&mut self) -> IoResult<()> {
self.obj.close_accept().map_err(IoError::from_rtio_error)
}
}
impl Acceptor<UnixStream> for UnixAcceptor {
@ -222,6 +231,25 @@ impl Acceptor<UnixStream> for UnixAcceptor {
}
}
impl Clone for UnixAcceptor {
/// Creates a new handle to this unix acceptor, allowing for simultaneous
/// accepts.
///
/// The underlying unix acceptor will not be closed until all handles to the
/// acceptor have been deallocated. Incoming connections will be received on
/// at most once acceptor, the same connection will not be accepted twice.
///
/// The `close_accept` method will shut down *all* acceptors cloned from the
/// same original acceptor, whereas the `set_timeout` method only affects
/// the selector that it is called on.
///
/// This function is useful for creating a handle to invoke `close_accept`
/// on to wake up any other task blocked in `accept`.
fn clone(&self) -> UnixAcceptor {
UnixAcceptor { obj: self.obj.clone() }
}
}
#[cfg(test)]
#[allow(experimental)]
mod tests {
@ -702,4 +730,71 @@ mod tests {
rx2.recv();
})
iotest!(fn clone_accept_smoke() {
let addr = next_test_unix();
let l = UnixListener::bind(&addr);
let mut a = l.listen().unwrap();
let mut a2 = a.clone();
let addr2 = addr.clone();
spawn(proc() {
let _ = UnixStream::connect(&addr2);
});
spawn(proc() {
let _ = UnixStream::connect(&addr);
});
assert!(a.accept().is_ok());
assert!(a2.accept().is_ok());
})
iotest!(fn clone_accept_concurrent() {
let addr = next_test_unix();
let l = UnixListener::bind(&addr);
let a = l.listen().unwrap();
let a2 = a.clone();
let (tx, rx) = channel();
let tx2 = tx.clone();
spawn(proc() { let mut a = a; tx.send(a.accept()) });
spawn(proc() { let mut a = a2; tx2.send(a.accept()) });
let addr2 = addr.clone();
spawn(proc() {
let _ = UnixStream::connect(&addr2);
});
spawn(proc() {
let _ = UnixStream::connect(&addr);
});
assert!(rx.recv().is_ok());
assert!(rx.recv().is_ok());
})
iotest!(fn close_accept_smoke() {
let addr = next_test_unix();
let l = UnixListener::bind(&addr);
let mut a = l.listen().unwrap();
a.close_accept().unwrap();
assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
})
iotest!(fn close_accept_concurrent() {
let addr = next_test_unix();
let l = UnixListener::bind(&addr);
let a = l.listen().unwrap();
let mut a2 = a.clone();
let (tx, rx) = channel();
spawn(proc() {
let mut a = a;
tx.send(a.accept());
});
a2.close_accept().unwrap();
assert_eq!(rx.recv().err().unwrap().kind, EndOfFile);
})
}

View File

@ -0,0 +1,94 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
#![feature(phase)]
#[phase(plugin)]
extern crate green;
extern crate native;
use std::io::{TcpListener, Listener, Acceptor, EndOfFile, TcpStream};
use std::sync::{atomics, Arc};
use std::task::TaskBuilder;
use native::NativeTaskBuilder;
static N: uint = 8;
static M: uint = 100;
green_start!(main)
fn main() {
test();
let (tx, rx) = channel();
TaskBuilder::new().native().spawn(proc() {
tx.send(test());
});
rx.recv();
}
fn test() {
let mut l = TcpListener::bind("127.0.0.1", 0).unwrap();
let addr = l.socket_name().unwrap();
let mut a = l.listen().unwrap();
let cnt = Arc::new(atomics::AtomicUint::new(0));
let (tx, rx) = channel();
for _ in range(0, N) {
let a = a.clone();
let cnt = cnt.clone();
let tx = tx.clone();
spawn(proc() {
let mut a = a;
let mut mycnt = 0u;
loop {
match a.accept() {
Ok(..) => {
mycnt += 1;
if cnt.fetch_add(1, atomics::SeqCst) == N * M - 1 {
break
}
}
Err(ref e) if e.kind == EndOfFile => break,
Err(e) => fail!("{}", e),
}
}
assert!(mycnt > 0);
tx.send(());
});
}
for _ in range(0, N) {
let tx = tx.clone();
spawn(proc() {
for _ in range(0, M) {
let _s = TcpStream::connect(addr.ip.to_string().as_slice(),
addr.port).unwrap();
}
tx.send(());
});
}
// wait for senders
assert_eq!(rx.iter().take(N).count(), N);
// wait for one acceptor to die
let _ = rx.recv();
// Notify other receivers should die
a.close_accept().unwrap();
// wait for receivers
assert_eq!(rx.iter().take(N - 1).count(), N - 1);
// Everything should have been accepted.
assert_eq!(cnt.load(atomics::SeqCst), N * M);
}