diff --git a/src/libnative/io/c_unix.rs b/src/libnative/io/c_unix.rs index e2bf515a1e5..abb22476e52 100644 --- a/src/libnative/io/c_unix.rs +++ b/src/libnative/io/c_unix.rs @@ -27,6 +27,13 @@ pub static FIOCLEX: libc::c_ulong = 0x20006601; #[cfg(target_os = "android")] pub static FIOCLEX: libc::c_ulong = 0x5451; +#[cfg(target_os = "macos")] +#[cfg(target_os = "freebsd")] +pub static MSG_DONTWAIT: libc::c_int = 0x80; +#[cfg(target_os = "linux")] +#[cfg(target_os = "android")] +pub static MSG_DONTWAIT: libc::c_int = 0x40; + extern { pub fn gettimeofday(timeval: *mut libc::timeval, tzp: *libc::c_void) -> libc::c_int; diff --git a/src/libnative/io/c_win32.rs b/src/libnative/io/c_win32.rs index 4fdd05a8b42..151111af3df 100644 --- a/src/libnative/io/c_win32.rs +++ b/src/libnative/io/c_win32.rs @@ -18,6 +18,7 @@ pub static WSADESCRIPTION_LEN: uint = 256; pub static WSASYS_STATUS_LEN: uint = 128; pub static FIONBIO: libc::c_long = 0x8004667e; static FD_SETSIZE: uint = 64; +pub static MSG_DONTWAIT: libc::c_int = 0; pub struct WSADATA { pub wVersion: libc::WORD, diff --git a/src/libnative/io/file_unix.rs b/src/libnative/io/file_unix.rs index 84ea0d29434..87225a10e76 100644 --- a/src/libnative/io/file_unix.rs +++ b/src/libnative/io/file_unix.rs @@ -189,6 +189,9 @@ impl rtio::RtioPipe for FileDesc { fn close_write(&mut self) -> Result<(), IoError> { Err(io::standard_error(io::InvalidInput)) } + fn set_timeout(&mut self, _t: Option) {} + fn set_read_timeout(&mut self, _t: Option) {} + fn set_write_timeout(&mut self, _t: Option) {} } impl rtio::RtioTTY for FileDesc { diff --git a/src/libnative/io/file_win32.rs b/src/libnative/io/file_win32.rs index c2acd91d476..282f9c2e343 100644 --- a/src/libnative/io/file_win32.rs +++ b/src/libnative/io/file_win32.rs @@ -221,6 +221,9 @@ impl rtio::RtioPipe for FileDesc { fn close_write(&mut self) -> IoResult<()> { Err(io::standard_error(io::InvalidInput)) } + fn set_timeout(&mut self, _t: Option) {} + fn set_read_timeout(&mut self, _t: Option) {} + fn set_write_timeout(&mut self, _t: Option) {} } impl rtio::RtioTTY for FileDesc { diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index a54fe911ae0..63d57756e5d 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -15,6 +15,7 @@ use std::io; use std::mem; use std::rt::rtio; use std::sync::arc::UnsafeArc; +use std::unstable::mutex; use super::{IoResult, retry, keep_going}; use super::c; @@ -236,22 +237,36 @@ pub fn init() { pub struct TcpStream { inner: UnsafeArc, + read_deadline: u64, + write_deadline: u64, } struct Inner { fd: sock_t, + lock: mutex::NativeMutex, +} + +pub struct Guard<'a> { + pub fd: sock_t, + pub guard: mutex::LockGuard<'a>, +} + +impl Inner { + fn new(fd: sock_t) -> Inner { + Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } } + } } impl TcpStream { pub fn connect(addr: ip::SocketAddr, timeout: Option) -> IoResult { let fd = try!(socket(addr, libc::SOCK_STREAM)); - let (addr, len) = addr_to_sockaddr(addr); - let inner = Inner { fd: fd }; - let ret = TcpStream { inner: UnsafeArc::new(inner) }; + let ret = TcpStream::new(Inner::new(fd)); - let len = len as libc::socklen_t; + let (addr, len) = addr_to_sockaddr(addr); let addrp = &addr as *_ as *libc::sockaddr; + let len = len as libc::socklen_t; + match timeout { Some(timeout) => { try!(util::connect_timeout(fd, addrp, len, timeout)); @@ -266,6 +281,14 @@ impl TcpStream { } } + fn new(inner: Inner) -> TcpStream { + TcpStream { + inner: UnsafeArc::new(inner), + read_deadline: 0, + write_deadline: 0, + } + } + pub fn fd(&self) -> sock_t { // This unsafety is fine because it's just a read-only arc unsafe { (*self.inner.get()).fd } @@ -299,6 +322,19 @@ impl TcpStream { fn set_tcp_keepalive(&mut self, _seconds: uint) -> IoResult<()> { Ok(()) } + + #[cfg(target_os = "linux")] + fn lock_nonblocking(&self) {} + + #[cfg(not(target_os = "linux"))] + fn lock_nonblocking<'a>(&'a self) -> Guard<'a> { + let ret = Guard { + fd: self.fd(), + guard: unsafe { (*self.inner.get()).lock.lock() }, + }; + assert!(util::set_nonblocking(self.fd(), true).is_ok()); + ret + } } #[cfg(windows)] type wrlen = libc::c_int; @@ -306,33 +342,31 @@ impl TcpStream { impl rtio::RtioTcpStream for TcpStream { fn read(&mut self, buf: &mut [u8]) -> IoResult { - let ret = retry(|| { - unsafe { - libc::recv(self.fd(), - buf.as_mut_ptr() as *mut libc::c_void, - buf.len() as wrlen, - 0) as libc::c_int - } - }); - if ret == 0 { - Err(io::standard_error(io::EndOfFile)) - } else if ret < 0 { - Err(last_error()) - } else { - Ok(ret as uint) - } + let fd = self.fd(); + let dolock = || self.lock_nonblocking(); + let doread = |nb| unsafe { + let flags = if nb {c::MSG_DONTWAIT} else {0}; + libc::recv(fd, + buf.as_mut_ptr() as *mut libc::c_void, + buf.len() as wrlen, + flags) as libc::c_int + }; + read(fd, self.read_deadline, dolock, doread) } + fn write(&mut self, buf: &[u8]) -> IoResult<()> { - let ret = keep_going(buf, |buf, len| unsafe { - libc::send(self.fd(), + let fd = self.fd(); + let dolock = || self.lock_nonblocking(); + let dowrite = |nb: bool, buf: *u8, len: uint| unsafe { + let flags = if nb {c::MSG_DONTWAIT} else {0}; + libc::send(fd, buf as *mut libc::c_void, len as wrlen, - 0) as i64 - }); - if ret < 0 { - Err(super::last_error()) - } else { - Ok(()) + flags) as i64 + }; + match write(fd, self.write_deadline, buf, true, dolock, dowrite) { + Ok(_) => Ok(()), + Err(e) => Err(e) } } fn peer_name(&mut self) -> IoResult { @@ -354,14 +388,29 @@ impl rtio::RtioTcpStream for TcpStream { fn clone(&self) -> Box { box TcpStream { inner: self.inner.clone(), + read_deadline: 0, + write_deadline: 0, } as Box } + fn close_write(&mut self) -> IoResult<()> { super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) }) } fn close_read(&mut self) -> IoResult<()> { super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) }) } + + fn set_timeout(&mut self, timeout: Option) { + let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); + self.read_deadline = deadline; + self.write_deadline = deadline; + } + fn set_read_timeout(&mut self, timeout: Option) { + self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); + } + fn set_write_timeout(&mut self, timeout: Option) { + self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); + } } impl rtio::RtioSocket for TcpStream { @@ -374,6 +423,13 @@ impl Drop for Inner { fn drop(&mut self) { unsafe { close(self.fd); } } } +#[unsafe_destructor] +impl<'a> Drop for Guard<'a> { + fn drop(&mut self) { + assert!(util::set_nonblocking(self.fd, false).is_ok()); + } +} + //////////////////////////////////////////////////////////////////////////////// // TCP listeners //////////////////////////////////////////////////////////////////////////////// @@ -384,29 +440,24 @@ pub struct TcpListener { impl TcpListener { pub fn bind(addr: ip::SocketAddr) -> IoResult { - unsafe { - socket(addr, libc::SOCK_STREAM).and_then(|fd| { - let (addr, len) = addr_to_sockaddr(addr); - let addrp = &addr as *libc::sockaddr_storage; - let inner = Inner { fd: fd }; - let ret = TcpListener { inner: inner }; - // On platforms with Berkeley-derived sockets, this allows - // to quickly rebind a socket, without needing to wait for - // the OS to clean up the previous one. - if cfg!(unix) { - match setsockopt(fd, libc::SOL_SOCKET, - libc::SO_REUSEADDR, - 1 as libc::c_int) { - Err(n) => { return Err(n); }, - Ok(..) => { } - } - } - match libc::bind(fd, addrp as *libc::sockaddr, - len as libc::socklen_t) { - -1 => Err(last_error()), - _ => Ok(ret), - } - }) + let fd = try!(socket(addr, libc::SOCK_STREAM)); + let ret = TcpListener { inner: Inner::new(fd) }; + + let (addr, len) = addr_to_sockaddr(addr); + let addrp = &addr as *_ as *libc::sockaddr; + let len = len as libc::socklen_t; + + // On platforms with Berkeley-derived sockets, this allows + // to quickly rebind a socket, without needing to wait for + // the OS to clean up the previous one. + if cfg!(unix) { + try!(setsockopt(fd, libc::SOL_SOCKET, libc::SO_REUSEADDR, + 1 as libc::c_int)); + } + + match unsafe { libc::bind(fd, addrp, len) } { + -1 => Err(last_error()), + _ => Ok(ret), } } @@ -444,7 +495,7 @@ impl TcpAcceptor { pub fn native_accept(&mut self) -> IoResult { if self.deadline != 0 { - try!(util::accept_deadline(self.fd(), self.deadline)); + try!(util::await(self.fd(), Some(self.deadline), util::Readable)); } unsafe { let mut storage: libc::sockaddr_storage = mem::init(); @@ -457,7 +508,7 @@ impl TcpAcceptor { &mut size as *mut libc::socklen_t) as libc::c_int }) as sock_t { -1 => Err(last_error()), - fd => Ok(TcpStream { inner: UnsafeArc::new(Inner { fd: fd })}) + fd => Ok(TcpStream::new(Inner::new(fd))), } } } @@ -487,22 +538,26 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor { pub struct UdpSocket { inner: UnsafeArc, + read_deadline: u64, + write_deadline: u64, } impl UdpSocket { pub fn bind(addr: ip::SocketAddr) -> IoResult { - unsafe { - socket(addr, libc::SOCK_DGRAM).and_then(|fd| { - let (addr, len) = addr_to_sockaddr(addr); - let addrp = &addr as *libc::sockaddr_storage; - let inner = Inner { fd: fd }; - let ret = UdpSocket { inner: UnsafeArc::new(inner) }; - match libc::bind(fd, addrp as *libc::sockaddr, - len as libc::socklen_t) { - -1 => Err(last_error()), - _ => Ok(ret), - } - }) + let fd = try!(socket(addr, libc::SOCK_DGRAM)); + let ret = UdpSocket { + inner: UnsafeArc::new(Inner::new(fd)), + read_deadline: 0, + write_deadline: 0, + }; + + let (addr, len) = addr_to_sockaddr(addr); + let addrp = &addr as *_ as *libc::sockaddr; + let len = len as libc::socklen_t; + + match unsafe { libc::bind(fd, addrp, len) } { + -1 => Err(last_error()), + _ => Ok(ret), } } @@ -541,6 +596,19 @@ impl UdpSocket { } } } + + #[cfg(target_os = "linux")] + fn lock_nonblocking(&self) {} + + #[cfg(not(target_os = "linux"))] + fn lock_nonblocking<'a>(&'a self) -> Guard<'a> { + let ret = Guard { + fd: self.fd(), + guard: unsafe { (*self.inner.get()).lock.lock() }, + }; + assert!(util::set_nonblocking(self.fd(), true).is_ok()); + ret + } } impl rtio::RtioSocket for UdpSocket { @@ -554,48 +622,54 @@ impl rtio::RtioSocket for UdpSocket { impl rtio::RtioUdpSocket for UdpSocket { fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, ip::SocketAddr)> { - unsafe { - let mut storage: libc::sockaddr_storage = mem::init(); - let storagep = &mut storage as *mut libc::sockaddr_storage; - let mut addrlen: libc::socklen_t = - mem::size_of::() as libc::socklen_t; - let ret = retry(|| { - libc::recvfrom(self.fd(), - buf.as_ptr() as *mut libc::c_void, - buf.len() as msglen_t, - 0, - storagep as *mut libc::sockaddr, - &mut addrlen) as libc::c_int - }); - if ret < 0 { return Err(last_error()) } - sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| { - Ok((ret as uint, addr)) - }) - } + let fd = self.fd(); + let mut storage: libc::sockaddr_storage = unsafe { mem::init() }; + let storagep = &mut storage as *mut _ as *mut libc::sockaddr; + let mut addrlen: libc::socklen_t = + mem::size_of::() as libc::socklen_t; + + let dolock = || self.lock_nonblocking(); + let doread = |nb| unsafe { + let flags = if nb {c::MSG_DONTWAIT} else {0}; + libc::recvfrom(fd, + buf.as_mut_ptr() as *mut libc::c_void, + buf.len() as msglen_t, + flags, + storagep, + &mut addrlen) as libc::c_int + }; + let n = try!(read(fd, self.read_deadline, dolock, doread)); + sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| { + Ok((n as uint, addr)) + }) } + fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> IoResult<()> { - let (dst, len) = addr_to_sockaddr(dst); - let dstp = &dst as *libc::sockaddr_storage; - unsafe { - let ret = retry(|| { - libc::sendto(self.fd(), - buf.as_ptr() as *libc::c_void, - buf.len() as msglen_t, - 0, - dstp as *libc::sockaddr, - len as libc::socklen_t) as libc::c_int - }); - match ret { - -1 => Err(last_error()), - n if n as uint != buf.len() => { - Err(io::IoError { - kind: io::OtherIoError, - desc: "couldn't send entire packet at once", - detail: None, - }) - } - _ => Ok(()) - } + let (dst, dstlen) = addr_to_sockaddr(dst); + let dstp = &dst as *_ as *libc::sockaddr; + let dstlen = dstlen as libc::socklen_t; + + let fd = self.fd(); + let dolock = || self.lock_nonblocking(); + let dowrite = |nb, buf: *u8, len: uint| unsafe { + let flags = if nb {c::MSG_DONTWAIT} else {0}; + libc::sendto(fd, + buf as *libc::c_void, + len as msglen_t, + flags, + dstp, + dstlen) as i64 + }; + + let n = try!(write(fd, self.write_deadline, buf, false, dolock, dowrite)); + if n != buf.len() { + Err(io::IoError { + kind: io::ShortWrite(n), + desc: "couldn't send entire packet at once", + detail: None, + }) + } else { + Ok(()) } } @@ -645,6 +719,179 @@ impl rtio::RtioUdpSocket for UdpSocket { fn clone(&self) -> Box { box UdpSocket { inner: self.inner.clone(), + read_deadline: 0, + write_deadline: 0, } as Box } + + fn set_timeout(&mut self, timeout: Option) { + let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); + self.read_deadline = deadline; + self.write_deadline = deadline; + } + fn set_read_timeout(&mut self, timeout: Option) { + self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); + } + fn set_write_timeout(&mut self, timeout: Option) { + self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Timeout helpers +// +// The read/write functions below are the helpers for reading/writing a socket +// with a possible deadline specified. This is generally viewed as a timed out +// I/O operation. +// +// From the application's perspective, timeouts apply to the I/O object, not to +// the underlying file descriptor (it's one timeout per object). This means that +// we can't use the SO_RCVTIMEO and corresponding send timeout option. +// +// The next idea to implement timeouts would be to use nonblocking I/O. An +// invocation of select() would wait (with a timeout) for a socket to be ready. +// Once its ready, we can perform the operation. Note that the operation *must* +// be nonblocking, even though select() says the socket is ready. This is +// because some other thread could have come and stolen our data (handles can be +// cloned). +// +// To implement nonblocking I/O, the first option we have is to use the +// O_NONBLOCK flag. Remember though that this is a global setting, affecting all +// I/O objects, so this was initially viewed as unwise. +// +// It turns out that there's this nifty MSG_DONTWAIT flag which can be passed to +// send/recv, but the niftiness wears off once you realize it only works well on +// linux [1] [2]. This means that it's pretty easy to get a nonblocking +// operation on linux (no flag fidding, no affecting other objects), but not on +// other platforms. +// +// To work around this constraint on other platforms, we end up using the +// original strategy of flipping the O_NONBLOCK flag. As mentioned before, this +// could cause other objects' blocking operations to suddenly become +// nonblocking. To get around this, a "blocking operation" which returns EAGAIN +// falls back to using the same code path as nonblocking operations, but with an +// infinite timeout (select + send/recv). This helps emulate blocking +// reads/writes despite the underlying descriptor being nonblocking, as well as +// optimizing the fast path of just hitting one syscall in the good case. +// +// As a final caveat, this implementation uses a mutex so only one thread is +// doing a nonblocking operation at at time. This is the operation that comes +// after the select() (at which point we think the socket is ready). This is +// done for sanity to ensure that the state of the O_NONBLOCK flag is what we +// expect (wouldn't want someone turning it on when it should be off!). All +// operations performed in the lock are *nonblocking* to avoid holding the mutex +// forever. +// +// So, in summary, linux uses MSG_DONTWAIT and doesn't need mutexes, everyone +// else uses O_NONBLOCK and mutexes with some trickery to make sure blocking +// reads/writes are still blocking. +// +// Fun, fun! +// +// [1] http://twistedmatrix.com/pipermail/twisted-commits/2012-April/034692.html +// [2] http://stackoverflow.com/questions/19819198/does-send-msg-dontwait + +pub fn read(fd: sock_t, + deadline: u64, + lock: || -> T, + read: |bool| -> libc::c_int) -> IoResult { + let mut ret = -1; + if deadline == 0 { + ret = retry(|| read(false)); + } + + if deadline != 0 || (ret == -1 && util::wouldblock()) { + let deadline = match deadline { + 0 => None, + n => Some(n), + }; + loop { + // With a timeout, first we wait for the socket to become + // readable using select(), specifying the relevant timeout for + // our previously set deadline. + try!(util::await(fd, deadline, util::Readable)); + + // At this point, we're still within the timeout, and we've + // determined that the socket is readable (as returned by + // select). We must still read the socket in *nonblocking* mode + // because some other thread could come steal our data. If we + // fail to read some data, we retry (hence the outer loop) and + // wait for the socket to become readable again. + let _guard = lock(); + match retry(|| read(deadline.is_some())) { + -1 if util::wouldblock() => { assert!(deadline.is_some()); } + -1 => return Err(last_error()), + n => { ret = n; break } + } + } + } + + match ret { + 0 => Err(io::standard_error(io::EndOfFile)), + n if n < 0 => Err(last_error()), + n => Ok(n as uint) + } +} + +pub fn write(fd: sock_t, + deadline: u64, + buf: &[u8], + write_everything: bool, + lock: || -> T, + write: |bool, *u8, uint| -> i64) -> IoResult { + let mut ret = -1; + let mut written = 0; + if deadline == 0 { + if write_everything { + ret = keep_going(buf, |inner, len| { + written = buf.len() - len; + write(false, inner, len) + }); + } else { + ret = retry(|| { + write(false, buf.as_ptr(), buf.len()) as libc::c_int + }) as i64; + if ret > 0 { written = ret as uint; } + } + } + + if deadline != 0 || (ret == -1 && util::wouldblock()) { + let deadline = match deadline { + 0 => None, + n => Some(n), + }; + while written < buf.len() && (write_everything || written == 0) { + // As with read(), first wait for the socket to be ready for + // the I/O operation. + match util::await(fd, deadline, util::Writable) { + Err(ref e) if e.kind == io::TimedOut && written > 0 => { + assert!(deadline.is_some()); + return Err(io::IoError { + kind: io::ShortWrite(written), + desc: "short write", + detail: None, + }) + } + Err(e) => return Err(e), + Ok(()) => {} + } + + // Also as with read(), we use MSG_DONTWAIT to guard ourselves + // against unforseen circumstances. + let _guard = lock(); + let ptr = buf.slice_from(written).as_ptr(); + let len = buf.len() - written; + match retry(|| write(deadline.is_some(), ptr, len) as libc::c_int) { + -1 if util::wouldblock() => {} + -1 => return Err(last_error()), + n => { written += n as uint; } + } + } + ret = 0; + } + if ret < 0 { + Err(last_error()) + } else { + Ok(written) + } } diff --git a/src/libnative/io/pipe_unix.rs b/src/libnative/io/pipe_unix.rs index 94aca1ef748..36ae2ba06d5 100644 --- a/src/libnative/io/pipe_unix.rs +++ b/src/libnative/io/pipe_unix.rs @@ -16,9 +16,12 @@ use std::io; use std::mem; use std::rt::rtio; use std::sync::arc::UnsafeArc; +use std::unstable::mutex; -use super::{IoResult, retry, keep_going}; +use super::{IoResult, retry}; +use super::net; use super::util; +use super::c; use super::file::fd_t; fn unix_socket(ty: libc::c_int) -> IoResult { @@ -55,6 +58,13 @@ fn addr_to_sockaddr_un(addr: &CString) -> IoResult<(libc::sockaddr_storage, uint struct Inner { fd: fd_t, + lock: mutex::NativeMutex, +} + +impl Inner { + fn new(fd: fd_t) -> Inner { + Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } } + } } impl Drop for Inner { @@ -64,7 +74,7 @@ impl Drop for Inner { fn connect(addr: &CString, ty: libc::c_int, timeout: Option) -> IoResult { let (addr, len) = try!(addr_to_sockaddr_un(addr)); - let inner = Inner { fd: try!(unix_socket(ty)) }; + let inner = Inner::new(try!(unix_socket(ty))); let addrp = &addr as *_ as *libc::sockaddr; let len = len as libc::socklen_t; @@ -84,7 +94,7 @@ fn connect(addr: &CString, ty: libc::c_int, fn bind(addr: &CString, ty: libc::c_int) -> IoResult { let (addr, len) = try!(addr_to_sockaddr_un(addr)); - let inner = Inner { fd: try!(unix_socket(ty)) }; + let inner = Inner::new(try!(unix_socket(ty))); let addrp = &addr as *libc::sockaddr_storage; match unsafe { libc::bind(inner.fd, addrp as *libc::sockaddr, len as libc::socklen_t) @@ -100,54 +110,74 @@ fn bind(addr: &CString, ty: libc::c_int) -> IoResult { pub struct UnixStream { inner: UnsafeArc, + read_deadline: u64, + write_deadline: u64, } impl UnixStream { pub fn connect(addr: &CString, timeout: Option) -> IoResult { connect(addr, libc::SOCK_STREAM, timeout).map(|inner| { - UnixStream { inner: UnsafeArc::new(inner) } + UnixStream::new(UnsafeArc::new(inner)) }) } + fn new(inner: UnsafeArc) -> UnixStream { + UnixStream { + inner: inner, + read_deadline: 0, + write_deadline: 0, + } + } + fn fd(&self) -> fd_t { unsafe { (*self.inner.get()).fd } } + + #[cfg(target_os = "linux")] + fn lock_nonblocking(&self) {} + + #[cfg(not(target_os = "linux"))] + fn lock_nonblocking<'a>(&'a self) -> net::Guard<'a> { + let ret = net::Guard { + fd: self.fd(), + guard: unsafe { (*self.inner.get()).lock.lock() }, + }; + assert!(util::set_nonblocking(self.fd(), true).is_ok()); + ret + } } impl rtio::RtioPipe for UnixStream { fn read(&mut self, buf: &mut [u8]) -> IoResult { - let ret = retry(|| unsafe { - libc::recv(self.fd(), - buf.as_ptr() as *mut libc::c_void, + let fd = self.fd(); + let dolock = || self.lock_nonblocking(); + let doread = |nb| unsafe { + let flags = if nb {c::MSG_DONTWAIT} else {0}; + libc::recv(fd, + buf.as_mut_ptr() as *mut libc::c_void, buf.len() as libc::size_t, - 0) as libc::c_int - }); - if ret == 0 { - Err(io::standard_error(io::EndOfFile)) - } else if ret < 0 { - Err(super::last_error()) - } else { - Ok(ret as uint) - } + flags) as libc::c_int + }; + net::read(fd, self.read_deadline, dolock, doread) } fn write(&mut self, buf: &[u8]) -> IoResult<()> { - let ret = keep_going(buf, |buf, len| unsafe { - libc::send(self.fd(), + let fd = self.fd(); + let dolock = || self.lock_nonblocking(); + let dowrite = |nb: bool, buf: *u8, len: uint| unsafe { + let flags = if nb {c::MSG_DONTWAIT} else {0}; + libc::send(fd, buf as *mut libc::c_void, len as libc::size_t, - 0) as i64 - }); - if ret < 0 { - Err(super::last_error()) - } else { - Ok(()) + flags) as i64 + }; + match net::write(fd, self.write_deadline, buf, true, dolock, dowrite) { + Ok(_) => Ok(()), + Err(e) => Err(e) } } fn clone(&self) -> Box { - box UnixStream { - inner: self.inner.clone(), - } as Box + box UnixStream::new(self.inner.clone()) as Box } fn close_write(&mut self) -> IoResult<()> { @@ -156,6 +186,17 @@ impl rtio::RtioPipe for UnixStream { fn close_read(&mut self) -> IoResult<()> { super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) }) } + fn set_timeout(&mut self, timeout: Option) { + let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); + self.read_deadline = deadline; + self.write_deadline = deadline; + } + fn set_read_timeout(&mut self, timeout: Option) { + self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); + } + fn set_write_timeout(&mut self, timeout: Option) { + self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); + } } //////////////////////////////////////////////////////////////////////////////// @@ -202,7 +243,7 @@ impl UnixAcceptor { pub fn native_accept(&mut self) -> IoResult { if self.deadline != 0 { - try!(util::accept_deadline(self.fd(), self.deadline)); + try!(util::await(self.fd(), Some(self.deadline), util::Readable)); } let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() }; let storagep = &mut storage as *mut libc::sockaddr_storage; @@ -214,7 +255,7 @@ impl UnixAcceptor { &mut size as *mut libc::socklen_t) as libc::c_int }) { -1 => Err(super::last_error()), - fd => Ok(UnixStream { inner: UnsafeArc::new(Inner { fd: fd }) }) + fd => Ok(UnixStream::new(UnsafeArc::new(Inner::new(fd)))) } } } diff --git a/src/libnative/io/pipe_win32.rs b/src/libnative/io/pipe_win32.rs index 8050123cedc..af80c7174f2 100644 --- a/src/libnative/io/pipe_win32.rs +++ b/src/libnative/io/pipe_win32.rs @@ -169,6 +169,27 @@ unsafe fn pipe(name: *u16, init: bool) -> libc::HANDLE { ) } +pub fn await(handle: libc::HANDLE, deadline: u64, + overlapped: &mut libc::OVERLAPPED) -> bool { + if deadline == 0 { return true } + + // If we've got a timeout, use WaitForSingleObject in tandem with CancelIo + // to figure out if we should indeed get the result. + let now = ::io::timer::now(); + let timeout = deadline < now || unsafe { + let ms = (deadline - now) as libc::DWORD; + let r = libc::WaitForSingleObject(overlapped.hEvent, + ms); + r != libc::WAIT_OBJECT_0 + }; + if timeout { + unsafe { let _ = c::CancelIo(handle); } + false + } else { + true + } +} + //////////////////////////////////////////////////////////////////////////////// // Unix Streams //////////////////////////////////////////////////////////////////////////////// @@ -177,6 +198,8 @@ pub struct UnixStream { inner: UnsafeArc, write: Option, read: Option, + read_deadline: u64, + write_deadline: u64, } impl UnixStream { @@ -253,6 +276,8 @@ impl UnixStream { inner: UnsafeArc::new(inner), read: None, write: None, + read_deadline: 0, + write_deadline: 0, }) } } @@ -358,6 +383,10 @@ impl rtio::RtioPipe for UnixStream { // sleep. drop(guard); loop { + // Process a timeout if one is pending + let succeeded = await(self.handle(), self.read_deadline, + &mut overlapped); + let ret = unsafe { libc::GetOverlappedResult(self.handle(), &mut overlapped, @@ -373,6 +402,9 @@ impl rtio::RtioPipe for UnixStream { // If the reading half is now closed, then we're done. If we woke up // because the writing half was closed, keep trying. + if !succeeded { + return Err(io::standard_error(io::TimedOut)) + } if self.read_closed() { return Err(io::standard_error(io::EndOfFile)) } @@ -408,12 +440,16 @@ impl rtio::RtioPipe for UnixStream { &mut bytes_written, &mut overlapped) }; + let err = os::errno(); drop(guard); if ret == 0 { - if os::errno() != libc::ERROR_IO_PENDING as uint { - return Err(super::last_error()) + if err != libc::ERROR_IO_PENDING as uint { + return Err(io::IoError::from_errno(err, true)); } + // Process a timeout if one is pending + let succeeded = await(self.handle(), self.write_deadline, + &mut overlapped); let ret = unsafe { libc::GetOverlappedResult(self.handle(), &mut overlapped, @@ -427,10 +463,22 @@ impl rtio::RtioPipe for UnixStream { if os::errno() != libc::ERROR_OPERATION_ABORTED as uint { return Err(super::last_error()) } + if !succeeded { + let amt = offset + bytes_written as uint; + return if amt > 0 { + Err(io::IoError { + kind: io::ShortWrite(amt), + desc: "short write during write", + detail: None, + }) + } else { + Err(util::timeout("write timed out")) + } + } if self.write_closed() { return Err(io::standard_error(io::BrokenPipe)) } - continue; // retry + continue // retry } } offset += bytes_written as uint; @@ -443,6 +491,8 @@ impl rtio::RtioPipe for UnixStream { inner: self.inner.clone(), read: None, write: None, + read_deadline: 0, + write_deadline: 0, } as Box } @@ -475,6 +525,18 @@ impl rtio::RtioPipe for UnixStream { unsafe { (*self.inner.get()).write_closed.store(true, atomics::SeqCst) } self.cancel_io() } + + fn set_timeout(&mut self, timeout: Option) { + let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); + self.read_deadline = deadline; + self.write_deadline = deadline; + } + fn set_read_timeout(&mut self, timeout: Option) { + self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); + } + fn set_write_timeout(&mut self, timeout: Option) { + self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); + } } //////////////////////////////////////////////////////////////////////////////// @@ -577,22 +639,8 @@ impl UnixAcceptor { let mut err = unsafe { libc::GetLastError() }; if err == libc::ERROR_IO_PENDING as libc::DWORD { - // If we've got a timeout, use WaitForSingleObject in tandem - // with CancelIo to figure out if we should indeed get the - // result. - if self.deadline != 0 { - let now = ::io::timer::now(); - let timeout = self.deadline < now || unsafe { - let ms = (self.deadline - now) as libc::DWORD; - let r = libc::WaitForSingleObject(overlapped.hEvent, - ms); - r != libc::WAIT_OBJECT_0 - }; - if timeout { - unsafe { let _ = c::CancelIo(handle); } - return Err(util::timeout("accept timed out")) - } - } + // Process a timeout if one is pending + let _ = await(handle, self.deadline, &mut overlapped); // This will block until the overlapped I/O is completed. The // timeout was previously handled, so this will either block in @@ -638,6 +686,8 @@ impl UnixAcceptor { inner: UnsafeArc::new(Inner::new(handle)), read: None, write: None, + read_deadline: 0, + write_deadline: 0, }) } } diff --git a/src/libnative/io/util.rs b/src/libnative/io/util.rs index 0aaac8f8ad8..0d032f9f4bc 100644 --- a/src/libnative/io/util.rs +++ b/src/libnative/io/util.rs @@ -12,12 +12,19 @@ use libc; use std::io::IoResult; use std::io; use std::mem; +use std::os; use std::ptr; use super::c; use super::net; use super::{retry, last_error}; +#[deriving(Show)] +pub enum SocketStatus { + Readable, + Writable, +} + pub fn timeout(desc: &'static str) -> io::IoError { io::IoError { kind: io::TimedOut, @@ -33,6 +40,34 @@ pub fn ms_to_timeval(ms: u64) -> libc::timeval { } } +#[cfg(unix)] +pub fn wouldblock() -> bool { + let err = os::errno(); + err == libc::EWOULDBLOCK as int || err == libc::EAGAIN as int +} + +#[cfg(windows)] +pub fn wouldblock() -> bool { + let err = os::errno(); + err == libc::WSAEWOULDBLOCK as uint +} + +#[cfg(unix)] +pub fn set_nonblocking(fd: net::sock_t, nb: bool) -> IoResult<()> { + let set = nb as libc::c_int; + super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) })) +} + +#[cfg(windows)] +pub fn set_nonblocking(fd: net::sock_t, nb: bool) -> IoResult<()> { + let mut set = nb as libc::c_ulong; + if unsafe { c::ioctlsocket(fd, c::FIONBIO, &mut set) != 0 } { + Err(last_error()) + } else { + Ok(()) + } +} + // See http://developerweb.net/viewtopic.php?id=3196 for where this is // derived from. pub fn connect_timeout(fd: net::sock_t, @@ -79,22 +114,6 @@ pub fn connect_timeout(fd: net::sock_t, try!(set_nonblocking(fd, false)); return ret; - #[cfg(unix)] - fn set_nonblocking(fd: net::sock_t, nb: bool) -> IoResult<()> { - let set = nb as libc::c_int; - super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) })) - } - - #[cfg(windows)] - fn set_nonblocking(fd: net::sock_t, nb: bool) -> IoResult<()> { - let mut set = nb as libc::c_ulong; - if unsafe { c::ioctlsocket(fd, c::FIONBIO, &mut set) != 0 } { - Err(last_error()) - } else { - Ok(()) - } - } - #[cfg(unix)] fn await(fd: net::sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int { @@ -116,21 +135,34 @@ pub fn connect_timeout(fd: net::sock_t, } } -pub fn accept_deadline(fd: net::sock_t, deadline: u64) -> IoResult<()> { +pub fn await(fd: net::sock_t, deadline: Option, + status: SocketStatus) -> IoResult<()> { let mut set: c::fd_set = unsafe { mem::init() }; c::fd_set(&mut set, fd); + let (read, write) = match status { + Readable => (&set as *_, ptr::null()), + Writable => (ptr::null(), &set as *_), + }; + let mut tv: libc::timeval = unsafe { mem::init() }; match retry(|| { - // If we're past the deadline, then pass a 0 timeout to select() so - // we can poll the status of the socket. let now = ::io::timer::now(); - let ms = if deadline < now {0} else {deadline - now}; - let tv = ms_to_timeval(ms); + let tvp = match deadline { + None => ptr::null(), + Some(deadline) => { + // If we're past the deadline, then pass a 0 timeout to + // select() so we can poll the status + let ms = if deadline < now {0} else {deadline - now}; + tv = ms_to_timeval(ms); + &tv as *_ + } + }; let n = if cfg!(windows) {1} else {fd as libc::c_int + 1}; - unsafe { c::select(n, &set, ptr::null(), ptr::null(), &tv) } + let r = unsafe { c::select(n, read, write, ptr::null(), tvp) }; + r }) { -1 => Err(last_error()), - 0 => Err(timeout("accept timed out")), - _ => return Ok(()), + 0 => Err(timeout("timed out")), + _ => Ok(()), } } diff --git a/src/librustuv/access.rs b/src/librustuv/access.rs index f96fa1e5be6..433073b43c4 100644 --- a/src/librustuv/access.rs +++ b/src/librustuv/access.rs @@ -31,7 +31,7 @@ pub struct Guard<'a> { } struct Inner { - queue: Vec, + queue: Vec<(BlockedTask, uint)>, held: bool, closed: bool, } @@ -47,16 +47,17 @@ impl Access { } } - pub fn grant<'a>(&'a mut self, missile: HomingMissile) -> Guard<'a> { + pub fn grant<'a>(&'a mut self, token: uint, + missile: HomingMissile) -> Guard<'a> { // This unsafety is actually OK because the homing missile argument // guarantees that we're on the same event loop as all the other objects // attempting to get access granted. - let inner: &mut Inner = unsafe { cast::transmute(self.inner.get()) }; + let inner: &mut Inner = unsafe { &mut *self.inner.get() }; if inner.held { let t: Box = Local::take(); t.deschedule(1, |task| { - inner.queue.push(task); + inner.queue.push((task, token)); Ok(()) }); assert!(inner.held); @@ -75,6 +76,17 @@ impl Access { // necessary synchronization to be running on this thread. unsafe { (*self.inner.get()).closed = true; } } + + // Dequeue a blocked task with a specified token. This is unsafe because it + // is only safe to invoke while on the home event loop, and there is no + // guarantee that this i being invoked on the home event loop. + pub unsafe fn dequeue(&mut self, token: uint) -> Option { + let inner: &mut Inner = &mut *self.inner.get(); + match inner.queue.iter().position(|&(_, t)| t == token) { + Some(i) => Some(inner.queue.remove(i).unwrap().val0()), + None => None, + } + } } impl Clone for Access { @@ -111,9 +123,9 @@ impl<'a> Drop for Guard<'a> { // scheduled on this scheduler. Because we might be woken up on some // other scheduler, we drop our homing missile before we reawaken // the task. - Some(task) => { + Some((task, _)) => { drop(self.missile.take()); - let _ = task.wake().map(|t| t.reawaken()); + task.reawaken(); } None => { inner.held = false; } } diff --git a/src/librustuv/lib.rs b/src/librustuv/lib.rs index 84d4b6b4702..968029a6edc 100644 --- a/src/librustuv/lib.rs +++ b/src/librustuv/lib.rs @@ -84,6 +84,7 @@ fn start(argc: int, argv: **u8) -> int { mod macros; mod access; +mod timeout; mod homing; mod queue; mod rc; diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs index 0ddf50921fd..999da1cfda7 100644 --- a/src/librustuv/net.rs +++ b/src/librustuv/net.rs @@ -12,21 +12,20 @@ use libc::{size_t, ssize_t, c_int, c_void, c_uint}; use libc; use std::cast; use std::io; -use std::io::{IoError, IoResult}; +use std::io::IoError; use std::io::net::ip; use std::mem; use std::ptr; use std::rt::rtio; use std::rt::task::BlockedTask; -use access::Access; use homing::{HomingIO, HomeHandle}; use rc::Refcount; use stream::StreamWatcher; use super::{Loop, Request, UvError, Buf, status_to_io_result, uv_error_to_io_error, UvHandle, slice_to_uv_buf, wait_until_woken_after, wakeup}; -use timer::TimerWatcher; +use timeout::{AccessTimeout, AcceptTimeout, ConnectCtx}; use uvio::UvIoFactory; use uvll; @@ -146,190 +145,6 @@ fn socket_name(sk: SocketNameKind, n => Err(uv_error_to_io_error(UvError(n))) } } -//////////////////////////////////////////////////////////////////////////////// -// Helpers for handling timeouts, shared for pipes/tcp -//////////////////////////////////////////////////////////////////////////////// - -pub struct ConnectCtx { - pub status: c_int, - pub task: Option, - pub timer: Option>, -} - -pub struct AcceptTimeout { - timer: Option, - timeout_tx: Option>, - timeout_rx: Option>, -} - -impl ConnectCtx { - pub fn connect( - mut self, obj: T, timeout: Option, io: &mut UvIoFactory, - f: |&Request, &T, uvll::uv_connect_cb| -> libc::c_int - ) -> Result { - let mut req = Request::new(uvll::UV_CONNECT); - let r = f(&req, &obj, connect_cb); - return match r { - 0 => { - req.defuse(); // uv callback now owns this request - match timeout { - Some(t) => { - let mut timer = TimerWatcher::new(io); - timer.start(timer_cb, t, 0); - self.timer = Some(timer); - } - None => {} - } - wait_until_woken_after(&mut self.task, &io.loop_, || { - let data = &self as *_; - match self.timer { - Some(ref mut timer) => unsafe { timer.set_data(data) }, - None => {} - } - req.set_data(data); - }); - // Make sure an erroneously fired callback doesn't have access - // to the context any more. - req.set_data(0 as *int); - - // If we failed because of a timeout, drop the TcpWatcher as - // soon as possible because it's data is now set to null and we - // want to cancel the callback ASAP. - match self.status { - 0 => Ok(obj), - n => { drop(obj); Err(UvError(n)) } - } - } - n => Err(UvError(n)) - }; - - extern fn timer_cb(handle: *uvll::uv_timer_t) { - // Don't close the corresponding tcp request, just wake up the task - // and let RAII take care of the pending watcher. - let cx: &mut ConnectCtx = unsafe { - &mut *(uvll::get_data_for_uv_handle(handle) as *mut ConnectCtx) - }; - cx.status = uvll::ECANCELED; - wakeup(&mut cx.task); - } - - extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) { - // This callback can be invoked with ECANCELED if the watcher is - // closed by the timeout callback. In that case we just want to free - // the request and be along our merry way. - let req = Request::wrap(req); - if status == uvll::ECANCELED { return } - - // Apparently on windows when the handle is closed this callback may - // not be invoked with ECANCELED but rather another error code. - // Either ways, if the data is null, then our timeout has expired - // and there's nothing we can do. - let data = unsafe { uvll::get_data_for_req(req.handle) }; - if data.is_null() { return } - - let cx: &mut ConnectCtx = unsafe { &mut *(data as *mut ConnectCtx) }; - cx.status = status; - match cx.timer { - Some(ref mut t) => t.stop(), - None => {} - } - // Note that the timer callback doesn't cancel the connect request - // (that's the job of uv_close()), so it's possible for this - // callback to get triggered after the timeout callback fires, but - // before the task wakes up. In that case, we did indeed - // successfully connect, but we don't need to wake someone up. We - // updated the status above (correctly so), and the task will pick - // up on this when it wakes up. - if cx.task.is_some() { - wakeup(&mut cx.task); - } - } - } -} - -impl AcceptTimeout { - pub fn new() -> AcceptTimeout { - AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None } - } - - pub fn accept(&mut self, c: &Receiver>) -> IoResult { - match self.timeout_rx { - None => c.recv(), - Some(ref rx) => { - use std::comm::Select; - - // Poll the incoming channel first (don't rely on the order of - // select just yet). If someone's pending then we should return - // them immediately. - match c.try_recv() { - Ok(data) => return data, - Err(..) => {} - } - - // Use select to figure out which channel gets ready first. We - // do some custom handling of select to ensure that we never - // actually drain the timeout channel (we'll keep seeing the - // timeout message in the future). - let s = Select::new(); - let mut timeout = s.handle(rx); - let mut data = s.handle(c); - unsafe { - timeout.add(); - data.add(); - } - if s.wait() == timeout.id() { - Err(uv_error_to_io_error(UvError(uvll::ECANCELED))) - } else { - c.recv() - } - } - } - } - - pub fn clear(&mut self) { - // Clear any previous timeout by dropping the timer and transmission - // channels - drop((self.timer.take(), - self.timeout_tx.take(), - self.timeout_rx.take())) - } - - pub fn set_timeout + HomingIO>( - &mut self, ms: u64, t: &mut T - ) { - // If we have a timeout, lazily initialize the timer which will be used - // to fire when the timeout runs out. - if self.timer.is_none() { - let _m = t.fire_homing_missile(); - let loop_ = Loop::wrap(unsafe { - uvll::get_loop_for_uv_handle(t.uv_handle()) - }); - let mut timer = TimerWatcher::new_home(&loop_, t.home().clone()); - unsafe { - timer.set_data(self as *mut _ as *AcceptTimeout); - } - self.timer = Some(timer); - } - - // Once we've got a timer, stop any previous timeout, reset it for the - // current one, and install some new channels to send/receive data on - let timer = self.timer.get_mut_ref(); - timer.stop(); - timer.start(timer_cb, ms, 0); - let (tx, rx) = channel(); - self.timeout_tx = Some(tx); - self.timeout_rx = Some(rx); - - extern fn timer_cb(timer: *uvll::uv_timer_t) { - let acceptor: &mut AcceptTimeout = unsafe { - &mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout) - }; - // This send can never fail because if this timer is active then the - // receiving channel is guaranteed to be alive - acceptor.timeout_tx.get_ref().send(()); - } - } -} //////////////////////////////////////////////////////////////////////////////// /// TCP implementation @@ -345,8 +160,8 @@ pub struct TcpWatcher { // stream object, so we use these access guards in order to arbitrate among // multiple concurrent reads and writes. Note that libuv *can* read and // write simultaneously, it just can't read and read simultaneously. - read_access: Access, - write_access: Access, + read_access: AccessTimeout, + write_access: AccessTimeout, } pub struct TcpListener { @@ -380,8 +195,8 @@ impl TcpWatcher { handle: handle, stream: StreamWatcher::new(handle), refcount: Refcount::new(), - read_access: Access::new(), - write_access: Access::new(), + read_access: AccessTimeout::new(), + write_access: AccessTimeout::new(), } } @@ -412,10 +227,10 @@ impl rtio::RtioSocket for TcpWatcher { impl rtio::RtioTcpStream for TcpWatcher { fn read(&mut self, buf: &mut [u8]) -> Result { let m = self.fire_homing_missile(); - let access = self.read_access.grant(m); + let guard = try!(self.read_access.grant(m)); // see comments in close_read about this check - if access.is_closed() { + if guard.access.is_closed() { return Err(io::standard_error(io::EndOfFile)) } @@ -424,8 +239,8 @@ impl rtio::RtioTcpStream for TcpWatcher { fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { let m = self.fire_homing_missile(); - let _g = self.write_access.grant(m); - self.stream.write(buf).map_err(uv_error_to_io_error) + let guard = try!(self.write_access.grant(m)); + self.stream.write(buf, guard.can_timeout).map_err(uv_error_to_io_error) } fn peer_name(&mut self) -> Result { @@ -468,16 +283,19 @@ impl rtio::RtioTcpStream for TcpWatcher { stream: StreamWatcher::new(self.handle), home: self.home.clone(), refcount: self.refcount.clone(), - write_access: self.write_access.clone(), read_access: self.read_access.clone(), + write_access: self.write_access.clone(), } as Box } fn close_read(&mut self) -> Result<(), IoError> { // see comments in PipeWatcher::close_read - let m = self.fire_homing_missile(); - self.read_access.close(&m); - self.stream.cancel_read(m); + let task = { + let m = self.fire_homing_missile(); + self.read_access.access.close(&m); + self.stream.cancel_read(uvll::EOF as libc::ssize_t) + }; + let _ = task.map(|t| t.reawaken()); Ok(()) } @@ -485,6 +303,35 @@ impl rtio::RtioTcpStream for TcpWatcher { let _m = self.fire_homing_missile(); shutdown(self.handle, &self.uv_loop()) } + + fn set_timeout(&mut self, timeout: Option) { + self.set_read_timeout(timeout); + self.set_write_timeout(timeout); + } + + fn set_read_timeout(&mut self, ms: Option) { + let _m = self.fire_homing_missile(); + let loop_ = self.uv_loop(); + self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read, + &self.stream as *_ as uint); + + fn cancel_read(stream: uint) -> Option { + let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) }; + stream.cancel_read(uvll::ECANCELED as ssize_t) + } + } + + fn set_write_timeout(&mut self, ms: Option) { + let _m = self.fire_homing_missile(); + let loop_ = self.uv_loop(); + self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write, + &self.stream as *_ as uint); + + fn cancel_write(stream: uint) -> Option { + let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) }; + stream.cancel_write() + } + } } impl UvHandle for TcpWatcher { @@ -618,6 +465,7 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor { } fn set_timeout(&mut self, ms: Option) { + let _m = self.fire_homing_missile(); match ms { None => self.timeout.clear(), Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener), @@ -635,8 +483,22 @@ pub struct UdpWatcher { // See above for what these fields are refcount: Refcount, - read_access: Access, - write_access: Access, + read_access: AccessTimeout, + write_access: AccessTimeout, + + blocked_sender: Option, +} + +struct UdpRecvCtx { + task: Option, + buf: Option, + result: Option<(ssize_t, Option)>, +} + +struct UdpSendCtx { + result: c_int, + data: Option>, + udp: *mut UdpWatcher, } impl UdpWatcher { @@ -646,8 +508,9 @@ impl UdpWatcher { handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) }, home: io.make_handle(), refcount: Refcount::new(), - read_access: Access::new(), - write_access: Access::new(), + read_access: AccessTimeout::new(), + write_access: AccessTimeout::new(), + blocked_sender: None, }; assert_eq!(unsafe { uvll::uv_udp_init(io.uv_loop(), udp.handle) @@ -683,20 +546,15 @@ impl rtio::RtioUdpSocket for UdpWatcher { fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, ip::SocketAddr), IoError> { - struct Ctx { - task: Option, - buf: Option, - result: Option<(ssize_t, Option)>, - } let loop_ = self.uv_loop(); let m = self.fire_homing_missile(); - let _g = self.read_access.grant(m); + let _guard = try!(self.read_access.grant(m)); return match unsafe { uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb) } { 0 => { - let mut cx = Ctx { + let mut cx = UdpRecvCtx { task: None, buf: Some(slice_to_uv_buf(buf)), result: None, @@ -718,7 +576,8 @@ impl rtio::RtioUdpSocket for UdpWatcher { _suggested_size: size_t, buf: *mut Buf) { unsafe { - let cx = &mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx); + let cx = uvll::get_data_for_uv_handle(handle); + let cx = &mut *(cx as *mut UdpRecvCtx); *buf = cx.buf.take().expect("recv alloc_cb called more than once") } } @@ -727,7 +586,7 @@ impl rtio::RtioUdpSocket for UdpWatcher { addr: *libc::sockaddr, _flags: c_uint) { assert!(nread != uvll::ECANCELED as ssize_t); let cx = unsafe { - &mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx) + &mut *(uvll::get_data_for_uv_handle(handle) as *mut UdpRecvCtx) }; // When there's no data to read the recv callback can be a no-op. @@ -751,42 +610,68 @@ impl rtio::RtioUdpSocket for UdpWatcher { } fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> Result<(), IoError> { - struct Ctx { task: Option, result: c_int } - let m = self.fire_homing_missile(); let loop_ = self.uv_loop(); - let _g = self.write_access.grant(m); + let guard = try!(self.write_access.grant(m)); let mut req = Request::new(uvll::UV_UDP_SEND); - let buf = slice_to_uv_buf(buf); let (addr, _len) = addr_to_sockaddr(dst); - let result = unsafe { - let addr_p = &addr as *libc::sockaddr_storage; - uvll::uv_udp_send(req.handle, self.handle, [buf], - addr_p as *libc::sockaddr, send_cb) + let addr_p = &addr as *_ as *libc::sockaddr; + + // see comments in StreamWatcher::write for why we may allocate a buffer + // here. + let data = if guard.can_timeout {Some(Vec::from_slice(buf))} else {None}; + let uv_buf = if guard.can_timeout { + slice_to_uv_buf(data.get_ref().as_slice()) + } else { + slice_to_uv_buf(buf) }; - return match result { + return match unsafe { + uvll::uv_udp_send(req.handle, self.handle, [uv_buf], addr_p, send_cb) + } { 0 => { req.defuse(); // uv callback now owns this request - let mut cx = Ctx { task: None, result: 0 }; - wait_until_woken_after(&mut cx.task, &loop_, || { + let mut cx = UdpSendCtx { + result: uvll::ECANCELED, data: data, udp: self as *mut _ + }; + wait_until_woken_after(&mut self.blocked_sender, &loop_, || { req.set_data(&cx); }); - match cx.result { - 0 => Ok(()), - n => Err(uv_error_to_io_error(UvError(n))) + + if cx.result != uvll::ECANCELED { + return match cx.result { + 0 => Ok(()), + n => Err(uv_error_to_io_error(UvError(n))) + } } + let new_cx = box UdpSendCtx { + result: 0, + udp: 0 as *mut UdpWatcher, + data: cx.data.take(), + }; + unsafe { + req.set_data(&*new_cx); + cast::forget(new_cx); + } + Err(uv_error_to_io_error(UvError(cx.result))) } n => Err(uv_error_to_io_error(UvError(n))) }; + // This function is the same as stream::write_cb, but adapted for udp + // instead of streams. extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) { let req = Request::wrap(req); - assert!(status != uvll::ECANCELED); - let cx: &mut Ctx = unsafe { req.get_data() }; + let cx: &mut UdpSendCtx = unsafe { req.get_data() }; cx.result = status; - wakeup(&mut cx.task); + + if cx.udp as uint != 0 { + let udp: &mut UdpWatcher = unsafe { &mut *cx.udp }; + wakeup(&mut udp.blocked_sender); + } else { + let _cx: Box = unsafe { cast::transmute(cx) }; + } } } @@ -866,8 +751,48 @@ impl rtio::RtioUdpSocket for UdpWatcher { refcount: self.refcount.clone(), write_access: self.write_access.clone(), read_access: self.read_access.clone(), + blocked_sender: None, } as Box } + + fn set_timeout(&mut self, timeout: Option) { + self.set_read_timeout(timeout); + self.set_write_timeout(timeout); + } + + fn set_read_timeout(&mut self, ms: Option) { + let _m = self.fire_homing_missile(); + let loop_ = self.uv_loop(); + self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read, + self.handle as uint); + + fn cancel_read(stream: uint) -> Option { + // This method is quite similar to StreamWatcher::cancel_read, see + // there for more information + let handle = stream as *uvll::uv_udp_t; + assert_eq!(unsafe { uvll::uv_udp_recv_stop(handle) }, 0); + let data = unsafe { + let data = uvll::get_data_for_uv_handle(handle); + if data.is_null() { return None } + uvll::set_data_for_uv_handle(handle, 0 as *int); + &mut *(data as *mut UdpRecvCtx) + }; + data.result = Some((uvll::ECANCELED as ssize_t, None)); + data.task.take() + } + } + + fn set_write_timeout(&mut self, ms: Option) { + let _m = self.fire_homing_missile(); + let loop_ = self.uv_loop(); + self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write, + self as *mut _ as uint); + + fn cancel_write(stream: uint) -> Option { + let stream: &mut UdpWatcher = unsafe { cast::transmute(stream) }; + stream.blocked_sender.take() + } + } } impl Drop for UdpWatcher { diff --git a/src/librustuv/pipe.rs b/src/librustuv/pipe.rs index 7fec4051761..76bf92bd555 100644 --- a/src/librustuv/pipe.rs +++ b/src/librustuv/pipe.rs @@ -10,16 +10,18 @@ use libc; use std::c_str::CString; +use std::cast; use std::io::IoError; use std::io; use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor}; +use std::rt::task::BlockedTask; -use access::Access; use homing::{HomingIO, HomeHandle}; use net; use rc::Refcount; use stream::StreamWatcher; use super::{Loop, UvError, UvHandle, uv_error_to_io_error}; +use timeout::{AcceptTimeout, ConnectCtx, AccessTimeout}; use uvio::UvIoFactory; use uvll; @@ -30,8 +32,8 @@ pub struct PipeWatcher { refcount: Refcount, // see comments in TcpWatcher for why these exist - write_access: Access, - read_access: Access, + write_access: AccessTimeout, + read_access: AccessTimeout, } pub struct PipeListener { @@ -43,7 +45,7 @@ pub struct PipeListener { pub struct PipeAcceptor { listener: Box, - timeout: net::AcceptTimeout, + timeout: AcceptTimeout, } // PipeWatcher implementation and traits @@ -70,8 +72,8 @@ impl PipeWatcher { home: home, defused: false, refcount: Refcount::new(), - read_access: Access::new(), - write_access: Access::new(), + read_access: AccessTimeout::new(), + write_access: AccessTimeout::new(), } } @@ -89,7 +91,7 @@ impl PipeWatcher { -> Result { let pipe = PipeWatcher::new(io, false); - let cx = net::ConnectCtx { status: -1, task: None, timer: None }; + let cx = ConnectCtx { status: -1, task: None, timer: None }; cx.connect(pipe, timeout, io, |req, pipe, cb| { unsafe { uvll::uv_pipe_connect(req.handle, pipe.handle(), @@ -112,10 +114,10 @@ impl PipeWatcher { impl RtioPipe for PipeWatcher { fn read(&mut self, buf: &mut [u8]) -> Result { let m = self.fire_homing_missile(); - let access = self.read_access.grant(m); + let guard = try!(self.read_access.grant(m)); // see comments in close_read about this check - if access.is_closed() { + if guard.access.is_closed() { return Err(io::standard_error(io::EndOfFile)) } @@ -124,8 +126,8 @@ impl RtioPipe for PipeWatcher { fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { let m = self.fire_homing_missile(); - let _g = self.write_access.grant(m); - self.stream.write(buf).map_err(uv_error_to_io_error) + let guard = try!(self.write_access.grant(m)); + self.stream.write(buf, guard.can_timeout).map_err(uv_error_to_io_error) } fn clone(&self) -> Box { @@ -157,9 +159,12 @@ impl RtioPipe for PipeWatcher { // ordering is crucial because we could in theory be rescheduled during // the uv_read_stop which means that another read invocation could leak // in before we set the flag. - let m = self.fire_homing_missile(); - self.read_access.close(&m); - self.stream.cancel_read(m); + let task = { + let m = self.fire_homing_missile(); + self.read_access.access.close(&m); + self.stream.cancel_read(uvll::EOF as libc::ssize_t) + }; + let _ = task.map(|t| t.reawaken()); Ok(()) } @@ -167,6 +172,35 @@ impl RtioPipe for PipeWatcher { let _m = self.fire_homing_missile(); net::shutdown(self.stream.handle, &self.uv_loop()) } + + fn set_timeout(&mut self, timeout: Option) { + self.set_read_timeout(timeout); + self.set_write_timeout(timeout); + } + + fn set_read_timeout(&mut self, ms: Option) { + let _m = self.fire_homing_missile(); + let loop_ = self.uv_loop(); + self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read, + &self.stream as *_ as uint); + + fn cancel_read(stream: uint) -> Option { + let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) }; + stream.cancel_read(uvll::ECANCELED as libc::ssize_t) + } + } + + fn set_write_timeout(&mut self, ms: Option) { + let _m = self.fire_homing_missile(); + let loop_ = self.uv_loop(); + self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write, + &self.stream as *_ as uint); + + fn cancel_write(stream: uint) -> Option { + let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) }; + stream.cancel_write() + } + } } impl HomingIO for PipeWatcher { @@ -219,7 +253,7 @@ impl RtioUnixListener for PipeListener { // create the acceptor object from ourselves let mut acceptor = box PipeAcceptor { listener: self, - timeout: net::AcceptTimeout::new(), + timeout: AcceptTimeout::new(), }; let _m = acceptor.fire_homing_missile(); diff --git a/src/librustuv/stream.rs b/src/librustuv/stream.rs index a1b606709d8..36b6ed09ca5 100644 --- a/src/librustuv/stream.rs +++ b/src/librustuv/stream.rs @@ -14,7 +14,6 @@ use std::ptr; use std::rt::task::BlockedTask; use Loop; -use homing::HomingMissile; use super::{UvError, Buf, slice_to_uv_buf, Request, wait_until_woken_after, ForbidUnwind, wakeup}; use uvll; @@ -31,6 +30,8 @@ pub struct StreamWatcher { // structure, but currently we don't have mappings for all the structures // defined in libuv, so we're foced to malloc this. last_write_req: Option, + + blocked_writer: Option, } struct ReadContext { @@ -41,7 +42,8 @@ struct ReadContext { struct WriteContext { result: c_int, - task: Option, + stream: *mut StreamWatcher, + data: Option>, } impl StreamWatcher { @@ -62,6 +64,7 @@ impl StreamWatcher { StreamWatcher { handle: stream, last_write_req: None, + blocked_writer: None, } } @@ -74,7 +77,7 @@ impl StreamWatcher { buf: Some(slice_to_uv_buf(buf)), // if the read is canceled, we'll see eof, otherwise this will get // overwritten - result: uvll::EOF as ssize_t, + result: 0, task: None, }; // When reading a TTY stream on windows, libuv will invoke alloc_cb @@ -104,27 +107,22 @@ impl StreamWatcher { return ret; } - pub fn cancel_read(&mut self, m: HomingMissile) { + pub fn cancel_read(&mut self, reason: ssize_t) -> Option { // When we invoke uv_read_stop, it cancels the read and alloc // callbacks. We need to manually wake up a pending task (if one was - // present). Note that we wake up the task *outside* the homing missile - // to ensure that we don't switch schedulers when we're not supposed to. + // present). assert_eq!(unsafe { uvll::uv_read_stop(self.handle) }, 0); let data = unsafe { let data = uvll::get_data_for_uv_handle(self.handle); - if data.is_null() { return } + if data.is_null() { return None } uvll::set_data_for_uv_handle(self.handle, 0 as *int); &mut *(data as *mut ReadContext) }; - let task = data.task.take(); - drop(m); - match task { - Some(task) => { let _ = task.wake().map(|t| t.reawaken()); } - None => {} - } + data.result = reason; + data.task.take() } - pub fn write(&mut self, buf: &[u8]) -> Result<(), UvError> { + pub fn write(&mut self, buf: &[u8], may_timeout: bool) -> Result<(), UvError> { // The ownership of the write request is dubious if this function // unwinds. I believe that if the write_cb fails to re-schedule the task // then the write request will be leaked. @@ -137,30 +135,94 @@ impl StreamWatcher { }; req.set_data(ptr::null::<()>()); + // And here's where timeouts get a little interesting. Currently, libuv + // does not support canceling an in-flight write request. Consequently, + // when a write timeout expires, there's not much we can do other than + // detach the sleeping task from the write request itself. Semantically, + // this means that the write request will complete asynchronously, but + // the calling task will return error (because the write timed out). + // + // There is special wording in the documentation of set_write_timeout() + // indicating that this is a plausible failure scenario, and this + // function is why that wording exists. + // + // Implementation-wise, we must be careful when passing a buffer down to + // libuv. Most of this implementation avoids allocations becuase of the + // blocking guarantee (all stack local variables are valid for the + // entire read/write request). If our write request can be timed out, + // however, we must heap allocate the data and pass that to the libuv + // functions instead. The reason for this is that if we time out and + // return, there's no guarantee that `buf` is a valid buffer any more. + // + // To do this, the write context has an optionally owned vector of + // bytes. + let data = if may_timeout {Some(Vec::from_slice(buf))} else {None}; + let uv_buf = if may_timeout { + slice_to_uv_buf(data.get_ref().as_slice()) + } else { + slice_to_uv_buf(buf) + }; + // Send off the request, but be careful to not block until we're sure // that the write reqeust is queued. If the reqeust couldn't be queued, // then we should return immediately with an error. match unsafe { - uvll::uv_write(req.handle, self.handle, [slice_to_uv_buf(buf)], - write_cb) + uvll::uv_write(req.handle, self.handle, [uv_buf], write_cb) } { 0 => { - let mut wcx = WriteContext { result: 0, task: None, }; + let mut wcx = WriteContext { + result: uvll::ECANCELED, + stream: self as *mut _, + data: data, + }; req.defuse(); // uv callback now owns this request let loop_ = unsafe { uvll::get_loop_for_uv_handle(self.handle) }; - wait_until_woken_after(&mut wcx.task, &Loop::wrap(loop_), || { + wait_until_woken_after(&mut self.blocked_writer, + &Loop::wrap(loop_), || { req.set_data(&wcx); }); - self.last_write_req = Some(Request::wrap(req.handle)); - match wcx.result { - 0 => Ok(()), - n => Err(UvError(n)), + + if wcx.result != uvll::ECANCELED { + self.last_write_req = Some(Request::wrap(req.handle)); + return match wcx.result { + 0 => Ok(()), + n => Err(UvError(n)), + } } + + // This is the second case where canceling an in-flight write + // gets interesting. If we've been canceled (no one reset our + // result), then someone still needs to free the request, and + // someone still needs to free the allocate buffer. + // + // To take care of this, we swap out the stack-allocated write + // context for a heap-allocated context, transferring ownership + // of everything to the write_cb. Libuv guarantees that this + // callback will be invoked at some point, and the callback will + // be responsible for deallocating these resources. + // + // Note that we don't cache this write request back in the + // stream watcher because we no longer have ownership of it, and + // we never will. + let new_wcx = box WriteContext { + result: 0, + stream: 0 as *mut StreamWatcher, + data: wcx.data.take(), + }; + unsafe { + req.set_data(&*new_wcx); + cast::forget(new_wcx); + } + Err(UvError(wcx.result)) } n => Err(UvError(n)), } } + + pub fn cancel_write(&mut self) -> Option { + self.blocked_writer.take() + } } // This allocation callback expects to be invoked once and only once. It will @@ -198,12 +260,18 @@ extern fn read_cb(handle: *uvll::uv_stream_t, nread: ssize_t, _buf: *Buf) { // away the error code as a result. extern fn write_cb(req: *uvll::uv_write_t, status: c_int) { let mut req = Request::wrap(req); - assert!(status != uvll::ECANCELED); // Remember to not free the request because it is re-used between writes on // the same stream. let wcx: &mut WriteContext = unsafe { req.get_data() }; wcx.result = status; - req.defuse(); - wakeup(&mut wcx.task); + // If the stream is present, we haven't timed out, otherwise we acquire + // ownership of everything and then deallocate it all at once. + if wcx.stream as uint != 0 { + req.defuse(); + let stream: &mut StreamWatcher = unsafe { &mut *wcx.stream }; + wakeup(&mut stream.blocked_writer); + } else { + let _wcx: Box = unsafe { cast::transmute(wcx) }; + } } diff --git a/src/librustuv/timeout.rs b/src/librustuv/timeout.rs new file mode 100644 index 00000000000..3dbb34bb17a --- /dev/null +++ b/src/librustuv/timeout.rs @@ -0,0 +1,394 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use libc::c_int; +use std::cast; +use std::io::IoResult; +use std::mem; +use std::rt::task::BlockedTask; + +use access; +use homing::{HomeHandle, HomingMissile, HomingIO}; +use timer::TimerWatcher; +use uvll; +use uvio::UvIoFactory; +use {Loop, UvError, uv_error_to_io_error, Request, wakeup}; +use {UvHandle, wait_until_woken_after}; + +/// Managment of a timeout when gaining access to a portion of a duplex stream. +pub struct AccessTimeout { + state: TimeoutState, + timer: Option>, + pub access: access::Access, +} + +pub struct Guard<'a> { + state: &'a mut TimeoutState, + pub access: access::Guard<'a>, + pub can_timeout: bool, +} + +#[deriving(Eq)] +enum TimeoutState { + NoTimeout, + TimeoutPending(ClientState), + TimedOut, +} + +#[deriving(Eq)] +enum ClientState { + NoWaiter, + AccessPending, + RequestPending, +} + +struct TimerContext { + timeout: *mut AccessTimeout, + callback: fn(uint) -> Option, + payload: uint, +} + +impl AccessTimeout { + pub fn new() -> AccessTimeout { + AccessTimeout { + state: NoTimeout, + timer: None, + access: access::Access::new(), + } + } + + /// Grants access to half of a duplex stream, timing out if necessary. + /// + /// On success, Ok(Guard) is returned and access has been granted to the + /// stream. If a timeout occurs, then Err is returned with an appropriate + /// error. + pub fn grant<'a>(&'a mut self, m: HomingMissile) -> IoResult> { + // First, flag that we're attempting to acquire access. This will allow + // us to cancel the pending grant if we timeout out while waiting for a + // grant. + match self.state { + NoTimeout => {}, + TimeoutPending(ref mut client) => *client = AccessPending, + TimedOut => return Err(uv_error_to_io_error(UvError(uvll::ECANCELED))) + } + let access = self.access.grant(self as *mut _ as uint, m); + + // After acquiring the grant, we need to flag ourselves as having a + // pending request so the timeout knows to cancel the request. + let can_timeout = match self.state { + NoTimeout => false, + TimeoutPending(ref mut client) => { *client = RequestPending; true } + TimedOut => return Err(uv_error_to_io_error(UvError(uvll::ECANCELED))) + }; + + Ok(Guard { + access: access, + state: &mut self.state, + can_timeout: can_timeout + }) + } + + /// Sets the pending timeout to the value specified. + /// + /// The home/loop variables are used to construct a timer if one has not + /// been previously constructed. + /// + /// The callback will be invoked if the timeout elapses, and the data of + /// the time will be set to `data`. + pub fn set_timeout(&mut self, ms: Option, + home: &HomeHandle, + loop_: &Loop, + cb: fn(uint) -> Option, + data: uint) { + self.state = NoTimeout; + let ms = match ms { + Some(ms) => ms, + None => return match self.timer { + Some(ref mut t) => t.stop(), + None => {} + } + }; + + // If we have a timeout, lazily initialize the timer which will be used + // to fire when the timeout runs out. + if self.timer.is_none() { + let mut timer = box TimerWatcher::new_home(loop_, home.clone()); + let cx = box TimerContext { + timeout: self as *mut _, + callback: cb, + payload: data, + }; + unsafe { + timer.set_data(&*cx); + cast::forget(cx); + } + self.timer = Some(timer); + } + + let timer = self.timer.get_mut_ref(); + unsafe { + let cx = uvll::get_data_for_uv_handle(timer.handle); + let cx = cx as *mut TimerContext; + (*cx).callback = cb; + (*cx).payload = data; + } + timer.stop(); + timer.start(timer_cb, ms, 0); + self.state = TimeoutPending(NoWaiter); + + extern fn timer_cb(timer: *uvll::uv_timer_t) { + let cx: &TimerContext = unsafe { + &*(uvll::get_data_for_uv_handle(timer) as *TimerContext) + }; + let me = unsafe { &mut *cx.timeout }; + + match mem::replace(&mut me.state, TimedOut) { + TimedOut | NoTimeout => unreachable!(), + TimeoutPending(NoWaiter) => {} + TimeoutPending(AccessPending) => { + match unsafe { me.access.dequeue(me as *mut _ as uint) } { + Some(task) => task.reawaken(), + None => unreachable!(), + } + } + TimeoutPending(RequestPending) => { + match (cx.callback)(cx.payload) { + Some(task) => task.reawaken(), + None => unreachable!(), + } + } + } + } + } +} + +impl Clone for AccessTimeout { + fn clone(&self) -> AccessTimeout { + AccessTimeout { + access: self.access.clone(), + state: NoTimeout, + timer: None, + } + } +} + +#[unsafe_destructor] +impl<'a> Drop for Guard<'a> { + fn drop(&mut self) { + match *self.state { + TimeoutPending(NoWaiter) | TimeoutPending(AccessPending) => + unreachable!(), + + NoTimeout | TimedOut => {} + TimeoutPending(RequestPending) => { + *self.state = TimeoutPending(NoWaiter); + } + } + } +} + +impl Drop for AccessTimeout { + fn drop(&mut self) { + match self.timer { + Some(ref timer) => unsafe { + let data = uvll::get_data_for_uv_handle(timer.handle); + let _data: Box = cast::transmute(data); + }, + None => {} + } + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Connect timeouts +//////////////////////////////////////////////////////////////////////////////// + +pub struct ConnectCtx { + pub status: c_int, + pub task: Option, + pub timer: Option>, +} + +pub struct AcceptTimeout { + timer: Option, + timeout_tx: Option>, + timeout_rx: Option>, +} + +impl ConnectCtx { + pub fn connect( + mut self, obj: T, timeout: Option, io: &mut UvIoFactory, + f: |&Request, &T, uvll::uv_connect_cb| -> c_int + ) -> Result { + let mut req = Request::new(uvll::UV_CONNECT); + let r = f(&req, &obj, connect_cb); + return match r { + 0 => { + req.defuse(); // uv callback now owns this request + match timeout { + Some(t) => { + let mut timer = TimerWatcher::new(io); + timer.start(timer_cb, t, 0); + self.timer = Some(timer); + } + None => {} + } + wait_until_woken_after(&mut self.task, &io.loop_, || { + let data = &self as *_; + match self.timer { + Some(ref mut timer) => unsafe { timer.set_data(data) }, + None => {} + } + req.set_data(data); + }); + // Make sure an erroneously fired callback doesn't have access + // to the context any more. + req.set_data(0 as *int); + + // If we failed because of a timeout, drop the TcpWatcher as + // soon as possible because it's data is now set to null and we + // want to cancel the callback ASAP. + match self.status { + 0 => Ok(obj), + n => { drop(obj); Err(UvError(n)) } + } + } + n => Err(UvError(n)) + }; + + extern fn timer_cb(handle: *uvll::uv_timer_t) { + // Don't close the corresponding tcp request, just wake up the task + // and let RAII take care of the pending watcher. + let cx: &mut ConnectCtx = unsafe { + &mut *(uvll::get_data_for_uv_handle(handle) as *mut ConnectCtx) + }; + cx.status = uvll::ECANCELED; + wakeup(&mut cx.task); + } + + extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) { + // This callback can be invoked with ECANCELED if the watcher is + // closed by the timeout callback. In that case we just want to free + // the request and be along our merry way. + let req = Request::wrap(req); + if status == uvll::ECANCELED { return } + + // Apparently on windows when the handle is closed this callback may + // not be invoked with ECANCELED but rather another error code. + // Either ways, if the data is null, then our timeout has expired + // and there's nothing we can do. + let data = unsafe { uvll::get_data_for_req(req.handle) }; + if data.is_null() { return } + + let cx: &mut ConnectCtx = unsafe { &mut *(data as *mut ConnectCtx) }; + cx.status = status; + match cx.timer { + Some(ref mut t) => t.stop(), + None => {} + } + // Note that the timer callback doesn't cancel the connect request + // (that's the job of uv_close()), so it's possible for this + // callback to get triggered after the timeout callback fires, but + // before the task wakes up. In that case, we did indeed + // successfully connect, but we don't need to wake someone up. We + // updated the status above (correctly so), and the task will pick + // up on this when it wakes up. + if cx.task.is_some() { + wakeup(&mut cx.task); + } + } + } +} + +impl AcceptTimeout { + pub fn new() -> AcceptTimeout { + AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None } + } + + pub fn accept(&mut self, c: &Receiver>) -> IoResult { + match self.timeout_rx { + None => c.recv(), + Some(ref rx) => { + use std::comm::Select; + + // Poll the incoming channel first (don't rely on the order of + // select just yet). If someone's pending then we should return + // them immediately. + match c.try_recv() { + Ok(data) => return data, + Err(..) => {} + } + + // Use select to figure out which channel gets ready first. We + // do some custom handling of select to ensure that we never + // actually drain the timeout channel (we'll keep seeing the + // timeout message in the future). + let s = Select::new(); + let mut timeout = s.handle(rx); + let mut data = s.handle(c); + unsafe { + timeout.add(); + data.add(); + } + if s.wait() == timeout.id() { + Err(uv_error_to_io_error(UvError(uvll::ECANCELED))) + } else { + c.recv() + } + } + } + } + + pub fn clear(&mut self) { + match self.timeout_rx { + Some(ref t) => { let _ = t.try_recv(); } + None => {} + } + match self.timer { + Some(ref mut t) => t.stop(), + None => {} + } + } + + pub fn set_timeout + HomingIO>( + &mut self, ms: u64, t: &mut T + ) { + // If we have a timeout, lazily initialize the timer which will be used + // to fire when the timeout runs out. + if self.timer.is_none() { + let loop_ = Loop::wrap(unsafe { + uvll::get_loop_for_uv_handle(t.uv_handle()) + }); + let mut timer = TimerWatcher::new_home(&loop_, t.home().clone()); + unsafe { + timer.set_data(self as *mut _ as *AcceptTimeout); + } + self.timer = Some(timer); + } + + // Once we've got a timer, stop any previous timeout, reset it for the + // current one, and install some new channels to send/receive data on + let timer = self.timer.get_mut_ref(); + timer.stop(); + timer.start(timer_cb, ms, 0); + let (tx, rx) = channel(); + self.timeout_tx = Some(tx); + self.timeout_rx = Some(rx); + + extern fn timer_cb(timer: *uvll::uv_timer_t) { + let acceptor: &mut AcceptTimeout = unsafe { + &mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout) + }; + // This send can never fail because if this timer is active then the + // receiving channel is guaranteed to be alive + acceptor.timeout_tx.get_ref().send(()); + } + } +} diff --git a/src/librustuv/timer.rs b/src/librustuv/timer.rs index 216eb600130..525539f8b36 100644 --- a/src/librustuv/timer.rs +++ b/src/librustuv/timer.rs @@ -18,7 +18,7 @@ use uvio::UvIoFactory; use uvll; pub struct TimerWatcher { - handle: *uvll::uv_timer_t, + pub handle: *uvll::uv_timer_t, home: HomeHandle, action: Option, blocker: Option, diff --git a/src/librustuv/tty.rs b/src/librustuv/tty.rs index 4f3e12b6974..f70c3b4c1bd 100644 --- a/src/librustuv/tty.rs +++ b/src/librustuv/tty.rs @@ -87,7 +87,7 @@ impl RtioTTY for TtyWatcher { fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { let _m = self.fire_homing_missile(); - self.stream.write(buf).map_err(uv_error_to_io_error) + self.stream.write(buf, false).map_err(uv_error_to_io_error) } fn set_raw(&mut self, raw: bool) -> Result<(), IoError> { diff --git a/src/libstd/io/mod.rs b/src/libstd/io/mod.rs index e2fde98a77c..a89af05c50a 100644 --- a/src/libstd/io/mod.rs +++ b/src/libstd/io/mod.rs @@ -326,6 +326,8 @@ impl IoError { libc::WSAEADDRNOTAVAIL => (ConnectionRefused, "address not available"), libc::WSAEADDRINUSE => (ConnectionRefused, "address in use"), libc::ERROR_BROKEN_PIPE => (EndOfFile, "the pipe has ended"), + libc::ERROR_OPERATION_ABORTED => + (TimedOut, "operation timed out"), // libuv maps this error code to EISDIR. we do too. if it is found // to be incorrect, we can add in some more machinery to only @@ -434,6 +436,17 @@ pub enum IoErrorKind { InvalidInput, /// The I/O operation's timeout expired, causing it to be canceled. TimedOut, + /// This write operation failed to write all of its data. + /// + /// Normally the write() method on a Writer guarantees that all of its data + /// has been written, but some operations may be terminated after only + /// partially writing some data. An example of this is a timed out write + /// which successfully wrote a known number of bytes, but bailed out after + /// doing so. + /// + /// The payload contained as part of this variant is the number of bytes + /// which are known to have been successfully written. + ShortWrite(uint), } /// A trait for objects which are byte-oriented streams. Readers are defined by @@ -1429,7 +1442,8 @@ pub fn standard_error(kind: IoErrorKind) -> IoError { PathDoesntExist => "no such file", MismatchedFileTypeForOperation => "mismatched file type", ResourceUnavailable => "resource unavailable", - TimedOut => "operation timed out" + TimedOut => "operation timed out", + ShortWrite(..) => "short write", }; IoError { kind: kind, diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs index d07b2e556d6..89141155ae4 100644 --- a/src/libstd/io/net/tcp.rs +++ b/src/libstd/io/net/tcp.rs @@ -151,6 +151,69 @@ impl TcpStream { /// Note that this method affects all cloned handles associated with this /// stream, not just this one handle. pub fn close_write(&mut self) -> IoResult<()> { self.obj.close_write() } + + /// Sets a timeout, in milliseconds, for blocking operations on this stream. + /// + /// This function will set a timeout for all blocking operations (including + /// reads and writes) on this stream. The timeout specified is a relative + /// time, in milliseconds, into the future after which point operations will + /// time out. This means that the timeout must be reset periodically to keep + /// it from expiring. Specifying a value of `None` will clear the timeout + /// for this stream. + /// + /// The timeout on this stream is local to this stream only. Setting a + /// timeout does not affect any other cloned instances of this stream, nor + /// does the timeout propagated to cloned handles of this stream. Setting + /// this timeout will override any specific read or write timeouts + /// previously set for this stream. + /// + /// For clarification on the semantics of interrupting a read and a write, + /// take a look at `set_read_timeout` and `set_write_timeout`. + pub fn set_timeout(&mut self, timeout_ms: Option) { + self.obj.set_timeout(timeout_ms) + } + + /// Sets the timeout for read operations on this stream. + /// + /// See documentation in `set_timeout` for the semantics of this read time. + /// This will overwrite any previous read timeout set through either this + /// function or `set_timeout`. + /// + /// # Errors + /// + /// When this timeout expires, if there is no pending read operation, no + /// action is taken. Otherwise, the read operation will be scheduled to + /// promptly return. If a timeout error is returned, then no data was read + /// during the timeout period. + pub fn set_read_timeout(&mut self, timeout_ms: Option) { + self.obj.set_read_timeout(timeout_ms) + } + + /// Sets the timeout for write operations on this stream. + /// + /// See documentation in `set_timeout` for the semantics of this write time. + /// This will overwrite any previous write timeout set through either this + /// function or `set_timeout`. + /// + /// # Errors + /// + /// When this timeout expires, if there is no pending write operation, no + /// action is taken. Otherwise, the pending write operation will be + /// scheduled to promptly return. The actual state of the underlying stream + /// is not specified. + /// + /// The write operation may return an error of type `ShortWrite` which + /// indicates that the object is known to have written an exact number of + /// bytes successfully during the timeout period, and the remaining bytes + /// were never written. + /// + /// If the write operation returns `TimedOut`, then it the timeout primitive + /// does not know how many bytes were written as part of the timeout + /// operation. It may be the case that bytes continue to be written in an + /// asynchronous fashion after the call to write returns. + pub fn set_write_timeout(&mut self, timeout_ms: Option) { + self.obj.set_write_timeout(timeout_ms) + } } impl Clone for TcpStream { @@ -892,6 +955,7 @@ mod test { Err(ref e) if e.kind == TimedOut => {} Err(e) => fail!("error: {}", e), } + ::task::deschedule(); if i == 1000 { fail!("should have a pending connection") } } drop(l); @@ -964,4 +1028,118 @@ mod test { // this test will never finish if the child doesn't wake up rx.recv(); }) + + iotest!(fn readwrite_timeouts() { + let addr = next_test_ip6(); + let mut a = TcpListener::bind(addr).listen().unwrap(); + let (tx, rx) = channel::<()>(); + spawn(proc() { + let mut s = TcpStream::connect(addr).unwrap(); + rx.recv(); + assert!(s.write([0]).is_ok()); + let _ = rx.recv_opt(); + }); + + let mut s = a.accept().unwrap(); + s.set_timeout(Some(20)); + assert_eq!(s.read([0]).err().unwrap().kind, TimedOut); + assert_eq!(s.read([0]).err().unwrap().kind, TimedOut); + + s.set_timeout(Some(20)); + for i in range(0, 1001) { + match s.write([0, .. 128 * 1024]) { + Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {}, + Err(IoError { kind: TimedOut, .. }) => break, + Err(e) => fail!("{}", e), + } + if i == 1000 { fail!("should have filled up?!"); } + } + assert_eq!(s.write([0]).err().unwrap().kind, TimedOut); + + tx.send(()); + s.set_timeout(None); + assert_eq!(s.read([0, 0]), Ok(1)); + }) + + iotest!(fn read_timeouts() { + let addr = next_test_ip6(); + let mut a = TcpListener::bind(addr).listen().unwrap(); + let (tx, rx) = channel::<()>(); + spawn(proc() { + let mut s = TcpStream::connect(addr).unwrap(); + rx.recv(); + let mut amt = 0; + while amt < 100 * 128 * 1024 { + match s.read([0, ..128 * 1024]) { + Ok(n) => { amt += n; } + Err(e) => fail!("{}", e), + } + } + let _ = rx.recv_opt(); + }); + + let mut s = a.accept().unwrap(); + s.set_read_timeout(Some(20)); + assert_eq!(s.read([0]).err().unwrap().kind, TimedOut); + assert_eq!(s.read([0]).err().unwrap().kind, TimedOut); + + tx.send(()); + for _ in range(0, 100) { + assert!(s.write([0, ..128 * 1024]).is_ok()); + } + }) + + iotest!(fn write_timeouts() { + let addr = next_test_ip6(); + let mut a = TcpListener::bind(addr).listen().unwrap(); + let (tx, rx) = channel::<()>(); + spawn(proc() { + let mut s = TcpStream::connect(addr).unwrap(); + rx.recv(); + assert!(s.write([0]).is_ok()); + let _ = rx.recv_opt(); + }); + + let mut s = a.accept().unwrap(); + s.set_write_timeout(Some(20)); + for i in range(0, 1001) { + match s.write([0, .. 128 * 1024]) { + Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {}, + Err(IoError { kind: TimedOut, .. }) => break, + Err(e) => fail!("{}", e), + } + if i == 1000 { fail!("should have filled up?!"); } + } + assert_eq!(s.write([0]).err().unwrap().kind, TimedOut); + + tx.send(()); + assert!(s.read([0]).is_ok()); + }) + + iotest!(fn timeout_concurrent_read() { + let addr = next_test_ip6(); + let mut a = TcpListener::bind(addr).listen().unwrap(); + let (tx, rx) = channel::<()>(); + spawn(proc() { + let mut s = TcpStream::connect(addr).unwrap(); + rx.recv(); + assert_eq!(s.write([0]), Ok(())); + let _ = rx.recv_opt(); + }); + + let mut s = a.accept().unwrap(); + let s2 = s.clone(); + let (tx2, rx2) = channel(); + spawn(proc() { + let mut s2 = s2; + assert_eq!(s2.read([0]), Ok(1)); + tx2.send(()); + }); + + s.set_read_timeout(Some(20)); + assert_eq!(s.read([0]).err().unwrap().kind, TimedOut); + tx.send(()); + + rx2.recv(); + }) } diff --git a/src/libstd/io/net/udp.rs b/src/libstd/io/net/udp.rs index b7636493dec..45da872ca11 100644 --- a/src/libstd/io/net/udp.rs +++ b/src/libstd/io/net/udp.rs @@ -20,6 +20,7 @@ use io::net::ip::{SocketAddr, IpAddr}; use io::{Reader, Writer, IoResult}; use kinds::Send; use owned::Box; +use option::Option; use result::{Ok, Err}; use rt::rtio::{RtioSocket, RtioUdpSocket, IoFactory, LocalIo}; @@ -142,6 +143,27 @@ impl UdpSocket { self.obj.ignore_broadcasts() } } + + /// Sets the read/write timeout for this socket. + /// + /// For more information, see `TcpStream::set_timeout` + pub fn set_timeout(&mut self, timeout_ms: Option) { + self.obj.set_timeout(timeout_ms) + } + + /// Sets the read timeout for this socket. + /// + /// For more information, see `TcpStream::set_timeout` + pub fn set_read_timeout(&mut self, timeout_ms: Option) { + self.obj.set_read_timeout(timeout_ms) + } + + /// Sets the write timeout for this socket. + /// + /// For more information, see `TcpStream::set_timeout` + pub fn set_write_timeout(&mut self, timeout_ms: Option) { + self.obj.set_write_timeout(timeout_ms) + } } impl Clone for UdpSocket { @@ -485,4 +507,56 @@ mod test { rx.recv(); serv_rx.recv(); }) + + iotest!(fn recvfrom_timeout() { + let addr1 = next_test_ip4(); + let addr2 = next_test_ip4(); + let mut a = UdpSocket::bind(addr1).unwrap(); + + let (tx, rx) = channel(); + let (tx2, rx2) = channel(); + spawn(proc() { + let mut a = UdpSocket::bind(addr2).unwrap(); + assert_eq!(a.recvfrom([0]), Ok((1, addr1))); + assert_eq!(a.sendto([0], addr1), Ok(())); + rx.recv(); + assert_eq!(a.sendto([0], addr1), Ok(())); + + tx2.send(()); + }); + + // Make sure that reads time out, but writes can continue + a.set_read_timeout(Some(20)); + assert_eq!(a.recvfrom([0]).err().unwrap().kind, TimedOut); + assert_eq!(a.recvfrom([0]).err().unwrap().kind, TimedOut); + assert_eq!(a.sendto([0], addr2), Ok(())); + + // Cloned handles should be able to block + let mut a2 = a.clone(); + assert_eq!(a2.recvfrom([0]), Ok((1, addr2))); + + // Clearing the timeout should allow for receiving + a.set_timeout(None); + tx.send(()); + assert_eq!(a2.recvfrom([0]), Ok((1, addr2))); + + // Make sure the child didn't die + rx2.recv(); + }) + + iotest!(fn sendto_timeout() { + let addr1 = next_test_ip4(); + let addr2 = next_test_ip4(); + let mut a = UdpSocket::bind(addr1).unwrap(); + let _b = UdpSocket::bind(addr2).unwrap(); + + a.set_write_timeout(Some(1000)); + for _ in range(0, 100) { + match a.sendto([0, ..4*1024], addr2) { + Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {}, + Err(IoError { kind: TimedOut, .. }) => break, + Err(e) => fail!("other error: {}", e), + } + } + }) } diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs index bbe39885c03..ac7a0f5cdce 100644 --- a/src/libstd/io/net/unix.rs +++ b/src/libstd/io/net/unix.rs @@ -61,21 +61,11 @@ impl UnixStream { }) } - /// Connect to a pipe named by `path`. This will attempt to open a - /// connection to the underlying socket. + /// Connect to a pipe named by `path`, timing out if the specified number of + /// milliseconds. /// - /// The returned stream will be closed when the object falls out of scope. - /// - /// # Example - /// - /// ```rust - /// # #![allow(unused_must_use)] - /// use std::io::net::unix::UnixStream; - /// - /// let server = Path::new("path/to/my/socket"); - /// let mut stream = UnixStream::connect(&server); - /// stream.write([1, 2, 3]); - /// ``` + /// This function is similar to `connect`, except that if `timeout_ms` + /// elapses the function will return an error of kind `TimedOut`. #[experimental = "the timeout argument is likely to change types"] pub fn connect_timeout(path: &P, timeout_ms: u64) -> IoResult { @@ -103,6 +93,27 @@ impl UnixStream { /// Note that this method affects all cloned handles associated with this /// stream, not just this one handle. pub fn close_write(&mut self) -> IoResult<()> { self.obj.close_write() } + + /// Sets the read/write timeout for this socket. + /// + /// For more information, see `TcpStream::set_timeout` + pub fn set_timeout(&mut self, timeout_ms: Option) { + self.obj.set_timeout(timeout_ms) + } + + /// Sets the read timeout for this socket. + /// + /// For more information, see `TcpStream::set_timeout` + pub fn set_read_timeout(&mut self, timeout_ms: Option) { + self.obj.set_read_timeout(timeout_ms) + } + + /// Sets the write timeout for this socket. + /// + /// For more information, see `TcpStream::set_timeout` + pub fn set_write_timeout(&mut self, timeout_ms: Option) { + self.obj.set_write_timeout(timeout_ms) + } } impl Clone for UnixStream { @@ -457,6 +468,7 @@ mod tests { Err(ref e) if e.kind == TimedOut => {} Err(e) => fail!("error: {}", e), } + ::task::deschedule(); if i == 1000 { fail!("should have a pending connection") } } drop(l); @@ -541,4 +553,122 @@ mod tests { // this test will never finish if the child doesn't wake up rx.recv(); }) + + iotest!(fn readwrite_timeouts() { + let addr = next_test_unix(); + let mut a = UnixListener::bind(&addr).listen().unwrap(); + let (tx, rx) = channel::<()>(); + spawn(proc() { + let mut s = UnixStream::connect(&addr).unwrap(); + rx.recv(); + assert!(s.write([0]).is_ok()); + let _ = rx.recv_opt(); + }); + + let mut s = a.accept().unwrap(); + s.set_timeout(Some(20)); + assert_eq!(s.read([0]).err().unwrap().kind, TimedOut); + assert_eq!(s.read([0]).err().unwrap().kind, TimedOut); + + s.set_timeout(Some(20)); + for i in range(0, 1001) { + match s.write([0, .. 128 * 1024]) { + Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {}, + Err(IoError { kind: TimedOut, .. }) => break, + Err(e) => fail!("{}", e), + } + if i == 1000 { fail!("should have filled up?!"); } + } + + // I'm not sure as to why, but apparently the write on windows always + // succeeds after the previous timeout. Who knows? + if !cfg!(windows) { + assert_eq!(s.write([0]).err().unwrap().kind, TimedOut); + } + + tx.send(()); + s.set_timeout(None); + assert_eq!(s.read([0, 0]), Ok(1)); + }) + + iotest!(fn read_timeouts() { + let addr = next_test_unix(); + let mut a = UnixListener::bind(&addr).listen().unwrap(); + let (tx, rx) = channel::<()>(); + spawn(proc() { + let mut s = UnixStream::connect(&addr).unwrap(); + rx.recv(); + let mut amt = 0; + while amt < 100 * 128 * 1024 { + match s.read([0, ..128 * 1024]) { + Ok(n) => { amt += n; } + Err(e) => fail!("{}", e), + } + } + let _ = rx.recv_opt(); + }); + + let mut s = a.accept().unwrap(); + s.set_read_timeout(Some(20)); + assert_eq!(s.read([0]).err().unwrap().kind, TimedOut); + assert_eq!(s.read([0]).err().unwrap().kind, TimedOut); + + tx.send(()); + for _ in range(0, 100) { + assert!(s.write([0, ..128 * 1024]).is_ok()); + } + }) + + iotest!(fn write_timeouts() { + let addr = next_test_unix(); + let mut a = UnixListener::bind(&addr).listen().unwrap(); + let (tx, rx) = channel::<()>(); + spawn(proc() { + let mut s = UnixStream::connect(&addr).unwrap(); + rx.recv(); + assert!(s.write([0]).is_ok()); + let _ = rx.recv_opt(); + }); + + let mut s = a.accept().unwrap(); + s.set_write_timeout(Some(20)); + for i in range(0, 1001) { + match s.write([0, .. 128 * 1024]) { + Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {}, + Err(IoError { kind: TimedOut, .. }) => break, + Err(e) => fail!("{}", e), + } + if i == 1000 { fail!("should have filled up?!"); } + } + + tx.send(()); + assert!(s.read([0]).is_ok()); + }) + + iotest!(fn timeout_concurrent_read() { + let addr = next_test_unix(); + let mut a = UnixListener::bind(&addr).listen().unwrap(); + let (tx, rx) = channel::<()>(); + spawn(proc() { + let mut s = UnixStream::connect(&addr).unwrap(); + rx.recv(); + assert!(s.write([0]).is_ok()); + let _ = rx.recv_opt(); + }); + + let mut s = a.accept().unwrap(); + let s2 = s.clone(); + let (tx2, rx2) = channel(); + spawn(proc() { + let mut s2 = s2; + assert!(s2.read([0]).is_ok()); + tx2.send(()); + }); + + s.set_read_timeout(Some(20)); + assert_eq!(s.read([0]).err().unwrap().kind, TimedOut); + tx.send(()); + + rx2.recv(); + }) } diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index c5afe7887ad..16882624ab7 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -222,6 +222,9 @@ pub trait RtioTcpStream : RtioSocket { fn clone(&self) -> Box; fn close_write(&mut self) -> IoResult<()>; fn close_read(&mut self) -> IoResult<()>; + fn set_timeout(&mut self, timeout_ms: Option); + fn set_read_timeout(&mut self, timeout_ms: Option); + fn set_write_timeout(&mut self, timeout_ms: Option); } pub trait RtioSocket { @@ -245,6 +248,9 @@ pub trait RtioUdpSocket : RtioSocket { fn ignore_broadcasts(&mut self) -> IoResult<()>; fn clone(&self) -> Box; + fn set_timeout(&mut self, timeout_ms: Option); + fn set_read_timeout(&mut self, timeout_ms: Option); + fn set_write_timeout(&mut self, timeout_ms: Option); } pub trait RtioTimer { @@ -278,6 +284,9 @@ pub trait RtioPipe { fn close_write(&mut self) -> IoResult<()>; fn close_read(&mut self) -> IoResult<()>; + fn set_timeout(&mut self, timeout_ms: Option); + fn set_read_timeout(&mut self, timeout_ms: Option); + fn set_write_timeout(&mut self, timeout_ms: Option); } pub trait RtioUnixListener { diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index 909df5618aa..8924ed7cfd2 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -323,6 +323,12 @@ impl BlockedTask { } } + /// Reawakens this task if ownership is acquired. If finer-grained control + /// is desired, use `wake` instead. + pub fn reawaken(self) { + self.wake().map(|t| t.reawaken()); + } + // This assertion has two flavours because the wake involves an atomic op. // In the faster version, destructors will fail dramatically instead. #[cfg(not(test))] pub fn trash(self) { }