diff --git a/src/librustuv/addrinfo.rs b/src/librustuv/addrinfo.rs index 36c4defdee9..88818cf2b4d 100644 --- a/src/librustuv/addrinfo.rs +++ b/src/librustuv/addrinfo.rs @@ -17,14 +17,9 @@ use std::rt::local::Local; use std::rt::sched::Scheduler; use net; -use super::{Loop, UvError, NativeHandle}; -use uvll::UV_GETADDRINFO; +use super::{Loop, UvError, NativeHandle, Request}; use uvll; -struct GetAddrInfoRequest { - handle: *uvll::uv_getaddrinfo_t, -} - struct Addrinfo { handle: *uvll::addrinfo, } @@ -35,13 +30,9 @@ struct Ctx { addrinfo: Option, } -impl GetAddrInfoRequest { - pub fn new() -> GetAddrInfoRequest { - GetAddrInfoRequest { - handle: unsafe { uvll::malloc_req(uvll::UV_GETADDRINFO) }, - } - } +pub struct GetAddrInfoRequest; +impl GetAddrInfoRequest { pub fn run(loop_: &Loop, node: Option<&str>, service: Option<&str>, hints: Option) -> Result<~[ai::Info], UvError> { assert!(node.is_some() || service.is_some()); @@ -85,7 +76,7 @@ impl GetAddrInfoRequest { } }); let hint_ptr = hint.as_ref().map_default(null(), |x| x as *uvll::addrinfo); - let req = GetAddrInfoRequest::new(); + let req = Request::new(uvll::UV_GETADDRINFO); return match unsafe { uvll::uv_getaddrinfo(loop_.native_handle(), req.handle, @@ -94,7 +85,8 @@ impl GetAddrInfoRequest { } { 0 => { let mut cx = Ctx { slot: None, status: 0, addrinfo: None }; - unsafe { uvll::set_data_for_req(req.handle, &cx) } + req.set_data(&cx); + req.defuse(); let scheduler: ~Scheduler = Local::take(); do scheduler.deschedule_running_task_and_then |_, task| { cx.slot = Some(task); @@ -112,9 +104,9 @@ impl GetAddrInfoRequest { extern fn getaddrinfo_cb(req: *uvll::uv_getaddrinfo_t, status: c_int, res: *uvll::addrinfo) { - let cx: &mut Ctx = unsafe { - cast::transmute(uvll::get_data_for_req(req)) - }; + let req = Request::wrap(req); + if status == uvll::ECANCELED { return } + let cx: &mut Ctx = unsafe { cast::transmute(req.get_data()) }; cx.status = status; cx.addrinfo = Some(Addrinfo { handle: res }); @@ -124,12 +116,6 @@ impl GetAddrInfoRequest { } } -impl Drop for GetAddrInfoRequest { - fn drop(&mut self) { - unsafe { uvll::free_req(self.handle) } - } -} - impl Drop for Addrinfo { fn drop(&mut self) { unsafe { uvll::uv_freeaddrinfo(self.handle) } diff --git a/src/librustuv/lib.rs b/src/librustuv/lib.rs index eb2da05506d..5e79f6e1345 100644 --- a/src/librustuv/lib.rs +++ b/src/librustuv/lib.rs @@ -54,19 +54,18 @@ use std::libc::{c_void, c_int, size_t, malloc, free}; use std::cast::transmute; use std::ptr::null; use std::unstable::finally::Finally; -use std::rt::io::net::ip::SocketAddr; use std::rt::io::IoError; //#[cfg(test)] use unstable::run_in_bare_thread; pub use self::file::{FsRequest, FileWatcher}; -pub use self::net::{StreamWatcher, TcpWatcher, UdpWatcher}; +pub use self::net::{TcpWatcher, TcpListener, TcpAcceptor, UdpWatcher}; pub use self::idle::IdleWatcher; pub use self::timer::TimerWatcher; pub use self::async::AsyncWatcher; pub use self::process::Process; -pub use self::pipe::PipeWatcher; +pub use self::pipe::{PipeWatcher, PipeListener, PipeAcceptor}; pub use self::signal::SignalWatcher; pub use self::tty::TtyWatcher; @@ -97,24 +96,6 @@ pub struct Loop { priv handle: *uvll::uv_loop_t } -pub struct Handle(*uvll::uv_handle_t); - -impl Watcher for Handle {} -impl NativeHandle<*uvll::uv_handle_t> for Handle { - fn from_native_handle(h: *uvll::uv_handle_t) -> Handle { Handle(h) } - fn native_handle(&self) -> *uvll::uv_handle_t { **self } -} - -/// The trait implemented by uv 'watchers' (handles). Watchers are -/// non-owning wrappers around the uv handles and are not completely -/// safe - there may be multiple instances for a single underlying -/// handle. Watchers are generally created, then `start`ed, `stop`ed -/// and `close`ed, but due to their complex life cycle may not be -/// entirely memory safe if used in unanticipated patterns. -pub trait Watcher { } - -pub trait Request { } - /// A type that wraps a native handle pub trait NativeHandle { fn from_native_handle(T) -> Self; @@ -160,33 +141,48 @@ pub trait UvHandle { } } -pub trait UvRequest { - fn uv_request(&self) -> *T; +pub struct Request { + handle: *uvll::uv_req_t, +} - // FIXME(#8888) dummy self - fn alloc(_: Option, ty: uvll::uv_req_type) -> *T { +impl Request { + pub fn new(ty: uvll::uv_req_type) -> Request { + Request::wrap(unsafe { uvll::malloc_req(ty) }) + } + + pub fn wrap(handle: *uvll::uv_req_t) -> Request { + Request { handle: handle } + } + + pub fn set_data(&self, t: *T) { + unsafe { uvll::set_data_for_req(self.handle, t) } + } + + pub fn get_data(&self) -> *c_void { + unsafe { uvll::get_data_for_req(self.handle) } + } + + // This function should be used when the request handle has been given to an + // underlying uv function, and the uv function has succeeded. This means + // that uv will at some point invoke the callback, and in the meantime we + // can't deallocate the handle because libuv could be using it. + // + // This is still a problem in blocking situations due to linked failure. In + // the connection callback the handle should be re-wrapped with the `wrap` + // function to ensure its destruction. + pub fn defuse(mut self) { + self.handle = ptr::null(); + } +} + +impl Drop for Request { + fn drop(&mut self) { unsafe { - let handle = uvll::malloc_req(ty); - assert!(!handle.is_null()); - handle as *T + if self.handle != ptr::null() { + uvll::free_req(self.handle) + } } } - - unsafe fn from_uv_request<'a>(h: &'a *T) -> &'a mut Self { - cast::transmute(uvll::get_data_for_req(*h)) - } - - fn install(~self) -> ~Self { - unsafe { - let myptr = cast::transmute::<&~Self, &*u8>(&self); - uvll::set_data_for_req(self.uv_request(), *myptr); - } - self - } - - fn delete(&mut self) { - unsafe { uvll::free_req(self.uv_request() as *c_void) } - } } impl Loop { @@ -214,110 +210,6 @@ impl NativeHandle<*uvll::uv_loop_t> for Loop { } } -// XXX: The uv alloc callback also has a *uv_handle_t arg -pub type AllocCallback = ~fn(uint) -> Buf; -pub type ReadCallback = ~fn(StreamWatcher, int, Buf, Option); -pub type NullCallback = ~fn(); -pub type ConnectionCallback = ~fn(StreamWatcher, Option); -pub type UdpReceiveCallback = ~fn(UdpWatcher, int, Buf, SocketAddr, uint, Option); -pub type UdpSendCallback = ~fn(UdpWatcher, Option); - - -/// Callbacks used by StreamWatchers, set as custom data on the foreign handle. -/// XXX: Would be better not to have all watchers allocate room for all callback types. -struct WatcherData { - read_cb: Option, - write_cb: Option, - connect_cb: Option, - close_cb: Option, - alloc_cb: Option, - udp_recv_cb: Option, - udp_send_cb: Option, -} - -pub trait WatcherInterop { - fn event_loop(&self) -> Loop; - fn install_watcher_data(&mut self); - fn get_watcher_data<'r>(&'r mut self) -> &'r mut WatcherData; - fn drop_watcher_data(&mut self); - fn close(self, cb: NullCallback); - fn close_async(self); -} - -impl> WatcherInterop for W { - /// Get the uv event loop from a Watcher - fn event_loop(&self) -> Loop { - unsafe { - let handle = self.native_handle(); - let loop_ = uvll::get_loop_for_uv_handle(handle); - NativeHandle::from_native_handle(loop_) - } - } - - fn install_watcher_data(&mut self) { - unsafe { - let data = ~WatcherData { - read_cb: None, - write_cb: None, - connect_cb: None, - close_cb: None, - alloc_cb: None, - udp_recv_cb: None, - udp_send_cb: None, - }; - let data = transmute::<~WatcherData, *c_void>(data); - uvll::set_data_for_uv_handle(self.native_handle(), data); - } - } - - fn get_watcher_data<'r>(&'r mut self) -> &'r mut WatcherData { - unsafe { - let data = uvll::get_data_for_uv_handle(self.native_handle()); - let data = transmute::<&*c_void, &mut ~WatcherData>(&data); - return &mut **data; - } - } - - fn drop_watcher_data(&mut self) { - unsafe { - let data = uvll::get_data_for_uv_handle(self.native_handle()); - let _data = transmute::<*c_void, ~WatcherData>(data); - uvll::set_data_for_uv_handle(self.native_handle(), null::<()>()); - } - } - - fn close(mut self, cb: NullCallback) { - { - let data = self.get_watcher_data(); - assert!(data.close_cb.is_none()); - data.close_cb = Some(cb); - } - - unsafe { - uvll::uv_close(self.native_handle() as *uvll::uv_handle_t, close_cb); - } - - extern fn close_cb(handle: *uvll::uv_handle_t) { - let mut h: Handle = NativeHandle::from_native_handle(handle); - h.get_watcher_data().close_cb.take_unwrap()(); - h.drop_watcher_data(); - unsafe { uvll::free_handle(handle as *c_void) } - } - } - - fn close_async(self) { - unsafe { - uvll::uv_close(self.native_handle() as *uvll::uv_handle_t, close_cb); - } - - extern fn close_cb(handle: *uvll::uv_handle_t) { - let mut h: Handle = NativeHandle::from_native_handle(handle); - h.drop_watcher_data(); - unsafe { uvll::free_handle(handle as *c_void) } - } - } -} - // XXX: Need to define the error constants like EOF so they can be // compared to the UvError type diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs index e9f3f2bba4c..ef64b1e5cc5 100644 --- a/src/librustuv/net.rs +++ b/src/librustuv/net.rs @@ -8,18 +8,32 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use std::cast; use std::libc::{size_t, ssize_t, c_int, c_void, c_uint, c_char}; -use std::vec; +use std::ptr; +use std::rt::BlockedTask; +use std::rt::io::IoError; +use std::rt::io::net::ip::{Ipv4Addr, Ipv6Addr}; +use std::rt::local::Local; +use std::rt::io::net::ip::{SocketAddr, IpAddr}; +use std::rt::rtio; +use std::rt::sched::{Scheduler, SchedHandle}; +use std::rt::tube::Tube; use std::str; -use std::rt::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr}; +use std::vec; use uvll; use uvll::*; -use super::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback, - UdpSendCallback, Loop, Watcher, Request, UvError, Buf, NativeHandle, - status_to_maybe_uv_error, empty_buf}; +use super::{ + Loop, Request, UvError, Buf, NativeHandle, + status_to_io_result, + uv_error_to_io_error, UvHandle, slice_to_uv_buf}; +use uvio::HomingIO; +use stream::StreamWatcher; -pub struct UvAddrInfo(*uvll::addrinfo); +//////////////////////////////////////////////////////////////////////////////// +/// Generic functions related to dealing with sockaddr things +//////////////////////////////////////////////////////////////////////////////// pub enum UvSocketAddr { UvIpv4SocketAddr(*sockaddr_in), @@ -113,394 +127,584 @@ fn test_ip6_conversion() { assert_eq!(ip6, socket_addr_as_uv_socket_addr(ip6, uv_socket_addr_to_socket_addr)); } -// uv_stream_t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t -// and uv_file_t -pub struct StreamWatcher(*uvll::uv_stream_t); -impl Watcher for StreamWatcher { } +enum SocketNameKind { + TcpPeer, + Tcp, + Udp +} -impl StreamWatcher { - pub fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) { - unsafe { - match uvll::uv_read_start(self.native_handle(), alloc_cb, read_cb) { +fn socket_name(sk: SocketNameKind, handle: *c_void) -> Result { + let getsockname = match sk { + TcpPeer => uvll::tcp_getpeername, + Tcp => uvll::tcp_getsockname, + Udp => uvll::udp_getsockname, + }; + + // Allocate a sockaddr_storage + // since we don't know if it's ipv4 or ipv6 + let r_addr = unsafe { uvll::malloc_sockaddr_storage() }; + + let r = unsafe { + getsockname(handle, r_addr as *uvll::sockaddr_storage) + }; + + if r != 0 { + return Err(uv_error_to_io_error(UvError(r))); + } + + let addr = unsafe { + if uvll::is_ip6_addr(r_addr as *uvll::sockaddr) { + uv_socket_addr_to_socket_addr(UvIpv6SocketAddr(r_addr as *uvll::sockaddr_in6)) + } else { + uv_socket_addr_to_socket_addr(UvIpv4SocketAddr(r_addr as *uvll::sockaddr_in)) + } + }; + + unsafe { uvll::free_sockaddr_storage(r_addr); } + + Ok(addr) + +} + +//////////////////////////////////////////////////////////////////////////////// +/// TCP implementation +//////////////////////////////////////////////////////////////////////////////// + +pub struct TcpWatcher { + handle: *uvll::uv_tcp_t, + stream: StreamWatcher, + home: SchedHandle, +} + +pub struct TcpListener { + home: SchedHandle, + handle: *uvll::uv_pipe_t, + priv closing_task: Option, + priv outgoing: Tube>, +} + +pub struct TcpAcceptor { + listener: ~TcpListener, + priv incoming: Tube>, +} + +// TCP watchers (clients/streams) + +impl TcpWatcher { + pub fn new(loop_: &Loop) -> TcpWatcher { + let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) }; + assert_eq!(unsafe { + uvll::uv_tcp_init(loop_.native_handle(), handle) + }, 0); + TcpWatcher { + home: get_handle_to_current_scheduler!(), + handle: handle, + stream: StreamWatcher::new(handle), + } + } + + pub fn connect(loop_: &mut Loop, address: SocketAddr) + -> Result + { + struct Ctx { status: c_int, task: Option } + + let tcp = TcpWatcher::new(loop_); + let ret = do socket_addr_as_uv_socket_addr(address) |addr| { + let req = Request::new(uvll::UV_CONNECT); + let result = match addr { + UvIpv4SocketAddr(addr) => unsafe { + uvll::tcp_connect(req.handle, tcp.handle, addr, + connect_cb) + }, + UvIpv6SocketAddr(addr) => unsafe { + uvll::tcp_connect6(req.handle, tcp.handle, addr, + connect_cb) + }, + }; + match result { 0 => { - let data = self.get_watcher_data(); - data.alloc_cb = Some(alloc); - data.read_cb = Some(cb); + req.defuse(); + let mut cx = Ctx { status: 0, task: None }; + let scheduler: ~Scheduler = Local::take(); + do scheduler.deschedule_running_task_and_then |_, task| { + cx.task = Some(task); + } + match cx.status { + 0 => Ok(()), + n => Err(UvError(n)), + } } - n => { - cb(*self, 0, empty_buf(), Some(UvError(n))) - } - } - } - - extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf { - let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream); - let alloc_cb = stream_watcher.get_watcher_data().alloc_cb.get_ref(); - return (*alloc_cb)(suggested_size as uint); - } - - extern fn read_cb(stream: *uvll::uv_stream_t, nread: ssize_t, buf: Buf) { - uvdebug!("buf addr: {}", buf.base); - uvdebug!("buf len: {}", buf.len); - let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream); - let cb = stream_watcher.get_watcher_data().read_cb.get_ref(); - let status = status_to_maybe_uv_error(nread as c_int); - (*cb)(stream_watcher, nread as int, buf, status); - } - } - - pub fn read_stop(&mut self) { - // It would be nice to drop the alloc and read callbacks here, - // but read_stop may be called from inside one of them and we - // would end up freeing the in-use environment - let handle = self.native_handle(); - unsafe { assert_eq!(uvll::uv_read_stop(handle), 0); } - } - - pub fn write(&mut self, buf: Buf, cb: ConnectionCallback) { - let req = WriteRequest::new(); - return unsafe { - match uvll::uv_write(req.native_handle(), self.native_handle(), - [buf], write_cb) { - 0 => { - let data = self.get_watcher_data(); - assert!(data.write_cb.is_none()); - data.write_cb = Some(cb); - } - n => { - req.delete(); - cb(*self, Some(UvError(n))) - } - } - }; - - extern fn write_cb(req: *uvll::uv_write_t, status: c_int) { - let write_request: WriteRequest = NativeHandle::from_native_handle(req); - let mut stream_watcher = write_request.stream(); - write_request.delete(); - let cb = stream_watcher.get_watcher_data().write_cb.take_unwrap(); - let status = status_to_maybe_uv_error(status); - cb(stream_watcher, status); - } - } - - - pub fn listen(&mut self, cb: ConnectionCallback) -> Result<(), UvError> { - { - let data = self.get_watcher_data(); - assert!(data.connect_cb.is_none()); - data.connect_cb = Some(cb); - } - - return unsafe { - static BACKLOG: c_int = 128; // XXX should be configurable - match uvll::uv_listen(self.native_handle(), BACKLOG, connection_cb) { - 0 => Ok(()), n => Err(UvError(n)) } }; - extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) { - uvdebug!("connection_cb"); - let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle); - let cb = stream_watcher.get_watcher_data().connect_cb.get_ref(); - let status = status_to_maybe_uv_error(status); - (*cb)(stream_watcher, status); - } - } + return match ret { + Ok(()) => Ok(tcp), + Err(e) => Err(e), + }; - pub fn accept(&mut self, stream: StreamWatcher) { - let self_handle = self.native_handle() as *c_void; - let stream_handle = stream.native_handle() as *c_void; - assert_eq!(0, unsafe { uvll::uv_accept(self_handle, stream_handle) } ); - } -} - -impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher { - fn from_native_handle(handle: *uvll::uv_stream_t) -> StreamWatcher { - StreamWatcher(handle) - } - fn native_handle(&self) -> *uvll::uv_stream_t { - match self { &StreamWatcher(ptr) => ptr } - } -} - -pub struct TcpWatcher(*uvll::uv_tcp_t); -impl Watcher for TcpWatcher { } - -impl TcpWatcher { - pub fn new(loop_: &Loop) -> TcpWatcher { - unsafe { - let handle = malloc_handle(UV_TCP); - assert!(handle.is_not_null()); - assert_eq!(0, uvll::uv_tcp_init(loop_.native_handle(), handle)); - let mut watcher: TcpWatcher = NativeHandle::from_native_handle(handle); - watcher.install_watcher_data(); - return watcher; - } - } - - pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> { - do socket_addr_as_uv_socket_addr(address) |addr| { - let result = unsafe { - match addr { - UvIpv4SocketAddr(addr) => uvll::tcp_bind(self.native_handle(), addr), - UvIpv6SocketAddr(addr) => uvll::tcp_bind6(self.native_handle(), addr), - } + extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) { + let _req = Request::wrap(req); + if status == uvll::ECANCELED { return } + let cx: &mut Ctx = unsafe { + cast::transmute(uvll::get_data_for_req(req)) }; - match result { - 0 => Ok(()), - _ => Err(UvError(result)), - } + cx.status = status; + let scheduler: ~Scheduler = Local::take(); + scheduler.resume_blocked_task_immediately(cx.task.take_unwrap()); } } - - pub fn connect(&mut self, address: SocketAddr, cb: ConnectionCallback) { - unsafe { - assert!(self.get_watcher_data().connect_cb.is_none()); - self.get_watcher_data().connect_cb = Some(cb); - - let connect_handle = ConnectRequest::new().native_handle(); - uvdebug!("connect_t: {}", connect_handle); - do socket_addr_as_uv_socket_addr(address) |addr| { - let result = match addr { - UvIpv4SocketAddr(addr) => uvll::tcp_connect(connect_handle, - self.native_handle(), addr, connect_cb), - UvIpv6SocketAddr(addr) => uvll::tcp_connect6(connect_handle, - self.native_handle(), addr, connect_cb), - }; - assert_eq!(0, result); - } - - extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) { - uvdebug!("connect_t: {}", req); - let connect_request: ConnectRequest = NativeHandle::from_native_handle(req); - let mut stream_watcher = connect_request.stream(); - connect_request.delete(); - let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap(); - let status = status_to_maybe_uv_error(status); - cb(stream_watcher, status); - } - } - } - - pub fn as_stream(&self) -> StreamWatcher { - NativeHandle::from_native_handle(self.native_handle() as *uvll::uv_stream_t) - } } -impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher { - fn from_native_handle(handle: *uvll::uv_tcp_t) -> TcpWatcher { - TcpWatcher(handle) - } - fn native_handle(&self) -> *uvll::uv_tcp_t { - match self { &TcpWatcher(ptr) => ptr } +impl HomingIO for TcpWatcher { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } +} + +impl rtio::RtioSocket for TcpWatcher { + fn socket_name(&mut self) -> Result { + let _m = self.fire_missiles(); + socket_name(Tcp, self.handle) } } -pub struct UdpWatcher(*uvll::uv_udp_t); -impl Watcher for UdpWatcher { } +impl rtio::RtioTcpStream for TcpWatcher { + fn read(&mut self, buf: &mut [u8]) -> Result { + let _m = self.fire_missiles(); + self.stream.read(buf).map_err(uv_error_to_io_error) + } + + fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { + let _m = self.fire_missiles(); + self.stream.write(buf).map_err(uv_error_to_io_error) + } + + fn peer_name(&mut self) -> Result { + let _m = self.fire_missiles(); + socket_name(TcpPeer, self.handle) + } + + fn control_congestion(&mut self) -> Result<(), IoError> { + let _m = self.fire_missiles(); + status_to_io_result(unsafe { + uvll::uv_tcp_nodelay(self.handle, 0 as c_int) + }) + } + + fn nodelay(&mut self) -> Result<(), IoError> { + let _m = self.fire_missiles(); + status_to_io_result(unsafe { + uvll::uv_tcp_nodelay(self.handle, 1 as c_int) + }) + } + + fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> { + let _m = self.fire_missiles(); + status_to_io_result(unsafe { + uvll::uv_tcp_keepalive(self.handle, 1 as c_int, + delay_in_seconds as c_uint) + }) + } + + fn letdie(&mut self) -> Result<(), IoError> { + let _m = self.fire_missiles(); + status_to_io_result(unsafe { + uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint) + }) + } +} + +impl Drop for TcpWatcher { + fn drop(&mut self) { + let _m = self.fire_missiles(); + self.stream.close(true); + } +} + +// TCP listeners (unbound servers) + +impl TcpListener { + pub fn bind(loop_: &mut Loop, address: SocketAddr) + -> Result<~TcpListener, UvError> + { + let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) }; + assert_eq!(unsafe { + uvll::uv_tcp_init(loop_.native_handle(), handle) + }, 0); + let l = ~TcpListener { + home: get_handle_to_current_scheduler!(), + handle: handle, + closing_task: None, + outgoing: Tube::new(), + }; + let res = socket_addr_as_uv_socket_addr(address, |addr| unsafe { + match addr { + UvIpv4SocketAddr(addr) => uvll::tcp_bind(l.handle, addr), + UvIpv6SocketAddr(addr) => uvll::tcp_bind6(l.handle, addr), + } + }); + match res { + 0 => Ok(l.install()), + n => Err(UvError(n)) + } + } +} + +impl HomingIO for TcpListener { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } +} + +impl UvHandle for TcpListener { + fn uv_handle(&self) -> *uvll::uv_tcp_t { self.handle } +} + +impl rtio::RtioSocket for TcpListener { + fn socket_name(&mut self) -> Result { + let _m = self.fire_missiles(); + socket_name(Tcp, self.handle) + } +} + +impl rtio::RtioTcpListener for TcpListener { + fn listen(mut ~self) -> Result<~rtio::RtioTcpAcceptor, IoError> { + // create the acceptor object from ourselves + let incoming = self.outgoing.clone(); + let mut acceptor = ~TcpAcceptor { + listener: self, + incoming: incoming, + }; + + let _m = acceptor.fire_missiles(); + // XXX: the 128 backlog should be configurable + match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } { + 0 => Ok(acceptor as ~rtio::RtioTcpAcceptor), + n => Err(uv_error_to_io_error(UvError(n))), + } + } +} + +extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) { + let msg = match status { + 0 => { + let loop_ = NativeHandle::from_native_handle(unsafe { + uvll::get_loop_for_uv_handle(server) + }); + let client = TcpWatcher::new(&loop_); + assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0); + Ok(~client as ~rtio::RtioTcpStream) + } + uvll::ECANCELED => return, + n => Err(uv_error_to_io_error(UvError(n))) + }; + + let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) }; + tcp.outgoing.send(msg); +} + +impl Drop for TcpListener { + fn drop(&mut self) { + let (_m, sched) = self.fire_missiles_sched(); + + do sched.deschedule_running_task_and_then |_, task| { + self.closing_task = Some(task); + unsafe { uvll::uv_close(self.handle, listener_close_cb) } + } + } +} + +extern fn listener_close_cb(handle: *uvll::uv_handle_t) { + let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&handle) }; + unsafe { uvll::free_handle(handle) } + + let sched: ~Scheduler = Local::take(); + sched.resume_blocked_task_immediately(tcp.closing_task.take_unwrap()); +} + +// TCP acceptors (bound servers) + +impl HomingIO for TcpAcceptor { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() } +} + +impl rtio::RtioSocket for TcpAcceptor { + fn socket_name(&mut self) -> Result { + let _m = self.fire_missiles(); + socket_name(Tcp, self.listener.handle) + } +} + +impl rtio::RtioTcpAcceptor for TcpAcceptor { + fn accept(&mut self) -> Result<~rtio::RtioTcpStream, IoError> { + let _m = self.fire_missiles(); + self.incoming.recv() + } + + fn accept_simultaneously(&mut self) -> Result<(), IoError> { + let _m = self.fire_missiles(); + status_to_io_result(unsafe { + uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1) + }) + } + + fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> { + let _m = self.fire_missiles(); + status_to_io_result(unsafe { + uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0) + }) + } +} + +//////////////////////////////////////////////////////////////////////////////// +/// UDP implementation +//////////////////////////////////////////////////////////////////////////////// + +pub struct UdpWatcher { + handle: *uvll::uv_udp_t, + home: SchedHandle, +} impl UdpWatcher { - pub fn new(loop_: &Loop) -> UdpWatcher { - unsafe { - let handle = malloc_handle(UV_UDP); - assert!(handle.is_not_null()); - assert_eq!(0, uvll::uv_udp_init(loop_.native_handle(), handle)); - let mut watcher: UdpWatcher = NativeHandle::from_native_handle(handle); - watcher.install_watcher_data(); - return watcher; - } - } - - pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> { - do socket_addr_as_uv_socket_addr(address) |addr| { - let result = unsafe { - match addr { - UvIpv4SocketAddr(addr) => uvll::udp_bind(self.native_handle(), addr, 0u32), - UvIpv6SocketAddr(addr) => uvll::udp_bind6(self.native_handle(), addr, 0u32), - } - }; - match result { - 0 => Ok(()), - _ => Err(UvError(result)), + pub fn bind(loop_: &Loop, address: SocketAddr) + -> Result + { + let udp = UdpWatcher { + handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) }, + home: get_handle_to_current_scheduler!(), + }; + assert_eq!(unsafe { + uvll::uv_udp_init(loop_.native_handle(), udp.handle) + }, 0); + let result = socket_addr_as_uv_socket_addr(address, |addr| unsafe { + match addr { + UvIpv4SocketAddr(addr) => uvll::udp_bind(udp.handle, addr, 0u32), + UvIpv6SocketAddr(addr) => uvll::udp_bind6(udp.handle, addr, 0u32), } + }); + match result { + 0 => Ok(udp), + n => Err(UvError(n)), } } +} - pub fn recv_start(&mut self, alloc: AllocCallback, cb: UdpReceiveCallback) { - { - let data = self.get_watcher_data(); - data.alloc_cb = Some(alloc); - data.udp_recv_cb = Some(cb); +impl HomingIO for UdpWatcher { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } +} + +impl rtio::RtioSocket for UdpWatcher { + fn socket_name(&mut self) -> Result { + let _m = self.fire_missiles(); + socket_name(Udp, self.handle) + } +} + +impl rtio::RtioUdpSocket for UdpWatcher { + fn recvfrom(&mut self, buf: &mut [u8]) + -> Result<(uint, SocketAddr), IoError> + { + struct Ctx { + task: Option, + buf: Option, + result: Option<(ssize_t, SocketAddr)>, + } + let _m = self.fire_missiles(); + + return match unsafe { + uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb) + } { + 0 => { + let mut cx = Ctx { + task: None, + buf: Some(slice_to_uv_buf(buf)), + result: None, + }; + unsafe { uvll::set_data_for_uv_handle(self.handle, &cx) } + let scheduler: ~Scheduler = Local::take(); + do scheduler.deschedule_running_task_and_then |_, task| { + cx.task = Some(task); + } + match cx.result.take_unwrap() { + (n, _) if n < 0 => + Err(uv_error_to_io_error(UvError(n as c_int))), + (n, addr) => Ok((n as uint, addr)) + } + } + n => Err(uv_error_to_io_error(UvError(n))) + }; + + extern fn alloc_cb(handle: *uvll::uv_udp_t, + _suggested_size: size_t) -> Buf { + let cx: &mut Ctx = unsafe { + cast::transmute(uvll::get_data_for_uv_handle(handle)) + }; + cx.buf.take().expect("alloc_cb called more than once") } - unsafe { uvll::uv_udp_recv_start(self.native_handle(), alloc_cb, recv_cb); } + extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, _buf: Buf, + addr: *uvll::sockaddr, _flags: c_uint) { - extern fn alloc_cb(handle: *uvll::uv_udp_t, suggested_size: size_t) -> Buf { - let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle); - let alloc_cb = udp_watcher.get_watcher_data().alloc_cb.get_ref(); - return (*alloc_cb)(suggested_size as uint); - } - - extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: Buf, - addr: *uvll::sockaddr, flags: c_uint) { // When there's no data to read the recv callback can be a no-op. // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring // this we just drop back to kqueue and wait for the next callback. - if nread == 0 { - return; + if nread == 0 { return } + if nread == uvll::ECANCELED as ssize_t { return } + + unsafe { + assert_eq!(uvll::uv_udp_recv_stop(handle), 0) } - uvdebug!("buf addr: {}", buf.base); - uvdebug!("buf len: {}", buf.len); - let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle); - let cb = udp_watcher.get_watcher_data().udp_recv_cb.get_ref(); - let status = status_to_maybe_uv_error(nread as c_int); - let addr = uv_socket_addr_to_socket_addr(sockaddr_to_UvSocketAddr(addr)); - (*cb)(udp_watcher, nread as int, buf, addr, flags as uint, status); - } - } - - pub fn recv_stop(&mut self) { - unsafe { uvll::uv_udp_recv_stop(self.native_handle()); } - } - - pub fn send(&mut self, buf: Buf, address: SocketAddr, cb: UdpSendCallback) { - { - let data = self.get_watcher_data(); - assert!(data.udp_send_cb.is_none()); - data.udp_send_cb = Some(cb); - } - - let req = UdpSendRequest::new(); - do socket_addr_as_uv_socket_addr(address) |addr| { - let result = unsafe { - match addr { - UvIpv4SocketAddr(addr) => uvll::udp_send(req.native_handle(), - self.native_handle(), [buf], addr, send_cb), - UvIpv6SocketAddr(addr) => uvll::udp_send6(req.native_handle(), - self.native_handle(), [buf], addr, send_cb), - } + let cx: &mut Ctx = unsafe { + cast::transmute(uvll::get_data_for_uv_handle(handle)) }; - assert_eq!(0, result); + let addr = sockaddr_to_UvSocketAddr(addr); + let addr = uv_socket_addr_to_socket_addr(addr); + cx.result = Some((nread, addr)); + + let sched: ~Scheduler = Local::take(); + sched.resume_blocked_task_immediately(cx.task.take_unwrap()); } + } + + fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> { + struct Ctx { task: Option, result: c_int } + + let _m = self.fire_missiles(); + + let req = Request::new(uvll::UV_UDP_SEND); + let buf = slice_to_uv_buf(buf); + let result = socket_addr_as_uv_socket_addr(dst, |dst| unsafe { + match dst { + UvIpv4SocketAddr(dst) => + uvll::udp_send(req.handle, self.handle, [buf], dst, send_cb), + UvIpv6SocketAddr(dst) => + uvll::udp_send6(req.handle, self.handle, [buf], dst, send_cb), + } + }); + + return match result { + 0 => { + let mut cx = Ctx { task: None, result: 0 }; + req.set_data(&cx); + req.defuse(); + + let sched: ~Scheduler = Local::take(); + do sched.deschedule_running_task_and_then |_, task| { + cx.task = Some(task); + } + + match cx.result { + 0 => Ok(()), + n => Err(uv_error_to_io_error(UvError(n))) + } + } + n => Err(uv_error_to_io_error(UvError(n))) + }; extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) { - let send_request: UdpSendRequest = NativeHandle::from_native_handle(req); - let mut udp_watcher = send_request.handle(); - send_request.delete(); - let cb = udp_watcher.get_watcher_data().udp_send_cb.take_unwrap(); - let status = status_to_maybe_uv_error(status); - cb(udp_watcher, status); + let req = Request::wrap(req); + let cx: &mut Ctx = unsafe { cast::transmute(req.get_data()) }; + cx.result = status; + + let sched: ~Scheduler = Local::take(); + sched.resume_blocked_task_immediately(cx.task.take_unwrap()); } } -} -impl NativeHandle<*uvll::uv_udp_t> for UdpWatcher { - fn from_native_handle(handle: *uvll::uv_udp_t) -> UdpWatcher { - UdpWatcher(handle) + fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> { + let _m = self.fire_missiles(); + status_to_io_result(unsafe { + do multi.to_str().with_c_str |m_addr| { + uvll::uv_udp_set_membership(self.handle, + m_addr, ptr::null(), + uvll::UV_JOIN_GROUP) + } + }) } - fn native_handle(&self) -> *uvll::uv_udp_t { - match self { &UdpWatcher(ptr) => ptr } + + fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> { + let _m = self.fire_missiles(); + status_to_io_result(unsafe { + do multi.to_str().with_c_str |m_addr| { + uvll::uv_udp_set_membership(self.handle, + m_addr, ptr::null(), + uvll::UV_LEAVE_GROUP) + } + }) + } + + fn loop_multicast_locally(&mut self) -> Result<(), IoError> { + let _m = self.fire_missiles(); + status_to_io_result(unsafe { + uvll::uv_udp_set_multicast_loop(self.handle, + 1 as c_int) + }) + } + + fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> { + let _m = self.fire_missiles(); + status_to_io_result(unsafe { + uvll::uv_udp_set_multicast_loop(self.handle, + 0 as c_int) + }) + } + + fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> { + let _m = self.fire_missiles(); + status_to_io_result(unsafe { + uvll::uv_udp_set_multicast_ttl(self.handle, + ttl as c_int) + }) + } + + fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> { + let _m = self.fire_missiles(); + status_to_io_result(unsafe { + uvll::uv_udp_set_ttl(self.handle, ttl as c_int) + }) + } + + fn hear_broadcasts(&mut self) -> Result<(), IoError> { + let _m = self.fire_missiles(); + status_to_io_result(unsafe { + uvll::uv_udp_set_broadcast(self.handle, + 1 as c_int) + }) + } + + fn ignore_broadcasts(&mut self) -> Result<(), IoError> { + let _m = self.fire_missiles(); + status_to_io_result(unsafe { + uvll::uv_udp_set_broadcast(self.handle, + 0 as c_int) + }) } } -// uv_connect_t is a subclass of uv_req_t -pub struct ConnectRequest(*uvll::uv_connect_t); -impl Request for ConnectRequest { } - -impl ConnectRequest { - - pub fn new() -> ConnectRequest { - let connect_handle = unsafe { malloc_req(UV_CONNECT) }; - assert!(connect_handle.is_not_null()); - ConnectRequest(connect_handle as *uvll::uv_connect_t) - } - - fn stream(&self) -> StreamWatcher { +impl Drop for UdpWatcher { + fn drop(&mut self) { + // Send ourselves home to close this handle (blocking while doing so). + let (_m, sched) = self.fire_missiles_sched(); + let mut slot = None; unsafe { - let stream_handle = uvll::get_stream_handle_from_connect_req(self.native_handle()); - NativeHandle::from_native_handle(stream_handle) + uvll::set_data_for_uv_handle(self.handle, &slot); + uvll::uv_close(self.handle, close_cb); + } + do sched.deschedule_running_task_and_then |_, task| { + slot = Some(task); + } + + extern fn close_cb(handle: *uvll::uv_handle_t) { + let slot: &mut Option = unsafe { + cast::transmute(uvll::get_data_for_uv_handle(handle)) + }; + let sched: ~Scheduler = Local::take(); + sched.resume_blocked_task_immediately(slot.take_unwrap()); } } - - fn delete(self) { - unsafe { free_req(self.native_handle() as *c_void) } - } } -impl NativeHandle<*uvll::uv_connect_t> for ConnectRequest { - fn from_native_handle(handle: *uvll:: uv_connect_t) -> ConnectRequest { - ConnectRequest(handle) - } - fn native_handle(&self) -> *uvll::uv_connect_t { - match self { &ConnectRequest(ptr) => ptr } - } -} - -pub struct WriteRequest(*uvll::uv_write_t); - -impl Request for WriteRequest { } - -impl WriteRequest { - pub fn new() -> WriteRequest { - let write_handle = unsafe { malloc_req(UV_WRITE) }; - assert!(write_handle.is_not_null()); - WriteRequest(write_handle as *uvll::uv_write_t) - } - - pub fn stream(&self) -> StreamWatcher { - unsafe { - let stream_handle = uvll::get_stream_handle_from_write_req(self.native_handle()); - NativeHandle::from_native_handle(stream_handle) - } - } - - pub fn delete(self) { - unsafe { free_req(self.native_handle() as *c_void) } - } -} - -impl NativeHandle<*uvll::uv_write_t> for WriteRequest { - fn from_native_handle(handle: *uvll:: uv_write_t) -> WriteRequest { - WriteRequest(handle) - } - fn native_handle(&self) -> *uvll::uv_write_t { - match self { &WriteRequest(ptr) => ptr } - } -} - -pub struct UdpSendRequest(*uvll::uv_udp_send_t); -impl Request for UdpSendRequest { } - -impl UdpSendRequest { - pub fn new() -> UdpSendRequest { - let send_handle = unsafe { malloc_req(UV_UDP_SEND) }; - assert!(send_handle.is_not_null()); - UdpSendRequest(send_handle as *uvll::uv_udp_send_t) - } - - pub fn handle(&self) -> UdpWatcher { - let send_request_handle = unsafe { - uvll::get_udp_handle_from_send_req(self.native_handle()) - }; - NativeHandle::from_native_handle(send_request_handle) - } - - pub fn delete(self) { - unsafe { free_req(self.native_handle() as *c_void) } - } -} - -impl NativeHandle<*uvll::uv_udp_send_t> for UdpSendRequest { - fn from_native_handle(handle: *uvll::uv_udp_send_t) -> UdpSendRequest { - UdpSendRequest(handle) - } - fn native_handle(&self) -> *uvll::uv_udp_send_t { - match self { &UdpSendRequest(ptr) => ptr } - } -} +//////////////////////////////////////////////////////////////////////////////// +/// UV request support +//////////////////////////////////////////////////////////////////////////////// #[cfg(test)] mod test { diff --git a/src/librustuv/pipe.rs b/src/librustuv/pipe.rs index a857308a81b..2a41dd9efe1 100644 --- a/src/librustuv/pipe.rs +++ b/src/librustuv/pipe.rs @@ -19,7 +19,7 @@ use std::rt::sched::{Scheduler, SchedHandle}; use std::rt::tube::Tube; use stream::StreamWatcher; -use super::{Loop, UvError, NativeHandle, uv_error_to_io_error, UvHandle}; +use super::{Loop, UvError, NativeHandle, uv_error_to_io_error, UvHandle, Request}; use uvio::HomingIO; use uvll; @@ -79,23 +79,26 @@ impl PipeWatcher { result: Option>, } let mut cx = Ctx { task: None, result: None }; - let req = unsafe { uvll::malloc_req(uvll::UV_CONNECT) }; - unsafe { uvll::set_data_for_req(req, &cx as *Ctx) } + let req = Request::new(uvll::UV_CONNECT); + unsafe { + uvll::set_data_for_req(req.handle, &cx as *Ctx); + uvll::uv_pipe_connect(req.handle, + PipeWatcher::alloc(loop_, false), + name.with_ref(|p| p), + connect_cb) + } + req.defuse(); let sched: ~Scheduler = Local::take(); do sched.deschedule_running_task_and_then |_, task| { cx.task = Some(task); - unsafe { - uvll::uv_pipe_connect(req, - PipeWatcher::alloc(loop_, false), - name.with_ref(|p| p), - connect_cb) - } } assert!(cx.task.is_none()); return cx.result.take().expect("pipe connect needs a result"); extern fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) { + let _req = Request::wrap(req); + if status == uvll::ECANCELED { return } unsafe { let cx: &mut Ctx = cast::transmute(uvll::get_data_for_req(req)); let stream = uvll::get_stream_handle_from_connect_req(req); @@ -106,7 +109,6 @@ impl PipeWatcher { Err(UvError(n)) } }); - uvll::free_req(req); let sched: ~Scheduler = Local::take(); sched.resume_blocked_task_immediately(cx.task.take_unwrap()); @@ -201,6 +203,7 @@ extern fn listen_cb(server: *uvll::uv_stream_t, status: libc::c_int) { assert_eq!(unsafe { uvll::uv_accept(server, client) }, 0); Ok(~PipeWatcher::new(client) as ~RtioPipe) } + uvll::ECANCELED => return, n => Err(uv_error_to_io_error(UvError(n))) }; diff --git a/src/librustuv/process.rs b/src/librustuv/process.rs index 50964d7a84c..7b44c350f13 100644 --- a/src/librustuv/process.rs +++ b/src/librustuv/process.rs @@ -8,7 +8,6 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use std::cell::Cell; use std::libc::c_int; use std::libc; use std::ptr; @@ -58,8 +57,7 @@ impl Process { } } - let ret_io = Cell::new(ret_io); - do with_argv(config.program, config.args) |argv| { + let ret = do with_argv(config.program, config.args) |argv| { do with_env(config.env) |envp| { let options = uvll::uv_process_options_t { exit_cb: on_exit, @@ -89,7 +87,7 @@ impl Process { exit_status: None, term_signal: None, }; - Ok((process.install(), ret_io.take())) + Ok(process.install()) } err => { unsafe { uvll::free_handle(handle) } @@ -97,6 +95,11 @@ impl Process { } } } + }; + + match ret { + Ok(p) => Ok((p, ret_io)), + Err(e) => Err(e), } } } diff --git a/src/librustuv/stream.rs b/src/librustuv/stream.rs index ad0deebd457..01bc02a50be 100644 --- a/src/librustuv/stream.rs +++ b/src/librustuv/stream.rs @@ -15,7 +15,7 @@ use std::rt::BlockedTask; use std::rt::local::Local; use std::rt::sched::Scheduler; -use super::{UvError, Buf, slice_to_uv_buf}; +use super::{UvError, Buf, slice_to_uv_buf, Request}; use uvll; // This is a helper structure which is intended to get embedded into other @@ -29,17 +29,17 @@ pub struct StreamWatcher { // every call to uv_write(). Ideally this would be a stack-allocated // structure, but currently we don't have mappings for all the structures // defined in libuv, so we're foced to malloc this. - priv last_write_req: Option<*uvll::uv_write_t>, + priv last_write_req: Option, } struct ReadContext { buf: Option, - result: Option>, + result: ssize_t, task: Option, } struct WriteContext { - result: Option>, + result: c_int, task: Option, } @@ -72,7 +72,7 @@ impl StreamWatcher { 0 => { let mut rcx = ReadContext { buf: Some(slice_to_uv_buf(buf)), - result: None, + result: 0, task: None, }; unsafe { @@ -82,7 +82,10 @@ impl StreamWatcher { do scheduler.deschedule_running_task_and_then |_sched, task| { rcx.task = Some(task); } - rcx.result.take().expect("no result in read stream?") + match rcx.result { + n if n < 0 => Err(UvError(n as c_int)), + n => Ok(n as uint), + } } n => Err(UvError(n)) } @@ -91,27 +94,29 @@ impl StreamWatcher { pub fn write(&mut self, buf: &[u8]) -> Result<(), UvError> { // Prepare the write request, either using a cached one or allocating a // new one - let req = match self.last_write_req { - Some(req) => req, - None => unsafe { uvll::malloc_req(uvll::UV_WRITE) }, - }; - self.last_write_req = Some(req); - let mut wcx = WriteContext { result: None, task: None, }; - unsafe { uvll::set_data_for_req(req, &wcx as *WriteContext) } + if self.last_write_req.is_none() { + self.last_write_req = Some(Request::new(uvll::UV_WRITE)); + } + let req = self.last_write_req.get_ref(); // Send off the request, but be careful to not block until we're sure // that the write reqeust is queued. If the reqeust couldn't be queued, // then we should return immediately with an error. match unsafe { - uvll::uv_write(req, self.handle, [slice_to_uv_buf(buf)], write_cb) + uvll::uv_write(req.handle, self.handle, [slice_to_uv_buf(buf)], + write_cb) } { 0 => { + let mut wcx = WriteContext { result: 0, task: None, }; + req.set_data(&wcx); let scheduler: ~Scheduler = Local::take(); do scheduler.deschedule_running_task_and_then |_sched, task| { wcx.task = Some(task); } - assert!(wcx.task.is_none()); - wcx.result.take().expect("no result in write stream?") + match wcx.result { + 0 => Ok(()), + n => Err(UvError(n)), + } } n => Err(UvError(n)), } @@ -124,12 +129,6 @@ impl StreamWatcher { // synchronously (the task is blocked) or asynchronously (the task is not // block, but the handle is still deallocated). pub fn close(&mut self, synchronous: bool) { - // clean up the cached write request if we have one - match self.last_write_req { - Some(req) => unsafe { uvll::free_req(req) }, - None => {} - } - if synchronous { let mut closing_task = None; unsafe { @@ -186,31 +185,24 @@ extern fn read_cb(handle: *uvll::uv_stream_t, nread: ssize_t, _buf: Buf) { // XXX: Is there a performance impact to calling // stop here? unsafe { assert_eq!(uvll::uv_read_stop(handle), 0); } + rcx.result = nread; - assert!(rcx.result.is_none()); - rcx.result = Some(match nread { - n if n < 0 => Err(UvError(n as c_int)), - n => Ok(n as uint), - }); - - let task = rcx.task.take().expect("read_cb needs a task"); let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task); + scheduler.resume_blocked_task_immediately(rcx.task.take_unwrap()); } // Unlike reading, the WriteContext is stored in the uv_write_t request. Like // reading, however, all this does is wake up the blocked task after squirreling // away the error code as a result. extern fn write_cb(req: *uvll::uv_write_t, status: c_int) { + if status == uvll::ECANCELED { return } // Remember to not free the request because it is re-used between writes on // the same stream. - unsafe { - let wcx: &mut WriteContext = cast::transmute(uvll::get_data_for_req(req)); - wcx.result = Some(match status { - 0 => Ok(()), - n => Err(UvError(n)), - }); - let sched: ~Scheduler = Local::take(); - sched.resume_blocked_task_immediately(wcx.task.take_unwrap()); - } + let req = Request::wrap(req); + let wcx: &mut WriteContext = unsafe { cast::transmute(req.get_data()) }; + wcx.result = status; + + let sched: ~Scheduler = Local::take(); + sched.resume_blocked_task_immediately(wcx.task.take_unwrap()); + req.defuse(); } diff --git a/src/librustuv/timer.rs b/src/librustuv/timer.rs index 1732e84be4e..46731993bc7 100644 --- a/src/librustuv/timer.rs +++ b/src/librustuv/timer.rs @@ -8,7 +8,6 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use std::cell::Cell; use std::comm::{oneshot, stream, PortOne, ChanOne, SendDeferred}; use std::libc::c_int; use std::rt::BlockedTask; @@ -77,10 +76,9 @@ impl RtioTimer for TimerWatcher { fn oneshot(&mut self, msecs: u64) -> PortOne<()> { let (port, chan) = oneshot(); - let chan = Cell::new(chan); let _m = self.fire_missiles(); - self.action = Some(SendOnce(chan.take())); + self.action = Some(SendOnce(chan)); self.start(msecs, 0); return port; @@ -88,10 +86,9 @@ impl RtioTimer for TimerWatcher { fn period(&mut self, msecs: u64) -> Port<()> { let (port, chan) = stream(); - let chan = Cell::new(chan); let _m = self.fire_missiles(); - self.action = Some(SendMany(chan.take())); + self.action = Some(SendMany(chan)); self.start(msecs, msecs); return port; diff --git a/src/librustuv/uvio.rs b/src/librustuv/uvio.rs index 1c6e59d9f2e..d0a160ba8ce 100644 --- a/src/librustuv/uvio.rs +++ b/src/librustuv/uvio.rs @@ -11,22 +11,17 @@ use std::c_str::CString; use std::cast::transmute; use std::cast; -use std::cell::Cell; -use std::clone::Clone; use std::comm::{SharedChan, GenericChan}; use std::libc; -use std::libc::{c_int, c_uint, c_void}; -use std::ptr; +use std::libc::c_int; use std::str; use std::rt::io; use std::rt::io::IoError; -use std::rt::io::net::ip::{SocketAddr, IpAddr}; -use std::rt::io::{standard_error, OtherIoError}; +use std::rt::io::net::ip::SocketAddr; use std::rt::io::process::ProcessConfig; use std::rt::local::Local; use std::rt::rtio::*; use std::rt::sched::{Scheduler, SchedHandle}; -use std::rt::tube::Tube; use std::rt::task::Task; use std::path::Path; use std::libc::{O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY, @@ -45,9 +40,7 @@ use ai = std::rt::io::net::addrinfo; use super::*; use idle::IdleWatcher; -use net::{UvIpv4SocketAddr, UvIpv6SocketAddr}; use addrinfo::GetAddrInfoRequest; -use pipe::PipeListener; // XXX we should not be calling uvll functions in here. @@ -137,47 +130,6 @@ impl Drop for HomingMissile { } } -enum SocketNameKind { - TcpPeer, - Tcp, - Udp -} - -fn socket_name>(sk: SocketNameKind, - handle: U) -> Result { - let getsockname = match sk { - TcpPeer => uvll::tcp_getpeername, - Tcp => uvll::tcp_getsockname, - Udp => uvll::udp_getsockname, - }; - - // Allocate a sockaddr_storage - // since we don't know if it's ipv4 or ipv6 - let r_addr = unsafe { uvll::malloc_sockaddr_storage() }; - - let r = unsafe { - getsockname(handle.native_handle() as *c_void, r_addr as *uvll::sockaddr_storage) - }; - - if r != 0 { - let status = status_to_maybe_uv_error(r); - return Err(uv_error_to_io_error(status.unwrap())); - } - - let addr = unsafe { - if uvll::is_ip6_addr(r_addr as *uvll::sockaddr) { - net::uv_socket_addr_to_socket_addr(UvIpv6SocketAddr(r_addr as *uvll::sockaddr_in6)) - } else { - net::uv_socket_addr_to_socket_addr(UvIpv4SocketAddr(r_addr as *uvll::sockaddr_in)) - } - }; - - unsafe { uvll::free_sockaddr_storage(r_addr); } - - Ok(addr) - -} - // Obviously an Event Loop is always home. pub struct UvEventLoop { priv uvio: UvIoFactory @@ -251,97 +203,26 @@ impl IoFactory for UvIoFactory { // Connect to an address and return a new stream // NB: This blocks the task waiting on the connection. // It would probably be better to return a future - fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStream, IoError> { - // Create a cell in the task to hold the result. We will fill - // the cell before resuming the task. - let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell> = &result_cell; - - // Block this task and take ownership, switch to scheduler context - do task::unkillable { // FIXME(#8674) - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_, task| { - - let mut tcp = TcpWatcher::new(self.uv_loop()); - let task_cell = Cell::new(task); - - // Wait for a connection - do tcp.connect(addr) |stream, status| { - match status { - None => { - let tcp = NativeHandle::from_native_handle(stream.native_handle()); - let home = get_handle_to_current_scheduler!(); - let res = Ok(~UvTcpStream { watcher: tcp, home: home } - as ~RtioTcpStream); - - // Store the stream in the task's stack - unsafe { (*result_cell_ptr).put_back(res); } - - // Context switch - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } - Some(_) => { - let task_cell = Cell::new(task_cell.take()); - do stream.close { - let res = Err(uv_error_to_io_error(status.unwrap())); - unsafe { (*result_cell_ptr).put_back(res); } - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } - } - } - } - } + fn tcp_connect(&mut self, addr: SocketAddr) + -> Result<~RtioTcpStream, IoError> + { + match TcpWatcher::connect(self.uv_loop(), addr) { + Ok(t) => Ok(~t as ~RtioTcpStream), + Err(e) => Err(uv_error_to_io_error(e)), } - - assert!(!result_cell.is_empty()); - return result_cell.take(); } fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListener, IoError> { - let mut watcher = TcpWatcher::new(self.uv_loop()); - match watcher.bind(addr) { - Ok(_) => { - let home = get_handle_to_current_scheduler!(); - Ok(~UvTcpListener::new(watcher, home) as ~RtioTcpListener) - } - Err(uverr) => { - do task::unkillable { // FIXME(#8674) - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do watcher.as_stream().close { - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } - } - Err(uv_error_to_io_error(uverr)) - } - } + match TcpListener::bind(self.uv_loop(), addr) { + Ok(t) => Ok(t as ~RtioTcpListener), + Err(e) => Err(uv_error_to_io_error(e)), } } fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocket, IoError> { - let mut watcher = UdpWatcher::new(self.uv_loop()); - match watcher.bind(addr) { - Ok(_) => { - let home = get_handle_to_current_scheduler!(); - Ok(~UvUdpSocket { watcher: watcher, home: home } as ~RtioUdpSocket) - } - Err(uverr) => { - do task::unkillable { // FIXME(#8674) - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do watcher.close { - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } - } - Err(uv_error_to_io_error(uverr)) - } - } + match UdpWatcher::bind(self.uv_loop(), addr) { + Ok(u) => Ok(~u as ~RtioUdpSocket), + Err(e) => Err(uv_error_to_io_error(e)), } } @@ -487,416 +368,6 @@ impl IoFactory for UvIoFactory { } } -pub struct UvTcpListener { - priv watcher : TcpWatcher, - priv home: SchedHandle, -} - -impl HomingIO for UvTcpListener { - fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } -} - -impl UvTcpListener { - fn new(watcher: TcpWatcher, home: SchedHandle) -> UvTcpListener { - UvTcpListener { watcher: watcher, home: home } - } -} - -impl Drop for UvTcpListener { - fn drop(&mut self) { - let (_m, sched) = self.fire_homing_missile_sched(); - do sched.deschedule_running_task_and_then |_, task| { - let task = Cell::new(task); - do self.watcher.as_stream().close { - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task.take()); - } - } - } -} - -impl RtioSocket for UvTcpListener { - fn socket_name(&mut self) -> Result { - let _m = self.fire_homing_missile(); - socket_name(Tcp, self.watcher) - } -} - -impl RtioTcpListener for UvTcpListener { - fn listen(mut ~self) -> Result<~RtioTcpAcceptor, IoError> { - let _m = self.fire_homing_missile(); - let acceptor = ~UvTcpAcceptor::new(*self); - let incoming = Cell::new(acceptor.incoming.clone()); - let mut stream = acceptor.listener.watcher.as_stream(); - let res = do stream.listen |mut server, status| { - do incoming.with_mut_ref |incoming| { - let inc = match status { - Some(_) => Err(standard_error(OtherIoError)), - None => { - let inc = TcpWatcher::new(&server.event_loop()); - // first accept call in the callback guarenteed to succeed - server.accept(inc.as_stream()); - let home = get_handle_to_current_scheduler!(); - Ok(~UvTcpStream { watcher: inc, home: home } - as ~RtioTcpStream) - } - }; - incoming.send(inc); - } - }; - match res { - Ok(()) => Ok(acceptor as ~RtioTcpAcceptor), - Err(e) => Err(uv_error_to_io_error(e)), - } - } -} - -pub struct UvTcpAcceptor { - priv listener: UvTcpListener, - priv incoming: Tube>, -} - -impl HomingIO for UvTcpAcceptor { - fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() } -} - -impl UvTcpAcceptor { - fn new(listener: UvTcpListener) -> UvTcpAcceptor { - UvTcpAcceptor { listener: listener, incoming: Tube::new() } - } -} - -impl RtioSocket for UvTcpAcceptor { - fn socket_name(&mut self) -> Result { - let _m = self.fire_homing_missile(); - socket_name(Tcp, self.listener.watcher) - } -} - -fn accept_simultaneously(stream: StreamWatcher, a: int) -> Result<(), IoError> { - let r = unsafe { - uvll::uv_tcp_simultaneous_accepts(stream.native_handle(), a as c_int) - }; - status_to_io_result(r) -} - -impl RtioTcpAcceptor for UvTcpAcceptor { - fn accept(&mut self) -> Result<~RtioTcpStream, IoError> { - let _m = self.fire_homing_missile(); - self.incoming.recv() - } - - fn accept_simultaneously(&mut self) -> Result<(), IoError> { - let _m = self.fire_homing_missile(); - accept_simultaneously(self.listener.watcher.as_stream(), 1) - } - - fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> { - let _m = self.fire_homing_missile(); - accept_simultaneously(self.listener.watcher.as_stream(), 0) - } -} - -fn read_stream(mut watcher: StreamWatcher, - scheduler: ~Scheduler, - buf: &mut [u8]) -> Result { - let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell> = &result_cell; - - let uv_buf = slice_to_uv_buf(buf); - do scheduler.deschedule_running_task_and_then |_sched, task| { - let task_cell = Cell::new(task); - // XXX: We shouldn't reallocate these callbacks every - // call to read - let alloc: AllocCallback = |_| uv_buf; - do watcher.read_start(alloc) |mut watcher, nread, _buf, status| { - - // Stop reading so that no read callbacks are - // triggered before the user calls `read` again. - // XXX: Is there a performance impact to calling - // stop here? - watcher.read_stop(); - - let result = if status.is_none() { - assert!(nread >= 0); - Ok(nread as uint) - } else { - Err(uv_error_to_io_error(status.unwrap())) - }; - - unsafe { (*result_cell_ptr).put_back(result); } - - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } - } - - assert!(!result_cell.is_empty()); - result_cell.take() -} - -fn write_stream(mut watcher: StreamWatcher, - scheduler: ~Scheduler, - buf: &[u8]) -> Result<(), IoError> { - let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell> = &result_cell; - let buf_ptr: *&[u8] = &buf; - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; - do watcher.write(buf) |_watcher, status| { - let result = if status.is_none() { - Ok(()) - } else { - Err(uv_error_to_io_error(status.unwrap())) - }; - - unsafe { (*result_cell_ptr).put_back(result); } - - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } - } - - assert!(!result_cell.is_empty()); - result_cell.take() -} - -pub struct UvTcpStream { - priv watcher: TcpWatcher, - priv home: SchedHandle, -} - -impl HomingIO for UvTcpStream { - fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } -} - -impl Drop for UvTcpStream { - fn drop(&mut self) { - let (_m, sched) = self.fire_homing_missile_sched(); - do sched.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do self.watcher.as_stream().close { - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } - } - } -} - -impl RtioSocket for UvTcpStream { - fn socket_name(&mut self) -> Result { - let _m = self.fire_homing_missile(); - socket_name(Tcp, self.watcher) - } -} - -impl RtioTcpStream for UvTcpStream { - fn read(&mut self, buf: &mut [u8]) -> Result { - let (_m, scheduler) = self.fire_homing_missile_sched(); - read_stream(self.watcher.as_stream(), scheduler, buf) - } - - fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { - let (_m, scheduler) = self.fire_homing_missile_sched(); - write_stream(self.watcher.as_stream(), scheduler, buf) - } - - fn peer_name(&mut self) -> Result { - let _m = self.fire_homing_missile(); - socket_name(TcpPeer, self.watcher) - } - - fn control_congestion(&mut self) -> Result<(), IoError> { - let _m = self.fire_homing_missile(); - status_to_io_result(unsafe { - uvll::uv_tcp_nodelay(self.watcher.native_handle(), 0 as c_int) - }) - } - - fn nodelay(&mut self) -> Result<(), IoError> { - let _m = self.fire_homing_missile(); - status_to_io_result(unsafe { - uvll::uv_tcp_nodelay(self.watcher.native_handle(), 1 as c_int) - }) - } - - fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> { - let _m = self.fire_homing_missile(); - status_to_io_result(unsafe { - uvll::uv_tcp_keepalive(self.watcher.native_handle(), 1 as c_int, - delay_in_seconds as c_uint) - }) - } - - fn letdie(&mut self) -> Result<(), IoError> { - let _m = self.fire_homing_missile(); - status_to_io_result(unsafe { - uvll::uv_tcp_keepalive(self.watcher.native_handle(), - 0 as c_int, 0 as c_uint) - }) - } -} - -pub struct UvUdpSocket { - priv watcher: UdpWatcher, - priv home: SchedHandle, -} - -impl HomingIO for UvUdpSocket { - fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } -} - -impl Drop for UvUdpSocket { - fn drop(&mut self) { - let (_m, scheduler) = self.fire_homing_missile_sched(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do self.watcher.close { - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } - } - } -} - -impl RtioSocket for UvUdpSocket { - fn socket_name(&mut self) -> Result { - let _m = self.fire_homing_missile(); - socket_name(Udp, self.watcher) - } -} - -impl RtioUdpSocket for UvUdpSocket { - fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> { - let (_m, scheduler) = self.fire_homing_missile_sched(); - let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell> = &result_cell; - - let buf_ptr: *&mut [u8] = &buf; - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) }; - do self.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| { - let _ = flags; // /XXX add handling for partials? - - watcher.recv_stop(); - - let result = match status { - None => { - assert!(nread >= 0); - Ok((nread as uint, addr)) - } - Some(err) => Err(uv_error_to_io_error(err)), - }; - - unsafe { (*result_cell_ptr).put_back(result); } - - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } - } - - assert!(!result_cell.is_empty()); - result_cell.take() - } - - fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> { - let (_m, scheduler) = self.fire_homing_missile_sched(); - let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell> = &result_cell; - let buf_ptr: *&[u8] = &buf; - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; - do self.watcher.send(buf, dst) |_watcher, status| { - - let result = match status { - None => Ok(()), - Some(err) => Err(uv_error_to_io_error(err)), - }; - - unsafe { (*result_cell_ptr).put_back(result); } - - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } - } - - assert!(!result_cell.is_empty()); - result_cell.take() - } - - fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> { - let _m = self.fire_homing_missile(); - status_to_io_result(unsafe { - do multi.to_str().with_c_str |m_addr| { - uvll::uv_udp_set_membership(self.watcher.native_handle(), - m_addr, ptr::null(), - uvll::UV_JOIN_GROUP) - } - }) - } - - fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> { - let _m = self.fire_homing_missile(); - status_to_io_result(unsafe { - do multi.to_str().with_c_str |m_addr| { - uvll::uv_udp_set_membership(self.watcher.native_handle(), - m_addr, ptr::null(), - uvll::UV_LEAVE_GROUP) - } - }) - } - - fn loop_multicast_locally(&mut self) -> Result<(), IoError> { - let _m = self.fire_homing_missile(); - status_to_io_result(unsafe { - uvll::uv_udp_set_multicast_loop(self.watcher.native_handle(), - 1 as c_int) - }) - } - - fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> { - let _m = self.fire_homing_missile(); - status_to_io_result(unsafe { - uvll::uv_udp_set_multicast_loop(self.watcher.native_handle(), - 0 as c_int) - }) - } - - fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> { - let _m = self.fire_homing_missile(); - status_to_io_result(unsafe { - uvll::uv_udp_set_multicast_ttl(self.watcher.native_handle(), - ttl as c_int) - }) - } - - fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> { - let _m = self.fire_homing_missile(); - status_to_io_result(unsafe { - uvll::uv_udp_set_ttl(self.watcher.native_handle(), ttl as c_int) - }) - } - - fn hear_broadcasts(&mut self) -> Result<(), IoError> { - let _m = self.fire_homing_missile(); - status_to_io_result(unsafe { - uvll::uv_udp_set_broadcast(self.watcher.native_handle(), - 1 as c_int) - }) - } - - fn ignore_broadcasts(&mut self) -> Result<(), IoError> { - let _m = self.fire_homing_missile(); - status_to_io_result(unsafe { - uvll::uv_udp_set_broadcast(self.watcher.native_handle(), - 0 as c_int) - }) - } -} - // this function is full of lies unsafe fn local_io() -> &'static mut IoFactory { do Local::borrow |sched: &mut Scheduler| { diff --git a/src/librustuv/uvll.rs b/src/librustuv/uvll.rs index a32f03732d6..42e0f58d87d 100644 --- a/src/librustuv/uvll.rs +++ b/src/librustuv/uvll.rs @@ -53,6 +53,7 @@ pub mod errors { pub static ENOTCONN: c_int = -4054; pub static EPIPE: c_int = -4048; pub static ECONNABORTED: c_int = -4080; + pub static ECANCELED: c_int = -4082; } #[cfg(not(windows))] pub mod errors { @@ -65,6 +66,7 @@ pub mod errors { pub static ENOTCONN: c_int = -libc::ENOTCONN; pub static EPIPE: c_int = -libc::EPIPE; pub static ECONNABORTED: c_int = -libc::ECONNABORTED; + pub static ECANCELED : c_int = -libc::ECANCELED; } pub static PROCESS_SETUID: c_int = 1 << 0; @@ -127,6 +129,7 @@ pub struct uv_stdio_container_t { } pub type uv_handle_t = c_void; +pub type uv_req_t = c_void; pub type uv_loop_t = c_void; pub type uv_idle_t = c_void; pub type uv_tcp_t = c_void;