rustuv: Implement timeouts for unix networking

This commit implements the set{,_read,_write}_timeout() methods for the
libuv-based networking I/O objects. The implementation details are commented
thoroughly throughout the implementation.
This commit is contained in:
Alex Crichton 2014-04-27 15:45:16 -07:00
parent 295e0a04ad
commit b2c6d6fd3f
8 changed files with 711 additions and 277 deletions

View File

@ -31,7 +31,7 @@ pub struct Guard<'a> {
}
struct Inner {
queue: Vec<BlockedTask>,
queue: Vec<(BlockedTask, uint)>,
held: bool,
closed: bool,
}
@ -47,16 +47,17 @@ impl Access {
}
}
pub fn grant<'a>(&'a mut self, missile: HomingMissile) -> Guard<'a> {
pub fn grant<'a>(&'a mut self, token: uint,
missile: HomingMissile) -> Guard<'a> {
// This unsafety is actually OK because the homing missile argument
// guarantees that we're on the same event loop as all the other objects
// attempting to get access granted.
let inner: &mut Inner = unsafe { cast::transmute(self.inner.get()) };
let inner: &mut Inner = unsafe { &mut *self.inner.get() };
if inner.held {
let t: Box<Task> = Local::take();
t.deschedule(1, |task| {
inner.queue.push(task);
inner.queue.push((task, token));
Ok(())
});
assert!(inner.held);
@ -75,6 +76,17 @@ impl Access {
// necessary synchronization to be running on this thread.
unsafe { (*self.inner.get()).closed = true; }
}
// Dequeue a blocked task with a specified token. This is unsafe because it
// is only safe to invoke while on the home event loop, and there is no
// guarantee that this i being invoked on the home event loop.
pub unsafe fn dequeue(&mut self, token: uint) -> Option<BlockedTask> {
let inner: &mut Inner = &mut *self.inner.get();
match inner.queue.iter().position(|&(_, t)| t == token) {
Some(i) => Some(inner.queue.remove(i).unwrap().val0()),
None => None,
}
}
}
impl Clone for Access {
@ -111,9 +123,9 @@ impl<'a> Drop for Guard<'a> {
// scheduled on this scheduler. Because we might be woken up on some
// other scheduler, we drop our homing missile before we reawaken
// the task.
Some(task) => {
Some((task, _)) => {
drop(self.missile.take());
let _ = task.wake().map(|t| t.reawaken());
task.reawaken();
}
None => { inner.held = false; }
}

View File

@ -84,6 +84,7 @@ fn start(argc: int, argv: **u8) -> int {
mod macros;
mod access;
mod timeout;
mod homing;
mod queue;
mod rc;

View File

@ -12,21 +12,20 @@ use libc::{size_t, ssize_t, c_int, c_void, c_uint};
use libc;
use std::cast;
use std::io;
use std::io::{IoError, IoResult};
use std::io::IoError;
use std::io::net::ip;
use std::mem;
use std::ptr;
use std::rt::rtio;
use std::rt::task::BlockedTask;
use access::Access;
use homing::{HomingIO, HomeHandle};
use rc::Refcount;
use stream::StreamWatcher;
use super::{Loop, Request, UvError, Buf, status_to_io_result,
uv_error_to_io_error, UvHandle, slice_to_uv_buf,
wait_until_woken_after, wakeup};
use timer::TimerWatcher;
use timeout::{AccessTimeout, AcceptTimeout, ConnectCtx};
use uvio::UvIoFactory;
use uvll;
@ -146,190 +145,6 @@ fn socket_name(sk: SocketNameKind,
n => Err(uv_error_to_io_error(UvError(n)))
}
}
////////////////////////////////////////////////////////////////////////////////
// Helpers for handling timeouts, shared for pipes/tcp
////////////////////////////////////////////////////////////////////////////////
pub struct ConnectCtx {
pub status: c_int,
pub task: Option<BlockedTask>,
pub timer: Option<Box<TimerWatcher>>,
}
pub struct AcceptTimeout {
timer: Option<TimerWatcher>,
timeout_tx: Option<Sender<()>>,
timeout_rx: Option<Receiver<()>>,
}
impl ConnectCtx {
pub fn connect<T>(
mut self, obj: T, timeout: Option<u64>, io: &mut UvIoFactory,
f: |&Request, &T, uvll::uv_connect_cb| -> libc::c_int
) -> Result<T, UvError> {
let mut req = Request::new(uvll::UV_CONNECT);
let r = f(&req, &obj, connect_cb);
return match r {
0 => {
req.defuse(); // uv callback now owns this request
match timeout {
Some(t) => {
let mut timer = TimerWatcher::new(io);
timer.start(timer_cb, t, 0);
self.timer = Some(timer);
}
None => {}
}
wait_until_woken_after(&mut self.task, &io.loop_, || {
let data = &self as *_;
match self.timer {
Some(ref mut timer) => unsafe { timer.set_data(data) },
None => {}
}
req.set_data(data);
});
// Make sure an erroneously fired callback doesn't have access
// to the context any more.
req.set_data(0 as *int);
// If we failed because of a timeout, drop the TcpWatcher as
// soon as possible because it's data is now set to null and we
// want to cancel the callback ASAP.
match self.status {
0 => Ok(obj),
n => { drop(obj); Err(UvError(n)) }
}
}
n => Err(UvError(n))
};
extern fn timer_cb(handle: *uvll::uv_timer_t) {
// Don't close the corresponding tcp request, just wake up the task
// and let RAII take care of the pending watcher.
let cx: &mut ConnectCtx = unsafe {
&mut *(uvll::get_data_for_uv_handle(handle) as *mut ConnectCtx)
};
cx.status = uvll::ECANCELED;
wakeup(&mut cx.task);
}
extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
// This callback can be invoked with ECANCELED if the watcher is
// closed by the timeout callback. In that case we just want to free
// the request and be along our merry way.
let req = Request::wrap(req);
if status == uvll::ECANCELED { return }
// Apparently on windows when the handle is closed this callback may
// not be invoked with ECANCELED but rather another error code.
// Either ways, if the data is null, then our timeout has expired
// and there's nothing we can do.
let data = unsafe { uvll::get_data_for_req(req.handle) };
if data.is_null() { return }
let cx: &mut ConnectCtx = unsafe { &mut *(data as *mut ConnectCtx) };
cx.status = status;
match cx.timer {
Some(ref mut t) => t.stop(),
None => {}
}
// Note that the timer callback doesn't cancel the connect request
// (that's the job of uv_close()), so it's possible for this
// callback to get triggered after the timeout callback fires, but
// before the task wakes up. In that case, we did indeed
// successfully connect, but we don't need to wake someone up. We
// updated the status above (correctly so), and the task will pick
// up on this when it wakes up.
if cx.task.is_some() {
wakeup(&mut cx.task);
}
}
}
}
impl AcceptTimeout {
pub fn new() -> AcceptTimeout {
AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None }
}
pub fn accept<T: Send>(&mut self, c: &Receiver<IoResult<T>>) -> IoResult<T> {
match self.timeout_rx {
None => c.recv(),
Some(ref rx) => {
use std::comm::Select;
// Poll the incoming channel first (don't rely on the order of
// select just yet). If someone's pending then we should return
// them immediately.
match c.try_recv() {
Ok(data) => return data,
Err(..) => {}
}
// Use select to figure out which channel gets ready first. We
// do some custom handling of select to ensure that we never
// actually drain the timeout channel (we'll keep seeing the
// timeout message in the future).
let s = Select::new();
let mut timeout = s.handle(rx);
let mut data = s.handle(c);
unsafe {
timeout.add();
data.add();
}
if s.wait() == timeout.id() {
Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
} else {
c.recv()
}
}
}
}
pub fn clear(&mut self) {
// Clear any previous timeout by dropping the timer and transmission
// channels
drop((self.timer.take(),
self.timeout_tx.take(),
self.timeout_rx.take()))
}
pub fn set_timeout<U, T: UvHandle<U> + HomingIO>(
&mut self, ms: u64, t: &mut T
) {
// If we have a timeout, lazily initialize the timer which will be used
// to fire when the timeout runs out.
if self.timer.is_none() {
let _m = t.fire_homing_missile();
let loop_ = Loop::wrap(unsafe {
uvll::get_loop_for_uv_handle(t.uv_handle())
});
let mut timer = TimerWatcher::new_home(&loop_, t.home().clone());
unsafe {
timer.set_data(self as *mut _ as *AcceptTimeout);
}
self.timer = Some(timer);
}
// Once we've got a timer, stop any previous timeout, reset it for the
// current one, and install some new channels to send/receive data on
let timer = self.timer.get_mut_ref();
timer.stop();
timer.start(timer_cb, ms, 0);
let (tx, rx) = channel();
self.timeout_tx = Some(tx);
self.timeout_rx = Some(rx);
extern fn timer_cb(timer: *uvll::uv_timer_t) {
let acceptor: &mut AcceptTimeout = unsafe {
&mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout)
};
// This send can never fail because if this timer is active then the
// receiving channel is guaranteed to be alive
acceptor.timeout_tx.get_ref().send(());
}
}
}
////////////////////////////////////////////////////////////////////////////////
/// TCP implementation
@ -345,8 +160,8 @@ pub struct TcpWatcher {
// stream object, so we use these access guards in order to arbitrate among
// multiple concurrent reads and writes. Note that libuv *can* read and
// write simultaneously, it just can't read and read simultaneously.
read_access: Access,
write_access: Access,
read_access: AccessTimeout,
write_access: AccessTimeout,
}
pub struct TcpListener {
@ -380,8 +195,8 @@ impl TcpWatcher {
handle: handle,
stream: StreamWatcher::new(handle),
refcount: Refcount::new(),
read_access: Access::new(),
write_access: Access::new(),
read_access: AccessTimeout::new(),
write_access: AccessTimeout::new(),
}
}
@ -412,10 +227,10 @@ impl rtio::RtioSocket for TcpWatcher {
impl rtio::RtioTcpStream for TcpWatcher {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
let m = self.fire_homing_missile();
let access = self.read_access.grant(m);
let guard = try!(self.read_access.grant(m));
// see comments in close_read about this check
if access.is_closed() {
if guard.access.is_closed() {
return Err(io::standard_error(io::EndOfFile))
}
@ -424,8 +239,8 @@ impl rtio::RtioTcpStream for TcpWatcher {
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
let m = self.fire_homing_missile();
let _g = self.write_access.grant(m);
self.stream.write(buf).map_err(uv_error_to_io_error)
let guard = try!(self.write_access.grant(m));
self.stream.write(buf, guard.can_timeout).map_err(uv_error_to_io_error)
}
fn peer_name(&mut self) -> Result<ip::SocketAddr, IoError> {
@ -468,16 +283,19 @@ impl rtio::RtioTcpStream for TcpWatcher {
stream: StreamWatcher::new(self.handle),
home: self.home.clone(),
refcount: self.refcount.clone(),
write_access: self.write_access.clone(),
read_access: self.read_access.clone(),
write_access: self.write_access.clone(),
} as Box<rtio::RtioTcpStream:Send>
}
fn close_read(&mut self) -> Result<(), IoError> {
// see comments in PipeWatcher::close_read
let m = self.fire_homing_missile();
self.read_access.close(&m);
self.stream.cancel_read(m);
let task = {
let m = self.fire_homing_missile();
self.read_access.access.close(&m);
self.stream.cancel_read(uvll::EOF as libc::ssize_t)
};
let _ = task.map(|t| t.reawaken());
Ok(())
}
@ -485,6 +303,35 @@ impl rtio::RtioTcpStream for TcpWatcher {
let _m = self.fire_homing_missile();
shutdown(self.handle, &self.uv_loop())
}
fn set_timeout(&mut self, timeout: Option<u64>) {
self.set_read_timeout(timeout);
self.set_write_timeout(timeout);
}
fn set_read_timeout(&mut self, ms: Option<u64>) {
let _m = self.fire_homing_missile();
let loop_ = self.uv_loop();
self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read,
&self.stream as *_ as uint);
fn cancel_read(stream: uint) -> Option<BlockedTask> {
let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) };
stream.cancel_read(uvll::ECANCELED as ssize_t)
}
}
fn set_write_timeout(&mut self, ms: Option<u64>) {
let _m = self.fire_homing_missile();
let loop_ = self.uv_loop();
self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write,
&self.stream as *_ as uint);
fn cancel_write(stream: uint) -> Option<BlockedTask> {
let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) };
stream.cancel_write()
}
}
}
impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
@ -618,6 +465,7 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
}
fn set_timeout(&mut self, ms: Option<u64>) {
let _m = self.fire_homing_missile();
match ms {
None => self.timeout.clear(),
Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
@ -635,8 +483,22 @@ pub struct UdpWatcher {
// See above for what these fields are
refcount: Refcount,
read_access: Access,
write_access: Access,
read_access: AccessTimeout,
write_access: AccessTimeout,
blocked_sender: Option<BlockedTask>,
}
struct UdpRecvCtx {
task: Option<BlockedTask>,
buf: Option<Buf>,
result: Option<(ssize_t, Option<ip::SocketAddr>)>,
}
struct UdpSendCtx {
result: c_int,
data: Option<Vec<u8>>,
udp: *mut UdpWatcher,
}
impl UdpWatcher {
@ -646,8 +508,9 @@ impl UdpWatcher {
handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
home: io.make_handle(),
refcount: Refcount::new(),
read_access: Access::new(),
write_access: Access::new(),
read_access: AccessTimeout::new(),
write_access: AccessTimeout::new(),
blocked_sender: None,
};
assert_eq!(unsafe {
uvll::uv_udp_init(io.uv_loop(), udp.handle)
@ -683,20 +546,15 @@ impl rtio::RtioUdpSocket for UdpWatcher {
fn recvfrom(&mut self, buf: &mut [u8])
-> Result<(uint, ip::SocketAddr), IoError>
{
struct Ctx {
task: Option<BlockedTask>,
buf: Option<Buf>,
result: Option<(ssize_t, Option<ip::SocketAddr>)>,
}
let loop_ = self.uv_loop();
let m = self.fire_homing_missile();
let _g = self.read_access.grant(m);
let _guard = try!(self.read_access.grant(m));
return match unsafe {
uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
} {
0 => {
let mut cx = Ctx {
let mut cx = UdpRecvCtx {
task: None,
buf: Some(slice_to_uv_buf(buf)),
result: None,
@ -718,7 +576,8 @@ impl rtio::RtioUdpSocket for UdpWatcher {
_suggested_size: size_t,
buf: *mut Buf) {
unsafe {
let cx = &mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx);
let cx = uvll::get_data_for_uv_handle(handle);
let cx = &mut *(cx as *mut UdpRecvCtx);
*buf = cx.buf.take().expect("recv alloc_cb called more than once")
}
}
@ -727,7 +586,7 @@ impl rtio::RtioUdpSocket for UdpWatcher {
addr: *libc::sockaddr, _flags: c_uint) {
assert!(nread != uvll::ECANCELED as ssize_t);
let cx = unsafe {
&mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx)
&mut *(uvll::get_data_for_uv_handle(handle) as *mut UdpRecvCtx)
};
// When there's no data to read the recv callback can be a no-op.
@ -751,42 +610,68 @@ impl rtio::RtioUdpSocket for UdpWatcher {
}
fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> Result<(), IoError> {
struct Ctx { task: Option<BlockedTask>, result: c_int }
let m = self.fire_homing_missile();
let loop_ = self.uv_loop();
let _g = self.write_access.grant(m);
let guard = try!(self.write_access.grant(m));
let mut req = Request::new(uvll::UV_UDP_SEND);
let buf = slice_to_uv_buf(buf);
let (addr, _len) = addr_to_sockaddr(dst);
let result = unsafe {
let addr_p = &addr as *libc::sockaddr_storage;
uvll::uv_udp_send(req.handle, self.handle, [buf],
addr_p as *libc::sockaddr, send_cb)
let addr_p = &addr as *_ as *libc::sockaddr;
// see comments in StreamWatcher::write for why we may allocate a buffer
// here.
let data = if guard.can_timeout {Some(Vec::from_slice(buf))} else {None};
let uv_buf = if guard.can_timeout {
slice_to_uv_buf(data.get_ref().as_slice())
} else {
slice_to_uv_buf(buf)
};
return match result {
return match unsafe {
uvll::uv_udp_send(req.handle, self.handle, [uv_buf], addr_p, send_cb)
} {
0 => {
req.defuse(); // uv callback now owns this request
let mut cx = Ctx { task: None, result: 0 };
wait_until_woken_after(&mut cx.task, &loop_, || {
let mut cx = UdpSendCtx {
result: uvll::ECANCELED, data: data, udp: self as *mut _
};
wait_until_woken_after(&mut self.blocked_sender, &loop_, || {
req.set_data(&cx);
});
match cx.result {
0 => Ok(()),
n => Err(uv_error_to_io_error(UvError(n)))
if cx.result != uvll::ECANCELED {
return match cx.result {
0 => Ok(()),
n => Err(uv_error_to_io_error(UvError(n)))
}
}
let new_cx = ~UdpSendCtx {
result: 0,
udp: 0 as *mut UdpWatcher,
data: cx.data.take(),
};
unsafe {
req.set_data(&*new_cx);
cast::forget(new_cx);
}
Err(uv_error_to_io_error(UvError(cx.result)))
}
n => Err(uv_error_to_io_error(UvError(n)))
};
// This function is the same as stream::write_cb, but adapted for udp
// instead of streams.
extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
let req = Request::wrap(req);
assert!(status != uvll::ECANCELED);
let cx: &mut Ctx = unsafe { req.get_data() };
let cx: &mut UdpSendCtx = unsafe { req.get_data() };
cx.result = status;
wakeup(&mut cx.task);
if cx.udp as uint != 0 {
let udp: &mut UdpWatcher = unsafe { &mut *cx.udp };
wakeup(&mut udp.blocked_sender);
} else {
let _cx: ~UdpSendCtx = unsafe { cast::transmute(cx) };
}
}
}
@ -866,8 +751,48 @@ impl rtio::RtioUdpSocket for UdpWatcher {
refcount: self.refcount.clone(),
write_access: self.write_access.clone(),
read_access: self.read_access.clone(),
blocked_sender: None,
} as Box<rtio::RtioUdpSocket:Send>
}
fn set_timeout(&mut self, timeout: Option<u64>) {
self.set_read_timeout(timeout);
self.set_write_timeout(timeout);
}
fn set_read_timeout(&mut self, ms: Option<u64>) {
let _m = self.fire_homing_missile();
let loop_ = self.uv_loop();
self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read,
self.handle as uint);
fn cancel_read(stream: uint) -> Option<BlockedTask> {
// This method is quite similar to StreamWatcher::cancel_read, see
// there for more information
let handle = stream as *uvll::uv_udp_t;
assert_eq!(unsafe { uvll::uv_udp_recv_stop(handle) }, 0);
let data = unsafe {
let data = uvll::get_data_for_uv_handle(handle);
if data.is_null() { return None }
uvll::set_data_for_uv_handle(handle, 0 as *int);
&mut *(data as *mut UdpRecvCtx)
};
data.result = Some((uvll::ECANCELED as ssize_t, None));
data.task.take()
}
}
fn set_write_timeout(&mut self, ms: Option<u64>) {
let _m = self.fire_homing_missile();
let loop_ = self.uv_loop();
self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write,
self as *mut _ as uint);
fn cancel_write(stream: uint) -> Option<BlockedTask> {
let stream: &mut UdpWatcher = unsafe { cast::transmute(stream) };
stream.blocked_sender.take()
}
}
}
impl Drop for UdpWatcher {

View File

@ -10,16 +10,18 @@
use libc;
use std::c_str::CString;
use std::cast;
use std::io::IoError;
use std::io;
use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor};
use std::rt::task::BlockedTask;
use access::Access;
use homing::{HomingIO, HomeHandle};
use net;
use rc::Refcount;
use stream::StreamWatcher;
use super::{Loop, UvError, UvHandle, uv_error_to_io_error};
use timeout::{AcceptTimeout, ConnectCtx, AccessTimeout};
use uvio::UvIoFactory;
use uvll;
@ -30,8 +32,8 @@ pub struct PipeWatcher {
refcount: Refcount,
// see comments in TcpWatcher for why these exist
write_access: Access,
read_access: Access,
write_access: AccessTimeout,
read_access: AccessTimeout,
}
pub struct PipeListener {
@ -43,7 +45,7 @@ pub struct PipeListener {
pub struct PipeAcceptor {
listener: Box<PipeListener>,
timeout: net::AcceptTimeout,
timeout: AcceptTimeout,
}
// PipeWatcher implementation and traits
@ -70,8 +72,8 @@ impl PipeWatcher {
home: home,
defused: false,
refcount: Refcount::new(),
read_access: Access::new(),
write_access: Access::new(),
read_access: AccessTimeout::new(),
write_access: AccessTimeout::new(),
}
}
@ -89,7 +91,7 @@ impl PipeWatcher {
-> Result<PipeWatcher, UvError>
{
let pipe = PipeWatcher::new(io, false);
let cx = net::ConnectCtx { status: -1, task: None, timer: None };
let cx = ConnectCtx { status: -1, task: None, timer: None };
cx.connect(pipe, timeout, io, |req, pipe, cb| {
unsafe {
uvll::uv_pipe_connect(req.handle, pipe.handle(),
@ -112,10 +114,10 @@ impl PipeWatcher {
impl RtioPipe for PipeWatcher {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
let m = self.fire_homing_missile();
let access = self.read_access.grant(m);
let guard = try!(self.read_access.grant(m));
// see comments in close_read about this check
if access.is_closed() {
if guard.access.is_closed() {
return Err(io::standard_error(io::EndOfFile))
}
@ -124,8 +126,8 @@ impl RtioPipe for PipeWatcher {
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
let m = self.fire_homing_missile();
let _g = self.write_access.grant(m);
self.stream.write(buf).map_err(uv_error_to_io_error)
let guard = try!(self.write_access.grant(m));
self.stream.write(buf, guard.can_timeout).map_err(uv_error_to_io_error)
}
fn clone(&self) -> Box<RtioPipe:Send> {
@ -157,9 +159,12 @@ impl RtioPipe for PipeWatcher {
// ordering is crucial because we could in theory be rescheduled during
// the uv_read_stop which means that another read invocation could leak
// in before we set the flag.
let m = self.fire_homing_missile();
self.read_access.close(&m);
self.stream.cancel_read(m);
let task = {
let m = self.fire_homing_missile();
self.read_access.access.close(&m);
self.stream.cancel_read(uvll::EOF as libc::ssize_t)
};
let _ = task.map(|t| t.reawaken());
Ok(())
}
@ -167,6 +172,35 @@ impl RtioPipe for PipeWatcher {
let _m = self.fire_homing_missile();
net::shutdown(self.stream.handle, &self.uv_loop())
}
fn set_timeout(&mut self, timeout: Option<u64>) {
self.set_read_timeout(timeout);
self.set_write_timeout(timeout);
}
fn set_read_timeout(&mut self, ms: Option<u64>) {
let _m = self.fire_homing_missile();
let loop_ = self.uv_loop();
self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read,
&self.stream as *_ as uint);
fn cancel_read(stream: uint) -> Option<BlockedTask> {
let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) };
stream.cancel_read(uvll::ECANCELED as libc::ssize_t)
}
}
fn set_write_timeout(&mut self, ms: Option<u64>) {
let _m = self.fire_homing_missile();
let loop_ = self.uv_loop();
self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write,
&self.stream as *_ as uint);
fn cancel_write(stream: uint) -> Option<BlockedTask> {
let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) };
stream.cancel_write()
}
}
}
impl HomingIO for PipeWatcher {
@ -219,7 +253,7 @@ impl RtioUnixListener for PipeListener {
// create the acceptor object from ourselves
let mut acceptor = box PipeAcceptor {
listener: self,
timeout: net::AcceptTimeout::new(),
timeout: AcceptTimeout::new(),
};
let _m = acceptor.fire_homing_missile();

View File

@ -14,7 +14,6 @@ use std::ptr;
use std::rt::task::BlockedTask;
use Loop;
use homing::HomingMissile;
use super::{UvError, Buf, slice_to_uv_buf, Request, wait_until_woken_after,
ForbidUnwind, wakeup};
use uvll;
@ -31,6 +30,8 @@ pub struct StreamWatcher {
// structure, but currently we don't have mappings for all the structures
// defined in libuv, so we're foced to malloc this.
last_write_req: Option<Request>,
blocked_writer: Option<BlockedTask>,
}
struct ReadContext {
@ -41,7 +42,8 @@ struct ReadContext {
struct WriteContext {
result: c_int,
task: Option<BlockedTask>,
stream: *mut StreamWatcher,
data: Option<Vec<u8>>,
}
impl StreamWatcher {
@ -62,6 +64,7 @@ impl StreamWatcher {
StreamWatcher {
handle: stream,
last_write_req: None,
blocked_writer: None,
}
}
@ -74,7 +77,7 @@ impl StreamWatcher {
buf: Some(slice_to_uv_buf(buf)),
// if the read is canceled, we'll see eof, otherwise this will get
// overwritten
result: uvll::EOF as ssize_t,
result: 0,
task: None,
};
// When reading a TTY stream on windows, libuv will invoke alloc_cb
@ -104,27 +107,22 @@ impl StreamWatcher {
return ret;
}
pub fn cancel_read(&mut self, m: HomingMissile) {
pub fn cancel_read(&mut self, reason: ssize_t) -> Option<BlockedTask> {
// When we invoke uv_read_stop, it cancels the read and alloc
// callbacks. We need to manually wake up a pending task (if one was
// present). Note that we wake up the task *outside* the homing missile
// to ensure that we don't switch schedulers when we're not supposed to.
// present).
assert_eq!(unsafe { uvll::uv_read_stop(self.handle) }, 0);
let data = unsafe {
let data = uvll::get_data_for_uv_handle(self.handle);
if data.is_null() { return }
if data.is_null() { return None }
uvll::set_data_for_uv_handle(self.handle, 0 as *int);
&mut *(data as *mut ReadContext)
};
let task = data.task.take();
drop(m);
match task {
Some(task) => { let _ = task.wake().map(|t| t.reawaken()); }
None => {}
}
data.result = reason;
data.task.take()
}
pub fn write(&mut self, buf: &[u8]) -> Result<(), UvError> {
pub fn write(&mut self, buf: &[u8], may_timeout: bool) -> Result<(), UvError> {
// The ownership of the write request is dubious if this function
// unwinds. I believe that if the write_cb fails to re-schedule the task
// then the write request will be leaked.
@ -137,30 +135,94 @@ impl StreamWatcher {
};
req.set_data(ptr::null::<()>());
// And here's where timeouts get a little interesting. Currently, libuv
// does not support canceling an in-flight write request. Consequently,
// when a write timeout expires, there's not much we can do other than
// detach the sleeping task from the write request itself. Semantically,
// this means that the write request will complete asynchronously, but
// the calling task will return error (because the write timed out).
//
// There is special wording in the documentation of set_write_timeout()
// indicating that this is a plausible failure scenario, and this
// function is why that wording exists.
//
// Implementation-wise, we must be careful when passing a buffer down to
// libuv. Most of this implementation avoids allocations becuase of the
// blocking guarantee (all stack local variables are valid for the
// entire read/write request). If our write request can be timed out,
// however, we must heap allocate the data and pass that to the libuv
// functions instead. The reason for this is that if we time out and
// return, there's no guarantee that `buf` is a valid buffer any more.
//
// To do this, the write context has an optionally owned vector of
// bytes.
let data = if may_timeout {Some(Vec::from_slice(buf))} else {None};
let uv_buf = if may_timeout {
slice_to_uv_buf(data.get_ref().as_slice())
} else {
slice_to_uv_buf(buf)
};
// Send off the request, but be careful to not block until we're sure
// that the write reqeust is queued. If the reqeust couldn't be queued,
// then we should return immediately with an error.
match unsafe {
uvll::uv_write(req.handle, self.handle, [slice_to_uv_buf(buf)],
write_cb)
uvll::uv_write(req.handle, self.handle, [uv_buf], write_cb)
} {
0 => {
let mut wcx = WriteContext { result: 0, task: None, };
let mut wcx = WriteContext {
result: uvll::ECANCELED,
stream: self as *mut _,
data: data,
};
req.defuse(); // uv callback now owns this request
let loop_ = unsafe { uvll::get_loop_for_uv_handle(self.handle) };
wait_until_woken_after(&mut wcx.task, &Loop::wrap(loop_), || {
wait_until_woken_after(&mut self.blocked_writer,
&Loop::wrap(loop_), || {
req.set_data(&wcx);
});
self.last_write_req = Some(Request::wrap(req.handle));
match wcx.result {
0 => Ok(()),
n => Err(UvError(n)),
if wcx.result != uvll::ECANCELED {
self.last_write_req = Some(Request::wrap(req.handle));
return match wcx.result {
0 => Ok(()),
n => Err(UvError(n)),
}
}
// This is the second case where canceling an in-flight write
// gets interesting. If we've been canceled (no one reset our
// result), then someone still needs to free the request, and
// someone still needs to free the allocate buffer.
//
// To take care of this, we swap out the stack-allocated write
// context for a heap-allocated context, transferring ownership
// of everything to the write_cb. Libuv guarantees that this
// callback will be invoked at some point, and the callback will
// be responsible for deallocating these resources.
//
// Note that we don't cache this write request back in the
// stream watcher because we no longer have ownership of it, and
// we never will.
let new_wcx = ~WriteContext {
result: 0,
stream: 0 as *mut StreamWatcher,
data: wcx.data.take(),
};
unsafe {
req.set_data(&*new_wcx);
cast::forget(new_wcx);
}
Err(UvError(wcx.result))
}
n => Err(UvError(n)),
}
}
pub fn cancel_write(&mut self) -> Option<BlockedTask> {
self.blocked_writer.take()
}
}
// This allocation callback expects to be invoked once and only once. It will
@ -198,12 +260,18 @@ extern fn read_cb(handle: *uvll::uv_stream_t, nread: ssize_t, _buf: *Buf) {
// away the error code as a result.
extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
let mut req = Request::wrap(req);
assert!(status != uvll::ECANCELED);
// Remember to not free the request because it is re-used between writes on
// the same stream.
let wcx: &mut WriteContext = unsafe { req.get_data() };
wcx.result = status;
req.defuse();
wakeup(&mut wcx.task);
// If the stream is present, we haven't timed out, otherwise we acquire
// ownership of everything and then deallocate it all at once.
if wcx.stream as uint != 0 {
req.defuse();
let stream: &mut StreamWatcher = unsafe { &mut *wcx.stream };
wakeup(&mut stream.blocked_writer);
} else {
let _wcx: ~WriteContext = unsafe { cast::transmute(wcx) };
}
}

394
src/librustuv/timeout.rs Normal file
View File

@ -0,0 +1,394 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use libc::c_int;
use std::cast;
use std::io::IoResult;
use std::mem;
use std::rt::task::BlockedTask;
use access;
use homing::{HomeHandle, HomingMissile, HomingIO};
use timer::TimerWatcher;
use uvll;
use uvio::UvIoFactory;
use {Loop, UvError, uv_error_to_io_error, Request, wakeup};
use {UvHandle, wait_until_woken_after};
/// Managment of a timeout when gaining access to a portion of a duplex stream.
pub struct AccessTimeout {
state: TimeoutState,
timer: Option<~TimerWatcher>,
pub access: access::Access,
}
pub struct Guard<'a> {
state: &'a mut TimeoutState,
pub access: access::Guard<'a>,
pub can_timeout: bool,
}
#[deriving(Eq)]
enum TimeoutState {
NoTimeout,
TimeoutPending(ClientState),
TimedOut,
}
#[deriving(Eq)]
enum ClientState {
NoWaiter,
AccessPending,
RequestPending,
}
struct TimerContext {
timeout: *mut AccessTimeout,
callback: fn(uint) -> Option<BlockedTask>,
payload: uint,
}
impl AccessTimeout {
pub fn new() -> AccessTimeout {
AccessTimeout {
state: NoTimeout,
timer: None,
access: access::Access::new(),
}
}
/// Grants access to half of a duplex stream, timing out if necessary.
///
/// On success, Ok(Guard) is returned and access has been granted to the
/// stream. If a timeout occurs, then Err is returned with an appropriate
/// error.
pub fn grant<'a>(&'a mut self, m: HomingMissile) -> IoResult<Guard<'a>> {
// First, flag that we're attempting to acquire access. This will allow
// us to cancel the pending grant if we timeout out while waiting for a
// grant.
match self.state {
NoTimeout => {},
TimeoutPending(ref mut client) => *client = AccessPending,
TimedOut => return Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
}
let access = self.access.grant(self as *mut _ as uint, m);
// After acquiring the grant, we need to flag ourselves as having a
// pending request so the timeout knows to cancel the request.
let can_timeout = match self.state {
NoTimeout => false,
TimeoutPending(ref mut client) => { *client = RequestPending; true }
TimedOut => return Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
};
Ok(Guard {
access: access,
state: &mut self.state,
can_timeout: can_timeout
})
}
/// Sets the pending timeout to the value specified.
///
/// The home/loop variables are used to construct a timer if one has not
/// been previously constructed.
///
/// The callback will be invoked if the timeout elapses, and the data of
/// the time will be set to `data`.
pub fn set_timeout(&mut self, ms: Option<u64>,
home: &HomeHandle,
loop_: &Loop,
cb: fn(uint) -> Option<BlockedTask>,
data: uint) {
self.state = NoTimeout;
let ms = match ms {
Some(ms) => ms,
None => return match self.timer {
Some(ref mut t) => t.stop(),
None => {}
}
};
// If we have a timeout, lazily initialize the timer which will be used
// to fire when the timeout runs out.
if self.timer.is_none() {
let mut timer = ~TimerWatcher::new_home(loop_, home.clone());
let cx = ~TimerContext {
timeout: self as *mut _,
callback: cb,
payload: data,
};
unsafe {
timer.set_data(&*cx);
cast::forget(cx);
}
self.timer = Some(timer);
}
let timer = self.timer.get_mut_ref();
unsafe {
let cx = uvll::get_data_for_uv_handle(timer.handle);
let cx = cx as *mut TimerContext;
(*cx).callback = cb;
(*cx).payload = data;
}
timer.stop();
timer.start(timer_cb, ms, 0);
self.state = TimeoutPending(NoWaiter);
extern fn timer_cb(timer: *uvll::uv_timer_t) {
let cx: &TimerContext = unsafe {
&*(uvll::get_data_for_uv_handle(timer) as *TimerContext)
};
let me = unsafe { &mut *cx.timeout };
match mem::replace(&mut me.state, TimedOut) {
TimedOut | NoTimeout => unreachable!(),
TimeoutPending(NoWaiter) => {}
TimeoutPending(AccessPending) => {
match unsafe { me.access.dequeue(me as *mut _ as uint) } {
Some(task) => task.reawaken(),
None => unreachable!(),
}
}
TimeoutPending(RequestPending) => {
match (cx.callback)(cx.payload) {
Some(task) => task.reawaken(),
None => unreachable!(),
}
}
}
}
}
}
impl Clone for AccessTimeout {
fn clone(&self) -> AccessTimeout {
AccessTimeout {
access: self.access.clone(),
state: NoTimeout,
timer: None,
}
}
}
#[unsafe_destructor]
impl<'a> Drop for Guard<'a> {
fn drop(&mut self) {
match *self.state {
TimeoutPending(NoWaiter) | TimeoutPending(AccessPending) =>
unreachable!(),
NoTimeout | TimedOut => {}
TimeoutPending(RequestPending) => {
*self.state = TimeoutPending(NoWaiter);
}
}
}
}
impl Drop for AccessTimeout {
fn drop(&mut self) {
match self.timer {
Some(ref timer) => unsafe {
let data = uvll::get_data_for_uv_handle(timer.handle);
let _data: ~TimerContext = cast::transmute(data);
},
None => {}
}
}
}
////////////////////////////////////////////////////////////////////////////////
// Connect timeouts
////////////////////////////////////////////////////////////////////////////////
pub struct ConnectCtx {
pub status: c_int,
pub task: Option<BlockedTask>,
pub timer: Option<~TimerWatcher>,
}
pub struct AcceptTimeout {
timer: Option<TimerWatcher>,
timeout_tx: Option<Sender<()>>,
timeout_rx: Option<Receiver<()>>,
}
impl ConnectCtx {
pub fn connect<T>(
mut self, obj: T, timeout: Option<u64>, io: &mut UvIoFactory,
f: |&Request, &T, uvll::uv_connect_cb| -> c_int
) -> Result<T, UvError> {
let mut req = Request::new(uvll::UV_CONNECT);
let r = f(&req, &obj, connect_cb);
return match r {
0 => {
req.defuse(); // uv callback now owns this request
match timeout {
Some(t) => {
let mut timer = TimerWatcher::new(io);
timer.start(timer_cb, t, 0);
self.timer = Some(timer);
}
None => {}
}
wait_until_woken_after(&mut self.task, &io.loop_, || {
let data = &self as *_;
match self.timer {
Some(ref mut timer) => unsafe { timer.set_data(data) },
None => {}
}
req.set_data(data);
});
// Make sure an erroneously fired callback doesn't have access
// to the context any more.
req.set_data(0 as *int);
// If we failed because of a timeout, drop the TcpWatcher as
// soon as possible because it's data is now set to null and we
// want to cancel the callback ASAP.
match self.status {
0 => Ok(obj),
n => { drop(obj); Err(UvError(n)) }
}
}
n => Err(UvError(n))
};
extern fn timer_cb(handle: *uvll::uv_timer_t) {
// Don't close the corresponding tcp request, just wake up the task
// and let RAII take care of the pending watcher.
let cx: &mut ConnectCtx = unsafe {
&mut *(uvll::get_data_for_uv_handle(handle) as *mut ConnectCtx)
};
cx.status = uvll::ECANCELED;
wakeup(&mut cx.task);
}
extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
// This callback can be invoked with ECANCELED if the watcher is
// closed by the timeout callback. In that case we just want to free
// the request and be along our merry way.
let req = Request::wrap(req);
if status == uvll::ECANCELED { return }
// Apparently on windows when the handle is closed this callback may
// not be invoked with ECANCELED but rather another error code.
// Either ways, if the data is null, then our timeout has expired
// and there's nothing we can do.
let data = unsafe { uvll::get_data_for_req(req.handle) };
if data.is_null() { return }
let cx: &mut ConnectCtx = unsafe { &mut *(data as *mut ConnectCtx) };
cx.status = status;
match cx.timer {
Some(ref mut t) => t.stop(),
None => {}
}
// Note that the timer callback doesn't cancel the connect request
// (that's the job of uv_close()), so it's possible for this
// callback to get triggered after the timeout callback fires, but
// before the task wakes up. In that case, we did indeed
// successfully connect, but we don't need to wake someone up. We
// updated the status above (correctly so), and the task will pick
// up on this when it wakes up.
if cx.task.is_some() {
wakeup(&mut cx.task);
}
}
}
}
impl AcceptTimeout {
pub fn new() -> AcceptTimeout {
AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None }
}
pub fn accept<T: Send>(&mut self, c: &Receiver<IoResult<T>>) -> IoResult<T> {
match self.timeout_rx {
None => c.recv(),
Some(ref rx) => {
use std::comm::Select;
// Poll the incoming channel first (don't rely on the order of
// select just yet). If someone's pending then we should return
// them immediately.
match c.try_recv() {
Ok(data) => return data,
Err(..) => {}
}
// Use select to figure out which channel gets ready first. We
// do some custom handling of select to ensure that we never
// actually drain the timeout channel (we'll keep seeing the
// timeout message in the future).
let s = Select::new();
let mut timeout = s.handle(rx);
let mut data = s.handle(c);
unsafe {
timeout.add();
data.add();
}
if s.wait() == timeout.id() {
Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
} else {
c.recv()
}
}
}
}
pub fn clear(&mut self) {
match self.timeout_rx {
Some(ref t) => { let _ = t.try_recv(); }
None => {}
}
match self.timer {
Some(ref mut t) => t.stop(),
None => {}
}
}
pub fn set_timeout<U, T: UvHandle<U> + HomingIO>(
&mut self, ms: u64, t: &mut T
) {
// If we have a timeout, lazily initialize the timer which will be used
// to fire when the timeout runs out.
if self.timer.is_none() {
let loop_ = Loop::wrap(unsafe {
uvll::get_loop_for_uv_handle(t.uv_handle())
});
let mut timer = TimerWatcher::new_home(&loop_, t.home().clone());
unsafe {
timer.set_data(self as *mut _ as *AcceptTimeout);
}
self.timer = Some(timer);
}
// Once we've got a timer, stop any previous timeout, reset it for the
// current one, and install some new channels to send/receive data on
let timer = self.timer.get_mut_ref();
timer.stop();
timer.start(timer_cb, ms, 0);
let (tx, rx) = channel();
self.timeout_tx = Some(tx);
self.timeout_rx = Some(rx);
extern fn timer_cb(timer: *uvll::uv_timer_t) {
let acceptor: &mut AcceptTimeout = unsafe {
&mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout)
};
// This send can never fail because if this timer is active then the
// receiving channel is guaranteed to be alive
acceptor.timeout_tx.get_ref().send(());
}
}
}

View File

@ -18,7 +18,7 @@ use uvio::UvIoFactory;
use uvll;
pub struct TimerWatcher {
handle: *uvll::uv_timer_t,
pub handle: *uvll::uv_timer_t,
home: HomeHandle,
action: Option<NextAction>,
blocker: Option<BlockedTask>,

View File

@ -87,7 +87,7 @@ impl RtioTTY for TtyWatcher {
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
self.stream.write(buf).map_err(uv_error_to_io_error)
self.stream.write(buf, false).map_err(uv_error_to_io_error)
}
fn set_raw(&mut self, raw: bool) -> Result<(), IoError> {