auto merge of #15704 : alexcrichton/rust/issue-15595, r=brson

If a task is spinning in an accept loop, there is currently no method of gracefully shutting it down. This PR introduces a way to do so: the acceptor can be cloned, and a `close_accept` method unblocks any pending accept.

As with other I/O methods like this, it is `#[experimental]` from the start and sadly carries a good deal of supporting code. Much of the complication comes from the fact that you can now accept on the same socket concurrently.

I tried to add a good deal of tests for this change, but another set of eyes is always appreciated!
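For reference, here's a minimal sketch of the intended usage, mirroring the doc example this PR adds for `TcpAcceptor::close_accept` (the port number is arbitrary): one task blocks in an accept loop on a cloned handle, and another handle later unblocks it.

```rust
use std::io::{TcpListener, Listener, Acceptor, EndOfFile};

fn main() {
    let mut a = TcpListener::bind("127.0.0.1", 8482).listen().unwrap();
    let a2 = a.clone();

    spawn(proc() {
        let mut a2 = a2;
        for socket in a2.incoming() {
            match socket {
                Ok(s) => { /* handle s */ }
                // close_accept() surfaces as an EndOfFile error in accept()
                Err(ref e) if e.kind == EndOfFile => break,
                Err(e) => fail!("unexpected error: {}", e),
            }
        }
    });

    // ... later, from any handle: wake the blocked accept and shut it down
    a.close_accept().unwrap();
}
```

Note that `close_accept` affects all clones of the acceptor, while `set_timeout` only affects the handle it is called on.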
Merged by bors on 2014-08-25 01:45:57 +00:00 in commit 83804f9085.
14 changed files with 1104 additions and 299 deletions

View File

@ -26,6 +26,14 @@ pub static ENABLE_INSERT_MODE: libc::DWORD = 0x20;
pub static ENABLE_LINE_INPUT: libc::DWORD = 0x2;
pub static ENABLE_PROCESSED_INPUT: libc::DWORD = 0x1;
pub static ENABLE_QUICK_EDIT_MODE: libc::DWORD = 0x40;
pub static WSA_INVALID_EVENT: WSAEVENT = 0 as WSAEVENT;
pub static FD_ACCEPT: libc::c_long = 0x08;
pub static FD_MAX_EVENTS: uint = 10;
pub static WSA_INFINITE: libc::DWORD = libc::INFINITE;
pub static WSA_WAIT_TIMEOUT: libc::DWORD = libc::consts::os::extra::WAIT_TIMEOUT;
pub static WSA_WAIT_EVENT_0: libc::DWORD = libc::consts::os::extra::WAIT_OBJECT_0;
pub static WSA_WAIT_FAILED: libc::DWORD = libc::consts::os::extra::WAIT_FAILED;
#[repr(C)]
#[cfg(target_arch = "x86")]
@ -52,6 +60,16 @@ pub struct WSADATA {
pub type LPWSADATA = *mut WSADATA;
#[repr(C)]
pub struct WSANETWORKEVENTS {
pub lNetworkEvents: libc::c_long,
pub iErrorCode: [libc::c_int, ..FD_MAX_EVENTS],
}
pub type LPWSANETWORKEVENTS = *mut WSANETWORKEVENTS;
pub type WSAEVENT = libc::HANDLE;
#[repr(C)]
pub struct fd_set {
fd_count: libc::c_uint,
@ -68,6 +86,21 @@ extern "system" {
pub fn WSAStartup(wVersionRequested: libc::WORD,
lpWSAData: LPWSADATA) -> libc::c_int;
pub fn WSAGetLastError() -> libc::c_int;
pub fn WSACloseEvent(hEvent: WSAEVENT) -> libc::BOOL;
pub fn WSACreateEvent() -> WSAEVENT;
pub fn WSAEventSelect(s: libc::SOCKET,
hEventObject: WSAEVENT,
lNetworkEvents: libc::c_long) -> libc::c_int;
pub fn WSASetEvent(hEvent: WSAEVENT) -> libc::BOOL;
pub fn WSAWaitForMultipleEvents(cEvents: libc::DWORD,
lphEvents: *const WSAEVENT,
fWaitAll: libc::BOOL,
dwTimeout: libc::DWORD,
fAlertable: libc::BOOL) -> libc::DWORD;
pub fn WSAEnumNetworkEvents(s: libc::SOCKET,
hEventObject: WSAEVENT,
lpNetworkEvents: LPWSANETWORKEVENTS)
-> libc::c_int;
pub fn ioctlsocket(s: libc::SOCKET, cmd: libc::c_long,
argp: *mut libc::c_ulong) -> libc::c_int;
@ -82,6 +115,12 @@ extern "system" {
optval: *mut libc::c_char,
optlen: *mut libc::c_int) -> libc::c_int;
pub fn SetEvent(hEvent: libc::HANDLE) -> libc::BOOL;
pub fn WaitForMultipleObjects(nCount: libc::DWORD,
lpHandles: *const libc::HANDLE,
bWaitAll: libc::BOOL,
dwMilliseconds: libc::DWORD) -> libc::DWORD;
pub fn CancelIo(hFile: libc::HANDLE) -> libc::BOOL;
pub fn CancelIoEx(hFile: libc::HANDLE,
lpOverlapped: libc::LPOVERLAPPED) -> libc::BOOL;

View File

@ -11,21 +11,25 @@
use alloc::arc::Arc;
use libc;
use std::mem;
use std::ptr;
use std::rt::mutex;
use std::rt::rtio;
use std::rt::rtio::{IoResult, IoError};
use std::sync::atomic;
use super::{retry, keep_going};
use super::c;
use super::util;
#[cfg(unix)] use super::process;
#[cfg(unix)] use super::file::FileDesc;
pub use self::os::{init, sock_t, last_error};
////////////////////////////////////////////////////////////////////////////////
// sockaddr and misc bindings
////////////////////////////////////////////////////////////////////////////////
#[cfg(windows)] pub type sock_t = libc::SOCKET;
#[cfg(unix)] pub type sock_t = super::file::fd_t;
pub fn htons(u: u16) -> u16 {
u.to_be()
}
@ -97,7 +101,7 @@ fn socket(addr: rtio::SocketAddr, ty: libc::c_int) -> IoResult<sock_t> {
rtio::Ipv6Addr(..) => libc::AF_INET6,
};
match libc::socket(fam, ty, 0) {
-1 => Err(super::last_error()),
-1 => Err(os::last_error()),
fd => Ok(fd),
}
}
@ -111,7 +115,7 @@ fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
payload,
mem::size_of::<T>() as libc::socklen_t);
if ret != 0 {
Err(last_error())
Err(os::last_error())
} else {
Ok(())
}
@ -127,7 +131,7 @@ pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
&mut slot as *mut _ as *mut _,
&mut len);
if ret != 0 {
Err(last_error())
Err(os::last_error())
} else {
assert!(len as uint == mem::size_of::<T>());
Ok(slot)
@ -135,25 +139,6 @@ pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
}
}
#[cfg(windows)]
pub fn last_error() -> IoError {
use std::os;
let code = unsafe { c::WSAGetLastError() as uint };
IoError {
code: code,
extra: 0,
detail: Some(os::error_string(code)),
}
}
#[cfg(not(windows))]
fn last_error() -> IoError {
super::last_error()
}
#[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
#[cfg(unix)] unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }
fn sockname(fd: sock_t,
f: unsafe extern "system" fn(sock_t, *mut libc::sockaddr,
*mut libc::socklen_t) -> libc::c_int)
@ -167,7 +152,7 @@ fn sockname(fd: sock_t,
storage as *mut libc::sockaddr,
&mut len as *mut libc::socklen_t);
if ret != 0 {
return Err(last_error())
return Err(os::last_error())
}
}
return sockaddr_to_addr(&storage, len as uint);
@ -221,28 +206,6 @@ pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
}
}
#[cfg(unix)]
pub fn init() {}
#[cfg(windows)]
pub fn init() {
unsafe {
use std::rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
static mut INITIALIZED: bool = false;
static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
let _guard = LOCK.lock();
if !INITIALIZED {
let mut data: c::WSADATA = mem::zeroed();
let ret = c::WSAStartup(0x202, // version 2.2
&mut data);
assert_eq!(ret, 0);
INITIALIZED = true;
}
}
}
////////////////////////////////////////////////////////////////////////////////
// TCP streams
////////////////////////////////////////////////////////////////////////////////
@ -289,7 +252,7 @@ impl TcpStream {
},
None => {
match retry(|| unsafe { libc::connect(fd, addrp, len) }) {
-1 => Err(last_error()),
-1 => Err(os::last_error()),
_ => Ok(ret),
}
}
@ -435,7 +398,7 @@ impl rtio::RtioSocket for TcpStream {
}
impl Drop for Inner {
fn drop(&mut self) { unsafe { close(self.fd); } }
fn drop(&mut self) { unsafe { os::close(self.fd); } }
}
#[unsafe_destructor]
@ -471,7 +434,7 @@ impl TcpListener {
}
match unsafe { libc::bind(fd, addrp, len) } {
-1 => Err(last_error()),
-1 => Err(os::last_error()),
_ => Ok(ret),
}
}
@ -480,8 +443,44 @@ impl TcpListener {
pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> {
match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
-1 => Err(last_error()),
_ => Ok(TcpAcceptor { listener: self, deadline: 0 })
-1 => Err(os::last_error()),
#[cfg(unix)]
_ => {
let (reader, writer) = try!(process::pipe());
try!(util::set_nonblocking(reader.fd(), true));
try!(util::set_nonblocking(writer.fd(), true));
try!(util::set_nonblocking(self.fd(), true));
Ok(TcpAcceptor {
inner: Arc::new(AcceptorInner {
listener: self,
reader: reader,
writer: writer,
closed: atomic::AtomicBool::new(false),
}),
deadline: 0,
})
}
#[cfg(windows)]
_ => {
let accept = try!(os::Event::new());
let ret = unsafe {
c::WSAEventSelect(self.fd(), accept.handle(), c::FD_ACCEPT)
};
if ret != 0 {
return Err(os::last_error())
}
Ok(TcpAcceptor {
inner: Arc::new(AcceptorInner {
listener: self,
abort: try!(os::Event::new()),
accept: accept,
closed: atomic::AtomicBool::new(false),
}),
deadline: 0,
})
}
}
}
}
@ -502,31 +501,135 @@ impl rtio::RtioSocket for TcpListener {
}
pub struct TcpAcceptor {
listener: TcpListener,
inner: Arc<AcceptorInner>,
deadline: u64,
}
impl TcpAcceptor {
pub fn fd(&self) -> sock_t { self.listener.fd() }
#[cfg(unix)]
struct AcceptorInner {
listener: TcpListener,
reader: FileDesc,
writer: FileDesc,
closed: atomic::AtomicBool,
}
#[cfg(windows)]
struct AcceptorInner {
listener: TcpListener,
abort: os::Event,
accept: os::Event,
closed: atomic::AtomicBool,
}
impl TcpAcceptor {
pub fn fd(&self) -> sock_t { self.inner.listener.fd() }
#[cfg(unix)]
pub fn native_accept(&mut self) -> IoResult<TcpStream> {
if self.deadline != 0 {
try!(util::await(self.fd(), Some(self.deadline), util::Readable));
// In implementing accept, the two main concerns are dealing with
// close_accept() and timeouts. The unix implementation is based on a
// nonblocking accept plus a call to select(). Windows ends up having
// an entirely separate implementation from unix, which is explained
// below.
//
// To implement timeouts, all blocking is done via select() instead of
// accept() by putting the socket in non-blocking mode. Because
// select() takes a timeout argument, we just pass through the timeout
// to select().
//
// To implement close_accept(), we have a self-pipe to ourselves which
// is passed to select() along with the socket being accepted on. The
// self-pipe is never written to unless close_accept() is called.
let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
while !self.inner.closed.load(atomic::SeqCst) {
match retry(|| unsafe {
libc::accept(self.fd(), ptr::mut_null(), ptr::mut_null())
}) {
-1 if util::wouldblock() => {}
-1 => return Err(os::last_error()),
fd => return Ok(TcpStream::new(Inner::new(fd as sock_t))),
}
try!(util::await([self.fd(), self.inner.reader.fd()],
deadline, util::Readable));
}
unsafe {
let mut storage: libc::sockaddr_storage = mem::zeroed();
let storagep = &mut storage as *mut libc::sockaddr_storage;
let size = mem::size_of::<libc::sockaddr_storage>();
let mut size = size as libc::socklen_t;
match retry(|| {
libc::accept(self.fd(),
storagep as *mut libc::sockaddr,
&mut size as *mut libc::socklen_t) as libc::c_int
}) as sock_t {
-1 => Err(last_error()),
fd => Ok(TcpStream::new(Inner::new(fd))),
Err(util::eof())
}
#[cfg(windows)]
pub fn native_accept(&mut self) -> IoResult<TcpStream> {
// Unlike unix, windows cannot invoke `select` on arbitrary file
// descriptors like pipes, only sockets. Consequently, windows cannot
// use the same implementation as unix for accept() when close_accept()
// is considered.
//
// In order to implement close_accept() and timeouts, windows uses
// event handles. An acceptor-specific abort event is created which
// will only get set in close_accept(), and it will never be un-set.
// Additionally, another acceptor-specific event is associated with the
// FD_ACCEPT network event.
//
// These two events are then passed to WSAWaitForMultipleEvents to see
// which one triggers first, and the timeout passed to this function is
// the local timeout for the acceptor.
//
// If the wait times out, then the accept timed out. If the wait
// succeeds with the abort event, then we were closed, and if the wait
// succeeds otherwise, then we do a nonblocking poll via `accept` to
// see if we can accept a connection. The connection is a candidate to be
// stolen, so we do all of this in a loop as well.
let events = [self.inner.abort.handle(), self.inner.accept.handle()];
while !self.inner.closed.load(atomic::SeqCst) {
let ms = if self.deadline == 0 {
c::WSA_INFINITE as u64
} else {
let now = ::io::timer::now();
if self.deadline < now {0} else {self.deadline - now}
};
let ret = unsafe {
c::WSAWaitForMultipleEvents(2, events.as_ptr(), libc::FALSE,
ms as libc::DWORD, libc::FALSE)
};
match ret {
c::WSA_WAIT_TIMEOUT => {
return Err(util::timeout("accept timed out"))
}
c::WSA_WAIT_FAILED => return Err(os::last_error()),
c::WSA_WAIT_EVENT_0 => break,
n => assert_eq!(n, c::WSA_WAIT_EVENT_0 + 1),
}
let mut wsaevents: c::WSANETWORKEVENTS = unsafe { mem::zeroed() };
let ret = unsafe {
c::WSAEnumNetworkEvents(self.fd(), events[1], &mut wsaevents)
};
if ret != 0 { return Err(os::last_error()) }
if wsaevents.lNetworkEvents & c::FD_ACCEPT == 0 { continue }
match unsafe {
libc::accept(self.fd(), ptr::mut_null(), ptr::mut_null())
} {
-1 if util::wouldblock() => {}
-1 => return Err(os::last_error()),
// Accepted sockets inherit the same properties as the caller,
// so we need to deregister our event and switch the socket back
// to blocking mode
fd => {
let stream = TcpStream::new(Inner::new(fd));
let ret = unsafe {
c::WSAEventSelect(fd, events[1], 0)
};
if ret != 0 { return Err(os::last_error()) }
try!(util::set_nonblocking(fd, false));
return Ok(stream)
}
}
}
Err(util::eof())
}
}
@ -546,6 +649,35 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
fn set_timeout(&mut self, timeout: Option<u64>) {
self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
}
fn clone(&self) -> Box<rtio::RtioTcpAcceptor + Send> {
box TcpAcceptor {
inner: self.inner.clone(),
deadline: 0,
} as Box<rtio::RtioTcpAcceptor + Send>
}
#[cfg(unix)]
fn close_accept(&mut self) -> IoResult<()> {
self.inner.closed.store(true, atomic::SeqCst);
let mut fd = FileDesc::new(self.inner.writer.fd(), false);
match fd.inner_write([0]) {
Ok(..) => Ok(()),
Err(..) if util::wouldblock() => Ok(()),
Err(e) => Err(e),
}
}
#[cfg(windows)]
fn close_accept(&mut self) -> IoResult<()> {
self.inner.closed.store(true, atomic::SeqCst);
let ret = unsafe { c::WSASetEvent(self.inner.abort.handle()) };
if ret == libc::TRUE {
Ok(())
} else {
Err(os::last_error())
}
}
}
////////////////////////////////////////////////////////////////////////////////
@ -572,7 +704,7 @@ impl UdpSocket {
let addrp = &storage as *const _ as *const libc::sockaddr;
match unsafe { libc::bind(fd, addrp, len) } {
-1 => Err(last_error()),
-1 => Err(os::last_error()),
_ => Ok(ret),
}
}
@ -817,7 +949,7 @@ pub fn read<T>(fd: sock_t,
// With a timeout, first we wait for the socket to become
// readable using select(), specifying the relevant timeout for
// our previously set deadline.
try!(util::await(fd, deadline, util::Readable));
try!(util::await([fd], deadline, util::Readable));
// At this point, we're still within the timeout, and we've
// determined that the socket is readable (as returned by
@ -828,7 +960,7 @@ pub fn read<T>(fd: sock_t,
let _guard = lock();
match retry(|| read(deadline.is_some())) {
-1 if util::wouldblock() => { assert!(deadline.is_some()); }
-1 => return Err(last_error()),
-1 => return Err(os::last_error()),
n => { ret = n; break }
}
}
@ -836,7 +968,7 @@ pub fn read<T>(fd: sock_t,
match ret {
0 => Err(util::eof()),
n if n < 0 => Err(last_error()),
n if n < 0 => Err(os::last_error()),
n => Ok(n as uint)
}
}
@ -871,7 +1003,7 @@ pub fn write<T>(fd: sock_t,
while written < buf.len() && (write_everything || written == 0) {
// As with read(), first wait for the socket to be ready for
// the I/O operation.
match util::await(fd, deadline, util::Writable) {
match util::await([fd], deadline, util::Writable) {
Err(ref e) if e.code == libc::EOF as uint && written > 0 => {
assert!(deadline.is_some());
return Err(util::short_write(written, "short write"))
@ -887,15 +1019,88 @@ pub fn write<T>(fd: sock_t,
let len = buf.len() - written;
match retry(|| write(deadline.is_some(), ptr, len) as libc::c_int) {
-1 if util::wouldblock() => {}
-1 => return Err(last_error()),
-1 => return Err(os::last_error()),
n => { written += n as uint; }
}
}
ret = 0;
}
if ret < 0 {
Err(last_error())
Err(os::last_error())
} else {
Ok(written)
}
}
#[cfg(windows)]
mod os {
use libc;
use std::mem;
use std::rt::rtio::{IoError, IoResult};
use io::c;
pub type sock_t = libc::SOCKET;
pub struct Event(c::WSAEVENT);
impl Event {
pub fn new() -> IoResult<Event> {
let event = unsafe { c::WSACreateEvent() };
if event == c::WSA_INVALID_EVENT {
Err(last_error())
} else {
Ok(Event(event))
}
}
pub fn handle(&self) -> c::WSAEVENT { let Event(handle) = *self; handle }
}
impl Drop for Event {
fn drop(&mut self) {
unsafe { let _ = c::WSACloseEvent(self.handle()); }
}
}
pub fn init() {
unsafe {
use std::rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
static mut INITIALIZED: bool = false;
static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
let _guard = LOCK.lock();
if !INITIALIZED {
let mut data: c::WSADATA = mem::zeroed();
let ret = c::WSAStartup(0x202, // version 2.2
&mut data);
assert_eq!(ret, 0);
INITIALIZED = true;
}
}
}
pub fn last_error() -> IoError {
use std::os;
let code = unsafe { c::WSAGetLastError() as uint };
IoError {
code: code,
extra: 0,
detail: Some(os::error_string(code)),
}
}
pub unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
}
#[cfg(unix)]
mod os {
use libc;
use std::rt::rtio::IoError;
use io;
pub type sock_t = io::file::fd_t;
pub fn init() {}
pub fn last_error() -> IoError { io::last_error() }
pub unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }
}

View File

@ -15,12 +15,14 @@ use std::mem;
use std::rt::mutex;
use std::rt::rtio;
use std::rt::rtio::{IoResult, IoError};
use std::sync::atomic;
use super::retry;
use super::net;
use super::util;
use super::c;
use super::file::fd_t;
use super::process;
use super::file::{fd_t, FileDesc};
fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> {
match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } {
@ -225,7 +227,23 @@ impl UnixListener {
pub fn native_listen(self, backlog: int) -> IoResult<UnixAcceptor> {
match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
-1 => Err(super::last_error()),
_ => Ok(UnixAcceptor { listener: self, deadline: 0 })
#[cfg(unix)]
_ => {
let (reader, writer) = try!(process::pipe());
try!(util::set_nonblocking(reader.fd(), true));
try!(util::set_nonblocking(writer.fd(), true));
try!(util::set_nonblocking(self.fd(), true));
Ok(UnixAcceptor {
inner: Arc::new(AcceptorInner {
listener: self,
reader: reader,
writer: writer,
closed: atomic::AtomicBool::new(false),
}),
deadline: 0,
})
}
}
}
}
@ -240,29 +258,45 @@ impl rtio::RtioUnixListener for UnixListener {
}
pub struct UnixAcceptor {
listener: UnixListener,
inner: Arc<AcceptorInner>,
deadline: u64,
}
#[cfg(unix)]
struct AcceptorInner {
listener: UnixListener,
reader: FileDesc,
writer: FileDesc,
closed: atomic::AtomicBool,
}
impl UnixAcceptor {
fn fd(&self) -> fd_t { self.listener.fd() }
fn fd(&self) -> fd_t { self.inner.listener.fd() }
pub fn native_accept(&mut self) -> IoResult<UnixStream> {
if self.deadline != 0 {
try!(util::await(self.fd(), Some(self.deadline), util::Readable));
}
let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
let storagep = &mut storage as *mut libc::sockaddr_storage;
let size = mem::size_of::<libc::sockaddr_storage>();
let mut size = size as libc::socklen_t;
match retry(|| unsafe {
libc::accept(self.fd(),
storagep as *mut libc::sockaddr,
&mut size as *mut libc::socklen_t) as libc::c_int
}) {
-1 => Err(super::last_error()),
fd => Ok(UnixStream::new(Arc::new(Inner::new(fd))))
let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
while !self.inner.closed.load(atomic::SeqCst) {
unsafe {
let mut storage: libc::sockaddr_storage = mem::zeroed();
let storagep = &mut storage as *mut libc::sockaddr_storage;
let size = mem::size_of::<libc::sockaddr_storage>();
let mut size = size as libc::socklen_t;
match retry(|| {
libc::accept(self.fd(),
storagep as *mut libc::sockaddr,
&mut size as *mut libc::socklen_t) as libc::c_int
}) {
-1 if util::wouldblock() => {}
-1 => return Err(super::last_error()),
fd => return Ok(UnixStream::new(Arc::new(Inner::new(fd)))),
}
}
try!(util::await([self.fd(), self.inner.reader.fd()],
deadline, util::Readable));
}
Err(util::eof())
}
}
@ -273,6 +307,24 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor {
fn set_timeout(&mut self, timeout: Option<u64>) {
self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
}
fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> {
box UnixAcceptor {
inner: self.inner.clone(),
deadline: 0,
} as Box<rtio::RtioUnixAcceptor + Send>
}
#[cfg(unix)]
fn close_accept(&mut self) -> IoResult<()> {
self.inner.closed.store(true, atomic::SeqCst);
let mut fd = FileDesc::new(self.inner.writer.fd(), false);
match fd.inner_write([0]) {
Ok(..) => Ok(()),
Err(..) if util::wouldblock() => Ok(()),
Err(e) => Err(e),
}
}
}
impl Drop for UnixListener {

View File

@ -169,23 +169,30 @@ unsafe fn pipe(name: *const u16, init: bool) -> libc::HANDLE {
}
pub fn await(handle: libc::HANDLE, deadline: u64,
overlapped: &mut libc::OVERLAPPED) -> bool {
if deadline == 0 { return true }
events: &[libc::HANDLE]) -> IoResult<uint> {
use libc::consts::os::extra::{WAIT_FAILED, WAIT_TIMEOUT, WAIT_OBJECT_0};
// If we've got a timeout, use WaitForSingleObject in tandem with CancelIo
// to figure out if we should indeed get the result.
let now = ::io::timer::now();
let timeout = deadline < now || unsafe {
let ms = (deadline - now) as libc::DWORD;
let r = libc::WaitForSingleObject(overlapped.hEvent,
ms);
r != libc::WAIT_OBJECT_0
};
if timeout {
unsafe { let _ = c::CancelIo(handle); }
false
let ms = if deadline == 0 {
libc::INFINITE as u64
} else {
true
let now = ::io::timer::now();
if deadline < now {0} else {deadline - now}
};
let ret = unsafe {
c::WaitForMultipleObjects(events.len() as libc::DWORD,
events.as_ptr(),
libc::FALSE,
ms as libc::DWORD)
};
match ret {
WAIT_FAILED => Err(super::last_error()),
WAIT_TIMEOUT => unsafe {
let _ = c::CancelIo(handle);
Err(util::timeout("operation timed out"))
},
n => Ok((n - WAIT_OBJECT_0) as uint)
}
}
@ -390,8 +397,8 @@ impl rtio::RtioPipe for UnixStream {
drop(guard);
loop {
// Process a timeout if one is pending
let succeeded = await(self.handle(), self.read_deadline,
&mut overlapped);
let wait_succeeded = await(self.handle(), self.read_deadline,
[overlapped.hEvent]);
let ret = unsafe {
libc::GetOverlappedResult(self.handle(),
@ -408,7 +415,7 @@ impl rtio::RtioPipe for UnixStream {
// If the reading half is now closed, then we're done. If we woke up
// because the writing half was closed, keep trying.
if !succeeded {
if wait_succeeded.is_err() {
return Err(util::timeout("read timed out"))
}
if self.read_closed() {
@ -458,8 +465,8 @@ impl rtio::RtioPipe for UnixStream {
})
}
// Process a timeout if one is pending
let succeeded = await(self.handle(), self.write_deadline,
&mut overlapped);
let wait_succeeded = await(self.handle(), self.write_deadline,
[overlapped.hEvent]);
let ret = unsafe {
libc::GetOverlappedResult(self.handle(),
&mut overlapped,
@ -473,7 +480,7 @@ impl rtio::RtioPipe for UnixStream {
if os::errno() != libc::ERROR_OPERATION_ABORTED as uint {
return Err(super::last_error())
}
if !succeeded {
if !wait_succeeded.is_ok() {
let amt = offset + bytes_written as uint;
return if amt > 0 {
Err(IoError {
@ -577,6 +584,10 @@ impl UnixListener {
listener: self,
event: try!(Event::new(true, false)),
deadline: 0,
inner: Arc::new(AcceptorState {
abort: try!(Event::new(true, false)),
closed: atomic::AtomicBool::new(false),
}),
})
}
}
@ -597,11 +608,17 @@ impl rtio::RtioUnixListener for UnixListener {
}
pub struct UnixAcceptor {
inner: Arc<AcceptorState>,
listener: UnixListener,
event: Event,
deadline: u64,
}
struct AcceptorState {
abort: Event,
closed: atomic::AtomicBool,
}
impl UnixAcceptor {
pub fn native_accept(&mut self) -> IoResult<UnixStream> {
// This function has some funky implementation details when working with
@ -638,6 +655,10 @@ impl UnixAcceptor {
// using the original server pipe.
let handle = self.listener.handle;
// If we've had an artificial call to close_accept, be sure to never
// proceed in accepting new clients in the future
if self.inner.closed.load(atomic::SeqCst) { return Err(util::eof()) }
let name = try!(to_utf16(&self.listener.name));
// Once we've got a "server handle", we need to wait for a client to
@ -652,7 +673,9 @@ impl UnixAcceptor {
if err == libc::ERROR_IO_PENDING as libc::DWORD {
// Process a timeout if one is pending
let _ = await(handle, self.deadline, &mut overlapped);
let wait_succeeded = await(handle, self.deadline,
[self.inner.abort.handle(),
overlapped.hEvent]);
// This will block until the overlapped I/O is completed. The
// timeout was previously handled, so this will either block in
@ -665,7 +688,11 @@ impl UnixAcceptor {
libc::TRUE)
};
if ret == 0 {
err = unsafe { libc::GetLastError() };
if wait_succeeded.is_ok() {
err = unsafe { libc::GetLastError() };
} else {
return Err(util::timeout("accept timed out"))
}
} else {
// we succeeded, bypass the check below
err = libc::ERROR_PIPE_CONNECTED as libc::DWORD;
@ -709,5 +736,34 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor {
fn set_timeout(&mut self, timeout: Option<u64>) {
self.deadline = timeout.map(|i| i + ::io::timer::now()).unwrap_or(0);
}
fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> {
let name = to_utf16(&self.listener.name).ok().unwrap();
box UnixAcceptor {
inner: self.inner.clone(),
event: Event::new(true, false).ok().unwrap(),
deadline: 0,
listener: UnixListener {
name: self.listener.name.clone(),
handle: unsafe {
let p = pipe(name.as_ptr(), false);
assert!(p != libc::INVALID_HANDLE_VALUE as libc::HANDLE);
p
},
},
} as Box<rtio::RtioUnixAcceptor + Send>
}
fn close_accept(&mut self) -> IoResult<()> {
self.inner.closed.store(true, atomic::SeqCst);
let ret = unsafe {
c::SetEvent(self.inner.abort.handle())
};
if ret == 0 {
Err(super::last_error())
} else {
Ok(())
}
}
}

View File

@ -191,7 +191,7 @@ impl Drop for Process {
}
}
fn pipe() -> IoResult<(file::FileDesc, file::FileDesc)> {
pub fn pipe() -> IoResult<(file::FileDesc, file::FileDesc)> {
#[cfg(unix)] use libc::EMFILE as ERROR;
#[cfg(windows)] use libc::WSAEMFILE as ERROR;
struct Closer { fd: libc::c_int }

View File

@ -9,6 +9,7 @@
// except according to those terms.
use libc;
use std::cmp;
use std::mem;
use std::os;
use std::ptr;
@ -166,10 +167,18 @@ pub fn connect_timeout(fd: net::sock_t,
}
}
pub fn await(fd: net::sock_t, deadline: Option<u64>,
pub fn await(fds: &[net::sock_t], deadline: Option<u64>,
status: SocketStatus) -> IoResult<()> {
let mut set: c::fd_set = unsafe { mem::zeroed() };
c::fd_set(&mut set, fd);
let mut max = 0;
for &fd in fds.iter() {
c::fd_set(&mut set, fd);
max = cmp::max(max, fd + 1);
}
if cfg!(windows) {
max = fds.len() as net::sock_t;
}
let (read, write) = match status {
Readable => (&mut set as *mut _, ptr::mut_null()),
Writable => (ptr::mut_null(), &mut set as *mut _),
@ -188,8 +197,9 @@ pub fn await(fd: net::sock_t, deadline: Option<u64>,
&mut tv as *mut _
}
};
let n = if cfg!(windows) {1} else {fd as libc::c_int + 1};
let r = unsafe { c::select(n, read, write, ptr::mut_null(), tvp) };
let r = unsafe {
c::select(max as libc::c_int, read, write, ptr::mut_null(), tvp)
};
r
}) {
-1 => Err(last_error()),

View File

@ -246,6 +246,8 @@ pub trait RtioTcpAcceptor : RtioSocket {
fn accept_simultaneously(&mut self) -> IoResult<()>;
fn dont_accept_simultaneously(&mut self) -> IoResult<()>;
fn set_timeout(&mut self, timeout: Option<u64>);
fn clone(&self) -> Box<RtioTcpAcceptor + Send>;
fn close_accept(&mut self) -> IoResult<()>;
}
pub trait RtioTcpStream : RtioSocket {
@ -335,6 +337,8 @@ pub trait RtioUnixListener {
pub trait RtioUnixAcceptor {
fn accept(&mut self) -> IoResult<Box<RtioPipe + Send>>;
fn set_timeout(&mut self, timeout: Option<u64>);
fn clone(&self) -> Box<RtioUnixAcceptor + Send>;
fn close_accept(&mut self) -> IoResult<()>;
}
pub trait RtioTTY {

View File

@ -22,38 +22,40 @@ use std::cell::UnsafeCell;
use homing::HomingMissile;
pub struct Access {
inner: Arc<UnsafeCell<Inner>>,
pub struct Access<T> {
inner: Arc<UnsafeCell<Inner<T>>>,
}
pub struct Guard<'a> {
access: &'a mut Access,
pub struct Guard<'a, T> {
access: &'a mut Access<T>,
missile: Option<HomingMissile>,
}
struct Inner {
struct Inner<T> {
queue: Vec<(BlockedTask, uint)>,
held: bool,
closed: bool,
data: T,
}
impl Access {
pub fn new() -> Access {
impl<T: Send> Access<T> {
pub fn new(data: T) -> Access<T> {
Access {
inner: Arc::new(UnsafeCell::new(Inner {
queue: vec![],
held: false,
closed: false,
data: data,
}))
}
}
pub fn grant<'a>(&'a mut self, token: uint,
missile: HomingMissile) -> Guard<'a> {
missile: HomingMissile) -> Guard<'a, T> {
// This unsafety is actually OK because the homing missile argument
// guarantees that we're on the same event loop as all the other objects
// attempting to get access granted.
let inner: &mut Inner = unsafe { &mut *self.inner.get() };
let inner = unsafe { &mut *self.inner.get() };
if inner.held {
let t: Box<Task> = Local::take();
@ -69,6 +71,15 @@ impl Access {
Guard { access: self, missile: Some(missile) }
}
pub fn unsafe_get(&self) -> *mut T {
unsafe { &mut (*self.inner.get()).data as *mut _ }
}
// Safe version which requires proof that you are on the home scheduler.
pub fn get_mut<'a>(&'a mut self, _missile: &HomingMissile) -> &'a mut T {
unsafe { &mut *self.unsafe_get() }
}
pub fn close(&self, _missile: &HomingMissile) {
// This unsafety is OK because with a homing missile we're guaranteed to
// be the only task looking at the `closed` flag (and are therefore
@ -82,21 +93,27 @@ impl Access {
// is only safe to invoke while on the home event loop, and there is no
// guarantee that this is being invoked on the home event loop.
pub unsafe fn dequeue(&mut self, token: uint) -> Option<BlockedTask> {
let inner: &mut Inner = &mut *self.inner.get();
let inner = &mut *self.inner.get();
match inner.queue.iter().position(|&(_, t)| t == token) {
Some(i) => Some(inner.queue.remove(i).unwrap().val0()),
None => None,
}
}
/// Test whether this access is closed, using a homing missile to prove
/// that it's safe
pub fn is_closed(&self, _missile: &HomingMissile) -> bool {
unsafe { (*self.inner.get()).closed }
}
}
impl Clone for Access {
fn clone(&self) -> Access {
impl<T: Send> Clone for Access<T> {
fn clone(&self) -> Access<T> {
Access { inner: self.inner.clone() }
}
}
impl<'a> Guard<'a> {
impl<'a, T: Send> Guard<'a, T> {
pub fn is_closed(&self) -> bool {
// See above for why this unsafety is ok, it just applies to the read
// instead of the write.
@ -104,13 +121,27 @@ impl<'a> Guard<'a> {
}
}
impl<'a, T: Send> Deref<T> for Guard<'a, T> {
fn deref<'a>(&'a self) -> &'a T {
// A guard represents exclusive access to a piece of data, so it's safe
// to hand out shared and mutable references
unsafe { &(*self.access.inner.get()).data }
}
}
impl<'a, T: Send> DerefMut<T> for Guard<'a, T> {
fn deref_mut<'a>(&'a mut self) -> &'a mut T {
unsafe { &mut (*self.access.inner.get()).data }
}
}
#[unsafe_destructor]
impl<'a> Drop for Guard<'a> {
impl<'a, T> Drop for Guard<'a, T> {
fn drop(&mut self) {
// This guard's homing missile is still armed, so we're guaranteed to be
// on the same I/O event loop, so this unsafety should be ok.
assert!(self.missile.is_some());
let inner: &mut Inner = unsafe {
let inner: &mut Inner<T> = unsafe {
mem::transmute(self.access.inner.get())
};
@ -133,7 +164,8 @@ impl<'a> Drop for Guard<'a> {
}
}
impl Drop for Inner {
#[unsafe_destructor]
impl<T> Drop for Inner<T> {
fn drop(&mut self) {
assert!(!self.held);
assert_eq!(self.queue.len(), 0);

View File

@ -22,7 +22,7 @@ use stream::StreamWatcher;
use super::{Loop, Request, UvError, Buf, status_to_io_result,
uv_error_to_io_error, UvHandle, slice_to_uv_buf,
wait_until_woken_after, wakeup};
use timeout::{AccessTimeout, AcceptTimeout, ConnectCtx};
use timeout::{AccessTimeout, ConnectCtx, AcceptTimeout};
use uvio::UvIoFactory;
use uvll;
@ -158,20 +158,20 @@ pub struct TcpWatcher {
// stream object, so we use these access guards in order to arbitrate among
// multiple concurrent reads and writes. Note that libuv *can* read and
// write simultaneously, it just can't read and read simultaneously.
read_access: AccessTimeout,
write_access: AccessTimeout,
read_access: AccessTimeout<()>,
write_access: AccessTimeout<()>,
}
pub struct TcpListener {
home: HomeHandle,
handle: *mut uvll::uv_pipe_t,
outgoing: Sender<Result<Box<rtio::RtioTcpStream + Send>, IoError>>,
incoming: Receiver<Result<Box<rtio::RtioTcpStream + Send>, IoError>>,
handle: *mut uvll::uv_tcp_t,
}
pub struct TcpAcceptor {
listener: Box<TcpListener>,
timeout: AcceptTimeout,
home: HomeHandle,
handle: *mut uvll::uv_tcp_t,
access: AcceptTimeout<Box<rtio::RtioTcpStream + Send>>,
refcount: Refcount,
}
// TCP watchers (clients/streams)
@ -192,8 +192,8 @@ impl TcpWatcher {
handle: handle,
stream: StreamWatcher::new(handle, true),
refcount: Refcount::new(),
read_access: AccessTimeout::new(),
write_access: AccessTimeout::new(),
read_access: AccessTimeout::new(()),
write_access: AccessTimeout::new(()),
}
}
@ -291,7 +291,7 @@ impl rtio::RtioTcpStream for TcpWatcher {
let task = {
let m = self.fire_homing_missile();
self.read_access.access.close(&m);
self.stream.cancel_read(uvll::EOF as libc::ssize_t)
self.stream.cancel_read(uvll::EOF as libc::ssize_t)
};
let _ = task.map(|t| t.reawaken());
Ok(())
@ -354,12 +354,9 @@ impl TcpListener {
assert_eq!(unsafe {
uvll::uv_tcp_init(io.uv_loop(), handle)
}, 0);
let (tx, rx) = channel();
let l = box TcpListener {
home: io.make_handle(),
handle: handle,
outgoing: tx,
incoming: rx,
};
let mut storage = unsafe { mem::zeroed() };
let _len = addr_to_sockaddr(address, &mut storage);
@ -390,17 +387,21 @@ impl rtio::RtioSocket for TcpListener {
}
impl rtio::RtioTcpListener for TcpListener {
fn listen(self: Box<TcpListener>)
fn listen(mut self: Box<TcpListener>)
-> Result<Box<rtio::RtioTcpAcceptor + Send>, IoError> {
// create the acceptor object from ourselves
let mut acceptor = box TcpAcceptor {
listener: self,
timeout: AcceptTimeout::new(),
};
let _m = self.fire_homing_missile();
// create the acceptor object from ourselves
let acceptor = (box TcpAcceptor {
handle: self.handle,
home: self.home.clone(),
access: AcceptTimeout::new(),
refcount: Refcount::new(),
}).install();
self.handle = 0 as *mut _;
let _m = acceptor.fire_homing_missile();
// FIXME: the 128 backlog should be configurable
match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
match unsafe { uvll::uv_listen(acceptor.handle, 128, listen_cb) } {
0 => Ok(acceptor as Box<rtio::RtioTcpAcceptor + Send>),
n => Err(uv_error_to_io_error(UvError(n))),
}
@ -409,7 +410,7 @@ impl rtio::RtioTcpListener for TcpListener {
extern fn listen_cb(server: *mut uvll::uv_stream_t, status: c_int) {
assert!(status != uvll::ECANCELED);
let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
let tcp: &mut TcpAcceptor = unsafe { UvHandle::from_uv_handle(&server) };
let msg = match status {
0 => {
let loop_ = Loop::wrap(unsafe {
@ -421,11 +422,15 @@ extern fn listen_cb(server: *mut uvll::uv_stream_t, status: c_int) {
}
n => Err(uv_error_to_io_error(UvError(n)))
};
tcp.outgoing.send(msg);
// If we're running then we have exclusive access, so the unsafe_get() is ok
unsafe { tcp.access.push(msg); }
}
impl Drop for TcpListener {
fn drop(&mut self) {
if self.handle.is_null() { return }
let _m = self.fire_homing_missile();
self.close();
}
@ -434,40 +439,68 @@ impl Drop for TcpListener {
// TCP acceptors (bound servers)
impl HomingIO for TcpAcceptor {
fn home<'r>(&'r mut self) -> &'r mut HomeHandle { self.listener.home() }
fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
}
impl rtio::RtioSocket for TcpAcceptor {
fn socket_name(&mut self) -> Result<rtio::SocketAddr, IoError> {
let _m = self.fire_homing_missile();
socket_name(Tcp, self.listener.handle)
socket_name(Tcp, self.handle)
}
}
impl UvHandle<uvll::uv_tcp_t> for TcpAcceptor {
fn uv_handle(&self) -> *mut uvll::uv_tcp_t { self.handle }
}
impl rtio::RtioTcpAcceptor for TcpAcceptor {
fn accept(&mut self) -> Result<Box<rtio::RtioTcpStream + Send>, IoError> {
self.timeout.accept(&self.listener.incoming)
let m = self.fire_homing_missile();
let loop_ = self.uv_loop();
self.access.accept(m, &loop_)
}
fn accept_simultaneously(&mut self) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1)
uvll::uv_tcp_simultaneous_accepts(self.handle, 1)
})
}
fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
uvll::uv_tcp_simultaneous_accepts(self.handle, 0)
})
}
fn set_timeout(&mut self, ms: Option<u64>) {
let _m = self.fire_homing_missile();
match ms {
None => self.timeout.clear(),
Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
let loop_ = self.uv_loop();
self.access.set_timeout(ms, &loop_, &self.home);
}
fn clone(&self) -> Box<rtio::RtioTcpAcceptor + Send> {
box TcpAcceptor {
refcount: self.refcount.clone(),
home: self.home.clone(),
handle: self.handle,
access: self.access.clone(),
} as Box<rtio::RtioTcpAcceptor + Send>
}
fn close_accept(&mut self) -> Result<(), IoError> {
let m = self.fire_homing_missile();
self.access.close(m);
Ok(())
}
}
impl Drop for TcpAcceptor {
fn drop(&mut self) {
let _m = self.fire_homing_missile();
if self.refcount.decrement() {
self.close();
}
}
}
@ -482,8 +515,8 @@ pub struct UdpWatcher {
// See above for what these fields are
refcount: Refcount,
read_access: AccessTimeout,
write_access: AccessTimeout,
read_access: AccessTimeout<()>,
write_access: AccessTimeout<()>,
blocked_sender: Option<BlockedTask>,
}
@ -507,8 +540,8 @@ impl UdpWatcher {
handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
home: io.make_handle(),
refcount: Refcount::new(),
read_access: AccessTimeout::new(),
write_access: AccessTimeout::new(),
read_access: AccessTimeout::new(()),
write_access: AccessTimeout::new(()),
blocked_sender: None,
};
assert_eq!(unsafe {

View File

@ -31,20 +31,20 @@ pub struct PipeWatcher {
refcount: Refcount,
// see comments in TcpWatcher for why these exist
write_access: AccessTimeout,
read_access: AccessTimeout,
write_access: AccessTimeout<()>,
read_access: AccessTimeout<()>,
}
pub struct PipeListener {
home: HomeHandle,
pipe: *mut uvll::uv_pipe_t,
outgoing: Sender<IoResult<Box<rtio::RtioPipe + Send>>>,
incoming: Receiver<IoResult<Box<rtio::RtioPipe + Send>>>,
}
pub struct PipeAcceptor {
listener: Box<PipeListener>,
timeout: AcceptTimeout,
home: HomeHandle,
handle: *mut uvll::uv_pipe_t,
access: AcceptTimeout<Box<rtio::RtioPipe + Send>>,
refcount: Refcount,
}
// PipeWatcher implementation and traits
@ -71,8 +71,8 @@ impl PipeWatcher {
home: home,
defused: false,
refcount: Refcount::new(),
read_access: AccessTimeout::new(),
write_access: AccessTimeout::new(),
read_access: AccessTimeout::new(()),
write_access: AccessTimeout::new(()),
}
}
@ -233,12 +233,9 @@ impl PipeListener {
// If successful, unwrap the PipeWatcher because we control how
// we close the pipe differently. We can't rely on
// StreamWatcher's default close method.
let (tx, rx) = channel();
let p = box PipeListener {
home: io.make_handle(),
pipe: pipe.unwrap(),
incoming: rx,
outgoing: tx,
};
Ok(p.install())
}
@ -248,17 +245,21 @@ impl PipeListener {
}
impl rtio::RtioUnixListener for PipeListener {
fn listen(self: Box<PipeListener>)
fn listen(mut self: Box<PipeListener>)
-> IoResult<Box<rtio::RtioUnixAcceptor + Send>> {
// create the acceptor object from ourselves
let mut acceptor = box PipeAcceptor {
listener: self,
timeout: AcceptTimeout::new(),
};
let _m = self.fire_homing_missile();
// create the acceptor object from ourselves
let acceptor = (box PipeAcceptor {
handle: self.pipe,
home: self.home.clone(),
access: AcceptTimeout::new(),
refcount: Refcount::new(),
}).install();
self.pipe = 0 as *mut _;
let _m = acceptor.fire_homing_missile();
// FIXME: the 128 backlog should be configurable
match unsafe { uvll::uv_listen(acceptor.listener.pipe, 128, listen_cb) } {
match unsafe { uvll::uv_listen(acceptor.handle, 128, listen_cb) } {
0 => Ok(acceptor as Box<rtio::RtioUnixAcceptor + Send>),
n => Err(uv_error_to_io_error(UvError(n))),
}
@ -276,7 +277,7 @@ impl UvHandle<uvll::uv_pipe_t> for PipeListener {
extern fn listen_cb(server: *mut uvll::uv_stream_t, status: libc::c_int) {
assert!(status != uvll::ECANCELED);
let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&server) };
let pipe: &mut PipeAcceptor = unsafe { UvHandle::from_uv_handle(&server) };
let msg = match status {
0 => {
let loop_ = Loop::wrap(unsafe {
@ -288,11 +289,15 @@ extern fn listen_cb(server: *mut uvll::uv_stream_t, status: libc::c_int) {
}
n => Err(uv_error_to_io_error(UvError(n)))
};
pipe.outgoing.send(msg);
// If we're running then we have exclusive access, so the unsafe_get() is ok
unsafe { pipe.access.push(msg); }
}
impl Drop for PipeListener {
fn drop(&mut self) {
if self.pipe.is_null() { return }
let _m = self.fire_homing_missile();
self.close();
}
@ -302,19 +307,48 @@ impl Drop for PipeListener {
impl rtio::RtioUnixAcceptor for PipeAcceptor {
fn accept(&mut self) -> IoResult<Box<rtio::RtioPipe + Send>> {
self.timeout.accept(&self.listener.incoming)
let m = self.fire_homing_missile();
let loop_ = self.uv_loop();
self.access.accept(m, &loop_)
}
fn set_timeout(&mut self, timeout_ms: Option<u64>) {
match timeout_ms {
None => self.timeout.clear(),
Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
}
fn set_timeout(&mut self, ms: Option<u64>) {
let _m = self.fire_homing_missile();
let loop_ = self.uv_loop();
self.access.set_timeout(ms, &loop_, &self.home);
}
fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> {
box PipeAcceptor {
refcount: self.refcount.clone(),
home: self.home.clone(),
handle: self.handle,
access: self.access.clone(),
} as Box<rtio::RtioUnixAcceptor + Send>
}
fn close_accept(&mut self) -> IoResult<()> {
let m = self.fire_homing_missile();
self.access.close(m);
Ok(())
}
}
impl HomingIO for PipeAcceptor {
fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.listener.home }
fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
}
impl UvHandle<uvll::uv_pipe_t> for PipeAcceptor {
fn uv_handle(&self) -> *mut uvll::uv_pipe_t { self.handle }
}
impl Drop for PipeAcceptor {
fn drop(&mut self) {
let _m = self.fire_homing_missile();
if self.refcount.decrement() {
self.close();
}
}
}
#[cfg(test)]

View File

@ -14,7 +14,7 @@ use std::rt::task::BlockedTask;
use std::rt::rtio::IoResult;
use access;
use homing::{HomeHandle, HomingMissile, HomingIO};
use homing::{HomeHandle, HomingMissile};
use timer::TimerWatcher;
use uvll;
use uvio::UvIoFactory;
@ -22,15 +22,15 @@ use {Loop, UvError, uv_error_to_io_error, Request, wakeup};
use {UvHandle, wait_until_woken_after};
/// Management of a timeout when gaining access to a portion of a duplex stream.
pub struct AccessTimeout {
pub struct AccessTimeout<T> {
state: TimeoutState,
timer: Option<Box<TimerWatcher>>,
pub access: access::Access,
pub access: access::Access<T>,
}
pub struct Guard<'a> {
pub struct Guard<'a, T> {
state: &'a mut TimeoutState,
pub access: access::Guard<'a>,
pub access: access::Guard<'a, T>,
pub can_timeout: bool,
}
@ -49,17 +49,18 @@ enum ClientState {
}
struct TimerContext {
timeout: *mut AccessTimeout,
callback: fn(uint) -> Option<BlockedTask>,
payload: uint,
timeout: *mut AccessTimeout<()>,
callback: fn(*mut AccessTimeout<()>, &TimerContext),
user_unblock: fn(uint) -> Option<BlockedTask>,
user_payload: uint,
}
impl AccessTimeout {
pub fn new() -> AccessTimeout {
impl<T: Send> AccessTimeout<T> {
pub fn new(data: T) -> AccessTimeout<T> {
AccessTimeout {
state: NoTimeout,
timer: None,
access: access::Access::new(),
access: access::Access::new(data),
}
}
@ -68,7 +69,7 @@ impl AccessTimeout {
/// On success, Ok(Guard) is returned and access has been granted to the
/// stream. If a timeout occurs, then Err is returned with an appropriate
/// error.
pub fn grant<'a>(&'a mut self, m: HomingMissile) -> IoResult<Guard<'a>> {
pub fn grant<'a>(&'a mut self, m: HomingMissile) -> IoResult<Guard<'a, T>> {
// First, flag that we're attempting to acquire access. This will allow
// us to cancel the pending grant if we timeout out while waiting for a
// grant.
@ -94,6 +95,13 @@ impl AccessTimeout {
})
}
pub fn timed_out(&self) -> bool {
match self.state {
TimedOut => true,
_ => false,
}
}
/// Sets the pending timeout to the value specified.
///
/// The home/loop variables are used to construct a timer if one has not
@ -120,9 +128,10 @@ impl AccessTimeout {
if self.timer.is_none() {
let mut timer = box TimerWatcher::new_home(loop_, home.clone());
let mut cx = box TimerContext {
timeout: self as *mut _,
callback: cb,
payload: data,
timeout: self as *mut _ as *mut AccessTimeout<()>,
callback: real_cb::<T>,
user_unblock: cb,
user_payload: data,
};
unsafe {
timer.set_data(&mut *cx);
@ -135,8 +144,8 @@ impl AccessTimeout {
unsafe {
let cx = uvll::get_data_for_uv_handle(timer.handle);
let cx = cx as *mut TimerContext;
(*cx).callback = cb;
(*cx).payload = data;
(*cx).user_unblock = cb;
(*cx).user_payload = data;
}
timer.stop();
timer.start(timer_cb, ms, 0);
@ -146,7 +155,12 @@ impl AccessTimeout {
let cx: &TimerContext = unsafe {
&*(uvll::get_data_for_uv_handle(timer) as *const TimerContext)
};
let me = unsafe { &mut *cx.timeout };
(cx.callback)(cx.timeout, cx);
}
fn real_cb<T: Send>(timeout: *mut AccessTimeout<()>, cx: &TimerContext) {
let timeout = timeout as *mut AccessTimeout<T>;
let me = unsafe { &mut *timeout };
match mem::replace(&mut me.state, TimedOut) {
TimedOut | NoTimeout => unreachable!(),
@ -158,7 +172,7 @@ impl AccessTimeout {
}
}
TimeoutPending(RequestPending) => {
match (cx.callback)(cx.payload) {
match (cx.user_unblock)(cx.user_payload) {
Some(task) => task.reawaken(),
None => unreachable!(),
}
@ -168,8 +182,8 @@ impl AccessTimeout {
}
}
impl Clone for AccessTimeout {
fn clone(&self) -> AccessTimeout {
impl<T: Send> Clone for AccessTimeout<T> {
fn clone(&self) -> AccessTimeout<T> {
AccessTimeout {
access: self.access.clone(),
state: NoTimeout,
@ -179,7 +193,7 @@ impl Clone for AccessTimeout {
}
#[unsafe_destructor]
impl<'a> Drop for Guard<'a> {
impl<'a, T> Drop for Guard<'a, T> {
fn drop(&mut self) {
match *self.state {
TimeoutPending(NoWaiter) | TimeoutPending(AccessPending) =>
@ -193,7 +207,8 @@ impl<'a> Drop for Guard<'a> {
}
}
impl Drop for AccessTimeout {
#[unsafe_destructor]
impl<T> Drop for AccessTimeout<T> {
fn drop(&mut self) {
match self.timer {
Some(ref timer) => unsafe {
@ -215,12 +230,6 @@ pub struct ConnectCtx {
pub timer: Option<Box<TimerWatcher>>,
}
pub struct AcceptTimeout {
timer: Option<TimerWatcher>,
timeout_tx: Option<Sender<()>>,
timeout_rx: Option<Receiver<()>>,
}
impl ConnectCtx {
pub fn connect<T>(
mut self, obj: T, timeout: Option<u64>, io: &mut UvIoFactory,
@ -306,88 +315,97 @@ impl ConnectCtx {
}
}
impl AcceptTimeout {
pub fn new() -> AcceptTimeout {
AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None }
pub struct AcceptTimeout<T> {
access: AccessTimeout<AcceptorState<T>>,
}
struct AcceptorState<T> {
blocked_acceptor: Option<BlockedTask>,
pending: Vec<IoResult<T>>,
}
impl<T: Send> AcceptTimeout<T> {
pub fn new() -> AcceptTimeout<T> {
AcceptTimeout {
access: AccessTimeout::new(AcceptorState {
blocked_acceptor: None,
pending: Vec::new(),
})
}
}
pub fn accept<T: Send>(&mut self, c: &Receiver<IoResult<T>>) -> IoResult<T> {
match self.timeout_rx {
None => c.recv(),
Some(ref rx) => {
use std::comm::Select;
// Poll the incoming channel first (don't rely on the order of
// select just yet). If someone's pending then we should return
// them immediately.
match c.try_recv() {
Ok(data) => return data,
Err(..) => {}
}
// Use select to figure out which channel gets ready first. We
// do some custom handling of select to ensure that we never
// actually drain the timeout channel (we'll keep seeing the
// timeout message in the future).
let s = Select::new();
let mut timeout = s.handle(rx);
let mut data = s.handle(c);
unsafe {
timeout.add();
data.add();
}
if s.wait() == timeout.id() {
Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
} else {
c.recv()
}
pub fn accept(&mut self,
missile: HomingMissile,
loop_: &Loop) -> IoResult<T> {
// If we've timed out but we're not closed yet, poll the state of the
// queue to see if we can peel off a connection.
if self.access.timed_out() && !self.access.access.is_closed(&missile) {
let tmp = self.access.access.get_mut(&missile);
return match tmp.pending.remove(0) {
Some(msg) => msg,
None => Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
}
}
}
pub fn clear(&mut self) {
match self.timeout_rx {
Some(ref t) => { let _ = t.try_recv(); }
// Now that we're not polling, attempt to gain access and then peel off
// a connection. If we have no pending connections, then we need to go
// to sleep and wait for one.
//
// Note that if we're woken up for a pending connection then we're
// guaranteed that the check above will not steal our connection due to
// the single-threaded nature of the event loop.
let mut guard = try!(self.access.grant(missile));
if guard.access.is_closed() {
return Err(uv_error_to_io_error(UvError(uvll::EOF)))
}
match guard.access.pending.remove(0) {
Some(msg) => return msg,
None => {}
}
match self.timer {
Some(ref mut t) => t.stop(),
None => {}
wait_until_woken_after(&mut guard.access.blocked_acceptor, loop_, || {});
match guard.access.pending.remove(0) {
_ if guard.access.is_closed() => {
Err(uv_error_to_io_error(UvError(uvll::EOF)))
}
Some(msg) => msg,
None => Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
}
}
pub fn set_timeout<U, T: UvHandle<U> + HomingIO>(
&mut self, ms: u64, t: &mut T
) {
// If we have a timeout, lazily initialize the timer which will be used
// to fire when the timeout runs out.
if self.timer.is_none() {
let loop_ = Loop::wrap(unsafe {
uvll::get_loop_for_uv_handle(t.uv_handle())
});
let mut timer = TimerWatcher::new_home(&loop_, t.home().clone());
pub unsafe fn push(&mut self, t: IoResult<T>) {
let state = self.access.access.unsafe_get();
(*state).pending.push(t);
let _ = (*state).blocked_acceptor.take().map(|t| t.reawaken());
}
pub fn set_timeout(&mut self,
ms: Option<u64>,
loop_: &Loop,
home: &HomeHandle) {
self.access.set_timeout(ms, home, loop_, cancel_accept::<T>,
self as *mut _ as uint);
fn cancel_accept<T: Send>(me: uint) -> Option<BlockedTask> {
unsafe {
timer.set_data(self as *mut _);
let me: &mut AcceptTimeout<T> = mem::transmute(me);
(*me.access.access.unsafe_get()).blocked_acceptor.take()
}
self.timer = Some(timer);
}
}
// Once we've got a timer, stop any previous timeout, reset it for the
// current one, and install some new channels to send/receive data on
let timer = self.timer.get_mut_ref();
timer.stop();
timer.start(timer_cb, ms, 0);
let (tx, rx) = channel();
self.timeout_tx = Some(tx);
self.timeout_rx = Some(rx);
extern fn timer_cb(timer: *mut uvll::uv_timer_t) {
let acceptor: &mut AcceptTimeout = unsafe {
&mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout)
};
// This send can never fail because if this timer is active then the
// receiving channel is guaranteed to be alive
acceptor.timeout_tx.get_ref().send(());
}
pub fn close(&mut self, m: HomingMissile) {
self.access.access.close(&m);
let task = self.access.access.get_mut(&m).blocked_acceptor.take();
drop(m);
let _ = task.map(|t| t.reawaken());
}
}
impl<T: Send> Clone for AcceptTimeout<T> {
fn clone(&self) -> AcceptTimeout<T> {
AcceptTimeout { access: self.access.clone() }
}
}

View File

@ -442,6 +442,53 @@ impl TcpAcceptor {
#[experimental = "the type of the argument and name of this function are \
subject to change"]
pub fn set_timeout(&mut self, ms: Option<u64>) { self.obj.set_timeout(ms); }
/// Closes the accepting capabilities of this acceptor.
///
/// This function is similar to `TcpStream`'s `close_{read,write}` methods
/// in that it will affect *all* cloned handles of this acceptor's original
/// handle.
///
/// Once this function succeeds, all future calls to `accept` will return
/// immediately with an error. The
/// underlying socket will not be relinquished back to the OS until all
/// acceptors have been deallocated.
///
/// This is useful for waking up a thread in an accept loop to indicate that
/// it should exit.
///
/// # Example
///
/// ```
/// # #![allow(experimental)]
/// use std::io::{TcpListener, Listener, Acceptor, EndOfFile};
///
/// let mut a = TcpListener::bind("127.0.0.1", 8482).listen().unwrap();
/// let a2 = a.clone();
///
/// spawn(proc() {
/// let mut a2 = a2;
/// for socket in a2.incoming() {
/// match socket {
/// Ok(s) => { /* handle s */ }
/// Err(ref e) if e.kind == EndOfFile => break, // closed
/// Err(e) => fail!("unexpected error: {}", e),
/// }
/// }
/// });
///
/// # fn wait_for_sigint() {}
/// // Now that our accept loop is running, wait for the program to be
/// // requested to exit.
/// wait_for_sigint();
///
/// // Signal our accept loop to exit
/// assert!(a.close_accept().is_ok());
/// ```
#[experimental]
pub fn close_accept(&mut self) -> IoResult<()> {
self.obj.close_accept().map_err(IoError::from_rtio_error)
}
}
impl Acceptor<TcpStream> for TcpAcceptor {
@ -453,6 +500,25 @@ impl Acceptor<TcpStream> for TcpAcceptor {
}
}
impl Clone for TcpAcceptor {
/// Creates a new handle to this TCP acceptor, allowing for simultaneous
/// accepts.
///
/// The underlying TCP acceptor will not be closed until all handles to the
/// acceptor have been deallocated. Incoming connections will be received by
/// at most one acceptor; the same connection will not be accepted twice.
///
/// The `close_accept` method will shut down *all* acceptors cloned from the
/// same original acceptor, whereas the `set_timeout` method only affects
/// the acceptor that it is called on.
///
/// This function is useful for creating a handle to invoke `close_accept`
/// on to wake up any other task blocked in `accept`.
fn clone(&self) -> TcpAcceptor {
TcpAcceptor { obj: self.obj.clone() }
}
}
#[cfg(test)]
#[allow(experimental)]
mod test {
@ -1411,4 +1477,69 @@ mod test {
rxdone.recv();
rxdone.recv();
})
iotest!(fn clone_accept_smoke() {
let addr = next_test_ip4();
let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
let mut a = l.listen().unwrap();
let mut a2 = a.clone();
spawn(proc() {
let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port);
});
spawn(proc() {
let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port);
});
assert!(a.accept().is_ok());
assert!(a2.accept().is_ok());
})
iotest!(fn clone_accept_concurrent() {
let addr = next_test_ip4();
let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
let a = l.listen().unwrap();
let a2 = a.clone();
let (tx, rx) = channel();
let tx2 = tx.clone();
spawn(proc() { let mut a = a; tx.send(a.accept()) });
spawn(proc() { let mut a = a2; tx2.send(a.accept()) });
spawn(proc() {
let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port);
});
spawn(proc() {
let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port);
});
assert!(rx.recv().is_ok());
assert!(rx.recv().is_ok());
})
iotest!(fn close_accept_smoke() {
let addr = next_test_ip4();
let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
let mut a = l.listen().unwrap();
a.close_accept().unwrap();
assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
})
iotest!(fn close_accept_concurrent() {
let addr = next_test_ip4();
let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
let a = l.listen().unwrap();
let mut a2 = a.clone();
let (tx, rx) = channel();
spawn(proc() {
let mut a = a;
tx.send(a.accept());
});
a2.close_accept().unwrap();
assert_eq!(rx.recv().err().unwrap().kind, EndOfFile);
})
}

View File

@ -212,6 +212,15 @@ impl UnixAcceptor {
pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
self.obj.set_timeout(timeout_ms)
}
/// Closes the accepting capabilities of this acceptor.
///
/// This function has the same semantics as `TcpAcceptor::close_accept`, and
/// more information can be found in that documentation.
#[experimental]
pub fn close_accept(&mut self) -> IoResult<()> {
self.obj.close_accept().map_err(IoError::from_rtio_error)
}
}
impl Acceptor<UnixStream> for UnixAcceptor {
@ -222,6 +231,25 @@ impl Acceptor<UnixStream> for UnixAcceptor {
}
}
impl Clone for UnixAcceptor {
/// Creates a new handle to this unix acceptor, allowing for simultaneous
/// accepts.
///
/// The underlying unix acceptor will not be closed until all handles to the
/// acceptor have been deallocated. Incoming connections will be received by
/// at most one acceptor; the same connection will not be accepted twice.
///
/// The `close_accept` method will shut down *all* acceptors cloned from the
/// same original acceptor, whereas the `set_timeout` method only affects
/// the acceptor that it is called on.
///
/// This function is useful for creating a handle to invoke `close_accept`
/// on to wake up any other task blocked in `accept`.
fn clone(&self) -> UnixAcceptor {
UnixAcceptor { obj: self.obj.clone() }
}
}
#[cfg(test)]
#[allow(experimental)]
mod tests {
@ -702,4 +730,73 @@ mod tests {
rx2.recv();
})
#[cfg(not(windows))]
iotest!(fn clone_accept_smoke() {
let addr = next_test_unix();
let l = UnixListener::bind(&addr);
let mut a = l.listen().unwrap();
let mut a2 = a.clone();
let addr2 = addr.clone();
spawn(proc() {
let _ = UnixStream::connect(&addr2);
});
spawn(proc() {
let _ = UnixStream::connect(&addr);
});
assert!(a.accept().is_ok());
drop(a);
assert!(a2.accept().is_ok());
})
iotest!(fn clone_accept_concurrent() {
let addr = next_test_unix();
let l = UnixListener::bind(&addr);
let a = l.listen().unwrap();
let a2 = a.clone();
let (tx, rx) = channel();
let tx2 = tx.clone();
spawn(proc() { let mut a = a; tx.send(a.accept()) });
spawn(proc() { let mut a = a2; tx2.send(a.accept()) });
let addr2 = addr.clone();
spawn(proc() {
let _ = UnixStream::connect(&addr2);
});
spawn(proc() {
let _ = UnixStream::connect(&addr);
});
assert!(rx.recv().is_ok());
assert!(rx.recv().is_ok());
})
iotest!(fn close_accept_smoke() {
let addr = next_test_unix();
let l = UnixListener::bind(&addr);
let mut a = l.listen().unwrap();
a.close_accept().unwrap();
assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
})
iotest!(fn close_accept_concurrent() {
let addr = next_test_unix();
let l = UnixListener::bind(&addr);
let a = l.listen().unwrap();
let mut a2 = a.clone();
let (tx, rx) = channel();
spawn(proc() {
let mut a = a;
tx.send(a.accept());
});
a2.close_accept().unwrap();
assert_eq!(rx.recv().err().unwrap().kind, EndOfFile);
})
}

View File

@ -0,0 +1,94 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
#![feature(phase)]
#[phase(plugin)]
extern crate green;
extern crate native;
use std::io::{TcpListener, Listener, Acceptor, EndOfFile, TcpStream};
use std::sync::{atomic, Arc};
use std::task::TaskBuilder;
use native::NativeTaskBuilder;
static N: uint = 8;
static M: uint = 100;
green_start!(main)
fn main() {
test();
let (tx, rx) = channel();
TaskBuilder::new().native().spawn(proc() {
tx.send(test());
});
rx.recv();
}
fn test() {
let mut l = TcpListener::bind("127.0.0.1", 0).unwrap();
let addr = l.socket_name().unwrap();
let mut a = l.listen().unwrap();
let cnt = Arc::new(atomic::AtomicUint::new(0));
let (tx, rx) = channel();
for _ in range(0, N) {
let a = a.clone();
let cnt = cnt.clone();
let tx = tx.clone();
spawn(proc() {
let mut a = a;
let mut mycnt = 0u;
loop {
match a.accept() {
Ok(..) => {
mycnt += 1;
if cnt.fetch_add(1, atomic::SeqCst) == N * M - 1 {
break
}
}
Err(ref e) if e.kind == EndOfFile => break,
Err(e) => fail!("{}", e),
}
}
assert!(mycnt > 0);
tx.send(());
});
}
for _ in range(0, N) {
let tx = tx.clone();
spawn(proc() {
for _ in range(0, M) {
let _s = TcpStream::connect(addr.ip.to_string().as_slice(),
addr.port).unwrap();
}
tx.send(());
});
}
// wait for senders
assert_eq!(rx.iter().take(N).count(), N);
// wait for one acceptor to die
let _ = rx.recv();
// Notify the remaining acceptors that they should exit
a.close_accept().unwrap();
// wait for the remaining acceptors
assert_eq!(rx.iter().take(N - 1).count(), N - 1);
// Everything should have been accepted.
assert_eq!(cnt.load(atomic::SeqCst), N * M);
}