diff --git a/src/libstd/rt/io/timer.rs b/src/libstd/rt/io/timer.rs
index c7820ebf623..bfd1ed48ac1 100644
--- a/src/libstd/rt/io/timer.rs
+++ b/src/libstd/rt/io/timer.rs
@@ -41,7 +41,7 @@ impl Timer {
 }
 
 impl RtioTimer for Timer {
-    fn sleep(&self, msecs: u64) {
+    fn sleep(&mut self, msecs: u64) {
         (**self).sleep(msecs);
     }
 }
@@ -50,15 +50,11 @@ impl RtioTimer for Timer {
 mod test {
     use super::*;
     use rt::test::*;
-    use option::{Some, None};
     #[test]
     fn test_io_timer_sleep_simple() {
         do run_in_newsched_task {
             let timer = Timer::new();
-            match timer {
-                Some(t) => t.sleep(1),
-                None => assert!(false)
-            }
+            do timer.map_move |mut t| { t.sleep(1) };
         }
     }
-}
\ No newline at end of file
+}
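Reviewer note: `RtioTimer::sleep` moves from `&self` to `&mut self` because starting and stopping the underlying uv timer watcher mutates it; the old signature only worked by copying the watcher out (see the `let mut w = **self;` pattern removed from uvio.rs below). Call sites therefore need the timer by mutable binding. A minimal caller sketch (hypothetical code mirroring the rewritten test above; in this tree `Timer::new()` returns an Option, hence `map_move`):

    do run_in_newsched_task {
        let timer = Timer::new();
        // map_move hands the timer over by value, so the closure can bind it
        // mutably and call the new &mut self sleep
        do timer.map_move |mut t| { t.sleep(250) };  // duration is arbitrary
    }
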
diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs
index 2bec782847b..36eb37a3630 100644
--- a/src/libstd/rt/rtio.rs
+++ b/src/libstd/rt/rtio.rs
@@ -22,7 +22,7 @@ pub type RemoteCallbackObject = uvio::UvRemoteCallback;
 pub type IoFactoryObject = uvio::UvIoFactory;
 pub type RtioTcpStreamObject = uvio::UvTcpStream;
 pub type RtioTcpListenerObject = uvio::UvTcpListener;
-pub type RtioUdpSocketObject = uvio::HomedUvUdpSocket; //uvio::UvUdpSocket;
+pub type RtioUdpSocketObject = uvio::UvUdpSocket;
 pub type RtioTimerObject = uvio::UvTimer;
 
 pub trait EventLoop {
@@ -88,5 +88,5 @@ pub trait RtioUdpSocket : RtioSocket {
 }
 
 pub trait RtioTimer {
-    fn sleep(&self, msecs: u64);
+    fn sleep(&mut self, msecs: u64);
 }
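The uvio.rs changes below introduce "homing": every uv handle is owned by the event loop, and therefore the scheduler, that created it, so each I/O object now carries a SchedHandle for its home scheduler, and every operation migrates the running task there and back. A minimal sketch of what a homed handle looks like under this scheme (`MyUvHandle` and `SomeWatcher` are hypothetical placeholders, for illustration only; `HomingIO` and `SchedHandle` are the real types from the patch):

    struct MyUvHandle {
        watcher: SomeWatcher,   // the uv resource, pinned to its home loop
        home: SchedHandle,      // handle to the scheduler that owns that loop
    }

    impl HomingIO for MyUvHandle {
        fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
    }

    // Every blocking operation then wraps its body:
    //     do self.home_for_io |self_| { /* uv calls against self_.watcher */ }
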
diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs
index 43be09434a4..1250a4512f7 100644
--- a/src/libstd/rt/uv/uvio.rs
+++ b/src/libstd/rt/uv/uvio.rs
@@ -37,6 +37,49 @@ use unstable::sync::Exclusive;
                            run_in_newsched_task};
 #[cfg(test)] use iterator::{Iterator, range};
 
+// XXX we should not be calling uvll functions in here.
+
+trait HomingIO {
+    fn home<'r>(&'r mut self) -> &'r mut SchedHandle;
+    /* XXX This will move pinned tasks to do IO on the proper scheduler
+     * and then move them back to their home.
+     */
+    fn home_for_io<A>(&mut self, io: &fn(&mut Self) -> A) -> A {
+        use rt::sched::{PinnedTask, TaskFromFriend};
+        // go home
+        let old_home = Cell::new_empty();
+        let old_home_ptr = &old_home;
+        let scheduler = Local::take::<Scheduler>();
+        do scheduler.deschedule_running_task_and_then |_, task| {
+            // get the old home first
+            do task.wake().map_move |mut task| {
+                old_home_ptr.put_back(task.take_unwrap_home());
+                self.home().send(PinnedTask(task));
+            };
+        }
+
+        // do IO
+        let a = io(self);
+
+        // go back to the old home
+        let scheduler = Local::take::<Scheduler>();
+        do scheduler.deschedule_running_task_and_then |scheduler, task| {
+            do task.wake().map_move |mut task| {
+                task.give_home(old_home.take());
+                scheduler.make_handle().send(TaskFromFriend(task));
+            };
+        }
+
+        // return the result of the IO
+        a
+    }
+}
+
+// get a handle for the current scheduler
+macro_rules! get_handle_to_current_scheduler(
    () => (do Local::borrow::<Scheduler, SchedHandle> |sched| { sched.make_handle() })
)
 
 enum SocketNameKind {
     TcpPeer,
     Tcp,
@@ -45,12 +88,10 @@ enum SocketNameKind {
 
 fn socket_name<T, U: Watcher + NativeHandle<*T>>(sk: SocketNameKind,
                                                  handle: U) -> Result<SocketAddr, IoError> {
-    #[fixed_stack_segment]; #[inline(never)];
-
     let getsockname = match sk {
-        TcpPeer => uvll::rust_uv_tcp_getpeername,
-        Tcp     => uvll::rust_uv_tcp_getsockname,
-        Udp     => uvll::rust_uv_udp_getsockname
+        TcpPeer => uvll::tcp_getpeername,
+        Tcp     => uvll::tcp_getsockname,
+        Udp     => uvll::udp_getsockname,
    };
 
     // Allocate a sockaddr_storage
@@ -80,6 +121,7 @@ fn socket_name<T, U: Watcher + NativeHandle<*T>>(sk: SocketNameKind,
 
 }
 
+// Obviously an Event Loop is always home.
 pub struct UvEventLoop {
     uvio: UvIoFactory
 }
@@ -149,6 +191,7 @@ fn test_callback_run_once() {
     }
 }
 
+// The entire point of async is to call into a loop from other threads, so it does not need to home.
 pub struct UvRemoteCallback {
     // The uv async handle for triggering the callback
     async: AsyncWatcher,
@@ -251,40 +294,38 @@ impl IoFactory for UvIoFactory {
         let result_cell = Cell::new_empty();
         let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
 
-        let scheduler = Local::take::<Scheduler>();
-
         // Block this task and take ownership, switch to scheduler context
+        let scheduler = Local::take::<Scheduler>();
         do scheduler.deschedule_running_task_and_then |_, task| {
-            rtdebug!("connect: entered scheduler context");
-            let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
+            let mut tcp = TcpWatcher::new(self.uv_loop());
             let task_cell = Cell::new(task);
 
             // Wait for a connection
-            do tcp_watcher.connect(addr) |stream_watcher, status| {
-                rtdebug!("connect: in connect callback");
-                if status.is_none() {
-                    rtdebug!("status is none");
-                    let tcp_watcher =
-                        NativeHandle::from_native_handle(stream_watcher.native_handle());
-                    let res = Ok(~UvTcpStream(tcp_watcher));
+            do tcp.connect(addr) |stream, status| {
+                match status {
+                    None => {
+                        let tcp = NativeHandle::from_native_handle(stream.native_handle());
+                        let home = get_handle_to_current_scheduler!();
+                        let res = Ok(~UvTcpStream { watcher: tcp, home: home });
 
-                    // Store the stream in the task's stack
-                    unsafe { (*result_cell_ptr).put_back(res); }
-
-                    // Context switch
-                    let scheduler = Local::take::<Scheduler>();
-                    scheduler.resume_blocked_task_immediately(task_cell.take());
-                } else {
-                    rtdebug!("status is some");
-                    let task_cell = Cell::new(task_cell.take());
-                    do stream_watcher.close {
-                        let res = Err(uv_error_to_io_error(status.unwrap()));
+                        // Store the stream in the task's stack
                         unsafe { (*result_cell_ptr).put_back(res); }
+
+                        // Context switch
                         let scheduler = Local::take::<Scheduler>();
                         scheduler.resume_blocked_task_immediately(task_cell.take());
                     }
-                };
+                    Some(_) => {
+                        let task_cell = Cell::new(task_cell.take());
+                        do stream.close {
+                            let res = Err(uv_error_to_io_error(status.unwrap()));
+                            unsafe { (*result_cell_ptr).put_back(res); }
+                            let scheduler = Local::take::<Scheduler>();
+                            scheduler.resume_blocked_task_immediately(task_cell.take());
+                        }
+                    }
+                }
             }
         }
@@ -295,7 +336,10 @@ impl IoFactory for UvIoFactory {
     fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListenerObject, IoError> {
         let mut watcher = TcpWatcher::new(self.uv_loop());
         match watcher.bind(addr) {
-            Ok(_) => Ok(~UvTcpListener::new(watcher)),
+            Ok(_) => {
+                let home = get_handle_to_current_scheduler!();
+                Ok(~UvTcpListener::new(watcher, home))
+            }
             Err(uverr) => {
                 let scheduler = Local::take::<Scheduler>();
                 do scheduler.deschedule_running_task_and_then |_, task| {
@@ -310,32 +354,12 @@ impl IoFactory for UvIoFactory {
         }
     }
 
-    /*
     fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError> {
-        let mut watcher = UdpWatcher::new(self.uv_loop());
-        match watcher.bind(addr) {
-            Ok(_) => Ok(~UvUdpSocket(watcher)),
-            Err(uverr) => {
-                let scheduler = Local::take::<Scheduler>();
-                do scheduler.deschedule_running_task_and_then |_, task| {
-                    let task_cell = Cell::new(task);
-                    do watcher.close {
-                        let scheduler = Local::take::<Scheduler>();
-                        scheduler.resume_blocked_task_immediately(task_cell.take());
-                    }
-                }
-                Err(uv_error_to_io_error(uverr))
-            }
-        }
-    }
-    */
-
-    pub fn /*homed_*/udp_bind(&mut self, addr: SocketAddr) -> Result<~/*HomedUvUdpSocket*/RtioUdpSocketObject, IoError> {
         let mut watcher = UdpWatcher::new(self.uv_loop());
         match watcher.bind(addr) {
             Ok(_) => {
-                let home = do Local::borrow::<Scheduler, SchedHandle> |sched| {sched.make_handle()};
-                Ok(~HomedUvUdpSocket { watcher: watcher, home: home })
+                let home = get_handle_to_current_scheduler!();
+                Ok(~UvUdpSocket { watcher: watcher, home: home })
             }
             Err(uverr) => {
                 let scheduler = Local::take::<Scheduler>();
@@ -352,22 +376,30 @@ impl IoFactory for UvIoFactory {
     }
 
     fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> {
-        Ok(~UvTimer(TimerWatcher::new(self.uv_loop())))
+        let watcher = TimerWatcher::new(self.uv_loop());
+        let home = get_handle_to_current_scheduler!();
+        Ok(~UvTimer::new(watcher, home))
     }
 }
 
 pub struct UvTcpListener {
     watcher: TcpWatcher,
     listening: bool,
-    incoming_streams: Tube<Result<~RtioTcpStreamObject, IoError>>
+    incoming_streams: Tube<Result<~RtioTcpStreamObject, IoError>>,
+    home: SchedHandle,
+}
+
+impl HomingIO for UvTcpListener {
+    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
 }
 
 impl UvTcpListener {
-    fn new(watcher: TcpWatcher) -> UvTcpListener {
+    fn new(watcher: TcpWatcher, home: SchedHandle) -> UvTcpListener {
         UvTcpListener {
             watcher: watcher,
             listening: false,
-            incoming_streams: Tube::new()
+            incoming_streams: Tube::new(),
+            home: home,
         }
     }
@@ -376,13 +408,16 @@ impl UvTcpListener {
 
 impl Drop for UvTcpListener {
     fn drop(&self) {
-        let watcher = self.watcher();
-        let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |_, task| {
-            let task_cell = Cell::new(task);
-            do watcher.as_stream().close {
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
+        // XXX need mutable finalizer
+        let self_ = unsafe { transmute::<&UvTcpListener, &mut UvTcpListener>(self) };
+        do self_.home_for_io |self_| {
+            let scheduler = Local::take::<Scheduler>();
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                let task_cell = Cell::new(task);
+                do self_.watcher().as_stream().close {
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
             }
         }
     }
@@ -390,83 +425,92 @@ impl Drop for UvTcpListener {
 
 impl RtioSocket for UvTcpListener {
     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
-        socket_name(Tcp, self.watcher)
+        do self.home_for_io |self_| {
+            socket_name(Tcp, self_.watcher)
+        }
     }
 }
 
 impl RtioTcpListener for UvTcpListener {
     fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
-        rtdebug!("entering listen");
+        do self.home_for_io |self_| {
 
-        if self.listening {
-            return self.incoming_streams.recv();
+            if !self_.listening {
+                self_.listening = true;
+
+                let incoming_streams_cell = Cell::new(self_.incoming_streams.clone());
+
+                do self_.watcher().listen |mut server, status| {
+                    let stream = match status {
+                        Some(_) => Err(standard_error(OtherIoError)),
+                        None => {
+                            let client = TcpWatcher::new(&server.event_loop());
+                            // XXX: needs to be surfaced in interface
+                            server.accept(client.as_stream());
+                            let home = get_handle_to_current_scheduler!();
+                            Ok(~UvTcpStream { watcher: client, home: home })
+                        }
+                    };
+
+                    let mut incoming_streams = incoming_streams_cell.take();
+                    incoming_streams.send(stream);
+                    incoming_streams_cell.put_back(incoming_streams);
+                }
+
+            }
+            self_.incoming_streams.recv()
         }
-
-        self.listening = true;
-
-        let server_tcp_watcher = self.watcher();
-        let incoming_streams_cell = Cell::new(self.incoming_streams.clone());
-
-        let incoming_streams_cell = Cell::new(incoming_streams_cell.take());
-        let mut server_tcp_watcher = server_tcp_watcher;
-        do server_tcp_watcher.listen |mut server_stream_watcher, status| {
-            let maybe_stream = if status.is_none() {
-                let mut loop_ = server_stream_watcher.event_loop();
-                let client_tcp_watcher =
-                    TcpWatcher::new(&mut loop_);
-                // XXX: Need's to be surfaced in interface
-                server_stream_watcher.accept(client_tcp_watcher.as_stream());
-                Ok(~UvTcpStream(client_tcp_watcher))
-            } else {
-                Err(standard_error(OtherIoError))
-            };
-
-            let mut incoming_streams = incoming_streams_cell.take();
-            incoming_streams.send(maybe_stream);
-            incoming_streams_cell.put_back(incoming_streams);
-        }
-
-        return self.incoming_streams.recv();
     }
 
     fn accept_simultaneously(&mut self) -> Result<(), IoError> {
-        #[fixed_stack_segment]; #[inline(never)];
+        do self.home_for_io |self_| {
+            let r = unsafe {
+                uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 1 as c_int)
+            };
 
-        let r = unsafe {
-            uvll::rust_uv_tcp_simultaneous_accepts(self.watcher.native_handle(), 1 as c_int)
-        };
-
-        match status_to_maybe_uv_error(self.watcher, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher(), r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
-        #[fixed_stack_segment]; #[inline(never)];
+        do self.home_for_io |self_| {
+            let r = unsafe {
+                uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 0 as c_int)
+            };
 
-        let r = unsafe {
-            uvll::rust_uv_tcp_simultaneous_accepts(self.watcher.native_handle(), 0 as c_int)
-        };
-
-        match status_to_maybe_uv_error(self.watcher, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher(), r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 }
 
-pub struct UvTcpStream(TcpWatcher);
+pub struct UvTcpStream {
+    watcher: TcpWatcher,
+    home: SchedHandle,
+}
+
+impl HomingIO for UvTcpStream {
+    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+}
 
 impl Drop for UvTcpStream {
     fn drop(&self) {
-        rtdebug!("closing tcp stream");
-        let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |_, task| {
-            let task_cell = Cell::new(task);
-            do self.as_stream().close {
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
+        // XXX need mutable finalizer
+        let this = unsafe { transmute::<&UvTcpStream, &mut UvTcpStream>(self) };
+        do this.home_for_io |self_| {
+            let scheduler = Local::take::<Scheduler>();
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                let task_cell = Cell::new(task);
+                do self_.watcher.as_stream().close {
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
            }
         }
     }
@@ -474,464 +518,161 @@ impl Drop for UvTcpStream {
 
 impl RtioSocket for UvTcpStream {
     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
-        socket_name(Tcp, **self)
+        do self.home_for_io |self_| {
+            socket_name(Tcp, self_.watcher)
+        }
     }
 }
 
 impl RtioTcpStream for UvTcpStream {
     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
-        let result_cell = Cell::new_empty();
-        let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
+        do self.home_for_io |self_| {
+            let result_cell = Cell::new_empty();
+            let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
 
-        let scheduler = Local::take::<Scheduler>();
-        let buf_ptr: *&mut [u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |_sched, task| {
-            rtdebug!("read: entered scheduler context");
-            let task_cell = Cell::new(task);
-            // XXX: We shouldn't reallocate these callbacks every
-            // call to read
-            let alloc: AllocCallback = |_| unsafe {
-                slice_to_uv_buf(*buf_ptr)
-            };
-            let mut watcher = self.as_stream();
-            do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
-
-                // Stop reading so that no read callbacks are
-                // triggered before the user calls `read` again.
-                // XXX: Is there a performance impact to calling
-                // stop here?
-                watcher.read_stop();
-
-                let result = if status.is_none() {
-                    assert!(nread >= 0);
-                    Ok(nread as uint)
-                } else {
-                    Err(uv_error_to_io_error(status.unwrap()))
+            let scheduler = Local::take::<Scheduler>();
+            let buf_ptr: *&mut [u8] = &buf;
+            do scheduler.deschedule_running_task_and_then |_sched, task| {
+                let task_cell = Cell::new(task);
+                // XXX: We shouldn't reallocate these callbacks every
+                // call to read
+                let alloc: AllocCallback = |_| unsafe {
+                    slice_to_uv_buf(*buf_ptr)
                 };
+                let mut watcher = self_.watcher.as_stream();
+                do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
 
-                unsafe { (*result_cell_ptr).put_back(result); }
+                    // Stop reading so that no read callbacks are
+                    // triggered before the user calls `read` again.
+                    // XXX: Is there a performance impact to calling
+                    // stop here?
+                    watcher.read_stop();
 
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
+                    let result = if status.is_none() {
+                        assert!(nread >= 0);
+                        Ok(nread as uint)
+                    } else {
+                        Err(uv_error_to_io_error(status.unwrap()))
+                    };
+
+                    unsafe { (*result_cell_ptr).put_back(result); }
+
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
             }
-        }
 
-        assert!(!result_cell.is_empty());
-        return result_cell.take();
+            assert!(!result_cell.is_empty());
+            result_cell.take()
+        }
     }
 
     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
-        let result_cell = Cell::new_empty();
-        let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
-        let scheduler = Local::take::<Scheduler>();
-        let buf_ptr: *&[u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |_, task| {
-            let task_cell = Cell::new(task);
-            let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
-            let mut watcher = self.as_stream();
-            do watcher.write(buf) |_watcher, status| {
-                let result = if status.is_none() {
-                    Ok(())
-                } else {
-                    Err(uv_error_to_io_error(status.unwrap()))
-                };
+        do self.home_for_io |self_| {
+            let result_cell = Cell::new_empty();
+            let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
+            let scheduler = Local::take::<Scheduler>();
+            let buf_ptr: *&[u8] = &buf;
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                let task_cell = Cell::new(task);
+                let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
+                let mut watcher = self_.watcher.as_stream();
+                do watcher.write(buf) |_watcher, status| {
+                    let result = if status.is_none() {
+                        Ok(())
+                    } else {
+                        Err(uv_error_to_io_error(status.unwrap()))
+                    };
 
-                unsafe { (*result_cell_ptr).put_back(result); }
+                    unsafe { (*result_cell_ptr).put_back(result); }
 
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
             }
-        }
 
-        assert!(!result_cell.is_empty());
-        return result_cell.take();
+            assert!(!result_cell.is_empty());
+            result_cell.take()
+        }
     }
 
     fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
-        socket_name(TcpPeer, **self)
+        do self.home_for_io |self_| {
+            socket_name(TcpPeer, self_.watcher)
+        }
     }
 
     fn control_congestion(&mut self) -> Result<(), IoError> {
-        #[fixed_stack_segment]; #[inline(never)];
+        do self.home_for_io |self_| {
+            let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 0 as c_int) };
 
-        let r = unsafe {
-            uvll::rust_uv_tcp_nodelay(self.native_handle(), 0 as c_int)
-        };
-
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn nodelay(&mut self) -> Result<(), IoError> {
-        #[fixed_stack_segment]; #[inline(never)];
+        do self.home_for_io |self_| {
+            let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 1 as c_int) };
 
-        let r = unsafe {
-            uvll::rust_uv_tcp_nodelay(self.native_handle(), 1 as c_int)
-        };
-
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
-        #[fixed_stack_segment]; #[inline(never)];
+        do self.home_for_io |self_| {
+            let r = unsafe {
+                uvll::tcp_keepalive(self_.watcher.native_handle(), 1 as c_int,
+                                    delay_in_seconds as c_uint)
+            };
 
-        let r = unsafe {
-            uvll::rust_uv_tcp_keepalive(self.native_handle(), 1 as c_int,
-                                        delay_in_seconds as c_uint)
-        };
-
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn letdie(&mut self) -> Result<(), IoError> {
-        #[fixed_stack_segment]; #[inline(never)];
+        do self.home_for_io |self_| {
+            let r = unsafe {
+                uvll::tcp_keepalive(self_.watcher.native_handle(), 0 as c_int, 0 as c_uint)
+            };
 
-        let r = unsafe {
-            uvll::rust_uv_tcp_keepalive(self.native_handle(), 0 as c_int, 0 as c_uint)
-        };
-
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 }
 
-pub struct HomedUvUdpSocket {
+pub struct UvUdpSocket {
     watcher: UdpWatcher,
     home: SchedHandle,
 }
 
-impl HomedUvUdpSocket {
-    fn go_home(&mut self) {
-        use rt::sched::PinnedTask;
-        let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |_, task| {
-            do task.wake().map_move |task| { self.home.send(PinnedTask(task)); };
-        }
-    }
+impl HomingIO for UvUdpSocket {
+    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
 }
 
-impl Drop for HomedUvUdpSocket {
-    fn drop(&self) {
-        rtdebug!("closing homed udp socket");
-        // first go home
-        // XXX need mutable finalizer
-        let this = unsafe { transmute::<&HomedUvUdpSocket, &mut HomedUvUdpSocket>(self) };
-        this.go_home();
-        // now we're home so block the task and start IO
-        let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |_, task| {
-            let task_cell = Cell::new(task);
-            do this.watcher.close {
-                // now IO is finished so resume the blocked task
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
-            }
-        }
-    }
-}
-
-impl RtioSocket for HomedUvUdpSocket {
-    fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
-        // first go home
-        self.go_home();
-        socket_name(Udp, self.watcher)
-    }
-}
-
-impl RtioUdpSocket for HomedUvUdpSocket {
-    fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
-        // first go home
-        self.go_home();
-
-        let result_cell = Cell::new_empty();
-        let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
-
-        let scheduler = Local::take::<Scheduler>();
-        let buf_ptr: *&mut [u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |_, task| {
context"); - let task_cell = Cell::new(task); - let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) }; - do self.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| { - let _ = flags; // /XXX add handling for partials? - - watcher.recv_stop(); - - let result = match status { - None => { - assert!(nread >= 0); - Ok((nread as uint, addr)) - } - Some(err) => Err(uv_error_to_io_error(err)), - }; - - unsafe { (*result_cell_ptr).put_back(result); } - - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } - } - - assert!(!result_cell.is_empty()); - return result_cell.take(); - } - - fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> { - // first go home - self.go_home(); - - let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell> = &result_cell; - let scheduler = Local::take::(); - let buf_ptr: *&[u8] = &buf; - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; - do self.watcher.send(buf, dst) |_watcher, status| { - - let result = match status { - None => Ok(()), - Some(err) => Err(uv_error_to_io_error(err)), - }; - - unsafe { (*result_cell_ptr).put_back(result); } - - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } - } - - assert!(!result_cell.is_empty()); - return result_cell.take(); - } - - fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> { - // first go home - self.go_home(); - - let r = unsafe { - do multi.to_str().as_c_str |m_addr| { - uvll::udp_set_membership(self.watcher.native_handle(), m_addr, - ptr::null(), uvll::UV_JOIN_GROUP) - } - }; - - match status_to_maybe_uv_error(self.watcher, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) - } - } - - fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> { - // first go home - self.go_home(); - - let r = unsafe { - do multi.to_str().as_c_str |m_addr| { - uvll::udp_set_membership(self.watcher.native_handle(), m_addr, - ptr::null(), uvll::UV_LEAVE_GROUP) - } - }; - - match status_to_maybe_uv_error(self.watcher, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) - } - } - - fn loop_multicast_locally(&mut self) -> Result<(), IoError> { - // first go home - self.go_home(); - - let r = unsafe { - uvll::udp_set_multicast_loop(self.watcher.native_handle(), 1 as c_int) - }; - - match status_to_maybe_uv_error(self.watcher, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) - } - } - - fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> { - // first go home - self.go_home(); - - let r = unsafe { - uvll::udp_set_multicast_loop(self.watcher.native_handle(), 0 as c_int) - }; - - match status_to_maybe_uv_error(self.watcher, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) - } - } - - fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> { - // first go home - self.go_home(); - - let r = unsafe { - uvll::udp_set_multicast_ttl(self.watcher.native_handle(), ttl as c_int) - }; - - match status_to_maybe_uv_error(self.watcher, r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) - } - } - - fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> { - // first go home - self.go_home(); - - let r = unsafe { - uvll::udp_set_ttl(self.watcher.native_handle(), ttl as c_int) - }; - - match status_to_maybe_uv_error(self.watcher, r) { - Some(err) => 
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
-        }
-    }
-
-    fn hear_broadcasts(&mut self) -> Result<(), IoError> {
-        // first go home
-        self.go_home();
-
-        let r = unsafe {
-            uvll::udp_set_broadcast(self.watcher.native_handle(), 1 as c_int)
-        };
-
-        match status_to_maybe_uv_error(self.watcher, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
-        }
-    }
-
-    fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
-        // first go home
-        self.go_home();
-
-        let r = unsafe {
-            uvll::udp_set_broadcast(self.watcher.native_handle(), 0 as c_int)
-        };
-
-        match status_to_maybe_uv_error(self.watcher, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
-        }
-    }
-}
-
-#[test]
-fn test_simple_homed_udp_io_bind_only() {
-    do run_in_newsched_task {
-        unsafe {
-            let io = Local::unsafe_borrow::<IoFactoryObject>();
-            let addr = next_test_ip4();
-            let maybe_socket = (*io)./*homed_*/udp_bind(addr);
-            assert!(maybe_socket.is_ok());
-        }
-    }
-}
-
-#[test]
-fn test_simple_homed_udp_io_bind_then_move_then_home_and_close() {
-    use rt::sleeper_list::SleeperList;
-    use rt::work_queue::WorkQueue;
-    use rt::thread::Thread;
-    use rt::task::Task;
-    use rt::sched::{Shutdown, TaskFromFriend};
-    do run_in_bare_thread {
-        let sleepers = SleeperList::new();
-        let work_queue1 = WorkQueue::new();
-        let work_queue2 = WorkQueue::new();
-        let queues = ~[work_queue1.clone(), work_queue2.clone()];
-
-        let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
-                                         sleepers.clone());
-        let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
-                                         sleepers.clone());
-
-        let handle1 = Cell::new(sched1.make_handle());
-        let handle2 = Cell::new(sched2.make_handle());
-        let tasksFriendHandle = Cell::new(sched2.make_handle());
-
-        let on_exit: ~fn(bool) = |exit_status| {
-            handle1.take().send(Shutdown);
-            handle2.take().send(Shutdown);
-            rtassert!(exit_status);
-        };
-
-        let test_function: ~fn() = || {
-            let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
-            let addr = next_test_ip4();
-            let maybe_socket = unsafe { (*io)./*homed_*/udp_bind(addr) };
-            // this socket is bound to this event loop
-            assert!(maybe_socket.is_ok());
-
-            // block self on sched1
-            let scheduler = Local::take::<Scheduler>();
-            do scheduler.deschedule_running_task_and_then |_, task| {
-                // unblock task
-                do task.wake().map_move |task| {
-                    // send self to sched2
-                    tasksFriendHandle.take().send(TaskFromFriend(task));
-                };
-                // sched1 should now sleep since it has nothing else to do
-            }
-            // sched2 will wake up and get the task
-            // as we do nothing else, the function ends and the socket goes out of scope
-            // sched2 will start to run the destructor
-            // the destructor will first block the task, set it's home as sched1, then enqueue it
-            // sched2 will dequeue the task, see that it has a home, and send it to sched1
-            // sched1 will wake up, execute the close function on the correct loop, and then we're done
-        };
-
-        let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None, test_function);
-        main_task.death.on_exit = Some(on_exit);
-        let main_task = Cell::new(main_task);
-
-        let null_task = Cell::new(~do Task::new_root(&mut sched2.stack_pool, None) || {});
-
-        let sched1 = Cell::new(sched1);
-        let sched2 = Cell::new(sched2);
-
-        // XXX could there be a race on the threads that causes a crash?
-        let thread1 = do Thread::start {
-            sched1.take().bootstrap(main_task.take());
-        };
-        let thread2 = do Thread::start {
-            sched2.take().bootstrap(null_task.take());
-        };
-
-        thread1.join();
-        thread2.join();
-    }
-}
-
-pub struct UvUdpSocket(UdpWatcher);
-
 impl Drop for UvUdpSocket {
     fn drop(&self) {
-        rtdebug!("closing udp socket");
-        let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |_, task| {
-            let task_cell = Cell::new(task);
-            do self.close {
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
+        // XXX need mutable finalizer
+        let this = unsafe { transmute::<&UvUdpSocket, &mut UvUdpSocket>(self) };
+        do this.home_for_io |_| {
+            let scheduler = Local::take::<Scheduler>();
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                let task_cell = Cell::new(task);
+                do this.watcher.close {
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
            }
         }
     }
@@ -939,203 +680,240 @@ impl Drop for UvUdpSocket {
 
 impl RtioSocket for UvUdpSocket {
     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
-        socket_name(Udp, **self)
+        do self.home_for_io |self_| {
+            socket_name(Udp, self_.watcher)
+        }
     }
 }
 
 impl RtioUdpSocket for UvUdpSocket {
     fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
-        let result_cell = Cell::new_empty();
-        let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
+        do self.home_for_io |self_| {
+            let result_cell = Cell::new_empty();
+            let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
 
-        let scheduler = Local::take::<Scheduler>();
-        let buf_ptr: *&mut [u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |_sched, task| {
-            rtdebug!("recvfrom: entered scheduler context");
-            let task_cell = Cell::new(task);
-            let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
-            do self.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
-                let _ = flags; // XXX add handling for partials?
+            let scheduler = Local::take::<Scheduler>();
+            let buf_ptr: *&mut [u8] = &buf;
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                let task_cell = Cell::new(task);
+                let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
+                do self_.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
+                    let _ = flags; // XXX add handling for partials?
 
-                watcher.recv_stop();
+                    watcher.recv_stop();
 
-                let result = match status {
-                    None => {
-                        assert!(nread >= 0);
-                        Ok((nread as uint, addr))
-                    }
-                    Some(err) => Err(uv_error_to_io_error(err))
-                };
+                    let result = match status {
+                        None => {
+                            assert!(nread >= 0);
+                            Ok((nread as uint, addr))
+                        }
+                        Some(err) => Err(uv_error_to_io_error(err)),
+                    };
 
-                unsafe { (*result_cell_ptr).put_back(result); }
+                    unsafe { (*result_cell_ptr).put_back(result); }
 
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
             }
-        }
 
-        assert!(!result_cell.is_empty());
-        return result_cell.take();
+            assert!(!result_cell.is_empty());
+            result_cell.take()
+        }
     }
 
     fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
-        let result_cell = Cell::new_empty();
-        let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
-        let scheduler = Local::take::<Scheduler>();
-        let buf_ptr: *&[u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |_, task| {
-            let task_cell = Cell::new(task);
-            let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
-            do self.send(buf, dst) |_watcher, status| {
+        do self.home_for_io |self_| {
+            let result_cell = Cell::new_empty();
+            let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
+            let scheduler = Local::take::<Scheduler>();
+            let buf_ptr: *&[u8] = &buf;
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                let task_cell = Cell::new(task);
+                let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
+                do self_.watcher.send(buf, dst) |_watcher, status| {
 
-                let result = match status {
-                    None => Ok(()),
-                    Some(err) => Err(uv_error_to_io_error(err)),
-                };
+                    let result = match status {
+                        None => Ok(()),
+                        Some(err) => Err(uv_error_to_io_error(err)),
+                    };
 
-                unsafe { (*result_cell_ptr).put_back(result); }
+                    unsafe { (*result_cell_ptr).put_back(result); }
 
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
             }
-        }
 
-        assert!(!result_cell.is_empty());
-        return result_cell.take();
+            assert!(!result_cell.is_empty());
+            result_cell.take()
+        }
     }
 
     fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
-        let r = unsafe {
-            do multi.to_str().with_c_str |m_addr| {
-                uvll::udp_set_membership(self.native_handle(), m_addr,
-                                         ptr::null(), uvll::UV_JOIN_GROUP)
-            }
-        };
+        do self.home_for_io |self_| {
+            let r = unsafe {
+                do multi.to_str().with_c_str |m_addr| {
+                    uvll::udp_set_membership(self_.watcher.native_handle(), m_addr,
+                                             ptr::null(), uvll::UV_JOIN_GROUP)
+                }
+            };
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
-        let r = unsafe {
-            do multi.to_str().with_c_str |m_addr| {
-                uvll::udp_set_membership(self.native_handle(), m_addr,
-                                         ptr::null(), uvll::UV_LEAVE_GROUP)
-            }
-        };
+        do self.home_for_io |self_| {
+            let r = unsafe {
+                do multi.to_str().with_c_str |m_addr| {
+                    uvll::udp_set_membership(self_.watcher.native_handle(), m_addr,
+                                             ptr::null(), uvll::UV_LEAVE_GROUP)
+                }
+            };
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
-        let r = unsafe {
-            uvll::udp_set_multicast_loop(self.native_handle(), 1 as c_int)
-        };
+        do self.home_for_io |self_| {
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            let r = unsafe {
+                uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 1 as c_int)
+            };
+
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
-        let r = unsafe {
-            uvll::udp_set_multicast_loop(self.native_handle(), 0 as c_int)
-        };
+        do self.home_for_io |self_| {
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            let r = unsafe {
+                uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 0 as c_int)
+            };
+
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
-        let r = unsafe {
-            uvll::udp_set_multicast_ttl(self.native_handle(), ttl as c_int)
-        };
+        do self.home_for_io |self_| {
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            let r = unsafe {
+                uvll::udp_set_multicast_ttl(self_.watcher.native_handle(), ttl as c_int)
+            };
+
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
-        let r = unsafe {
-            uvll::udp_set_ttl(self.native_handle(), ttl as c_int)
-        };
+        do self.home_for_io |self_| {
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            let r = unsafe {
+                uvll::udp_set_ttl(self_.watcher.native_handle(), ttl as c_int)
+            };
+
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn hear_broadcasts(&mut self) -> Result<(), IoError> {
-        let r = unsafe {
-            uvll::udp_set_broadcast(self.native_handle(), 1 as c_int)
-        };
+        do self.home_for_io |self_| {
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            let r = unsafe {
+                uvll::udp_set_broadcast(self_.watcher.native_handle(), 1 as c_int)
+            };
+
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
-        let r = unsafe {
-            uvll::udp_set_broadcast(self.native_handle(), 0 as c_int)
-        };
+        do self.home_for_io |self_| {
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            let r = unsafe {
+                uvll::udp_set_broadcast(self_.watcher.native_handle(), 0 as c_int)
+            };
+
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 }
 
-pub struct UvTimer(timer::TimerWatcher);
+pub struct UvTimer {
+    watcher: timer::TimerWatcher,
+    home: SchedHandle,
+}
+
+impl HomingIO for UvTimer {
+    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+}
 
 impl UvTimer {
-    fn new(w: timer::TimerWatcher) -> UvTimer {
-        UvTimer(w)
+    fn new(w: timer::TimerWatcher, home: SchedHandle) -> UvTimer {
+        UvTimer { watcher: w, home: home }
     }
 }
 
 impl Drop for UvTimer {
     fn drop(&self) {
-        rtdebug!("closing UvTimer");
-        let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |_, task| {
-            let task_cell = Cell::new(task);
-            do self.close {
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
+        let self_ = unsafe { transmute::<&UvTimer, &mut UvTimer>(self) };
+        do self_.home_for_io |self_| {
+            rtdebug!("closing UvTimer");
+            let scheduler = Local::take::<Scheduler>();
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                let task_cell = Cell::new(task);
+                do self_.watcher.close {
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
            }
         }
     }
 }
 
 impl RtioTimer for UvTimer {
-    fn sleep(&self, msecs: u64) {
-        let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |_sched, task| {
-            rtdebug!("sleep: entered scheduler context");
-            let task_cell = Cell::new(task);
-            let mut watcher = **self;
-            do watcher.start(msecs, 0) |_, status| {
-                assert!(status.is_none());
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
+    fn sleep(&mut self, msecs: u64) {
+        do self.home_for_io |self_| {
+            let scheduler = Local::take::<Scheduler>();
+            do scheduler.deschedule_running_task_and_then |_sched, task| {
+                rtdebug!("sleep: entered scheduler context");
+                let task_cell = Cell::new(task);
+                do self_.watcher.start(msecs, 0) |_, status| {
+                    assert!(status.is_none());
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
             }
+            self_.watcher.stop();
         }
-        let mut w = **self;
-        w.stop();
     }
 }
 
@@ -1163,6 +941,152 @@ fn test_simple_udp_io_bind_only() {
     }
 }
 
+#[test]
+fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
+    use rt::sleeper_list::SleeperList;
+    use rt::work_queue::WorkQueue;
+    use rt::thread::Thread;
+    use rt::task::Task;
+    use rt::sched::{Shutdown, TaskFromFriend};
+    do run_in_bare_thread {
+        let sleepers = SleeperList::new();
+        let work_queue1 = WorkQueue::new();
+        let work_queue2 = WorkQueue::new();
+        let queues = ~[work_queue1.clone(), work_queue2.clone()];
+
+        let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
+                                         sleepers.clone());
+        let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
+                                         sleepers.clone());
+
+        let handle1 = Cell::new(sched1.make_handle());
+        let handle2 = Cell::new(sched2.make_handle());
+        let tasksFriendHandle = Cell::new(sched2.make_handle());
+
+        let on_exit: ~fn(bool) = |exit_status| {
+            handle1.take().send(Shutdown);
+            handle2.take().send(Shutdown);
+            rtassert!(exit_status);
+        };
+
+        let test_function: ~fn() = || {
+            let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
+            let addr = next_test_ip4();
+            let maybe_socket = unsafe { (*io).udp_bind(addr) };
+            // this socket is bound to this event loop
+            assert!(maybe_socket.is_ok());
+
+            // block self on sched1
+            let scheduler = Local::take::<Scheduler>();
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                // unblock task
+                do task.wake().map_move |task| {
+                    // send self to sched2
+                    tasksFriendHandle.take().send(TaskFromFriend(task));
+                };
+                // sched1 should now sleep since it has nothing else to do
+            }
+            // sched2 will wake up and get the task
+            // as we do nothing else, the function ends and the socket goes out of scope
+            // sched2 will start to run the destructor
+            // the destructor will first block the task, set its home as sched1, then enqueue it
+            // sched2 will dequeue the task, see that it has a home, and send it to sched1
+            // sched1 will wake up, execute the close function on the correct loop, and then we're done
+        };
+
+        let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None, test_function);
+        main_task.death.on_exit = Some(on_exit);
+        let main_task = Cell::new(main_task);
+
+        let null_task = Cell::new(~do Task::new_root(&mut sched2.stack_pool, None) || {});
+
+        let sched1 = Cell::new(sched1);
+        let sched2 = Cell::new(sched2);
+
+        let thread1 = do Thread::start {
+            sched1.take().bootstrap(main_task.take());
+        };
+        let thread2 = do Thread::start {
+            sched2.take().bootstrap(null_task.take());
+        };
+
+        thread1.join();
+        thread2.join();
+    }
+}
+
+#[test]
+fn test_simple_homed_udp_io_bind_then_move_handle_then_home_and_close() {
+    use rt::sleeper_list::SleeperList;
+    use rt::work_queue::WorkQueue;
+    use rt::thread::Thread;
+    use rt::task::Task;
+    use rt::comm::oneshot;
+    use rt::sched::Shutdown;
+    do run_in_bare_thread {
+        let sleepers = SleeperList::new();
+        let work_queue1 = WorkQueue::new();
+        let work_queue2 = WorkQueue::new();
+        let queues = ~[work_queue1.clone(), work_queue2.clone()];
+
+        let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
+                                         sleepers.clone());
+        let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
+                                         sleepers.clone());
+
+        let handle1 = Cell::new(sched1.make_handle());
+        let handle2 = Cell::new(sched2.make_handle());
+
+        let (port, chan) = oneshot();
+        let port = Cell::new(port);
+        let chan = Cell::new(chan);
+
+        let body1: ~fn() = || {
+            let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
+            let addr = next_test_ip4();
+            let socket = unsafe { (*io).udp_bind(addr) };
+            assert!(socket.is_ok());
+            chan.take().send(socket);
+        };
+
+        let body2: ~fn() = || {
+            let socket = port.take().recv();
+            assert!(socket.is_ok());
+            /* The socket goes out of scope and the destructor is called.
+             * The destructor:
+             *  - sends itself back to sched1
+             *  - frees the socket
+             *  - resets the home of the task to whatever it was previously
+             */
+        };
+
+        let on_exit: ~fn(bool) = |exit| {
+            handle1.take().send(Shutdown);
+            handle2.take().send(Shutdown);
+            rtassert!(exit);
+        };
+
+        let task1 = Cell::new(~Task::new_root(&mut sched1.stack_pool, None, body1));
+
+        let mut task2 = ~Task::new_root(&mut sched2.stack_pool, None, body2);
+        task2.death.on_exit = Some(on_exit);
+        let task2 = Cell::new(task2);
+
+        let sched1 = Cell::new(sched1);
+        let sched2 = Cell::new(sched2);
+
+        let thread1 = do Thread::start {
+            sched1.take().bootstrap(task1.take());
+        };
+        let thread2 = do Thread::start {
+            sched2.take().bootstrap(task2.take());
+        };
+
+        thread1.join();
+        thread2.join();
+    }
+}
+
 #[test]
 fn test_simple_tcp_server_and_client() {
     do run_in_newsched_task {
@@ -1194,6 +1118,85 @@ fn test_simple_tcp_server_and_client() {
     }
 }
 
+#[test]
+fn test_simple_tcp_server_and_client_on_diff_threads() {
+    use rt::sleeper_list::SleeperList;
+    use rt::work_queue::WorkQueue;
+    use rt::thread::Thread;
+    use rt::task::Task;
+    use rt::sched::{Shutdown};
+    do run_in_bare_thread {
+        let sleepers = SleeperList::new();
+
+        let server_addr = next_test_ip4();
+        let client_addr = server_addr.clone();
+
+        let server_work_queue = WorkQueue::new();
+        let client_work_queue = WorkQueue::new();
+        let queues = ~[server_work_queue.clone(), client_work_queue.clone()];
+
+        let mut server_sched = ~Scheduler::new(~UvEventLoop::new(), server_work_queue,
+                                               queues.clone(), sleepers.clone());
+        let mut client_sched = ~Scheduler::new(~UvEventLoop::new(), client_work_queue,
+                                               queues.clone(), sleepers.clone());
+
+        let server_handle = Cell::new(server_sched.make_handle());
+        let client_handle = Cell::new(client_sched.make_handle());
+
+        let server_on_exit: ~fn(bool) = |exit_status| {
+            server_handle.take().send(Shutdown);
+            rtassert!(exit_status);
+        };
+
+        let client_on_exit: ~fn(bool) = |exit_status| {
+            client_handle.take().send(Shutdown);
+            rtassert!(exit_status);
+        };
+
+        let server_fn: ~fn() = || {
+            let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
+            let mut listener = unsafe { (*io).tcp_bind(server_addr).unwrap() };
+            let mut stream = listener.accept().unwrap();
+            let mut buf = [0, .. 2048];
+            let nread = stream.read(buf).unwrap();
+            assert_eq!(nread, 8);
+            for i in range(0u, nread) {
+                assert_eq!(buf[i], i as u8);
+            }
+        };
+
+        let client_fn: ~fn() = || {
+            let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
+            let mut stream = unsafe { (*io).tcp_connect(client_addr) };
+            while stream.is_err() {
+                stream = unsafe { (*io).tcp_connect(client_addr) };
+            }
+            stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);
+        };
+
+        let mut server_task = ~Task::new_root(&mut server_sched.stack_pool, None, server_fn);
+        server_task.death.on_exit = Some(server_on_exit);
+        let server_task = Cell::new(server_task);
+
+        let mut client_task = ~Task::new_root(&mut client_sched.stack_pool, None, client_fn);
+        client_task.death.on_exit = Some(client_on_exit);
+        let client_task = Cell::new(client_task);
+
+        let server_sched = Cell::new(server_sched);
+        let client_sched = Cell::new(client_sched);
+
+        let server_thread = do Thread::start {
+            server_sched.take().bootstrap(server_task.take());
+        };
+        let client_thread = do Thread::start {
+            client_sched.take().bootstrap(client_task.take());
+        };
+
+        server_thread.join();
+        client_thread.join();
+    }
+}
+
 #[test]
 fn test_simple_udp_server_and_client() {
     do run_in_newsched_task {
@@ -1410,19 +1413,13 @@ fn test_udp_many_read() {
     }
 }
 
-fn test_timer_sleep_simple_impl() {
-    unsafe {
-        let io = Local::unsafe_borrow::<IoFactoryObject>();
-        let timer = (*io).timer_init();
-        match timer {
-            Ok(t) => t.sleep(1),
-            Err(_) => assert!(false)
-        }
-    }
-}
 #[test]
 fn test_timer_sleep_simple() {
     do run_in_newsched_task {
-        test_timer_sleep_simple_impl();
+        unsafe {
+            let io = Local::unsafe_borrow::<IoFactoryObject>();
+            let timer = (*io).timer_init();
+            do timer.map_move |mut t| { t.sleep(1) };
+        }
     }
 }
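A side effect of homing worth flagging for review: uvio now calls the uvll wrapper functions (tcp_getsockname, tcp_simultaneous_accepts, tcp_nodelay, tcp_keepalive, ...) instead of the raw rust_uv_* externs, which appears to be why the per-method `#[fixed_stack_segment]; #[inline(never)];` attributes could be deleted above: the wrappers carry those attributes themselves. A sketch of such a wrapper, mirroring the udp_getsockname function touched below (the tcp variant is an assumption on my part; the concrete signatures live in uvll.rs):

    pub unsafe fn tcp_getsockname(handle: *uv_tcp_t, name: *sockaddr_storage) -> c_int {
        #[fixed_stack_segment]; #[inline(never)];
        // thin shim over the extern so callers need no stack-switch attributes
        return rust_uv_tcp_getsockname(handle, name);
    }
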
diff --git a/src/libstd/rt/uv/uvll.rs b/src/libstd/rt/uv/uvll.rs
index 8ed1287fe01..0ea2175336a 100644
--- a/src/libstd/rt/uv/uvll.rs
+++ b/src/libstd/rt/uv/uvll.rs
@@ -288,7 +288,7 @@ pub unsafe fn get_udp_handle_from_send_req(send_req: *uv_udp_send_t) -> *uv_udp_
     return rust_uv_get_udp_handle_from_send_req(send_req);
 }
 
-pub unsafe fn udp_get_sockname(handle: *uv_udp_t, name: *sockaddr_storage) -> c_int {
+pub unsafe fn udp_getsockname(handle: *uv_udp_t, name: *sockaddr_storage) -> c_int {
     #[fixed_stack_segment]; #[inline(never)];
     return rust_uv_udp_getsockname(handle, name);
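Taken together, the new tests pin down the homing contract: an I/O handle may be created on one scheduler, moved to a task running on another (directly, or through a channel), and used or dropped there, yet every uv call, including the close issued by the destructor, still executes on the event loop that created the handle. A condensed sketch of the pattern the oneshot test exercises (illustrative only, using the same rt::comm API as the test):

    let (port, chan) = oneshot();
    // task A, on sched1: bind the socket and ship it across schedulers
    //     chan.send(udp_bind(addr));
    // task B, on sched2: receiving and dropping it is safe; the destructor
    // homes back to sched1 before closing the uv handle
    //     let socket = port.recv();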