uv::hl::get_global_loop() -> uv::global_loop::get()
- moved global loop tests, as well.. will add tests in uv_hl that encompass rolling your own high_level_loop via uv::hl::run_high_level_loop() - also whitespace cleanups and misc warning cleanup.. - doesn't work on 32bit linux
This commit is contained in:
parent
253fad7788
commit
83ae83c3b3
@ -28,6 +28,7 @@ mod net;
|
||||
mod uv;
|
||||
mod uv_ll;
|
||||
mod uv_hl;
|
||||
mod uv_global_loop;
|
||||
|
||||
|
||||
// Utility modules
|
||||
|
@ -34,6 +34,9 @@ export ll;
|
||||
import hl = uv_hl;
|
||||
export hl;
|
||||
|
||||
import global_loop = uv_global_loop;
|
||||
export global_loop;
|
||||
|
||||
#[nolink]
|
||||
native mod rustrt {
|
||||
fn rust_uv_loop_new() -> *libc::c_void;
|
||||
|
237
src/libstd/uv_global_loop.rs
Normal file
237
src/libstd/uv_global_loop.rs
Normal file
@ -0,0 +1,237 @@
|
||||
#[doc="
|
||||
Process-wide, lazily started/stopped libuv event loop interaction.
|
||||
"];
|
||||
|
||||
import ll = uv_ll;
|
||||
import hl = uv_hl;
|
||||
import get_gl = get;
|
||||
|
||||
export get;
|
||||
|
||||
native mod rustrt {
|
||||
fn rust_uv_get_kernel_global_chan_ptr() -> *libc::uintptr_t;
|
||||
fn rust_uv_get_kernel_global_async_handle() -> *libc::uintptr_t;
|
||||
fn rust_compare_and_swap_ptr(address: *libc::uintptr_t,
|
||||
oldval: libc::uintptr_t,
|
||||
newval: libc::uintptr_t) -> bool;
|
||||
}
|
||||
|
||||
#[doc ="
|
||||
Race-free helper to get access to a global task where a libuv
|
||||
loop is running.
|
||||
|
||||
# Return
|
||||
|
||||
* A `hl::high_level_loop` that encapsulates communication with the global
|
||||
loop.
|
||||
"]
|
||||
fn get() -> hl::high_level_loop {
|
||||
let global_loop_chan_ptr = rustrt::rust_uv_get_kernel_global_chan_ptr();
|
||||
log(debug, #fmt("ENTERING global_loop::get() loop chan: %?",
|
||||
global_loop_chan_ptr));
|
||||
|
||||
let builder_fn = {||
|
||||
let builder = task::builder();
|
||||
let opts = {
|
||||
supervise: false,
|
||||
notify_chan: none,
|
||||
sched:
|
||||
some({mode: task::manual_threads(1u),
|
||||
native_stack_size: none })
|
||||
};
|
||||
task::set_opts(builder, opts);
|
||||
builder
|
||||
};
|
||||
unsafe {
|
||||
log(debug, "before priv::chan_from_global_ptr");
|
||||
let chan = priv::chan_from_global_ptr::<hl::high_level_msg>(
|
||||
global_loop_chan_ptr,
|
||||
builder_fn) {|port|
|
||||
|
||||
// the actual body of our global loop lives here
|
||||
log(debug, "initialized global port task!");
|
||||
log(debug, "GLOBAL initialized global port task!");
|
||||
outer_global_loop_body(port);
|
||||
};
|
||||
log(debug, "after priv::chan_from_global_ptr");
|
||||
let handle = get_global_async_handle_native_representation()
|
||||
as **ll::uv_async_t;
|
||||
ret { async_handle: handle, op_chan: chan };
|
||||
}
|
||||
}
|
||||
|
||||
// INTERNAL API
|
||||
|
||||
unsafe fn outer_global_loop_body(msg_po: comm::port<hl::high_level_msg>) {
|
||||
// we're going to use a single libuv-generated loop ptr
|
||||
// for the duration of the process
|
||||
let loop_ptr = ll::loop_new();
|
||||
|
||||
// data structure for loop goes here..
|
||||
|
||||
// immediately weaken the task this is running in.
|
||||
priv::weaken_task() {|weak_exit_po|
|
||||
// when we first enter this loop, we're going
|
||||
// to wait on stand-by to receive a request to
|
||||
// fire-up the libuv loop
|
||||
let mut continue = true;
|
||||
while continue {
|
||||
log(debug, "in outer_loop...");
|
||||
continue = either::either(
|
||||
{|left_val|
|
||||
// bail out..
|
||||
// if we catch this msg at this point,
|
||||
// we should just be able to exit because
|
||||
// the loop isn't active
|
||||
log(debug, #fmt("weak_exit_po recv'd msg: %?",
|
||||
left_val));
|
||||
false
|
||||
}, {|right_val|
|
||||
log(debug, "about to enter inner loop");
|
||||
inner_global_loop_body(weak_exit_po, msg_po, loop_ptr,
|
||||
copy(right_val))
|
||||
}, comm::select2(weak_exit_po, msg_po));
|
||||
log(debug,#fmt("GLOBAL LOOP EXITED, WAITING TO RESTART? %?",
|
||||
continue));
|
||||
}
|
||||
};
|
||||
|
||||
ll::loop_delete(loop_ptr);
|
||||
}
|
||||
|
||||
unsafe fn inner_global_loop_body(weak_exit_po_in: comm::port<()>,
|
||||
msg_po_in: comm::port<hl::high_level_msg>,
|
||||
loop_ptr: *libc::c_void,
|
||||
-first_interaction: hl::high_level_msg) -> bool {
|
||||
// resend the msg
|
||||
comm::send(comm::chan(msg_po_in), first_interaction);
|
||||
|
||||
// black magic
|
||||
let weak_exit_po_ptr = ptr::addr_of(weak_exit_po_in);
|
||||
hl::run_high_level_loop(
|
||||
loop_ptr,
|
||||
msg_po_in,
|
||||
// before_run
|
||||
{|async_handle|
|
||||
log(debug,#fmt("global_loop before_run: async_handle %?",
|
||||
async_handle));
|
||||
// set the handle as the global
|
||||
set_global_async_handle(0u as *ll::uv_async_t,
|
||||
async_handle);
|
||||
// when this is ran, our async_handle is set up, so let's
|
||||
// do an async_send with it
|
||||
ll::async_send(async_handle);
|
||||
},
|
||||
// before_msg_drain
|
||||
{|async_handle|
|
||||
log(debug,#fmt("global_loop before_msg_drain: async_handle %?",
|
||||
async_handle));
|
||||
let weak_exit_po = *weak_exit_po_ptr;
|
||||
if(comm::peek(weak_exit_po)) {
|
||||
// if this is true, immediately bail and return false, causing
|
||||
// the libuv loop to start tearing down
|
||||
log(debug,"got weak_exit meg inside libuv loop");
|
||||
comm::recv(weak_exit_po);
|
||||
false
|
||||
}
|
||||
// if no weak_exit_po msg is received, then we'll let the
|
||||
// loop continue
|
||||
else {
|
||||
true
|
||||
}
|
||||
},
|
||||
// before_tear_down
|
||||
{|async_handle|
|
||||
log(debug,#fmt("global_loop before_tear_down: async_handle %?",
|
||||
async_handle));
|
||||
set_global_async_handle(async_handle,
|
||||
0 as *ll::uv_async_t);
|
||||
});
|
||||
// supposed to return a bool to indicate to the enclosing loop whether
|
||||
// it should continue or not..
|
||||
ret true;
|
||||
}
|
||||
|
||||
unsafe fn get_global_async_handle_native_representation()
|
||||
-> *libc::uintptr_t {
|
||||
ret rustrt::rust_uv_get_kernel_global_async_handle();
|
||||
}
|
||||
|
||||
unsafe fn get_global_async_handle() -> *ll::uv_async_t {
|
||||
ret (*get_global_async_handle_native_representation()) as *ll::uv_async_t;
|
||||
}
|
||||
|
||||
unsafe fn set_global_async_handle(old: *ll::uv_async_t,
|
||||
new_ptr: *ll::uv_async_t) {
|
||||
rustrt::rust_compare_and_swap_ptr(
|
||||
get_global_async_handle_native_representation(),
|
||||
old as libc::uintptr_t,
|
||||
new_ptr as libc::uintptr_t);
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
crust fn simple_timer_close_cb(timer_ptr: *ll::uv_timer_t) unsafe {
|
||||
let exit_ch_ptr = ll::get_data_for_uv_handle(
|
||||
timer_ptr as *libc::c_void) as *comm::chan<bool>;
|
||||
let exit_ch = *exit_ch_ptr;
|
||||
comm::send(exit_ch, true);
|
||||
log(debug, #fmt("EXIT_CH_PTR simple_timer_close_cb exit_ch_ptr: %?",
|
||||
exit_ch_ptr));
|
||||
}
|
||||
crust fn simple_timer_cb(timer_ptr: *ll::uv_timer_t,
|
||||
status: libc::c_int) unsafe {
|
||||
log(debug, "in simple timer cb");
|
||||
ll::timer_stop(timer_ptr);
|
||||
let hl_loop = get_gl();
|
||||
hl::interact(hl_loop) {|loop_ptr|
|
||||
log(debug, "closing timer");
|
||||
//ll::close(timer_ptr as *libc::c_void, simple_timer_close_cb);
|
||||
hl::unref_handle(hl_loop, timer_ptr, simple_timer_close_cb);
|
||||
log(debug, "about to deref exit_ch_ptr");
|
||||
log(debug, "after msg sent on deref'd exit_ch");
|
||||
};
|
||||
log(debug, "exiting simple timer cb");
|
||||
}
|
||||
|
||||
fn impl_uv_hl_simple_timer(hl_loop: hl::high_level_loop) unsafe {
|
||||
let exit_po = comm::port::<bool>();
|
||||
let exit_ch = comm::chan(exit_po);
|
||||
let exit_ch_ptr = ptr::addr_of(exit_ch);
|
||||
log(debug, #fmt("EXIT_CH_PTR newly created exit_ch_ptr: %?",
|
||||
exit_ch_ptr));
|
||||
let timer_handle = ll::timer_t();
|
||||
let timer_ptr = ptr::addr_of(timer_handle);
|
||||
hl::interact(hl_loop) {|loop_ptr|
|
||||
log(debug, "user code inside interact loop!!!");
|
||||
let init_status = ll::timer_init(loop_ptr, timer_ptr);
|
||||
if(init_status == 0i32) {
|
||||
hl::ref_handle(hl_loop, timer_ptr);
|
||||
ll::set_data_for_uv_handle(
|
||||
timer_ptr as *libc::c_void,
|
||||
exit_ch_ptr as *libc::c_void);
|
||||
let start_status = ll::timer_start(timer_ptr, simple_timer_cb,
|
||||
1u, 0u);
|
||||
if(start_status == 0i32) {
|
||||
}
|
||||
else {
|
||||
fail "failure on ll::timer_start()";
|
||||
}
|
||||
}
|
||||
else {
|
||||
fail "failure on ll::timer_init()";
|
||||
}
|
||||
};
|
||||
comm::recv(exit_po);
|
||||
log(debug, "global_loop timer test: msg recv on exit_po, done..");
|
||||
}
|
||||
#[test]
|
||||
#[ignore(cfg(target_os = "freebsd"))]
|
||||
fn test_uv_global_loop_high_level_global_timer() unsafe {
|
||||
let hl_loop = get_gl();
|
||||
task::spawn_sched(task::manual_threads(1u), {||
|
||||
impl_uv_hl_simple_timer(hl_loop);
|
||||
});
|
||||
impl_uv_hl_simple_timer(hl_loop);
|
||||
}
|
||||
}
|
@ -6,21 +6,11 @@ provide a high-level, abstracted interface to some set of
|
||||
libuv functionality.
|
||||
"];
|
||||
|
||||
export high_level_loop;
|
||||
export high_level_loop, high_level_msg;
|
||||
export run_high_level_loop, interact, ref_handle, unref_handle;
|
||||
// this will eventually move into its own, unexported (from std) module
|
||||
export get_global_loop;
|
||||
|
||||
import ll = uv_ll;
|
||||
|
||||
native mod rustrt {
|
||||
fn rust_uv_get_kernel_global_chan_ptr() -> *libc::uintptr_t;
|
||||
fn rust_uv_get_kernel_global_async_handle() -> *libc::uintptr_t;
|
||||
fn rust_compare_and_swap_ptr(address: *libc::uintptr_t,
|
||||
oldval: libc::uintptr_t,
|
||||
newval: libc::uintptr_t) -> bool;
|
||||
}
|
||||
|
||||
#[doc = "
|
||||
Used to abstract-away direct interaction with a libuv loop.
|
||||
|
||||
@ -37,77 +27,38 @@ type high_level_loop = {
|
||||
op_chan: comm::chan<high_level_msg>
|
||||
};
|
||||
|
||||
#[doc = "
|
||||
Race-free helper to get access to a global task where a libuv
|
||||
loop is running.
|
||||
|
||||
# Return
|
||||
|
||||
* A `high_level_loop` that encapsulates communication with the global loop.
|
||||
#[doc="
|
||||
Represents the range of interactions with a `high_level_loop`
|
||||
"]
|
||||
fn get_global_loop() -> high_level_loop unsafe {
|
||||
let global_loop_chan_ptr = rustrt::rust_uv_get_kernel_global_chan_ptr();
|
||||
log(debug, #fmt("ENTERING get_global_loop() loop chan: %?",
|
||||
global_loop_chan_ptr));
|
||||
log(debug, #fmt("ENTERING get_global_loop() loop chan: %?",
|
||||
global_loop_chan_ptr));
|
||||
log(debug,#fmt("value of loop ptr: %?", *global_loop_chan_ptr));
|
||||
|
||||
let builder_fn = {||
|
||||
let builder = task::builder();
|
||||
let opts = {
|
||||
supervise: false,
|
||||
notify_chan: none,
|
||||
sched:
|
||||
some({mode: task::manual_threads(1u),
|
||||
native_stack_size: none })
|
||||
};
|
||||
task::set_opts(builder, opts);
|
||||
builder
|
||||
};
|
||||
unsafe {
|
||||
log(debug, "before priv::chan_from_global_ptr");
|
||||
let chan = priv::chan_from_global_ptr::<high_level_msg>(
|
||||
global_loop_chan_ptr,
|
||||
builder_fn) {|port|
|
||||
|
||||
// the actual body of our global loop lives here
|
||||
log(debug, "initialized global port task!");
|
||||
log(debug, "GLOBAL!!!! initialized global port task!");
|
||||
outer_global_loop_body(port);
|
||||
};
|
||||
log(debug, "after priv::chan_from_global_ptr");
|
||||
let handle = get_global_async_handle_native_representation()
|
||||
as **ll::uv_async_t;
|
||||
ret { async_handle: handle, op_chan: chan };
|
||||
}
|
||||
enum high_level_msg {
|
||||
interaction (fn~(*libc::c_void)),
|
||||
auto_ref_handle (*libc::c_void),
|
||||
auto_unref_handle (*libc::c_void, *u8),
|
||||
tear_down
|
||||
}
|
||||
|
||||
#[doc = "
|
||||
Takes a vanilla libuv `uv_loop_t*` ptr, performs some setup and then calls
|
||||
`uv_run()`. Users will be able to access this loop via a provided
|
||||
`async_handle` and `msg_ptr_po`. On top of libuv's internal handle refcount,
|
||||
the high_level_loop manages its own lifetime with a similar refcount scheme.
|
||||
|
||||
This call blocks for the lifetime of the libuv loop.
|
||||
Given a vanilla `uv_loop_t*`
|
||||
|
||||
# Arguments
|
||||
|
||||
* loop_ptr - a pointer to a currently unused libuv loop. Its `data` field
|
||||
will be overwritten before the loop begins
|
||||
* async_handle - a pointer to a _fresh_ `ll::uv_async_t` record that _has
|
||||
not_ been initialized via `uv_async_init`, `ll::uv::async_init`, etc. It
|
||||
must be a pointer to a clean rust `uv_async_t` record
|
||||
* before_run - a unique closure that is invoked just before the call to
|
||||
`uv_run`
|
||||
* msg_po - an active port that receives `high_level_msg`s
|
||||
* before_run - a unique closure that is invoked after `uv_async_init` is
|
||||
called on the `async_handle` passed into this callback, just before `uv_run`
|
||||
is called on the provided `loop_ptr`
|
||||
* before_msg_drain - a unique closure that is invoked every time the loop is
|
||||
awoken, but before the port pointed to in the `msg_po` argument is drained.
|
||||
awoken, but before the port pointed to in the `msg_po` argument is drained
|
||||
* before_tear_down - called just before the loop invokes `uv_close()` on the
|
||||
provided `async_handle`. `uv_run` should return shortly after
|
||||
"]
|
||||
unsafe fn run_high_level_loop(loop_ptr: *libc::c_void,
|
||||
msg_po: comm::port<high_level_msg>,
|
||||
before_run: fn~(*global_loop_data),
|
||||
before_msg_drain: fn~() -> bool,
|
||||
before_tear_down: fn~(*global_loop_data)) {
|
||||
before_run: fn~(*ll::uv_async_t),
|
||||
before_msg_drain: fn~(*ll::uv_async_t) -> bool,
|
||||
before_tear_down: fn~(*ll::uv_async_t)) {
|
||||
// set up the special async handle we'll use to allow multi-task
|
||||
// communication with this loop
|
||||
let async = ll::async_t();
|
||||
@ -116,20 +67,20 @@ unsafe fn run_high_level_loop(loop_ptr: *libc::c_void,
|
||||
ll::async_init(loop_ptr, async_handle, high_level_wake_up_cb);
|
||||
|
||||
// initialize our loop data and store it in the loop
|
||||
let data: global_loop_data = {
|
||||
let data: global_loop_data = default_gl_data({
|
||||
async_handle: async_handle,
|
||||
mut active: true,
|
||||
before_msg_drain: before_msg_drain,
|
||||
before_tear_down: gdc_callback(before_tear_down),
|
||||
before_tear_down: before_tear_down,
|
||||
msg_po_ptr: ptr::addr_of(msg_po),
|
||||
mut refd_handles: [mut],
|
||||
mut unrefd_handles: [mut]
|
||||
};
|
||||
});
|
||||
let data_ptr = ptr::addr_of(data);
|
||||
ll::set_data_for_uv_handle(async_handle, data_ptr);
|
||||
|
||||
// call before_run
|
||||
before_run(data_ptr);
|
||||
before_run(async_handle);
|
||||
|
||||
log(debug, "about to run high level loop");
|
||||
// enter the loop... this blocks until the loop is done..
|
||||
@ -138,17 +89,18 @@ unsafe fn run_high_level_loop(loop_ptr: *libc::c_void,
|
||||
}
|
||||
|
||||
#[doc = "
|
||||
Pass in a callback to be processed on the running libuv loop's thread
|
||||
Provide a callback to be processed by `a_loop`
|
||||
|
||||
The primary way to do operations again a running `high_level_loop` that
|
||||
doesn't involve creating a uv handle via `safe_handle`
|
||||
|
||||
# Arguments
|
||||
|
||||
* a_loop - a high_level_loop record that represents a channel of
|
||||
communication with an active libuv loop running on a thread
|
||||
somwhere in the current process
|
||||
* a_loop - a `high_level_loop` that you want to do operations against
|
||||
* cb - a function callback to be processed on the running loop's
|
||||
thread. The only parameter is an opaque pointer to the running
|
||||
uv_loop_t. You can use this pointer to initiate or continue any
|
||||
operations against the loop
|
||||
uv_loop_t. In the context of this callback, it is safe to use this pointer
|
||||
to do various uv_* API calls. _DO NOT_ send this pointer out via ports/chans
|
||||
"]
|
||||
unsafe fn interact(a_loop: high_level_loop,
|
||||
-cb: fn~(*libc::c_void)) {
|
||||
@ -159,15 +111,30 @@ iface uv_handle_manager<T> {
|
||||
fn init() -> T;
|
||||
}
|
||||
|
||||
resource uv_safe_handle<T>(handle_val: uv_handle_manager<T>) {
|
||||
type safe_handle_fields<T> = {
|
||||
hl_loop: high_level_loop,
|
||||
handle: T,
|
||||
close_cb: *u8
|
||||
};
|
||||
|
||||
/*fn safe_handle<T>(a_loop: high_level_loop,
|
||||
handle_val: T,
|
||||
handle_init_cb: fn~(*libc::c_void, *T),
|
||||
close_cb: *u8) {
|
||||
|
||||
resource safe_handle_container<T>(handle_fields: safe_handle_fields<T>) {
|
||||
}
|
||||
}*/
|
||||
|
||||
|
||||
#[doc="
|
||||
Needs to be encapsulated within `safe_handle`
|
||||
"]
|
||||
fn ref_handle<T>(hl_loop: high_level_loop, handle: *T) unsafe {
|
||||
send_high_level_msg(hl_loop, auto_ref_handle(handle as *libc::c_void));
|
||||
}
|
||||
#[doc="
|
||||
Needs to be encapsulated within `safe_handle`
|
||||
"]
|
||||
fn unref_handle<T>(hl_loop: high_level_loop, handle: *T,
|
||||
user_close_cb: *u8) unsafe {
|
||||
@ -175,9 +142,19 @@ fn unref_handle<T>(hl_loop: high_level_loop, handle: *T,
|
||||
user_close_cb));
|
||||
}
|
||||
|
||||
/////////////////////
|
||||
// INTERNAL API
|
||||
/////////////////////
|
||||
|
||||
// data that lives for the lifetime of the high-evel oo
|
||||
enum global_loop_data {
|
||||
default_gl_data({
|
||||
async_handle: *ll::uv_async_t,
|
||||
mut active: bool,
|
||||
before_msg_drain: fn~(*ll::uv_async_t) -> bool,
|
||||
before_tear_down: fn~(*ll::uv_async_t),
|
||||
msg_po_ptr: *comm::port<high_level_msg>,
|
||||
mut refd_handles: [mut *libc::c_void],
|
||||
mut unrefd_handles: [mut *libc::c_void]})
|
||||
}
|
||||
|
||||
unsafe fn send_high_level_msg(hl_loop: high_level_loop,
|
||||
-msg: high_level_msg) unsafe {
|
||||
@ -201,11 +178,11 @@ unsafe fn send_high_level_msg(hl_loop: high_level_loop,
|
||||
// simply check if the loop is active and, if so, invoke the
|
||||
// user-supplied on_wake callback that is stored in the loop's
|
||||
// data member
|
||||
crust fn high_level_wake_up_cb(async_handle: *libc::c_void,
|
||||
crust fn high_level_wake_up_cb(async_handle: *ll::uv_async_t,
|
||||
status: int) unsafe {
|
||||
// nothing here, yet.
|
||||
log(debug, #fmt("high_level_wake_up_cb crust.. handle: %?",
|
||||
async_handle));
|
||||
log(debug, #fmt("high_level_wake_up_cb crust.. handle: %? status: %?",
|
||||
async_handle, status));
|
||||
let loop_ptr = ll::get_loop_for_uv_handle(async_handle);
|
||||
let data = ll::get_data_for_uv_handle(async_handle) as *global_loop_data;
|
||||
// we check to see if the loop is "active" (the loop is set to
|
||||
@ -217,7 +194,7 @@ crust fn high_level_wake_up_cb(async_handle: *libc::c_void,
|
||||
// in the loops msg_po)
|
||||
if (*data).active {
|
||||
log(debug, "before on_wake");
|
||||
let mut do_msg_drain = (*data).before_msg_drain();
|
||||
let mut do_msg_drain = (*data).before_msg_drain(async_handle);
|
||||
let mut continue = true;
|
||||
if do_msg_drain {
|
||||
let msg_po = *((*data).msg_po_ptr);
|
||||
@ -266,12 +243,8 @@ crust fn tear_down_close_cb(handle: *ll::uv_async_t) unsafe {
|
||||
fn high_level_tear_down(data: *global_loop_data) unsafe {
|
||||
log(debug, "high_level_tear_down() called, close async_handle");
|
||||
// call user-suppled before_tear_down cb
|
||||
alt (*data).before_tear_down {
|
||||
gdc_callback(cb) {
|
||||
cb(data);
|
||||
}
|
||||
}
|
||||
let async_handle = (*data).async_handle;
|
||||
(*data).before_tear_down(async_handle);
|
||||
ll::close(async_handle as *libc::c_void, tear_down_close_cb);
|
||||
}
|
||||
|
||||
@ -329,187 +302,3 @@ unsafe fn high_level_unref(data: *global_loop_data, handle: *libc::c_void,
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
enum high_level_msg {
|
||||
interaction (fn~(*libc::c_void)),
|
||||
auto_ref_handle (*libc::c_void),
|
||||
auto_unref_handle (*libc::c_void, *u8),
|
||||
tear_down
|
||||
}
|
||||
|
||||
unsafe fn get_global_async_handle_native_representation()
|
||||
-> *libc::uintptr_t {
|
||||
ret rustrt::rust_uv_get_kernel_global_async_handle();
|
||||
}
|
||||
|
||||
unsafe fn get_global_async_handle() -> *ll::uv_async_t {
|
||||
ret (*get_global_async_handle_native_representation()) as *ll::uv_async_t;
|
||||
}
|
||||
|
||||
unsafe fn set_global_async_handle(old: *ll::uv_async_t,
|
||||
new_ptr: *ll::uv_async_t) {
|
||||
rustrt::rust_compare_and_swap_ptr(
|
||||
get_global_async_handle_native_representation(),
|
||||
old as libc::uintptr_t,
|
||||
new_ptr as libc::uintptr_t);
|
||||
}
|
||||
|
||||
enum global_data_callback {
|
||||
gdc_callback(fn~(*global_loop_data))
|
||||
}
|
||||
|
||||
type global_loop_data = {
|
||||
async_handle: *ll::uv_async_t,
|
||||
mut active: bool,
|
||||
before_msg_drain: fn~() -> bool,
|
||||
before_tear_down: global_data_callback,
|
||||
msg_po_ptr: *comm::port<high_level_msg>,
|
||||
mut refd_handles: [mut *libc::c_void],
|
||||
mut unrefd_handles: [mut *libc::c_void]
|
||||
};
|
||||
|
||||
unsafe fn outer_global_loop_body(msg_po: comm::port<high_level_msg>) {
|
||||
// we're going to use a single libuv-generated loop ptr
|
||||
// for the duration of the process
|
||||
let loop_ptr = ll::loop_new();
|
||||
|
||||
// data structure for loop goes here..
|
||||
|
||||
// immediately weaken the task this is running in.
|
||||
priv::weaken_task() {|weak_exit_po|
|
||||
// when we first enter this loop, we're going
|
||||
// to wait on stand-by to receive a request to
|
||||
// fire-up the libuv loop
|
||||
let mut continue = true;
|
||||
while continue {
|
||||
log(debug, "in outer_loop...");
|
||||
continue = either::either(
|
||||
{|left_val|
|
||||
// bail out..
|
||||
// if we catch this msg at this point,
|
||||
// we should just be able to exit because
|
||||
// the loop isn't active
|
||||
log(debug, "got msg on weak_exit_po in outer loop");
|
||||
false
|
||||
}, {|right_val|
|
||||
log(debug, "about to enter inner loop");
|
||||
inner_global_loop_body(weak_exit_po, msg_po, loop_ptr,
|
||||
copy(right_val))
|
||||
}, comm::select2(weak_exit_po, msg_po));
|
||||
log(debug,#fmt("GLOBAL LOOP EXITED, WAITING TO RESTART? %?",
|
||||
continue));
|
||||
}
|
||||
};
|
||||
|
||||
ll::loop_delete(loop_ptr);
|
||||
}
|
||||
|
||||
unsafe fn inner_global_loop_body(weak_exit_po_in: comm::port<()>,
|
||||
msg_po_in: comm::port<high_level_msg>,
|
||||
loop_ptr: *libc::c_void,
|
||||
-first_interaction: high_level_msg) -> bool {
|
||||
// resend the msg
|
||||
comm::send(comm::chan(msg_po_in), first_interaction);
|
||||
|
||||
// black magic
|
||||
let weak_exit_po_ptr = ptr::addr_of(weak_exit_po_in);
|
||||
run_high_level_loop(
|
||||
loop_ptr,
|
||||
msg_po_in,
|
||||
// before_run
|
||||
{|data|
|
||||
// set the handle as the global
|
||||
set_global_async_handle(0u as *ll::uv_async_t,
|
||||
(*data).async_handle);
|
||||
// when this is ran, our async_handle is set up, so let's
|
||||
// do an async_send with it
|
||||
ll::async_send((*data).async_handle);
|
||||
},
|
||||
// before_msg_drain
|
||||
{||
|
||||
log(debug,"entering before_msg_drain for the global loop");
|
||||
let weak_exit_po = *weak_exit_po_ptr;
|
||||
if(comm::peek(weak_exit_po)) {
|
||||
// if this is true, immediately bail and return false, causing
|
||||
// the libuv loop to start tearing down
|
||||
log(debug,"got weak_exit meg inside libuv loop");
|
||||
comm::recv(weak_exit_po);
|
||||
false
|
||||
}
|
||||
// if no weak_exit_po msg is received, then we'll let the
|
||||
// loop continue
|
||||
else {
|
||||
true
|
||||
}
|
||||
},
|
||||
// before_tear_down
|
||||
{|data|
|
||||
set_global_async_handle((*data).async_handle,
|
||||
0 as *ll::uv_async_t);
|
||||
});
|
||||
// supposed to return a bool to indicate to the enclosing loop whether
|
||||
// it should continue or not..
|
||||
ret true;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
crust fn simple_timer_close_cb(timer_ptr: *ll::uv_timer_t) unsafe {
|
||||
log(debug, "user close cb for timer_ptr");
|
||||
let exit_ch_ptr = ll::get_data_for_uv_handle(
|
||||
timer_ptr as *libc::c_void) as *comm::chan<bool>;
|
||||
let exit_ch = *exit_ch_ptr;
|
||||
comm::send(exit_ch, true);
|
||||
}
|
||||
crust fn simple_timer_cb(timer_ptr: *ll::uv_timer_t,
|
||||
status: libc::c_int) unsafe {
|
||||
log(debug, "in simple timer cb");
|
||||
ll::timer_stop(timer_ptr);
|
||||
let hl_loop = get_global_loop();
|
||||
interact(hl_loop) {|loop_ptr|
|
||||
log(debug, "closing timer");
|
||||
//ll::close(timer_ptr as *libc::c_void, simple_timer_close_cb);
|
||||
unref_handle(hl_loop, timer_ptr, simple_timer_close_cb);
|
||||
log(debug, "about to deref exit_ch_ptr");
|
||||
log(debug, "after msg sent on deref'd exit_ch");
|
||||
};
|
||||
log(debug, "exiting simple timer cb");
|
||||
}
|
||||
|
||||
fn impl_uv_hl_simple_timer(hl_loop: high_level_loop) unsafe {
|
||||
let exit_po = comm::port::<bool>();
|
||||
let exit_ch = comm::chan(exit_po);
|
||||
let exit_ch_ptr = ptr::addr_of(exit_ch);
|
||||
let timer_handle = ll::timer_t();
|
||||
let timer_ptr = ptr::addr_of(timer_handle);
|
||||
interact(hl_loop) {|loop_ptr|
|
||||
log(debug, "user code inside interact loop!!!");
|
||||
let init_status = ll::timer_init(loop_ptr, timer_ptr);
|
||||
if(init_status == 0i32) {
|
||||
ref_handle(hl_loop, timer_ptr);
|
||||
ll::set_data_for_uv_handle(
|
||||
timer_ptr as *libc::c_void,
|
||||
exit_ch_ptr as *libc::c_void);
|
||||
let start_status = ll::timer_start(timer_ptr, simple_timer_cb,
|
||||
1u, 0u);
|
||||
if(start_status == 0i32) {
|
||||
}
|
||||
else {
|
||||
fail "failure on ll::timer_start()";
|
||||
}
|
||||
}
|
||||
else {
|
||||
fail "failure on ll::timer_init()";
|
||||
}
|
||||
};
|
||||
comm::recv(exit_po);
|
||||
log(debug, "test_uv_hl_simple_timer: msg recv on exit_po, done..");
|
||||
}
|
||||
#[test]
|
||||
#[ignore(cfg(target_os = "freebsd"))]
|
||||
fn test_uv_hl_high_level_global_timer() unsafe {
|
||||
let hl_loop = get_global_loop();
|
||||
impl_uv_hl_simple_timer(hl_loop);
|
||||
impl_uv_hl_simple_timer(hl_loop);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user