auto merge of #13723 : alexcrichton/rust/pipe-connect-timeout, r=brson

This adds support for connecting to a unix socket with a timeout (a named pipe
on windows), and accepting a connection with a timeout. The goal is to bring
unix pipes/named sockets back in line with TCP support for timeouts.

Similarly to the TCP sockets, all methods are marked #[experimental] due to
uncertainty about the type of the timeout argument.

This internally involved a good bit of refactoring to share as much code as
possible between TCP servers and pipe servers, but the core implementation did
not change drastically as part of this commit.

cc #13523
This commit is contained in:
bors 2014-04-24 19:36:14 -07:00
commit 66486518d5
12 changed files with 547 additions and 436 deletions

View File

@ -225,7 +225,7 @@ pub use funcs::bsd43::{shutdown};
#[cfg(windows)] pub use consts::os::extra::{PIPE_UNLIMITED_INSTANCES, ERROR_ACCESS_DENIED};
#[cfg(windows)] pub use consts::os::extra::{FILE_WRITE_ATTRIBUTES, FILE_READ_ATTRIBUTES};
#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_BUSY, ERROR_IO_PENDING};
#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_CONNECTED};
#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_CONNECTED, WAIT_OBJECT_0};
#[cfg(windows)] pub use types::os::common::bsd44::{SOCKET};
#[cfg(windows)] pub use types::os::common::posix01::{stat, utimbuf};
#[cfg(windows)] pub use types::os::arch::extra::{HANDLE, BOOL, LPSECURITY_ATTRIBUTES};

View File

@ -59,4 +59,6 @@ extern "system" {
optname: libc::c_int,
optval: *mut libc::c_char,
optlen: *mut libc::c_int) -> libc::c_int;
pub fn CancelIo(hFile: libc::HANDLE) -> libc::BOOL;
}

View File

@ -44,6 +44,7 @@ pub use self::process::Process;
pub mod addrinfo;
pub mod net;
pub mod process;
mod util;
#[cfg(unix)]
#[path = "file_unix.rs"]
@ -177,8 +178,9 @@ impl rtio::IoFactory for IoFactory {
fn unix_bind(&mut self, path: &CString) -> IoResult<~RtioUnixListener:Send> {
pipe::UnixListener::bind(path).map(|s| ~s as ~RtioUnixListener:Send)
}
fn unix_connect(&mut self, path: &CString) -> IoResult<~RtioPipe:Send> {
pipe::UnixStream::connect(path).map(|s| ~s as ~RtioPipe:Send)
fn unix_connect(&mut self, path: &CString,
timeout: Option<u64>) -> IoResult<~RtioPipe:Send> {
pipe::UnixStream::connect(path, timeout).map(|s| ~s as ~RtioPipe:Send)
}
fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
hint: Option<ai::Hint>) -> IoResult<~[ai::Info]> {

View File

@ -13,13 +13,12 @@ use std::cast;
use std::io::net::ip;
use std::io;
use std::mem;
use std::os;
use std::ptr;
use std::rt::rtio;
use std::sync::arc::UnsafeArc;
use super::{IoResult, retry, keep_going};
use super::c;
use super::util;
////////////////////////////////////////////////////////////////////////////////
// sockaddr and misc bindings
@ -118,8 +117,8 @@ fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
}
}
fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
val: libc::c_int) -> IoResult<T> {
pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
val: libc::c_int) -> IoResult<T> {
unsafe {
let mut slot: T = mem::init();
let mut len = mem::size_of::<T>() as libc::socklen_t;
@ -145,21 +144,6 @@ fn last_error() -> io::IoError {
super::last_error()
}
fn ms_to_timeval(ms: u64) -> libc::timeval {
libc::timeval {
tv_sec: (ms / 1000) as libc::time_t,
tv_usec: ((ms % 1000) * 1000) as libc::suseconds_t,
}
}
fn timeout(desc: &'static str) -> io::IoError {
io::IoError {
kind: io::TimedOut,
desc: desc,
detail: None,
}
}
#[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
#[cfg(unix)] unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }
@ -270,7 +254,7 @@ impl TcpStream {
let addrp = &addr as *_ as *libc::sockaddr;
match timeout {
Some(timeout) => {
try!(TcpStream::connect_timeout(fd, addrp, len, timeout));
try!(util::connect_timeout(fd, addrp, len, timeout));
Ok(ret)
},
None => {
@ -282,84 +266,6 @@ impl TcpStream {
}
}
// See http://developerweb.net/viewtopic.php?id=3196 for where this is
// derived from.
fn connect_timeout(fd: sock_t,
addrp: *libc::sockaddr,
len: libc::socklen_t,
timeout_ms: u64) -> IoResult<()> {
#[cfg(unix)] use INPROGRESS = libc::EINPROGRESS;
#[cfg(windows)] use INPROGRESS = libc::WSAEINPROGRESS;
#[cfg(unix)] use WOULDBLOCK = libc::EWOULDBLOCK;
#[cfg(windows)] use WOULDBLOCK = libc::WSAEWOULDBLOCK;
// Make sure the call to connect() doesn't block
try!(set_nonblocking(fd, true));
let ret = match unsafe { libc::connect(fd, addrp, len) } {
// If the connection is in progress, then we need to wait for it to
// finish (with a timeout). The current strategy for doing this is
// to use select() with a timeout.
-1 if os::errno() as int == INPROGRESS as int ||
os::errno() as int == WOULDBLOCK as int => {
let mut set: c::fd_set = unsafe { mem::init() };
c::fd_set(&mut set, fd);
match await(fd, &mut set, timeout_ms) {
0 => Err(timeout("connection timed out")),
-1 => Err(last_error()),
_ => {
let err: libc::c_int = try!(
getsockopt(fd, libc::SOL_SOCKET, libc::SO_ERROR));
if err == 0 {
Ok(())
} else {
Err(io::IoError::from_errno(err as uint, true))
}
}
}
}
-1 => Err(last_error()),
_ => Ok(()),
};
// be sure to turn blocking I/O back on
try!(set_nonblocking(fd, false));
return ret;
#[cfg(unix)]
fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> {
let set = nb as libc::c_int;
super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) }))
}
#[cfg(windows)]
fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> {
let mut set = nb as libc::c_ulong;
if unsafe { c::ioctlsocket(fd, c::FIONBIO, &mut set) != 0 } {
Err(last_error())
} else {
Ok(())
}
}
#[cfg(unix)]
fn await(fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
let start = ::io::timer::now();
retry(|| unsafe {
// Recalculate the timeout each iteration (it is generally
// undefined what the value of the 'tv' is after select
// returns EINTR).
let tv = ms_to_timeval(timeout - (::io::timer::now() - start));
c::select(fd + 1, ptr::null(), &*set, ptr::null(), &tv)
})
}
#[cfg(windows)]
fn await(_fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
let tv = ms_to_timeval(timeout);
unsafe { c::select(1, ptr::null(), &*set, ptr::null(), &tv) }
}
}
pub fn fd(&self) -> sock_t {
// This unsafety is fine because it's just a read-only arc
unsafe { (*self.inner.get()).fd }
@ -533,7 +439,7 @@ impl TcpAcceptor {
pub fn native_accept(&mut self) -> IoResult<TcpStream> {
if self.deadline != 0 {
try!(self.accept_deadline());
try!(util::accept_deadline(self.fd(), self.deadline));
}
unsafe {
let mut storage: libc::sockaddr_storage = mem::init();
@ -550,25 +456,6 @@ impl TcpAcceptor {
}
}
}
fn accept_deadline(&mut self) -> IoResult<()> {
let mut set: c::fd_set = unsafe { mem::init() };
c::fd_set(&mut set, self.fd());
match retry(|| {
// If we're past the deadline, then pass a 0 timeout to select() so
// we can poll the status of the socket.
let now = ::io::timer::now();
let ms = if self.deadline > now {0} else {self.deadline - now};
let tv = ms_to_timeval(ms);
let n = if cfg!(windows) {1} else {self.fd() as libc::c_int + 1};
unsafe { c::select(n, &set, ptr::null(), ptr::null(), &tv) }
}) {
-1 => Err(last_error()),
0 => Err(timeout("accept timed out")),
_ => return Ok(()),
}
}
}
impl rtio::RtioSocket for TcpAcceptor {
@ -585,10 +472,7 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
fn set_timeout(&mut self, timeout: Option<u64>) {
self.deadline = match timeout {
None => 0,
Some(t) => ::io::timer::now() + t,
};
self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
}
}

View File

@ -8,16 +8,17 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use libc;
use std::c_str::CString;
use std::cast;
use std::intrinsics;
use std::io;
use libc;
use std::mem;
use std::rt::rtio;
use std::sync::arc::UnsafeArc;
use std::intrinsics;
use super::{IoResult, retry, keep_going};
use super::util;
use super::file::fd_t;
fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> {
@ -52,22 +53,6 @@ fn addr_to_sockaddr_un(addr: &CString) -> IoResult<(libc::sockaddr_storage, uint
return Ok((storage, len));
}
fn sockaddr_to_unix(storage: &libc::sockaddr_storage,
len: uint) -> IoResult<CString> {
match storage.ss_family as libc::c_int {
libc::AF_UNIX => {
assert!(len as uint <= mem::size_of::<libc::sockaddr_un>());
let storage: &libc::sockaddr_un = unsafe {
cast::transmute(storage)
};
unsafe {
Ok(CString::new(storage.sun_path.as_ptr(), false).clone())
}
}
_ => Err(io::standard_error(io::InvalidInput))
}
}
struct Inner {
fd: fd_t,
}
@ -76,16 +61,24 @@ impl Drop for Inner {
fn drop(&mut self) { unsafe { let _ = libc::close(self.fd); } }
}
fn connect(addr: &CString, ty: libc::c_int) -> IoResult<Inner> {
fn connect(addr: &CString, ty: libc::c_int,
timeout: Option<u64>) -> IoResult<Inner> {
let (addr, len) = try!(addr_to_sockaddr_un(addr));
let inner = Inner { fd: try!(unix_socket(ty)) };
let addrp = &addr as *libc::sockaddr_storage;
match retry(|| unsafe {
libc::connect(inner.fd, addrp as *libc::sockaddr,
len as libc::socklen_t)
}) {
-1 => Err(super::last_error()),
_ => Ok(inner)
let addrp = &addr as *_ as *libc::sockaddr;
let len = len as libc::socklen_t;
match timeout {
None => {
match retry(|| unsafe { libc::connect(inner.fd, addrp, len) }) {
-1 => Err(super::last_error()),
_ => Ok(inner)
}
}
Some(timeout_ms) => {
try!(util::connect_timeout(inner.fd, addrp, len, timeout_ms));
Ok(inner)
}
}
}
@ -110,8 +103,9 @@ pub struct UnixStream {
}
impl UnixStream {
pub fn connect(addr: &CString) -> IoResult<UnixStream> {
connect(addr, libc::SOCK_STREAM).map(|inner| {
pub fn connect(addr: &CString,
timeout: Option<u64>) -> IoResult<UnixStream> {
connect(addr, libc::SOCK_STREAM, timeout).map(|inner| {
UnixStream { inner: UnsafeArc::new(inner) }
})
}
@ -155,77 +149,6 @@ impl rtio::RtioPipe for UnixStream {
}
}
////////////////////////////////////////////////////////////////////////////////
// Unix Datagram
////////////////////////////////////////////////////////////////////////////////
pub struct UnixDatagram {
inner: UnsafeArc<Inner>,
}
impl UnixDatagram {
pub fn connect(addr: &CString) -> IoResult<UnixDatagram> {
connect(addr, libc::SOCK_DGRAM).map(|inner| {
UnixDatagram { inner: UnsafeArc::new(inner) }
})
}
pub fn bind(addr: &CString) -> IoResult<UnixDatagram> {
bind(addr, libc::SOCK_DGRAM).map(|inner| {
UnixDatagram { inner: UnsafeArc::new(inner) }
})
}
fn fd(&self) -> fd_t { unsafe { (*self.inner.get()).fd } }
pub fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, CString)> {
let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() };
let storagep = &mut storage as *mut libc::sockaddr_storage;
let mut addrlen: libc::socklen_t =
mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
let ret = retry(|| unsafe {
libc::recvfrom(self.fd(),
buf.as_ptr() as *mut libc::c_void,
buf.len() as libc::size_t,
0,
storagep as *mut libc::sockaddr,
&mut addrlen) as libc::c_int
});
if ret < 0 { return Err(super::last_error()) }
sockaddr_to_unix(&storage, addrlen as uint).and_then(|addr| {
Ok((ret as uint, addr))
})
}
pub fn sendto(&mut self, buf: &[u8], dst: &CString) -> IoResult<()> {
let (dst, len) = try!(addr_to_sockaddr_un(dst));
let dstp = &dst as *libc::sockaddr_storage;
let ret = retry(|| unsafe {
libc::sendto(self.fd(),
buf.as_ptr() as *libc::c_void,
buf.len() as libc::size_t,
0,
dstp as *libc::sockaddr,
len as libc::socklen_t) as libc::c_int
});
match ret {
-1 => Err(super::last_error()),
n if n as uint != buf.len() => {
Err(io::IoError {
kind: io::OtherIoError,
desc: "couldn't send entire packet at once",
detail: None,
})
}
_ => Ok(())
}
}
pub fn clone(&mut self) -> UnixDatagram {
UnixDatagram { inner: self.inner.clone() }
}
}
////////////////////////////////////////////////////////////////////////////////
// Unix Listener
////////////////////////////////////////////////////////////////////////////////
@ -247,7 +170,7 @@ impl UnixListener {
pub fn native_listen(self, backlog: int) -> IoResult<UnixAcceptor> {
match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
-1 => Err(super::last_error()),
_ => Ok(UnixAcceptor { listener: self })
_ => Ok(UnixAcceptor { listener: self, deadline: 0 })
}
}
}
@ -260,12 +183,16 @@ impl rtio::RtioUnixListener for UnixListener {
pub struct UnixAcceptor {
listener: UnixListener,
deadline: u64,
}
impl UnixAcceptor {
fn fd(&self) -> fd_t { self.listener.fd() }
pub fn native_accept(&mut self) -> IoResult<UnixStream> {
if self.deadline != 0 {
try!(util::accept_deadline(self.fd(), self.deadline));
}
let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() };
let storagep = &mut storage as *mut libc::sockaddr_storage;
let size = mem::size_of::<libc::sockaddr_storage>();
@ -285,6 +212,9 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor {
fn accept(&mut self) -> IoResult<~rtio::RtioPipe:Send> {
self.native_accept().map(|s| ~s as ~rtio::RtioPipe:Send)
}
fn set_timeout(&mut self, timeout: Option<u64>) {
self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
}
}
impl Drop for UnixListener {

View File

@ -93,6 +93,8 @@ use std::sync::arc::UnsafeArc;
use std::intrinsics;
use super::IoResult;
use super::c;
use super::util;
struct Event(libc::HANDLE);
@ -210,8 +212,9 @@ impl UnixStream {
None
}
pub fn connect(addr: &CString) -> IoResult<UnixStream> {
pub fn connect(addr: &CString, timeout: Option<u64>) -> IoResult<UnixStream> {
as_utf16_p(addr.as_str().unwrap(), |p| {
let start = ::io::timer::now();
loop {
match UnixStream::try_connect(p) {
Some(handle) => {
@ -246,11 +249,26 @@ impl UnixStream {
return Err(super::last_error())
}
// An example I found on microsoft's website used 20 seconds,
// libuv uses 30 seconds, hence we make the obvious choice of
// waiting for 25 seconds.
if unsafe { libc::WaitNamedPipeW(p, 25000) } == 0 {
return Err(super::last_error())
match timeout {
Some(timeout) => {
let now = ::io::timer::now();
let timed_out = (now - start) >= timeout || unsafe {
let ms = (timeout - (now - start)) as libc::DWORD;
libc::WaitNamedPipeW(p, ms) == 0
};
if timed_out {
return Err(util::timeout("connect timed out"))
}
}
// An example I found on microsoft's website used 20
// seconds, libuv uses 30 seconds, hence we make the
// obvious choice of waiting for 25 seconds.
None => {
if unsafe { libc::WaitNamedPipeW(p, 25000) } == 0 {
return Err(super::last_error())
}
}
}
}
})
@ -372,6 +390,7 @@ impl UnixListener {
Ok(UnixAcceptor {
listener: self,
event: try!(Event::new(true, false)),
deadline: 0,
})
}
}
@ -391,6 +410,7 @@ impl rtio::RtioUnixListener for UnixListener {
pub struct UnixAcceptor {
listener: UnixListener,
event: Event,
deadline: u64,
}
impl UnixAcceptor {
@ -438,7 +458,28 @@ impl UnixAcceptor {
overlapped.hEvent = self.event.handle();
if unsafe { libc::ConnectNamedPipe(handle, &mut overlapped) == 0 } {
let mut err = unsafe { libc::GetLastError() };
if err == libc::ERROR_IO_PENDING as libc::DWORD {
// If we've got a timeout, use WaitForSingleObject in tandem
// with CancelIo to figure out if we should indeed get the
// result.
if self.deadline != 0 {
let now = ::io::timer::now();
let timeout = self.deadline < now || unsafe {
let ms = (self.deadline - now) as libc::DWORD;
let r = libc::WaitForSingleObject(overlapped.hEvent,
ms);
r != libc::WAIT_OBJECT_0
};
if timeout {
unsafe { let _ = c::CancelIo(handle); }
return Err(util::timeout("accept timed out"))
}
}
// This will block until the overlapped I/O is completed. The
// timeout was previously handled, so this will either block in
// the normal case or succeed very quickly in the timeout case.
let ret = unsafe {
let mut transfer = 0;
libc::GetOverlappedResult(handle,
@ -488,5 +529,8 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor {
fn accept(&mut self) -> IoResult<~rtio::RtioPipe:Send> {
self.native_accept().map(|s| ~s as ~rtio::RtioPipe:Send)
}
fn set_timeout(&mut self, timeout: Option<u64>) {
self.deadline = timeout.map(|i| i + ::io::timer::now()).unwrap_or(0);
}
}

136
src/libnative/io/util.rs Normal file
View File

@ -0,0 +1,136 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use libc;
use std::io::IoResult;
use std::io;
use std::mem;
use std::ptr;
use super::c;
use super::net;
use super::{retry, last_error};
pub fn timeout(desc: &'static str) -> io::IoError {
io::IoError {
kind: io::TimedOut,
desc: desc,
detail: None,
}
}
pub fn ms_to_timeval(ms: u64) -> libc::timeval {
libc::timeval {
tv_sec: (ms / 1000) as libc::time_t,
tv_usec: ((ms % 1000) * 1000) as libc::suseconds_t,
}
}
// See http://developerweb.net/viewtopic.php?id=3196 for where this is
// derived from.
pub fn connect_timeout(fd: net::sock_t,
addrp: *libc::sockaddr,
len: libc::socklen_t,
timeout_ms: u64) -> IoResult<()> {
use std::os;
#[cfg(unix)] use INPROGRESS = libc::EINPROGRESS;
#[cfg(windows)] use INPROGRESS = libc::WSAEINPROGRESS;
#[cfg(unix)] use WOULDBLOCK = libc::EWOULDBLOCK;
#[cfg(windows)] use WOULDBLOCK = libc::WSAEWOULDBLOCK;
// Make sure the call to connect() doesn't block
try!(set_nonblocking(fd, true));
let ret = match unsafe { libc::connect(fd, addrp, len) } {
// If the connection is in progress, then we need to wait for it to
// finish (with a timeout). The current strategy for doing this is
// to use select() with a timeout.
-1 if os::errno() as int == INPROGRESS as int ||
os::errno() as int == WOULDBLOCK as int => {
let mut set: c::fd_set = unsafe { mem::init() };
c::fd_set(&mut set, fd);
match await(fd, &mut set, timeout_ms) {
0 => Err(timeout("connection timed out")),
-1 => Err(last_error()),
_ => {
let err: libc::c_int = try!(
net::getsockopt(fd, libc::SOL_SOCKET, libc::SO_ERROR));
if err == 0 {
Ok(())
} else {
Err(io::IoError::from_errno(err as uint, true))
}
}
}
}
-1 => Err(last_error()),
_ => Ok(()),
};
// be sure to turn blocking I/O back on
try!(set_nonblocking(fd, false));
return ret;
#[cfg(unix)]
fn set_nonblocking(fd: net::sock_t, nb: bool) -> IoResult<()> {
let set = nb as libc::c_int;
super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) }))
}
#[cfg(windows)]
fn set_nonblocking(fd: net::sock_t, nb: bool) -> IoResult<()> {
let mut set = nb as libc::c_ulong;
if unsafe { c::ioctlsocket(fd, c::FIONBIO, &mut set) != 0 } {
Err(last_error())
} else {
Ok(())
}
}
#[cfg(unix)]
fn await(fd: net::sock_t, set: &mut c::fd_set,
timeout: u64) -> libc::c_int {
let start = ::io::timer::now();
retry(|| unsafe {
// Recalculate the timeout each iteration (it is generally
// undefined what the value of the 'tv' is after select
// returns EINTR).
let tv = ms_to_timeval(timeout - (::io::timer::now() - start));
c::select(fd + 1, ptr::null(), set as *mut _ as *_,
ptr::null(), &tv)
})
}
#[cfg(windows)]
fn await(_fd: net::sock_t, set: &mut c::fd_set,
timeout: u64) -> libc::c_int {
let tv = ms_to_timeval(timeout);
unsafe { c::select(1, ptr::null(), &*set, ptr::null(), &tv) }
}
}
pub fn accept_deadline(fd: net::sock_t, deadline: u64) -> IoResult<()> {
let mut set: c::fd_set = unsafe { mem::init() };
c::fd_set(&mut set, fd);
match retry(|| {
// If we're past the deadline, then pass a 0 timeout to select() so
// we can poll the status of the socket.
let now = ::io::timer::now();
let ms = if deadline < now {0} else {deadline - now};
let tv = ms_to_timeval(ms);
let n = if cfg!(windows) {1} else {fd as libc::c_int + 1};
unsafe { c::select(n, &set, ptr::null(), ptr::null(), &tv) }
}) {
-1 => Err(last_error()),
0 => Err(timeout("accept timed out")),
_ => return Ok(()),
}
}

View File

@ -9,7 +9,7 @@
// except according to those terms.
use std::cast;
use std::io::IoError;
use std::io::{IoError, IoResult};
use std::io::net::ip;
use libc::{size_t, ssize_t, c_int, c_void, c_uint};
use libc;
@ -145,6 +145,190 @@ fn socket_name(sk: SocketNameKind,
n => Err(uv_error_to_io_error(UvError(n)))
}
}
////////////////////////////////////////////////////////////////////////////////
// Helpers for handling timeouts, shared for pipes/tcp
////////////////////////////////////////////////////////////////////////////////
pub struct ConnectCtx {
pub status: c_int,
pub task: Option<BlockedTask>,
pub timer: Option<~TimerWatcher>,
}
pub struct AcceptTimeout {
timer: Option<TimerWatcher>,
timeout_tx: Option<Sender<()>>,
timeout_rx: Option<Receiver<()>>,
}
impl ConnectCtx {
pub fn connect<T>(
mut self, obj: T, timeout: Option<u64>, io: &mut UvIoFactory,
f: |&Request, &T, uvll::uv_connect_cb| -> libc::c_int
) -> Result<T, UvError> {
let mut req = Request::new(uvll::UV_CONNECT);
let r = f(&req, &obj, connect_cb);
return match r {
0 => {
req.defuse(); // uv callback now owns this request
match timeout {
Some(t) => {
let mut timer = TimerWatcher::new(io);
timer.start(timer_cb, t, 0);
self.timer = Some(timer);
}
None => {}
}
wait_until_woken_after(&mut self.task, &io.loop_, || {
let data = &self as *_;
match self.timer {
Some(ref mut timer) => unsafe { timer.set_data(data) },
None => {}
}
req.set_data(data);
});
// Make sure an erroneously fired callback doesn't have access
// to the context any more.
req.set_data(0 as *int);
// If we failed because of a timeout, drop the TcpWatcher as
// soon as possible because it's data is now set to null and we
// want to cancel the callback ASAP.
match self.status {
0 => Ok(obj),
n => { drop(obj); Err(UvError(n)) }
}
}
n => Err(UvError(n))
};
extern fn timer_cb(handle: *uvll::uv_timer_t) {
// Don't close the corresponding tcp request, just wake up the task
// and let RAII take care of the pending watcher.
let cx: &mut ConnectCtx = unsafe {
&mut *(uvll::get_data_for_uv_handle(handle) as *mut ConnectCtx)
};
cx.status = uvll::ECANCELED;
wakeup(&mut cx.task);
}
extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
// This callback can be invoked with ECANCELED if the watcher is
// closed by the timeout callback. In that case we just want to free
// the request and be along our merry way.
let req = Request::wrap(req);
if status == uvll::ECANCELED { return }
// Apparently on windows when the handle is closed this callback may
// not be invoked with ECANCELED but rather another error code.
// Either ways, if the data is null, then our timeout has expired
// and there's nothing we can do.
let data = unsafe { uvll::get_data_for_req(req.handle) };
if data.is_null() { return }
let cx: &mut ConnectCtx = unsafe { &mut *(data as *mut ConnectCtx) };
cx.status = status;
match cx.timer {
Some(ref mut t) => t.stop(),
None => {}
}
// Note that the timer callback doesn't cancel the connect request
// (that's the job of uv_close()), so it's possible for this
// callback to get triggered after the timeout callback fires, but
// before the task wakes up. In that case, we did indeed
// successfully connect, but we don't need to wake someone up. We
// updated the status above (correctly so), and the task will pick
// up on this when it wakes up.
if cx.task.is_some() {
wakeup(&mut cx.task);
}
}
}
}
impl AcceptTimeout {
pub fn new() -> AcceptTimeout {
AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None }
}
pub fn accept<T: Send>(&mut self, c: &Receiver<IoResult<T>>) -> IoResult<T> {
match self.timeout_rx {
None => c.recv(),
Some(ref rx) => {
use std::comm::Select;
// Poll the incoming channel first (don't rely on the order of
// select just yet). If someone's pending then we should return
// them immediately.
match c.try_recv() {
Ok(data) => return data,
Err(..) => {}
}
// Use select to figure out which channel gets ready first. We
// do some custom handling of select to ensure that we never
// actually drain the timeout channel (we'll keep seeing the
// timeout message in the future).
let s = Select::new();
let mut timeout = s.handle(rx);
let mut data = s.handle(c);
unsafe {
timeout.add();
data.add();
}
if s.wait() == timeout.id() {
Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
} else {
c.recv()
}
}
}
}
pub fn clear(&mut self) {
// Clear any previous timeout by dropping the timer and transmission
// channels
drop((self.timer.take(),
self.timeout_tx.take(),
self.timeout_rx.take()))
}
pub fn set_timeout<U, T: UvHandle<U> + HomingIO>(
&mut self, ms: u64, t: &mut T
) {
// If we have a timeout, lazily initialize the timer which will be used
// to fire when the timeout runs out.
if self.timer.is_none() {
let _m = t.fire_homing_missile();
let loop_ = Loop::wrap(unsafe {
uvll::get_loop_for_uv_handle(t.uv_handle())
});
let mut timer = TimerWatcher::new_home(&loop_, t.home().clone());
unsafe {
timer.set_data(self as *mut _ as *AcceptTimeout);
}
self.timer = Some(timer);
}
// Once we've got a timer, stop any previous timeout, reset it for the
// current one, and install some new channels to send/receive data on
let timer = self.timer.get_mut_ref();
timer.stop();
timer.start(timer_cb, ms, 0);
let (tx, rx) = channel();
self.timeout_tx = Some(tx);
self.timeout_rx = Some(rx);
extern fn timer_cb(timer: *uvll::uv_timer_t) {
let acceptor: &mut AcceptTimeout = unsafe {
&mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout)
};
// This send can never fail because if this timer is active then the
// receiving channel is guaranteed to be alive
acceptor.timeout_tx.get_ref().send(());
}
}
}
////////////////////////////////////////////////////////////////////////////////
/// TCP implementation
@ -174,9 +358,7 @@ pub struct TcpListener {
pub struct TcpAcceptor {
listener: ~TcpListener,
timer: Option<TimerWatcher>,
timeout_tx: Option<Sender<()>>,
timeout_rx: Option<Receiver<()>>,
timeout: AcceptTimeout,
}
// TCP watchers (clients/streams)
@ -205,97 +387,13 @@ impl TcpWatcher {
pub fn connect(io: &mut UvIoFactory,
address: ip::SocketAddr,
timeout: Option<u64>) -> Result<TcpWatcher, UvError> {
struct Ctx {
status: c_int,
task: Option<BlockedTask>,
timer: Option<~TimerWatcher>,
}
let tcp = TcpWatcher::new(io);
let cx = ConnectCtx { status: -1, task: None, timer: None };
let (addr, _len) = addr_to_sockaddr(address);
let mut req = Request::new(uvll::UV_CONNECT);
let result = unsafe {
let addr_p = &addr as *libc::sockaddr_storage;
uvll::uv_tcp_connect(req.handle, tcp.handle,
addr_p as *libc::sockaddr,
connect_cb)
};
return match result {
0 => {
req.defuse(); // uv callback now owns this request
let mut cx = Ctx { status: -1, task: None, timer: None };
match timeout {
Some(t) => {
let mut timer = TimerWatcher::new(io);
timer.start(timer_cb, t, 0);
cx.timer = Some(timer);
}
None => {}
}
wait_until_woken_after(&mut cx.task, &io.loop_, || {
let data = &cx as *_;
match cx.timer {
Some(ref mut timer) => unsafe { timer.set_data(data) },
None => {}
}
req.set_data(data);
});
// Make sure an erroneously fired callback doesn't have access
// to the context any more.
req.set_data(0 as *int);
// If we failed because of a timeout, drop the TcpWatcher as
// soon as possible because it's data is now set to null and we
// want to cancel the callback ASAP.
match cx.status {
0 => Ok(tcp),
n => { drop(tcp); Err(UvError(n)) }
}
}
n => Err(UvError(n))
};
extern fn timer_cb(handle: *uvll::uv_timer_t) {
// Don't close the corresponding tcp request, just wake up the task
// and let RAII take care of the pending watcher.
let cx: &mut Ctx = unsafe {
&mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx)
};
cx.status = uvll::ECANCELED;
wakeup(&mut cx.task);
}
extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
// This callback can be invoked with ECANCELED if the watcher is
// closed by the timeout callback. In that case we just want to free
// the request and be along our merry way.
let req = Request::wrap(req);
if status == uvll::ECANCELED { return }
// Apparently on windows when the handle is closed this callback may
// not be invoked with ECANCELED but rather another error code.
// Either ways, if the data is null, then our timeout has expired
// and there's nothing we can do.
let data = unsafe { uvll::get_data_for_req(req.handle) };
if data.is_null() { return }
let cx: &mut Ctx = unsafe { &mut *(data as *mut Ctx) };
cx.status = status;
match cx.timer {
Some(ref mut t) => t.stop(),
None => {}
}
// Note that the timer callback doesn't cancel the connect request
// (that's the job of uv_close()), so it's possible for this
// callback to get triggered after the timeout callback fires, but
// before the task wakes up. In that case, we did indeed
// successfully connect, but we don't need to wake someone up. We
// updated the status above (correctly so), and the task will pick
// up on this when it wakes up.
if cx.task.is_some() {
wakeup(&mut cx.task);
}
}
let addr_p = &addr as *_ as *libc::sockaddr;
cx.connect(tcp, timeout, io, |req, tcp, cb| {
unsafe { uvll::uv_tcp_connect(req.handle, tcp.handle, addr_p, cb) }
})
}
}
@ -463,9 +561,7 @@ impl rtio::RtioTcpListener for TcpListener {
// create the acceptor object from ourselves
let mut acceptor = ~TcpAcceptor {
listener: self,
timer: None,
timeout_tx: None,
timeout_rx: None,
timeout: AcceptTimeout::new(),
};
let _m = acceptor.fire_homing_missile();
@ -516,37 +612,7 @@ impl rtio::RtioSocket for TcpAcceptor {
impl rtio::RtioTcpAcceptor for TcpAcceptor {
fn accept(&mut self) -> Result<~rtio::RtioTcpStream:Send, IoError> {
match self.timeout_rx {
None => self.listener.incoming.recv(),
Some(ref rx) => {
use std::comm::Select;
// Poll the incoming channel first (don't rely on the order of
// select just yet). If someone's pending then we should return
// them immediately.
match self.listener.incoming.try_recv() {
Ok(data) => return data,
Err(..) => {}
}
// Use select to figure out which channel gets ready first. We
// do some custom handling of select to ensure that we never
// actually drain the timeout channel (we'll keep seeing the
// timeout message in the future).
let s = Select::new();
let mut timeout = s.handle(rx);
let mut data = s.handle(&self.listener.incoming);
unsafe {
timeout.add();
data.add();
}
if s.wait() == timeout.id() {
Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
} else {
self.listener.incoming.recv()
}
}
}
self.timeout.accept(&self.listener.incoming)
}
fn accept_simultaneously(&mut self) -> Result<(), IoError> {
@ -564,47 +630,9 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
}
fn set_timeout(&mut self, ms: Option<u64>) {
// First, if the timeout is none, clear any previous timeout by dropping
// the timer and transmission channels
let ms = match ms {
None => {
return drop((self.timer.take(),
self.timeout_tx.take(),
self.timeout_rx.take()))
}
Some(ms) => ms,
};
// If we have a timeout, lazily initialize the timer which will be used
// to fire when the timeout runs out.
if self.timer.is_none() {
let _m = self.fire_homing_missile();
let loop_ = Loop::wrap(unsafe {
uvll::get_loop_for_uv_handle(self.listener.handle)
});
let mut timer = TimerWatcher::new_home(&loop_, self.home().clone());
unsafe {
timer.set_data(self as *mut _ as *TcpAcceptor);
}
self.timer = Some(timer);
}
// Once we've got a timer, stop any previous timeout, reset it for the
// current one, and install some new channels to send/receive data on
let timer = self.timer.get_mut_ref();
timer.stop();
timer.start(timer_cb, ms, 0);
let (tx, rx) = channel();
self.timeout_tx = Some(tx);
self.timeout_rx = Some(rx);
extern fn timer_cb(timer: *uvll::uv_timer_t) {
let acceptor: &mut TcpAcceptor = unsafe {
&mut *(uvll::get_data_for_uv_handle(timer) as *mut TcpAcceptor)
};
// This send can never fail because if this timer is active then the
// receiving channel is guaranteed to be alive
acceptor.timeout_tx.get_ref().send(());
match ms {
None => self.timeout.clear(),
Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
}
}
}

View File

@ -12,14 +12,13 @@ use std::c_str::CString;
use std::io::IoError;
use libc;
use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor};
use std::rt::task::BlockedTask;
use access::Access;
use homing::{HomingIO, HomeHandle};
use net;
use rc::Refcount;
use stream::StreamWatcher;
use super::{Loop, UvError, UvHandle, Request, uv_error_to_io_error,
wait_until_woken_after, wakeup};
use super::{Loop, UvError, UvHandle, uv_error_to_io_error};
use uvio::UvIoFactory;
use uvll;
@ -43,6 +42,7 @@ pub struct PipeListener {
pub struct PipeAcceptor {
listener: ~PipeListener,
timeout: net::AcceptTimeout,
}
// PipeWatcher implementation and traits
@ -84,36 +84,18 @@ impl PipeWatcher {
}
}
pub fn connect(io: &mut UvIoFactory, name: &CString)
pub fn connect(io: &mut UvIoFactory, name: &CString, timeout: Option<u64>)
-> Result<PipeWatcher, UvError>
{
struct Ctx { task: Option<BlockedTask>, result: libc::c_int, }
let mut cx = Ctx { task: None, result: 0 };
let mut req = Request::new(uvll::UV_CONNECT);
let pipe = PipeWatcher::new(io, false);
wait_until_woken_after(&mut cx.task, &io.loop_, || {
let cx = net::ConnectCtx { status: -1, task: None, timer: None };
cx.connect(pipe, timeout, io, |req, pipe, cb| {
unsafe {
uvll::uv_pipe_connect(req.handle,
pipe.handle(),
name.with_ref(|p| p),
connect_cb)
uvll::uv_pipe_connect(req.handle, pipe.handle(),
name.with_ref(|p| p), cb)
}
req.set_data(&cx);
req.defuse(); // uv callback now owns this request
});
return match cx.result {
0 => Ok(pipe),
n => Err(UvError(n))
};
extern fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) {;
let req = Request::wrap(req);
assert!(status != uvll::ECANCELED);
let cx: &mut Ctx = unsafe { req.get_data() };
cx.result = status;
wakeup(&mut cx.task);
}
0
})
}
pub fn handle(&self) -> *uvll::uv_pipe_t { self.stream.handle }
@ -199,7 +181,10 @@ impl PipeListener {
impl RtioUnixListener for PipeListener {
fn listen(~self) -> Result<~RtioUnixAcceptor:Send, IoError> {
// create the acceptor object from ourselves
let mut acceptor = ~PipeAcceptor { listener: self };
let mut acceptor = ~PipeAcceptor {
listener: self,
timeout: net::AcceptTimeout::new(),
};
let _m = acceptor.fire_homing_missile();
// FIXME: the 128 backlog should be configurable
@ -247,7 +232,14 @@ impl Drop for PipeListener {
impl RtioUnixAcceptor for PipeAcceptor {
fn accept(&mut self) -> Result<~RtioPipe:Send, IoError> {
self.listener.incoming.recv()
self.timeout.accept(&self.listener.incoming)
}
fn set_timeout(&mut self, timeout_ms: Option<u64>) {
match timeout_ms {
None => self.timeout.clear(),
Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
}
}
}
@ -265,7 +257,8 @@ mod tests {
#[test]
fn connect_err() {
match PipeWatcher::connect(local_loop(), &"path/to/nowhere".to_c_str()) {
match PipeWatcher::connect(local_loop(), &"path/to/nowhere".to_c_str(),
None) {
Ok(..) => fail!(),
Err(..) => {}
}
@ -312,7 +305,7 @@ mod tests {
assert!(client.write([2]).is_ok());
});
rx.recv();
let mut c = PipeWatcher::connect(local_loop(), &path.to_c_str()).unwrap();
let mut c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap();
assert!(c.write([1]).is_ok());
let mut buf = [0];
assert!(c.read(buf).unwrap() == 1);
@ -332,7 +325,7 @@ mod tests {
drop(p.accept().unwrap());
});
rx.recv();
let _c = PipeWatcher::connect(local_loop(), &path.to_c_str()).unwrap();
let _c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap();
fail!()
}

View File

@ -291,8 +291,9 @@ impl IoFactory for UvIoFactory {
}
}
fn unix_connect(&mut self, path: &CString) -> Result<~rtio::RtioPipe:Send, IoError> {
match PipeWatcher::connect(self, path) {
fn unix_connect(&mut self, path: &CString,
timeout: Option<u64>) -> Result<~rtio::RtioPipe:Send, IoError> {
match PipeWatcher::connect(self, path, timeout) {
Ok(p) => Ok(~p as ~rtio::RtioPipe:Send),
Err(e) => Err(uv_error_to_io_error(e)),
}

View File

@ -61,7 +61,31 @@ impl UnixStream {
/// ```
pub fn connect<P: ToCStr>(path: &P) -> IoResult<UnixStream> {
LocalIo::maybe_raise(|io| {
io.unix_connect(&path.to_c_str()).map(UnixStream::new)
io.unix_connect(&path.to_c_str(), None).map(UnixStream::new)
})
}
/// Connect to a pipe named by `path`. This will attempt to open a
/// connection to the underlying socket.
///
/// The returned stream will be closed when the object falls out of scope.
///
/// # Example
///
/// ```rust
/// # #![allow(unused_must_use)]
/// use std::io::net::unix::UnixStream;
///
/// let server = Path::new("path/to/my/socket");
/// let mut stream = UnixStream::connect(&server);
/// stream.write([1, 2, 3]);
/// ```
#[experimental = "the timeout argument is likely to change types"]
pub fn connect_timeout<P: ToCStr>(path: &P,
timeout_ms: u64) -> IoResult<UnixStream> {
LocalIo::maybe_raise(|io| {
let s = io.unix_connect(&path.to_c_str(), Some(timeout_ms));
s.map(UnixStream::new)
})
}
}
@ -128,6 +152,25 @@ pub struct UnixAcceptor {
obj: ~RtioUnixAcceptor:Send,
}
impl UnixAcceptor {
/// Sets a timeout for this acceptor, after which accept() will no longer
/// block indefinitely.
///
/// The argument specified is the amount of time, in milliseconds, into the
/// future after which all invocations of accept() will not block (and any
/// pending invocation will return). A value of `None` will clear any
/// existing timeout.
///
/// When using this method, it is likely necessary to reset the timeout as
/// appropriate, the timeout specified is specific to this object, not
/// specific to the next request.
#[experimental = "the name and arguments to this function are likely \
to change"]
pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
self.obj.set_timeout(timeout_ms)
}
}
impl Acceptor<UnixStream> for UnixAcceptor {
fn accept(&mut self) -> IoResult<UnixStream> {
self.obj.accept().map(UnixStream::new)
@ -135,6 +178,7 @@ impl Acceptor<UnixStream> for UnixAcceptor {
}
#[cfg(test)]
#[allow(experimental)]
mod tests {
use prelude::*;
use super::*;
@ -371,4 +415,49 @@ mod tests {
drop(l.listen().unwrap());
assert!(!path.exists());
} #[cfg(not(windows))])
iotest!(fn accept_timeout() {
let addr = next_test_unix();
let mut a = UnixListener::bind(&addr).unwrap().listen().unwrap();
a.set_timeout(Some(10));
// Make sure we time out once and future invocations also time out
let err = a.accept().err().unwrap();
assert_eq!(err.kind, TimedOut);
let err = a.accept().err().unwrap();
assert_eq!(err.kind, TimedOut);
// Also make sure that even though the timeout is expired that we will
// continue to receive any pending connections.
let l = UnixStream::connect(&addr).unwrap();
for i in range(0, 1001) {
match a.accept() {
Ok(..) => break,
Err(ref e) if e.kind == TimedOut => {}
Err(e) => fail!("error: {}", e),
}
if i == 1000 { fail!("should have a pending connection") }
}
drop(l);
// Unset the timeout and make sure that this always blocks.
a.set_timeout(None);
let addr2 = addr.clone();
spawn(proc() {
drop(UnixStream::connect(&addr2));
});
a.accept().unwrap();
})
iotest!(fn connect_timeout_error() {
let addr = next_test_unix();
assert!(UnixStream::connect_timeout(&addr, 100).is_err());
})
iotest!(fn connect_timeout_success() {
let addr = next_test_unix();
let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
assert!(UnixStream::connect_timeout(&addr, 100).is_ok());
})
}

View File

@ -152,7 +152,8 @@ pub trait IoFactory {
fn udp_bind(&mut self, addr: SocketAddr) -> IoResult<~RtioUdpSocket:Send>;
fn unix_bind(&mut self, path: &CString)
-> IoResult<~RtioUnixListener:Send>;
fn unix_connect(&mut self, path: &CString) -> IoResult<~RtioPipe:Send>;
fn unix_connect(&mut self, path: &CString,
timeout: Option<u64>) -> IoResult<~RtioPipe:Send>;
fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
hint: Option<ai::Hint>) -> IoResult<~[ai::Info]>;
@ -274,6 +275,7 @@ pub trait RtioUnixListener {
pub trait RtioUnixAcceptor {
fn accept(&mut self) -> IoResult<~RtioPipe:Send>;
fn set_timeout(&mut self, timeout: Option<u64>);
}
pub trait RtioTTY {