auto merge of #13814 : alexcrichton/rust/read-timeout, r=brson

This PR is an implementation of `set_timeout`, `set_read_timeout`, and `set_write_timeout` for TCP, UDP, and Unix streams (named pipes on windows).

The implementation was much more difficult than I imagined it was going to be throughout the 9 categories ({tcp, udp, unix} x {windows, unix, green}). The major snag is that libuv doesn't support canceling writes, so I chose to word the `set_write_timeout` documentation in such a way that it accomadates the behavior seen when running around with libgreen.

The first commit is from #13751, and I just included it to pre-emptively avoid rebase conflicts. The following commits are relevant to this PR. The tests aren't quite passing on windows just yet, but I should have those working by tomorrow once my VM is back up and running. For now, I wanted to see what others' thoughts were on this strategy.
This commit is contained in:
bors 2014-05-08 00:01:41 -07:00
commit c39b1cb1be
22 changed files with 1697 additions and 468 deletions

View File

@ -27,6 +27,13 @@ pub static FIOCLEX: libc::c_ulong = 0x20006601;
#[cfg(target_os = "android")]
pub static FIOCLEX: libc::c_ulong = 0x5451;
#[cfg(target_os = "macos")]
#[cfg(target_os = "freebsd")]
pub static MSG_DONTWAIT: libc::c_int = 0x80;
#[cfg(target_os = "linux")]
#[cfg(target_os = "android")]
pub static MSG_DONTWAIT: libc::c_int = 0x40;
extern {
pub fn gettimeofday(timeval: *mut libc::timeval,
tzp: *libc::c_void) -> libc::c_int;

View File

@ -18,6 +18,7 @@ pub static WSADESCRIPTION_LEN: uint = 256;
pub static WSASYS_STATUS_LEN: uint = 128;
pub static FIONBIO: libc::c_long = 0x8004667e;
static FD_SETSIZE: uint = 64;
pub static MSG_DONTWAIT: libc::c_int = 0;
pub struct WSADATA {
pub wVersion: libc::WORD,

View File

@ -189,6 +189,9 @@ impl rtio::RtioPipe for FileDesc {
fn close_write(&mut self) -> Result<(), IoError> {
Err(io::standard_error(io::InvalidInput))
}
fn set_timeout(&mut self, _t: Option<u64>) {}
fn set_read_timeout(&mut self, _t: Option<u64>) {}
fn set_write_timeout(&mut self, _t: Option<u64>) {}
}
impl rtio::RtioTTY for FileDesc {

View File

@ -221,6 +221,9 @@ impl rtio::RtioPipe for FileDesc {
fn close_write(&mut self) -> IoResult<()> {
Err(io::standard_error(io::InvalidInput))
}
fn set_timeout(&mut self, _t: Option<u64>) {}
fn set_read_timeout(&mut self, _t: Option<u64>) {}
fn set_write_timeout(&mut self, _t: Option<u64>) {}
}
impl rtio::RtioTTY for FileDesc {

View File

@ -15,6 +15,7 @@ use std::io;
use std::mem;
use std::rt::rtio;
use std::sync::arc::UnsafeArc;
use std::unstable::mutex;
use super::{IoResult, retry, keep_going};
use super::c;
@ -236,22 +237,36 @@ pub fn init() {
pub struct TcpStream {
inner: UnsafeArc<Inner>,
read_deadline: u64,
write_deadline: u64,
}
struct Inner {
fd: sock_t,
lock: mutex::NativeMutex,
}
pub struct Guard<'a> {
pub fd: sock_t,
pub guard: mutex::LockGuard<'a>,
}
impl Inner {
fn new(fd: sock_t) -> Inner {
Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
}
}
impl TcpStream {
pub fn connect(addr: ip::SocketAddr,
timeout: Option<u64>) -> IoResult<TcpStream> {
let fd = try!(socket(addr, libc::SOCK_STREAM));
let (addr, len) = addr_to_sockaddr(addr);
let inner = Inner { fd: fd };
let ret = TcpStream { inner: UnsafeArc::new(inner) };
let ret = TcpStream::new(Inner::new(fd));
let len = len as libc::socklen_t;
let (addr, len) = addr_to_sockaddr(addr);
let addrp = &addr as *_ as *libc::sockaddr;
let len = len as libc::socklen_t;
match timeout {
Some(timeout) => {
try!(util::connect_timeout(fd, addrp, len, timeout));
@ -266,6 +281,14 @@ impl TcpStream {
}
}
fn new(inner: Inner) -> TcpStream {
TcpStream {
inner: UnsafeArc::new(inner),
read_deadline: 0,
write_deadline: 0,
}
}
pub fn fd(&self) -> sock_t {
// This unsafety is fine because it's just a read-only arc
unsafe { (*self.inner.get()).fd }
@ -299,6 +322,19 @@ impl TcpStream {
fn set_tcp_keepalive(&mut self, _seconds: uint) -> IoResult<()> {
Ok(())
}
#[cfg(target_os = "linux")]
fn lock_nonblocking(&self) {}
#[cfg(not(target_os = "linux"))]
fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
let ret = Guard {
fd: self.fd(),
guard: unsafe { (*self.inner.get()).lock.lock() },
};
assert!(util::set_nonblocking(self.fd(), true).is_ok());
ret
}
}
#[cfg(windows)] type wrlen = libc::c_int;
@ -306,33 +342,31 @@ impl TcpStream {
impl rtio::RtioTcpStream for TcpStream {
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
let ret = retry(|| {
unsafe {
libc::recv(self.fd(),
buf.as_mut_ptr() as *mut libc::c_void,
buf.len() as wrlen,
0) as libc::c_int
}
});
if ret == 0 {
Err(io::standard_error(io::EndOfFile))
} else if ret < 0 {
Err(last_error())
} else {
Ok(ret as uint)
}
let fd = self.fd();
let dolock = || self.lock_nonblocking();
let doread = |nb| unsafe {
let flags = if nb {c::MSG_DONTWAIT} else {0};
libc::recv(fd,
buf.as_mut_ptr() as *mut libc::c_void,
buf.len() as wrlen,
flags) as libc::c_int
};
read(fd, self.read_deadline, dolock, doread)
}
fn write(&mut self, buf: &[u8]) -> IoResult<()> {
let ret = keep_going(buf, |buf, len| unsafe {
libc::send(self.fd(),
let fd = self.fd();
let dolock = || self.lock_nonblocking();
let dowrite = |nb: bool, buf: *u8, len: uint| unsafe {
let flags = if nb {c::MSG_DONTWAIT} else {0};
libc::send(fd,
buf as *mut libc::c_void,
len as wrlen,
0) as i64
});
if ret < 0 {
Err(super::last_error())
} else {
Ok(())
flags) as i64
};
match write(fd, self.write_deadline, buf, true, dolock, dowrite) {
Ok(_) => Ok(()),
Err(e) => Err(e)
}
}
fn peer_name(&mut self) -> IoResult<ip::SocketAddr> {
@ -354,14 +388,29 @@ impl rtio::RtioTcpStream for TcpStream {
fn clone(&self) -> Box<rtio::RtioTcpStream:Send> {
box TcpStream {
inner: self.inner.clone(),
read_deadline: 0,
write_deadline: 0,
} as Box<rtio::RtioTcpStream:Send>
}
fn close_write(&mut self) -> IoResult<()> {
super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
}
fn close_read(&mut self) -> IoResult<()> {
super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
}
fn set_timeout(&mut self, timeout: Option<u64>) {
let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
self.read_deadline = deadline;
self.write_deadline = deadline;
}
fn set_read_timeout(&mut self, timeout: Option<u64>) {
self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
}
fn set_write_timeout(&mut self, timeout: Option<u64>) {
self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
}
}
impl rtio::RtioSocket for TcpStream {
@ -374,6 +423,13 @@ impl Drop for Inner {
fn drop(&mut self) { unsafe { close(self.fd); } }
}
#[unsafe_destructor]
impl<'a> Drop for Guard<'a> {
fn drop(&mut self) {
assert!(util::set_nonblocking(self.fd, false).is_ok());
}
}
////////////////////////////////////////////////////////////////////////////////
// TCP listeners
////////////////////////////////////////////////////////////////////////////////
@ -384,29 +440,24 @@ pub struct TcpListener {
impl TcpListener {
pub fn bind(addr: ip::SocketAddr) -> IoResult<TcpListener> {
unsafe {
socket(addr, libc::SOCK_STREAM).and_then(|fd| {
let (addr, len) = addr_to_sockaddr(addr);
let addrp = &addr as *libc::sockaddr_storage;
let inner = Inner { fd: fd };
let ret = TcpListener { inner: inner };
// On platforms with Berkeley-derived sockets, this allows
// to quickly rebind a socket, without needing to wait for
// the OS to clean up the previous one.
if cfg!(unix) {
match setsockopt(fd, libc::SOL_SOCKET,
libc::SO_REUSEADDR,
1 as libc::c_int) {
Err(n) => { return Err(n); },
Ok(..) => { }
}
}
match libc::bind(fd, addrp as *libc::sockaddr,
len as libc::socklen_t) {
-1 => Err(last_error()),
_ => Ok(ret),
}
})
let fd = try!(socket(addr, libc::SOCK_STREAM));
let ret = TcpListener { inner: Inner::new(fd) };
let (addr, len) = addr_to_sockaddr(addr);
let addrp = &addr as *_ as *libc::sockaddr;
let len = len as libc::socklen_t;
// On platforms with Berkeley-derived sockets, this allows
// to quickly rebind a socket, without needing to wait for
// the OS to clean up the previous one.
if cfg!(unix) {
try!(setsockopt(fd, libc::SOL_SOCKET, libc::SO_REUSEADDR,
1 as libc::c_int));
}
match unsafe { libc::bind(fd, addrp, len) } {
-1 => Err(last_error()),
_ => Ok(ret),
}
}
@ -444,7 +495,7 @@ impl TcpAcceptor {
pub fn native_accept(&mut self) -> IoResult<TcpStream> {
if self.deadline != 0 {
try!(util::accept_deadline(self.fd(), self.deadline));
try!(util::await(self.fd(), Some(self.deadline), util::Readable));
}
unsafe {
let mut storage: libc::sockaddr_storage = mem::init();
@ -457,7 +508,7 @@ impl TcpAcceptor {
&mut size as *mut libc::socklen_t) as libc::c_int
}) as sock_t {
-1 => Err(last_error()),
fd => Ok(TcpStream { inner: UnsafeArc::new(Inner { fd: fd })})
fd => Ok(TcpStream::new(Inner::new(fd))),
}
}
}
@ -487,22 +538,26 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
pub struct UdpSocket {
inner: UnsafeArc<Inner>,
read_deadline: u64,
write_deadline: u64,
}
impl UdpSocket {
pub fn bind(addr: ip::SocketAddr) -> IoResult<UdpSocket> {
unsafe {
socket(addr, libc::SOCK_DGRAM).and_then(|fd| {
let (addr, len) = addr_to_sockaddr(addr);
let addrp = &addr as *libc::sockaddr_storage;
let inner = Inner { fd: fd };
let ret = UdpSocket { inner: UnsafeArc::new(inner) };
match libc::bind(fd, addrp as *libc::sockaddr,
len as libc::socklen_t) {
-1 => Err(last_error()),
_ => Ok(ret),
}
})
let fd = try!(socket(addr, libc::SOCK_DGRAM));
let ret = UdpSocket {
inner: UnsafeArc::new(Inner::new(fd)),
read_deadline: 0,
write_deadline: 0,
};
let (addr, len) = addr_to_sockaddr(addr);
let addrp = &addr as *_ as *libc::sockaddr;
let len = len as libc::socklen_t;
match unsafe { libc::bind(fd, addrp, len) } {
-1 => Err(last_error()),
_ => Ok(ret),
}
}
@ -541,6 +596,19 @@ impl UdpSocket {
}
}
}
#[cfg(target_os = "linux")]
fn lock_nonblocking(&self) {}
#[cfg(not(target_os = "linux"))]
fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
let ret = Guard {
fd: self.fd(),
guard: unsafe { (*self.inner.get()).lock.lock() },
};
assert!(util::set_nonblocking(self.fd(), true).is_ok());
ret
}
}
impl rtio::RtioSocket for UdpSocket {
@ -554,48 +622,54 @@ impl rtio::RtioSocket for UdpSocket {
impl rtio::RtioUdpSocket for UdpSocket {
fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, ip::SocketAddr)> {
unsafe {
let mut storage: libc::sockaddr_storage = mem::init();
let storagep = &mut storage as *mut libc::sockaddr_storage;
let mut addrlen: libc::socklen_t =
mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
let ret = retry(|| {
libc::recvfrom(self.fd(),
buf.as_ptr() as *mut libc::c_void,
buf.len() as msglen_t,
0,
storagep as *mut libc::sockaddr,
&mut addrlen) as libc::c_int
});
if ret < 0 { return Err(last_error()) }
sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| {
Ok((ret as uint, addr))
})
}
let fd = self.fd();
let mut storage: libc::sockaddr_storage = unsafe { mem::init() };
let storagep = &mut storage as *mut _ as *mut libc::sockaddr;
let mut addrlen: libc::socklen_t =
mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
let dolock = || self.lock_nonblocking();
let doread = |nb| unsafe {
let flags = if nb {c::MSG_DONTWAIT} else {0};
libc::recvfrom(fd,
buf.as_mut_ptr() as *mut libc::c_void,
buf.len() as msglen_t,
flags,
storagep,
&mut addrlen) as libc::c_int
};
let n = try!(read(fd, self.read_deadline, dolock, doread));
sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| {
Ok((n as uint, addr))
})
}
fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> IoResult<()> {
let (dst, len) = addr_to_sockaddr(dst);
let dstp = &dst as *libc::sockaddr_storage;
unsafe {
let ret = retry(|| {
libc::sendto(self.fd(),
buf.as_ptr() as *libc::c_void,
buf.len() as msglen_t,
0,
dstp as *libc::sockaddr,
len as libc::socklen_t) as libc::c_int
});
match ret {
-1 => Err(last_error()),
n if n as uint != buf.len() => {
Err(io::IoError {
kind: io::OtherIoError,
desc: "couldn't send entire packet at once",
detail: None,
})
}
_ => Ok(())
}
let (dst, dstlen) = addr_to_sockaddr(dst);
let dstp = &dst as *_ as *libc::sockaddr;
let dstlen = dstlen as libc::socklen_t;
let fd = self.fd();
let dolock = || self.lock_nonblocking();
let dowrite = |nb, buf: *u8, len: uint| unsafe {
let flags = if nb {c::MSG_DONTWAIT} else {0};
libc::sendto(fd,
buf as *libc::c_void,
len as msglen_t,
flags,
dstp,
dstlen) as i64
};
let n = try!(write(fd, self.write_deadline, buf, false, dolock, dowrite));
if n != buf.len() {
Err(io::IoError {
kind: io::ShortWrite(n),
desc: "couldn't send entire packet at once",
detail: None,
})
} else {
Ok(())
}
}
@ -645,6 +719,179 @@ impl rtio::RtioUdpSocket for UdpSocket {
fn clone(&self) -> Box<rtio::RtioUdpSocket:Send> {
box UdpSocket {
inner: self.inner.clone(),
read_deadline: 0,
write_deadline: 0,
} as Box<rtio::RtioUdpSocket:Send>
}
fn set_timeout(&mut self, timeout: Option<u64>) {
let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
self.read_deadline = deadline;
self.write_deadline = deadline;
}
fn set_read_timeout(&mut self, timeout: Option<u64>) {
self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
}
fn set_write_timeout(&mut self, timeout: Option<u64>) {
self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
}
}
////////////////////////////////////////////////////////////////////////////////
// Timeout helpers
//
// The read/write functions below are the helpers for reading/writing a socket
// with a possible deadline specified. This is generally viewed as a timed out
// I/O operation.
//
// From the application's perspective, timeouts apply to the I/O object, not to
// the underlying file descriptor (it's one timeout per object). This means that
// we can't use the SO_RCVTIMEO and corresponding send timeout option.
//
// The next idea to implement timeouts would be to use nonblocking I/O. An
// invocation of select() would wait (with a timeout) for a socket to be ready.
// Once its ready, we can perform the operation. Note that the operation *must*
// be nonblocking, even though select() says the socket is ready. This is
// because some other thread could have come and stolen our data (handles can be
// cloned).
//
// To implement nonblocking I/O, the first option we have is to use the
// O_NONBLOCK flag. Remember though that this is a global setting, affecting all
// I/O objects, so this was initially viewed as unwise.
//
// It turns out that there's this nifty MSG_DONTWAIT flag which can be passed to
// send/recv, but the niftiness wears off once you realize it only works well on
// linux [1] [2]. This means that it's pretty easy to get a nonblocking
// operation on linux (no flag fidding, no affecting other objects), but not on
// other platforms.
//
// To work around this constraint on other platforms, we end up using the
// original strategy of flipping the O_NONBLOCK flag. As mentioned before, this
// could cause other objects' blocking operations to suddenly become
// nonblocking. To get around this, a "blocking operation" which returns EAGAIN
// falls back to using the same code path as nonblocking operations, but with an
// infinite timeout (select + send/recv). This helps emulate blocking
// reads/writes despite the underlying descriptor being nonblocking, as well as
// optimizing the fast path of just hitting one syscall in the good case.
//
// As a final caveat, this implementation uses a mutex so only one thread is
// doing a nonblocking operation at at time. This is the operation that comes
// after the select() (at which point we think the socket is ready). This is
// done for sanity to ensure that the state of the O_NONBLOCK flag is what we
// expect (wouldn't want someone turning it on when it should be off!). All
// operations performed in the lock are *nonblocking* to avoid holding the mutex
// forever.
//
// So, in summary, linux uses MSG_DONTWAIT and doesn't need mutexes, everyone
// else uses O_NONBLOCK and mutexes with some trickery to make sure blocking
// reads/writes are still blocking.
//
// Fun, fun!
//
// [1] http://twistedmatrix.com/pipermail/twisted-commits/2012-April/034692.html
// [2] http://stackoverflow.com/questions/19819198/does-send-msg-dontwait
pub fn read<T>(fd: sock_t,
deadline: u64,
lock: || -> T,
read: |bool| -> libc::c_int) -> IoResult<uint> {
let mut ret = -1;
if deadline == 0 {
ret = retry(|| read(false));
}
if deadline != 0 || (ret == -1 && util::wouldblock()) {
let deadline = match deadline {
0 => None,
n => Some(n),
};
loop {
// With a timeout, first we wait for the socket to become
// readable using select(), specifying the relevant timeout for
// our previously set deadline.
try!(util::await(fd, deadline, util::Readable));
// At this point, we're still within the timeout, and we've
// determined that the socket is readable (as returned by
// select). We must still read the socket in *nonblocking* mode
// because some other thread could come steal our data. If we
// fail to read some data, we retry (hence the outer loop) and
// wait for the socket to become readable again.
let _guard = lock();
match retry(|| read(deadline.is_some())) {
-1 if util::wouldblock() => { assert!(deadline.is_some()); }
-1 => return Err(last_error()),
n => { ret = n; break }
}
}
}
match ret {
0 => Err(io::standard_error(io::EndOfFile)),
n if n < 0 => Err(last_error()),
n => Ok(n as uint)
}
}
pub fn write<T>(fd: sock_t,
deadline: u64,
buf: &[u8],
write_everything: bool,
lock: || -> T,
write: |bool, *u8, uint| -> i64) -> IoResult<uint> {
let mut ret = -1;
let mut written = 0;
if deadline == 0 {
if write_everything {
ret = keep_going(buf, |inner, len| {
written = buf.len() - len;
write(false, inner, len)
});
} else {
ret = retry(|| {
write(false, buf.as_ptr(), buf.len()) as libc::c_int
}) as i64;
if ret > 0 { written = ret as uint; }
}
}
if deadline != 0 || (ret == -1 && util::wouldblock()) {
let deadline = match deadline {
0 => None,
n => Some(n),
};
while written < buf.len() && (write_everything || written == 0) {
// As with read(), first wait for the socket to be ready for
// the I/O operation.
match util::await(fd, deadline, util::Writable) {
Err(ref e) if e.kind == io::TimedOut && written > 0 => {
assert!(deadline.is_some());
return Err(io::IoError {
kind: io::ShortWrite(written),
desc: "short write",
detail: None,
})
}
Err(e) => return Err(e),
Ok(()) => {}
}
// Also as with read(), we use MSG_DONTWAIT to guard ourselves
// against unforseen circumstances.
let _guard = lock();
let ptr = buf.slice_from(written).as_ptr();
let len = buf.len() - written;
match retry(|| write(deadline.is_some(), ptr, len) as libc::c_int) {
-1 if util::wouldblock() => {}
-1 => return Err(last_error()),
n => { written += n as uint; }
}
}
ret = 0;
}
if ret < 0 {
Err(last_error())
} else {
Ok(written)
}
}

View File

@ -16,9 +16,12 @@ use std::io;
use std::mem;
use std::rt::rtio;
use std::sync::arc::UnsafeArc;
use std::unstable::mutex;
use super::{IoResult, retry, keep_going};
use super::{IoResult, retry};
use super::net;
use super::util;
use super::c;
use super::file::fd_t;
fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> {
@ -55,6 +58,13 @@ fn addr_to_sockaddr_un(addr: &CString) -> IoResult<(libc::sockaddr_storage, uint
struct Inner {
fd: fd_t,
lock: mutex::NativeMutex,
}
impl Inner {
fn new(fd: fd_t) -> Inner {
Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
}
}
impl Drop for Inner {
@ -64,7 +74,7 @@ impl Drop for Inner {
fn connect(addr: &CString, ty: libc::c_int,
timeout: Option<u64>) -> IoResult<Inner> {
let (addr, len) = try!(addr_to_sockaddr_un(addr));
let inner = Inner { fd: try!(unix_socket(ty)) };
let inner = Inner::new(try!(unix_socket(ty)));
let addrp = &addr as *_ as *libc::sockaddr;
let len = len as libc::socklen_t;
@ -84,7 +94,7 @@ fn connect(addr: &CString, ty: libc::c_int,
fn bind(addr: &CString, ty: libc::c_int) -> IoResult<Inner> {
let (addr, len) = try!(addr_to_sockaddr_un(addr));
let inner = Inner { fd: try!(unix_socket(ty)) };
let inner = Inner::new(try!(unix_socket(ty)));
let addrp = &addr as *libc::sockaddr_storage;
match unsafe {
libc::bind(inner.fd, addrp as *libc::sockaddr, len as libc::socklen_t)
@ -100,54 +110,74 @@ fn bind(addr: &CString, ty: libc::c_int) -> IoResult<Inner> {
pub struct UnixStream {
inner: UnsafeArc<Inner>,
read_deadline: u64,
write_deadline: u64,
}
impl UnixStream {
pub fn connect(addr: &CString,
timeout: Option<u64>) -> IoResult<UnixStream> {
connect(addr, libc::SOCK_STREAM, timeout).map(|inner| {
UnixStream { inner: UnsafeArc::new(inner) }
UnixStream::new(UnsafeArc::new(inner))
})
}
fn new(inner: UnsafeArc<Inner>) -> UnixStream {
UnixStream {
inner: inner,
read_deadline: 0,
write_deadline: 0,
}
}
fn fd(&self) -> fd_t { unsafe { (*self.inner.get()).fd } }
#[cfg(target_os = "linux")]
fn lock_nonblocking(&self) {}
#[cfg(not(target_os = "linux"))]
fn lock_nonblocking<'a>(&'a self) -> net::Guard<'a> {
let ret = net::Guard {
fd: self.fd(),
guard: unsafe { (*self.inner.get()).lock.lock() },
};
assert!(util::set_nonblocking(self.fd(), true).is_ok());
ret
}
}
impl rtio::RtioPipe for UnixStream {
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
let ret = retry(|| unsafe {
libc::recv(self.fd(),
buf.as_ptr() as *mut libc::c_void,
let fd = self.fd();
let dolock = || self.lock_nonblocking();
let doread = |nb| unsafe {
let flags = if nb {c::MSG_DONTWAIT} else {0};
libc::recv(fd,
buf.as_mut_ptr() as *mut libc::c_void,
buf.len() as libc::size_t,
0) as libc::c_int
});
if ret == 0 {
Err(io::standard_error(io::EndOfFile))
} else if ret < 0 {
Err(super::last_error())
} else {
Ok(ret as uint)
}
flags) as libc::c_int
};
net::read(fd, self.read_deadline, dolock, doread)
}
fn write(&mut self, buf: &[u8]) -> IoResult<()> {
let ret = keep_going(buf, |buf, len| unsafe {
libc::send(self.fd(),
let fd = self.fd();
let dolock = || self.lock_nonblocking();
let dowrite = |nb: bool, buf: *u8, len: uint| unsafe {
let flags = if nb {c::MSG_DONTWAIT} else {0};
libc::send(fd,
buf as *mut libc::c_void,
len as libc::size_t,
0) as i64
});
if ret < 0 {
Err(super::last_error())
} else {
Ok(())
flags) as i64
};
match net::write(fd, self.write_deadline, buf, true, dolock, dowrite) {
Ok(_) => Ok(()),
Err(e) => Err(e)
}
}
fn clone(&self) -> Box<rtio::RtioPipe:Send> {
box UnixStream {
inner: self.inner.clone(),
} as Box<rtio::RtioPipe:Send>
box UnixStream::new(self.inner.clone()) as Box<rtio::RtioPipe:Send>
}
fn close_write(&mut self) -> IoResult<()> {
@ -156,6 +186,17 @@ impl rtio::RtioPipe for UnixStream {
fn close_read(&mut self) -> IoResult<()> {
super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
}
fn set_timeout(&mut self, timeout: Option<u64>) {
let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
self.read_deadline = deadline;
self.write_deadline = deadline;
}
fn set_read_timeout(&mut self, timeout: Option<u64>) {
self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
}
fn set_write_timeout(&mut self, timeout: Option<u64>) {
self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
}
}
////////////////////////////////////////////////////////////////////////////////
@ -202,7 +243,7 @@ impl UnixAcceptor {
pub fn native_accept(&mut self) -> IoResult<UnixStream> {
if self.deadline != 0 {
try!(util::accept_deadline(self.fd(), self.deadline));
try!(util::await(self.fd(), Some(self.deadline), util::Readable));
}
let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() };
let storagep = &mut storage as *mut libc::sockaddr_storage;
@ -214,7 +255,7 @@ impl UnixAcceptor {
&mut size as *mut libc::socklen_t) as libc::c_int
}) {
-1 => Err(super::last_error()),
fd => Ok(UnixStream { inner: UnsafeArc::new(Inner { fd: fd }) })
fd => Ok(UnixStream::new(UnsafeArc::new(Inner::new(fd))))
}
}
}

View File

@ -169,6 +169,27 @@ unsafe fn pipe(name: *u16, init: bool) -> libc::HANDLE {
)
}
pub fn await(handle: libc::HANDLE, deadline: u64,
overlapped: &mut libc::OVERLAPPED) -> bool {
if deadline == 0 { return true }
// If we've got a timeout, use WaitForSingleObject in tandem with CancelIo
// to figure out if we should indeed get the result.
let now = ::io::timer::now();
let timeout = deadline < now || unsafe {
let ms = (deadline - now) as libc::DWORD;
let r = libc::WaitForSingleObject(overlapped.hEvent,
ms);
r != libc::WAIT_OBJECT_0
};
if timeout {
unsafe { let _ = c::CancelIo(handle); }
false
} else {
true
}
}
////////////////////////////////////////////////////////////////////////////////
// Unix Streams
////////////////////////////////////////////////////////////////////////////////
@ -177,6 +198,8 @@ pub struct UnixStream {
inner: UnsafeArc<Inner>,
write: Option<Event>,
read: Option<Event>,
read_deadline: u64,
write_deadline: u64,
}
impl UnixStream {
@ -253,6 +276,8 @@ impl UnixStream {
inner: UnsafeArc::new(inner),
read: None,
write: None,
read_deadline: 0,
write_deadline: 0,
})
}
}
@ -358,6 +383,10 @@ impl rtio::RtioPipe for UnixStream {
// sleep.
drop(guard);
loop {
// Process a timeout if one is pending
let succeeded = await(self.handle(), self.read_deadline,
&mut overlapped);
let ret = unsafe {
libc::GetOverlappedResult(self.handle(),
&mut overlapped,
@ -373,6 +402,9 @@ impl rtio::RtioPipe for UnixStream {
// If the reading half is now closed, then we're done. If we woke up
// because the writing half was closed, keep trying.
if !succeeded {
return Err(io::standard_error(io::TimedOut))
}
if self.read_closed() {
return Err(io::standard_error(io::EndOfFile))
}
@ -408,12 +440,16 @@ impl rtio::RtioPipe for UnixStream {
&mut bytes_written,
&mut overlapped)
};
let err = os::errno();
drop(guard);
if ret == 0 {
if os::errno() != libc::ERROR_IO_PENDING as uint {
return Err(super::last_error())
if err != libc::ERROR_IO_PENDING as uint {
return Err(io::IoError::from_errno(err, true));
}
// Process a timeout if one is pending
let succeeded = await(self.handle(), self.write_deadline,
&mut overlapped);
let ret = unsafe {
libc::GetOverlappedResult(self.handle(),
&mut overlapped,
@ -427,10 +463,22 @@ impl rtio::RtioPipe for UnixStream {
if os::errno() != libc::ERROR_OPERATION_ABORTED as uint {
return Err(super::last_error())
}
if !succeeded {
let amt = offset + bytes_written as uint;
return if amt > 0 {
Err(io::IoError {
kind: io::ShortWrite(amt),
desc: "short write during write",
detail: None,
})
} else {
Err(util::timeout("write timed out"))
}
}
if self.write_closed() {
return Err(io::standard_error(io::BrokenPipe))
}
continue; // retry
continue // retry
}
}
offset += bytes_written as uint;
@ -443,6 +491,8 @@ impl rtio::RtioPipe for UnixStream {
inner: self.inner.clone(),
read: None,
write: None,
read_deadline: 0,
write_deadline: 0,
} as Box<rtio::RtioPipe:Send>
}
@ -475,6 +525,18 @@ impl rtio::RtioPipe for UnixStream {
unsafe { (*self.inner.get()).write_closed.store(true, atomics::SeqCst) }
self.cancel_io()
}
fn set_timeout(&mut self, timeout: Option<u64>) {
let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
self.read_deadline = deadline;
self.write_deadline = deadline;
}
fn set_read_timeout(&mut self, timeout: Option<u64>) {
self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
}
fn set_write_timeout(&mut self, timeout: Option<u64>) {
self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
}
}
////////////////////////////////////////////////////////////////////////////////
@ -577,22 +639,8 @@ impl UnixAcceptor {
let mut err = unsafe { libc::GetLastError() };
if err == libc::ERROR_IO_PENDING as libc::DWORD {
// If we've got a timeout, use WaitForSingleObject in tandem
// with CancelIo to figure out if we should indeed get the
// result.
if self.deadline != 0 {
let now = ::io::timer::now();
let timeout = self.deadline < now || unsafe {
let ms = (self.deadline - now) as libc::DWORD;
let r = libc::WaitForSingleObject(overlapped.hEvent,
ms);
r != libc::WAIT_OBJECT_0
};
if timeout {
unsafe { let _ = c::CancelIo(handle); }
return Err(util::timeout("accept timed out"))
}
}
// Process a timeout if one is pending
let _ = await(handle, self.deadline, &mut overlapped);
// This will block until the overlapped I/O is completed. The
// timeout was previously handled, so this will either block in
@ -638,6 +686,8 @@ impl UnixAcceptor {
inner: UnsafeArc::new(Inner::new(handle)),
read: None,
write: None,
read_deadline: 0,
write_deadline: 0,
})
}
}

View File

@ -12,12 +12,19 @@ use libc;
use std::io::IoResult;
use std::io;
use std::mem;
use std::os;
use std::ptr;
use super::c;
use super::net;
use super::{retry, last_error};
#[deriving(Show)]
pub enum SocketStatus {
Readable,
Writable,
}
pub fn timeout(desc: &'static str) -> io::IoError {
io::IoError {
kind: io::TimedOut,
@ -33,6 +40,34 @@ pub fn ms_to_timeval(ms: u64) -> libc::timeval {
}
}
#[cfg(unix)]
pub fn wouldblock() -> bool {
let err = os::errno();
err == libc::EWOULDBLOCK as int || err == libc::EAGAIN as int
}
#[cfg(windows)]
pub fn wouldblock() -> bool {
let err = os::errno();
err == libc::WSAEWOULDBLOCK as uint
}
#[cfg(unix)]
pub fn set_nonblocking(fd: net::sock_t, nb: bool) -> IoResult<()> {
let set = nb as libc::c_int;
super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) }))
}
#[cfg(windows)]
pub fn set_nonblocking(fd: net::sock_t, nb: bool) -> IoResult<()> {
let mut set = nb as libc::c_ulong;
if unsafe { c::ioctlsocket(fd, c::FIONBIO, &mut set) != 0 } {
Err(last_error())
} else {
Ok(())
}
}
// See http://developerweb.net/viewtopic.php?id=3196 for where this is
// derived from.
pub fn connect_timeout(fd: net::sock_t,
@ -79,22 +114,6 @@ pub fn connect_timeout(fd: net::sock_t,
try!(set_nonblocking(fd, false));
return ret;
#[cfg(unix)]
fn set_nonblocking(fd: net::sock_t, nb: bool) -> IoResult<()> {
let set = nb as libc::c_int;
super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) }))
}
#[cfg(windows)]
fn set_nonblocking(fd: net::sock_t, nb: bool) -> IoResult<()> {
let mut set = nb as libc::c_ulong;
if unsafe { c::ioctlsocket(fd, c::FIONBIO, &mut set) != 0 } {
Err(last_error())
} else {
Ok(())
}
}
#[cfg(unix)]
fn await(fd: net::sock_t, set: &mut c::fd_set,
timeout: u64) -> libc::c_int {
@ -116,21 +135,34 @@ pub fn connect_timeout(fd: net::sock_t,
}
}
pub fn accept_deadline(fd: net::sock_t, deadline: u64) -> IoResult<()> {
pub fn await(fd: net::sock_t, deadline: Option<u64>,
status: SocketStatus) -> IoResult<()> {
let mut set: c::fd_set = unsafe { mem::init() };
c::fd_set(&mut set, fd);
let (read, write) = match status {
Readable => (&set as *_, ptr::null()),
Writable => (ptr::null(), &set as *_),
};
let mut tv: libc::timeval = unsafe { mem::init() };
match retry(|| {
// If we're past the deadline, then pass a 0 timeout to select() so
// we can poll the status of the socket.
let now = ::io::timer::now();
let ms = if deadline < now {0} else {deadline - now};
let tv = ms_to_timeval(ms);
let tvp = match deadline {
None => ptr::null(),
Some(deadline) => {
// If we're past the deadline, then pass a 0 timeout to
// select() so we can poll the status
let ms = if deadline < now {0} else {deadline - now};
tv = ms_to_timeval(ms);
&tv as *_
}
};
let n = if cfg!(windows) {1} else {fd as libc::c_int + 1};
unsafe { c::select(n, &set, ptr::null(), ptr::null(), &tv) }
let r = unsafe { c::select(n, read, write, ptr::null(), tvp) };
r
}) {
-1 => Err(last_error()),
0 => Err(timeout("accept timed out")),
_ => return Ok(()),
0 => Err(timeout("timed out")),
_ => Ok(()),
}
}

View File

@ -31,7 +31,7 @@ pub struct Guard<'a> {
}
struct Inner {
queue: Vec<BlockedTask>,
queue: Vec<(BlockedTask, uint)>,
held: bool,
closed: bool,
}
@ -47,16 +47,17 @@ impl Access {
}
}
pub fn grant<'a>(&'a mut self, missile: HomingMissile) -> Guard<'a> {
pub fn grant<'a>(&'a mut self, token: uint,
missile: HomingMissile) -> Guard<'a> {
// This unsafety is actually OK because the homing missile argument
// guarantees that we're on the same event loop as all the other objects
// attempting to get access granted.
let inner: &mut Inner = unsafe { cast::transmute(self.inner.get()) };
let inner: &mut Inner = unsafe { &mut *self.inner.get() };
if inner.held {
let t: Box<Task> = Local::take();
t.deschedule(1, |task| {
inner.queue.push(task);
inner.queue.push((task, token));
Ok(())
});
assert!(inner.held);
@ -75,6 +76,17 @@ impl Access {
// necessary synchronization to be running on this thread.
unsafe { (*self.inner.get()).closed = true; }
}
// Dequeue a blocked task with a specified token. This is unsafe because it
// is only safe to invoke while on the home event loop, and there is no
// guarantee that this i being invoked on the home event loop.
pub unsafe fn dequeue(&mut self, token: uint) -> Option<BlockedTask> {
let inner: &mut Inner = &mut *self.inner.get();
match inner.queue.iter().position(|&(_, t)| t == token) {
Some(i) => Some(inner.queue.remove(i).unwrap().val0()),
None => None,
}
}
}
impl Clone for Access {
@ -111,9 +123,9 @@ impl<'a> Drop for Guard<'a> {
// scheduled on this scheduler. Because we might be woken up on some
// other scheduler, we drop our homing missile before we reawaken
// the task.
Some(task) => {
Some((task, _)) => {
drop(self.missile.take());
let _ = task.wake().map(|t| t.reawaken());
task.reawaken();
}
None => { inner.held = false; }
}

View File

@ -84,6 +84,7 @@ fn start(argc: int, argv: **u8) -> int {
mod macros;
mod access;
mod timeout;
mod homing;
mod queue;
mod rc;

View File

@ -12,21 +12,20 @@ use libc::{size_t, ssize_t, c_int, c_void, c_uint};
use libc;
use std::cast;
use std::io;
use std::io::{IoError, IoResult};
use std::io::IoError;
use std::io::net::ip;
use std::mem;
use std::ptr;
use std::rt::rtio;
use std::rt::task::BlockedTask;
use access::Access;
use homing::{HomingIO, HomeHandle};
use rc::Refcount;
use stream::StreamWatcher;
use super::{Loop, Request, UvError, Buf, status_to_io_result,
uv_error_to_io_error, UvHandle, slice_to_uv_buf,
wait_until_woken_after, wakeup};
use timer::TimerWatcher;
use timeout::{AccessTimeout, AcceptTimeout, ConnectCtx};
use uvio::UvIoFactory;
use uvll;
@ -146,190 +145,6 @@ fn socket_name(sk: SocketNameKind,
n => Err(uv_error_to_io_error(UvError(n)))
}
}
////////////////////////////////////////////////////////////////////////////////
// Helpers for handling timeouts, shared for pipes/tcp
////////////////////////////////////////////////////////////////////////////////
pub struct ConnectCtx {
pub status: c_int,
pub task: Option<BlockedTask>,
pub timer: Option<Box<TimerWatcher>>,
}
pub struct AcceptTimeout {
timer: Option<TimerWatcher>,
timeout_tx: Option<Sender<()>>,
timeout_rx: Option<Receiver<()>>,
}
impl ConnectCtx {
pub fn connect<T>(
mut self, obj: T, timeout: Option<u64>, io: &mut UvIoFactory,
f: |&Request, &T, uvll::uv_connect_cb| -> libc::c_int
) -> Result<T, UvError> {
let mut req = Request::new(uvll::UV_CONNECT);
let r = f(&req, &obj, connect_cb);
return match r {
0 => {
req.defuse(); // uv callback now owns this request
match timeout {
Some(t) => {
let mut timer = TimerWatcher::new(io);
timer.start(timer_cb, t, 0);
self.timer = Some(timer);
}
None => {}
}
wait_until_woken_after(&mut self.task, &io.loop_, || {
let data = &self as *_;
match self.timer {
Some(ref mut timer) => unsafe { timer.set_data(data) },
None => {}
}
req.set_data(data);
});
// Make sure an erroneously fired callback doesn't have access
// to the context any more.
req.set_data(0 as *int);
// If we failed because of a timeout, drop the TcpWatcher as
// soon as possible because it's data is now set to null and we
// want to cancel the callback ASAP.
match self.status {
0 => Ok(obj),
n => { drop(obj); Err(UvError(n)) }
}
}
n => Err(UvError(n))
};
extern fn timer_cb(handle: *uvll::uv_timer_t) {
// Don't close the corresponding tcp request, just wake up the task
// and let RAII take care of the pending watcher.
let cx: &mut ConnectCtx = unsafe {
&mut *(uvll::get_data_for_uv_handle(handle) as *mut ConnectCtx)
};
cx.status = uvll::ECANCELED;
wakeup(&mut cx.task);
}
extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
// This callback can be invoked with ECANCELED if the watcher is
// closed by the timeout callback. In that case we just want to free
// the request and be along our merry way.
let req = Request::wrap(req);
if status == uvll::ECANCELED { return }
// Apparently on windows when the handle is closed this callback may
// not be invoked with ECANCELED but rather another error code.
// Either ways, if the data is null, then our timeout has expired
// and there's nothing we can do.
let data = unsafe { uvll::get_data_for_req(req.handle) };
if data.is_null() { return }
let cx: &mut ConnectCtx = unsafe { &mut *(data as *mut ConnectCtx) };
cx.status = status;
match cx.timer {
Some(ref mut t) => t.stop(),
None => {}
}
// Note that the timer callback doesn't cancel the connect request
// (that's the job of uv_close()), so it's possible for this
// callback to get triggered after the timeout callback fires, but
// before the task wakes up. In that case, we did indeed
// successfully connect, but we don't need to wake someone up. We
// updated the status above (correctly so), and the task will pick
// up on this when it wakes up.
if cx.task.is_some() {
wakeup(&mut cx.task);
}
}
}
}
impl AcceptTimeout {
pub fn new() -> AcceptTimeout {
AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None }
}
pub fn accept<T: Send>(&mut self, c: &Receiver<IoResult<T>>) -> IoResult<T> {
match self.timeout_rx {
None => c.recv(),
Some(ref rx) => {
use std::comm::Select;
// Poll the incoming channel first (don't rely on the order of
// select just yet). If someone's pending then we should return
// them immediately.
match c.try_recv() {
Ok(data) => return data,
Err(..) => {}
}
// Use select to figure out which channel gets ready first. We
// do some custom handling of select to ensure that we never
// actually drain the timeout channel (we'll keep seeing the
// timeout message in the future).
let s = Select::new();
let mut timeout = s.handle(rx);
let mut data = s.handle(c);
unsafe {
timeout.add();
data.add();
}
if s.wait() == timeout.id() {
Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
} else {
c.recv()
}
}
}
}
pub fn clear(&mut self) {
// Clear any previous timeout by dropping the timer and transmission
// channels
drop((self.timer.take(),
self.timeout_tx.take(),
self.timeout_rx.take()))
}
pub fn set_timeout<U, T: UvHandle<U> + HomingIO>(
&mut self, ms: u64, t: &mut T
) {
// If we have a timeout, lazily initialize the timer which will be used
// to fire when the timeout runs out.
if self.timer.is_none() {
let _m = t.fire_homing_missile();
let loop_ = Loop::wrap(unsafe {
uvll::get_loop_for_uv_handle(t.uv_handle())
});
let mut timer = TimerWatcher::new_home(&loop_, t.home().clone());
unsafe {
timer.set_data(self as *mut _ as *AcceptTimeout);
}
self.timer = Some(timer);
}
// Once we've got a timer, stop any previous timeout, reset it for the
// current one, and install some new channels to send/receive data on
let timer = self.timer.get_mut_ref();
timer.stop();
timer.start(timer_cb, ms, 0);
let (tx, rx) = channel();
self.timeout_tx = Some(tx);
self.timeout_rx = Some(rx);
extern fn timer_cb(timer: *uvll::uv_timer_t) {
let acceptor: &mut AcceptTimeout = unsafe {
&mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout)
};
// This send can never fail because if this timer is active then the
// receiving channel is guaranteed to be alive
acceptor.timeout_tx.get_ref().send(());
}
}
}
////////////////////////////////////////////////////////////////////////////////
/// TCP implementation
@ -345,8 +160,8 @@ pub struct TcpWatcher {
// stream object, so we use these access guards in order to arbitrate among
// multiple concurrent reads and writes. Note that libuv *can* read and
// write simultaneously, it just can't read and read simultaneously.
read_access: Access,
write_access: Access,
read_access: AccessTimeout,
write_access: AccessTimeout,
}
pub struct TcpListener {
@ -380,8 +195,8 @@ impl TcpWatcher {
handle: handle,
stream: StreamWatcher::new(handle),
refcount: Refcount::new(),
read_access: Access::new(),
write_access: Access::new(),
read_access: AccessTimeout::new(),
write_access: AccessTimeout::new(),
}
}
@ -412,10 +227,10 @@ impl rtio::RtioSocket for TcpWatcher {
impl rtio::RtioTcpStream for TcpWatcher {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
let m = self.fire_homing_missile();
let access = self.read_access.grant(m);
let guard = try!(self.read_access.grant(m));
// see comments in close_read about this check
if access.is_closed() {
if guard.access.is_closed() {
return Err(io::standard_error(io::EndOfFile))
}
@ -424,8 +239,8 @@ impl rtio::RtioTcpStream for TcpWatcher {
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
let m = self.fire_homing_missile();
let _g = self.write_access.grant(m);
self.stream.write(buf).map_err(uv_error_to_io_error)
let guard = try!(self.write_access.grant(m));
self.stream.write(buf, guard.can_timeout).map_err(uv_error_to_io_error)
}
fn peer_name(&mut self) -> Result<ip::SocketAddr, IoError> {
@ -468,16 +283,19 @@ impl rtio::RtioTcpStream for TcpWatcher {
stream: StreamWatcher::new(self.handle),
home: self.home.clone(),
refcount: self.refcount.clone(),
write_access: self.write_access.clone(),
read_access: self.read_access.clone(),
write_access: self.write_access.clone(),
} as Box<rtio::RtioTcpStream:Send>
}
fn close_read(&mut self) -> Result<(), IoError> {
// see comments in PipeWatcher::close_read
let m = self.fire_homing_missile();
self.read_access.close(&m);
self.stream.cancel_read(m);
let task = {
let m = self.fire_homing_missile();
self.read_access.access.close(&m);
self.stream.cancel_read(uvll::EOF as libc::ssize_t)
};
let _ = task.map(|t| t.reawaken());
Ok(())
}
@ -485,6 +303,35 @@ impl rtio::RtioTcpStream for TcpWatcher {
let _m = self.fire_homing_missile();
shutdown(self.handle, &self.uv_loop())
}
fn set_timeout(&mut self, timeout: Option<u64>) {
self.set_read_timeout(timeout);
self.set_write_timeout(timeout);
}
fn set_read_timeout(&mut self, ms: Option<u64>) {
let _m = self.fire_homing_missile();
let loop_ = self.uv_loop();
self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read,
&self.stream as *_ as uint);
fn cancel_read(stream: uint) -> Option<BlockedTask> {
let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) };
stream.cancel_read(uvll::ECANCELED as ssize_t)
}
}
fn set_write_timeout(&mut self, ms: Option<u64>) {
let _m = self.fire_homing_missile();
let loop_ = self.uv_loop();
self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write,
&self.stream as *_ as uint);
fn cancel_write(stream: uint) -> Option<BlockedTask> {
let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) };
stream.cancel_write()
}
}
}
impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
@ -618,6 +465,7 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
}
fn set_timeout(&mut self, ms: Option<u64>) {
let _m = self.fire_homing_missile();
match ms {
None => self.timeout.clear(),
Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
@ -635,8 +483,22 @@ pub struct UdpWatcher {
// See above for what these fields are
refcount: Refcount,
read_access: Access,
write_access: Access,
read_access: AccessTimeout,
write_access: AccessTimeout,
blocked_sender: Option<BlockedTask>,
}
struct UdpRecvCtx {
task: Option<BlockedTask>,
buf: Option<Buf>,
result: Option<(ssize_t, Option<ip::SocketAddr>)>,
}
struct UdpSendCtx {
result: c_int,
data: Option<Vec<u8>>,
udp: *mut UdpWatcher,
}
impl UdpWatcher {
@ -646,8 +508,9 @@ impl UdpWatcher {
handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
home: io.make_handle(),
refcount: Refcount::new(),
read_access: Access::new(),
write_access: Access::new(),
read_access: AccessTimeout::new(),
write_access: AccessTimeout::new(),
blocked_sender: None,
};
assert_eq!(unsafe {
uvll::uv_udp_init(io.uv_loop(), udp.handle)
@ -683,20 +546,15 @@ impl rtio::RtioUdpSocket for UdpWatcher {
fn recvfrom(&mut self, buf: &mut [u8])
-> Result<(uint, ip::SocketAddr), IoError>
{
struct Ctx {
task: Option<BlockedTask>,
buf: Option<Buf>,
result: Option<(ssize_t, Option<ip::SocketAddr>)>,
}
let loop_ = self.uv_loop();
let m = self.fire_homing_missile();
let _g = self.read_access.grant(m);
let _guard = try!(self.read_access.grant(m));
return match unsafe {
uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
} {
0 => {
let mut cx = Ctx {
let mut cx = UdpRecvCtx {
task: None,
buf: Some(slice_to_uv_buf(buf)),
result: None,
@ -718,7 +576,8 @@ impl rtio::RtioUdpSocket for UdpWatcher {
_suggested_size: size_t,
buf: *mut Buf) {
unsafe {
let cx = &mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx);
let cx = uvll::get_data_for_uv_handle(handle);
let cx = &mut *(cx as *mut UdpRecvCtx);
*buf = cx.buf.take().expect("recv alloc_cb called more than once")
}
}
@ -727,7 +586,7 @@ impl rtio::RtioUdpSocket for UdpWatcher {
addr: *libc::sockaddr, _flags: c_uint) {
assert!(nread != uvll::ECANCELED as ssize_t);
let cx = unsafe {
&mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx)
&mut *(uvll::get_data_for_uv_handle(handle) as *mut UdpRecvCtx)
};
// When there's no data to read the recv callback can be a no-op.
@ -751,42 +610,68 @@ impl rtio::RtioUdpSocket for UdpWatcher {
}
fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> Result<(), IoError> {
struct Ctx { task: Option<BlockedTask>, result: c_int }
let m = self.fire_homing_missile();
let loop_ = self.uv_loop();
let _g = self.write_access.grant(m);
let guard = try!(self.write_access.grant(m));
let mut req = Request::new(uvll::UV_UDP_SEND);
let buf = slice_to_uv_buf(buf);
let (addr, _len) = addr_to_sockaddr(dst);
let result = unsafe {
let addr_p = &addr as *libc::sockaddr_storage;
uvll::uv_udp_send(req.handle, self.handle, [buf],
addr_p as *libc::sockaddr, send_cb)
let addr_p = &addr as *_ as *libc::sockaddr;
// see comments in StreamWatcher::write for why we may allocate a buffer
// here.
let data = if guard.can_timeout {Some(Vec::from_slice(buf))} else {None};
let uv_buf = if guard.can_timeout {
slice_to_uv_buf(data.get_ref().as_slice())
} else {
slice_to_uv_buf(buf)
};
return match result {
return match unsafe {
uvll::uv_udp_send(req.handle, self.handle, [uv_buf], addr_p, send_cb)
} {
0 => {
req.defuse(); // uv callback now owns this request
let mut cx = Ctx { task: None, result: 0 };
wait_until_woken_after(&mut cx.task, &loop_, || {
let mut cx = UdpSendCtx {
result: uvll::ECANCELED, data: data, udp: self as *mut _
};
wait_until_woken_after(&mut self.blocked_sender, &loop_, || {
req.set_data(&cx);
});
match cx.result {
0 => Ok(()),
n => Err(uv_error_to_io_error(UvError(n)))
if cx.result != uvll::ECANCELED {
return match cx.result {
0 => Ok(()),
n => Err(uv_error_to_io_error(UvError(n)))
}
}
let new_cx = box UdpSendCtx {
result: 0,
udp: 0 as *mut UdpWatcher,
data: cx.data.take(),
};
unsafe {
req.set_data(&*new_cx);
cast::forget(new_cx);
}
Err(uv_error_to_io_error(UvError(cx.result)))
}
n => Err(uv_error_to_io_error(UvError(n)))
};
// This function is the same as stream::write_cb, but adapted for udp
// instead of streams.
extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
let req = Request::wrap(req);
assert!(status != uvll::ECANCELED);
let cx: &mut Ctx = unsafe { req.get_data() };
let cx: &mut UdpSendCtx = unsafe { req.get_data() };
cx.result = status;
wakeup(&mut cx.task);
if cx.udp as uint != 0 {
let udp: &mut UdpWatcher = unsafe { &mut *cx.udp };
wakeup(&mut udp.blocked_sender);
} else {
let _cx: Box<UdpSendCtx> = unsafe { cast::transmute(cx) };
}
}
}
@ -866,8 +751,48 @@ impl rtio::RtioUdpSocket for UdpWatcher {
refcount: self.refcount.clone(),
write_access: self.write_access.clone(),
read_access: self.read_access.clone(),
blocked_sender: None,
} as Box<rtio::RtioUdpSocket:Send>
}
fn set_timeout(&mut self, timeout: Option<u64>) {
self.set_read_timeout(timeout);
self.set_write_timeout(timeout);
}
fn set_read_timeout(&mut self, ms: Option<u64>) {
let _m = self.fire_homing_missile();
let loop_ = self.uv_loop();
self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read,
self.handle as uint);
fn cancel_read(stream: uint) -> Option<BlockedTask> {
// This method is quite similar to StreamWatcher::cancel_read, see
// there for more information
let handle = stream as *uvll::uv_udp_t;
assert_eq!(unsafe { uvll::uv_udp_recv_stop(handle) }, 0);
let data = unsafe {
let data = uvll::get_data_for_uv_handle(handle);
if data.is_null() { return None }
uvll::set_data_for_uv_handle(handle, 0 as *int);
&mut *(data as *mut UdpRecvCtx)
};
data.result = Some((uvll::ECANCELED as ssize_t, None));
data.task.take()
}
}
fn set_write_timeout(&mut self, ms: Option<u64>) {
let _m = self.fire_homing_missile();
let loop_ = self.uv_loop();
self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write,
self as *mut _ as uint);
fn cancel_write(stream: uint) -> Option<BlockedTask> {
let stream: &mut UdpWatcher = unsafe { cast::transmute(stream) };
stream.blocked_sender.take()
}
}
}
impl Drop for UdpWatcher {

View File

@ -10,16 +10,18 @@
use libc;
use std::c_str::CString;
use std::cast;
use std::io::IoError;
use std::io;
use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor};
use std::rt::task::BlockedTask;
use access::Access;
use homing::{HomingIO, HomeHandle};
use net;
use rc::Refcount;
use stream::StreamWatcher;
use super::{Loop, UvError, UvHandle, uv_error_to_io_error};
use timeout::{AcceptTimeout, ConnectCtx, AccessTimeout};
use uvio::UvIoFactory;
use uvll;
@ -30,8 +32,8 @@ pub struct PipeWatcher {
refcount: Refcount,
// see comments in TcpWatcher for why these exist
write_access: Access,
read_access: Access,
write_access: AccessTimeout,
read_access: AccessTimeout,
}
pub struct PipeListener {
@ -43,7 +45,7 @@ pub struct PipeListener {
pub struct PipeAcceptor {
listener: Box<PipeListener>,
timeout: net::AcceptTimeout,
timeout: AcceptTimeout,
}
// PipeWatcher implementation and traits
@ -70,8 +72,8 @@ impl PipeWatcher {
home: home,
defused: false,
refcount: Refcount::new(),
read_access: Access::new(),
write_access: Access::new(),
read_access: AccessTimeout::new(),
write_access: AccessTimeout::new(),
}
}
@ -89,7 +91,7 @@ impl PipeWatcher {
-> Result<PipeWatcher, UvError>
{
let pipe = PipeWatcher::new(io, false);
let cx = net::ConnectCtx { status: -1, task: None, timer: None };
let cx = ConnectCtx { status: -1, task: None, timer: None };
cx.connect(pipe, timeout, io, |req, pipe, cb| {
unsafe {
uvll::uv_pipe_connect(req.handle, pipe.handle(),
@ -112,10 +114,10 @@ impl PipeWatcher {
impl RtioPipe for PipeWatcher {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
let m = self.fire_homing_missile();
let access = self.read_access.grant(m);
let guard = try!(self.read_access.grant(m));
// see comments in close_read about this check
if access.is_closed() {
if guard.access.is_closed() {
return Err(io::standard_error(io::EndOfFile))
}
@ -124,8 +126,8 @@ impl RtioPipe for PipeWatcher {
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
let m = self.fire_homing_missile();
let _g = self.write_access.grant(m);
self.stream.write(buf).map_err(uv_error_to_io_error)
let guard = try!(self.write_access.grant(m));
self.stream.write(buf, guard.can_timeout).map_err(uv_error_to_io_error)
}
fn clone(&self) -> Box<RtioPipe:Send> {
@ -157,9 +159,12 @@ impl RtioPipe for PipeWatcher {
// ordering is crucial because we could in theory be rescheduled during
// the uv_read_stop which means that another read invocation could leak
// in before we set the flag.
let m = self.fire_homing_missile();
self.read_access.close(&m);
self.stream.cancel_read(m);
let task = {
let m = self.fire_homing_missile();
self.read_access.access.close(&m);
self.stream.cancel_read(uvll::EOF as libc::ssize_t)
};
let _ = task.map(|t| t.reawaken());
Ok(())
}
@ -167,6 +172,35 @@ impl RtioPipe for PipeWatcher {
let _m = self.fire_homing_missile();
net::shutdown(self.stream.handle, &self.uv_loop())
}
fn set_timeout(&mut self, timeout: Option<u64>) {
self.set_read_timeout(timeout);
self.set_write_timeout(timeout);
}
fn set_read_timeout(&mut self, ms: Option<u64>) {
let _m = self.fire_homing_missile();
let loop_ = self.uv_loop();
self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read,
&self.stream as *_ as uint);
fn cancel_read(stream: uint) -> Option<BlockedTask> {
let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) };
stream.cancel_read(uvll::ECANCELED as libc::ssize_t)
}
}
fn set_write_timeout(&mut self, ms: Option<u64>) {
let _m = self.fire_homing_missile();
let loop_ = self.uv_loop();
self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write,
&self.stream as *_ as uint);
fn cancel_write(stream: uint) -> Option<BlockedTask> {
let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) };
stream.cancel_write()
}
}
}
impl HomingIO for PipeWatcher {
@ -219,7 +253,7 @@ impl RtioUnixListener for PipeListener {
// create the acceptor object from ourselves
let mut acceptor = box PipeAcceptor {
listener: self,
timeout: net::AcceptTimeout::new(),
timeout: AcceptTimeout::new(),
};
let _m = acceptor.fire_homing_missile();

View File

@ -14,7 +14,6 @@ use std::ptr;
use std::rt::task::BlockedTask;
use Loop;
use homing::HomingMissile;
use super::{UvError, Buf, slice_to_uv_buf, Request, wait_until_woken_after,
ForbidUnwind, wakeup};
use uvll;
@ -31,6 +30,8 @@ pub struct StreamWatcher {
// structure, but currently we don't have mappings for all the structures
// defined in libuv, so we're foced to malloc this.
last_write_req: Option<Request>,
blocked_writer: Option<BlockedTask>,
}
struct ReadContext {
@ -41,7 +42,8 @@ struct ReadContext {
struct WriteContext {
result: c_int,
task: Option<BlockedTask>,
stream: *mut StreamWatcher,
data: Option<Vec<u8>>,
}
impl StreamWatcher {
@ -62,6 +64,7 @@ impl StreamWatcher {
StreamWatcher {
handle: stream,
last_write_req: None,
blocked_writer: None,
}
}
@ -74,7 +77,7 @@ impl StreamWatcher {
buf: Some(slice_to_uv_buf(buf)),
// if the read is canceled, we'll see eof, otherwise this will get
// overwritten
result: uvll::EOF as ssize_t,
result: 0,
task: None,
};
// When reading a TTY stream on windows, libuv will invoke alloc_cb
@ -104,27 +107,22 @@ impl StreamWatcher {
return ret;
}
pub fn cancel_read(&mut self, m: HomingMissile) {
pub fn cancel_read(&mut self, reason: ssize_t) -> Option<BlockedTask> {
// When we invoke uv_read_stop, it cancels the read and alloc
// callbacks. We need to manually wake up a pending task (if one was
// present). Note that we wake up the task *outside* the homing missile
// to ensure that we don't switch schedulers when we're not supposed to.
// present).
assert_eq!(unsafe { uvll::uv_read_stop(self.handle) }, 0);
let data = unsafe {
let data = uvll::get_data_for_uv_handle(self.handle);
if data.is_null() { return }
if data.is_null() { return None }
uvll::set_data_for_uv_handle(self.handle, 0 as *int);
&mut *(data as *mut ReadContext)
};
let task = data.task.take();
drop(m);
match task {
Some(task) => { let _ = task.wake().map(|t| t.reawaken()); }
None => {}
}
data.result = reason;
data.task.take()
}
pub fn write(&mut self, buf: &[u8]) -> Result<(), UvError> {
pub fn write(&mut self, buf: &[u8], may_timeout: bool) -> Result<(), UvError> {
// The ownership of the write request is dubious if this function
// unwinds. I believe that if the write_cb fails to re-schedule the task
// then the write request will be leaked.
@ -137,30 +135,94 @@ impl StreamWatcher {
};
req.set_data(ptr::null::<()>());
// And here's where timeouts get a little interesting. Currently, libuv
// does not support canceling an in-flight write request. Consequently,
// when a write timeout expires, there's not much we can do other than
// detach the sleeping task from the write request itself. Semantically,
// this means that the write request will complete asynchronously, but
// the calling task will return error (because the write timed out).
//
// There is special wording in the documentation of set_write_timeout()
// indicating that this is a plausible failure scenario, and this
// function is why that wording exists.
//
// Implementation-wise, we must be careful when passing a buffer down to
// libuv. Most of this implementation avoids allocations becuase of the
// blocking guarantee (all stack local variables are valid for the
// entire read/write request). If our write request can be timed out,
// however, we must heap allocate the data and pass that to the libuv
// functions instead. The reason for this is that if we time out and
// return, there's no guarantee that `buf` is a valid buffer any more.
//
// To do this, the write context has an optionally owned vector of
// bytes.
let data = if may_timeout {Some(Vec::from_slice(buf))} else {None};
let uv_buf = if may_timeout {
slice_to_uv_buf(data.get_ref().as_slice())
} else {
slice_to_uv_buf(buf)
};
// Send off the request, but be careful to not block until we're sure
// that the write reqeust is queued. If the reqeust couldn't be queued,
// then we should return immediately with an error.
match unsafe {
uvll::uv_write(req.handle, self.handle, [slice_to_uv_buf(buf)],
write_cb)
uvll::uv_write(req.handle, self.handle, [uv_buf], write_cb)
} {
0 => {
let mut wcx = WriteContext { result: 0, task: None, };
let mut wcx = WriteContext {
result: uvll::ECANCELED,
stream: self as *mut _,
data: data,
};
req.defuse(); // uv callback now owns this request
let loop_ = unsafe { uvll::get_loop_for_uv_handle(self.handle) };
wait_until_woken_after(&mut wcx.task, &Loop::wrap(loop_), || {
wait_until_woken_after(&mut self.blocked_writer,
&Loop::wrap(loop_), || {
req.set_data(&wcx);
});
self.last_write_req = Some(Request::wrap(req.handle));
match wcx.result {
0 => Ok(()),
n => Err(UvError(n)),
if wcx.result != uvll::ECANCELED {
self.last_write_req = Some(Request::wrap(req.handle));
return match wcx.result {
0 => Ok(()),
n => Err(UvError(n)),
}
}
// This is the second case where canceling an in-flight write
// gets interesting. If we've been canceled (no one reset our
// result), then someone still needs to free the request, and
// someone still needs to free the allocate buffer.
//
// To take care of this, we swap out the stack-allocated write
// context for a heap-allocated context, transferring ownership
// of everything to the write_cb. Libuv guarantees that this
// callback will be invoked at some point, and the callback will
// be responsible for deallocating these resources.
//
// Note that we don't cache this write request back in the
// stream watcher because we no longer have ownership of it, and
// we never will.
let new_wcx = box WriteContext {
result: 0,
stream: 0 as *mut StreamWatcher,
data: wcx.data.take(),
};
unsafe {
req.set_data(&*new_wcx);
cast::forget(new_wcx);
}
Err(UvError(wcx.result))
}
n => Err(UvError(n)),
}
}
pub fn cancel_write(&mut self) -> Option<BlockedTask> {
self.blocked_writer.take()
}
}
// This allocation callback expects to be invoked once and only once. It will
@ -198,12 +260,18 @@ extern fn read_cb(handle: *uvll::uv_stream_t, nread: ssize_t, _buf: *Buf) {
// away the error code as a result.
extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
let mut req = Request::wrap(req);
assert!(status != uvll::ECANCELED);
// Remember to not free the request because it is re-used between writes on
// the same stream.
let wcx: &mut WriteContext = unsafe { req.get_data() };
wcx.result = status;
req.defuse();
wakeup(&mut wcx.task);
// If the stream is present, we haven't timed out, otherwise we acquire
// ownership of everything and then deallocate it all at once.
if wcx.stream as uint != 0 {
req.defuse();
let stream: &mut StreamWatcher = unsafe { &mut *wcx.stream };
wakeup(&mut stream.blocked_writer);
} else {
let _wcx: Box<WriteContext> = unsafe { cast::transmute(wcx) };
}
}

394
src/librustuv/timeout.rs Normal file
View File

@ -0,0 +1,394 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use libc::c_int;
use std::cast;
use std::io::IoResult;
use std::mem;
use std::rt::task::BlockedTask;
use access;
use homing::{HomeHandle, HomingMissile, HomingIO};
use timer::TimerWatcher;
use uvll;
use uvio::UvIoFactory;
use {Loop, UvError, uv_error_to_io_error, Request, wakeup};
use {UvHandle, wait_until_woken_after};
/// Managment of a timeout when gaining access to a portion of a duplex stream.
pub struct AccessTimeout {
state: TimeoutState,
timer: Option<Box<TimerWatcher>>,
pub access: access::Access,
}
pub struct Guard<'a> {
state: &'a mut TimeoutState,
pub access: access::Guard<'a>,
pub can_timeout: bool,
}
#[deriving(Eq)]
enum TimeoutState {
NoTimeout,
TimeoutPending(ClientState),
TimedOut,
}
#[deriving(Eq)]
enum ClientState {
NoWaiter,
AccessPending,
RequestPending,
}
struct TimerContext {
timeout: *mut AccessTimeout,
callback: fn(uint) -> Option<BlockedTask>,
payload: uint,
}
impl AccessTimeout {
pub fn new() -> AccessTimeout {
AccessTimeout {
state: NoTimeout,
timer: None,
access: access::Access::new(),
}
}
/// Grants access to half of a duplex stream, timing out if necessary.
///
/// On success, Ok(Guard) is returned and access has been granted to the
/// stream. If a timeout occurs, then Err is returned with an appropriate
/// error.
pub fn grant<'a>(&'a mut self, m: HomingMissile) -> IoResult<Guard<'a>> {
// First, flag that we're attempting to acquire access. This will allow
// us to cancel the pending grant if we timeout out while waiting for a
// grant.
match self.state {
NoTimeout => {},
TimeoutPending(ref mut client) => *client = AccessPending,
TimedOut => return Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
}
let access = self.access.grant(self as *mut _ as uint, m);
// After acquiring the grant, we need to flag ourselves as having a
// pending request so the timeout knows to cancel the request.
let can_timeout = match self.state {
NoTimeout => false,
TimeoutPending(ref mut client) => { *client = RequestPending; true }
TimedOut => return Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
};
Ok(Guard {
access: access,
state: &mut self.state,
can_timeout: can_timeout
})
}
/// Sets the pending timeout to the value specified.
///
/// The home/loop variables are used to construct a timer if one has not
/// been previously constructed.
///
/// The callback will be invoked if the timeout elapses, and the data of
/// the time will be set to `data`.
pub fn set_timeout(&mut self, ms: Option<u64>,
home: &HomeHandle,
loop_: &Loop,
cb: fn(uint) -> Option<BlockedTask>,
data: uint) {
self.state = NoTimeout;
let ms = match ms {
Some(ms) => ms,
None => return match self.timer {
Some(ref mut t) => t.stop(),
None => {}
}
};
// If we have a timeout, lazily initialize the timer which will be used
// to fire when the timeout runs out.
if self.timer.is_none() {
let mut timer = box TimerWatcher::new_home(loop_, home.clone());
let cx = box TimerContext {
timeout: self as *mut _,
callback: cb,
payload: data,
};
unsafe {
timer.set_data(&*cx);
cast::forget(cx);
}
self.timer = Some(timer);
}
let timer = self.timer.get_mut_ref();
unsafe {
let cx = uvll::get_data_for_uv_handle(timer.handle);
let cx = cx as *mut TimerContext;
(*cx).callback = cb;
(*cx).payload = data;
}
timer.stop();
timer.start(timer_cb, ms, 0);
self.state = TimeoutPending(NoWaiter);
extern fn timer_cb(timer: *uvll::uv_timer_t) {
let cx: &TimerContext = unsafe {
&*(uvll::get_data_for_uv_handle(timer) as *TimerContext)
};
let me = unsafe { &mut *cx.timeout };
match mem::replace(&mut me.state, TimedOut) {
TimedOut | NoTimeout => unreachable!(),
TimeoutPending(NoWaiter) => {}
TimeoutPending(AccessPending) => {
match unsafe { me.access.dequeue(me as *mut _ as uint) } {
Some(task) => task.reawaken(),
None => unreachable!(),
}
}
TimeoutPending(RequestPending) => {
match (cx.callback)(cx.payload) {
Some(task) => task.reawaken(),
None => unreachable!(),
}
}
}
}
}
}
impl Clone for AccessTimeout {
fn clone(&self) -> AccessTimeout {
AccessTimeout {
access: self.access.clone(),
state: NoTimeout,
timer: None,
}
}
}
#[unsafe_destructor]
impl<'a> Drop for Guard<'a> {
fn drop(&mut self) {
match *self.state {
TimeoutPending(NoWaiter) | TimeoutPending(AccessPending) =>
unreachable!(),
NoTimeout | TimedOut => {}
TimeoutPending(RequestPending) => {
*self.state = TimeoutPending(NoWaiter);
}
}
}
}
impl Drop for AccessTimeout {
fn drop(&mut self) {
match self.timer {
Some(ref timer) => unsafe {
let data = uvll::get_data_for_uv_handle(timer.handle);
let _data: Box<TimerContext> = cast::transmute(data);
},
None => {}
}
}
}
////////////////////////////////////////////////////////////////////////////////
// Connect timeouts
////////////////////////////////////////////////////////////////////////////////
pub struct ConnectCtx {
pub status: c_int,
pub task: Option<BlockedTask>,
pub timer: Option<Box<TimerWatcher>>,
}
pub struct AcceptTimeout {
timer: Option<TimerWatcher>,
timeout_tx: Option<Sender<()>>,
timeout_rx: Option<Receiver<()>>,
}
impl ConnectCtx {
pub fn connect<T>(
mut self, obj: T, timeout: Option<u64>, io: &mut UvIoFactory,
f: |&Request, &T, uvll::uv_connect_cb| -> c_int
) -> Result<T, UvError> {
let mut req = Request::new(uvll::UV_CONNECT);
let r = f(&req, &obj, connect_cb);
return match r {
0 => {
req.defuse(); // uv callback now owns this request
match timeout {
Some(t) => {
let mut timer = TimerWatcher::new(io);
timer.start(timer_cb, t, 0);
self.timer = Some(timer);
}
None => {}
}
wait_until_woken_after(&mut self.task, &io.loop_, || {
let data = &self as *_;
match self.timer {
Some(ref mut timer) => unsafe { timer.set_data(data) },
None => {}
}
req.set_data(data);
});
// Make sure an erroneously fired callback doesn't have access
// to the context any more.
req.set_data(0 as *int);
// If we failed because of a timeout, drop the TcpWatcher as
// soon as possible because it's data is now set to null and we
// want to cancel the callback ASAP.
match self.status {
0 => Ok(obj),
n => { drop(obj); Err(UvError(n)) }
}
}
n => Err(UvError(n))
};
extern fn timer_cb(handle: *uvll::uv_timer_t) {
// Don't close the corresponding tcp request, just wake up the task
// and let RAII take care of the pending watcher.
let cx: &mut ConnectCtx = unsafe {
&mut *(uvll::get_data_for_uv_handle(handle) as *mut ConnectCtx)
};
cx.status = uvll::ECANCELED;
wakeup(&mut cx.task);
}
extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
// This callback can be invoked with ECANCELED if the watcher is
// closed by the timeout callback. In that case we just want to free
// the request and be along our merry way.
let req = Request::wrap(req);
if status == uvll::ECANCELED { return }
// Apparently on windows when the handle is closed this callback may
// not be invoked with ECANCELED but rather another error code.
// Either ways, if the data is null, then our timeout has expired
// and there's nothing we can do.
let data = unsafe { uvll::get_data_for_req(req.handle) };
if data.is_null() { return }
let cx: &mut ConnectCtx = unsafe { &mut *(data as *mut ConnectCtx) };
cx.status = status;
match cx.timer {
Some(ref mut t) => t.stop(),
None => {}
}
// Note that the timer callback doesn't cancel the connect request
// (that's the job of uv_close()), so it's possible for this
// callback to get triggered after the timeout callback fires, but
// before the task wakes up. In that case, we did indeed
// successfully connect, but we don't need to wake someone up. We
// updated the status above (correctly so), and the task will pick
// up on this when it wakes up.
if cx.task.is_some() {
wakeup(&mut cx.task);
}
}
}
}
impl AcceptTimeout {
pub fn new() -> AcceptTimeout {
AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None }
}
pub fn accept<T: Send>(&mut self, c: &Receiver<IoResult<T>>) -> IoResult<T> {
match self.timeout_rx {
None => c.recv(),
Some(ref rx) => {
use std::comm::Select;
// Poll the incoming channel first (don't rely on the order of
// select just yet). If someone's pending then we should return
// them immediately.
match c.try_recv() {
Ok(data) => return data,
Err(..) => {}
}
// Use select to figure out which channel gets ready first. We
// do some custom handling of select to ensure that we never
// actually drain the timeout channel (we'll keep seeing the
// timeout message in the future).
let s = Select::new();
let mut timeout = s.handle(rx);
let mut data = s.handle(c);
unsafe {
timeout.add();
data.add();
}
if s.wait() == timeout.id() {
Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
} else {
c.recv()
}
}
}
}
pub fn clear(&mut self) {
match self.timeout_rx {
Some(ref t) => { let _ = t.try_recv(); }
None => {}
}
match self.timer {
Some(ref mut t) => t.stop(),
None => {}
}
}
pub fn set_timeout<U, T: UvHandle<U> + HomingIO>(
&mut self, ms: u64, t: &mut T
) {
// If we have a timeout, lazily initialize the timer which will be used
// to fire when the timeout runs out.
if self.timer.is_none() {
let loop_ = Loop::wrap(unsafe {
uvll::get_loop_for_uv_handle(t.uv_handle())
});
let mut timer = TimerWatcher::new_home(&loop_, t.home().clone());
unsafe {
timer.set_data(self as *mut _ as *AcceptTimeout);
}
self.timer = Some(timer);
}
// Once we've got a timer, stop any previous timeout, reset it for the
// current one, and install some new channels to send/receive data on
let timer = self.timer.get_mut_ref();
timer.stop();
timer.start(timer_cb, ms, 0);
let (tx, rx) = channel();
self.timeout_tx = Some(tx);
self.timeout_rx = Some(rx);
extern fn timer_cb(timer: *uvll::uv_timer_t) {
let acceptor: &mut AcceptTimeout = unsafe {
&mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout)
};
// This send can never fail because if this timer is active then the
// receiving channel is guaranteed to be alive
acceptor.timeout_tx.get_ref().send(());
}
}
}

View File

@ -18,7 +18,7 @@ use uvio::UvIoFactory;
use uvll;
pub struct TimerWatcher {
handle: *uvll::uv_timer_t,
pub handle: *uvll::uv_timer_t,
home: HomeHandle,
action: Option<NextAction>,
blocker: Option<BlockedTask>,

View File

@ -87,7 +87,7 @@ impl RtioTTY for TtyWatcher {
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
self.stream.write(buf).map_err(uv_error_to_io_error)
self.stream.write(buf, false).map_err(uv_error_to_io_error)
}
fn set_raw(&mut self, raw: bool) -> Result<(), IoError> {

View File

@ -326,6 +326,8 @@ impl IoError {
libc::WSAEADDRNOTAVAIL => (ConnectionRefused, "address not available"),
libc::WSAEADDRINUSE => (ConnectionRefused, "address in use"),
libc::ERROR_BROKEN_PIPE => (EndOfFile, "the pipe has ended"),
libc::ERROR_OPERATION_ABORTED =>
(TimedOut, "operation timed out"),
// libuv maps this error code to EISDIR. we do too. if it is found
// to be incorrect, we can add in some more machinery to only
@ -434,6 +436,17 @@ pub enum IoErrorKind {
InvalidInput,
/// The I/O operation's timeout expired, causing it to be canceled.
TimedOut,
/// This write operation failed to write all of its data.
///
/// Normally the write() method on a Writer guarantees that all of its data
/// has been written, but some operations may be terminated after only
/// partially writing some data. An example of this is a timed out write
/// which successfully wrote a known number of bytes, but bailed out after
/// doing so.
///
/// The payload contained as part of this variant is the number of bytes
/// which are known to have been successfully written.
ShortWrite(uint),
}
/// A trait for objects which are byte-oriented streams. Readers are defined by
@ -1429,7 +1442,8 @@ pub fn standard_error(kind: IoErrorKind) -> IoError {
PathDoesntExist => "no such file",
MismatchedFileTypeForOperation => "mismatched file type",
ResourceUnavailable => "resource unavailable",
TimedOut => "operation timed out"
TimedOut => "operation timed out",
ShortWrite(..) => "short write",
};
IoError {
kind: kind,

View File

@ -151,6 +151,69 @@ impl TcpStream {
/// Note that this method affects all cloned handles associated with this
/// stream, not just this one handle.
pub fn close_write(&mut self) -> IoResult<()> { self.obj.close_write() }
/// Sets a timeout, in milliseconds, for blocking operations on this stream.
///
/// This function will set a timeout for all blocking operations (including
/// reads and writes) on this stream. The timeout specified is a relative
/// time, in milliseconds, into the future after which point operations will
/// time out. This means that the timeout must be reset periodically to keep
/// it from expiring. Specifying a value of `None` will clear the timeout
/// for this stream.
///
/// The timeout on this stream is local to this stream only. Setting a
/// timeout does not affect any other cloned instances of this stream, nor
/// does the timeout propagated to cloned handles of this stream. Setting
/// this timeout will override any specific read or write timeouts
/// previously set for this stream.
///
/// For clarification on the semantics of interrupting a read and a write,
/// take a look at `set_read_timeout` and `set_write_timeout`.
pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
self.obj.set_timeout(timeout_ms)
}
/// Sets the timeout for read operations on this stream.
///
/// See documentation in `set_timeout` for the semantics of this read time.
/// This will overwrite any previous read timeout set through either this
/// function or `set_timeout`.
///
/// # Errors
///
/// When this timeout expires, if there is no pending read operation, no
/// action is taken. Otherwise, the read operation will be scheduled to
/// promptly return. If a timeout error is returned, then no data was read
/// during the timeout period.
pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
self.obj.set_read_timeout(timeout_ms)
}
/// Sets the timeout for write operations on this stream.
///
/// See documentation in `set_timeout` for the semantics of this write time.
/// This will overwrite any previous write timeout set through either this
/// function or `set_timeout`.
///
/// # Errors
///
/// When this timeout expires, if there is no pending write operation, no
/// action is taken. Otherwise, the pending write operation will be
/// scheduled to promptly return. The actual state of the underlying stream
/// is not specified.
///
/// The write operation may return an error of type `ShortWrite` which
/// indicates that the object is known to have written an exact number of
/// bytes successfully during the timeout period, and the remaining bytes
/// were never written.
///
/// If the write operation returns `TimedOut`, then it the timeout primitive
/// does not know how many bytes were written as part of the timeout
/// operation. It may be the case that bytes continue to be written in an
/// asynchronous fashion after the call to write returns.
pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
self.obj.set_write_timeout(timeout_ms)
}
}
impl Clone for TcpStream {
@ -892,6 +955,7 @@ mod test {
Err(ref e) if e.kind == TimedOut => {}
Err(e) => fail!("error: {}", e),
}
::task::deschedule();
if i == 1000 { fail!("should have a pending connection") }
}
drop(l);
@ -964,4 +1028,118 @@ mod test {
// this test will never finish if the child doesn't wake up
rx.recv();
})
iotest!(fn readwrite_timeouts() {
let addr = next_test_ip6();
let mut a = TcpListener::bind(addr).listen().unwrap();
let (tx, rx) = channel::<()>();
spawn(proc() {
let mut s = TcpStream::connect(addr).unwrap();
rx.recv();
assert!(s.write([0]).is_ok());
let _ = rx.recv_opt();
});
let mut s = a.accept().unwrap();
s.set_timeout(Some(20));
assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
s.set_timeout(Some(20));
for i in range(0, 1001) {
match s.write([0, .. 128 * 1024]) {
Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
Err(IoError { kind: TimedOut, .. }) => break,
Err(e) => fail!("{}", e),
}
if i == 1000 { fail!("should have filled up?!"); }
}
assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
tx.send(());
s.set_timeout(None);
assert_eq!(s.read([0, 0]), Ok(1));
})
iotest!(fn read_timeouts() {
let addr = next_test_ip6();
let mut a = TcpListener::bind(addr).listen().unwrap();
let (tx, rx) = channel::<()>();
spawn(proc() {
let mut s = TcpStream::connect(addr).unwrap();
rx.recv();
let mut amt = 0;
while amt < 100 * 128 * 1024 {
match s.read([0, ..128 * 1024]) {
Ok(n) => { amt += n; }
Err(e) => fail!("{}", e),
}
}
let _ = rx.recv_opt();
});
let mut s = a.accept().unwrap();
s.set_read_timeout(Some(20));
assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
tx.send(());
for _ in range(0, 100) {
assert!(s.write([0, ..128 * 1024]).is_ok());
}
})
iotest!(fn write_timeouts() {
let addr = next_test_ip6();
let mut a = TcpListener::bind(addr).listen().unwrap();
let (tx, rx) = channel::<()>();
spawn(proc() {
let mut s = TcpStream::connect(addr).unwrap();
rx.recv();
assert!(s.write([0]).is_ok());
let _ = rx.recv_opt();
});
let mut s = a.accept().unwrap();
s.set_write_timeout(Some(20));
for i in range(0, 1001) {
match s.write([0, .. 128 * 1024]) {
Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
Err(IoError { kind: TimedOut, .. }) => break,
Err(e) => fail!("{}", e),
}
if i == 1000 { fail!("should have filled up?!"); }
}
assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
tx.send(());
assert!(s.read([0]).is_ok());
})
iotest!(fn timeout_concurrent_read() {
let addr = next_test_ip6();
let mut a = TcpListener::bind(addr).listen().unwrap();
let (tx, rx) = channel::<()>();
spawn(proc() {
let mut s = TcpStream::connect(addr).unwrap();
rx.recv();
assert_eq!(s.write([0]), Ok(()));
let _ = rx.recv_opt();
});
let mut s = a.accept().unwrap();
let s2 = s.clone();
let (tx2, rx2) = channel();
spawn(proc() {
let mut s2 = s2;
assert_eq!(s2.read([0]), Ok(1));
tx2.send(());
});
s.set_read_timeout(Some(20));
assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
tx.send(());
rx2.recv();
})
}

View File

@ -20,6 +20,7 @@ use io::net::ip::{SocketAddr, IpAddr};
use io::{Reader, Writer, IoResult};
use kinds::Send;
use owned::Box;
use option::Option;
use result::{Ok, Err};
use rt::rtio::{RtioSocket, RtioUdpSocket, IoFactory, LocalIo};
@ -142,6 +143,27 @@ impl UdpSocket {
self.obj.ignore_broadcasts()
}
}
/// Sets the read/write timeout for this socket.
///
/// For more information, see `TcpStream::set_timeout`
pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
self.obj.set_timeout(timeout_ms)
}
/// Sets the read timeout for this socket.
///
/// For more information, see `TcpStream::set_timeout`
pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
self.obj.set_read_timeout(timeout_ms)
}
/// Sets the write timeout for this socket.
///
/// For more information, see `TcpStream::set_timeout`
pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
self.obj.set_write_timeout(timeout_ms)
}
}
impl Clone for UdpSocket {
@ -485,4 +507,56 @@ mod test {
rx.recv();
serv_rx.recv();
})
iotest!(fn recvfrom_timeout() {
let addr1 = next_test_ip4();
let addr2 = next_test_ip4();
let mut a = UdpSocket::bind(addr1).unwrap();
let (tx, rx) = channel();
let (tx2, rx2) = channel();
spawn(proc() {
let mut a = UdpSocket::bind(addr2).unwrap();
assert_eq!(a.recvfrom([0]), Ok((1, addr1)));
assert_eq!(a.sendto([0], addr1), Ok(()));
rx.recv();
assert_eq!(a.sendto([0], addr1), Ok(()));
tx2.send(());
});
// Make sure that reads time out, but writes can continue
a.set_read_timeout(Some(20));
assert_eq!(a.recvfrom([0]).err().unwrap().kind, TimedOut);
assert_eq!(a.recvfrom([0]).err().unwrap().kind, TimedOut);
assert_eq!(a.sendto([0], addr2), Ok(()));
// Cloned handles should be able to block
let mut a2 = a.clone();
assert_eq!(a2.recvfrom([0]), Ok((1, addr2)));
// Clearing the timeout should allow for receiving
a.set_timeout(None);
tx.send(());
assert_eq!(a2.recvfrom([0]), Ok((1, addr2)));
// Make sure the child didn't die
rx2.recv();
})
iotest!(fn sendto_timeout() {
let addr1 = next_test_ip4();
let addr2 = next_test_ip4();
let mut a = UdpSocket::bind(addr1).unwrap();
let _b = UdpSocket::bind(addr2).unwrap();
a.set_write_timeout(Some(1000));
for _ in range(0, 100) {
match a.sendto([0, ..4*1024], addr2) {
Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
Err(IoError { kind: TimedOut, .. }) => break,
Err(e) => fail!("other error: {}", e),
}
}
})
}

View File

@ -61,21 +61,11 @@ impl UnixStream {
})
}
/// Connect to a pipe named by `path`. This will attempt to open a
/// connection to the underlying socket.
/// Connect to a pipe named by `path`, timing out if the specified number of
/// milliseconds.
///
/// The returned stream will be closed when the object falls out of scope.
///
/// # Example
///
/// ```rust
/// # #![allow(unused_must_use)]
/// use std::io::net::unix::UnixStream;
///
/// let server = Path::new("path/to/my/socket");
/// let mut stream = UnixStream::connect(&server);
/// stream.write([1, 2, 3]);
/// ```
/// This function is similar to `connect`, except that if `timeout_ms`
/// elapses the function will return an error of kind `TimedOut`.
#[experimental = "the timeout argument is likely to change types"]
pub fn connect_timeout<P: ToCStr>(path: &P,
timeout_ms: u64) -> IoResult<UnixStream> {
@ -103,6 +93,27 @@ impl UnixStream {
/// Note that this method affects all cloned handles associated with this
/// stream, not just this one handle.
pub fn close_write(&mut self) -> IoResult<()> { self.obj.close_write() }
/// Sets the read/write timeout for this socket.
///
/// For more information, see `TcpStream::set_timeout`
pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
self.obj.set_timeout(timeout_ms)
}
/// Sets the read timeout for this socket.
///
/// For more information, see `TcpStream::set_timeout`
pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
self.obj.set_read_timeout(timeout_ms)
}
/// Sets the write timeout for this socket.
///
/// For more information, see `TcpStream::set_timeout`
pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
self.obj.set_write_timeout(timeout_ms)
}
}
impl Clone for UnixStream {
@ -457,6 +468,7 @@ mod tests {
Err(ref e) if e.kind == TimedOut => {}
Err(e) => fail!("error: {}", e),
}
::task::deschedule();
if i == 1000 { fail!("should have a pending connection") }
}
drop(l);
@ -541,4 +553,122 @@ mod tests {
// this test will never finish if the child doesn't wake up
rx.recv();
})
iotest!(fn readwrite_timeouts() {
let addr = next_test_unix();
let mut a = UnixListener::bind(&addr).listen().unwrap();
let (tx, rx) = channel::<()>();
spawn(proc() {
let mut s = UnixStream::connect(&addr).unwrap();
rx.recv();
assert!(s.write([0]).is_ok());
let _ = rx.recv_opt();
});
let mut s = a.accept().unwrap();
s.set_timeout(Some(20));
assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
s.set_timeout(Some(20));
for i in range(0, 1001) {
match s.write([0, .. 128 * 1024]) {
Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
Err(IoError { kind: TimedOut, .. }) => break,
Err(e) => fail!("{}", e),
}
if i == 1000 { fail!("should have filled up?!"); }
}
// I'm not sure as to why, but apparently the write on windows always
// succeeds after the previous timeout. Who knows?
if !cfg!(windows) {
assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
}
tx.send(());
s.set_timeout(None);
assert_eq!(s.read([0, 0]), Ok(1));
})
iotest!(fn read_timeouts() {
let addr = next_test_unix();
let mut a = UnixListener::bind(&addr).listen().unwrap();
let (tx, rx) = channel::<()>();
spawn(proc() {
let mut s = UnixStream::connect(&addr).unwrap();
rx.recv();
let mut amt = 0;
while amt < 100 * 128 * 1024 {
match s.read([0, ..128 * 1024]) {
Ok(n) => { amt += n; }
Err(e) => fail!("{}", e),
}
}
let _ = rx.recv_opt();
});
let mut s = a.accept().unwrap();
s.set_read_timeout(Some(20));
assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
tx.send(());
for _ in range(0, 100) {
assert!(s.write([0, ..128 * 1024]).is_ok());
}
})
iotest!(fn write_timeouts() {
let addr = next_test_unix();
let mut a = UnixListener::bind(&addr).listen().unwrap();
let (tx, rx) = channel::<()>();
spawn(proc() {
let mut s = UnixStream::connect(&addr).unwrap();
rx.recv();
assert!(s.write([0]).is_ok());
let _ = rx.recv_opt();
});
let mut s = a.accept().unwrap();
s.set_write_timeout(Some(20));
for i in range(0, 1001) {
match s.write([0, .. 128 * 1024]) {
Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
Err(IoError { kind: TimedOut, .. }) => break,
Err(e) => fail!("{}", e),
}
if i == 1000 { fail!("should have filled up?!"); }
}
tx.send(());
assert!(s.read([0]).is_ok());
})
iotest!(fn timeout_concurrent_read() {
let addr = next_test_unix();
let mut a = UnixListener::bind(&addr).listen().unwrap();
let (tx, rx) = channel::<()>();
spawn(proc() {
let mut s = UnixStream::connect(&addr).unwrap();
rx.recv();
assert!(s.write([0]).is_ok());
let _ = rx.recv_opt();
});
let mut s = a.accept().unwrap();
let s2 = s.clone();
let (tx2, rx2) = channel();
spawn(proc() {
let mut s2 = s2;
assert!(s2.read([0]).is_ok());
tx2.send(());
});
s.set_read_timeout(Some(20));
assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
tx.send(());
rx2.recv();
})
}

View File

@ -222,6 +222,9 @@ pub trait RtioTcpStream : RtioSocket {
fn clone(&self) -> Box<RtioTcpStream:Send>;
fn close_write(&mut self) -> IoResult<()>;
fn close_read(&mut self) -> IoResult<()>;
fn set_timeout(&mut self, timeout_ms: Option<u64>);
fn set_read_timeout(&mut self, timeout_ms: Option<u64>);
fn set_write_timeout(&mut self, timeout_ms: Option<u64>);
}
pub trait RtioSocket {
@ -245,6 +248,9 @@ pub trait RtioUdpSocket : RtioSocket {
fn ignore_broadcasts(&mut self) -> IoResult<()>;
fn clone(&self) -> Box<RtioUdpSocket:Send>;
fn set_timeout(&mut self, timeout_ms: Option<u64>);
fn set_read_timeout(&mut self, timeout_ms: Option<u64>);
fn set_write_timeout(&mut self, timeout_ms: Option<u64>);
}
pub trait RtioTimer {
@ -278,6 +284,9 @@ pub trait RtioPipe {
fn close_write(&mut self) -> IoResult<()>;
fn close_read(&mut self) -> IoResult<()>;
fn set_timeout(&mut self, timeout_ms: Option<u64>);
fn set_read_timeout(&mut self, timeout_ms: Option<u64>);
fn set_write_timeout(&mut self, timeout_ms: Option<u64>);
}
pub trait RtioUnixListener {

View File

@ -323,6 +323,12 @@ impl BlockedTask {
}
}
/// Reawakens this task if ownership is acquired. If finer-grained control
/// is desired, use `wake` instead.
pub fn reawaken(self) {
self.wake().map(|t| t.reawaken());
}
// This assertion has two flavours because the wake involves an atomic op.
// In the faster version, destructors will fail dramatically instead.
#[cfg(not(test))] pub fn trash(self) { }