Migrate uv net bindings away from ~fn()

This commit is contained in:
Alex Crichton 2013-11-05 00:27:41 -08:00
parent 5842b606a7
commit 584b359348
9 changed files with 671 additions and 1120 deletions

View File

@ -17,14 +17,9 @@ use std::rt::local::Local;
use std::rt::sched::Scheduler;
use net;
use super::{Loop, UvError, NativeHandle};
use uvll::UV_GETADDRINFO;
use super::{Loop, UvError, NativeHandle, Request};
use uvll;
struct GetAddrInfoRequest {
handle: *uvll::uv_getaddrinfo_t,
}
struct Addrinfo {
handle: *uvll::addrinfo,
}
@ -35,13 +30,9 @@ struct Ctx {
addrinfo: Option<Addrinfo>,
}
impl GetAddrInfoRequest {
pub fn new() -> GetAddrInfoRequest {
GetAddrInfoRequest {
handle: unsafe { uvll::malloc_req(uvll::UV_GETADDRINFO) },
}
}
pub struct GetAddrInfoRequest;
impl GetAddrInfoRequest {
pub fn run(loop_: &Loop, node: Option<&str>, service: Option<&str>,
hints: Option<ai::Hint>) -> Result<~[ai::Info], UvError> {
assert!(node.is_some() || service.is_some());
@ -85,7 +76,7 @@ impl GetAddrInfoRequest {
}
});
let hint_ptr = hint.as_ref().map_default(null(), |x| x as *uvll::addrinfo);
let req = GetAddrInfoRequest::new();
let req = Request::new(uvll::UV_GETADDRINFO);
return match unsafe {
uvll::uv_getaddrinfo(loop_.native_handle(), req.handle,
@ -94,7 +85,8 @@ impl GetAddrInfoRequest {
} {
0 => {
let mut cx = Ctx { slot: None, status: 0, addrinfo: None };
unsafe { uvll::set_data_for_req(req.handle, &cx) }
req.set_data(&cx);
req.defuse();
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
cx.slot = Some(task);
@ -112,9 +104,9 @@ impl GetAddrInfoRequest {
extern fn getaddrinfo_cb(req: *uvll::uv_getaddrinfo_t,
status: c_int,
res: *uvll::addrinfo) {
let cx: &mut Ctx = unsafe {
cast::transmute(uvll::get_data_for_req(req))
};
let req = Request::wrap(req);
if status == uvll::ECANCELED { return }
let cx: &mut Ctx = unsafe { cast::transmute(req.get_data()) };
cx.status = status;
cx.addrinfo = Some(Addrinfo { handle: res });
@ -124,12 +116,6 @@ impl GetAddrInfoRequest {
}
}
impl Drop for GetAddrInfoRequest {
fn drop(&mut self) {
unsafe { uvll::free_req(self.handle) }
}
}
impl Drop for Addrinfo {
fn drop(&mut self) {
unsafe { uvll::uv_freeaddrinfo(self.handle) }

View File

@ -54,19 +54,18 @@ use std::libc::{c_void, c_int, size_t, malloc, free};
use std::cast::transmute;
use std::ptr::null;
use std::unstable::finally::Finally;
use std::rt::io::net::ip::SocketAddr;
use std::rt::io::IoError;
//#[cfg(test)] use unstable::run_in_bare_thread;
pub use self::file::{FsRequest, FileWatcher};
pub use self::net::{StreamWatcher, TcpWatcher, UdpWatcher};
pub use self::net::{TcpWatcher, TcpListener, TcpAcceptor, UdpWatcher};
pub use self::idle::IdleWatcher;
pub use self::timer::TimerWatcher;
pub use self::async::AsyncWatcher;
pub use self::process::Process;
pub use self::pipe::PipeWatcher;
pub use self::pipe::{PipeWatcher, PipeListener, PipeAcceptor};
pub use self::signal::SignalWatcher;
pub use self::tty::TtyWatcher;
@ -97,24 +96,6 @@ pub struct Loop {
priv handle: *uvll::uv_loop_t
}
pub struct Handle(*uvll::uv_handle_t);
impl Watcher for Handle {}
impl NativeHandle<*uvll::uv_handle_t> for Handle {
fn from_native_handle(h: *uvll::uv_handle_t) -> Handle { Handle(h) }
fn native_handle(&self) -> *uvll::uv_handle_t { **self }
}
/// The trait implemented by uv 'watchers' (handles). Watchers are
/// non-owning wrappers around the uv handles and are not completely
/// safe - there may be multiple instances for a single underlying
/// handle. Watchers are generally created, then `start`ed, `stop`ed
/// and `close`ed, but due to their complex life cycle may not be
/// entirely memory safe if used in unanticipated patterns.
pub trait Watcher { }
pub trait Request { }
/// A type that wraps a native handle
pub trait NativeHandle<T> {
fn from_native_handle(T) -> Self;
@ -160,32 +141,47 @@ pub trait UvHandle<T> {
}
}
pub trait UvRequest<T> {
fn uv_request(&self) -> *T;
pub struct Request {
handle: *uvll::uv_req_t,
}
// FIXME(#8888) dummy self
fn alloc(_: Option<Self>, ty: uvll::uv_req_type) -> *T {
impl Request {
pub fn new(ty: uvll::uv_req_type) -> Request {
Request::wrap(unsafe { uvll::malloc_req(ty) })
}
pub fn wrap(handle: *uvll::uv_req_t) -> Request {
Request { handle: handle }
}
pub fn set_data<T>(&self, t: *T) {
unsafe { uvll::set_data_for_req(self.handle, t) }
}
pub fn get_data(&self) -> *c_void {
unsafe { uvll::get_data_for_req(self.handle) }
}
// This function should be used when the request handle has been given to an
// underlying uv function, and the uv function has succeeded. This means
// that uv will at some point invoke the callback, and in the meantime we
// can't deallocate the handle because libuv could be using it.
//
// This is still a problem in blocking situations due to linked failure. In
// the connection callback the handle should be re-wrapped with the `wrap`
// function to ensure its destruction.
pub fn defuse(mut self) {
self.handle = ptr::null();
}
}
impl Drop for Request {
fn drop(&mut self) {
unsafe {
let handle = uvll::malloc_req(ty);
assert!(!handle.is_null());
handle as *T
if self.handle != ptr::null() {
uvll::free_req(self.handle)
}
}
unsafe fn from_uv_request<'a>(h: &'a *T) -> &'a mut Self {
cast::transmute(uvll::get_data_for_req(*h))
}
fn install(~self) -> ~Self {
unsafe {
let myptr = cast::transmute::<&~Self, &*u8>(&self);
uvll::set_data_for_req(self.uv_request(), *myptr);
}
self
}
fn delete(&mut self) {
unsafe { uvll::free_req(self.uv_request() as *c_void) }
}
}
@ -214,110 +210,6 @@ impl NativeHandle<*uvll::uv_loop_t> for Loop {
}
}
// XXX: The uv alloc callback also has a *uv_handle_t arg
pub type AllocCallback = ~fn(uint) -> Buf;
pub type ReadCallback = ~fn(StreamWatcher, int, Buf, Option<UvError>);
pub type NullCallback = ~fn();
pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
pub type UdpReceiveCallback = ~fn(UdpWatcher, int, Buf, SocketAddr, uint, Option<UvError>);
pub type UdpSendCallback = ~fn(UdpWatcher, Option<UvError>);
/// Callbacks used by StreamWatchers, set as custom data on the foreign handle.
/// XXX: Would be better not to have all watchers allocate room for all callback types.
struct WatcherData {
read_cb: Option<ReadCallback>,
write_cb: Option<ConnectionCallback>,
connect_cb: Option<ConnectionCallback>,
close_cb: Option<NullCallback>,
alloc_cb: Option<AllocCallback>,
udp_recv_cb: Option<UdpReceiveCallback>,
udp_send_cb: Option<UdpSendCallback>,
}
pub trait WatcherInterop {
fn event_loop(&self) -> Loop;
fn install_watcher_data(&mut self);
fn get_watcher_data<'r>(&'r mut self) -> &'r mut WatcherData;
fn drop_watcher_data(&mut self);
fn close(self, cb: NullCallback);
fn close_async(self);
}
impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W {
/// Get the uv event loop from a Watcher
fn event_loop(&self) -> Loop {
unsafe {
let handle = self.native_handle();
let loop_ = uvll::get_loop_for_uv_handle(handle);
NativeHandle::from_native_handle(loop_)
}
}
fn install_watcher_data(&mut self) {
unsafe {
let data = ~WatcherData {
read_cb: None,
write_cb: None,
connect_cb: None,
close_cb: None,
alloc_cb: None,
udp_recv_cb: None,
udp_send_cb: None,
};
let data = transmute::<~WatcherData, *c_void>(data);
uvll::set_data_for_uv_handle(self.native_handle(), data);
}
}
fn get_watcher_data<'r>(&'r mut self) -> &'r mut WatcherData {
unsafe {
let data = uvll::get_data_for_uv_handle(self.native_handle());
let data = transmute::<&*c_void, &mut ~WatcherData>(&data);
return &mut **data;
}
}
fn drop_watcher_data(&mut self) {
unsafe {
let data = uvll::get_data_for_uv_handle(self.native_handle());
let _data = transmute::<*c_void, ~WatcherData>(data);
uvll::set_data_for_uv_handle(self.native_handle(), null::<()>());
}
}
fn close(mut self, cb: NullCallback) {
{
let data = self.get_watcher_data();
assert!(data.close_cb.is_none());
data.close_cb = Some(cb);
}
unsafe {
uvll::uv_close(self.native_handle() as *uvll::uv_handle_t, close_cb);
}
extern fn close_cb(handle: *uvll::uv_handle_t) {
let mut h: Handle = NativeHandle::from_native_handle(handle);
h.get_watcher_data().close_cb.take_unwrap()();
h.drop_watcher_data();
unsafe { uvll::free_handle(handle as *c_void) }
}
}
fn close_async(self) {
unsafe {
uvll::uv_close(self.native_handle() as *uvll::uv_handle_t, close_cb);
}
extern fn close_cb(handle: *uvll::uv_handle_t) {
let mut h: Handle = NativeHandle::from_native_handle(handle);
h.drop_watcher_data();
unsafe { uvll::free_handle(handle as *c_void) }
}
}
}
// XXX: Need to define the error constants like EOF so they can be
// compared to the UvError type

View File

@ -8,18 +8,32 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use std::cast;
use std::libc::{size_t, ssize_t, c_int, c_void, c_uint, c_char};
use std::vec;
use std::ptr;
use std::rt::BlockedTask;
use std::rt::io::IoError;
use std::rt::io::net::ip::{Ipv4Addr, Ipv6Addr};
use std::rt::local::Local;
use std::rt::io::net::ip::{SocketAddr, IpAddr};
use std::rt::rtio;
use std::rt::sched::{Scheduler, SchedHandle};
use std::rt::tube::Tube;
use std::str;
use std::rt::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr};
use std::vec;
use uvll;
use uvll::*;
use super::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback,
UdpSendCallback, Loop, Watcher, Request, UvError, Buf, NativeHandle,
status_to_maybe_uv_error, empty_buf};
use super::{
Loop, Request, UvError, Buf, NativeHandle,
status_to_io_result,
uv_error_to_io_error, UvHandle, slice_to_uv_buf};
use uvio::HomingIO;
use stream::StreamWatcher;
pub struct UvAddrInfo(*uvll::addrinfo);
////////////////////////////////////////////////////////////////////////////////
/// Generic functions related to dealing with sockaddr things
////////////////////////////////////////////////////////////////////////////////
pub enum UvSocketAddr {
UvIpv4SocketAddr(*sockaddr_in),
@ -113,394 +127,584 @@ fn test_ip6_conversion() {
assert_eq!(ip6, socket_addr_as_uv_socket_addr(ip6, uv_socket_addr_to_socket_addr));
}
// uv_stream_t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t
// and uv_file_t
pub struct StreamWatcher(*uvll::uv_stream_t);
impl Watcher for StreamWatcher { }
enum SocketNameKind {
TcpPeer,
Tcp,
Udp
}
impl StreamWatcher {
pub fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) {
unsafe {
match uvll::uv_read_start(self.native_handle(), alloc_cb, read_cb) {
0 => {
let data = self.get_watcher_data();
data.alloc_cb = Some(alloc);
data.read_cb = Some(cb);
}
n => {
cb(*self, 0, empty_buf(), Some(UvError(n)))
}
}
fn socket_name(sk: SocketNameKind, handle: *c_void) -> Result<SocketAddr, IoError> {
let getsockname = match sk {
TcpPeer => uvll::tcp_getpeername,
Tcp => uvll::tcp_getsockname,
Udp => uvll::udp_getsockname,
};
// Allocate a sockaddr_storage
// since we don't know if it's ipv4 or ipv6
let r_addr = unsafe { uvll::malloc_sockaddr_storage() };
let r = unsafe {
getsockname(handle, r_addr as *uvll::sockaddr_storage)
};
if r != 0 {
return Err(uv_error_to_io_error(UvError(r)));
}
extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf {
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
let alloc_cb = stream_watcher.get_watcher_data().alloc_cb.get_ref();
return (*alloc_cb)(suggested_size as uint);
}
extern fn read_cb(stream: *uvll::uv_stream_t, nread: ssize_t, buf: Buf) {
uvdebug!("buf addr: {}", buf.base);
uvdebug!("buf len: {}", buf.len);
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
let cb = stream_watcher.get_watcher_data().read_cb.get_ref();
let status = status_to_maybe_uv_error(nread as c_int);
(*cb)(stream_watcher, nread as int, buf, status);
}
}
pub fn read_stop(&mut self) {
// It would be nice to drop the alloc and read callbacks here,
// but read_stop may be called from inside one of them and we
// would end up freeing the in-use environment
let handle = self.native_handle();
unsafe { assert_eq!(uvll::uv_read_stop(handle), 0); }
}
pub fn write(&mut self, buf: Buf, cb: ConnectionCallback) {
let req = WriteRequest::new();
return unsafe {
match uvll::uv_write(req.native_handle(), self.native_handle(),
[buf], write_cb) {
0 => {
let data = self.get_watcher_data();
assert!(data.write_cb.is_none());
data.write_cb = Some(cb);
}
n => {
req.delete();
cb(*self, Some(UvError(n)))
}
let addr = unsafe {
if uvll::is_ip6_addr(r_addr as *uvll::sockaddr) {
uv_socket_addr_to_socket_addr(UvIpv6SocketAddr(r_addr as *uvll::sockaddr_in6))
} else {
uv_socket_addr_to_socket_addr(UvIpv4SocketAddr(r_addr as *uvll::sockaddr_in))
}
};
extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
let write_request: WriteRequest = NativeHandle::from_native_handle(req);
let mut stream_watcher = write_request.stream();
write_request.delete();
let cb = stream_watcher.get_watcher_data().write_cb.take_unwrap();
let status = status_to_maybe_uv_error(status);
cb(stream_watcher, status);
unsafe { uvll::free_sockaddr_storage(r_addr); }
Ok(addr)
}
////////////////////////////////////////////////////////////////////////////////
/// TCP implementation
////////////////////////////////////////////////////////////////////////////////
pub struct TcpWatcher {
handle: *uvll::uv_tcp_t,
stream: StreamWatcher,
home: SchedHandle,
}
pub struct TcpListener {
home: SchedHandle,
handle: *uvll::uv_pipe_t,
priv closing_task: Option<BlockedTask>,
priv outgoing: Tube<Result<~rtio::RtioTcpStream, IoError>>,
}
pub struct TcpAcceptor {
listener: ~TcpListener,
priv incoming: Tube<Result<~rtio::RtioTcpStream, IoError>>,
}
// TCP watchers (clients/streams)
impl TcpWatcher {
pub fn new(loop_: &Loop) -> TcpWatcher {
let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
assert_eq!(unsafe {
uvll::uv_tcp_init(loop_.native_handle(), handle)
}, 0);
TcpWatcher {
home: get_handle_to_current_scheduler!(),
handle: handle,
stream: StreamWatcher::new(handle),
}
}
pub fn listen(&mut self, cb: ConnectionCallback) -> Result<(), UvError> {
pub fn connect(loop_: &mut Loop, address: SocketAddr)
-> Result<TcpWatcher, UvError>
{
let data = self.get_watcher_data();
assert!(data.connect_cb.is_none());
data.connect_cb = Some(cb);
}
struct Ctx { status: c_int, task: Option<BlockedTask> }
return unsafe {
static BACKLOG: c_int = 128; // XXX should be configurable
match uvll::uv_listen(self.native_handle(), BACKLOG, connection_cb) {
let tcp = TcpWatcher::new(loop_);
let ret = do socket_addr_as_uv_socket_addr(address) |addr| {
let req = Request::new(uvll::UV_CONNECT);
let result = match addr {
UvIpv4SocketAddr(addr) => unsafe {
uvll::tcp_connect(req.handle, tcp.handle, addr,
connect_cb)
},
UvIpv6SocketAddr(addr) => unsafe {
uvll::tcp_connect6(req.handle, tcp.handle, addr,
connect_cb)
},
};
match result {
0 => {
req.defuse();
let mut cx = Ctx { status: 0, task: None };
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
cx.task = Some(task);
}
match cx.status {
0 => Ok(()),
n => Err(UvError(n)),
}
}
n => Err(UvError(n))
}
};
extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
uvdebug!("connection_cb");
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
let cb = stream_watcher.get_watcher_data().connect_cb.get_ref();
let status = status_to_maybe_uv_error(status);
(*cb)(stream_watcher, status);
}
}
pub fn accept(&mut self, stream: StreamWatcher) {
let self_handle = self.native_handle() as *c_void;
let stream_handle = stream.native_handle() as *c_void;
assert_eq!(0, unsafe { uvll::uv_accept(self_handle, stream_handle) } );
}
}
impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
fn from_native_handle(handle: *uvll::uv_stream_t) -> StreamWatcher {
StreamWatcher(handle)
}
fn native_handle(&self) -> *uvll::uv_stream_t {
match self { &StreamWatcher(ptr) => ptr }
}
}
pub struct TcpWatcher(*uvll::uv_tcp_t);
impl Watcher for TcpWatcher { }
impl TcpWatcher {
pub fn new(loop_: &Loop) -> TcpWatcher {
unsafe {
let handle = malloc_handle(UV_TCP);
assert!(handle.is_not_null());
assert_eq!(0, uvll::uv_tcp_init(loop_.native_handle(), handle));
let mut watcher: TcpWatcher = NativeHandle::from_native_handle(handle);
watcher.install_watcher_data();
return watcher;
}
}
pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
do socket_addr_as_uv_socket_addr(address) |addr| {
let result = unsafe {
match addr {
UvIpv4SocketAddr(addr) => uvll::tcp_bind(self.native_handle(), addr),
UvIpv6SocketAddr(addr) => uvll::tcp_bind6(self.native_handle(), addr),
}
return match ret {
Ok(()) => Ok(tcp),
Err(e) => Err(e),
};
match result {
0 => Ok(()),
_ => Err(UvError(result)),
}
}
}
pub fn connect(&mut self, address: SocketAddr, cb: ConnectionCallback) {
unsafe {
assert!(self.get_watcher_data().connect_cb.is_none());
self.get_watcher_data().connect_cb = Some(cb);
let connect_handle = ConnectRequest::new().native_handle();
uvdebug!("connect_t: {}", connect_handle);
do socket_addr_as_uv_socket_addr(address) |addr| {
let result = match addr {
UvIpv4SocketAddr(addr) => uvll::tcp_connect(connect_handle,
self.native_handle(), addr, connect_cb),
UvIpv6SocketAddr(addr) => uvll::tcp_connect6(connect_handle,
self.native_handle(), addr, connect_cb),
};
assert_eq!(0, result);
}
extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
uvdebug!("connect_t: {}", req);
let connect_request: ConnectRequest = NativeHandle::from_native_handle(req);
let mut stream_watcher = connect_request.stream();
connect_request.delete();
let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap();
let status = status_to_maybe_uv_error(status);
cb(stream_watcher, status);
let _req = Request::wrap(req);
if status == uvll::ECANCELED { return }
let cx: &mut Ctx = unsafe {
cast::transmute(uvll::get_data_for_req(req))
};
cx.status = status;
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(cx.task.take_unwrap());
}
}
}
pub fn as_stream(&self) -> StreamWatcher {
NativeHandle::from_native_handle(self.native_handle() as *uvll::uv_stream_t)
}
}
impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher {
fn from_native_handle(handle: *uvll::uv_tcp_t) -> TcpWatcher {
TcpWatcher(handle)
}
fn native_handle(&self) -> *uvll::uv_tcp_t {
match self { &TcpWatcher(ptr) => ptr }
impl HomingIO for TcpWatcher {
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
}
impl rtio::RtioSocket for TcpWatcher {
fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
let _m = self.fire_missiles();
socket_name(Tcp, self.handle)
}
}
pub struct UdpWatcher(*uvll::uv_udp_t);
impl Watcher for UdpWatcher { }
impl rtio::RtioTcpStream for TcpWatcher {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
let _m = self.fire_missiles();
self.stream.read(buf).map_err(uv_error_to_io_error)
}
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
let _m = self.fire_missiles();
self.stream.write(buf).map_err(uv_error_to_io_error)
}
fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
let _m = self.fire_missiles();
socket_name(TcpPeer, self.handle)
}
fn control_congestion(&mut self) -> Result<(), IoError> {
let _m = self.fire_missiles();
status_to_io_result(unsafe {
uvll::uv_tcp_nodelay(self.handle, 0 as c_int)
})
}
fn nodelay(&mut self) -> Result<(), IoError> {
let _m = self.fire_missiles();
status_to_io_result(unsafe {
uvll::uv_tcp_nodelay(self.handle, 1 as c_int)
})
}
fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
let _m = self.fire_missiles();
status_to_io_result(unsafe {
uvll::uv_tcp_keepalive(self.handle, 1 as c_int,
delay_in_seconds as c_uint)
})
}
fn letdie(&mut self) -> Result<(), IoError> {
let _m = self.fire_missiles();
status_to_io_result(unsafe {
uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint)
})
}
}
impl Drop for TcpWatcher {
fn drop(&mut self) {
let _m = self.fire_missiles();
self.stream.close(true);
}
}
// TCP listeners (unbound servers)
impl TcpListener {
pub fn bind(loop_: &mut Loop, address: SocketAddr)
-> Result<~TcpListener, UvError>
{
let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
assert_eq!(unsafe {
uvll::uv_tcp_init(loop_.native_handle(), handle)
}, 0);
let l = ~TcpListener {
home: get_handle_to_current_scheduler!(),
handle: handle,
closing_task: None,
outgoing: Tube::new(),
};
let res = socket_addr_as_uv_socket_addr(address, |addr| unsafe {
match addr {
UvIpv4SocketAddr(addr) => uvll::tcp_bind(l.handle, addr),
UvIpv6SocketAddr(addr) => uvll::tcp_bind6(l.handle, addr),
}
});
match res {
0 => Ok(l.install()),
n => Err(UvError(n))
}
}
}
impl HomingIO for TcpListener {
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
}
impl UvHandle<uvll::uv_tcp_t> for TcpListener {
fn uv_handle(&self) -> *uvll::uv_tcp_t { self.handle }
}
impl rtio::RtioSocket for TcpListener {
fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
let _m = self.fire_missiles();
socket_name(Tcp, self.handle)
}
}
impl rtio::RtioTcpListener for TcpListener {
fn listen(mut ~self) -> Result<~rtio::RtioTcpAcceptor, IoError> {
// create the acceptor object from ourselves
let incoming = self.outgoing.clone();
let mut acceptor = ~TcpAcceptor {
listener: self,
incoming: incoming,
};
let _m = acceptor.fire_missiles();
// XXX: the 128 backlog should be configurable
match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
0 => Ok(acceptor as ~rtio::RtioTcpAcceptor),
n => Err(uv_error_to_io_error(UvError(n))),
}
}
}
extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) {
let msg = match status {
0 => {
let loop_ = NativeHandle::from_native_handle(unsafe {
uvll::get_loop_for_uv_handle(server)
});
let client = TcpWatcher::new(&loop_);
assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
Ok(~client as ~rtio::RtioTcpStream)
}
uvll::ECANCELED => return,
n => Err(uv_error_to_io_error(UvError(n)))
};
let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
tcp.outgoing.send(msg);
}
impl Drop for TcpListener {
fn drop(&mut self) {
let (_m, sched) = self.fire_missiles_sched();
do sched.deschedule_running_task_and_then |_, task| {
self.closing_task = Some(task);
unsafe { uvll::uv_close(self.handle, listener_close_cb) }
}
}
}
extern fn listener_close_cb(handle: *uvll::uv_handle_t) {
let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&handle) };
unsafe { uvll::free_handle(handle) }
let sched: ~Scheduler = Local::take();
sched.resume_blocked_task_immediately(tcp.closing_task.take_unwrap());
}
// TCP acceptors (bound servers)
impl HomingIO for TcpAcceptor {
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
}
impl rtio::RtioSocket for TcpAcceptor {
fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
let _m = self.fire_missiles();
socket_name(Tcp, self.listener.handle)
}
}
impl rtio::RtioTcpAcceptor for TcpAcceptor {
fn accept(&mut self) -> Result<~rtio::RtioTcpStream, IoError> {
let _m = self.fire_missiles();
self.incoming.recv()
}
fn accept_simultaneously(&mut self) -> Result<(), IoError> {
let _m = self.fire_missiles();
status_to_io_result(unsafe {
uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1)
})
}
fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
let _m = self.fire_missiles();
status_to_io_result(unsafe {
uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
})
}
}
////////////////////////////////////////////////////////////////////////////////
/// UDP implementation
////////////////////////////////////////////////////////////////////////////////
pub struct UdpWatcher {
handle: *uvll::uv_udp_t,
home: SchedHandle,
}
impl UdpWatcher {
pub fn new(loop_: &Loop) -> UdpWatcher {
unsafe {
let handle = malloc_handle(UV_UDP);
assert!(handle.is_not_null());
assert_eq!(0, uvll::uv_udp_init(loop_.native_handle(), handle));
let mut watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
watcher.install_watcher_data();
return watcher;
}
}
pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
do socket_addr_as_uv_socket_addr(address) |addr| {
let result = unsafe {
match addr {
UvIpv4SocketAddr(addr) => uvll::udp_bind(self.native_handle(), addr, 0u32),
UvIpv6SocketAddr(addr) => uvll::udp_bind6(self.native_handle(), addr, 0u32),
}
};
match result {
0 => Ok(()),
_ => Err(UvError(result)),
}
}
}
pub fn recv_start(&mut self, alloc: AllocCallback, cb: UdpReceiveCallback) {
pub fn bind(loop_: &Loop, address: SocketAddr)
-> Result<UdpWatcher, UvError>
{
let data = self.get_watcher_data();
data.alloc_cb = Some(alloc);
data.udp_recv_cb = Some(cb);
let udp = UdpWatcher {
handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
home: get_handle_to_current_scheduler!(),
};
assert_eq!(unsafe {
uvll::uv_udp_init(loop_.native_handle(), udp.handle)
}, 0);
let result = socket_addr_as_uv_socket_addr(address, |addr| unsafe {
match addr {
UvIpv4SocketAddr(addr) => uvll::udp_bind(udp.handle, addr, 0u32),
UvIpv6SocketAddr(addr) => uvll::udp_bind6(udp.handle, addr, 0u32),
}
});
match result {
0 => Ok(udp),
n => Err(UvError(n)),
}
}
}
impl HomingIO for UdpWatcher {
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
}
impl rtio::RtioSocket for UdpWatcher {
fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
let _m = self.fire_missiles();
socket_name(Udp, self.handle)
}
}
impl rtio::RtioUdpSocket for UdpWatcher {
fn recvfrom(&mut self, buf: &mut [u8])
-> Result<(uint, SocketAddr), IoError>
{
struct Ctx {
task: Option<BlockedTask>,
buf: Option<Buf>,
result: Option<(ssize_t, SocketAddr)>,
}
let _m = self.fire_missiles();
return match unsafe {
uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
} {
0 => {
let mut cx = Ctx {
task: None,
buf: Some(slice_to_uv_buf(buf)),
result: None,
};
unsafe { uvll::set_data_for_uv_handle(self.handle, &cx) }
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
cx.task = Some(task);
}
match cx.result.take_unwrap() {
(n, _) if n < 0 =>
Err(uv_error_to_io_error(UvError(n as c_int))),
(n, addr) => Ok((n as uint, addr))
}
}
n => Err(uv_error_to_io_error(UvError(n)))
};
extern fn alloc_cb(handle: *uvll::uv_udp_t,
_suggested_size: size_t) -> Buf {
let cx: &mut Ctx = unsafe {
cast::transmute(uvll::get_data_for_uv_handle(handle))
};
cx.buf.take().expect("alloc_cb called more than once")
}
unsafe { uvll::uv_udp_recv_start(self.native_handle(), alloc_cb, recv_cb); }
extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, _buf: Buf,
addr: *uvll::sockaddr, _flags: c_uint) {
extern fn alloc_cb(handle: *uvll::uv_udp_t, suggested_size: size_t) -> Buf {
let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
let alloc_cb = udp_watcher.get_watcher_data().alloc_cb.get_ref();
return (*alloc_cb)(suggested_size as uint);
}
extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: Buf,
addr: *uvll::sockaddr, flags: c_uint) {
// When there's no data to read the recv callback can be a no-op.
// This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
// this we just drop back to kqueue and wait for the next callback.
if nread == 0 {
return;
if nread == 0 { return }
if nread == uvll::ECANCELED as ssize_t { return }
unsafe {
assert_eq!(uvll::uv_udp_recv_stop(handle), 0)
}
uvdebug!("buf addr: {}", buf.base);
uvdebug!("buf len: {}", buf.len);
let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
let cb = udp_watcher.get_watcher_data().udp_recv_cb.get_ref();
let status = status_to_maybe_uv_error(nread as c_int);
let addr = uv_socket_addr_to_socket_addr(sockaddr_to_UvSocketAddr(addr));
(*cb)(udp_watcher, nread as int, buf, addr, flags as uint, status);
}
}
pub fn recv_stop(&mut self) {
unsafe { uvll::uv_udp_recv_stop(self.native_handle()); }
}
pub fn send(&mut self, buf: Buf, address: SocketAddr, cb: UdpSendCallback) {
{
let data = self.get_watcher_data();
assert!(data.udp_send_cb.is_none());
data.udp_send_cb = Some(cb);
}
let req = UdpSendRequest::new();
do socket_addr_as_uv_socket_addr(address) |addr| {
let result = unsafe {
match addr {
UvIpv4SocketAddr(addr) => uvll::udp_send(req.native_handle(),
self.native_handle(), [buf], addr, send_cb),
UvIpv6SocketAddr(addr) => uvll::udp_send6(req.native_handle(),
self.native_handle(), [buf], addr, send_cb),
}
let cx: &mut Ctx = unsafe {
cast::transmute(uvll::get_data_for_uv_handle(handle))
};
assert_eq!(0, result);
let addr = sockaddr_to_UvSocketAddr(addr);
let addr = uv_socket_addr_to_socket_addr(addr);
cx.result = Some((nread, addr));
let sched: ~Scheduler = Local::take();
sched.resume_blocked_task_immediately(cx.task.take_unwrap());
}
}
fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
struct Ctx { task: Option<BlockedTask>, result: c_int }
let _m = self.fire_missiles();
let req = Request::new(uvll::UV_UDP_SEND);
let buf = slice_to_uv_buf(buf);
let result = socket_addr_as_uv_socket_addr(dst, |dst| unsafe {
match dst {
UvIpv4SocketAddr(dst) =>
uvll::udp_send(req.handle, self.handle, [buf], dst, send_cb),
UvIpv6SocketAddr(dst) =>
uvll::udp_send6(req.handle, self.handle, [buf], dst, send_cb),
}
});
return match result {
0 => {
let mut cx = Ctx { task: None, result: 0 };
req.set_data(&cx);
req.defuse();
let sched: ~Scheduler = Local::take();
do sched.deschedule_running_task_and_then |_, task| {
cx.task = Some(task);
}
match cx.result {
0 => Ok(()),
n => Err(uv_error_to_io_error(UvError(n)))
}
}
n => Err(uv_error_to_io_error(UvError(n)))
};
extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
let send_request: UdpSendRequest = NativeHandle::from_native_handle(req);
let mut udp_watcher = send_request.handle();
send_request.delete();
let cb = udp_watcher.get_watcher_data().udp_send_cb.take_unwrap();
let status = status_to_maybe_uv_error(status);
cb(udp_watcher, status);
let req = Request::wrap(req);
let cx: &mut Ctx = unsafe { cast::transmute(req.get_data()) };
cx.result = status;
let sched: ~Scheduler = Local::take();
sched.resume_blocked_task_immediately(cx.task.take_unwrap());
}
}
fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
let _m = self.fire_missiles();
status_to_io_result(unsafe {
do multi.to_str().with_c_str |m_addr| {
uvll::uv_udp_set_membership(self.handle,
m_addr, ptr::null(),
uvll::UV_JOIN_GROUP)
}
})
}
fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
let _m = self.fire_missiles();
status_to_io_result(unsafe {
do multi.to_str().with_c_str |m_addr| {
uvll::uv_udp_set_membership(self.handle,
m_addr, ptr::null(),
uvll::UV_LEAVE_GROUP)
}
})
}
fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
let _m = self.fire_missiles();
status_to_io_result(unsafe {
uvll::uv_udp_set_multicast_loop(self.handle,
1 as c_int)
})
}
fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
let _m = self.fire_missiles();
status_to_io_result(unsafe {
uvll::uv_udp_set_multicast_loop(self.handle,
0 as c_int)
})
}
fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
let _m = self.fire_missiles();
status_to_io_result(unsafe {
uvll::uv_udp_set_multicast_ttl(self.handle,
ttl as c_int)
})
}
fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
let _m = self.fire_missiles();
status_to_io_result(unsafe {
uvll::uv_udp_set_ttl(self.handle, ttl as c_int)
})
}
fn hear_broadcasts(&mut self) -> Result<(), IoError> {
let _m = self.fire_missiles();
status_to_io_result(unsafe {
uvll::uv_udp_set_broadcast(self.handle,
1 as c_int)
})
}
fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
let _m = self.fire_missiles();
status_to_io_result(unsafe {
uvll::uv_udp_set_broadcast(self.handle,
0 as c_int)
})
}
}
impl NativeHandle<*uvll::uv_udp_t> for UdpWatcher {
fn from_native_handle(handle: *uvll::uv_udp_t) -> UdpWatcher {
UdpWatcher(handle)
}
fn native_handle(&self) -> *uvll::uv_udp_t {
match self { &UdpWatcher(ptr) => ptr }
}
}
// uv_connect_t is a subclass of uv_req_t
pub struct ConnectRequest(*uvll::uv_connect_t);
impl Request for ConnectRequest { }
impl ConnectRequest {
pub fn new() -> ConnectRequest {
let connect_handle = unsafe { malloc_req(UV_CONNECT) };
assert!(connect_handle.is_not_null());
ConnectRequest(connect_handle as *uvll::uv_connect_t)
}
fn stream(&self) -> StreamWatcher {
impl Drop for UdpWatcher {
fn drop(&mut self) {
// Send ourselves home to close this handle (blocking while doing so).
let (_m, sched) = self.fire_missiles_sched();
let mut slot = None;
unsafe {
let stream_handle = uvll::get_stream_handle_from_connect_req(self.native_handle());
NativeHandle::from_native_handle(stream_handle)
uvll::set_data_for_uv_handle(self.handle, &slot);
uvll::uv_close(self.handle, close_cb);
}
do sched.deschedule_running_task_and_then |_, task| {
slot = Some(task);
}
fn delete(self) {
unsafe { free_req(self.native_handle() as *c_void) }
}
}
impl NativeHandle<*uvll::uv_connect_t> for ConnectRequest {
fn from_native_handle(handle: *uvll:: uv_connect_t) -> ConnectRequest {
ConnectRequest(handle)
}
fn native_handle(&self) -> *uvll::uv_connect_t {
match self { &ConnectRequest(ptr) => ptr }
}
}
pub struct WriteRequest(*uvll::uv_write_t);
impl Request for WriteRequest { }
impl WriteRequest {
pub fn new() -> WriteRequest {
let write_handle = unsafe { malloc_req(UV_WRITE) };
assert!(write_handle.is_not_null());
WriteRequest(write_handle as *uvll::uv_write_t)
}
pub fn stream(&self) -> StreamWatcher {
unsafe {
let stream_handle = uvll::get_stream_handle_from_write_req(self.native_handle());
NativeHandle::from_native_handle(stream_handle)
}
}
pub fn delete(self) {
unsafe { free_req(self.native_handle() as *c_void) }
}
}
impl NativeHandle<*uvll::uv_write_t> for WriteRequest {
fn from_native_handle(handle: *uvll:: uv_write_t) -> WriteRequest {
WriteRequest(handle)
}
fn native_handle(&self) -> *uvll::uv_write_t {
match self { &WriteRequest(ptr) => ptr }
}
}
pub struct UdpSendRequest(*uvll::uv_udp_send_t);
impl Request for UdpSendRequest { }
impl UdpSendRequest {
pub fn new() -> UdpSendRequest {
let send_handle = unsafe { malloc_req(UV_UDP_SEND) };
assert!(send_handle.is_not_null());
UdpSendRequest(send_handle as *uvll::uv_udp_send_t)
}
pub fn handle(&self) -> UdpWatcher {
let send_request_handle = unsafe {
uvll::get_udp_handle_from_send_req(self.native_handle())
extern fn close_cb(handle: *uvll::uv_handle_t) {
let slot: &mut Option<BlockedTask> = unsafe {
cast::transmute(uvll::get_data_for_uv_handle(handle))
};
NativeHandle::from_native_handle(send_request_handle)
let sched: ~Scheduler = Local::take();
sched.resume_blocked_task_immediately(slot.take_unwrap());
}
pub fn delete(self) {
unsafe { free_req(self.native_handle() as *c_void) }
}
}
impl NativeHandle<*uvll::uv_udp_send_t> for UdpSendRequest {
fn from_native_handle(handle: *uvll::uv_udp_send_t) -> UdpSendRequest {
UdpSendRequest(handle)
}
fn native_handle(&self) -> *uvll::uv_udp_send_t {
match self { &UdpSendRequest(ptr) => ptr }
}
}
////////////////////////////////////////////////////////////////////////////////
/// UV request support
////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod test {

View File

@ -19,7 +19,7 @@ use std::rt::sched::{Scheduler, SchedHandle};
use std::rt::tube::Tube;
use stream::StreamWatcher;
use super::{Loop, UvError, NativeHandle, uv_error_to_io_error, UvHandle};
use super::{Loop, UvError, NativeHandle, uv_error_to_io_error, UvHandle, Request};
use uvio::HomingIO;
use uvll;
@ -79,23 +79,26 @@ impl PipeWatcher {
result: Option<Result<PipeWatcher, UvError>>,
}
let mut cx = Ctx { task: None, result: None };
let req = unsafe { uvll::malloc_req(uvll::UV_CONNECT) };
unsafe { uvll::set_data_for_req(req, &cx as *Ctx) }
let sched: ~Scheduler = Local::take();
do sched.deschedule_running_task_and_then |_, task| {
cx.task = Some(task);
let req = Request::new(uvll::UV_CONNECT);
unsafe {
uvll::uv_pipe_connect(req,
uvll::set_data_for_req(req.handle, &cx as *Ctx);
uvll::uv_pipe_connect(req.handle,
PipeWatcher::alloc(loop_, false),
name.with_ref(|p| p),
connect_cb)
}
req.defuse();
let sched: ~Scheduler = Local::take();
do sched.deschedule_running_task_and_then |_, task| {
cx.task = Some(task);
}
assert!(cx.task.is_none());
return cx.result.take().expect("pipe connect needs a result");
extern fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) {
let _req = Request::wrap(req);
if status == uvll::ECANCELED { return }
unsafe {
let cx: &mut Ctx = cast::transmute(uvll::get_data_for_req(req));
let stream = uvll::get_stream_handle_from_connect_req(req);
@ -106,7 +109,6 @@ impl PipeWatcher {
Err(UvError(n))
}
});
uvll::free_req(req);
let sched: ~Scheduler = Local::take();
sched.resume_blocked_task_immediately(cx.task.take_unwrap());
@ -201,6 +203,7 @@ extern fn listen_cb(server: *uvll::uv_stream_t, status: libc::c_int) {
assert_eq!(unsafe { uvll::uv_accept(server, client) }, 0);
Ok(~PipeWatcher::new(client) as ~RtioPipe)
}
uvll::ECANCELED => return,
n => Err(uv_error_to_io_error(UvError(n)))
};

View File

@ -8,7 +8,6 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use std::cell::Cell;
use std::libc::c_int;
use std::libc;
use std::ptr;
@ -58,8 +57,7 @@ impl Process {
}
}
let ret_io = Cell::new(ret_io);
do with_argv(config.program, config.args) |argv| {
let ret = do with_argv(config.program, config.args) |argv| {
do with_env(config.env) |envp| {
let options = uvll::uv_process_options_t {
exit_cb: on_exit,
@ -89,7 +87,7 @@ impl Process {
exit_status: None,
term_signal: None,
};
Ok((process.install(), ret_io.take()))
Ok(process.install())
}
err => {
unsafe { uvll::free_handle(handle) }
@ -97,6 +95,11 @@ impl Process {
}
}
}
};
match ret {
Ok(p) => Ok((p, ret_io)),
Err(e) => Err(e),
}
}
}

View File

@ -15,7 +15,7 @@ use std::rt::BlockedTask;
use std::rt::local::Local;
use std::rt::sched::Scheduler;
use super::{UvError, Buf, slice_to_uv_buf};
use super::{UvError, Buf, slice_to_uv_buf, Request};
use uvll;
// This is a helper structure which is intended to get embedded into other
@ -29,17 +29,17 @@ pub struct StreamWatcher {
// every call to uv_write(). Ideally this would be a stack-allocated
// structure, but currently we don't have mappings for all the structures
// defined in libuv, so we're foced to malloc this.
priv last_write_req: Option<*uvll::uv_write_t>,
priv last_write_req: Option<Request>,
}
struct ReadContext {
buf: Option<Buf>,
result: Option<Result<uint, UvError>>,
result: ssize_t,
task: Option<BlockedTask>,
}
struct WriteContext {
result: Option<Result<(), UvError>>,
result: c_int,
task: Option<BlockedTask>,
}
@ -72,7 +72,7 @@ impl StreamWatcher {
0 => {
let mut rcx = ReadContext {
buf: Some(slice_to_uv_buf(buf)),
result: None,
result: 0,
task: None,
};
unsafe {
@ -82,7 +82,10 @@ impl StreamWatcher {
do scheduler.deschedule_running_task_and_then |_sched, task| {
rcx.task = Some(task);
}
rcx.result.take().expect("no result in read stream?")
match rcx.result {
n if n < 0 => Err(UvError(n as c_int)),
n => Ok(n as uint),
}
}
n => Err(UvError(n))
}
@ -91,27 +94,29 @@ impl StreamWatcher {
pub fn write(&mut self, buf: &[u8]) -> Result<(), UvError> {
// Prepare the write request, either using a cached one or allocating a
// new one
let req = match self.last_write_req {
Some(req) => req,
None => unsafe { uvll::malloc_req(uvll::UV_WRITE) },
};
self.last_write_req = Some(req);
let mut wcx = WriteContext { result: None, task: None, };
unsafe { uvll::set_data_for_req(req, &wcx as *WriteContext) }
if self.last_write_req.is_none() {
self.last_write_req = Some(Request::new(uvll::UV_WRITE));
}
let req = self.last_write_req.get_ref();
// Send off the request, but be careful to not block until we're sure
// that the write reqeust is queued. If the reqeust couldn't be queued,
// then we should return immediately with an error.
match unsafe {
uvll::uv_write(req, self.handle, [slice_to_uv_buf(buf)], write_cb)
uvll::uv_write(req.handle, self.handle, [slice_to_uv_buf(buf)],
write_cb)
} {
0 => {
let mut wcx = WriteContext { result: 0, task: None, };
req.set_data(&wcx);
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_sched, task| {
wcx.task = Some(task);
}
assert!(wcx.task.is_none());
wcx.result.take().expect("no result in write stream?")
match wcx.result {
0 => Ok(()),
n => Err(UvError(n)),
}
}
n => Err(UvError(n)),
}
@ -124,12 +129,6 @@ impl StreamWatcher {
// synchronously (the task is blocked) or asynchronously (the task is not
// block, but the handle is still deallocated).
pub fn close(&mut self, synchronous: bool) {
// clean up the cached write request if we have one
match self.last_write_req {
Some(req) => unsafe { uvll::free_req(req) },
None => {}
}
if synchronous {
let mut closing_task = None;
unsafe {
@ -186,31 +185,24 @@ extern fn read_cb(handle: *uvll::uv_stream_t, nread: ssize_t, _buf: Buf) {
// XXX: Is there a performance impact to calling
// stop here?
unsafe { assert_eq!(uvll::uv_read_stop(handle), 0); }
rcx.result = nread;
assert!(rcx.result.is_none());
rcx.result = Some(match nread {
n if n < 0 => Err(UvError(n as c_int)),
n => Ok(n as uint),
});
let task = rcx.task.take().expect("read_cb needs a task");
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task);
scheduler.resume_blocked_task_immediately(rcx.task.take_unwrap());
}
// Unlike reading, the WriteContext is stored in the uv_write_t request. Like
// reading, however, all this does is wake up the blocked task after squirreling
// away the error code as a result.
extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
if status == uvll::ECANCELED { return }
// Remember to not free the request because it is re-used between writes on
// the same stream.
unsafe {
let wcx: &mut WriteContext = cast::transmute(uvll::get_data_for_req(req));
wcx.result = Some(match status {
0 => Ok(()),
n => Err(UvError(n)),
});
let req = Request::wrap(req);
let wcx: &mut WriteContext = unsafe { cast::transmute(req.get_data()) };
wcx.result = status;
let sched: ~Scheduler = Local::take();
sched.resume_blocked_task_immediately(wcx.task.take_unwrap());
}
req.defuse();
}

View File

@ -8,7 +8,6 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use std::cell::Cell;
use std::comm::{oneshot, stream, PortOne, ChanOne, SendDeferred};
use std::libc::c_int;
use std::rt::BlockedTask;
@ -77,10 +76,9 @@ impl RtioTimer for TimerWatcher {
fn oneshot(&mut self, msecs: u64) -> PortOne<()> {
let (port, chan) = oneshot();
let chan = Cell::new(chan);
let _m = self.fire_missiles();
self.action = Some(SendOnce(chan.take()));
self.action = Some(SendOnce(chan));
self.start(msecs, 0);
return port;
@ -88,10 +86,9 @@ impl RtioTimer for TimerWatcher {
fn period(&mut self, msecs: u64) -> Port<()> {
let (port, chan) = stream();
let chan = Cell::new(chan);
let _m = self.fire_missiles();
self.action = Some(SendMany(chan.take()));
self.action = Some(SendMany(chan));
self.start(msecs, msecs);
return port;

View File

@ -11,22 +11,17 @@
use std::c_str::CString;
use std::cast::transmute;
use std::cast;
use std::cell::Cell;
use std::clone::Clone;
use std::comm::{SharedChan, GenericChan};
use std::libc;
use std::libc::{c_int, c_uint, c_void};
use std::ptr;
use std::libc::c_int;
use std::str;
use std::rt::io;
use std::rt::io::IoError;
use std::rt::io::net::ip::{SocketAddr, IpAddr};
use std::rt::io::{standard_error, OtherIoError};
use std::rt::io::net::ip::SocketAddr;
use std::rt::io::process::ProcessConfig;
use std::rt::local::Local;
use std::rt::rtio::*;
use std::rt::sched::{Scheduler, SchedHandle};
use std::rt::tube::Tube;
use std::rt::task::Task;
use std::path::Path;
use std::libc::{O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY,
@ -45,9 +40,7 @@ use ai = std::rt::io::net::addrinfo;
use super::*;
use idle::IdleWatcher;
use net::{UvIpv4SocketAddr, UvIpv6SocketAddr};
use addrinfo::GetAddrInfoRequest;
use pipe::PipeListener;
// XXX we should not be calling uvll functions in here.
@ -137,47 +130,6 @@ impl Drop for HomingMissile {
}
}
enum SocketNameKind {
TcpPeer,
Tcp,
Udp
}
fn socket_name<T, U: Watcher + NativeHandle<*T>>(sk: SocketNameKind,
handle: U) -> Result<SocketAddr, IoError> {
let getsockname = match sk {
TcpPeer => uvll::tcp_getpeername,
Tcp => uvll::tcp_getsockname,
Udp => uvll::udp_getsockname,
};
// Allocate a sockaddr_storage
// since we don't know if it's ipv4 or ipv6
let r_addr = unsafe { uvll::malloc_sockaddr_storage() };
let r = unsafe {
getsockname(handle.native_handle() as *c_void, r_addr as *uvll::sockaddr_storage)
};
if r != 0 {
let status = status_to_maybe_uv_error(r);
return Err(uv_error_to_io_error(status.unwrap()));
}
let addr = unsafe {
if uvll::is_ip6_addr(r_addr as *uvll::sockaddr) {
net::uv_socket_addr_to_socket_addr(UvIpv6SocketAddr(r_addr as *uvll::sockaddr_in6))
} else {
net::uv_socket_addr_to_socket_addr(UvIpv4SocketAddr(r_addr as *uvll::sockaddr_in))
}
};
unsafe { uvll::free_sockaddr_storage(r_addr); }
Ok(addr)
}
// Obviously an Event Loop is always home.
pub struct UvEventLoop {
priv uvio: UvIoFactory
@ -251,97 +203,26 @@ impl IoFactory for UvIoFactory {
// Connect to an address and return a new stream
// NB: This blocks the task waiting on the connection.
// It would probably be better to return a future
fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStream, IoError> {
// Create a cell in the task to hold the result. We will fill
// the cell before resuming the task.
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<~RtioTcpStream, IoError>> = &result_cell;
// Block this task and take ownership, switch to scheduler context
do task::unkillable { // FIXME(#8674)
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
let mut tcp = TcpWatcher::new(self.uv_loop());
let task_cell = Cell::new(task);
// Wait for a connection
do tcp.connect(addr) |stream, status| {
match status {
None => {
let tcp = NativeHandle::from_native_handle(stream.native_handle());
let home = get_handle_to_current_scheduler!();
let res = Ok(~UvTcpStream { watcher: tcp, home: home }
as ~RtioTcpStream);
// Store the stream in the task's stack
unsafe { (*result_cell_ptr).put_back(res); }
// Context switch
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
fn tcp_connect(&mut self, addr: SocketAddr)
-> Result<~RtioTcpStream, IoError>
{
match TcpWatcher::connect(self.uv_loop(), addr) {
Ok(t) => Ok(~t as ~RtioTcpStream),
Err(e) => Err(uv_error_to_io_error(e)),
}
Some(_) => {
let task_cell = Cell::new(task_cell.take());
do stream.close {
let res = Err(uv_error_to_io_error(status.unwrap()));
unsafe { (*result_cell_ptr).put_back(res); }
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
}
}
}
}
assert!(!result_cell.is_empty());
return result_cell.take();
}
fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListener, IoError> {
let mut watcher = TcpWatcher::new(self.uv_loop());
match watcher.bind(addr) {
Ok(_) => {
let home = get_handle_to_current_scheduler!();
Ok(~UvTcpListener::new(watcher, home) as ~RtioTcpListener)
}
Err(uverr) => {
do task::unkillable { // FIXME(#8674)
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do watcher.as_stream().close {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
Err(uv_error_to_io_error(uverr))
}
}
match TcpListener::bind(self.uv_loop(), addr) {
Ok(t) => Ok(t as ~RtioTcpListener),
Err(e) => Err(uv_error_to_io_error(e)),
}
}
fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocket, IoError> {
let mut watcher = UdpWatcher::new(self.uv_loop());
match watcher.bind(addr) {
Ok(_) => {
let home = get_handle_to_current_scheduler!();
Ok(~UvUdpSocket { watcher: watcher, home: home } as ~RtioUdpSocket)
}
Err(uverr) => {
do task::unkillable { // FIXME(#8674)
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do watcher.close {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
Err(uv_error_to_io_error(uverr))
}
}
match UdpWatcher::bind(self.uv_loop(), addr) {
Ok(u) => Ok(~u as ~RtioUdpSocket),
Err(e) => Err(uv_error_to_io_error(e)),
}
}
@ -487,416 +368,6 @@ impl IoFactory for UvIoFactory {
}
}
pub struct UvTcpListener {
priv watcher : TcpWatcher,
priv home: SchedHandle,
}
impl HomingIO for UvTcpListener {
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
}
impl UvTcpListener {
fn new(watcher: TcpWatcher, home: SchedHandle) -> UvTcpListener {
UvTcpListener { watcher: watcher, home: home }
}
}
impl Drop for UvTcpListener {
fn drop(&mut self) {
let (_m, sched) = self.fire_homing_missile_sched();
do sched.deschedule_running_task_and_then |_, task| {
let task = Cell::new(task);
do self.watcher.as_stream().close {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task.take());
}
}
}
}
impl RtioSocket for UvTcpListener {
fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
let _m = self.fire_homing_missile();
socket_name(Tcp, self.watcher)
}
}
impl RtioTcpListener for UvTcpListener {
fn listen(mut ~self) -> Result<~RtioTcpAcceptor, IoError> {
let _m = self.fire_homing_missile();
let acceptor = ~UvTcpAcceptor::new(*self);
let incoming = Cell::new(acceptor.incoming.clone());
let mut stream = acceptor.listener.watcher.as_stream();
let res = do stream.listen |mut server, status| {
do incoming.with_mut_ref |incoming| {
let inc = match status {
Some(_) => Err(standard_error(OtherIoError)),
None => {
let inc = TcpWatcher::new(&server.event_loop());
// first accept call in the callback guarenteed to succeed
server.accept(inc.as_stream());
let home = get_handle_to_current_scheduler!();
Ok(~UvTcpStream { watcher: inc, home: home }
as ~RtioTcpStream)
}
};
incoming.send(inc);
}
};
match res {
Ok(()) => Ok(acceptor as ~RtioTcpAcceptor),
Err(e) => Err(uv_error_to_io_error(e)),
}
}
}
pub struct UvTcpAcceptor {
priv listener: UvTcpListener,
priv incoming: Tube<Result<~RtioTcpStream, IoError>>,
}
impl HomingIO for UvTcpAcceptor {
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
}
impl UvTcpAcceptor {
fn new(listener: UvTcpListener) -> UvTcpAcceptor {
UvTcpAcceptor { listener: listener, incoming: Tube::new() }
}
}
impl RtioSocket for UvTcpAcceptor {
fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
let _m = self.fire_homing_missile();
socket_name(Tcp, self.listener.watcher)
}
}
fn accept_simultaneously(stream: StreamWatcher, a: int) -> Result<(), IoError> {
let r = unsafe {
uvll::uv_tcp_simultaneous_accepts(stream.native_handle(), a as c_int)
};
status_to_io_result(r)
}
impl RtioTcpAcceptor for UvTcpAcceptor {
fn accept(&mut self) -> Result<~RtioTcpStream, IoError> {
let _m = self.fire_homing_missile();
self.incoming.recv()
}
fn accept_simultaneously(&mut self) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
accept_simultaneously(self.listener.watcher.as_stream(), 1)
}
fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
accept_simultaneously(self.listener.watcher.as_stream(), 0)
}
}
fn read_stream(mut watcher: StreamWatcher,
scheduler: ~Scheduler,
buf: &mut [u8]) -> Result<uint, IoError> {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
let uv_buf = slice_to_uv_buf(buf);
do scheduler.deschedule_running_task_and_then |_sched, task| {
let task_cell = Cell::new(task);
// XXX: We shouldn't reallocate these callbacks every
// call to read
let alloc: AllocCallback = |_| uv_buf;
do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
// Stop reading so that no read callbacks are
// triggered before the user calls `read` again.
// XXX: Is there a performance impact to calling
// stop here?
watcher.read_stop();
let result = if status.is_none() {
assert!(nread >= 0);
Ok(nread as uint)
} else {
Err(uv_error_to_io_error(status.unwrap()))
};
unsafe { (*result_cell_ptr).put_back(result); }
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
assert!(!result_cell.is_empty());
result_cell.take()
}
fn write_stream(mut watcher: StreamWatcher,
scheduler: ~Scheduler,
buf: &[u8]) -> Result<(), IoError> {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
let buf_ptr: *&[u8] = &buf;
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
do watcher.write(buf) |_watcher, status| {
let result = if status.is_none() {
Ok(())
} else {
Err(uv_error_to_io_error(status.unwrap()))
};
unsafe { (*result_cell_ptr).put_back(result); }
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
assert!(!result_cell.is_empty());
result_cell.take()
}
pub struct UvTcpStream {
priv watcher: TcpWatcher,
priv home: SchedHandle,
}
impl HomingIO for UvTcpStream {
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
}
impl Drop for UvTcpStream {
fn drop(&mut self) {
let (_m, sched) = self.fire_homing_missile_sched();
do sched.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self.watcher.as_stream().close {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
}
}
impl RtioSocket for UvTcpStream {
fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
let _m = self.fire_homing_missile();
socket_name(Tcp, self.watcher)
}
}
impl RtioTcpStream for UvTcpStream {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
let (_m, scheduler) = self.fire_homing_missile_sched();
read_stream(self.watcher.as_stream(), scheduler, buf)
}
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
let (_m, scheduler) = self.fire_homing_missile_sched();
write_stream(self.watcher.as_stream(), scheduler, buf)
}
fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
let _m = self.fire_homing_missile();
socket_name(TcpPeer, self.watcher)
}
fn control_congestion(&mut self) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_tcp_nodelay(self.watcher.native_handle(), 0 as c_int)
})
}
fn nodelay(&mut self) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_tcp_nodelay(self.watcher.native_handle(), 1 as c_int)
})
}
fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_tcp_keepalive(self.watcher.native_handle(), 1 as c_int,
delay_in_seconds as c_uint)
})
}
fn letdie(&mut self) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_tcp_keepalive(self.watcher.native_handle(),
0 as c_int, 0 as c_uint)
})
}
}
pub struct UvUdpSocket {
priv watcher: UdpWatcher,
priv home: SchedHandle,
}
impl HomingIO for UvUdpSocket {
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
}
impl Drop for UvUdpSocket {
fn drop(&mut self) {
let (_m, scheduler) = self.fire_homing_missile_sched();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self.watcher.close {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
}
}
impl RtioSocket for UvUdpSocket {
fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
let _m = self.fire_homing_missile();
socket_name(Udp, self.watcher)
}
}
impl RtioUdpSocket for UvUdpSocket {
fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
let (_m, scheduler) = self.fire_homing_missile_sched();
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
let buf_ptr: *&mut [u8] = &buf;
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
do self.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
let _ = flags; // /XXX add handling for partials?
watcher.recv_stop();
let result = match status {
None => {
assert!(nread >= 0);
Ok((nread as uint, addr))
}
Some(err) => Err(uv_error_to_io_error(err)),
};
unsafe { (*result_cell_ptr).put_back(result); }
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
assert!(!result_cell.is_empty());
result_cell.take()
}
fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
let (_m, scheduler) = self.fire_homing_missile_sched();
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
let buf_ptr: *&[u8] = &buf;
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
do self.watcher.send(buf, dst) |_watcher, status| {
let result = match status {
None => Ok(()),
Some(err) => Err(uv_error_to_io_error(err)),
};
unsafe { (*result_cell_ptr).put_back(result); }
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
assert!(!result_cell.is_empty());
result_cell.take()
}
fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
do multi.to_str().with_c_str |m_addr| {
uvll::uv_udp_set_membership(self.watcher.native_handle(),
m_addr, ptr::null(),
uvll::UV_JOIN_GROUP)
}
})
}
fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
do multi.to_str().with_c_str |m_addr| {
uvll::uv_udp_set_membership(self.watcher.native_handle(),
m_addr, ptr::null(),
uvll::UV_LEAVE_GROUP)
}
})
}
fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_udp_set_multicast_loop(self.watcher.native_handle(),
1 as c_int)
})
}
fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_udp_set_multicast_loop(self.watcher.native_handle(),
0 as c_int)
})
}
fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_udp_set_multicast_ttl(self.watcher.native_handle(),
ttl as c_int)
})
}
fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_udp_set_ttl(self.watcher.native_handle(), ttl as c_int)
})
}
fn hear_broadcasts(&mut self) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_udp_set_broadcast(self.watcher.native_handle(),
1 as c_int)
})
}
fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_udp_set_broadcast(self.watcher.native_handle(),
0 as c_int)
})
}
}
// this function is full of lies
unsafe fn local_io() -> &'static mut IoFactory {
do Local::borrow |sched: &mut Scheduler| {

View File

@ -53,6 +53,7 @@ pub mod errors {
pub static ENOTCONN: c_int = -4054;
pub static EPIPE: c_int = -4048;
pub static ECONNABORTED: c_int = -4080;
pub static ECANCELED: c_int = -4082;
}
#[cfg(not(windows))]
pub mod errors {
@ -65,6 +66,7 @@ pub mod errors {
pub static ENOTCONN: c_int = -libc::ENOTCONN;
pub static EPIPE: c_int = -libc::EPIPE;
pub static ECONNABORTED: c_int = -libc::ECONNABORTED;
pub static ECANCELED : c_int = -libc::ECANCELED;
}
pub static PROCESS_SETUID: c_int = 1 << 0;
@ -127,6 +129,7 @@ pub struct uv_stdio_container_t {
}
pub type uv_handle_t = c_void;
pub type uv_req_t = c_void;
pub type uv_loop_t = c_void;
pub type uv_idle_t = c_void;
pub type uv_tcp_t = c_void;