auto merge of #11894 : alexcrichton/rust/io-clone, r=brson

This is part of the overall strategy I would like to take when approaching
issue #11165. The only two I/O objects that reasonably want to be "split" are
the network stream objects. Everything else can be "split" by just creating
another version.

The initial idea I had was the literally split the object into a reader and a
writer half, but that would just introduce lots of clutter with extra interfaces
that were a little unnnecssary, or it would return a ~Reader and a ~Writer which
means you couldn't access things like the remote peer name or local socket name.

The solution I found to be nicer was to just clone the stream itself. The clone
is just a clone of the handle, nothing fancy going on at the kernel level.
Conceptually I found this very easy to wrap my head around (everything else
supports clone()), and it solved the "split" problem at the same time.

The cloning support is pretty specific per platform/lib combination:

* native/win32 - uses some specific WSA apis to clone the SOCKET handle
* native/unix - uses dup() to get another file descriptor
* green/all - This is where things get interesting. When we support full clones
              of a handle, this implies that we're allowing simultaneous writes
              and reads to happen. It turns out that libuv doesn't support two
              simultaneous reads or writes of the same object. It does support
              *one* read and *one* write at the same time, however. Some extra
              infrastructure was added to just block concurrent writers/readers
              until the previous read/write operation was completed.

I've added tests to the tcp/unix modules to make sure that this functionality is
supported everywhere.
This commit is contained in:
bors 2014-02-05 12:56:34 -08:00
commit 6aad3bf944
18 changed files with 812 additions and 82 deletions

View File

@ -10,6 +10,7 @@
//! Blocking posix-based file I/O
use std::sync::arc::UnsafeArc;
use std::c_str::CString;
use std::io::IoError;
use std::io;
@ -55,9 +56,13 @@ pub fn keep_going(data: &[u8], f: |*u8, uint| -> i64) -> i64 {
pub type fd_t = libc::c_int;
struct Inner {
fd: fd_t,
close_on_drop: bool,
}
pub struct FileDesc {
priv fd: fd_t,
priv close_on_drop: bool,
priv inner: UnsafeArc<Inner>
}
impl FileDesc {
@ -70,7 +75,10 @@ impl FileDesc {
/// Note that all I/O operations done on this object will be *blocking*, but
/// they do not require the runtime to be active.
pub fn new(fd: fd_t, close_on_drop: bool) -> FileDesc {
FileDesc { fd: fd, close_on_drop: close_on_drop }
FileDesc { inner: UnsafeArc::new(Inner {
fd: fd,
close_on_drop: close_on_drop
}) }
}
// FIXME(#10465) these functions should not be public, but anything in
@ -80,7 +88,7 @@ impl FileDesc {
#[cfg(windows)] type rlen = libc::c_uint;
#[cfg(not(windows))] type rlen = libc::size_t;
let ret = retry(|| unsafe {
libc::read(self.fd,
libc::read(self.fd(),
buf.as_ptr() as *mut libc::c_void,
buf.len() as rlen) as libc::c_int
});
@ -97,7 +105,7 @@ impl FileDesc {
#[cfg(not(windows))] type wlen = libc::size_t;
let ret = keep_going(buf, |buf, len| {
unsafe {
libc::write(self.fd, buf as *libc::c_void, len as wlen) as i64
libc::write(self.fd(), buf as *libc::c_void, len as wlen) as i64
}
});
if ret < 0 {
@ -107,7 +115,11 @@ impl FileDesc {
}
}
pub fn fd(&self) -> fd_t { self.fd }
pub fn fd(&self) -> fd_t {
// This unsafety is fine because we're just reading off the file
// descriptor, no one is modifying this.
unsafe { (*self.inner.get()).fd }
}
}
impl io::Reader for FileDesc {
@ -130,7 +142,7 @@ impl rtio::RtioFileStream for FileDesc {
self.inner_write(buf)
}
fn pread(&mut self, buf: &mut [u8], offset: u64) -> Result<int, IoError> {
return os_pread(self.fd, buf.as_ptr(), buf.len(), offset);
return os_pread(self.fd(), buf.as_ptr(), buf.len(), offset);
#[cfg(windows)]
fn os_pread(fd: c_int, buf: *u8, amt: uint, offset: u64) -> IoResult<int> {
@ -162,7 +174,7 @@ impl rtio::RtioFileStream for FileDesc {
}
}
fn pwrite(&mut self, buf: &[u8], offset: u64) -> Result<(), IoError> {
return os_pwrite(self.fd, buf.as_ptr(), buf.len(), offset);
return os_pwrite(self.fd(), buf.as_ptr(), buf.len(), offset);
#[cfg(windows)]
fn os_pwrite(fd: c_int, buf: *u8, amt: uint, offset: u64) -> IoResult<()> {
@ -197,7 +209,7 @@ impl rtio::RtioFileStream for FileDesc {
io::SeekCur => libc::FILE_CURRENT,
};
unsafe {
let handle = libc::get_osfhandle(self.fd) as libc::HANDLE;
let handle = libc::get_osfhandle(self.fd()) as libc::HANDLE;
let mut newpos = 0;
match libc::SetFilePointerEx(handle, pos, &mut newpos, whence) {
0 => Err(super::last_error()),
@ -212,7 +224,7 @@ impl rtio::RtioFileStream for FileDesc {
io::SeekEnd => libc::SEEK_END,
io::SeekCur => libc::SEEK_CUR,
};
let n = unsafe { libc::lseek(self.fd, pos as libc::off_t, whence) };
let n = unsafe { libc::lseek(self.fd(), pos as libc::off_t, whence) };
if n < 0 {
Err(super::last_error())
} else {
@ -220,7 +232,7 @@ impl rtio::RtioFileStream for FileDesc {
}
}
fn tell(&self) -> Result<u64, IoError> {
let n = unsafe { libc::lseek(self.fd, 0, libc::SEEK_CUR) };
let n = unsafe { libc::lseek(self.fd(), 0, libc::SEEK_CUR) };
if n < 0 {
Err(super::last_error())
} else {
@ -228,7 +240,7 @@ impl rtio::RtioFileStream for FileDesc {
}
}
fn fsync(&mut self) -> Result<(), IoError> {
return os_fsync(self.fd);
return os_fsync(self.fd());
#[cfg(windows)]
fn os_fsync(fd: c_int) -> IoResult<()> {
@ -247,7 +259,7 @@ impl rtio::RtioFileStream for FileDesc {
#[cfg(not(windows))]
fn datasync(&mut self) -> Result<(), IoError> {
return super::mkerr_libc(os_datasync(self.fd));
return super::mkerr_libc(os_datasync(self.fd()));
#[cfg(target_os = "macos")]
fn os_datasync(fd: c_int) -> c_int {
@ -270,7 +282,7 @@ impl rtio::RtioFileStream for FileDesc {
Ok(_) => {}, Err(e) => return Err(e),
};
let ret = unsafe {
let handle = libc::get_osfhandle(self.fd) as libc::HANDLE;
let handle = libc::get_osfhandle(self.fd()) as libc::HANDLE;
match libc::SetEndOfFile(handle) {
0 => Err(super::last_error()),
_ => Ok(())
@ -282,7 +294,7 @@ impl rtio::RtioFileStream for FileDesc {
#[cfg(unix)]
fn truncate(&mut self, offset: i64) -> Result<(), IoError> {
super::mkerr_libc(retry(|| unsafe {
libc::ftruncate(self.fd, offset as libc::off_t)
libc::ftruncate(self.fd(), offset as libc::off_t)
}))
}
}
@ -294,6 +306,9 @@ impl rtio::RtioPipe for FileDesc {
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
self.inner_write(buf)
}
fn clone(&self) -> ~rtio::RtioPipe {
~FileDesc { inner: self.inner.clone() } as ~rtio::RtioPipe
}
}
impl rtio::RtioTTY for FileDesc {
@ -312,7 +327,7 @@ impl rtio::RtioTTY for FileDesc {
fn isatty(&self) -> bool { false }
}
impl Drop for FileDesc {
impl Drop for Inner {
fn drop(&mut self) {
// closing stdio file handles makes no sense, so never do it. Also, note
// that errors are ignored when closing a file descriptor. The reason

View File

@ -14,6 +14,7 @@ use std::io;
use std::libc;
use std::mem;
use std::rt::rtio;
use std::sync::arc::UnsafeArc;
use std::unstable::intrinsics;
use super::{IoResult, retry};
@ -108,10 +109,27 @@ fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
let ret = libc::setsockopt(fd, opt, val,
payload,
mem::size_of::<T>() as libc::socklen_t);
super::mkerr_libc(ret)
if ret != 0 {
Err(last_error())
} else {
Ok(())
}
}
}
#[cfg(windows)]
fn last_error() -> io::IoError {
extern "system" {
fn WSAGetLastError() -> libc::c_int;
}
super::translate_error(unsafe { WSAGetLastError() }, true)
}
#[cfg(not(windows))]
fn last_error() -> io::IoError {
super::last_error()
}
#[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
#[cfg(unix)] unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }
@ -128,7 +146,7 @@ fn sockname(fd: sock_t,
storage as *mut libc::sockaddr,
&mut len as *mut libc::socklen_t);
if ret != 0 {
return Err(super::last_error())
return Err(last_error())
}
}
return sockaddr_to_addr(&storage, len as uint);
@ -222,7 +240,11 @@ pub fn init() {
////////////////////////////////////////////////////////////////////////////////
pub struct TcpStream {
priv fd: sock_t,
priv inner: UnsafeArc<Inner>,
}
struct Inner {
fd: sock_t,
}
impl TcpStream {
@ -231,27 +253,31 @@ impl TcpStream {
socket(addr, libc::SOCK_STREAM).and_then(|fd| {
let (addr, len) = addr_to_sockaddr(addr);
let addrp = &addr as *libc::sockaddr_storage;
let ret = TcpStream { fd: fd };
let inner = Inner { fd: fd };
let ret = TcpStream { inner: UnsafeArc::new(inner) };
match retry(|| {
libc::connect(fd, addrp as *libc::sockaddr,
len as libc::socklen_t)
}) {
-1 => Err(super::last_error()),
-1 => Err(last_error()),
_ => Ok(ret),
}
})
}
}
pub fn fd(&self) -> sock_t { self.fd }
pub fn fd(&self) -> sock_t {
// This unsafety is fine because it's just a read-only arc
unsafe { (*self.inner.get()).fd }
}
fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
setsockopt(self.fd, libc::IPPROTO_TCP, libc::TCP_NODELAY,
setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_NODELAY,
nodelay as libc::c_int)
}
fn set_keepalive(&mut self, seconds: Option<uint>) -> IoResult<()> {
let ret = setsockopt(self.fd, libc::SOL_SOCKET, libc::SO_KEEPALIVE,
let ret = setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_KEEPALIVE,
seconds.is_some() as libc::c_int);
match seconds {
Some(n) => ret.and_then(|()| self.set_tcp_keepalive(n)),
@ -261,12 +287,12 @@ impl TcpStream {
#[cfg(target_os = "macos")]
fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
setsockopt(self.fd, libc::IPPROTO_TCP, libc::TCP_KEEPALIVE,
setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPALIVE,
seconds as libc::c_int)
}
#[cfg(target_os = "freebsd")]
fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
setsockopt(self.fd, libc::IPPROTO_TCP, libc::TCP_KEEPIDLE,
setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPIDLE,
seconds as libc::c_int)
}
#[cfg(not(target_os = "macos"), not(target_os = "freebsd"))]
@ -282,7 +308,7 @@ impl rtio::RtioTcpStream for TcpStream {
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
let ret = retry(|| {
unsafe {
libc::recv(self.fd,
libc::recv(self.fd(),
buf.as_ptr() as *mut libc::c_void,
buf.len() as wrlen,
0) as libc::c_int
@ -291,7 +317,7 @@ impl rtio::RtioTcpStream for TcpStream {
if ret == 0 {
Err(io::standard_error(io::EndOfFile))
} else if ret < 0 {
Err(super::last_error())
Err(last_error())
} else {
Ok(ret as uint)
}
@ -299,20 +325,20 @@ impl rtio::RtioTcpStream for TcpStream {
fn write(&mut self, buf: &[u8]) -> IoResult<()> {
let ret = keep_going(buf, |buf, len| {
unsafe {
libc::send(self.fd,
libc::send(self.fd(),
buf as *mut libc::c_void,
len as wrlen,
0) as i64
}
});
if ret < 0 {
Err(super::last_error())
Err(last_error())
} else {
Ok(())
}
}
fn peer_name(&mut self) -> IoResult<ip::SocketAddr> {
sockname(self.fd, libc::getpeername)
sockname(self.fd(), libc::getpeername)
}
fn control_congestion(&mut self) -> IoResult<()> {
self.set_nodelay(false)
@ -326,15 +352,19 @@ impl rtio::RtioTcpStream for TcpStream {
fn letdie(&mut self) -> IoResult<()> {
self.set_keepalive(None)
}
fn clone(&self) -> ~rtio::RtioTcpStream {
~TcpStream { inner: self.inner.clone() } as ~rtio::RtioTcpStream
}
}
impl rtio::RtioSocket for TcpStream {
fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
sockname(self.fd, libc::getsockname)
sockname(self.fd(), libc::getsockname)
}
}
impl Drop for TcpStream {
impl Drop for Inner {
fn drop(&mut self) { unsafe { close(self.fd); } }
}
@ -343,7 +373,7 @@ impl Drop for TcpStream {
////////////////////////////////////////////////////////////////////////////////
pub struct TcpListener {
priv fd: sock_t,
priv inner: UnsafeArc<Inner>,
}
impl TcpListener {
@ -352,7 +382,8 @@ impl TcpListener {
socket(addr, libc::SOCK_STREAM).and_then(|fd| {
let (addr, len) = addr_to_sockaddr(addr);
let addrp = &addr as *libc::sockaddr_storage;
let ret = TcpListener { fd: fd };
let inner = Inner { fd: fd };
let ret = TcpListener { inner: UnsafeArc::new(inner) };
// On platforms with Berkeley-derived sockets, this allows
// to quickly rebind a socket, without needing to wait for
// the OS to clean up the previous one.
@ -366,18 +397,21 @@ impl TcpListener {
}
match libc::bind(fd, addrp as *libc::sockaddr,
len as libc::socklen_t) {
-1 => Err(super::last_error()),
-1 => Err(last_error()),
_ => Ok(ret),
}
})
}
}
pub fn fd(&self) -> sock_t { self.fd }
pub fn fd(&self) -> sock_t {
// This is just a read-only arc so the unsafety is fine
unsafe { (*self.inner.get()).fd }
}
pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> {
match unsafe { libc::listen(self.fd, backlog as libc::c_int) } {
-1 => Err(super::last_error()),
match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
-1 => Err(last_error()),
_ => Ok(TcpAcceptor { listener: self })
}
}
@ -391,20 +425,16 @@ impl rtio::RtioTcpListener for TcpListener {
impl rtio::RtioSocket for TcpListener {
fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
sockname(self.fd, libc::getsockname)
sockname(self.fd(), libc::getsockname)
}
}
impl Drop for TcpListener {
fn drop(&mut self) { unsafe { close(self.fd); } }
}
pub struct TcpAcceptor {
priv listener: TcpListener,
}
impl TcpAcceptor {
pub fn fd(&self) -> sock_t { self.listener.fd }
pub fn fd(&self) -> sock_t { self.listener.fd() }
pub fn native_accept(&mut self) -> IoResult<TcpStream> {
unsafe {
@ -417,8 +447,8 @@ impl TcpAcceptor {
storagep as *mut libc::sockaddr,
&mut size as *mut libc::socklen_t) as libc::c_int
}) as sock_t {
-1 => Err(super::last_error()),
fd => Ok(TcpStream { fd: fd })
-1 => Err(last_error()),
fd => Ok(TcpStream { inner: UnsafeArc::new(Inner { fd: fd })})
}
}
}
@ -444,7 +474,7 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
////////////////////////////////////////////////////////////////////////////////
pub struct UdpSocket {
priv fd: sock_t,
priv inner: UnsafeArc<Inner>,
}
impl UdpSocket {
@ -453,25 +483,29 @@ impl UdpSocket {
socket(addr, libc::SOCK_DGRAM).and_then(|fd| {
let (addr, len) = addr_to_sockaddr(addr);
let addrp = &addr as *libc::sockaddr_storage;
let ret = UdpSocket { fd: fd };
let inner = Inner { fd: fd };
let ret = UdpSocket { inner: UnsafeArc::new(inner) };
match libc::bind(fd, addrp as *libc::sockaddr,
len as libc::socklen_t) {
-1 => Err(super::last_error()),
-1 => Err(last_error()),
_ => Ok(ret),
}
})
}
}
pub fn fd(&self) -> sock_t { self.fd }
pub fn fd(&self) -> sock_t {
// unsafety is fine because it's just a read-only arc
unsafe { (*self.inner.get()).fd }
}
pub fn set_broadcast(&mut self, on: bool) -> IoResult<()> {
setsockopt(self.fd, libc::SOL_SOCKET, libc::SO_BROADCAST,
setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_BROADCAST,
on as libc::c_int)
}
pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> {
setsockopt(self.fd, libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP,
setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP,
on as libc::c_int)
}
@ -484,14 +518,14 @@ impl UdpSocket {
// interface == INADDR_ANY
imr_interface: libc::in_addr { s_addr: 0x0 },
};
setsockopt(self.fd, libc::IPPROTO_IP, opt, mreq)
setsockopt(self.fd(), libc::IPPROTO_IP, opt, mreq)
}
In6Addr(addr) => {
let mreq = libc::ip6_mreq {
ipv6mr_multiaddr: addr,
ipv6mr_interface: 0,
};
setsockopt(self.fd, libc::IPPROTO_IPV6, opt, mreq)
setsockopt(self.fd(), libc::IPPROTO_IPV6, opt, mreq)
}
}
}
@ -514,14 +548,14 @@ impl rtio::RtioUdpSocket for UdpSocket {
let mut addrlen: libc::socklen_t =
mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
let ret = retry(|| {
libc::recvfrom(self.fd,
libc::recvfrom(self.fd(),
buf.as_ptr() as *mut libc::c_void,
buf.len() as msglen_t,
0,
storagep as *mut libc::sockaddr,
&mut addrlen) as libc::c_int
});
if ret < 0 { return Err(super::last_error()) }
if ret < 0 { return Err(last_error()) }
sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| {
Ok((ret as uint, addr))
})
@ -532,7 +566,7 @@ impl rtio::RtioUdpSocket for UdpSocket {
let dstp = &dst as *libc::sockaddr_storage;
unsafe {
let ret = retry(|| {
libc::sendto(self.fd,
libc::sendto(self.fd(),
buf.as_ptr() as *libc::c_void,
buf.len() as msglen_t,
0,
@ -540,7 +574,7 @@ impl rtio::RtioUdpSocket for UdpSocket {
len as libc::socklen_t) as libc::c_int
});
match ret {
-1 => Err(super::last_error()),
-1 => Err(last_error()),
n if n as uint != buf.len() => {
Err(io::IoError {
kind: io::OtherIoError,
@ -582,11 +616,11 @@ impl rtio::RtioUdpSocket for UdpSocket {
}
fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> {
setsockopt(self.fd, libc::IPPROTO_IP, libc::IP_MULTICAST_TTL,
setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_TTL,
ttl as libc::c_int)
}
fn time_to_live(&mut self, ttl: int) -> IoResult<()> {
setsockopt(self.fd, libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int)
setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int)
}
fn hear_broadcasts(&mut self) -> IoResult<()> {
@ -595,8 +629,8 @@ impl rtio::RtioUdpSocket for UdpSocket {
fn ignore_broadcasts(&mut self) -> IoResult<()> {
self.set_broadcast(false)
}
}
impl Drop for UdpSocket {
fn drop(&mut self) { unsafe { close(self.fd) } }
fn clone(&self) -> ~rtio::RtioUdpSocket {
~UdpSocket { inner: self.inner.clone() } as ~rtio::RtioUdpSocket
}
}

109
src/librustuv/access.rs Normal file
View File

@ -0,0 +1,109 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
/// An exclusive access primitive
///
/// This primitive is used to gain exclusive access to read() and write() in uv.
/// It is assumed that all invocations of this struct happen on the same thread
/// (the uv event loop).
use std::cast;
use std::sync::arc::UnsafeArc;
use std::rt::task::{BlockedTask, Task};
use std::rt::local::Local;
use homing::HomingMissile;
pub struct Access {
priv inner: UnsafeArc<Inner>,
}
pub struct Guard<'a> {
priv access: &'a mut Access,
priv missile: Option<HomingMissile>,
}
struct Inner {
queue: ~[BlockedTask],
held: bool,
}
impl Access {
pub fn new() -> Access {
Access {
inner: UnsafeArc::new(Inner {
queue: ~[],
held: false,
})
}
}
pub fn grant<'a>(&'a mut self, missile: HomingMissile) -> Guard<'a> {
// This unsafety is actually OK because the homing missile argument
// guarantees that we're on the same event loop as all the other objects
// attempting to get access granted.
let inner: &mut Inner = unsafe { cast::transmute(self.inner.get()) };
if inner.held {
let t: ~Task = Local::take();
t.deschedule(1, |task| {
inner.queue.push(task);
Ok(())
});
assert!(inner.held);
} else {
inner.held = true;
}
Guard { access: self, missile: Some(missile) }
}
}
impl Clone for Access {
fn clone(&self) -> Access {
Access { inner: self.inner.clone() }
}
}
#[unsafe_destructor]
impl<'a> Drop for Guard<'a> {
fn drop(&mut self) {
// This guard's homing missile is still armed, so we're guaranteed to be
// on the same I/O event loop, so this unsafety should be ok.
assert!(self.missile.is_some());
let inner: &mut Inner = unsafe {
cast::transmute(self.access.inner.get())
};
match inner.queue.shift() {
// Here we have found a task that was waiting for access, and we
// current have the "access lock" we need to relinquish access to
// this sleeping task.
//
// To do so, we first drop out homing missile and we then reawaken
// the task. In reawakening the task, it will be immediately
// scheduled on this scheduler. Because we might be woken up on some
// other scheduler, we drop our homing missile before we reawaken
// the task.
Some(task) => {
drop(self.missile.take());
let _ = task.wake().map(|t| t.reawaken());
}
None => { inner.held = false; }
}
}
}
impl Drop for Inner {
fn drop(&mut self) {
assert!(!self.held);
assert_eq!(self.queue.len(), 0);
}
}

View File

@ -125,8 +125,8 @@ pub trait HomingIO {
/// After a homing operation has been completed, this will return the current
/// task back to its appropriate home (if applicable). The field is used to
/// assert that we are where we think we are.
struct HomingMissile {
io_home: uint,
pub struct HomingMissile {
priv io_home: uint,
}
impl HomingMissile {

View File

@ -68,8 +68,10 @@ pub use self::tty::TtyWatcher;
mod macros;
mod queue;
mod access;
mod homing;
mod queue;
mod rc;
/// The implementation of `rtio` for libuv
pub mod uvio;

View File

@ -19,7 +19,9 @@ use std::rt::rtio;
use std::rt::task::BlockedTask;
use std::unstable::intrinsics;
use access::Access;
use homing::{HomingIO, HomeHandle};
use rc::Refcount;
use stream::StreamWatcher;
use super::{Loop, Request, UvError, Buf, status_to_io_result,
uv_error_to_io_error, UvHandle, slice_to_uv_buf,
@ -152,6 +154,14 @@ pub struct TcpWatcher {
handle: *uvll::uv_tcp_t,
stream: StreamWatcher,
home: HomeHandle,
priv refcount: Refcount,
// libuv can't support concurrent reads and concurrent writes of the same
// stream object, so we use these access guards in order to arbitrate among
// multiple concurrent reads and writes. Note that libuv *can* read and
// write simultaneously, it just can't read and read simultaneously.
priv read_access: Access,
priv write_access: Access,
}
pub struct TcpListener {
@ -183,6 +193,9 @@ impl TcpWatcher {
home: home,
handle: handle,
stream: StreamWatcher::new(handle),
refcount: Refcount::new(),
read_access: Access::new(),
write_access: Access::new(),
}
}
@ -238,12 +251,14 @@ impl rtio::RtioSocket for TcpWatcher {
impl rtio::RtioTcpStream for TcpWatcher {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
let _m = self.fire_homing_missile();
let m = self.fire_homing_missile();
let _g = self.read_access.grant(m);
self.stream.read(buf).map_err(uv_error_to_io_error)
}
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
let m = self.fire_homing_missile();
let _g = self.write_access.grant(m);
self.stream.write(buf).map_err(uv_error_to_io_error)
}
@ -280,6 +295,17 @@ impl rtio::RtioTcpStream for TcpWatcher {
uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint)
})
}
fn clone(&self) -> ~rtio::RtioTcpStream {
~TcpWatcher {
handle: self.handle,
stream: StreamWatcher::new(self.handle),
home: self.home.clone(),
refcount: self.refcount.clone(),
write_access: self.write_access.clone(),
read_access: self.read_access.clone(),
} as ~rtio::RtioTcpStream
}
}
impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
@ -289,7 +315,9 @@ impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
impl Drop for TcpWatcher {
fn drop(&mut self) {
let _m = self.fire_homing_missile();
self.close();
if self.refcount.decrement() {
self.close();
}
}
}
@ -415,6 +443,11 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
pub struct UdpWatcher {
handle: *uvll::uv_udp_t,
home: HomeHandle,
// See above for what these fields are
priv refcount: Refcount,
priv read_access: Access,
priv write_access: Access,
}
impl UdpWatcher {
@ -423,6 +456,9 @@ impl UdpWatcher {
let udp = UdpWatcher {
handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
home: io.make_handle(),
refcount: Refcount::new(),
read_access: Access::new(),
write_access: Access::new(),
};
assert_eq!(unsafe {
uvll::uv_udp_init(io.uv_loop(), udp.handle)
@ -463,7 +499,8 @@ impl rtio::RtioUdpSocket for UdpWatcher {
buf: Option<Buf>,
result: Option<(ssize_t, Option<ip::SocketAddr>)>,
}
let _m = self.fire_homing_missile();
let m = self.fire_homing_missile();
let _g = self.read_access.grant(m);
let a = match unsafe {
uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
@ -533,7 +570,8 @@ impl rtio::RtioUdpSocket for UdpWatcher {
fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> Result<(), IoError> {
struct Ctx { task: Option<BlockedTask>, result: c_int }
let _m = self.fire_homing_missile();
let m = self.fire_homing_missile();
let _g = self.write_access.grant(m);
let mut req = Request::new(uvll::UV_UDP_SEND);
let buf = slice_to_uv_buf(buf);
@ -636,13 +674,25 @@ impl rtio::RtioUdpSocket for UdpWatcher {
0 as c_int)
})
}
fn clone(&self) -> ~rtio::RtioUdpSocket {
~UdpWatcher {
handle: self.handle,
home: self.home.clone(),
refcount: self.refcount.clone(),
write_access: self.write_access.clone(),
read_access: self.read_access.clone(),
} as ~rtio::RtioUdpSocket
}
}
impl Drop for UdpWatcher {
fn drop(&mut self) {
// Send ourselves home to close this handle (blocking while doing so).
let _m = self.fire_homing_missile();
self.close();
if self.refcount.decrement() {
self.close();
}
}
}

View File

@ -14,7 +14,9 @@ use std::libc;
use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor};
use std::rt::task::BlockedTask;
use access::Access;
use homing::{HomingIO, HomeHandle};
use rc::Refcount;
use stream::StreamWatcher;
use super::{Loop, UvError, UvHandle, Request, uv_error_to_io_error,
wait_until_woken_after, wakeup};
@ -25,6 +27,11 @@ pub struct PipeWatcher {
stream: StreamWatcher,
home: HomeHandle,
priv defused: bool,
priv refcount: Refcount,
// see comments in TcpWatcher for why these exist
priv write_access: Access,
priv read_access: Access,
}
pub struct PipeListener {
@ -61,6 +68,9 @@ impl PipeWatcher {
stream: StreamWatcher::new(handle),
home: home,
defused: false,
refcount: Refcount::new(),
read_access: Access::new(),
write_access: Access::new(),
}
}
@ -118,14 +128,27 @@ impl PipeWatcher {
impl RtioPipe for PipeWatcher {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
let _m = self.fire_homing_missile();
let m = self.fire_homing_missile();
let _g = self.read_access.grant(m);
self.stream.read(buf).map_err(uv_error_to_io_error)
}
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
let m = self.fire_homing_missile();
let _g = self.write_access.grant(m);
self.stream.write(buf).map_err(uv_error_to_io_error)
}
fn clone(&self) -> ~RtioPipe {
~PipeWatcher {
stream: StreamWatcher::new(self.stream.handle),
defused: false,
home: self.home.clone(),
refcount: self.refcount.clone(),
read_access: self.read_access.clone(),
write_access: self.write_access.clone(),
} as ~RtioPipe
}
}
impl HomingIO for PipeWatcher {
@ -138,8 +161,8 @@ impl UvHandle<uvll::uv_pipe_t> for PipeWatcher {
impl Drop for PipeWatcher {
fn drop(&mut self) {
if !self.defused {
let _m = self.fire_homing_missile();
let _m = self.fire_homing_missile();
if !self.defused && self.refcount.decrement() {
self.close();
}
}

49
src/librustuv/rc.rs Normal file
View File

@ -0,0 +1,49 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
/// Simple refcount structure for cloning handles
///
/// This is meant to be an unintrusive solution to cloning handles in rustuv.
/// The handles themselves shouldn't be sharing memory because there are bits of
/// state in the rust objects which shouldn't be shared across multiple users of
/// the same underlying uv object, hence Rc is not used and this simple counter
/// should suffice.
use std::sync::arc::UnsafeArc;
pub struct Refcount {
priv rc: UnsafeArc<uint>,
}
impl Refcount {
/// Creates a new refcount of 1
pub fn new() -> Refcount {
Refcount { rc: UnsafeArc::new(1) }
}
fn increment(&self) {
unsafe { *self.rc.get() += 1; }
}
/// Returns whether the refcount just hit 0 or not
pub fn decrement(&self) -> bool {
unsafe {
*self.rc.get() -= 1;
*self.rc.get() == 0
}
}
}
impl Clone for Refcount {
fn clone(&self) -> Refcount {
self.increment();
Refcount { rc: self.rc.clone() }
}
}

View File

@ -8,11 +8,42 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! TCP network connections
//!
//! This module contains the ability to open a TCP stream to a socket address,
//! as well as creating a socket server to accept incoming connections. The
//! destination and binding addresses can either be an IPv4 or IPv6 address.
//!
//! A TCP connection implements the `Reader` and `Writer` traits, while the TCP
//! listener (socket server) implements the `Listener` and `Acceptor` traits.
#[deny(missing_doc)];
use clone::Clone;
use io::net::ip::SocketAddr;
use io::{Reader, Writer, Listener, Acceptor, IoResult};
use io::{Reader, Writer, Listener, Acceptor};
use io::IoResult;
use rt::rtio::{IoFactory, LocalIo, RtioSocket, RtioTcpListener};
use rt::rtio::{RtioTcpAcceptor, RtioTcpStream};
/// A structure which represents a TCP stream between a local socket and a
/// remote socket.
///
/// # Example
///
/// ```rust
/// # #[allow(unused_must_use)];
/// use std::io::net::tcp::TcpStream;
/// use std::io::net::ip::{Ipv4Addr, SocketAddr};
///
/// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 34254 };
/// let mut stream = TcpStream::connect(addr);
///
/// stream.write([1]);
/// let mut buf = [0];
/// stream.read(buf);
/// drop(stream); // close the connection
/// ```
pub struct TcpStream {
priv obj: ~RtioTcpStream
}
@ -22,21 +53,40 @@ impl TcpStream {
TcpStream { obj: s }
}
/// Creates a TCP connection to a remote socket address.
///
/// If no error is encountered, then `Ok(stream)` is returned.
pub fn connect(addr: SocketAddr) -> IoResult<TcpStream> {
LocalIo::maybe_raise(|io| {
io.tcp_connect(addr).map(TcpStream::new)
})
}
/// Returns the socket address of the remote peer of this TCP connection.
pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
self.obj.peer_name()
}
/// Returns the socket address of the local half of this TCP connection.
pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
self.obj.socket_name()
}
}
impl Clone for TcpStream {
/// Creates a new handle to this TCP stream, allowing for simultaneous reads
/// and writes of this connection.
///
/// The underlying TCP stream will not be closed until all handles to the
/// stream have been deallocated. All handles will also follow the same
/// stream, but two concurrent reads will not receive the same data.
/// Instead, the first read will receive the first packet received, and the
/// second read will receive the second packet.
fn clone(&self) -> TcpStream {
TcpStream { obj: self.obj.clone() }
}
}
impl Reader for TcpStream {
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { self.obj.read(buf) }
}
@ -45,17 +95,56 @@ impl Writer for TcpStream {
fn write(&mut self, buf: &[u8]) -> IoResult<()> { self.obj.write(buf) }
}
/// A structure representing a socket server. This listener is used to create a
/// `TcpAcceptor` which can be used to accept sockets on a local port.
///
/// # Example
///
/// ```rust
/// # fn main() {}
/// # fn foo() {
/// # #[allow(unused_must_use, dead_code)];
/// use std::io::net::tcp::TcpListener;
/// use std::io::net::ip::{Ipv4Addr, SocketAddr};
/// use std::io::{Acceptor, Listener};
///
/// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 80 };
/// let listener = TcpListener::bind(addr);
///
/// // bind the listener to the specified address
/// let mut acceptor = listener.listen();
///
/// // accept connections and process them
/// # fn handle_client<T>(_: T) {}
/// for stream in acceptor.incoming() {
/// spawn(proc() {
/// handle_client(stream);
/// });
/// }
///
/// // close the socket server
/// drop(acceptor);
/// # }
/// ```
pub struct TcpListener {
priv obj: ~RtioTcpListener
}
impl TcpListener {
/// Creates a new `TcpListener` which will be bound to the specified local
/// socket address. This listener is not ready for accepting connections,
/// `listen` must be called on it before that's possible.
///
/// Binding with a port number of 0 will request that the OS assigns a port
/// to this listener. The port allocated can be queried via the
/// `socket_name` function.
pub fn bind(addr: SocketAddr) -> IoResult<TcpListener> {
LocalIo::maybe_raise(|io| {
io.tcp_bind(addr).map(|l| TcpListener { obj: l })
})
}
/// Returns the local socket address of this listener.
pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
self.obj.socket_name()
}
@ -67,6 +156,9 @@ impl Listener<TcpStream, TcpAcceptor> for TcpListener {
}
}
/// The accepting half of a TCP socket server. This structure is created through
/// a `TcpListener`'s `listen` method, and this object can be used to accept new
/// `TcpStream` instances.
pub struct TcpAcceptor {
priv obj: ~RtioTcpAcceptor
}
@ -573,4 +665,91 @@ mod test {
}
let _listener = TcpListener::bind(addr);
})
iotest!(fn tcp_clone_smoke() {
let addr = next_test_ip4();
let mut acceptor = TcpListener::bind(addr).listen();
spawn(proc() {
let mut s = TcpStream::connect(addr);
let mut buf = [0, 0];
assert_eq!(s.read(buf), Ok(1));
assert_eq!(buf[0], 1);
s.write([2]).unwrap();
});
let mut s1 = acceptor.accept().unwrap();
let s2 = s1.clone();
let (p1, c1) = Chan::new();
let (p2, c2) = Chan::new();
spawn(proc() {
let mut s2 = s2;
p1.recv();
s2.write([1]).unwrap();
c2.send(());
});
c1.send(());
let mut buf = [0, 0];
assert_eq!(s1.read(buf), Ok(1));
p2.recv();
})
iotest!(fn tcp_clone_two_read() {
let addr = next_test_ip6();
let mut acceptor = TcpListener::bind(addr).listen();
let (p, c) = SharedChan::new();
let c2 = c.clone();
spawn(proc() {
let mut s = TcpStream::connect(addr);
s.write([1]).unwrap();
p.recv();
s.write([2]).unwrap();
p.recv();
});
let mut s1 = acceptor.accept().unwrap();
let s2 = s1.clone();
let (p, done) = Chan::new();
spawn(proc() {
let mut s2 = s2;
let mut buf = [0, 0];
s2.read(buf).unwrap();
c2.send(());
done.send(());
});
let mut buf = [0, 0];
s1.read(buf).unwrap();
c.send(());
p.recv();
})
iotest!(fn tcp_clone_two_write() {
let addr = next_test_ip4();
let mut acceptor = TcpListener::bind(addr).listen();
spawn(proc() {
let mut s = TcpStream::connect(addr);
let mut buf = [0, 1];
s.read(buf).unwrap();
s.read(buf).unwrap();
});
let mut s1 = acceptor.accept().unwrap();
let s2 = s1.clone();
let (p, done) = Chan::new();
spawn(proc() {
let mut s2 = s2;
s2.write([1]).unwrap();
done.send(());
});
s1.write([2]).unwrap();
p.recv();
})
}

View File

@ -8,6 +8,7 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use clone::Clone;
use result::{Ok, Err};
use io::net::ip::SocketAddr;
use io::{Reader, Writer, IoResult};
@ -41,6 +42,19 @@ impl UdpSocket {
}
}
impl Clone for UdpSocket {
/// Creates a new handle to this UDP socket, allowing for simultaneous reads
/// and writes of the socket.
///
/// The underlying UDP socket will not be closed until all handles to the
/// socket have been deallocated. Two concurrent reads will not receive the
/// same data. Instead, the first read will receive the first packet
/// received, and the second read will receive the second packet.
fn clone(&self) -> UdpSocket {
UdpSocket { obj: self.obj.clone() }
}
}
pub struct UdpStream {
priv socket: UdpSocket,
priv connectedTo: SocketAddr
@ -250,4 +264,107 @@ mod test {
iotest!(fn socket_name_ip6() {
socket_name(next_test_ip6());
})
iotest!(fn udp_clone_smoke() {
let addr1 = next_test_ip4();
let addr2 = next_test_ip4();
let mut sock1 = UdpSocket::bind(addr1).unwrap();
let sock2 = UdpSocket::bind(addr2).unwrap();
spawn(proc() {
let mut sock2 = sock2;
let mut buf = [0, 0];
assert_eq!(sock2.recvfrom(buf), Ok((1, addr1)));
assert_eq!(buf[0], 1);
sock2.sendto([2], addr1).unwrap();
});
let sock3 = sock1.clone();
let (p1, c1) = Chan::new();
let (p2, c2) = Chan::new();
spawn(proc() {
let mut sock3 = sock3;
p1.recv();
sock3.sendto([1], addr2).unwrap();
c2.send(());
});
c1.send(());
let mut buf = [0, 0];
assert_eq!(sock1.recvfrom(buf), Ok((1, addr2)));
p2.recv();
})
iotest!(fn udp_clone_two_read() {
let addr1 = next_test_ip4();
let addr2 = next_test_ip4();
let mut sock1 = UdpSocket::bind(addr1).unwrap();
let sock2 = UdpSocket::bind(addr2).unwrap();
let (p, c) = SharedChan::new();
let c2 = c.clone();
spawn(proc() {
let mut sock2 = sock2;
sock2.sendto([1], addr1).unwrap();
p.recv();
sock2.sendto([2], addr1).unwrap();
p.recv();
});
let sock3 = sock1.clone();
let (p, done) = Chan::new();
spawn(proc() {
let mut sock3 = sock3;
let mut buf = [0, 0];
sock3.recvfrom(buf).unwrap();
c2.send(());
done.send(());
});
let mut buf = [0, 0];
sock1.recvfrom(buf).unwrap();
c.send(());
p.recv();
})
iotest!(fn udp_clone_two_write() {
let addr1 = next_test_ip4();
let addr2 = next_test_ip4();
let mut sock1 = UdpSocket::bind(addr1).unwrap();
let sock2 = UdpSocket::bind(addr2).unwrap();
let (p, c) = SharedChan::new();
spawn(proc() {
let mut sock2 = sock2;
let mut buf = [0, 1];
for _ in p.iter() {
match sock2.recvfrom(buf) {
Ok(..) => {}
Err(e) => fail!("failed receive: {}", e),
}
}
});
let sock3 = sock1.clone();
let (p, done) = Chan::new();
let c2 = c.clone();
spawn(proc() {
let mut sock3 = sock3;
match sock3.sendto([1], addr2) {
Ok(..) => c2.send(()),
Err(..) => {}
}
done.send(());
});
match sock1.sendto([2], addr2) {
Ok(..) => c.send(()),
Err(..) => {}
}
p.recv();
})
}

View File

@ -25,6 +25,7 @@ instances as clients.
use prelude::*;
use c_str::ToCStr;
use clone::Clone;
use rt::rtio::{IoFactory, LocalIo, RtioUnixListener};
use rt::rtio::{RtioUnixAcceptor, RtioPipe};
use io::pipe::PipeStream;
@ -62,6 +63,12 @@ impl UnixStream {
}
}
impl Clone for UnixStream {
fn clone(&self) -> UnixStream {
UnixStream { obj: self.obj.clone() }
}
}
impl Reader for UnixStream {
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { self.obj.read(buf) }
}
@ -228,4 +235,93 @@ mod tests {
let _acceptor = UnixListener::bind(&path).listen();
assert!(path.exists());
}
#[test]
fn unix_clone_smoke() {
let addr = next_test_unix();
let mut acceptor = UnixListener::bind(&addr).listen();
spawn(proc() {
let mut s = UnixStream::connect(&addr);
let mut buf = [0, 0];
assert_eq!(s.read(buf), Ok(1));
assert_eq!(buf[0], 1);
s.write([2]).unwrap();
});
let mut s1 = acceptor.accept().unwrap();
let s2 = s1.clone();
let (p1, c1) = Chan::new();
let (p2, c2) = Chan::new();
spawn(proc() {
let mut s2 = s2;
p1.recv();
s2.write([1]).unwrap();
c2.send(());
});
c1.send(());
let mut buf = [0, 0];
assert_eq!(s1.read(buf), Ok(1));
p2.recv();
}
#[test]
fn unix_clone_two_read() {
let addr = next_test_unix();
let mut acceptor = UnixListener::bind(&addr).listen();
let (p, c) = SharedChan::new();
let c2 = c.clone();
spawn(proc() {
let mut s = UnixStream::connect(&addr);
s.write([1]).unwrap();
p.recv();
s.write([2]).unwrap();
p.recv();
});
let mut s1 = acceptor.accept().unwrap();
let s2 = s1.clone();
let (p, done) = Chan::new();
spawn(proc() {
let mut s2 = s2;
let mut buf = [0, 0];
s2.read(buf).unwrap();
c2.send(());
done.send(());
});
let mut buf = [0, 0];
s1.read(buf).unwrap();
c.send(());
p.recv();
}
#[test]
fn unix_clone_two_write() {
let addr = next_test_unix();
let mut acceptor = UnixListener::bind(&addr).listen();
spawn(proc() {
let mut s = UnixStream::connect(&addr);
let mut buf = [0, 1];
s.read(buf).unwrap();
s.read(buf).unwrap();
});
let mut s1 = acceptor.accept().unwrap();
let s2 = s1.clone();
let (p, done) = Chan::new();
spawn(proc() {
let mut s2 = s2;
s2.write([1]).unwrap();
done.send(());
});
s1.write([2]).unwrap();
p.recv();
}
}

View File

@ -51,6 +51,12 @@ impl PipeStream {
}
}
impl Clone for PipeStream {
fn clone(&self) -> PipeStream {
PipeStream { obj: self.obj.clone() }
}
}
impl Reader for PipeStream {
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { self.obj.read(buf) }
}

View File

@ -960,6 +960,8 @@ pub mod types {
}
pub mod extra {
use ptr;
use libc::consts::os::extra::{MAX_PROTOCOL_CHAIN,
WSAPROTOCOL_LEN};
use libc::types::common::c95::c_void;
use libc::types::os::arch::c95::{c_char, c_int, c_uint, size_t};
use libc::types::os::arch::c95::{c_long, c_ulong};
@ -1106,6 +1108,47 @@ pub mod types {
}
pub type LPFILETIME = *mut FILETIME;
pub struct GUID {
Data1: DWORD,
Data2: DWORD,
Data3: DWORD,
Data4: [BYTE, ..8],
}
struct WSAPROTOCOLCHAIN {
ChainLen: c_int,
ChainEntries: [DWORD, ..MAX_PROTOCOL_CHAIN],
}
pub type LPWSAPROTOCOLCHAIN = *mut WSAPROTOCOLCHAIN;
pub struct WSAPROTOCOL_INFO {
dwServiceFlags1: DWORD,
dwServiceFlags2: DWORD,
dwServiceFlags3: DWORD,
dwServiceFlags4: DWORD,
dwProviderFlags: DWORD,
ProviderId: GUID,
dwCatalogEntryId: DWORD,
ProtocolChain: WSAPROTOCOLCHAIN,
iVersion: c_int,
iAddressFamily: c_int,
iMaxSockAddr: c_int,
iMinSockAddr: c_int,
iSocketType: c_int,
iProtocol: c_int,
iProtocolMaxOffset: c_int,
iNetworkByteOrder: c_int,
iSecurityScheme: c_int,
dwMessageSize: DWORD,
dwProviderReserved: DWORD,
szProtocol: [u8, ..WSAPROTOCOL_LEN+1],
}
pub type LPWSAPROTOCOL_INFO = *mut WSAPROTOCOL_INFO;
pub type GROUP = c_uint;
}
}
}
@ -1721,6 +1764,10 @@ pub mod consts {
pub static FILE_BEGIN: DWORD = 0;
pub static FILE_CURRENT: DWORD = 1;
pub static FILE_END: DWORD = 2;
pub static MAX_PROTOCOL_CHAIN: DWORD = 7;
pub static WSAPROTOCOL_LEN: DWORD = 255;
pub static INVALID_SOCKET: DWORD = !0;
}
pub mod sysconf {
}
@ -4098,6 +4145,8 @@ pub mod funcs {
lpFrequency: *mut LARGE_INTEGER) -> BOOL;
pub fn QueryPerformanceCounter(
lpPerformanceCount: *mut LARGE_INTEGER) -> BOOL;
pub fn GetCurrentProcessId() -> DWORD;
}
}

View File

@ -480,7 +480,6 @@ mod tests {
use iter::range;
use str::StrSlice;
use util;
use kinds::marker;
use vec::ImmutableVector;

View File

@ -203,6 +203,7 @@ pub trait RtioTcpStream : RtioSocket {
fn nodelay(&mut self) -> Result<(), IoError>;
fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError>;
fn letdie(&mut self) -> Result<(), IoError>;
fn clone(&self) -> ~RtioTcpStream;
}
pub trait RtioSocket {
@ -224,6 +225,8 @@ pub trait RtioUdpSocket : RtioSocket {
fn hear_broadcasts(&mut self) -> Result<(), IoError>;
fn ignore_broadcasts(&mut self) -> Result<(), IoError>;
fn clone(&self) -> ~RtioUdpSocket;
}
pub trait RtioTimer {
@ -253,6 +256,7 @@ pub trait RtioProcess {
pub trait RtioPipe {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>;
fn write(&mut self, buf: &[u8]) -> Result<(), IoError>;
fn clone(&self) -> ~RtioPipe;
}
pub trait RtioUnixListener {

View File

@ -380,7 +380,6 @@ mod test {
use super::{Mutex, MUTEX_INIT};
use rt::thread::Thread;
use task;
#[test]
fn somke_lock() {

View File

@ -69,7 +69,6 @@ impl Void {
mod tests {
use super::*;
use prelude::*;
use mem::size_of;
#[test]
fn identity_crisis() {

View File

@ -4253,7 +4253,7 @@ mod tests {
let h = x.mut_last();
assert_eq!(*h.unwrap(), 5);
let mut y: &mut [int] = [];
let y: &mut [int] = [];
assert!(y.mut_last().is_none());
}
}