core::rt: Wait for handles to close

This commit is contained in:
Brian Anderson 2013-05-12 14:35:52 -07:00
parent 204e3d82cc
commit ee0ce64d9d
3 changed files with 60 additions and 24 deletions

View File

@ -11,7 +11,7 @@
use libc::c_int;
use option::Some;
use rt::uv::uvll;
use rt::uv::{Watcher, Loop, NativeHandle, IdleCallback};
use rt::uv::{Watcher, Loop, NativeHandle, IdleCallback, NullCallback};
use rt::uv::status_to_maybe_uv_error;
pub struct IdleWatcher(*uvll::uv_idle_t);
@ -57,12 +57,23 @@ pub impl IdleWatcher {
}
}
fn close(self) {
fn close(self, cb: NullCallback) {
{
let mut this = self;
let data = this.get_watcher_data();
assert!(data.close_cb.is_none());
data.close_cb = Some(cb);
}
unsafe { uvll::close(self.native_handle(), close_cb) };
extern fn close_cb(handle: *uvll::uv_idle_t) {
unsafe {
let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
{
let mut data = idle_watcher.get_watcher_data();
data.close_cb.swap_unwrap()();
}
idle_watcher.drop_watcher_data();
uvll::idle_delete(handle);
}

View File

@ -356,7 +356,7 @@ fn idle_new_then_close() {
do run_in_bare_thread {
let mut loop_ = Loop::new();
let idle_watcher = { IdleWatcher::new(&mut loop_) };
idle_watcher.close();
idle_watcher.close(||());
}
}
@ -372,7 +372,7 @@ fn idle_smoke_test() {
assert!(status.is_none());
if unsafe { *count_ptr == 10 } {
idle_watcher.stop();
idle_watcher.close();
idle_watcher.close(||());
} else {
unsafe { *count_ptr = *count_ptr + 1; }
}
@ -396,7 +396,7 @@ fn idle_start_stop_start() {
assert!(status.is_none());
let mut idle_watcher = idle_watcher;
idle_watcher.stop();
idle_watcher.close();
idle_watcher.close(||());
}
}
loop_.run();

View File

@ -66,7 +66,7 @@ impl EventLoop for UvEventLoop {
assert!(status.is_none());
let mut idle_watcher = idle_watcher;
idle_watcher.stop();
idle_watcher.close();
idle_watcher.close(||());
f();
}
}
@ -124,22 +124,26 @@ impl IoFactory for UvIoFactory {
// Wait for a connection
do tcp_watcher.connect(addr) |stream_watcher, status| {
rtdebug!("connect: in connect callback");
let maybe_stream = if status.is_none() {
if status.is_none() {
rtdebug!("status is none");
Ok(~UvTcpStream { watcher: stream_watcher })
let res = Ok(~UvTcpStream { watcher: stream_watcher });
// Store the stream in the task's stack
unsafe { (*result_cell_ptr).put_back(res); }
// Context switch
let scheduler = local_sched::take();
scheduler.resume_task_immediately(task_cell.take());
} else {
rtdebug!("status is some");
// XXX: Wait for close
stream_watcher.close(||());
Err(uv_error_to_io_error(status.get()))
let task_cell = Cell(task_cell.take());
do stream_watcher.close {
let res = Err(uv_error_to_io_error(status.get()));
unsafe { (*result_cell_ptr).put_back(res); }
let scheduler = local_sched::take();
scheduler.resume_task_immediately(task_cell.take());
}
};
// Store the stream in the task's stack
unsafe { (*result_cell_ptr).put_back(maybe_stream); }
// Context switch
let scheduler = local_sched::take();
scheduler.resume_task_immediately(task_cell.take());
}
}
@ -152,8 +156,14 @@ impl IoFactory for UvIoFactory {
match watcher.bind(addr) {
Ok(_) => Ok(~UvTcpListener::new(watcher)),
Err(uverr) => {
// XXX: Should we wait until close completes?
watcher.as_stream().close(||());
let scheduler = local_sched::take();
do scheduler.deschedule_running_task_and_then |task| {
let task_cell = Cell(task);
do watcher.as_stream().close {
let scheduler = local_sched::take();
scheduler.resume_task_immediately(task_cell.take());
}
}
Err(uv_error_to_io_error(uverr))
}
}
@ -181,8 +191,15 @@ impl UvTcpListener {
impl Drop for UvTcpListener {
fn finalize(&self) {
// XXX: Need to wait until close finishes before returning
self.watcher().as_stream().close(||());
let watcher = self.watcher();
let scheduler = local_sched::take();
do scheduler.deschedule_running_task_and_then |task| {
let task_cell = Cell(task);
do watcher.as_stream().close {
let scheduler = local_sched::take();
scheduler.resume_task_immediately(task_cell.take());
}
}
}
}
@ -235,8 +252,16 @@ impl UvTcpStream {
impl Drop for UvTcpStream {
fn finalize(&self) {
rtdebug!("closing stream");
self.watcher().close(||());
rtdebug!("closing tcp stream");
let watcher = self.watcher();
let scheduler = local_sched::take();
do scheduler.deschedule_running_task_and_then |task| {
let task_cell = Cell(task);
do watcher.close {
let scheduler = local_sched::take();
scheduler.resume_task_immediately(task_cell.take());
}
}
}
}