rustuv: Implement clone/close_accept

This commits implements {Tcp,Unix}Acceptor::{clone,close_accept} methods for
all of librustuv.

This implementation rewrites much of Access, AccessTimeout, and AcceptTimeout to
have type parameter for shared state that all acceptors share (a shared queue of
sockets). The incoming/outgoing channels have been removed as all timeouts and
such are now managed on the event loop rather than concurrently.
This commit is contained in:
Alex Crichton 2014-07-14 22:48:05 -07:00
parent 110168de2a
commit cb8df7a8e3
4 changed files with 293 additions and 176 deletions

View File

@ -22,38 +22,40 @@ use std::cell::UnsafeCell;
use homing::HomingMissile; use homing::HomingMissile;
pub struct Access { pub struct Access<T> {
inner: Arc<UnsafeCell<Inner>>, inner: Arc<UnsafeCell<Inner<T>>>,
} }
pub struct Guard<'a> { pub struct Guard<'a, T> {
access: &'a mut Access, access: &'a mut Access<T>,
missile: Option<HomingMissile>, missile: Option<HomingMissile>,
} }
struct Inner { struct Inner<T> {
queue: Vec<(BlockedTask, uint)>, queue: Vec<(BlockedTask, uint)>,
held: bool, held: bool,
closed: bool, closed: bool,
data: T,
} }
impl Access { impl<T: Send> Access<T> {
pub fn new() -> Access { pub fn new(data: T) -> Access<T> {
Access { Access {
inner: Arc::new(UnsafeCell::new(Inner { inner: Arc::new(UnsafeCell::new(Inner {
queue: vec![], queue: vec![],
held: false, held: false,
closed: false, closed: false,
data: data,
})) }))
} }
} }
pub fn grant<'a>(&'a mut self, token: uint, pub fn grant<'a>(&'a mut self, token: uint,
missile: HomingMissile) -> Guard<'a> { missile: HomingMissile) -> Guard<'a, T> {
// This unsafety is actually OK because the homing missile argument // This unsafety is actually OK because the homing missile argument
// guarantees that we're on the same event loop as all the other objects // guarantees that we're on the same event loop as all the other objects
// attempting to get access granted. // attempting to get access granted.
let inner: &mut Inner = unsafe { &mut *self.inner.get() }; let inner = unsafe { &mut *self.inner.get() };
if inner.held { if inner.held {
let t: Box<Task> = Local::take(); let t: Box<Task> = Local::take();
@ -69,6 +71,15 @@ impl Access {
Guard { access: self, missile: Some(missile) } Guard { access: self, missile: Some(missile) }
} }
pub fn unsafe_get(&self) -> *mut T {
unsafe { &mut (*self.inner.get()).data as *mut _ }
}
// Safe version which requires proof that you are on the home scheduler.
pub fn get_mut<'a>(&'a mut self, _missile: &HomingMissile) -> &'a mut T {
unsafe { &mut *self.unsafe_get() }
}
pub fn close(&self, _missile: &HomingMissile) { pub fn close(&self, _missile: &HomingMissile) {
// This unsafety is OK because with a homing missile we're guaranteed to // This unsafety is OK because with a homing missile we're guaranteed to
// be the only task looking at the `closed` flag (and are therefore // be the only task looking at the `closed` flag (and are therefore
@ -82,21 +93,27 @@ impl Access {
// is only safe to invoke while on the home event loop, and there is no // is only safe to invoke while on the home event loop, and there is no
// guarantee that this i being invoked on the home event loop. // guarantee that this i being invoked on the home event loop.
pub unsafe fn dequeue(&mut self, token: uint) -> Option<BlockedTask> { pub unsafe fn dequeue(&mut self, token: uint) -> Option<BlockedTask> {
let inner: &mut Inner = &mut *self.inner.get(); let inner = &mut *self.inner.get();
match inner.queue.iter().position(|&(_, t)| t == token) { match inner.queue.iter().position(|&(_, t)| t == token) {
Some(i) => Some(inner.queue.remove(i).unwrap().val0()), Some(i) => Some(inner.queue.remove(i).unwrap().val0()),
None => None, None => None,
} }
} }
/// Test whether this access is closed, using a homing missile to prove
/// that it's safe
pub fn is_closed(&self, _missile: &HomingMissile) -> bool {
unsafe { (*self.inner.get()).closed }
}
} }
impl Clone for Access { impl<T: Send> Clone for Access<T> {
fn clone(&self) -> Access { fn clone(&self) -> Access<T> {
Access { inner: self.inner.clone() } Access { inner: self.inner.clone() }
} }
} }
impl<'a> Guard<'a> { impl<'a, T: Send> Guard<'a, T> {
pub fn is_closed(&self) -> bool { pub fn is_closed(&self) -> bool {
// See above for why this unsafety is ok, it just applies to the read // See above for why this unsafety is ok, it just applies to the read
// instead of the write. // instead of the write.
@ -104,13 +121,27 @@ impl<'a> Guard<'a> {
} }
} }
impl<'a, T: Send> Deref<T> for Guard<'a, T> {
fn deref<'a>(&'a self) -> &'a T {
// A guard represents exclusive access to a piece of data, so it's safe
// to hand out shared and mutable references
unsafe { &(*self.access.inner.get()).data }
}
}
impl<'a, T: Send> DerefMut<T> for Guard<'a, T> {
fn deref_mut<'a>(&'a mut self) -> &'a mut T {
unsafe { &mut (*self.access.inner.get()).data }
}
}
#[unsafe_destructor] #[unsafe_destructor]
impl<'a> Drop for Guard<'a> { impl<'a, T> Drop for Guard<'a, T> {
fn drop(&mut self) { fn drop(&mut self) {
// This guard's homing missile is still armed, so we're guaranteed to be // This guard's homing missile is still armed, so we're guaranteed to be
// on the same I/O event loop, so this unsafety should be ok. // on the same I/O event loop, so this unsafety should be ok.
assert!(self.missile.is_some()); assert!(self.missile.is_some());
let inner: &mut Inner = unsafe { let inner: &mut Inner<T> = unsafe {
mem::transmute(self.access.inner.get()) mem::transmute(self.access.inner.get())
}; };
@ -133,7 +164,8 @@ impl<'a> Drop for Guard<'a> {
} }
} }
impl Drop for Inner { #[unsafe_destructor]
impl<T> Drop for Inner<T> {
fn drop(&mut self) { fn drop(&mut self) {
assert!(!self.held); assert!(!self.held);
assert_eq!(self.queue.len(), 0); assert_eq!(self.queue.len(), 0);

View File

@ -22,7 +22,7 @@ use stream::StreamWatcher;
use super::{Loop, Request, UvError, Buf, status_to_io_result, use super::{Loop, Request, UvError, Buf, status_to_io_result,
uv_error_to_io_error, UvHandle, slice_to_uv_buf, uv_error_to_io_error, UvHandle, slice_to_uv_buf,
wait_until_woken_after, wakeup}; wait_until_woken_after, wakeup};
use timeout::{AccessTimeout, AcceptTimeout, ConnectCtx}; use timeout::{AccessTimeout, ConnectCtx, AcceptTimeout};
use uvio::UvIoFactory; use uvio::UvIoFactory;
use uvll; use uvll;
@ -158,20 +158,20 @@ pub struct TcpWatcher {
// stream object, so we use these access guards in order to arbitrate among // stream object, so we use these access guards in order to arbitrate among
// multiple concurrent reads and writes. Note that libuv *can* read and // multiple concurrent reads and writes. Note that libuv *can* read and
// write simultaneously, it just can't read and read simultaneously. // write simultaneously, it just can't read and read simultaneously.
read_access: AccessTimeout, read_access: AccessTimeout<()>,
write_access: AccessTimeout, write_access: AccessTimeout<()>,
} }
pub struct TcpListener { pub struct TcpListener {
home: HomeHandle, home: HomeHandle,
handle: *mut uvll::uv_pipe_t, handle: *mut uvll::uv_tcp_t,
outgoing: Sender<Result<Box<rtio::RtioTcpStream + Send>, IoError>>,
incoming: Receiver<Result<Box<rtio::RtioTcpStream + Send>, IoError>>,
} }
pub struct TcpAcceptor { pub struct TcpAcceptor {
listener: Box<TcpListener>, home: HomeHandle,
timeout: AcceptTimeout, handle: *mut uvll::uv_tcp_t,
access: AcceptTimeout<Box<rtio::RtioTcpStream + Send>>,
refcount: Refcount,
} }
// TCP watchers (clients/streams) // TCP watchers (clients/streams)
@ -192,8 +192,8 @@ impl TcpWatcher {
handle: handle, handle: handle,
stream: StreamWatcher::new(handle, true), stream: StreamWatcher::new(handle, true),
refcount: Refcount::new(), refcount: Refcount::new(),
read_access: AccessTimeout::new(), read_access: AccessTimeout::new(()),
write_access: AccessTimeout::new(), write_access: AccessTimeout::new(()),
} }
} }
@ -354,12 +354,9 @@ impl TcpListener {
assert_eq!(unsafe { assert_eq!(unsafe {
uvll::uv_tcp_init(io.uv_loop(), handle) uvll::uv_tcp_init(io.uv_loop(), handle)
}, 0); }, 0);
let (tx, rx) = channel();
let l = box TcpListener { let l = box TcpListener {
home: io.make_handle(), home: io.make_handle(),
handle: handle, handle: handle,
outgoing: tx,
incoming: rx,
}; };
let mut storage = unsafe { mem::zeroed() }; let mut storage = unsafe { mem::zeroed() };
let _len = addr_to_sockaddr(address, &mut storage); let _len = addr_to_sockaddr(address, &mut storage);
@ -392,15 +389,19 @@ impl rtio::RtioSocket for TcpListener {
impl rtio::RtioTcpListener for TcpListener { impl rtio::RtioTcpListener for TcpListener {
fn listen(self: Box<TcpListener>) fn listen(self: Box<TcpListener>)
-> Result<Box<rtio::RtioTcpAcceptor + Send>, IoError> { -> Result<Box<rtio::RtioTcpAcceptor + Send>, IoError> {
// create the acceptor object from ourselves let _m = self.fire_homing_missile();
let mut acceptor = box TcpAcceptor {
listener: self, // create the acceptor object from ourselves
timeout: AcceptTimeout::new(), let acceptor = (box TcpAcceptor {
}; handle: self.handle,
home: self.home.clone(),
access: AcceptTimeout::new(),
refcount: Refcount::new(),
}).install();
self.handle = 0 as *mut _;
let _m = acceptor.fire_homing_missile();
// FIXME: the 128 backlog should be configurable // FIXME: the 128 backlog should be configurable
match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } { match unsafe { uvll::uv_listen(acceptor.handle, 128, listen_cb) } {
0 => Ok(acceptor as Box<rtio::RtioTcpAcceptor + Send>), 0 => Ok(acceptor as Box<rtio::RtioTcpAcceptor + Send>),
n => Err(uv_error_to_io_error(UvError(n))), n => Err(uv_error_to_io_error(UvError(n))),
} }
@ -409,7 +410,7 @@ impl rtio::RtioTcpListener for TcpListener {
extern fn listen_cb(server: *mut uvll::uv_stream_t, status: c_int) { extern fn listen_cb(server: *mut uvll::uv_stream_t, status: c_int) {
assert!(status != uvll::ECANCELED); assert!(status != uvll::ECANCELED);
let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) }; let tcp: &mut TcpAcceptor = unsafe { UvHandle::from_uv_handle(&server) };
let msg = match status { let msg = match status {
0 => { 0 => {
let loop_ = Loop::wrap(unsafe { let loop_ = Loop::wrap(unsafe {
@ -421,11 +422,15 @@ extern fn listen_cb(server: *mut uvll::uv_stream_t, status: c_int) {
} }
n => Err(uv_error_to_io_error(UvError(n))) n => Err(uv_error_to_io_error(UvError(n)))
}; };
tcp.outgoing.send(msg);
// If we're running then we have exclusive access, so the unsafe_get() is ok
unsafe { tcp.access.push(msg); }
} }
impl Drop for TcpListener { impl Drop for TcpListener {
fn drop(&mut self) { fn drop(&mut self) {
if self.handle.is_null() { return }
let _m = self.fire_homing_missile(); let _m = self.fire_homing_missile();
self.close(); self.close();
} }
@ -434,40 +439,68 @@ impl Drop for TcpListener {
// TCP acceptors (bound servers) // TCP acceptors (bound servers)
impl HomingIO for TcpAcceptor { impl HomingIO for TcpAcceptor {
fn home<'r>(&'r mut self) -> &'r mut HomeHandle { self.listener.home() } fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
} }
impl rtio::RtioSocket for TcpAcceptor { impl rtio::RtioSocket for TcpAcceptor {
fn socket_name(&mut self) -> Result<rtio::SocketAddr, IoError> { fn socket_name(&mut self) -> Result<rtio::SocketAddr, IoError> {
let _m = self.fire_homing_missile(); let _m = self.fire_homing_missile();
socket_name(Tcp, self.listener.handle) socket_name(Tcp, self.handle)
} }
} }
impl UvHandle<uvll::uv_tcp_t> for TcpAcceptor {
fn uv_handle(&self) -> *mut uvll::uv_tcp_t { self.handle }
}
impl rtio::RtioTcpAcceptor for TcpAcceptor { impl rtio::RtioTcpAcceptor for TcpAcceptor {
fn accept(&mut self) -> Result<Box<rtio::RtioTcpStream + Send>, IoError> { fn accept(&mut self) -> Result<Box<rtio::RtioTcpStream + Send>, IoError> {
self.timeout.accept(&self.listener.incoming) let m = self.fire_homing_missile();
let loop_ = self.uv_loop();
self.access.accept(m, &loop_)
} }
fn accept_simultaneously(&mut self) -> Result<(), IoError> { fn accept_simultaneously(&mut self) -> Result<(), IoError> {
let _m = self.fire_homing_missile(); let _m = self.fire_homing_missile();
status_to_io_result(unsafe { status_to_io_result(unsafe {
uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1) uvll::uv_tcp_simultaneous_accepts(self.handle, 1)
}) })
} }
fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> { fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
let _m = self.fire_homing_missile(); let _m = self.fire_homing_missile();
status_to_io_result(unsafe { status_to_io_result(unsafe {
uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0) uvll::uv_tcp_simultaneous_accepts(self.handle, 0)
}) })
} }
fn set_timeout(&mut self, ms: Option<u64>) { fn set_timeout(&mut self, ms: Option<u64>) {
let _m = self.fire_homing_missile(); let _m = self.fire_homing_missile();
match ms { let loop_ = self.uv_loop();
None => self.timeout.clear(), self.access.set_timeout(ms, &loop_, &self.home);
Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener), }
fn clone(&self) -> Box<rtio::RtioTcpAcceptor + Send> {
box TcpAcceptor {
refcount: self.refcount.clone(),
home: self.home.clone(),
handle: self.handle,
access: self.access.clone(),
} as Box<rtio::RtioTcpAcceptor + Send>
}
fn close_accept(&mut self) -> Result<(), IoError> {
let m = self.fire_homing_missile();
self.access.close(m);
Ok(())
}
}
impl Drop for TcpAcceptor {
fn drop(&mut self) {
let _m = self.fire_homing_missile();
if self.refcount.decrement() {
self.close();
} }
} }
} }
@ -482,8 +515,8 @@ pub struct UdpWatcher {
// See above for what these fields are // See above for what these fields are
refcount: Refcount, refcount: Refcount,
read_access: AccessTimeout, read_access: AccessTimeout<()>,
write_access: AccessTimeout, write_access: AccessTimeout<()>,
blocked_sender: Option<BlockedTask>, blocked_sender: Option<BlockedTask>,
} }
@ -507,8 +540,8 @@ impl UdpWatcher {
handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) }, handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
home: io.make_handle(), home: io.make_handle(),
refcount: Refcount::new(), refcount: Refcount::new(),
read_access: AccessTimeout::new(), read_access: AccessTimeout::new(()),
write_access: AccessTimeout::new(), write_access: AccessTimeout::new(()),
blocked_sender: None, blocked_sender: None,
}; };
assert_eq!(unsafe { assert_eq!(unsafe {

View File

@ -31,20 +31,20 @@ pub struct PipeWatcher {
refcount: Refcount, refcount: Refcount,
// see comments in TcpWatcher for why these exist // see comments in TcpWatcher for why these exist
write_access: AccessTimeout, write_access: AccessTimeout<()>,
read_access: AccessTimeout, read_access: AccessTimeout<()>,
} }
pub struct PipeListener { pub struct PipeListener {
home: HomeHandle, home: HomeHandle,
pipe: *mut uvll::uv_pipe_t, pipe: *mut uvll::uv_pipe_t,
outgoing: Sender<IoResult<Box<rtio::RtioPipe + Send>>>,
incoming: Receiver<IoResult<Box<rtio::RtioPipe + Send>>>,
} }
pub struct PipeAcceptor { pub struct PipeAcceptor {
listener: Box<PipeListener>, home: HomeHandle,
timeout: AcceptTimeout, handle: *mut uvll::uv_pipe_t,
access: AcceptTimeout<Box<rtio::RtioPipe + Send>>,
refcount: Refcount,
} }
// PipeWatcher implementation and traits // PipeWatcher implementation and traits
@ -71,8 +71,8 @@ impl PipeWatcher {
home: home, home: home,
defused: false, defused: false,
refcount: Refcount::new(), refcount: Refcount::new(),
read_access: AccessTimeout::new(), read_access: AccessTimeout::new(()),
write_access: AccessTimeout::new(), write_access: AccessTimeout::new(()),
} }
} }
@ -233,12 +233,9 @@ impl PipeListener {
// If successful, unwrap the PipeWatcher because we control how // If successful, unwrap the PipeWatcher because we control how
// we close the pipe differently. We can't rely on // we close the pipe differently. We can't rely on
// StreamWatcher's default close method. // StreamWatcher's default close method.
let (tx, rx) = channel();
let p = box PipeListener { let p = box PipeListener {
home: io.make_handle(), home: io.make_handle(),
pipe: pipe.unwrap(), pipe: pipe.unwrap(),
incoming: rx,
outgoing: tx,
}; };
Ok(p.install()) Ok(p.install())
} }
@ -250,15 +247,19 @@ impl PipeListener {
impl rtio::RtioUnixListener for PipeListener { impl rtio::RtioUnixListener for PipeListener {
fn listen(self: Box<PipeListener>) fn listen(self: Box<PipeListener>)
-> IoResult<Box<rtio::RtioUnixAcceptor + Send>> { -> IoResult<Box<rtio::RtioUnixAcceptor + Send>> {
// create the acceptor object from ourselves let _m = self.fire_homing_missile();
let mut acceptor = box PipeAcceptor {
listener: self, // create the acceptor object from ourselves
timeout: AcceptTimeout::new(), let acceptor = (box PipeAcceptor {
}; handle: self.pipe,
home: self.home.clone(),
access: AcceptTimeout::new(),
refcount: Refcount::new(),
}).install();
self.pipe = 0 as *mut _;
let _m = acceptor.fire_homing_missile();
// FIXME: the 128 backlog should be configurable // FIXME: the 128 backlog should be configurable
match unsafe { uvll::uv_listen(acceptor.listener.pipe, 128, listen_cb) } { match unsafe { uvll::uv_listen(acceptor.handle, 128, listen_cb) } {
0 => Ok(acceptor as Box<rtio::RtioUnixAcceptor + Send>), 0 => Ok(acceptor as Box<rtio::RtioUnixAcceptor + Send>),
n => Err(uv_error_to_io_error(UvError(n))), n => Err(uv_error_to_io_error(UvError(n))),
} }
@ -276,7 +277,7 @@ impl UvHandle<uvll::uv_pipe_t> for PipeListener {
extern fn listen_cb(server: *mut uvll::uv_stream_t, status: libc::c_int) { extern fn listen_cb(server: *mut uvll::uv_stream_t, status: libc::c_int) {
assert!(status != uvll::ECANCELED); assert!(status != uvll::ECANCELED);
let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&server) }; let pipe: &mut PipeAcceptor = unsafe { UvHandle::from_uv_handle(&server) };
let msg = match status { let msg = match status {
0 => { 0 => {
let loop_ = Loop::wrap(unsafe { let loop_ = Loop::wrap(unsafe {
@ -288,11 +289,15 @@ extern fn listen_cb(server: *mut uvll::uv_stream_t, status: libc::c_int) {
} }
n => Err(uv_error_to_io_error(UvError(n))) n => Err(uv_error_to_io_error(UvError(n)))
}; };
pipe.outgoing.send(msg);
// If we're running then we have exclusive access, so the unsafe_get() is ok
unsafe { pipe.access.push(msg); }
} }
impl Drop for PipeListener { impl Drop for PipeListener {
fn drop(&mut self) { fn drop(&mut self) {
if self.pipe.is_null() { return }
let _m = self.fire_homing_missile(); let _m = self.fire_homing_missile();
self.close(); self.close();
} }
@ -302,19 +307,48 @@ impl Drop for PipeListener {
impl rtio::RtioUnixAcceptor for PipeAcceptor { impl rtio::RtioUnixAcceptor for PipeAcceptor {
fn accept(&mut self) -> IoResult<Box<rtio::RtioPipe + Send>> { fn accept(&mut self) -> IoResult<Box<rtio::RtioPipe + Send>> {
self.timeout.accept(&self.listener.incoming) let m = self.fire_homing_missile();
let loop_ = self.uv_loop();
self.access.accept(m, &loop_)
} }
fn set_timeout(&mut self, timeout_ms: Option<u64>) { fn set_timeout(&mut self, ms: Option<u64>) {
match timeout_ms { let _m = self.fire_homing_missile();
None => self.timeout.clear(), let loop_ = self.uv_loop();
Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener), self.access.set_timeout(ms, &loop_, &self.home);
} }
fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> {
box PipeAcceptor {
refcount: self.refcount.clone(),
home: self.home.clone(),
handle: self.handle,
access: self.access.clone(),
} as Box<rtio::RtioUnixAcceptor + Send>
}
fn close_accept(&mut self) -> IoResult<()> {
let m = self.fire_homing_missile();
self.access.close(m);
Ok(())
} }
} }
impl HomingIO for PipeAcceptor { impl HomingIO for PipeAcceptor {
fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.listener.home } fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
}
impl UvHandle<uvll::uv_pipe_t> for PipeAcceptor {
fn uv_handle(&self) -> *mut uvll::uv_pipe_t { self.handle }
}
impl Drop for PipeAcceptor {
fn drop(&mut self) {
let _m = self.fire_homing_missile();
if self.refcount.decrement() {
self.close();
}
}
} }
#[cfg(test)] #[cfg(test)]

View File

@ -14,7 +14,7 @@ use std::rt::task::BlockedTask;
use std::rt::rtio::IoResult; use std::rt::rtio::IoResult;
use access; use access;
use homing::{HomeHandle, HomingMissile, HomingIO}; use homing::{HomeHandle, HomingMissile};
use timer::TimerWatcher; use timer::TimerWatcher;
use uvll; use uvll;
use uvio::UvIoFactory; use uvio::UvIoFactory;
@ -22,15 +22,15 @@ use {Loop, UvError, uv_error_to_io_error, Request, wakeup};
use {UvHandle, wait_until_woken_after}; use {UvHandle, wait_until_woken_after};
/// Management of a timeout when gaining access to a portion of a duplex stream. /// Management of a timeout when gaining access to a portion of a duplex stream.
pub struct AccessTimeout { pub struct AccessTimeout<T> {
state: TimeoutState, state: TimeoutState,
timer: Option<Box<TimerWatcher>>, timer: Option<Box<TimerWatcher>>,
pub access: access::Access, pub access: access::Access<T>,
} }
pub struct Guard<'a> { pub struct Guard<'a, T> {
state: &'a mut TimeoutState, state: &'a mut TimeoutState,
pub access: access::Guard<'a>, pub access: access::Guard<'a, T>,
pub can_timeout: bool, pub can_timeout: bool,
} }
@ -49,17 +49,18 @@ enum ClientState {
} }
struct TimerContext { struct TimerContext {
timeout: *mut AccessTimeout, timeout: *mut AccessTimeout<()>,
callback: fn(uint) -> Option<BlockedTask>, callback: fn(*mut AccessTimeout<()>, &TimerContext),
payload: uint, user_unblock: fn(uint) -> Option<BlockedTask>,
user_payload: uint,
} }
impl AccessTimeout { impl<T: Send> AccessTimeout<T> {
pub fn new() -> AccessTimeout { pub fn new(data: T) -> AccessTimeout<T> {
AccessTimeout { AccessTimeout {
state: NoTimeout, state: NoTimeout,
timer: None, timer: None,
access: access::Access::new(), access: access::Access::new(data),
} }
} }
@ -68,7 +69,7 @@ impl AccessTimeout {
/// On success, Ok(Guard) is returned and access has been granted to the /// On success, Ok(Guard) is returned and access has been granted to the
/// stream. If a timeout occurs, then Err is returned with an appropriate /// stream. If a timeout occurs, then Err is returned with an appropriate
/// error. /// error.
pub fn grant<'a>(&'a mut self, m: HomingMissile) -> IoResult<Guard<'a>> { pub fn grant<'a>(&'a mut self, m: HomingMissile) -> IoResult<Guard<'a, T>> {
// First, flag that we're attempting to acquire access. This will allow // First, flag that we're attempting to acquire access. This will allow
// us to cancel the pending grant if we timeout out while waiting for a // us to cancel the pending grant if we timeout out while waiting for a
// grant. // grant.
@ -94,6 +95,13 @@ impl AccessTimeout {
}) })
} }
pub fn timed_out(&self) -> bool {
match self.state {
TimedOut => true,
_ => false,
}
}
/// Sets the pending timeout to the value specified. /// Sets the pending timeout to the value specified.
/// ///
/// The home/loop variables are used to construct a timer if one has not /// The home/loop variables are used to construct a timer if one has not
@ -120,9 +128,10 @@ impl AccessTimeout {
if self.timer.is_none() { if self.timer.is_none() {
let mut timer = box TimerWatcher::new_home(loop_, home.clone()); let mut timer = box TimerWatcher::new_home(loop_, home.clone());
let mut cx = box TimerContext { let mut cx = box TimerContext {
timeout: self as *mut _, timeout: self as *mut _ as *mut AccessTimeout<()>,
callback: cb, callback: real_cb::<T>,
payload: data, user_unblock: cb,
user_payload: data,
}; };
unsafe { unsafe {
timer.set_data(&mut *cx); timer.set_data(&mut *cx);
@ -135,8 +144,8 @@ impl AccessTimeout {
unsafe { unsafe {
let cx = uvll::get_data_for_uv_handle(timer.handle); let cx = uvll::get_data_for_uv_handle(timer.handle);
let cx = cx as *mut TimerContext; let cx = cx as *mut TimerContext;
(*cx).callback = cb; (*cx).user_unblock = cb;
(*cx).payload = data; (*cx).user_payload = data;
} }
timer.stop(); timer.stop();
timer.start(timer_cb, ms, 0); timer.start(timer_cb, ms, 0);
@ -146,7 +155,12 @@ impl AccessTimeout {
let cx: &TimerContext = unsafe { let cx: &TimerContext = unsafe {
&*(uvll::get_data_for_uv_handle(timer) as *const TimerContext) &*(uvll::get_data_for_uv_handle(timer) as *const TimerContext)
}; };
let me = unsafe { &mut *cx.timeout }; (cx.callback)(cx.timeout, cx);
}
fn real_cb<T: Send>(timeout: *mut AccessTimeout<()>, cx: &TimerContext) {
let timeout = timeout as *mut AccessTimeout<T>;
let me = unsafe { &mut *timeout };
match mem::replace(&mut me.state, TimedOut) { match mem::replace(&mut me.state, TimedOut) {
TimedOut | NoTimeout => unreachable!(), TimedOut | NoTimeout => unreachable!(),
@ -158,7 +172,7 @@ impl AccessTimeout {
} }
} }
TimeoutPending(RequestPending) => { TimeoutPending(RequestPending) => {
match (cx.callback)(cx.payload) { match (cx.user_unblock)(cx.user_payload) {
Some(task) => task.reawaken(), Some(task) => task.reawaken(),
None => unreachable!(), None => unreachable!(),
} }
@ -168,8 +182,8 @@ impl AccessTimeout {
} }
} }
impl Clone for AccessTimeout { impl<T: Send> Clone for AccessTimeout<T> {
fn clone(&self) -> AccessTimeout { fn clone(&self) -> AccessTimeout<T> {
AccessTimeout { AccessTimeout {
access: self.access.clone(), access: self.access.clone(),
state: NoTimeout, state: NoTimeout,
@ -179,7 +193,7 @@ impl Clone for AccessTimeout {
} }
#[unsafe_destructor] #[unsafe_destructor]
impl<'a> Drop for Guard<'a> { impl<'a, T> Drop for Guard<'a, T> {
fn drop(&mut self) { fn drop(&mut self) {
match *self.state { match *self.state {
TimeoutPending(NoWaiter) | TimeoutPending(AccessPending) => TimeoutPending(NoWaiter) | TimeoutPending(AccessPending) =>
@ -193,7 +207,8 @@ impl<'a> Drop for Guard<'a> {
} }
} }
impl Drop for AccessTimeout { #[unsafe_destructor]
impl<T> Drop for AccessTimeout<T> {
fn drop(&mut self) { fn drop(&mut self) {
match self.timer { match self.timer {
Some(ref timer) => unsafe { Some(ref timer) => unsafe {
@ -215,12 +230,6 @@ pub struct ConnectCtx {
pub timer: Option<Box<TimerWatcher>>, pub timer: Option<Box<TimerWatcher>>,
} }
pub struct AcceptTimeout {
timer: Option<TimerWatcher>,
timeout_tx: Option<Sender<()>>,
timeout_rx: Option<Receiver<()>>,
}
impl ConnectCtx { impl ConnectCtx {
pub fn connect<T>( pub fn connect<T>(
mut self, obj: T, timeout: Option<u64>, io: &mut UvIoFactory, mut self, obj: T, timeout: Option<u64>, io: &mut UvIoFactory,
@ -306,88 +315,97 @@ impl ConnectCtx {
} }
} }
impl AcceptTimeout { pub struct AcceptTimeout<T> {
pub fn new() -> AcceptTimeout { access: AccessTimeout<AcceptorState<T>>,
AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None }
} }
pub fn accept<T: Send>(&mut self, c: &Receiver<IoResult<T>>) -> IoResult<T> { struct AcceptorState<T> {
match self.timeout_rx { blocked_acceptor: Option<BlockedTask>,
None => c.recv(), pending: Vec<IoResult<T>>,
Some(ref rx) => {
use std::comm::Select;
// Poll the incoming channel first (don't rely on the order of
// select just yet). If someone's pending then we should return
// them immediately.
match c.try_recv() {
Ok(data) => return data,
Err(..) => {}
} }
// Use select to figure out which channel gets ready first. We impl<T: Send> AcceptTimeout<T> {
// do some custom handling of select to ensure that we never pub fn new() -> AcceptTimeout<T> {
// actually drain the timeout channel (we'll keep seeing the AcceptTimeout {
// timeout message in the future). access: AccessTimeout::new(AcceptorState {
let s = Select::new(); blocked_acceptor: None,
let mut timeout = s.handle(rx); pending: Vec::new(),
let mut data = s.handle(c); })
unsafe {
timeout.add();
data.add();
}
if s.wait() == timeout.id() {
Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
} else {
c.recv()
}
}
} }
} }
pub fn clear(&mut self) { pub fn accept(&mut self,
match self.timeout_rx { missile: HomingMissile,
Some(ref t) => { let _ = t.try_recv(); } loop_: &Loop) -> IoResult<T> {
// If we've timed out but we're not closed yet, poll the state of the
// queue to see if we can peel off a connection.
if self.access.timed_out() && !self.access.access.is_closed(&missile) {
let tmp = self.access.access.get_mut(&missile);
return match tmp.pending.remove(0) {
Some(msg) => msg,
None => Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
}
}
// Now that we're not polling, attempt to gain access and then peel off
// a connection. If we have no pending connections, then we need to go
// to sleep and wait for one.
//
// Note that if we're woken up for a pending connection then we're
// guaranteed that the check above will not steal our connection due to
// the single-threaded nature of the event loop.
let mut guard = try!(self.access.grant(missile));
if guard.access.is_closed() {
return Err(uv_error_to_io_error(UvError(uvll::EOF)))
}
match guard.access.pending.remove(0) {
Some(msg) => return msg,
None => {} None => {}
} }
match self.timer {
Some(ref mut t) => t.stop(), wait_until_woken_after(&mut guard.access.blocked_acceptor, loop_, || {});
None => {}
match guard.access.pending.remove(0) {
_ if guard.access.is_closed() => {
Err(uv_error_to_io_error(UvError(uvll::EOF)))
}
Some(msg) => msg,
None => Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
} }
} }
pub fn set_timeout<U, T: UvHandle<U> + HomingIO>( pub unsafe fn push(&mut self, t: IoResult<T>) {
&mut self, ms: u64, t: &mut T let state = self.access.access.unsafe_get();
) { (*state).pending.push(t);
// If we have a timeout, lazily initialize the timer which will be used let _ = (*state).blocked_acceptor.take().map(|t| t.reawaken());
// to fire when the timeout runs out. }
if self.timer.is_none() {
let loop_ = Loop::wrap(unsafe { pub fn set_timeout(&mut self,
uvll::get_loop_for_uv_handle(t.uv_handle()) ms: Option<u64>,
}); loop_: &Loop,
let mut timer = TimerWatcher::new_home(&loop_, t.home().clone()); home: &HomeHandle) {
self.access.set_timeout(ms, home, loop_, cancel_accept::<T>,
self as *mut _ as uint);
fn cancel_accept<T: Send>(me: uint) -> Option<BlockedTask> {
unsafe { unsafe {
timer.set_data(self as *mut _); let me: &mut AcceptTimeout<T> = mem::transmute(me);
(*me.access.access.unsafe_get()).blocked_acceptor.take()
}
} }
self.timer = Some(timer);
} }
// Once we've got a timer, stop any previous timeout, reset it for the pub fn close(&mut self, m: HomingMissile) {
// current one, and install some new channels to send/receive data on self.access.access.close(&m);
let timer = self.timer.get_mut_ref(); let task = self.access.access.get_mut(&m).blocked_acceptor.take();
timer.stop(); drop(m);
timer.start(timer_cb, ms, 0); let _ = task.map(|t| t.reawaken());
let (tx, rx) = channel(); }
self.timeout_tx = Some(tx); }
self.timeout_rx = Some(rx);
extern fn timer_cb(timer: *mut uvll::uv_timer_t) { impl<T: Send> Clone for AcceptTimeout<T> {
let acceptor: &mut AcceptTimeout = unsafe { fn clone(&self) -> AcceptTimeout<T> {
&mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout) AcceptTimeout { access: self.access.clone() }
};
// This send can never fail because if this timer is active then the
// receiving channel is guaranteed to be alive
acceptor.timeout_tx.get_ref().send(());
}
} }
} }