From cb8df7a8e3c53f16d22f76da50e6e2e4734bdf62 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 14 Jul 2014 22:48:05 -0700 Subject: [PATCH] rustuv: Implement clone/close_accept This commits implements {Tcp,Unix}Acceptor::{clone,close_accept} methods for all of librustuv. This implementation rewrites much of Access, AccessTimeout, and AcceptTimeout to have type parameter for shared state that all acceptors share (a shared queue of sockets). The incoming/outgoing channels have been removed as all timeouts and such are now managed on the event loop rather than concurrently. --- src/librustuv/access.rs | 64 +++++++++--- src/librustuv/net.rs | 103 ++++++++++++------- src/librustuv/pipe.rs | 88 +++++++++++----- src/librustuv/timeout.rs | 214 +++++++++++++++++++++------------------ 4 files changed, 293 insertions(+), 176 deletions(-) diff --git a/src/librustuv/access.rs b/src/librustuv/access.rs index 9bd8af6419e..290293cf086 100644 --- a/src/librustuv/access.rs +++ b/src/librustuv/access.rs @@ -22,38 +22,40 @@ use std::cell::UnsafeCell; use homing::HomingMissile; -pub struct Access { - inner: Arc>, +pub struct Access { + inner: Arc>>, } -pub struct Guard<'a> { - access: &'a mut Access, +pub struct Guard<'a, T> { + access: &'a mut Access, missile: Option, } -struct Inner { +struct Inner { queue: Vec<(BlockedTask, uint)>, held: bool, closed: bool, + data: T, } -impl Access { - pub fn new() -> Access { +impl Access { + pub fn new(data: T) -> Access { Access { inner: Arc::new(UnsafeCell::new(Inner { queue: vec![], held: false, closed: false, + data: data, })) } } pub fn grant<'a>(&'a mut self, token: uint, - missile: HomingMissile) -> Guard<'a> { + missile: HomingMissile) -> Guard<'a, T> { // This unsafety is actually OK because the homing missile argument // guarantees that we're on the same event loop as all the other objects // attempting to get access granted. - let inner: &mut Inner = unsafe { &mut *self.inner.get() }; + let inner = unsafe { &mut *self.inner.get() }; if inner.held { let t: Box = Local::take(); @@ -69,6 +71,15 @@ impl Access { Guard { access: self, missile: Some(missile) } } + pub fn unsafe_get(&self) -> *mut T { + unsafe { &mut (*self.inner.get()).data as *mut _ } + } + + // Safe version which requires proof that you are on the home scheduler. + pub fn get_mut<'a>(&'a mut self, _missile: &HomingMissile) -> &'a mut T { + unsafe { &mut *self.unsafe_get() } + } + pub fn close(&self, _missile: &HomingMissile) { // This unsafety is OK because with a homing missile we're guaranteed to // be the only task looking at the `closed` flag (and are therefore @@ -82,21 +93,27 @@ impl Access { // is only safe to invoke while on the home event loop, and there is no // guarantee that this i being invoked on the home event loop. pub unsafe fn dequeue(&mut self, token: uint) -> Option { - let inner: &mut Inner = &mut *self.inner.get(); + let inner = &mut *self.inner.get(); match inner.queue.iter().position(|&(_, t)| t == token) { Some(i) => Some(inner.queue.remove(i).unwrap().val0()), None => None, } } + + /// Test whether this access is closed, using a homing missile to prove + /// that it's safe + pub fn is_closed(&self, _missile: &HomingMissile) -> bool { + unsafe { (*self.inner.get()).closed } + } } -impl Clone for Access { - fn clone(&self) -> Access { +impl Clone for Access { + fn clone(&self) -> Access { Access { inner: self.inner.clone() } } } -impl<'a> Guard<'a> { +impl<'a, T: Send> Guard<'a, T> { pub fn is_closed(&self) -> bool { // See above for why this unsafety is ok, it just applies to the read // instead of the write. @@ -104,13 +121,27 @@ impl<'a> Guard<'a> { } } +impl<'a, T: Send> Deref for Guard<'a, T> { + fn deref<'a>(&'a self) -> &'a T { + // A guard represents exclusive access to a piece of data, so it's safe + // to hand out shared and mutable references + unsafe { &(*self.access.inner.get()).data } + } +} + +impl<'a, T: Send> DerefMut for Guard<'a, T> { + fn deref_mut<'a>(&'a mut self) -> &'a mut T { + unsafe { &mut (*self.access.inner.get()).data } + } +} + #[unsafe_destructor] -impl<'a> Drop for Guard<'a> { +impl<'a, T> Drop for Guard<'a, T> { fn drop(&mut self) { // This guard's homing missile is still armed, so we're guaranteed to be // on the same I/O event loop, so this unsafety should be ok. assert!(self.missile.is_some()); - let inner: &mut Inner = unsafe { + let inner: &mut Inner = unsafe { mem::transmute(self.access.inner.get()) }; @@ -133,7 +164,8 @@ impl<'a> Drop for Guard<'a> { } } -impl Drop for Inner { +#[unsafe_destructor] +impl Drop for Inner { fn drop(&mut self) { assert!(!self.held); assert_eq!(self.queue.len(), 0); diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs index 0da8d0d2108..b1359840247 100644 --- a/src/librustuv/net.rs +++ b/src/librustuv/net.rs @@ -22,7 +22,7 @@ use stream::StreamWatcher; use super::{Loop, Request, UvError, Buf, status_to_io_result, uv_error_to_io_error, UvHandle, slice_to_uv_buf, wait_until_woken_after, wakeup}; -use timeout::{AccessTimeout, AcceptTimeout, ConnectCtx}; +use timeout::{AccessTimeout, ConnectCtx, AcceptTimeout}; use uvio::UvIoFactory; use uvll; @@ -158,20 +158,20 @@ pub struct TcpWatcher { // stream object, so we use these access guards in order to arbitrate among // multiple concurrent reads and writes. Note that libuv *can* read and // write simultaneously, it just can't read and read simultaneously. - read_access: AccessTimeout, - write_access: AccessTimeout, + read_access: AccessTimeout<()>, + write_access: AccessTimeout<()>, } pub struct TcpListener { home: HomeHandle, - handle: *mut uvll::uv_pipe_t, - outgoing: Sender, IoError>>, - incoming: Receiver, IoError>>, + handle: *mut uvll::uv_tcp_t, } pub struct TcpAcceptor { - listener: Box, - timeout: AcceptTimeout, + home: HomeHandle, + handle: *mut uvll::uv_tcp_t, + access: AcceptTimeout>, + refcount: Refcount, } // TCP watchers (clients/streams) @@ -192,8 +192,8 @@ impl TcpWatcher { handle: handle, stream: StreamWatcher::new(handle, true), refcount: Refcount::new(), - read_access: AccessTimeout::new(), - write_access: AccessTimeout::new(), + read_access: AccessTimeout::new(()), + write_access: AccessTimeout::new(()), } } @@ -291,7 +291,7 @@ impl rtio::RtioTcpStream for TcpWatcher { let task = { let m = self.fire_homing_missile(); self.read_access.access.close(&m); - self.stream.cancel_read(uvll::EOF as libc::ssize_t) + self.stream.cancel_read(uvll::EOF as libc::ssize_t) }; let _ = task.map(|t| t.reawaken()); Ok(()) @@ -354,12 +354,9 @@ impl TcpListener { assert_eq!(unsafe { uvll::uv_tcp_init(io.uv_loop(), handle) }, 0); - let (tx, rx) = channel(); let l = box TcpListener { home: io.make_handle(), handle: handle, - outgoing: tx, - incoming: rx, }; let mut storage = unsafe { mem::zeroed() }; let _len = addr_to_sockaddr(address, &mut storage); @@ -392,15 +389,19 @@ impl rtio::RtioSocket for TcpListener { impl rtio::RtioTcpListener for TcpListener { fn listen(self: Box) -> Result, IoError> { - // create the acceptor object from ourselves - let mut acceptor = box TcpAcceptor { - listener: self, - timeout: AcceptTimeout::new(), - }; + let _m = self.fire_homing_missile(); + + // create the acceptor object from ourselves + let acceptor = (box TcpAcceptor { + handle: self.handle, + home: self.home.clone(), + access: AcceptTimeout::new(), + refcount: Refcount::new(), + }).install(); + self.handle = 0 as *mut _; - let _m = acceptor.fire_homing_missile(); // FIXME: the 128 backlog should be configurable - match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } { + match unsafe { uvll::uv_listen(acceptor.handle, 128, listen_cb) } { 0 => Ok(acceptor as Box), n => Err(uv_error_to_io_error(UvError(n))), } @@ -409,7 +410,7 @@ impl rtio::RtioTcpListener for TcpListener { extern fn listen_cb(server: *mut uvll::uv_stream_t, status: c_int) { assert!(status != uvll::ECANCELED); - let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) }; + let tcp: &mut TcpAcceptor = unsafe { UvHandle::from_uv_handle(&server) }; let msg = match status { 0 => { let loop_ = Loop::wrap(unsafe { @@ -421,11 +422,15 @@ extern fn listen_cb(server: *mut uvll::uv_stream_t, status: c_int) { } n => Err(uv_error_to_io_error(UvError(n))) }; - tcp.outgoing.send(msg); + + // If we're running then we have exclusive access, so the unsafe_get() is ok + unsafe { tcp.access.push(msg); } } impl Drop for TcpListener { fn drop(&mut self) { + if self.handle.is_null() { return } + let _m = self.fire_homing_missile(); self.close(); } @@ -434,40 +439,68 @@ impl Drop for TcpListener { // TCP acceptors (bound servers) impl HomingIO for TcpAcceptor { - fn home<'r>(&'r mut self) -> &'r mut HomeHandle { self.listener.home() } + fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home } } impl rtio::RtioSocket for TcpAcceptor { fn socket_name(&mut self) -> Result { let _m = self.fire_homing_missile(); - socket_name(Tcp, self.listener.handle) + socket_name(Tcp, self.handle) } } +impl UvHandle for TcpAcceptor { + fn uv_handle(&self) -> *mut uvll::uv_tcp_t { self.handle } +} + impl rtio::RtioTcpAcceptor for TcpAcceptor { fn accept(&mut self) -> Result, IoError> { - self.timeout.accept(&self.listener.incoming) + let m = self.fire_homing_missile(); + let loop_ = self.uv_loop(); + self.access.accept(m, &loop_) } fn accept_simultaneously(&mut self) -> Result<(), IoError> { let _m = self.fire_homing_missile(); status_to_io_result(unsafe { - uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1) + uvll::uv_tcp_simultaneous_accepts(self.handle, 1) }) } fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> { let _m = self.fire_homing_missile(); status_to_io_result(unsafe { - uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0) + uvll::uv_tcp_simultaneous_accepts(self.handle, 0) }) } fn set_timeout(&mut self, ms: Option) { let _m = self.fire_homing_missile(); - match ms { - None => self.timeout.clear(), - Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener), + let loop_ = self.uv_loop(); + self.access.set_timeout(ms, &loop_, &self.home); + } + + fn clone(&self) -> Box { + box TcpAcceptor { + refcount: self.refcount.clone(), + home: self.home.clone(), + handle: self.handle, + access: self.access.clone(), + } as Box + } + + fn close_accept(&mut self) -> Result<(), IoError> { + let m = self.fire_homing_missile(); + self.access.close(m); + Ok(()) + } +} + +impl Drop for TcpAcceptor { + fn drop(&mut self) { + let _m = self.fire_homing_missile(); + if self.refcount.decrement() { + self.close(); } } } @@ -482,8 +515,8 @@ pub struct UdpWatcher { // See above for what these fields are refcount: Refcount, - read_access: AccessTimeout, - write_access: AccessTimeout, + read_access: AccessTimeout<()>, + write_access: AccessTimeout<()>, blocked_sender: Option, } @@ -507,8 +540,8 @@ impl UdpWatcher { handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) }, home: io.make_handle(), refcount: Refcount::new(), - read_access: AccessTimeout::new(), - write_access: AccessTimeout::new(), + read_access: AccessTimeout::new(()), + write_access: AccessTimeout::new(()), blocked_sender: None, }; assert_eq!(unsafe { diff --git a/src/librustuv/pipe.rs b/src/librustuv/pipe.rs index f0a57546ed4..aa89e5e5f03 100644 --- a/src/librustuv/pipe.rs +++ b/src/librustuv/pipe.rs @@ -31,20 +31,20 @@ pub struct PipeWatcher { refcount: Refcount, // see comments in TcpWatcher for why these exist - write_access: AccessTimeout, - read_access: AccessTimeout, + write_access: AccessTimeout<()>, + read_access: AccessTimeout<()>, } pub struct PipeListener { home: HomeHandle, pipe: *mut uvll::uv_pipe_t, - outgoing: Sender>>, - incoming: Receiver>>, } pub struct PipeAcceptor { - listener: Box, - timeout: AcceptTimeout, + home: HomeHandle, + handle: *mut uvll::uv_pipe_t, + access: AcceptTimeout>, + refcount: Refcount, } // PipeWatcher implementation and traits @@ -71,8 +71,8 @@ impl PipeWatcher { home: home, defused: false, refcount: Refcount::new(), - read_access: AccessTimeout::new(), - write_access: AccessTimeout::new(), + read_access: AccessTimeout::new(()), + write_access: AccessTimeout::new(()), } } @@ -233,12 +233,9 @@ impl PipeListener { // If successful, unwrap the PipeWatcher because we control how // we close the pipe differently. We can't rely on // StreamWatcher's default close method. - let (tx, rx) = channel(); let p = box PipeListener { home: io.make_handle(), pipe: pipe.unwrap(), - incoming: rx, - outgoing: tx, }; Ok(p.install()) } @@ -250,15 +247,19 @@ impl PipeListener { impl rtio::RtioUnixListener for PipeListener { fn listen(self: Box) -> IoResult> { - // create the acceptor object from ourselves - let mut acceptor = box PipeAcceptor { - listener: self, - timeout: AcceptTimeout::new(), - }; + let _m = self.fire_homing_missile(); + + // create the acceptor object from ourselves + let acceptor = (box PipeAcceptor { + handle: self.pipe, + home: self.home.clone(), + access: AcceptTimeout::new(), + refcount: Refcount::new(), + }).install(); + self.pipe = 0 as *mut _; - let _m = acceptor.fire_homing_missile(); // FIXME: the 128 backlog should be configurable - match unsafe { uvll::uv_listen(acceptor.listener.pipe, 128, listen_cb) } { + match unsafe { uvll::uv_listen(acceptor.handle, 128, listen_cb) } { 0 => Ok(acceptor as Box), n => Err(uv_error_to_io_error(UvError(n))), } @@ -276,7 +277,7 @@ impl UvHandle for PipeListener { extern fn listen_cb(server: *mut uvll::uv_stream_t, status: libc::c_int) { assert!(status != uvll::ECANCELED); - let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&server) }; + let pipe: &mut PipeAcceptor = unsafe { UvHandle::from_uv_handle(&server) }; let msg = match status { 0 => { let loop_ = Loop::wrap(unsafe { @@ -288,11 +289,15 @@ extern fn listen_cb(server: *mut uvll::uv_stream_t, status: libc::c_int) { } n => Err(uv_error_to_io_error(UvError(n))) }; - pipe.outgoing.send(msg); + + // If we're running then we have exclusive access, so the unsafe_get() is ok + unsafe { pipe.access.push(msg); } } impl Drop for PipeListener { fn drop(&mut self) { + if self.pipe.is_null() { return } + let _m = self.fire_homing_missile(); self.close(); } @@ -302,19 +307,48 @@ impl Drop for PipeListener { impl rtio::RtioUnixAcceptor for PipeAcceptor { fn accept(&mut self) -> IoResult> { - self.timeout.accept(&self.listener.incoming) + let m = self.fire_homing_missile(); + let loop_ = self.uv_loop(); + self.access.accept(m, &loop_) } - fn set_timeout(&mut self, timeout_ms: Option) { - match timeout_ms { - None => self.timeout.clear(), - Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener), - } + fn set_timeout(&mut self, ms: Option) { + let _m = self.fire_homing_missile(); + let loop_ = self.uv_loop(); + self.access.set_timeout(ms, &loop_, &self.home); + } + + fn clone(&self) -> Box { + box PipeAcceptor { + refcount: self.refcount.clone(), + home: self.home.clone(), + handle: self.handle, + access: self.access.clone(), + } as Box + } + + fn close_accept(&mut self) -> IoResult<()> { + let m = self.fire_homing_missile(); + self.access.close(m); + Ok(()) } } impl HomingIO for PipeAcceptor { - fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.listener.home } + fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home } +} + +impl UvHandle for PipeAcceptor { + fn uv_handle(&self) -> *mut uvll::uv_pipe_t { self.handle } +} + +impl Drop for PipeAcceptor { + fn drop(&mut self) { + let _m = self.fire_homing_missile(); + if self.refcount.decrement() { + self.close(); + } + } } #[cfg(test)] diff --git a/src/librustuv/timeout.rs b/src/librustuv/timeout.rs index 1caaf5e0fc7..32d73952416 100644 --- a/src/librustuv/timeout.rs +++ b/src/librustuv/timeout.rs @@ -14,7 +14,7 @@ use std::rt::task::BlockedTask; use std::rt::rtio::IoResult; use access; -use homing::{HomeHandle, HomingMissile, HomingIO}; +use homing::{HomeHandle, HomingMissile}; use timer::TimerWatcher; use uvll; use uvio::UvIoFactory; @@ -22,15 +22,15 @@ use {Loop, UvError, uv_error_to_io_error, Request, wakeup}; use {UvHandle, wait_until_woken_after}; /// Management of a timeout when gaining access to a portion of a duplex stream. -pub struct AccessTimeout { +pub struct AccessTimeout { state: TimeoutState, timer: Option>, - pub access: access::Access, + pub access: access::Access, } -pub struct Guard<'a> { +pub struct Guard<'a, T> { state: &'a mut TimeoutState, - pub access: access::Guard<'a>, + pub access: access::Guard<'a, T>, pub can_timeout: bool, } @@ -49,17 +49,18 @@ enum ClientState { } struct TimerContext { - timeout: *mut AccessTimeout, - callback: fn(uint) -> Option, - payload: uint, + timeout: *mut AccessTimeout<()>, + callback: fn(*mut AccessTimeout<()>, &TimerContext), + user_unblock: fn(uint) -> Option, + user_payload: uint, } -impl AccessTimeout { - pub fn new() -> AccessTimeout { +impl AccessTimeout { + pub fn new(data: T) -> AccessTimeout { AccessTimeout { state: NoTimeout, timer: None, - access: access::Access::new(), + access: access::Access::new(data), } } @@ -68,7 +69,7 @@ impl AccessTimeout { /// On success, Ok(Guard) is returned and access has been granted to the /// stream. If a timeout occurs, then Err is returned with an appropriate /// error. - pub fn grant<'a>(&'a mut self, m: HomingMissile) -> IoResult> { + pub fn grant<'a>(&'a mut self, m: HomingMissile) -> IoResult> { // First, flag that we're attempting to acquire access. This will allow // us to cancel the pending grant if we timeout out while waiting for a // grant. @@ -94,6 +95,13 @@ impl AccessTimeout { }) } + pub fn timed_out(&self) -> bool { + match self.state { + TimedOut => true, + _ => false, + } + } + /// Sets the pending timeout to the value specified. /// /// The home/loop variables are used to construct a timer if one has not @@ -120,9 +128,10 @@ impl AccessTimeout { if self.timer.is_none() { let mut timer = box TimerWatcher::new_home(loop_, home.clone()); let mut cx = box TimerContext { - timeout: self as *mut _, - callback: cb, - payload: data, + timeout: self as *mut _ as *mut AccessTimeout<()>, + callback: real_cb::, + user_unblock: cb, + user_payload: data, }; unsafe { timer.set_data(&mut *cx); @@ -135,8 +144,8 @@ impl AccessTimeout { unsafe { let cx = uvll::get_data_for_uv_handle(timer.handle); let cx = cx as *mut TimerContext; - (*cx).callback = cb; - (*cx).payload = data; + (*cx).user_unblock = cb; + (*cx).user_payload = data; } timer.stop(); timer.start(timer_cb, ms, 0); @@ -146,7 +155,12 @@ impl AccessTimeout { let cx: &TimerContext = unsafe { &*(uvll::get_data_for_uv_handle(timer) as *const TimerContext) }; - let me = unsafe { &mut *cx.timeout }; + (cx.callback)(cx.timeout, cx); + } + + fn real_cb(timeout: *mut AccessTimeout<()>, cx: &TimerContext) { + let timeout = timeout as *mut AccessTimeout; + let me = unsafe { &mut *timeout }; match mem::replace(&mut me.state, TimedOut) { TimedOut | NoTimeout => unreachable!(), @@ -158,7 +172,7 @@ impl AccessTimeout { } } TimeoutPending(RequestPending) => { - match (cx.callback)(cx.payload) { + match (cx.user_unblock)(cx.user_payload) { Some(task) => task.reawaken(), None => unreachable!(), } @@ -168,8 +182,8 @@ impl AccessTimeout { } } -impl Clone for AccessTimeout { - fn clone(&self) -> AccessTimeout { +impl Clone for AccessTimeout { + fn clone(&self) -> AccessTimeout { AccessTimeout { access: self.access.clone(), state: NoTimeout, @@ -179,7 +193,7 @@ impl Clone for AccessTimeout { } #[unsafe_destructor] -impl<'a> Drop for Guard<'a> { +impl<'a, T> Drop for Guard<'a, T> { fn drop(&mut self) { match *self.state { TimeoutPending(NoWaiter) | TimeoutPending(AccessPending) => @@ -193,7 +207,8 @@ impl<'a> Drop for Guard<'a> { } } -impl Drop for AccessTimeout { +#[unsafe_destructor] +impl Drop for AccessTimeout { fn drop(&mut self) { match self.timer { Some(ref timer) => unsafe { @@ -215,12 +230,6 @@ pub struct ConnectCtx { pub timer: Option>, } -pub struct AcceptTimeout { - timer: Option, - timeout_tx: Option>, - timeout_rx: Option>, -} - impl ConnectCtx { pub fn connect( mut self, obj: T, timeout: Option, io: &mut UvIoFactory, @@ -306,88 +315,97 @@ impl ConnectCtx { } } -impl AcceptTimeout { - pub fn new() -> AcceptTimeout { - AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None } +pub struct AcceptTimeout { + access: AccessTimeout>, +} + +struct AcceptorState { + blocked_acceptor: Option, + pending: Vec>, +} + +impl AcceptTimeout { + pub fn new() -> AcceptTimeout { + AcceptTimeout { + access: AccessTimeout::new(AcceptorState { + blocked_acceptor: None, + pending: Vec::new(), + }) + } } - pub fn accept(&mut self, c: &Receiver>) -> IoResult { - match self.timeout_rx { - None => c.recv(), - Some(ref rx) => { - use std::comm::Select; - - // Poll the incoming channel first (don't rely on the order of - // select just yet). If someone's pending then we should return - // them immediately. - match c.try_recv() { - Ok(data) => return data, - Err(..) => {} - } - - // Use select to figure out which channel gets ready first. We - // do some custom handling of select to ensure that we never - // actually drain the timeout channel (we'll keep seeing the - // timeout message in the future). - let s = Select::new(); - let mut timeout = s.handle(rx); - let mut data = s.handle(c); - unsafe { - timeout.add(); - data.add(); - } - if s.wait() == timeout.id() { - Err(uv_error_to_io_error(UvError(uvll::ECANCELED))) - } else { - c.recv() - } + pub fn accept(&mut self, + missile: HomingMissile, + loop_: &Loop) -> IoResult { + // If we've timed out but we're not closed yet, poll the state of the + // queue to see if we can peel off a connection. + if self.access.timed_out() && !self.access.access.is_closed(&missile) { + let tmp = self.access.access.get_mut(&missile); + return match tmp.pending.remove(0) { + Some(msg) => msg, + None => Err(uv_error_to_io_error(UvError(uvll::ECANCELED))) } } - } - pub fn clear(&mut self) { - match self.timeout_rx { - Some(ref t) => { let _ = t.try_recv(); } + // Now that we're not polling, attempt to gain access and then peel off + // a connection. If we have no pending connections, then we need to go + // to sleep and wait for one. + // + // Note that if we're woken up for a pending connection then we're + // guaranteed that the check above will not steal our connection due to + // the single-threaded nature of the event loop. + let mut guard = try!(self.access.grant(missile)); + if guard.access.is_closed() { + return Err(uv_error_to_io_error(UvError(uvll::EOF))) + } + + match guard.access.pending.remove(0) { + Some(msg) => return msg, None => {} } - match self.timer { - Some(ref mut t) => t.stop(), - None => {} + + wait_until_woken_after(&mut guard.access.blocked_acceptor, loop_, || {}); + + match guard.access.pending.remove(0) { + _ if guard.access.is_closed() => { + Err(uv_error_to_io_error(UvError(uvll::EOF))) + } + Some(msg) => msg, + None => Err(uv_error_to_io_error(UvError(uvll::ECANCELED))) } } - pub fn set_timeout + HomingIO>( - &mut self, ms: u64, t: &mut T - ) { - // If we have a timeout, lazily initialize the timer which will be used - // to fire when the timeout runs out. - if self.timer.is_none() { - let loop_ = Loop::wrap(unsafe { - uvll::get_loop_for_uv_handle(t.uv_handle()) - }); - let mut timer = TimerWatcher::new_home(&loop_, t.home().clone()); + pub unsafe fn push(&mut self, t: IoResult) { + let state = self.access.access.unsafe_get(); + (*state).pending.push(t); + let _ = (*state).blocked_acceptor.take().map(|t| t.reawaken()); + } + + pub fn set_timeout(&mut self, + ms: Option, + loop_: &Loop, + home: &HomeHandle) { + self.access.set_timeout(ms, home, loop_, cancel_accept::, + self as *mut _ as uint); + + fn cancel_accept(me: uint) -> Option { unsafe { - timer.set_data(self as *mut _); + let me: &mut AcceptTimeout = mem::transmute(me); + (*me.access.access.unsafe_get()).blocked_acceptor.take() } - self.timer = Some(timer); } + } - // Once we've got a timer, stop any previous timeout, reset it for the - // current one, and install some new channels to send/receive data on - let timer = self.timer.get_mut_ref(); - timer.stop(); - timer.start(timer_cb, ms, 0); - let (tx, rx) = channel(); - self.timeout_tx = Some(tx); - self.timeout_rx = Some(rx); - - extern fn timer_cb(timer: *mut uvll::uv_timer_t) { - let acceptor: &mut AcceptTimeout = unsafe { - &mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout) - }; - // This send can never fail because if this timer is active then the - // receiving channel is guaranteed to be alive - acceptor.timeout_tx.get_ref().send(()); - } + pub fn close(&mut self, m: HomingMissile) { + self.access.access.close(&m); + let task = self.access.access.get_mut(&m).blocked_acceptor.take(); + drop(m); + let _ = task.map(|t| t.reawaken()); + } +} + +impl Clone for AcceptTimeout { + fn clone(&self) -> AcceptTimeout { + AcceptTimeout { access: self.access.clone() } } }