std::uv : cleanup and an isolated test for hand-rolled high_level_loops
This commit is contained in:
parent
e02057c5a5
commit
791ea3466d
@ -21,8 +21,9 @@ native mod rustrt {
|
||||
Race-free helper to get access to a global task where a libuv
|
||||
loop is running.
|
||||
|
||||
Use `uv::hl::interact`, `uv::hl::ref_handle` and `uv::hl::unref_handle` to
|
||||
do operations against the global loop that this function returns.
|
||||
Use `uv::hl::interact`, `uv::hl::ref`, `uv::hl::unref` and
|
||||
uv `uv::hl::unref_and_close` to do operations against the global
|
||||
loop that this function returns.
|
||||
|
||||
# Return
|
||||
|
||||
@ -33,6 +34,7 @@ fn get() -> hl::high_level_loop {
|
||||
ret get_monitor_task_gl();
|
||||
}
|
||||
|
||||
// WARNING: USE ONLY ONE get_*_task_gl fn in the scope of a process lifetime.
|
||||
#[doc(hidden)]
|
||||
fn get_monitor_task_gl() -> hl::high_level_loop {
|
||||
let monitor_loop_chan =
|
||||
@ -51,6 +53,7 @@ fn get_monitor_task_gl() -> hl::high_level_loop {
|
||||
});
|
||||
}
|
||||
|
||||
// WARNING: USE ONLY ONE get_*_task_gl fn in the scope of a process lifetime.
|
||||
#[doc(hidden)]
|
||||
fn get_single_task_gl() -> hl::high_level_loop {
|
||||
let global_loop_chan_ptr = rustrt::rust_uv_get_kernel_global_chan_ptr();
|
||||
@ -323,7 +326,7 @@ mod test {
|
||||
hl::interact(hl_loop) {|loop_ptr|
|
||||
log(debug, "closing timer");
|
||||
//ll::close(timer_ptr as *libc::c_void, simple_timer_close_cb);
|
||||
hl::unref_handle(hl_loop, timer_ptr, simple_timer_close_cb);
|
||||
hl::unref_and_close(hl_loop, timer_ptr, simple_timer_close_cb);
|
||||
log(debug, "about to deref exit_ch_ptr");
|
||||
log(debug, "after msg sent on deref'd exit_ch");
|
||||
};
|
||||
@ -342,7 +345,7 @@ mod test {
|
||||
log(debug, "user code inside interact loop!!!");
|
||||
let init_status = ll::timer_init(loop_ptr, timer_ptr);
|
||||
if(init_status == 0i32) {
|
||||
hl::ref_handle(hl_loop, timer_ptr);
|
||||
hl::ref(hl_loop, timer_ptr);
|
||||
ll::set_data_for_uv_handle(
|
||||
timer_ptr as *libc::c_void,
|
||||
exit_ch_ptr as *libc::c_void);
|
||||
|
@ -7,7 +7,7 @@ libuv functionality.
|
||||
"];
|
||||
|
||||
export high_level_loop, hl_loop_ext, high_level_msg;
|
||||
export run_high_level_loop, interact, ref_handle, unref_handle;
|
||||
export run_high_level_loop, interact, ref, unref, unref_and_close;
|
||||
|
||||
import ll = uv_ll;
|
||||
|
||||
@ -23,6 +23,10 @@ the C uv loop to process any pending callbacks
|
||||
by the C uv loop
|
||||
"]
|
||||
enum high_level_loop {
|
||||
simple_task_loop({
|
||||
async_handle: *ll::uv_async_t,
|
||||
op_chan: comm::chan<high_level_msg>
|
||||
}),
|
||||
single_task_loop({
|
||||
async_handle: **ll::uv_async_t,
|
||||
op_chan: comm::chan<high_level_msg>
|
||||
@ -52,6 +56,9 @@ impl hl_loop_ext for high_level_loop {
|
||||
monitor_task_loop({op_chan}) {
|
||||
ret op_chan;
|
||||
}
|
||||
simple_task_loop({async_handle, op_chan}) {
|
||||
ret op_chan;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -61,8 +68,8 @@ Represents the range of interactions with a `high_level_loop`
|
||||
"]
|
||||
enum high_level_msg {
|
||||
interaction (fn~(*libc::c_void)),
|
||||
auto_ref_handle (*libc::c_void),
|
||||
auto_unref_handle (*libc::c_void, *u8),
|
||||
ref_handle (*libc::c_void),
|
||||
manual_unref_handle (*libc::c_void, option<*u8>),
|
||||
tear_down
|
||||
}
|
||||
|
||||
@ -96,7 +103,7 @@ unsafe fn run_high_level_loop(loop_ptr: *libc::c_void,
|
||||
ll::async_init(loop_ptr, async_handle, high_level_wake_up_cb);
|
||||
|
||||
// initialize our loop data and store it in the loop
|
||||
let data: global_loop_data = default_gl_data({
|
||||
let data: hl_loop_data = default_gl_data({
|
||||
async_handle: async_handle,
|
||||
mut active: true,
|
||||
before_msg_drain: before_msg_drain,
|
||||
@ -159,22 +166,25 @@ resource safe_handle_container<T>(handle_fields: safe_handle_fields<T>) {
|
||||
#[doc="
|
||||
Needs to be encapsulated within `safe_handle`
|
||||
"]
|
||||
fn ref_handle<T>(hl_loop: high_level_loop, handle: *T) unsafe {
|
||||
send_high_level_msg(hl_loop, auto_ref_handle(handle as *libc::c_void));
|
||||
fn ref<T>(hl_loop: high_level_loop, handle: *T) unsafe {
|
||||
send_high_level_msg(hl_loop, ref_handle(handle as *libc::c_void));
|
||||
}
|
||||
#[doc="
|
||||
Needs to be encapsulated within `safe_handle`
|
||||
"]
|
||||
fn unref_handle<T>(hl_loop: high_level_loop, handle: *T,
|
||||
user_close_cb: *u8) unsafe {
|
||||
send_high_level_msg(hl_loop, auto_unref_handle(handle as *libc::c_void,
|
||||
user_close_cb));
|
||||
fn unref<T>(hl_loop: high_level_loop, handle: *T) unsafe {
|
||||
send_high_level_msg(hl_loop, manual_unref_handle(handle as *libc::c_void,
|
||||
none));
|
||||
}
|
||||
fn unref_and_close<T>(hl_loop: high_level_loop, handle: *T, cb: *u8) unsafe {
|
||||
send_high_level_msg(hl_loop, manual_unref_handle(handle as *libc::c_void,
|
||||
some(cb)));
|
||||
}
|
||||
|
||||
// INTERNAL API
|
||||
|
||||
// data that lives for the lifetime of the high-evel oo
|
||||
enum global_loop_data {
|
||||
enum hl_loop_data {
|
||||
default_gl_data({
|
||||
async_handle: *ll::uv_async_t,
|
||||
mut active: bool,
|
||||
@ -203,6 +213,10 @@ unsafe fn send_high_level_msg(hl_loop: high_level_loop,
|
||||
log(debug,"GLOBAL ASYNC handle == 0");
|
||||
}
|
||||
}
|
||||
simple_task_loop({async_handle, op_chan}) {
|
||||
log(debug,"simple async handle != 0, waking up loop..");
|
||||
ll::async_send((async_handle));
|
||||
}
|
||||
_ {}
|
||||
}
|
||||
}
|
||||
@ -218,7 +232,7 @@ crust fn high_level_wake_up_cb(async_handle: *ll::uv_async_t,
|
||||
log(debug, #fmt("high_level_wake_up_cb crust.. handle: %? status: %?",
|
||||
async_handle, status));
|
||||
let loop_ptr = ll::get_loop_for_uv_handle(async_handle);
|
||||
let data = ll::get_data_for_uv_handle(async_handle) as *global_loop_data;
|
||||
let data = ll::get_data_for_uv_handle(async_handle) as *hl_loop_data;
|
||||
// we check to see if the loop is "active" (the loop is set to
|
||||
// active = false the first time we realize we need to 'tear down',
|
||||
// set subsequent calls to the global async handle may be triggered
|
||||
@ -245,11 +259,11 @@ crust fn high_level_wake_up_cb(async_handle: *ll::uv_async_t,
|
||||
cb(loop_ptr);
|
||||
log(debug,"after calling cb");
|
||||
}
|
||||
auto_ref_handle(handle) {
|
||||
ref_handle(handle) {
|
||||
high_level_ref(data, handle);
|
||||
}
|
||||
auto_unref_handle(handle, user_close_cb) {
|
||||
high_level_unref(data, handle, false, user_close_cb);
|
||||
manual_unref_handle(handle, user_close_cb) {
|
||||
high_level_unref(data, handle, true, user_close_cb);
|
||||
}
|
||||
tear_down {
|
||||
log(debug,"incoming hl_msg: got tear_down");
|
||||
@ -258,6 +272,9 @@ crust fn high_level_wake_up_cb(async_handle: *ll::uv_async_t,
|
||||
continue = comm::peek(msg_po);
|
||||
}
|
||||
}
|
||||
else {
|
||||
log(debug, "in hl wake_cb, no pending messages");
|
||||
}
|
||||
}
|
||||
log(debug, #fmt("after on_wake, continue? %?", continue));
|
||||
if !do_msg_drain {
|
||||
@ -269,13 +286,13 @@ crust fn high_level_wake_up_cb(async_handle: *ll::uv_async_t,
|
||||
crust fn tear_down_close_cb(handle: *ll::uv_async_t) unsafe {
|
||||
log(debug, #fmt("tear_down_close_cb called, closing handle at %?",
|
||||
handle));
|
||||
let data = ll::get_data_for_uv_handle(handle) as *global_loop_data;
|
||||
if vec::len((*data).refd_handles) > 0 {
|
||||
let data = ll::get_data_for_uv_handle(handle) as *hl_loop_data;
|
||||
if vec::len((*data).refd_handles) > 0u {
|
||||
fail "Didn't unref all high-level handles";
|
||||
}
|
||||
}
|
||||
|
||||
fn high_level_tear_down(data: *global_loop_data) unsafe {
|
||||
fn high_level_tear_down(data: *hl_loop_data) unsafe {
|
||||
log(debug, "high_level_tear_down() called, close async_handle");
|
||||
// call user-suppled before_tear_down cb
|
||||
let async_handle = (*data).async_handle;
|
||||
@ -283,36 +300,66 @@ fn high_level_tear_down(data: *global_loop_data) unsafe {
|
||||
ll::close(async_handle as *libc::c_void, tear_down_close_cb);
|
||||
}
|
||||
|
||||
unsafe fn high_level_ref(data: *global_loop_data, handle: *libc::c_void) {
|
||||
log(debug,"incoming hl_msg: got auto_ref_handle");
|
||||
unsafe fn high_level_ref(data: *hl_loop_data, handle: *libc::c_void) {
|
||||
log(debug,"incoming hl_msg: got ..ref_handle");
|
||||
let mut refd_handles = (*data).refd_handles;
|
||||
let mut unrefd_handles = (*data).unrefd_handles;
|
||||
let handle_already_refd = refd_handles.contains(handle);
|
||||
if handle_already_refd {
|
||||
fail "attempt to do a high-level ref an already ref'd handle";
|
||||
}
|
||||
let handle_already_unrefd = unrefd_handles.contains(handle);
|
||||
// if we are ref'ing a handle (by ptr) that was already unref'd,
|
||||
// probably
|
||||
if handle_already_unrefd {
|
||||
let last_idx = vec::len(unrefd_handles) - 1u;
|
||||
let handle_idx = vec::position_elem(unrefd_handles, handle);
|
||||
alt handle_idx {
|
||||
none {
|
||||
fail "trying to remove handle that isn't in unrefd_handles";
|
||||
}
|
||||
some(idx) {
|
||||
unrefd_handles[idx] <-> unrefd_handles[last_idx];
|
||||
vec::pop(unrefd_handles);
|
||||
}
|
||||
}
|
||||
(*data).unrefd_handles = unrefd_handles;
|
||||
}
|
||||
refd_handles += [handle];
|
||||
(*data).refd_handles = refd_handles;
|
||||
}
|
||||
|
||||
unsafe fn high_level_unref(data: *global_loop_data, handle: *libc::c_void,
|
||||
manual_unref: bool, user_close_cb: *u8) {
|
||||
unsafe fn high_level_unref(data: *hl_loop_data, handle: *libc::c_void,
|
||||
manual_unref: bool, user_close_cb: option<*u8>) {
|
||||
log(debug,"incoming hl_msg: got auto_unref_handle");
|
||||
let mut refd_handles = (*data).refd_handles;
|
||||
let mut unrefd_handles = (*data).unrefd_handles;
|
||||
log(debug, #fmt("refs: %?, unrefs %? handle %?", vec::len(refd_handles),
|
||||
vec::len(unrefd_handles), handle));
|
||||
let handle_already_refd = refd_handles.contains(handle);
|
||||
if !handle_already_refd {
|
||||
fail "attempting to high-level unref an untracked handle";
|
||||
}
|
||||
let double_unref = unrefd_handles.contains(handle);
|
||||
if double_unref {
|
||||
log(debug, "double unref encountered");
|
||||
if manual_unref {
|
||||
// will allow a user to manual unref, but only signal
|
||||
// a fail when a double-unref is caused by a user
|
||||
fail "attempting to high-level unref an unrefd handle";
|
||||
}
|
||||
else {
|
||||
log(debug, "not failing...");
|
||||
}
|
||||
}
|
||||
else {
|
||||
ll::close(handle, user_close_cb);
|
||||
log(debug, "attempting to unref handle");
|
||||
alt user_close_cb {
|
||||
some(cb) {
|
||||
ll::close(handle, cb);
|
||||
}
|
||||
none { }
|
||||
}
|
||||
let last_idx = vec::len(refd_handles) - 1u;
|
||||
let handle_idx = vec::position_elem(refd_handles, handle);
|
||||
alt handle_idx {
|
||||
@ -337,3 +384,128 @@ unsafe fn high_level_unref(data: *global_loop_data, handle: *libc::c_void,
|
||||
}
|
||||
|
||||
}
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
crust fn async_close_cb(handle: *ll::uv_async_t) unsafe {
|
||||
log(debug, #fmt("async_close_cb handle %?", handle));
|
||||
let exit_ch = (*(ll::get_data_for_uv_handle(handle)
|
||||
as *ah_data)).exit_ch;
|
||||
comm::send(exit_ch, ());
|
||||
}
|
||||
crust fn async_handle_cb(handle: *ll::uv_async_t, status: libc::c_int)
|
||||
unsafe {
|
||||
log(debug, #fmt("async_handle_cb handle %? status %?",handle,status));
|
||||
let hl_loop = (*(ll::get_data_for_uv_handle(handle)
|
||||
as *ah_data)).hl_loop;
|
||||
unref_and_close(hl_loop, handle, async_close_cb);
|
||||
}
|
||||
type ah_data = {
|
||||
hl_loop: high_level_loop,
|
||||
exit_ch: comm::chan<()>
|
||||
};
|
||||
fn impl_uv_hl_async(hl_loop: high_level_loop) unsafe {
|
||||
let async_handle = ll::async_t();
|
||||
let ah_ptr = ptr::addr_of(async_handle);
|
||||
let exit_po = comm::port::<()>();
|
||||
let exit_ch = comm::chan(exit_po);
|
||||
let ah_data = {
|
||||
hl_loop: hl_loop,
|
||||
exit_ch: exit_ch
|
||||
};
|
||||
let ah_data_ptr = ptr::addr_of(ah_data);
|
||||
interact(hl_loop) {|loop_ptr|
|
||||
ref(hl_loop, ah_ptr);
|
||||
ll::async_init(loop_ptr, ah_ptr, async_handle_cb);
|
||||
ll::set_data_for_uv_handle(ah_ptr, ah_data_ptr as *libc::c_void);
|
||||
ll::async_send(ah_ptr);
|
||||
};
|
||||
comm::recv(exit_po);
|
||||
}
|
||||
|
||||
// this fn documents the bear minimum neccesary to roll your own
|
||||
// high_level_loop
|
||||
unsafe fn spawn_test_loop(exit_ch: comm::chan<()>) -> high_level_loop {
|
||||
let hl_loop_port = comm::port::<high_level_loop>();
|
||||
let hl_loop_ch = comm::chan(hl_loop_port);
|
||||
task::spawn_sched(task::manual_threads(1u)) {||
|
||||
let loop_ptr = ll::loop_new();
|
||||
let msg_po = comm::port::<high_level_msg>();
|
||||
let msg_ch = comm::chan(msg_po);
|
||||
run_high_level_loop(
|
||||
loop_ptr,
|
||||
msg_po,
|
||||
// before_run
|
||||
{|async_handle|
|
||||
log(debug,#fmt("hltest before_run: async_handle %?",
|
||||
async_handle));
|
||||
// do an async_send with it
|
||||
ll::async_send(async_handle);
|
||||
comm::send(hl_loop_ch, simple_task_loop({
|
||||
async_handle: async_handle,
|
||||
op_chan: msg_ch
|
||||
}));
|
||||
},
|
||||
// before_msg_drain
|
||||
{|async_handle|
|
||||
log(debug,#fmt("hltest before_msg_drain: async_handle %?",
|
||||
async_handle));
|
||||
true
|
||||
},
|
||||
// before_tear_down
|
||||
{|async_handle|
|
||||
log(debug,#fmt("hl test_loop b4_tear_down: async %?",
|
||||
async_handle));
|
||||
});
|
||||
ll::loop_delete(loop_ptr);
|
||||
comm::send(exit_ch, ());
|
||||
};
|
||||
ret comm::recv(hl_loop_port);
|
||||
}
|
||||
|
||||
crust fn lifetime_handle_close(handle: *libc::c_void) unsafe {
|
||||
log(debug, #fmt("lifetime_handle_close ptr %?", handle));
|
||||
}
|
||||
|
||||
crust fn lifetime_async_callback(handle: *libc::c_void,
|
||||
status: libc::c_int) {
|
||||
log(debug, #fmt("lifetime_handle_close ptr %? status %?",
|
||||
handle, status));
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore(cfg(target_os = "freebsd"))]
|
||||
fn test_uv_hl_async() unsafe {
|
||||
let exit_po = comm::port::<()>();
|
||||
let exit_ch = comm::chan(exit_po);
|
||||
let hl_loop = spawn_test_loop(exit_ch);
|
||||
|
||||
// using this handle to manage the lifetime of the high_level_loop,
|
||||
// as it will exit the first time one of the impl_uv_hl_async() is
|
||||
// cleaned up with no one ref'd handles on the loop (Which can happen
|
||||
// under race-condition type situations.. this ensures that the loop
|
||||
// lives until, at least, all of the impl_uv_hl_async() runs have been
|
||||
// called, at least.
|
||||
let lifetime_handle = ll::async_t();
|
||||
let lifetime_handle_ptr = ptr::addr_of(lifetime_handle);
|
||||
interact(hl_loop) {|loop_ptr|
|
||||
ref(hl_loop, lifetime_handle_ptr);
|
||||
ll::async_init(loop_ptr, lifetime_handle_ptr,
|
||||
lifetime_async_callback);
|
||||
};
|
||||
|
||||
iter::repeat(7u) {||
|
||||
task::spawn_sched(task::manual_threads(1u), {||
|
||||
impl_uv_hl_async(hl_loop);
|
||||
});
|
||||
};
|
||||
impl_uv_hl_async(hl_loop);
|
||||
impl_uv_hl_async(hl_loop);
|
||||
impl_uv_hl_async(hl_loop);
|
||||
interact(hl_loop) {|loop_ptr|
|
||||
ll::close(lifetime_handle_ptr, lifetime_handle_close);
|
||||
unref(hl_loop, lifetime_handle_ptr);
|
||||
log(debug, "close and unref lifetime handle");
|
||||
};
|
||||
comm::recv(exit_po);
|
||||
}
|
||||
}
|
||||
|
@ -595,8 +595,8 @@ unsafe fn run(loop_handle: *libc::c_void) {
|
||||
rustrt::rust_uv_run(loop_handle);
|
||||
}
|
||||
|
||||
unsafe fn close(handle: *libc::c_void, cb: *u8) {
|
||||
rustrt::rust_uv_close(handle, cb);
|
||||
unsafe fn close<T>(handle: *T, cb: *u8) {
|
||||
rustrt::rust_uv_close(handle as *libc::c_void, cb);
|
||||
}
|
||||
|
||||
unsafe fn tcp_init(loop_handle: *libc::c_void, handle: *uv_tcp_t)
|
||||
|
Loading…
Reference in New Issue
Block a user