auto merge of #8040 : luqmana/rust/rtn, r=brson

Implements various missing tcp & udp methods.. Also fixes handling ipv4-mapped/compatible ipv6 addresses and addresses the XXX on `status_to_maybe_uv_error`.

r? @brson
This commit is contained in:
bors 2013-07-27 01:49:35 -07:00
commit 15310ba7c2
15 changed files with 546 additions and 154 deletions

View File

@ -8,6 +8,9 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use num::FromStrRadix;
use to_str::ToStr;
type Port = u16;
#[deriving(Eq, TotalEq)]
@ -15,3 +18,42 @@ pub enum IpAddr {
Ipv4(u8, u8, u8, u8, Port),
Ipv6(u16, u16, u16, u16, u16, u16, u16, u16, Port)
}
impl ToStr for IpAddr {
fn to_str(&self) -> ~str {
match *self {
Ipv4(a, b, c, d, p) =>
fmt!("%u.%u.%u.%u:%u",
a as uint, b as uint, c as uint, d as uint, p as uint),
// Ipv4 Compatible address
Ipv6(0, 0, 0, 0, 0, 0, g, h, p) => {
let a = fmt!("%04x", g as uint);
let b = FromStrRadix::from_str_radix(a.slice(2, 4), 16).unwrap();
let a = FromStrRadix::from_str_radix(a.slice(0, 2), 16).unwrap();
let c = fmt!("%04x", h as uint);
let d = FromStrRadix::from_str_radix(c.slice(2, 4), 16).unwrap();
let c = FromStrRadix::from_str_radix(c.slice(0, 2), 16).unwrap();
fmt!("[::%u.%u.%u.%u]:%u", a, b, c, d, p as uint)
}
// Ipv4-Mapped address
Ipv6(0, 0, 0, 0, 0, 1, g, h, p) => {
let a = fmt!("%04x", g as uint);
let b = FromStrRadix::from_str_radix(a.slice(2, 4), 16).unwrap();
let a = FromStrRadix::from_str_radix(a.slice(0, 2), 16).unwrap();
let c = fmt!("%04x", h as uint);
let d = FromStrRadix::from_str_radix(c.slice(2, 4), 16).unwrap();
let c = FromStrRadix::from_str_radix(c.slice(0, 2), 16).unwrap();
fmt!("[::FFFF:%u.%u.%u.%u]:%u", a, b, c, d, p as uint)
}
Ipv6(a, b, c, d, e, f, g, h, p) =>
fmt!("[%x:%x:%x:%x:%x:%x:%x:%x]:%u",
a as uint, b as uint, c as uint, d as uint,
e as uint, f as uint, g as uint, h as uint, p as uint)
}
}
}

View File

@ -14,8 +14,9 @@ use rt::io::net::ip::IpAddr;
use rt::io::{Reader, Writer, Listener};
use rt::io::{io_error, read_error, EndOfFile};
use rt::rtio::{IoFactory, IoFactoryObject,
RtioTcpListener, RtioTcpListenerObject,
RtioTcpStream, RtioTcpStreamObject};
RtioSocket, RtioTcpListener,
RtioTcpListenerObject, RtioTcpStream,
RtioTcpStreamObject};
use rt::local::Local;
pub struct TcpStream(~RtioTcpStreamObject);
@ -42,6 +43,28 @@ impl TcpStream {
}
}
}
pub fn peer_name(&mut self) -> Option<IpAddr> {
match (**self).peer_name() {
Ok(pn) => Some(pn),
Err(ioerr) => {
rtdebug!("failed to get peer name: %?", ioerr);
io_error::cond.raise(ioerr);
None
}
}
}
pub fn socket_name(&mut self) -> Option<IpAddr> {
match (**self).socket_name() {
Ok(sn) => Some(sn),
Err(ioerr) => {
rtdebug!("failed to get socket name: %?", ioerr);
io_error::cond.raise(ioerr);
None
}
}
}
}
impl Reader for TcpStream {
@ -90,6 +113,17 @@ impl TcpListener {
}
}
}
pub fn socket_name(&mut self) -> Option<IpAddr> {
match (**self).socket_name() {
Ok(sn) => Some(sn),
Err(ioerr) => {
rtdebug!("failed to get socket name: %?", ioerr);
io_error::cond.raise(ioerr);
None
}
}
}
}
impl Listener<TcpStream> for TcpListener {
@ -534,4 +568,61 @@ mod test {
}
}
#[cfg(test)]
fn socket_name(addr: IpAddr) {
do run_in_newsched_task {
do spawntask_immediately {
let listener = TcpListener::bind(addr);
assert!(listener.is_some());
let mut listener = listener.unwrap();
// Make sure socket_name gives
// us the socket we binded to.
let so_name = listener.socket_name();
assert!(so_name.is_some());
assert_eq!(addr, so_name.unwrap());
}
}
}
#[cfg(test)]
fn peer_name(addr: IpAddr) {
do run_in_newsched_task {
do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
listener.accept();
}
do spawntask_immediately {
let stream = TcpStream::connect(addr);
assert!(stream.is_some());
let mut stream = stream.unwrap();
// Make sure peer_name gives us the
// address/port of the peer we've
// connected to.
let peer_name = stream.peer_name();
assert!(peer_name.is_some());
assert_eq!(addr, peer_name.unwrap());
}
}
}
#[test]
fn socket_and_peer_name_ip4() {
peer_name(next_test_ip4());
socket_name(next_test_ip4());
}
#[test]
fn socket_and_peer_name_ip6() {
// XXX: peer name is not consistent
//peer_name(next_test_ip6());
socket_name(next_test_ip6());
}
}

View File

@ -13,7 +13,7 @@ use result::{Ok, Err};
use rt::io::net::ip::IpAddr;
use rt::io::{Reader, Writer};
use rt::io::{io_error, read_error, EndOfFile};
use rt::rtio::{RtioUdpSocketObject, RtioUdpSocket, IoFactory, IoFactoryObject};
use rt::rtio::{RtioSocket, RtioUdpSocketObject, RtioUdpSocket, IoFactory, IoFactoryObject};
use rt::local::Local;
pub struct UdpSocket(~RtioUdpSocketObject);
@ -53,6 +53,17 @@ impl UdpSocket {
pub fn connect(self, other: IpAddr) -> UdpStream {
UdpStream { socket: self, connectedTo: other }
}
pub fn socket_name(&mut self) -> Option<IpAddr> {
match (***self).socket_name() {
Ok(sn) => Some(sn),
Err(ioerr) => {
rtdebug!("failed to get socket name: %?", ioerr);
io_error::cond.raise(ioerr);
None
}
}
}
}
pub struct UdpStream {
@ -252,4 +263,33 @@ mod test {
}
}
}
#[cfg(test)]
fn socket_name(addr: IpAddr) {
do run_in_newsched_task {
do spawntask_immediately {
let server = UdpSocket::bind(addr);
assert!(server.is_some());
let mut server = server.unwrap();
// Make sure socket_name gives
// us the socket we binded to.
let so_name = server.socket_name();
assert!(so_name.is_some());
assert_eq!(addr, so_name.unwrap());
}
}
}
#[test]
fn socket_name_ip4() {
socket_name(next_test_ip4());
}
#[test]
fn socket_name_ip6() {
socket_name(next_test_ip6());
}
}

View File

@ -52,39 +52,39 @@ pub trait IoFactory {
pub trait RtioTcpListener : RtioSocket {
fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError>;
fn accept_simultaneously(&mut self);
fn dont_accept_simultaneously(&mut self);
fn accept_simultaneously(&mut self) -> Result<(), IoError>;
fn dont_accept_simultaneously(&mut self) -> Result<(), IoError>;
}
pub trait RtioTcpStream : RtioSocket {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>;
fn write(&mut self, buf: &[u8]) -> Result<(), IoError>;
fn peer_name(&mut self) -> IpAddr;
fn control_congestion(&mut self);
fn nodelay(&mut self);
fn keepalive(&mut self, delay_in_seconds: uint);
fn letdie(&mut self);
fn peer_name(&mut self) -> Result<IpAddr, IoError>;
fn control_congestion(&mut self) -> Result<(), IoError>;
fn nodelay(&mut self) -> Result<(), IoError>;
fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError>;
fn letdie(&mut self) -> Result<(), IoError>;
}
pub trait RtioSocket {
fn socket_name(&mut self) -> IpAddr;
fn socket_name(&mut self) -> Result<IpAddr, IoError>;
}
pub trait RtioUdpSocket : RtioSocket {
fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, IpAddr), IoError>;
fn sendto(&mut self, buf: &[u8], dst: IpAddr) -> Result<(), IoError>;
fn join_multicast(&mut self, multi: IpAddr);
fn leave_multicast(&mut self, multi: IpAddr);
fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError>;
fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError>;
fn loop_multicast_locally(&mut self);
fn dont_loop_multicast_locally(&mut self);
fn loop_multicast_locally(&mut self) -> Result<(), IoError>;
fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError>;
fn multicast_time_to_live(&mut self, ttl: int);
fn time_to_live(&mut self, ttl: int);
fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError>;
fn time_to_live(&mut self, ttl: int) -> Result<(), IoError>;
fn hear_broadcasts(&mut self);
fn ignore_broadcasts(&mut self);
fn hear_broadcasts(&mut self) -> Result<(), IoError>;
fn ignore_broadcasts(&mut self) -> Result<(), IoError>;
}
pub trait RtioTimer {

View File

@ -50,7 +50,7 @@ pub fn run_in_newsched_task(f: ~fn()) {
let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status);
let mut task = ~Task::new_root(&mut sched.stack_pool,
f.take());
rtdebug!("newsched_task: %x", to_uint(task));
rtdebug!("newsched_task: %x", ::borrow::to_uint(task));
task.death.on_exit = Some(on_exit);
sched.enqueue_task(task);
sched.run();
@ -145,7 +145,7 @@ pub fn spawntask(f: ~fn()) {
}
};
rtdebug!("new task pointer: %x", to_uint(task));
rtdebug!("new task pointer: %x", ::borrow::to_uint(task));
let sched = Local::take::<Scheduler>();
rtdebug!("spawntask scheduling the new task");

View File

@ -34,7 +34,7 @@ impl AsyncWatcher {
extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) {
let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle);
let status = status_to_maybe_uv_error(watcher.native_handle(), status);
let status = status_to_maybe_uv_error(watcher, status);
let data = watcher.get_watcher_data();
let cb = data.async_cb.get_ref();
(*cb)(watcher, status);

View File

@ -43,7 +43,7 @@ impl IdleWatcher {
let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
let data = idle_watcher.get_watcher_data();
let cb: &IdleCallback = data.idle_cb.get_ref();
let status = status_to_maybe_uv_error(handle, status);
let status = status_to_maybe_uv_error(idle_watcher, status);
(*cb)(idle_watcher, status);
}
}

View File

@ -282,14 +282,14 @@ pub fn uv_error_to_io_error(uverr: UvError) -> IoError {
}
/// Given a uv handle, convert a callback status to a UvError
// XXX: Follow the pattern below by parameterizing over T: Watcher, not T
pub fn status_to_maybe_uv_error<T>(handle: *T, status: c_int) -> Option<UvError> {
pub fn status_to_maybe_uv_error<T, U: Watcher + NativeHandle<*T>>(handle: U,
status: c_int) -> Option<UvError> {
if status != -1 {
None
} else {
unsafe {
rtdebug!("handle: %x", handle as uint);
let loop_ = uvll::get_loop_for_uv_handle(handle);
rtdebug!("handle: %x", handle.native_handle() as uint);
let loop_ = uvll::get_loop_for_uv_handle(handle.native_handle());
rtdebug!("loop: %x", loop_ as uint);
let err = uvll::last_error(loop_);
Some(UvError(err))

View File

@ -22,7 +22,7 @@ use str;
use from_str::{FromStr};
use num;
enum UvIpAddr {
pub enum UvIpAddr {
UvIpv4(*sockaddr_in),
UvIpv6(*sockaddr_in6),
}
@ -32,8 +32,8 @@ fn sockaddr_to_UvIpAddr(addr: *uvll::sockaddr) -> UvIpAddr {
assert!((is_ip4_addr(addr) || is_ip6_addr(addr)));
assert!(!(is_ip4_addr(addr) && is_ip6_addr(addr)));
match addr {
_ if is_ip4_addr(addr) => UvIpv4(as_sockaddr_in(addr)),
_ if is_ip6_addr(addr) => UvIpv6(as_sockaddr_in6(addr)),
_ if is_ip4_addr(addr) => UvIpv4(addr as *uvll::sockaddr_in),
_ if is_ip6_addr(addr) => UvIpv6(addr as *uvll::sockaddr_in6),
_ => fail!(),
}
}
@ -113,7 +113,36 @@ fn uv_ip_as_ip<T>(addr: UvIpAddr, f: &fn(IpAddr) -> T) -> T {
};
match s {
"" => ~[],
s => s.split_iter(':').transform(read_hex_segment).collect(),
// IPv4-Mapped/Compatible IPv6 Address?
s if s.find('.').is_some() => {
let i = s.rfind(':').get_or_default(-1);
let b = s.slice(i + 1, s.len()); // the ipv4 part
let h = b.split_iter('.')
.transform(|s: &str| -> u8 { FromStr::from_str(s).unwrap() })
.transform(|s: u8| -> ~str { fmt!("%02x", s as uint) })
.collect::<~[~str]>();
if i == -1 {
// Ipv4 Compatible Address (::x.x.x.x)
// first 96 bits are zero leaving 32 bits
// for the ipv4 part
// (i.e ::127.0.0.1 == ::7F00:1)
~[num::FromStrRadix::from_str_radix(h[0] + h[1], 16).unwrap(),
num::FromStrRadix::from_str_radix(h[2] + h[3], 16).unwrap()]
} else {
// Ipv4-Mapped Address (::FFFF:x.x.x.x)
// first 80 bits are zero, followed by all ones
// for the next 16 bits, leaving 32 bits for
// the ipv4 part
// (i.e ::FFFF:127.0.0.1 == ::FFFF:7F00:1)
~[1,
num::FromStrRadix::from_str_radix(h[0] + h[1], 16).unwrap(),
num::FromStrRadix::from_str_radix(h[2] + h[3], 16).unwrap()]
}
},
s => s.split_iter(':').transform(read_hex_segment).collect()
}
};
s.split_str_iter("::").transform(convert_each_segment).collect()
@ -133,7 +162,7 @@ fn uv_ip_as_ip<T>(addr: UvIpAddr, f: &fn(IpAddr) -> T) -> T {
f(ip)
}
fn uv_ip_to_ip(addr: UvIpAddr) -> IpAddr {
pub fn uv_ip_to_ip(addr: UvIpAddr) -> IpAddr {
use util;
uv_ip_as_ip(addr, util::id)
}
@ -154,7 +183,7 @@ fn test_ip6_conversion() {
assert_eq!(ip6, ip_as_uv_ip(ip6, uv_ip_to_ip));
}
// uv_stream t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t
// uv_stream_t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t
// and uv_file_t
pub struct StreamWatcher(*uvll::uv_stream_t);
impl Watcher for StreamWatcher { }
@ -180,7 +209,7 @@ impl StreamWatcher {
rtdebug!("buf len: %d", buf.len as int);
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
let cb = stream_watcher.get_watcher_data().read_cb.get_ref();
let status = status_to_maybe_uv_error(stream, nread as c_int);
let status = status_to_maybe_uv_error(stream_watcher, nread as c_int);
(*cb)(stream_watcher, nread as int, buf, status);
}
}
@ -210,7 +239,7 @@ impl StreamWatcher {
let mut stream_watcher = write_request.stream();
write_request.delete();
let cb = stream_watcher.get_watcher_data().write_cb.take_unwrap();
let status = status_to_maybe_uv_error(stream_watcher.native_handle(), status);
let status = status_to_maybe_uv_error(stream_watcher, status);
cb(stream_watcher, status);
}
}
@ -302,7 +331,7 @@ impl TcpWatcher {
let mut stream_watcher = connect_request.stream();
connect_request.delete();
let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap();
let status = status_to_maybe_uv_error(stream_watcher.native_handle(), status);
let status = status_to_maybe_uv_error(stream_watcher, status);
cb(stream_watcher, status);
}
}
@ -325,7 +354,7 @@ impl TcpWatcher {
rtdebug!("connection_cb");
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
let cb = stream_watcher.get_watcher_data().connect_cb.get_ref();
let status = status_to_maybe_uv_error(handle, status);
let status = status_to_maybe_uv_error(stream_watcher, status);
(*cb)(stream_watcher, status);
}
}
@ -402,7 +431,7 @@ impl UdpWatcher {
rtdebug!("buf len: %d", buf.len as int);
let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
let cb = udp_watcher.get_watcher_data().udp_recv_cb.get_ref();
let status = status_to_maybe_uv_error(handle, nread as c_int);
let status = status_to_maybe_uv_error(udp_watcher, nread as c_int);
let addr = uv_ip_to_ip(sockaddr_to_UvIpAddr(addr));
(*cb)(udp_watcher, nread as int, buf, addr, flags as uint, status);
}
@ -437,7 +466,7 @@ impl UdpWatcher {
let mut udp_watcher = send_request.handle();
send_request.delete();
let cb = udp_watcher.get_watcher_data().udp_send_cb.take_unwrap();
let status = status_to_maybe_uv_error(udp_watcher.native_handle(), status);
let status = status_to_maybe_uv_error(udp_watcher, status);
cb(udp_watcher, status);
}
}

View File

@ -43,7 +43,7 @@ impl TimerWatcher {
let mut watcher: TimerWatcher = NativeHandle::from_native_handle(handle);
let data = watcher.get_watcher_data();
let cb = data.timer_cb.get_ref();
let status = status_to_maybe_uv_error(handle, status);
let status = status_to_maybe_uv_error(watcher, status);
(*cb)(watcher, status);
}
}

View File

@ -15,15 +15,19 @@ use cell::Cell;
use cast;
use cast::transmute;
use clone::Clone;
use libc::{c_int, c_uint, c_void};
use ptr;
use rt::io::IoError;
use rt::io::net::ip::IpAddr;
use rt::io::net::ip::{IpAddr, Ipv4, Ipv6};
use rt::uv::*;
use rt::uv::idle::IdleWatcher;
use rt::uv::net::{UvIpv4, UvIpv6};
use rt::rtio::*;
use rt::sched::Scheduler;
use rt::io::{standard_error, OtherIoError};
use rt::tube::Tube;
use rt::local::Local;
use str::StrSlice;
use unstable::sync::{Exclusive, exclusive};
#[cfg(test)] use container::Container;
@ -33,6 +37,47 @@ use unstable::sync::{Exclusive, exclusive};
next_test_ip4,
run_in_newsched_task};
enum SocketNameKind {
TcpPeer,
Tcp,
Udp
}
fn socket_name<T, U: Watcher + NativeHandle<*T>>(sk: SocketNameKind,
handle: U) -> Result<IpAddr, IoError> {
let getsockname = match sk {
TcpPeer => uvll::rust_uv_tcp_getpeername,
Tcp => uvll::rust_uv_tcp_getsockname,
Udp => uvll::rust_uv_udp_getsockname
};
// Allocate a sockaddr_storage
// since we don't know if it's ipv4 or ipv6
let r_addr = unsafe { uvll::malloc_sockaddr_storage() };
let r = unsafe {
getsockname(handle.native_handle() as *c_void, r_addr as *uvll::sockaddr_storage)
};
if r != 0 {
let status = status_to_maybe_uv_error(handle, r);
return Err(uv_error_to_io_error(status.unwrap()));
}
let addr = unsafe {
if uvll::is_ip6_addr(r_addr as *uvll::sockaddr) {
net::uv_ip_to_ip(UvIpv6(r_addr as *uvll::sockaddr_in6))
} else {
net::uv_ip_to_ip(UvIpv4(r_addr as *uvll::sockaddr_in))
}
};
unsafe { uvll::free_sockaddr_storage(r_addr); }
Ok(addr)
}
pub struct UvEventLoop {
uvio: UvIoFactory
@ -220,7 +265,9 @@ impl IoFactory for UvIoFactory {
rtdebug!("connect: in connect callback");
if status.is_none() {
rtdebug!("status is none");
let res = Ok(~UvTcpStream(stream_watcher));
let tcp_watcher =
NativeHandle::from_native_handle(stream_watcher.native_handle());
let res = Ok(~UvTcpStream(tcp_watcher));
// Store the stream in the task's stack
unsafe { (*result_cell_ptr).put_back(res); }
@ -286,7 +333,6 @@ impl IoFactory for UvIoFactory {
}
}
// FIXME #6090: Prefer newtype structs but Drop doesn't work
pub struct UvTcpListener {
watcher: TcpWatcher,
listening: bool,
@ -320,8 +366,9 @@ impl Drop for UvTcpListener {
}
impl RtioSocket for UvTcpListener {
// XXX implement
fn socket_name(&mut self) -> IpAddr { fail!(); }
fn socket_name(&mut self) -> Result<IpAddr, IoError> {
socket_name(Tcp, self.watcher)
}
}
impl RtioTcpListener for UvTcpListener {
@ -344,9 +391,8 @@ impl RtioTcpListener for UvTcpListener {
let maybe_stream = if status.is_none() {
let mut loop_ = server_stream_watcher.event_loop();
let client_tcp_watcher = TcpWatcher::new(&mut loop_);
let client_tcp_watcher = client_tcp_watcher.as_stream();
// XXX: Need's to be surfaced in interface
server_stream_watcher.accept(client_tcp_watcher);
server_stream_watcher.accept(client_tcp_watcher.as_stream());
Ok(~UvTcpStream(client_tcp_watcher))
} else {
Err(standard_error(OtherIoError))
@ -360,13 +406,30 @@ impl RtioTcpListener for UvTcpListener {
return self.incoming_streams.recv();
}
// XXX implement
fn accept_simultaneously(&mut self) { fail!(); }
fn dont_accept_simultaneously(&mut self) { fail!(); }
fn accept_simultaneously(&mut self) -> Result<(), IoError> {
let r = unsafe {
uvll::rust_uv_tcp_simultaneous_accepts(self.watcher.native_handle(), 1 as c_int)
};
match status_to_maybe_uv_error(self.watcher, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}
fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
let r = unsafe {
uvll::rust_uv_tcp_simultaneous_accepts(self.watcher.native_handle(), 0 as c_int)
};
match status_to_maybe_uv_error(self.watcher, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}
}
// FIXME #6090: Prefer newtype structs but Drop doesn't work
pub struct UvTcpStream(StreamWatcher);
pub struct UvTcpStream(TcpWatcher);
impl Drop for UvTcpStream {
fn drop(&self) {
@ -374,7 +437,7 @@ impl Drop for UvTcpStream {
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self.close {
do self.as_stream().close {
let scheduler = Local::take::<Scheduler>();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
@ -383,8 +446,9 @@ impl Drop for UvTcpStream {
}
impl RtioSocket for UvTcpStream {
// XXX implement
fn socket_name(&mut self) -> IpAddr { fail!(); }
fn socket_name(&mut self) -> Result<IpAddr, IoError> {
socket_name(Tcp, **self)
}
}
impl RtioTcpStream for UvTcpStream {
@ -404,7 +468,7 @@ impl RtioTcpStream for UvTcpStream {
let alloc: AllocCallback = |_| unsafe {
slice_to_uv_buf(*buf_ptr)
};
let mut watcher = **self;
let mut watcher = self.as_stream();
do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
// Stop reading so that no read callbacks are
@ -440,7 +504,7 @@ impl RtioTcpStream for UvTcpStream {
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
let mut watcher = **self;
let mut watcher = self.as_stream();
do watcher.write(buf) |_watcher, status| {
let result = if status.is_none() {
Ok(())
@ -459,12 +523,54 @@ impl RtioTcpStream for UvTcpStream {
return result_cell.take();
}
// XXX implement
fn peer_name(&mut self) -> IpAddr { fail!(); }
fn control_congestion(&mut self) { fail!(); }
fn nodelay(&mut self) { fail!(); }
fn keepalive(&mut self, _delay_in_seconds: uint) { fail!(); }
fn letdie(&mut self) { fail!(); }
fn peer_name(&mut self) -> Result<IpAddr, IoError> {
socket_name(TcpPeer, **self)
}
fn control_congestion(&mut self) -> Result<(), IoError> {
let r = unsafe {
uvll::rust_uv_tcp_nodelay(self.native_handle(), 0 as c_int)
};
match status_to_maybe_uv_error(**self, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}
fn nodelay(&mut self) -> Result<(), IoError> {
let r = unsafe {
uvll::rust_uv_tcp_nodelay(self.native_handle(), 1 as c_int)
};
match status_to_maybe_uv_error(**self, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}
fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
let r = unsafe {
uvll::rust_uv_tcp_keepalive(self.native_handle(), 1 as c_int,
delay_in_seconds as c_uint)
};
match status_to_maybe_uv_error(**self, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}
fn letdie(&mut self) -> Result<(), IoError> {
let r = unsafe {
uvll::rust_uv_tcp_keepalive(self.native_handle(), 0 as c_int, 0 as c_uint)
};
match status_to_maybe_uv_error(**self, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}
}
pub struct UvUdpSocket(UdpWatcher);
@ -484,8 +590,9 @@ impl Drop for UvUdpSocket {
}
impl RtioSocket for UvUdpSocket {
// XXX implement
fn socket_name(&mut self) -> IpAddr { fail!(); }
fn socket_name(&mut self) -> Result<IpAddr, IoError> {
socket_name(Udp, **self)
}
}
impl RtioUdpSocket for UvUdpSocket {
@ -552,18 +659,117 @@ impl RtioUdpSocket for UvUdpSocket {
return result_cell.take();
}
// XXX implement
fn join_multicast(&mut self, _multi: IpAddr) { fail!(); }
fn leave_multicast(&mut self, _multi: IpAddr) { fail!(); }
fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
let ip_str = match multi {
Ipv4(x1, x2, x3, x4, _) =>
fmt!("%u.%u.%u.%u", x1 as uint, x2 as uint, x3 as uint, x4 as uint),
Ipv6(x1, x2, x3, x4, x5, x6, x7, x8, _) =>
fmt!("%x:%x:%x:%x:%x:%x:%x:%x",
x1 as uint, x2 as uint, x3 as uint, x4 as uint,
x5 as uint, x6 as uint, x7 as uint, x8 as uint),
};
fn loop_multicast_locally(&mut self) { fail!(); }
fn dont_loop_multicast_locally(&mut self) { fail!(); }
let r = unsafe {
do ip_str.as_c_str |m_addr| {
uvll::udp_set_membership(self.native_handle(), m_addr,
ptr::null(), uvll::UV_JOIN_GROUP)
}
};
fn multicast_time_to_live(&mut self, _ttl: int) { fail!(); }
fn time_to_live(&mut self, _ttl: int) { fail!(); }
match status_to_maybe_uv_error(**self, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}
fn hear_broadcasts(&mut self) { fail!(); }
fn ignore_broadcasts(&mut self) { fail!(); }
fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
let ip_str = match multi {
Ipv4(x1, x2, x3, x4, _) =>
fmt!("%u.%u.%u.%u", x1 as uint, x2 as uint, x3 as uint, x4 as uint),
Ipv6(x1, x2, x3, x4, x5, x6, x7, x8, _) =>
fmt!("%x:%x:%x:%x:%x:%x:%x:%x",
x1 as uint, x2 as uint, x3 as uint, x4 as uint,
x5 as uint, x6 as uint, x7 as uint, x8 as uint),
};
let r = unsafe {
do ip_str.as_c_str |m_addr| {
uvll::udp_set_membership(self.native_handle(), m_addr,
ptr::null(), uvll::UV_LEAVE_GROUP)
}
};
match status_to_maybe_uv_error(**self, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}
fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
let r = unsafe {
uvll::udp_set_multicast_loop(self.native_handle(), 1 as c_int)
};
match status_to_maybe_uv_error(**self, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}
fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
let r = unsafe {
uvll::udp_set_multicast_loop(self.native_handle(), 0 as c_int)
};
match status_to_maybe_uv_error(**self, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}
fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
let r = unsafe {
uvll::udp_set_multicast_ttl(self.native_handle(), ttl as c_int)
};
match status_to_maybe_uv_error(**self, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}
fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
let r = unsafe {
uvll::udp_set_ttl(self.native_handle(), ttl as c_int)
};
match status_to_maybe_uv_error(**self, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}
fn hear_broadcasts(&mut self) -> Result<(), IoError> {
let r = unsafe {
uvll::udp_set_broadcast(self.native_handle(), 1 as c_int)
};
match status_to_maybe_uv_error(**self, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}
fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
let r = unsafe {
uvll::udp_set_broadcast(self.native_handle(), 0 as c_int)
};
match status_to_maybe_uv_error(**self, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}
}
pub struct UvTimer(timer::TimerWatcher);

View File

@ -77,7 +77,7 @@ pub type uv_udp_recv_cb = *u8;
pub type sockaddr = c_void;
pub type sockaddr_in = c_void;
pub type sockaddr_in6 = c_void;
pub type uv_membership = c_void;
pub type sockaddr_storage = c_void;
#[deriving(Eq)]
pub enum uv_handle_type {
@ -116,6 +116,12 @@ pub enum uv_req_type {
UV_REQ_TYPE_MAX
}
#[deriving(Eq)]
pub enum uv_membership {
UV_LEAVE_GROUP,
UV_JOIN_GROUP
}
pub unsafe fn malloc_handle(handle: uv_handle_type) -> *c_void {
assert!(handle != UV_UNKNOWN_HANDLE && handle != UV_HANDLE_TYPE_MAX);
let size = rust_uv_handle_size(handle as uint);
@ -233,17 +239,13 @@ pub unsafe fn get_udp_handle_from_send_req(send_req: *uv_udp_send_t) -> *uv_udp_
return rust_uv_get_udp_handle_from_send_req(send_req);
}
pub unsafe fn udp_get_sockname(handle: *uv_udp_t, name: *sockaddr_in) -> c_int {
pub unsafe fn udp_get_sockname(handle: *uv_udp_t, name: *sockaddr_storage) -> c_int {
return rust_uv_udp_getsockname(handle, name);
}
pub unsafe fn udp_get_sockname6(handle: *uv_udp_t, name: *sockaddr_in6) -> c_int {
return rust_uv_udp_getsockname6(handle, name);
}
pub unsafe fn udp_set_membership(handle: *uv_udp_t, multicast_addr: *c_char,
interface_addr: *c_char, membership: uv_membership) -> c_int {
return rust_uv_udp_set_membership(handle, multicast_addr, interface_addr, membership);
return rust_uv_udp_set_membership(handle, multicast_addr, interface_addr, membership as c_int);
}
pub unsafe fn udp_set_multicast_loop(handle: *uv_udp_t, on: c_int) -> c_int {
@ -254,6 +256,10 @@ pub unsafe fn udp_set_multicast_ttl(handle: *uv_udp_t, ttl: c_int) -> c_int {
return rust_uv_udp_set_multicast_ttl(handle, ttl);
}
pub unsafe fn udp_set_ttl(handle: *uv_udp_t, ttl: c_int) -> c_int {
return rust_uv_udp_set_ttl(handle, ttl);
}
pub unsafe fn udp_set_broadcast(handle: *uv_udp_t, on: c_int) -> c_int {
return rust_uv_udp_set_broadcast(handle, on);
}
@ -280,22 +286,14 @@ pub unsafe fn tcp_bind6(tcp_server_ptr: *uv_tcp_t, addr_ptr: *sockaddr_in6) -> c
return rust_uv_tcp_bind6(tcp_server_ptr, addr_ptr);
}
pub unsafe fn tcp_getpeername(tcp_handle_ptr: *uv_tcp_t, name: *sockaddr_in) -> c_int {
pub unsafe fn tcp_getpeername(tcp_handle_ptr: *uv_tcp_t, name: *sockaddr_storage) -> c_int {
return rust_uv_tcp_getpeername(tcp_handle_ptr, name);
}
pub unsafe fn tcp_getpeername6(tcp_handle_ptr: *uv_tcp_t, name: *sockaddr_in6) ->c_int {
return rust_uv_tcp_getpeername6(tcp_handle_ptr, name);
}
pub unsafe fn tcp_getsockname(handle: *uv_tcp_t, name: *sockaddr_in) -> c_int {
pub unsafe fn tcp_getsockname(handle: *uv_tcp_t, name: *sockaddr_storage) -> c_int {
return rust_uv_tcp_getsockname(handle, name);
}
pub unsafe fn tcp_getsockname6(handle: *uv_tcp_t, name: *sockaddr_in6) -> c_int {
return rust_uv_tcp_getsockname6(handle, name);
}
pub unsafe fn tcp_nodelay(handle: *uv_tcp_t, enable: c_int) -> c_int {
return rust_uv_tcp_nodelay(handle, enable);
}
@ -373,14 +371,6 @@ pub unsafe fn is_ip6_addr(addr: *sockaddr) -> bool {
match rust_uv_is_ipv6_sockaddr(addr) { 0 => false, _ => true }
}
pub unsafe fn as_sockaddr_in(addr: *sockaddr) -> *sockaddr_in {
return rust_uv_sockaddr_as_sockaddr_in(addr);
}
pub unsafe fn as_sockaddr_in6(addr: *sockaddr) -> *sockaddr_in6 {
return rust_uv_sockaddr_as_sockaddr_in6(addr);
}
pub unsafe fn malloc_ip4_addr(ip: &str, port: int) -> *sockaddr_in {
do ip.as_c_str |ip_buf| {
rust_uv_ip4_addrp(ip_buf as *u8, port as libc::c_int)
@ -392,6 +382,14 @@ pub unsafe fn malloc_ip6_addr(ip: &str, port: int) -> *sockaddr_in6 {
}
}
pub unsafe fn malloc_sockaddr_storage() -> *sockaddr_storage {
rust_uv_malloc_sockaddr_storage()
}
pub unsafe fn free_sockaddr_storage(ss: *sockaddr_storage) {
rust_uv_free_sockaddr_storage(ss);
}
pub unsafe fn free_ip4_addr(addr: *sockaddr_in) {
rust_uv_free_ip4_addr(addr);
}
@ -520,10 +518,8 @@ extern {
fn rust_uv_tcp_connect6(req: *uv_connect_t, handle: *uv_tcp_t, cb: *u8,
addr: *sockaddr_in6) -> c_int;
fn rust_uv_tcp_bind6(tcp_server: *uv_tcp_t, addr: *sockaddr_in6) -> c_int;
fn rust_uv_tcp_getpeername(tcp_handle_ptr: *uv_tcp_t, name: *sockaddr_in) -> c_int;
fn rust_uv_tcp_getpeername6(tcp_handle_ptr: *uv_tcp_t, name: *sockaddr_in6) ->c_int;
fn rust_uv_tcp_getsockname(handle: *uv_tcp_t, name: *sockaddr_in) -> c_int;
fn rust_uv_tcp_getsockname6(handle: *uv_tcp_t, name: *sockaddr_in6) -> c_int;
fn rust_uv_tcp_getpeername(tcp_handle_ptr: *uv_tcp_t, name: *sockaddr_storage) -> c_int;
fn rust_uv_tcp_getsockname(handle: *uv_tcp_t, name: *sockaddr_storage) -> c_int;
fn rust_uv_tcp_nodelay(handle: *uv_tcp_t, enable: c_int) -> c_int;
fn rust_uv_tcp_keepalive(handle: *uv_tcp_t, enable: c_int, delay: c_uint) -> c_int;
fn rust_uv_tcp_simultaneous_accepts(handle: *uv_tcp_t, enable: c_int) -> c_int;
@ -538,18 +534,18 @@ extern {
fn rust_uv_udp_recv_start(server: *uv_udp_t, on_alloc: *u8, on_recv: *u8) -> c_int;
fn rust_uv_udp_recv_stop(server: *uv_udp_t) -> c_int;
fn rust_uv_get_udp_handle_from_send_req(req: *uv_udp_send_t) -> *uv_udp_t;
fn rust_uv_udp_getsockname(handle: *uv_udp_t, name: *sockaddr_in) -> c_int;
fn rust_uv_udp_getsockname6(handle: *uv_udp_t, name: *sockaddr_in6) -> c_int;
fn rust_uv_udp_getsockname(handle: *uv_udp_t, name: *sockaddr_storage) -> c_int;
fn rust_uv_udp_set_membership(handle: *uv_udp_t, multicast_addr: *c_char,
interface_addr: *c_char, membership: uv_membership) -> c_int;
interface_addr: *c_char, membership: c_int) -> c_int;
fn rust_uv_udp_set_multicast_loop(handle: *uv_udp_t, on: c_int) -> c_int;
fn rust_uv_udp_set_multicast_ttl(handle: *uv_udp_t, ttl: c_int) -> c_int;
fn rust_uv_udp_set_ttl(handle: *uv_udp_t, ttl: c_int) -> c_int;
fn rust_uv_udp_set_broadcast(handle: *uv_udp_t, on: c_int) -> c_int;
fn rust_uv_is_ipv4_sockaddr(addr: *sockaddr) -> c_int;
fn rust_uv_is_ipv6_sockaddr(addr: *sockaddr) -> c_int;
fn rust_uv_sockaddr_as_sockaddr_in(addr: *sockaddr) -> *sockaddr_in;
fn rust_uv_sockaddr_as_sockaddr_in6(addr: *sockaddr) -> *sockaddr_in6;
fn rust_uv_malloc_sockaddr_storage() -> *sockaddr_storage;
fn rust_uv_free_sockaddr_storage(ss: *sockaddr_storage);
fn rust_uv_listen(stream: *c_void, backlog: c_int, cb: *u8) -> c_int;
fn rust_uv_accept(server: *c_void, client: *c_void) -> c_int;

View File

@ -71,8 +71,8 @@ pub unsafe fn local_malloc(td: *c_char, size: uintptr_t) -> *c_char {
let mut alloc = ::ptr::null();
do Local::borrow::<Task,()> |task| {
rtdebug!("task pointer: %x, heap pointer: %x",
to_uint(task),
to_uint(&task.heap));
::borrow::to_uint(task),
::borrow::to_uint(&task.heap));
alloc = task.heap.alloc(td as *c_void, size as uint) as *c_char;
}
return alloc;

View File

@ -282,29 +282,19 @@ rust_uv_tcp_bind6
extern "C" int
rust_uv_tcp_getpeername
(uv_tcp_t* handle, sockaddr_in* name) {
(uv_tcp_t* handle, sockaddr_storage* name) {
// sockaddr_storage is big enough to hold either
// sockaddr_in or sockaddr_in6
int namelen = sizeof(sockaddr_in);
return uv_tcp_getpeername(handle, (sockaddr*)name, &namelen);
}
extern "C" int
rust_uv_tcp_getpeername6
(uv_tcp_t* handle, sockaddr_in6* name) {
int namelen = sizeof(sockaddr_in6);
return uv_tcp_getpeername(handle, (sockaddr*)name, &namelen);
}
extern "C" int
rust_uv_tcp_getsockname
(uv_tcp_t* handle, sockaddr_in* name) {
int namelen = sizeof(sockaddr_in);
return uv_tcp_getsockname(handle, (sockaddr*)name, &namelen);
}
extern "C" int
rust_uv_tcp_getsockname6
(uv_tcp_t* handle, sockaddr_in6* name) {
int namelen = sizeof(sockaddr_in6);
(uv_tcp_t* handle, sockaddr_storage* name) {
// sockaddr_storage is big enough to hold either
// sockaddr_in or sockaddr_in6
int namelen = sizeof(sockaddr_storage);
return uv_tcp_getsockname(handle, (sockaddr*)name, &namelen);
}
@ -370,15 +360,10 @@ rust_uv_get_udp_handle_from_send_req(uv_udp_send_t* send_req) {
extern "C" int
rust_uv_udp_getsockname
(uv_udp_t* handle, sockaddr_in* name) {
int namelen = sizeof(sockaddr_in);
return uv_udp_getsockname(handle, (sockaddr*)name, &namelen);
}
extern "C" int
rust_uv_udp_getsockname6
(uv_udp_t* handle, sockaddr_in6* name) {
int namelen = sizeof(sockaddr_in6);
(uv_udp_t* handle, sockaddr_storage* name) {
// sockaddr_storage is big enough to hold either
// sockaddr_in or sockaddr_in6
int namelen = sizeof(sockaddr_storage);
return uv_udp_getsockname(handle, (sockaddr*)name, &namelen);
}
@ -400,6 +385,12 @@ rust_uv_udp_set_multicast_ttl
return uv_udp_set_multicast_ttl(handle, ttl);
}
extern "C" int
rust_uv_udp_set_ttl
(uv_udp_t* handle, int ttl) {
return uv_udp_set_ttl(handle, ttl);
}
extern "C" int
rust_uv_udp_set_broadcast
(uv_udp_t* handle, int on) {
@ -609,6 +600,17 @@ rust_uv_ip6_addrp(const char* ip, int port) {
return addrp;
}
extern "C" struct sockaddr_storage *
rust_uv_malloc_sockaddr_storage() {
struct sockaddr_storage *ss = (sockaddr_storage *)malloc(sizeof(struct sockaddr_storage));
return ss;
}
extern "C" void
rust_uv_free_sockaddr_storage(struct sockaddr_storage *ss) {
free(ss);
}
extern "C" void
rust_uv_free_ip4_addr(sockaddr_in *addrp) {
free(addrp);
@ -669,18 +671,6 @@ rust_uv_is_ipv6_sockaddr(sockaddr* addr) {
return addr->sa_family == AF_INET6;
}
extern "C" sockaddr_in*
rust_uv_sockaddr_as_sockaddr_in(sockaddr* addr) {
// return (sockaddr_in*)addr->sa_data;
return (sockaddr_in*)addr;
}
extern "C" sockaddr_in6*
rust_uv_sockaddr_as_sockaddr_in6(sockaddr* addr) {
//return (sockaddr_in6*)addr->sa_data;
return (sockaddr_in6*)addr;
}
extern "C" bool
rust_uv_is_ipv4_addrinfo(addrinfo* input) {
return input->ai_family == AF_INET;

View File

@ -106,7 +106,6 @@ rust_uv_tcp_bind
rust_uv_tcp_connect6
rust_uv_tcp_bind6
rust_uv_tcp_getsockname
rust_uv_tcp_getsockname6
rust_uv_tcp_nodelay
rust_uv_tcp_keepalive
rust_uv_tcp_simultaneous_accepts
@ -119,15 +118,15 @@ rust_uv_udp_recv_start
rust_uv_udp_recv_stop
rust_uv_get_udp_handle_from_send_req
rust_uv_udp_getsockname
rust_uv_udp_getsockname6
rust_uv_udp_set_membership
rust_uv_udp_set_multicast_loop
rust_uv_udp_set_multicast_ttl
rust_uv_udp_set_ttl
rust_uv_udp_set_broadcast
rust_uv_is_ipv4_sockaddr
rust_uv_is_ipv6_sockaddr
rust_uv_sockaddr_as_sockaddr_in
rust_uv_sockaddr_as_sockaddr_in6
rust_uv_malloc_sockaddr_storage
rust_uv_free_sockaddr_storage
rust_uv_listen
rust_uv_accept
rust_uv_write
@ -204,7 +203,6 @@ rust_update_gc_metadata
rust_uv_ip4_port
rust_uv_ip6_port
rust_uv_tcp_getpeername
rust_uv_tcp_getpeername6
linenoise
linenoiseSetCompletionCallback
linenoiseAddCompletion
@ -267,4 +265,4 @@ rust_drop_global_args_lock
rust_set_exit_status_newrt
rust_get_exit_status_newrt
rust_take_change_dir_lock
rust_drop_change_dir_lock
rust_drop_change_dir_lock