auto merge of #13604 : alexcrichton/rust/connect-timeout, r=brson

This adds a `TcpStream::connect_timeout` function in order to assist opening
connections with a timeout (cc #13523). There isn't really much design space for
this specific operation (unlike timing out normal blocking reads/writes), so I
am fairly confident that this is the correct interface for this function.

The function is marked #[experimental] because it takes a u64 timeout argument,
and the u64 type is likely to change in the future.
This commit is contained in:
bors 2014-04-19 00:56:30 -07:00
commit 158e0c86fe
15 changed files with 490 additions and 145 deletions

View File

@ -87,13 +87,14 @@ pub use types::common::c95::{FILE, c_void, fpos_t};
pub use types::common::c99::{int8_t, int16_t, int32_t, int64_t};
pub use types::common::c99::{uint8_t, uint16_t, uint32_t, uint64_t};
pub use types::common::posix88::{DIR, dirent_t};
pub use types::os::common::posix01::{timeval};
pub use types::os::common::bsd44::{addrinfo, in_addr, in6_addr, sockaddr_storage};
pub use types::os::common::bsd44::{ip_mreq, ip6_mreq, sockaddr, sockaddr_un};
pub use types::os::common::bsd44::{sa_family_t, sockaddr_in, sockaddr_in6, socklen_t};
pub use types::os::arch::c95::{c_char, c_double, c_float, c_int, c_uint};
pub use types::os::arch::c95::{c_long, c_short, c_uchar, c_ulong};
pub use types::os::arch::c95::{c_ushort, clock_t, ptrdiff_t};
pub use types::os::arch::c95::{size_t, time_t};
pub use types::os::arch::c95::{size_t, time_t, suseconds_t};
pub use types::os::arch::c99::{c_longlong, c_ulonglong};
pub use types::os::arch::c99::{intptr_t, uintptr_t};
pub use types::os::arch::posix88::{dev_t, ino_t, mode_t};
@ -113,7 +114,7 @@ pub use consts::os::posix88::{STDERR_FILENO, STDIN_FILENO, S_IXUSR};
pub use consts::os::posix88::{STDOUT_FILENO, W_OK, X_OK};
pub use consts::os::bsd44::{AF_INET, AF_INET6, SOCK_STREAM, SOCK_DGRAM};
pub use consts::os::bsd44::{IPPROTO_IP, IPPROTO_IPV6, IPPROTO_TCP, TCP_NODELAY};
pub use consts::os::bsd44::{SOL_SOCKET, SO_KEEPALIVE};
pub use consts::os::bsd44::{SOL_SOCKET, SO_KEEPALIVE, SO_ERROR};
pub use consts::os::bsd44::{SO_REUSEADDR, SO_BROADCAST, SHUT_WR, IP_MULTICAST_LOOP};
pub use consts::os::bsd44::{IP_ADD_MEMBERSHIP, IP_DROP_MEMBERSHIP};
pub use consts::os::bsd44::{IPV6_ADD_MEMBERSHIP, IPV6_DROP_MEMBERSHIP};
@ -170,14 +171,13 @@ pub use funcs::bsd43::{shutdown};
#[cfg(unix)] pub use consts::os::posix88::{ECONNREFUSED, ECONNRESET, EPERM, EPIPE};
#[cfg(unix)] pub use consts::os::posix88::{ENOTCONN, ECONNABORTED, EADDRNOTAVAIL, EINTR};
#[cfg(unix)] pub use consts::os::posix88::{EADDRINUSE, ENOENT, EISDIR, EAGAIN, EWOULDBLOCK};
#[cfg(unix)] pub use consts::os::posix88::{ECANCELED, SIGINT};
#[cfg(unix)] pub use consts::os::posix88::{ECANCELED, SIGINT, EINPROGRESS};
#[cfg(unix)] pub use consts::os::posix88::{SIGTERM, SIGKILL, SIGPIPE, PROT_NONE};
#[cfg(unix)] pub use consts::os::posix01::{SIG_IGN, WNOHANG};
#[cfg(unix)] pub use consts::os::bsd44::{AF_UNIX};
#[cfg(unix)] pub use types::os::common::posix01::{pthread_t, timespec, timezone, timeval};
#[cfg(unix)] pub use types::os::common::posix01::{pthread_t, timespec, timezone};
#[cfg(unix)] pub use types::os::arch::c95::{suseconds_t};
#[cfg(unix)] pub use types::os::arch::posix88::{uid_t, gid_t};
#[cfg(unix)] pub use types::os::arch::posix01::{pthread_attr_t};
#[cfg(unix)] pub use types::os::arch::posix01::{stat, utimbuf};
@ -195,6 +195,7 @@ pub use funcs::bsd43::{shutdown};
#[cfg(windows)] pub use consts::os::c95::{WSAECONNREFUSED, WSAECONNRESET, WSAEACCES};
#[cfg(windows)] pub use consts::os::c95::{WSAEWOULDBLOCK, WSAENOTCONN, WSAECONNABORTED};
#[cfg(windows)] pub use consts::os::c95::{WSAEADDRNOTAVAIL, WSAEADDRINUSE, WSAEINTR};
#[cfg(windows)] pub use consts::os::c95::{WSAEINPROGRESS};
#[cfg(windows)] pub use consts::os::extra::{ERROR_INSUFFICIENT_BUFFER};
#[cfg(windows)] pub use consts::os::extra::{O_BINARY, O_NOINHERIT, PAGE_NOACCESS};
#[cfg(windows)] pub use consts::os::extra::{PAGE_READONLY, PAGE_READWRITE, PAGE_EXECUTE};
@ -1708,6 +1709,7 @@ pub mod consts {
pub static SO_KEEPALIVE: c_int = 8;
pub static SO_BROADCAST: c_int = 32;
pub static SO_REUSEADDR: c_int = 4;
pub static SO_ERROR: c_int = 0x1007;
pub static SHUT_RD: c_int = 0;
pub static SHUT_WR: c_int = 1;
@ -2496,6 +2498,7 @@ pub mod consts {
pub static SO_KEEPALIVE: c_int = 9;
pub static SO_BROADCAST: c_int = 6;
pub static SO_REUSEADDR: c_int = 2;
pub static SO_ERROR: c_int = 4;
pub static SHUT_RD: c_int = 0;
pub static SHUT_WR: c_int = 1;
@ -2954,6 +2957,7 @@ pub mod consts {
pub static SO_KEEPALIVE: c_int = 0x0008;
pub static SO_BROADCAST: c_int = 0x0020;
pub static SO_REUSEADDR: c_int = 0x0004;
pub static SO_ERROR: c_int = 0x1007;
pub static SHUT_RD: c_int = 0;
pub static SHUT_WR: c_int = 1;
@ -3340,6 +3344,7 @@ pub mod consts {
pub static SO_KEEPALIVE: c_int = 0x0008;
pub static SO_BROADCAST: c_int = 0x0020;
pub static SO_REUSEADDR: c_int = 0x0004;
pub static SO_ERROR: c_int = 0x1007;
pub static SHUT_RD: c_int = 0;
pub static SHUT_WR: c_int = 1;

View File

@ -0,0 +1,76 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! C definitions used by libnative that don't belong in liblibc
pub use self::select::fd_set;
use libc;
#[cfg(target_os = "macos")]
#[cfg(target_os = "freebsd")]
pub static FIONBIO: libc::c_ulong = 0x8004667e;
#[cfg(target_os = "linux")]
#[cfg(target_os = "android")]
pub static FIONBIO: libc::c_ulong = 0x5421;
#[cfg(target_os = "macos")]
#[cfg(target_os = "freebsd")]
pub static FIOCLEX: libc::c_ulong = 0x20006601;
#[cfg(target_os = "linux")]
#[cfg(target_os = "android")]
pub static FIOCLEX: libc::c_ulong = 0x5451;
extern {
pub fn gettimeofday(timeval: *mut libc::timeval,
tzp: *libc::c_void) -> libc::c_int;
pub fn select(nfds: libc::c_int,
readfds: *fd_set,
writefds: *fd_set,
errorfds: *fd_set,
timeout: *libc::timeval) -> libc::c_int;
pub fn getsockopt(sockfd: libc::c_int,
level: libc::c_int,
optname: libc::c_int,
optval: *mut libc::c_void,
optlen: *mut libc::socklen_t) -> libc::c_int;
pub fn ioctl(fd: libc::c_int, req: libc::c_ulong, ...) -> libc::c_int;
}
#[cfg(target_os = "macos")]
mod select {
pub static FD_SETSIZE: uint = 1024;
pub struct fd_set {
fds_bits: [i32, ..(FD_SETSIZE / 32)]
}
pub fn fd_set(set: &mut fd_set, fd: i32) {
set.fds_bits[(fd / 32) as uint] |= 1 << (fd % 32);
}
}
#[cfg(target_os = "android")]
#[cfg(target_os = "freebsd")]
#[cfg(target_os = "linux")]
mod select {
use std::uint;
pub static FD_SETSIZE: uint = 1024;
pub struct fd_set {
fds_bits: [uint, ..(FD_SETSIZE / uint::BITS)]
}
pub fn fd_set(set: &mut fd_set, fd: i32) {
let fd = fd as uint;
set.fds_bits[fd / uint::BITS] |= 1 << (fd % uint::BITS);
}
}

View File

@ -0,0 +1,62 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! C definitions used by libnative that don't belong in liblibc
#![allow(type_overflow)]
use libc;
pub static WSADESCRIPTION_LEN: uint = 256;
pub static WSASYS_STATUS_LEN: uint = 128;
pub static FIONBIO: libc::c_long = 0x8004667e;
static FD_SETSIZE: uint = 64;
pub struct WSADATA {
pub wVersion: libc::WORD,
pub wHighVersion: libc::WORD,
pub szDescription: [u8, ..WSADESCRIPTION_LEN + 1],
pub szSystemStatus: [u8, ..WSASYS_STATUS_LEN + 1],
pub iMaxSockets: u16,
pub iMaxUdpDg: u16,
pub lpVendorInfo: *u8,
}
pub type LPWSADATA = *mut WSADATA;
pub struct fd_set {
fd_count: libc::c_uint,
fd_array: [libc::SOCKET, ..FD_SETSIZE],
}
pub fn fd_set(set: &mut fd_set, s: libc::SOCKET) {
set.fd_array[set.fd_count as uint] = s;
set.fd_count += 1;
}
#[link(name = "ws2_32")]
extern "system" {
pub fn WSAStartup(wVersionRequested: libc::WORD,
lpWSAData: LPWSADATA) -> libc::c_int;
pub fn WSAGetLastError() -> libc::c_int;
pub fn ioctlsocket(s: libc::SOCKET, cmd: libc::c_long,
argp: *mut libc::c_ulong) -> libc::c_int;
pub fn select(nfds: libc::c_int,
readfds: *mut fd_set,
writefds: *mut fd_set,
exceptfds: *mut fd_set,
timeout: *libc::timeval) -> libc::c_int;
pub fn getsockopt(sockfd: libc::SOCKET,
level: libc::c_int,
optname: libc::c_int,
optval: *mut libc::c_char,
optlen: *mut libc::c_int) -> libc::c_int;
}

View File

@ -71,6 +71,9 @@ pub mod pipe;
#[path = "pipe_win32.rs"]
pub mod pipe;
#[cfg(unix)] #[path = "c_unix.rs"] mod c;
#[cfg(windows)] #[path = "c_win32.rs"] mod c;
mod timer_helper;
pub type IoResult<T> = Result<T, IoError>;
@ -161,8 +164,9 @@ impl IoFactory {
impl rtio::IoFactory for IoFactory {
// networking
fn tcp_connect(&mut self, addr: SocketAddr) -> IoResult<~RtioTcpStream:Send> {
net::TcpStream::connect(addr).map(|s| ~s as ~RtioTcpStream:Send)
fn tcp_connect(&mut self, addr: SocketAddr,
timeout: Option<u64>) -> IoResult<~RtioTcpStream:Send> {
net::TcpStream::connect(addr, timeout).map(|s| ~s as ~RtioTcpStream:Send)
}
fn tcp_bind(&mut self, addr: SocketAddr) -> IoResult<~RtioTcpListener:Send> {
net::TcpListener::bind(addr).map(|s| ~s as ~RtioTcpListener:Send)

View File

@ -8,15 +8,17 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use libc;
use std::cast;
use std::io::net::ip;
use std::io;
use libc;
use std::mem;
use std::ptr;
use std::rt::rtio;
use std::sync::arc::UnsafeArc;
use super::{IoResult, retry, keep_going};
use super::c;
////////////////////////////////////////////////////////////////////////////////
// sockaddr and misc bindings
@ -115,12 +117,26 @@ fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
}
}
fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
val: libc::c_int) -> IoResult<T> {
unsafe {
let mut slot: T = mem::init();
let mut len = mem::size_of::<T>() as libc::socklen_t;
let ret = c::getsockopt(fd, opt, val,
&mut slot as *mut _ as *mut _,
&mut len);
if ret != 0 {
Err(last_error())
} else {
assert!(len as uint == mem::size_of::<T>());
Ok(slot)
}
}
}
#[cfg(windows)]
fn last_error() -> io::IoError {
extern "system" {
fn WSAGetLastError() -> libc::c_int;
}
io::IoError::from_errno(unsafe { WSAGetLastError() } as uint, true)
io::IoError::from_errno(unsafe { c::WSAGetLastError() } as uint, true)
}
#[cfg(not(windows))]
@ -197,24 +213,6 @@ pub fn init() {}
#[cfg(windows)]
pub fn init() {
static WSADESCRIPTION_LEN: uint = 256;
static WSASYS_STATUS_LEN: uint = 128;
struct WSADATA {
wVersion: libc::WORD,
wHighVersion: libc::WORD,
szDescription: [u8, ..WSADESCRIPTION_LEN + 1],
szSystemStatus: [u8, ..WSASYS_STATUS_LEN + 1],
iMaxSockets: u16,
iMaxUdpDg: u16,
lpVendorInfo: *u8,
}
type LPWSADATA = *mut WSADATA;
#[link(name = "ws2_32")]
extern "system" {
fn WSAStartup(wVersionRequested: libc::WORD,
lpWSAData: LPWSADATA) -> libc::c_int;
}
unsafe {
use std::unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
@ -223,9 +221,9 @@ pub fn init() {
let _guard = LOCK.lock();
if !INITIALIZED {
let mut data: WSADATA = mem::init();
let ret = WSAStartup(0x202, // version 2.2
&mut data);
let mut data: c::WSADATA = mem::init();
let ret = c::WSAStartup(0x202, // version 2.2
&mut data);
assert_eq!(ret, 0);
INITIALIZED = true;
}
@ -245,22 +243,118 @@ struct Inner {
}
impl TcpStream {
pub fn connect(addr: ip::SocketAddr) -> IoResult<TcpStream> {
unsafe {
socket(addr, libc::SOCK_STREAM).and_then(|fd| {
let (addr, len) = addr_to_sockaddr(addr);
let addrp = &addr as *libc::sockaddr_storage;
let inner = Inner { fd: fd };
let ret = TcpStream { inner: UnsafeArc::new(inner) };
match retry(|| {
libc::connect(fd, addrp as *libc::sockaddr,
len as libc::socklen_t)
}) {
pub fn connect(addr: ip::SocketAddr,
timeout: Option<u64>) -> IoResult<TcpStream> {
let fd = try!(socket(addr, libc::SOCK_STREAM));
let (addr, len) = addr_to_sockaddr(addr);
let inner = Inner { fd: fd };
let ret = TcpStream { inner: UnsafeArc::new(inner) };
let len = len as libc::socklen_t;
let addrp = &addr as *_ as *libc::sockaddr;
match timeout {
Some(timeout) => {
try!(TcpStream::connect_timeout(fd, addrp, len, timeout));
Ok(ret)
},
None => {
match retry(|| unsafe { libc::connect(fd, addrp, len) }) {
-1 => Err(last_error()),
_ => Ok(ret),
}
}
}
}
// See http://developerweb.net/viewtopic.php?id=3196 for where this is
// derived from.
fn connect_timeout(fd: sock_t,
addrp: *libc::sockaddr,
len: libc::socklen_t,
timeout: u64) -> IoResult<()> {
use std::os;
#[cfg(unix)] use INPROGRESS = libc::EINPROGRESS;
#[cfg(windows)] use INPROGRESS = libc::WSAEINPROGRESS;
#[cfg(unix)] use WOULDBLOCK = libc::EWOULDBLOCK;
#[cfg(windows)] use WOULDBLOCK = libc::WSAEWOULDBLOCK;
// Make sure the call to connect() doesn't block
try!(set_nonblocking(fd, true));
let ret = match unsafe { libc::connect(fd, addrp, len) } {
// If the connection is in progress, then we need to wait for it to
// finish (with a timeout). The current strategy for doing this is
// to use select() with a timeout.
-1 if os::errno() as int == INPROGRESS as int ||
os::errno() as int == WOULDBLOCK as int => {
let mut set: c::fd_set = unsafe { mem::init() };
c::fd_set(&mut set, fd);
match await(fd, &mut set, timeout) {
0 => Err(io::IoError {
kind: io::TimedOut,
desc: "connection timed out",
detail: None,
}),
-1 => Err(last_error()),
_ => {
let err: libc::c_int = try!(
getsockopt(fd, libc::SOL_SOCKET, libc::SO_ERROR));
if err == 0 {
Ok(())
} else {
Err(io::IoError::from_errno(err as uint, true))
}
}
}
}
-1 => Err(last_error()),
_ => Ok(()),
};
// be sure to turn blocking I/O back on
try!(set_nonblocking(fd, false));
return ret;
#[cfg(unix)]
fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> {
let set = nb as libc::c_int;
super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) }))
}
#[cfg(windows)]
fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> {
let mut set = nb as libc::c_ulong;
if unsafe { c::ioctlsocket(fd, c::FIONBIO, &mut set) != 0 } {
Err(last_error())
} else {
Ok(())
}
}
#[cfg(unix)]
fn await(fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
let start = ::io::timer::now();
retry(|| unsafe {
// Recalculate the timeout each iteration (it is generally
// undefined what the value of the 'tv' is after select
// returns EINTR).
let timeout = timeout - (::io::timer::now() - start);
let tv = libc::timeval {
tv_sec: (timeout / 1000) as libc::time_t,
tv_usec: ((timeout % 1000) * 1000) as libc::suseconds_t,
};
c::select(fd + 1, ptr::null(), set as *mut _ as *_,
ptr::null(), &tv)
})
}
#[cfg(windows)]
fn await(_fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
let tv = libc::timeval {
tv_sec: (timeout / 1000) as libc::time_t,
tv_usec: ((timeout % 1000) * 1000) as libc::suseconds_t,
};
unsafe { c::select(1, ptr::mut_null(), set, ptr::mut_null(), &tv) }
}
}
pub fn fd(&self) -> sock_t {

View File

@ -454,7 +454,7 @@ fn spawn_process_os(config: p::ProcessConfig,
err_fd: c_int) -> IoResult<SpawnProcessResult> {
use libc::funcs::posix88::unistd::{fork, dup2, close, chdir, execvp};
use libc::funcs::bsd44::getdtablesize;
use libc::c_ulong;
use io::c;
mod rustrt {
extern {
@ -475,16 +475,7 @@ fn spawn_process_os(config: p::ProcessConfig,
}
unsafe fn set_cloexec(fd: c_int) {
extern { fn ioctl(fd: c_int, req: c_ulong) -> c_int; }
#[cfg(target_os = "macos")]
#[cfg(target_os = "freebsd")]
static FIOCLEX: c_ulong = 0x20006601;
#[cfg(target_os = "linux")]
#[cfg(target_os = "android")]
static FIOCLEX: c_ulong = 0x5451;
let ret = ioctl(fd, FIOCLEX);
let ret = c::ioctl(fd, c::FIOCLEX);
assert_eq!(ret, 0);
}

View File

@ -53,8 +53,9 @@ use std::ptr;
use std::rt::rtio;
use std::sync::atomics;
use io::file::FileDesc;
use io::IoResult;
use io::c;
use io::file::FileDesc;
use io::timer_helper;
pub struct Timer {
@ -84,16 +85,16 @@ pub enum Req {
}
// returns the current time (in milliseconds)
fn now() -> u64 {
pub fn now() -> u64 {
unsafe {
let mut now: libc::timeval = mem::init();
assert_eq!(imp::gettimeofday(&mut now, ptr::null()), 0);
assert_eq!(c::gettimeofday(&mut now, ptr::null()), 0);
return (now.tv_sec as u64) * 1000 + (now.tv_usec as u64) / 1000;
}
}
fn helper(input: libc::c_int, messages: Receiver<Req>) {
let mut set: imp::fd_set = unsafe { mem::init() };
let mut set: c::fd_set = unsafe { mem::init() };
let mut fd = FileDesc::new(input, true);
let mut timeout: libc::timeval = unsafe { mem::init() };
@ -150,9 +151,9 @@ fn helper(input: libc::c_int, messages: Receiver<Req>) {
&timeout as *libc::timeval
};
imp::fd_set(&mut set, input);
c::fd_set(&mut set, input);
match unsafe {
imp::select(input + 1, &set, ptr::null(), ptr::null(), timeout)
c::select(input + 1, &set, ptr::null(), ptr::null(), timeout)
} {
// timed out
0 => signal(&mut active, &mut dead),
@ -283,59 +284,3 @@ impl Drop for Timer {
self.inner = Some(self.inner());
}
}
#[cfg(target_os = "macos")]
mod imp {
use libc;
pub static FD_SETSIZE: uint = 1024;
pub struct fd_set {
fds_bits: [i32, ..(FD_SETSIZE / 32)]
}
pub fn fd_set(set: &mut fd_set, fd: i32) {
set.fds_bits[(fd / 32) as uint] |= 1 << (fd % 32);
}
extern {
pub fn select(nfds: libc::c_int,
readfds: *fd_set,
writefds: *fd_set,
errorfds: *fd_set,
timeout: *libc::timeval) -> libc::c_int;
pub fn gettimeofday(timeval: *mut libc::timeval,
tzp: *libc::c_void) -> libc::c_int;
}
}
#[cfg(target_os = "android")]
#[cfg(target_os = "freebsd")]
#[cfg(target_os = "linux")]
mod imp {
use libc;
use std::uint;
pub static FD_SETSIZE: uint = 1024;
pub struct fd_set {
fds_bits: [uint, ..(FD_SETSIZE / uint::BITS)]
}
pub fn fd_set(set: &mut fd_set, fd: i32) {
let fd = fd as uint;
set.fds_bits[fd / uint::BITS] |= 1 << (fd % uint::BITS);
}
extern {
pub fn select(nfds: libc::c_int,
readfds: *fd_set,
writefds: *fd_set,
errorfds: *fd_set,
timeout: *libc::timeval) -> libc::c_int;
pub fn gettimeofday(timeval: *mut libc::timeval,
tzp: *libc::c_void) -> libc::c_int;
}
}

View File

@ -412,6 +412,7 @@ pub fn uv_error_to_io_error(uverr: UvError) -> IoError {
uvll::EPIPE => io::BrokenPipe,
uvll::ECONNABORTED => io::ConnectionAborted,
uvll::EADDRNOTAVAIL => io::ConnectionRefused,
uvll::ECANCELED => io::TimedOut,
err => {
uvdebug!("uverr.code {}", err as int);
// FIXME: Need to map remaining uv error types

View File

@ -25,6 +25,7 @@ use stream::StreamWatcher;
use super::{Loop, Request, UvError, Buf, status_to_io_result,
uv_error_to_io_error, UvHandle, slice_to_uv_buf,
wait_until_woken_after, wakeup};
use timer::TimerWatcher;
use uvio::UvIoFactory;
use uvll;
@ -198,10 +199,14 @@ impl TcpWatcher {
}
}
pub fn connect(io: &mut UvIoFactory, address: ip::SocketAddr)
-> Result<TcpWatcher, UvError>
{
struct Ctx { status: c_int, task: Option<BlockedTask> }
pub fn connect(io: &mut UvIoFactory,
address: ip::SocketAddr,
timeout: Option<u64>) -> Result<TcpWatcher, UvError> {
struct Ctx {
status: c_int,
task: Option<BlockedTask>,
timer: Option<~TimerWatcher>,
}
let tcp = TcpWatcher::new(io);
let (addr, _len) = addr_to_sockaddr(address);
@ -215,24 +220,72 @@ impl TcpWatcher {
return match result {
0 => {
req.defuse(); // uv callback now owns this request
let mut cx = Ctx { status: 0, task: None };
let mut cx = Ctx { status: -1, task: None, timer: None };
match timeout {
Some(t) => {
let mut timer = TimerWatcher::new(io);
timer.start(timer_cb, t, 0);
cx.timer = Some(timer);
}
None => {}
}
wait_until_woken_after(&mut cx.task, &io.loop_, || {
req.set_data(&cx);
let data = &cx as *_;
match cx.timer {
Some(ref mut timer) => unsafe { timer.set_data(data) },
None => {}
}
req.set_data(data);
});
// Make sure an erroneously fired callback doesn't have access
// to the context any more.
req.set_data(0 as *int);
// If we failed because of a timeout, drop the TcpWatcher as
// soon as possible because it's data is now set to null and we
// want to cancel the callback ASAP.
match cx.status {
0 => Ok(tcp),
n => Err(UvError(n)),
n => { drop(tcp); Err(UvError(n)) }
}
}
n => Err(UvError(n))
};
extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) {
// Don't close the corresponding tcp request, just wake up the task
// and let RAII take care of the pending watcher.
assert_eq!(status, 0);
let cx: &mut Ctx = unsafe {
&mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx)
};
cx.status = uvll::ECANCELED;
wakeup(&mut cx.task);
}
extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
// This callback can be invoked with ECANCELED if the watcher is
// closed by the timeout callback. In that case we just want to free
// the request and be along our merry way.
let req = Request::wrap(req);
assert!(status != uvll::ECANCELED);
if status == uvll::ECANCELED { return }
let cx: &mut Ctx = unsafe { req.get_data() };
cx.status = status;
wakeup(&mut cx.task);
match cx.timer {
Some(ref mut t) => t.stop(),
None => {}
}
// Note that the timer callback doesn't cancel the connect request
// (that's the job of uv_close()), so it's possible for this
// callback to get triggered after the timeout callback fires, but
// before the task wakes up. In that case, we did indeed
// successfully connect, but we don't need to wake someone up. We
// updated the status above (correctly so), and the task will pick
// up on this when it wakes up.
if cx.task.is_some() {
wakeup(&mut cx.task);
}
}
}
}
@ -741,7 +794,7 @@ mod test {
#[test]
fn connect_close_ip4() {
match TcpWatcher::connect(local_loop(), next_test_ip4()) {
match TcpWatcher::connect(local_loop(), next_test_ip4(), None) {
Ok(..) => fail!(),
Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_owned()),
}
@ -749,7 +802,7 @@ mod test {
#[test]
fn connect_close_ip6() {
match TcpWatcher::connect(local_loop(), next_test_ip6()) {
match TcpWatcher::connect(local_loop(), next_test_ip6(), None) {
Ok(..) => fail!(),
Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_owned()),
}
@ -799,7 +852,7 @@ mod test {
});
rx.recv();
let mut w = match TcpWatcher::connect(local_loop(), addr) {
let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
Ok(w) => w, Err(e) => fail!("{:?}", e)
};
match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
@ -835,7 +888,7 @@ mod test {
});
rx.recv();
let mut w = match TcpWatcher::connect(local_loop(), addr) {
let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
Ok(w) => w, Err(e) => fail!("{:?}", e)
};
match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
@ -928,7 +981,7 @@ mod test {
});
rx.recv();
let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap();
let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
let mut buf = [0, .. 2048];
let mut total_bytes_read = 0;
while total_bytes_read < MAX {
@ -1036,7 +1089,7 @@ mod test {
spawn(proc() {
let rx = rx.recv();
let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap();
let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
rx.recv();
@ -1088,9 +1141,9 @@ mod test {
}
});
let mut stream = TcpWatcher::connect(local_loop(), addr);
let mut stream = TcpWatcher::connect(local_loop(), addr, None);
while stream.is_err() {
stream = TcpWatcher::connect(local_loop(), addr);
stream = TcpWatcher::connect(local_loop(), addr, None);
}
stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
}
@ -1115,7 +1168,7 @@ mod test {
drop(w.accept().unwrap());
});
rx.recv();
let _w = TcpWatcher::connect(local_loop(), addr).unwrap();
let _w = TcpWatcher::connect(local_loop(), addr, None).unwrap();
fail!();
}

View File

@ -48,15 +48,19 @@ impl TimerWatcher {
return me.install();
}
fn start(&mut self, msecs: u64, period: u64) {
pub fn start(&mut self, f: uvll::uv_timer_cb, msecs: u64, period: u64) {
assert_eq!(unsafe {
uvll::uv_timer_start(self.handle, timer_cb, msecs, period)
uvll::uv_timer_start(self.handle, f, msecs, period)
}, 0)
}
fn stop(&mut self) {
pub fn stop(&mut self) {
assert_eq!(unsafe { uvll::uv_timer_stop(self.handle) }, 0)
}
pub unsafe fn set_data<T>(&mut self, data: *T) {
uvll::set_data_for_uv_handle(self.handle, data);
}
}
impl HomingIO for TimerWatcher {
@ -92,7 +96,7 @@ impl RtioTimer for TimerWatcher {
self.action = Some(WakeTask);
wait_until_woken_after(&mut self.blocker, &self.uv_loop(), || {
self.start(msecs, 0);
self.start(timer_cb, msecs, 0);
});
self.stop();
}
@ -106,7 +110,7 @@ impl RtioTimer for TimerWatcher {
let _m = self.fire_homing_missile();
self.id += 1;
self.stop();
self.start(msecs, 0);
self.start(timer_cb, msecs, 0);
mem::replace(&mut self.action, Some(SendOnce(tx)))
};
@ -122,7 +126,7 @@ impl RtioTimer for TimerWatcher {
let _m = self.fire_homing_missile();
self.id += 1;
self.stop();
self.start(msecs, msecs);
self.start(timer_cb, msecs, msecs);
mem::replace(&mut self.action, Some(SendMany(tx, self.id)))
};

View File

@ -143,10 +143,10 @@ impl IoFactory for UvIoFactory {
// Connect to an address and return a new stream
// NB: This blocks the task waiting on the connection.
// It would probably be better to return a future
fn tcp_connect(&mut self, addr: SocketAddr)
fn tcp_connect(&mut self, addr: SocketAddr, timeout: Option<u64>)
-> Result<~rtio::RtioTcpStream:Send, IoError>
{
match TcpWatcher::connect(self, addr) {
match TcpWatcher::connect(self, addr, timeout) {
Ok(t) => Ok(~t as ~rtio::RtioTcpStream:Send),
Err(e) => Err(uv_error_to_io_error(e)),
}

View File

@ -430,6 +430,8 @@ pub enum IoErrorKind {
IoUnavailable,
/// A parameter was incorrect in a way that caused an I/O error not part of this list.
InvalidInput,
/// The I/O operation's timeout expired, causing it to be canceled.
TimedOut,
}
/// A trait for objects which are byte-oriented streams. Readers are defined by

View File

@ -22,6 +22,7 @@ use io::IoResult;
use io::net::ip::SocketAddr;
use io::{Reader, Writer, Listener, Acceptor};
use kinds::Send;
use option::{None, Some};
use rt::rtio::{IoFactory, LocalIo, RtioSocket, RtioTcpListener};
use rt::rtio::{RtioTcpAcceptor, RtioTcpStream};
@ -57,7 +58,21 @@ impl TcpStream {
/// If no error is encountered, then `Ok(stream)` is returned.
pub fn connect(addr: SocketAddr) -> IoResult<TcpStream> {
LocalIo::maybe_raise(|io| {
io.tcp_connect(addr).map(TcpStream::new)
io.tcp_connect(addr, None).map(TcpStream::new)
})
}
/// Creates a TCP connection to a remote socket address, timing out after
/// the specified number of milliseconds.
///
/// This is the same as the `connect` method, except that if the timeout
/// specified (in milliseconds) elapses before a connection is made an error
/// will be returned. The error's kind will be `TimedOut`.
#[experimental = "the timeout argument may eventually change types"]
pub fn connect_timeout(addr: SocketAddr,
timeout_ms: u64) -> IoResult<TcpStream> {
LocalIo::maybe_raise(|io| {
io.tcp_connect(addr, Some(timeout_ms)).map(TcpStream::new)
})
}

View File

@ -146,7 +146,8 @@ impl<'a> LocalIo<'a> {
pub trait IoFactory {
// networking
fn tcp_connect(&mut self, addr: SocketAddr) -> IoResult<~RtioTcpStream:Send>;
fn tcp_connect(&mut self, addr: SocketAddr,
timeout: Option<u64>) -> IoResult<~RtioTcpStream:Send>;
fn tcp_bind(&mut self, addr: SocketAddr) -> IoResult<~RtioTcpListener:Send>;
fn udp_bind(&mut self, addr: SocketAddr) -> IoResult<~RtioUdpSocket:Send>;
fn unix_bind(&mut self, path: &CString)

View File

@ -0,0 +1,92 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
// ignore-pretty
// compile-flags:--test
// exec-env:RUST_TEST_TASKS=1
// Tests for the connect_timeout() function on a TcpStream. This runs with only
// one test task to ensure that errors are timeouts, not file descriptor
// exhaustion.
#![feature(macro_rules, globs)]
#![allow(experimental)]
extern crate native;
extern crate green;
extern crate rustuv;
#[cfg(test)] #[start]
fn start(argc: int, argv: **u8) -> int {
green::start(argc, argv, rustuv::event_loop, __test::main)
}
macro_rules! iotest (
{ fn $name:ident() $b:block $($a:attr)* } => (
mod $name {
#![allow(unused_imports)]
use std::io::*;
use std::io::net::tcp::*;
use std::io::test::*;
use std::io;
fn f() $b
$($a)* #[test] fn green() { f() }
$($a)* #[test] fn native() {
use native;
let (tx, rx) = channel();
native::task::spawn(proc() { tx.send(f()) });
rx.recv();
}
}
)
)
iotest!(fn eventual_timeout() {
use native;
let addr = next_test_ip4();
// Use a native task to receive connections because it turns out libuv is
// really good at accepting connections and will likely run out of file
// descriptors before timing out.
let (tx1, rx1) = channel();
let (_tx2, rx2) = channel::<()>();
native::task::spawn(proc() {
let _l = TcpListener::bind(addr).unwrap().listen();
tx1.send(());
let _ = rx2.recv_opt();
});
rx1.recv();
let mut v = Vec::new();
for _ in range(0, 10000) {
match TcpStream::connect_timeout(addr, 100) {
Ok(e) => v.push(e),
Err(ref e) if e.kind == io::TimedOut => return,
Err(e) => fail!("other error: {}", e),
}
}
fail!("never timed out!");
})
iotest!(fn timeout_success() {
let addr = next_test_ip4();
let _l = TcpListener::bind(addr).unwrap().listen();
assert!(TcpStream::connect_timeout(addr, 1000).is_ok());
})
iotest!(fn timeout_error() {
let addr = next_test_ip4();
assert!(TcpStream::connect_timeout(addr, 1000).is_err());
})