diff --git a/src/libstd/std.rc b/src/libstd/std.rc index ab7128f4768..cd9dfdc7fd7 100644 --- a/src/libstd/std.rc +++ b/src/libstd/std.rc @@ -28,6 +28,7 @@ mod net; mod uv; mod uv_ll; mod uv_hl; +mod uv_global_loop; // Utility modules diff --git a/src/libstd/uv.rs b/src/libstd/uv.rs index 941b34c52a3..9d3ad9bf68d 100644 --- a/src/libstd/uv.rs +++ b/src/libstd/uv.rs @@ -34,6 +34,9 @@ export ll; import hl = uv_hl; export hl; +import global_loop = uv_global_loop; +export global_loop; + #[nolink] native mod rustrt { fn rust_uv_loop_new() -> *libc::c_void; diff --git a/src/libstd/uv_global_loop.rs b/src/libstd/uv_global_loop.rs new file mode 100644 index 00000000000..74cb9e4442c --- /dev/null +++ b/src/libstd/uv_global_loop.rs @@ -0,0 +1,237 @@ +#[doc=" +Process-wide, lazily started/stopped libuv event loop interaction. +"]; + +import ll = uv_ll; +import hl = uv_hl; +import get_gl = get; + +export get; + +native mod rustrt { + fn rust_uv_get_kernel_global_chan_ptr() -> *libc::uintptr_t; + fn rust_uv_get_kernel_global_async_handle() -> *libc::uintptr_t; + fn rust_compare_and_swap_ptr(address: *libc::uintptr_t, + oldval: libc::uintptr_t, + newval: libc::uintptr_t) -> bool; +} + +#[doc =" +Race-free helper to get access to a global task where a libuv +loop is running. + +# Return + +* A `hl::high_level_loop` that encapsulates communication with the global +loop. +"] +fn get() -> hl::high_level_loop { + let global_loop_chan_ptr = rustrt::rust_uv_get_kernel_global_chan_ptr(); + log(debug, #fmt("ENTERING global_loop::get() loop chan: %?", + global_loop_chan_ptr)); + + let builder_fn = {|| + let builder = task::builder(); + let opts = { + supervise: false, + notify_chan: none, + sched: + some({mode: task::manual_threads(1u), + native_stack_size: none }) + }; + task::set_opts(builder, opts); + builder + }; + unsafe { + log(debug, "before priv::chan_from_global_ptr"); + let chan = priv::chan_from_global_ptr::( + global_loop_chan_ptr, + builder_fn) {|port| + + // the actual body of our global loop lives here + log(debug, "initialized global port task!"); + log(debug, "GLOBAL initialized global port task!"); + outer_global_loop_body(port); + }; + log(debug, "after priv::chan_from_global_ptr"); + let handle = get_global_async_handle_native_representation() + as **ll::uv_async_t; + ret { async_handle: handle, op_chan: chan }; + } +} + +// INTERNAL API + +unsafe fn outer_global_loop_body(msg_po: comm::port) { + // we're going to use a single libuv-generated loop ptr + // for the duration of the process + let loop_ptr = ll::loop_new(); + + // data structure for loop goes here.. + + // immediately weaken the task this is running in. + priv::weaken_task() {|weak_exit_po| + // when we first enter this loop, we're going + // to wait on stand-by to receive a request to + // fire-up the libuv loop + let mut continue = true; + while continue { + log(debug, "in outer_loop..."); + continue = either::either( + {|left_val| + // bail out.. + // if we catch this msg at this point, + // we should just be able to exit because + // the loop isn't active + log(debug, #fmt("weak_exit_po recv'd msg: %?", + left_val)); + false + }, {|right_val| + log(debug, "about to enter inner loop"); + inner_global_loop_body(weak_exit_po, msg_po, loop_ptr, + copy(right_val)) + }, comm::select2(weak_exit_po, msg_po)); + log(debug,#fmt("GLOBAL LOOP EXITED, WAITING TO RESTART? %?", + continue)); + } + }; + + ll::loop_delete(loop_ptr); +} + +unsafe fn inner_global_loop_body(weak_exit_po_in: comm::port<()>, + msg_po_in: comm::port, + loop_ptr: *libc::c_void, + -first_interaction: hl::high_level_msg) -> bool { + // resend the msg + comm::send(comm::chan(msg_po_in), first_interaction); + + // black magic + let weak_exit_po_ptr = ptr::addr_of(weak_exit_po_in); + hl::run_high_level_loop( + loop_ptr, + msg_po_in, + // before_run + {|async_handle| + log(debug,#fmt("global_loop before_run: async_handle %?", + async_handle)); + // set the handle as the global + set_global_async_handle(0u as *ll::uv_async_t, + async_handle); + // when this is ran, our async_handle is set up, so let's + // do an async_send with it + ll::async_send(async_handle); + }, + // before_msg_drain + {|async_handle| + log(debug,#fmt("global_loop before_msg_drain: async_handle %?", + async_handle)); + let weak_exit_po = *weak_exit_po_ptr; + if(comm::peek(weak_exit_po)) { + // if this is true, immediately bail and return false, causing + // the libuv loop to start tearing down + log(debug,"got weak_exit meg inside libuv loop"); + comm::recv(weak_exit_po); + false + } + // if no weak_exit_po msg is received, then we'll let the + // loop continue + else { + true + } + }, + // before_tear_down + {|async_handle| + log(debug,#fmt("global_loop before_tear_down: async_handle %?", + async_handle)); + set_global_async_handle(async_handle, + 0 as *ll::uv_async_t); + }); + // supposed to return a bool to indicate to the enclosing loop whether + // it should continue or not.. + ret true; +} + +unsafe fn get_global_async_handle_native_representation() + -> *libc::uintptr_t { + ret rustrt::rust_uv_get_kernel_global_async_handle(); +} + +unsafe fn get_global_async_handle() -> *ll::uv_async_t { + ret (*get_global_async_handle_native_representation()) as *ll::uv_async_t; +} + +unsafe fn set_global_async_handle(old: *ll::uv_async_t, + new_ptr: *ll::uv_async_t) { + rustrt::rust_compare_and_swap_ptr( + get_global_async_handle_native_representation(), + old as libc::uintptr_t, + new_ptr as libc::uintptr_t); +} + +#[cfg(test)] +mod test { + crust fn simple_timer_close_cb(timer_ptr: *ll::uv_timer_t) unsafe { + let exit_ch_ptr = ll::get_data_for_uv_handle( + timer_ptr as *libc::c_void) as *comm::chan; + let exit_ch = *exit_ch_ptr; + comm::send(exit_ch, true); + log(debug, #fmt("EXIT_CH_PTR simple_timer_close_cb exit_ch_ptr: %?", + exit_ch_ptr)); + } + crust fn simple_timer_cb(timer_ptr: *ll::uv_timer_t, + status: libc::c_int) unsafe { + log(debug, "in simple timer cb"); + ll::timer_stop(timer_ptr); + let hl_loop = get_gl(); + hl::interact(hl_loop) {|loop_ptr| + log(debug, "closing timer"); + //ll::close(timer_ptr as *libc::c_void, simple_timer_close_cb); + hl::unref_handle(hl_loop, timer_ptr, simple_timer_close_cb); + log(debug, "about to deref exit_ch_ptr"); + log(debug, "after msg sent on deref'd exit_ch"); + }; + log(debug, "exiting simple timer cb"); + } + + fn impl_uv_hl_simple_timer(hl_loop: hl::high_level_loop) unsafe { + let exit_po = comm::port::(); + let exit_ch = comm::chan(exit_po); + let exit_ch_ptr = ptr::addr_of(exit_ch); + log(debug, #fmt("EXIT_CH_PTR newly created exit_ch_ptr: %?", + exit_ch_ptr)); + let timer_handle = ll::timer_t(); + let timer_ptr = ptr::addr_of(timer_handle); + hl::interact(hl_loop) {|loop_ptr| + log(debug, "user code inside interact loop!!!"); + let init_status = ll::timer_init(loop_ptr, timer_ptr); + if(init_status == 0i32) { + hl::ref_handle(hl_loop, timer_ptr); + ll::set_data_for_uv_handle( + timer_ptr as *libc::c_void, + exit_ch_ptr as *libc::c_void); + let start_status = ll::timer_start(timer_ptr, simple_timer_cb, + 1u, 0u); + if(start_status == 0i32) { + } + else { + fail "failure on ll::timer_start()"; + } + } + else { + fail "failure on ll::timer_init()"; + } + }; + comm::recv(exit_po); + log(debug, "global_loop timer test: msg recv on exit_po, done.."); + } + #[test] + #[ignore(cfg(target_os = "freebsd"))] + fn test_uv_global_loop_high_level_global_timer() unsafe { + let hl_loop = get_gl(); + task::spawn_sched(task::manual_threads(1u), {|| + impl_uv_hl_simple_timer(hl_loop); + }); + impl_uv_hl_simple_timer(hl_loop); + } +} \ No newline at end of file diff --git a/src/libstd/uv_hl.rs b/src/libstd/uv_hl.rs index bf1bc68ff3a..5cc58932c4b 100644 --- a/src/libstd/uv_hl.rs +++ b/src/libstd/uv_hl.rs @@ -6,21 +6,11 @@ provide a high-level, abstracted interface to some set of libuv functionality. "]; -export high_level_loop; +export high_level_loop, high_level_msg; export run_high_level_loop, interact, ref_handle, unref_handle; -// this will eventually move into its own, unexported (from std) module -export get_global_loop; import ll = uv_ll; -native mod rustrt { - fn rust_uv_get_kernel_global_chan_ptr() -> *libc::uintptr_t; - fn rust_uv_get_kernel_global_async_handle() -> *libc::uintptr_t; - fn rust_compare_and_swap_ptr(address: *libc::uintptr_t, - oldval: libc::uintptr_t, - newval: libc::uintptr_t) -> bool; -} - #[doc = " Used to abstract-away direct interaction with a libuv loop. @@ -37,77 +27,38 @@ type high_level_loop = { op_chan: comm::chan }; -#[doc = " -Race-free helper to get access to a global task where a libuv -loop is running. - -# Return - -* A `high_level_loop` that encapsulates communication with the global loop. +#[doc=" +Represents the range of interactions with a `high_level_loop` "] -fn get_global_loop() -> high_level_loop unsafe { - let global_loop_chan_ptr = rustrt::rust_uv_get_kernel_global_chan_ptr(); - log(debug, #fmt("ENTERING get_global_loop() loop chan: %?", - global_loop_chan_ptr)); - log(debug, #fmt("ENTERING get_global_loop() loop chan: %?", - global_loop_chan_ptr)); - log(debug,#fmt("value of loop ptr: %?", *global_loop_chan_ptr)); - - let builder_fn = {|| - let builder = task::builder(); - let opts = { - supervise: false, - notify_chan: none, - sched: - some({mode: task::manual_threads(1u), - native_stack_size: none }) - }; - task::set_opts(builder, opts); - builder - }; - unsafe { - log(debug, "before priv::chan_from_global_ptr"); - let chan = priv::chan_from_global_ptr::( - global_loop_chan_ptr, - builder_fn) {|port| - - // the actual body of our global loop lives here - log(debug, "initialized global port task!"); - log(debug, "GLOBAL!!!! initialized global port task!"); - outer_global_loop_body(port); - }; - log(debug, "after priv::chan_from_global_ptr"); - let handle = get_global_async_handle_native_representation() - as **ll::uv_async_t; - ret { async_handle: handle, op_chan: chan }; - } +enum high_level_msg { + interaction (fn~(*libc::c_void)), + auto_ref_handle (*libc::c_void), + auto_unref_handle (*libc::c_void, *u8), + tear_down } #[doc = " -Takes a vanilla libuv `uv_loop_t*` ptr, performs some setup and then calls -`uv_run()`. Users will be able to access this loop via a provided -`async_handle` and `msg_ptr_po`. On top of libuv's internal handle refcount, -the high_level_loop manages its own lifetime with a similar refcount scheme. - -This call blocks for the lifetime of the libuv loop. +Given a vanilla `uv_loop_t*` # Arguments * loop_ptr - a pointer to a currently unused libuv loop. Its `data` field will be overwritten before the loop begins -* async_handle - a pointer to a _fresh_ `ll::uv_async_t` record that _has -not_ been initialized via `uv_async_init`, `ll::uv::async_init`, etc. It must be a pointer to a clean rust `uv_async_t` record -* before_run - a unique closure that is invoked just before the call to -`uv_run` +* msg_po - an active port that receives `high_level_msg`s +* before_run - a unique closure that is invoked after `uv_async_init` is +called on the `async_handle` passed into this callback, just before `uv_run` +is called on the provided `loop_ptr` * before_msg_drain - a unique closure that is invoked every time the loop is -awoken, but before the port pointed to in the `msg_po` argument is drained. +awoken, but before the port pointed to in the `msg_po` argument is drained +* before_tear_down - called just before the loop invokes `uv_close()` on the +provided `async_handle`. `uv_run` should return shortly after "] unsafe fn run_high_level_loop(loop_ptr: *libc::c_void, msg_po: comm::port, - before_run: fn~(*global_loop_data), - before_msg_drain: fn~() -> bool, - before_tear_down: fn~(*global_loop_data)) { + before_run: fn~(*ll::uv_async_t), + before_msg_drain: fn~(*ll::uv_async_t) -> bool, + before_tear_down: fn~(*ll::uv_async_t)) { // set up the special async handle we'll use to allow multi-task // communication with this loop let async = ll::async_t(); @@ -116,20 +67,20 @@ unsafe fn run_high_level_loop(loop_ptr: *libc::c_void, ll::async_init(loop_ptr, async_handle, high_level_wake_up_cb); // initialize our loop data and store it in the loop - let data: global_loop_data = { + let data: global_loop_data = default_gl_data({ async_handle: async_handle, mut active: true, before_msg_drain: before_msg_drain, - before_tear_down: gdc_callback(before_tear_down), + before_tear_down: before_tear_down, msg_po_ptr: ptr::addr_of(msg_po), mut refd_handles: [mut], mut unrefd_handles: [mut] - }; + }); let data_ptr = ptr::addr_of(data); ll::set_data_for_uv_handle(async_handle, data_ptr); // call before_run - before_run(data_ptr); + before_run(async_handle); log(debug, "about to run high level loop"); // enter the loop... this blocks until the loop is done.. @@ -138,17 +89,18 @@ unsafe fn run_high_level_loop(loop_ptr: *libc::c_void, } #[doc = " -Pass in a callback to be processed on the running libuv loop's thread +Provide a callback to be processed by `a_loop` + +The primary way to do operations again a running `high_level_loop` that +doesn't involve creating a uv handle via `safe_handle` # Arguments -* a_loop - a high_level_loop record that represents a channel of -communication with an active libuv loop running on a thread -somwhere in the current process +* a_loop - a `high_level_loop` that you want to do operations against * cb - a function callback to be processed on the running loop's thread. The only parameter is an opaque pointer to the running -uv_loop_t. You can use this pointer to initiate or continue any -operations against the loop +uv_loop_t. In the context of this callback, it is safe to use this pointer +to do various uv_* API calls. _DO NOT_ send this pointer out via ports/chans "] unsafe fn interact(a_loop: high_level_loop, -cb: fn~(*libc::c_void)) { @@ -159,15 +111,30 @@ iface uv_handle_manager { fn init() -> T; } -resource uv_safe_handle(handle_val: uv_handle_manager) { +type safe_handle_fields = { + hl_loop: high_level_loop, + handle: T, + close_cb: *u8 +}; + +/*fn safe_handle(a_loop: high_level_loop, + handle_val: T, + handle_init_cb: fn~(*libc::c_void, *T), + close_cb: *u8) { + +resource safe_handle_container(handle_fields: safe_handle_fields) { } +}*/ + #[doc=" +Needs to be encapsulated within `safe_handle` "] fn ref_handle(hl_loop: high_level_loop, handle: *T) unsafe { send_high_level_msg(hl_loop, auto_ref_handle(handle as *libc::c_void)); } #[doc=" +Needs to be encapsulated within `safe_handle` "] fn unref_handle(hl_loop: high_level_loop, handle: *T, user_close_cb: *u8) unsafe { @@ -175,9 +142,19 @@ fn unref_handle(hl_loop: high_level_loop, handle: *T, user_close_cb)); } -///////////////////// // INTERNAL API -///////////////////// + +// data that lives for the lifetime of the high-evel oo +enum global_loop_data { + default_gl_data({ + async_handle: *ll::uv_async_t, + mut active: bool, + before_msg_drain: fn~(*ll::uv_async_t) -> bool, + before_tear_down: fn~(*ll::uv_async_t), + msg_po_ptr: *comm::port, + mut refd_handles: [mut *libc::c_void], + mut unrefd_handles: [mut *libc::c_void]}) +} unsafe fn send_high_level_msg(hl_loop: high_level_loop, -msg: high_level_msg) unsafe { @@ -201,11 +178,11 @@ unsafe fn send_high_level_msg(hl_loop: high_level_loop, // simply check if the loop is active and, if so, invoke the // user-supplied on_wake callback that is stored in the loop's // data member -crust fn high_level_wake_up_cb(async_handle: *libc::c_void, +crust fn high_level_wake_up_cb(async_handle: *ll::uv_async_t, status: int) unsafe { // nothing here, yet. - log(debug, #fmt("high_level_wake_up_cb crust.. handle: %?", - async_handle)); + log(debug, #fmt("high_level_wake_up_cb crust.. handle: %? status: %?", + async_handle, status)); let loop_ptr = ll::get_loop_for_uv_handle(async_handle); let data = ll::get_data_for_uv_handle(async_handle) as *global_loop_data; // we check to see if the loop is "active" (the loop is set to @@ -217,7 +194,7 @@ crust fn high_level_wake_up_cb(async_handle: *libc::c_void, // in the loops msg_po) if (*data).active { log(debug, "before on_wake"); - let mut do_msg_drain = (*data).before_msg_drain(); + let mut do_msg_drain = (*data).before_msg_drain(async_handle); let mut continue = true; if do_msg_drain { let msg_po = *((*data).msg_po_ptr); @@ -266,12 +243,8 @@ crust fn tear_down_close_cb(handle: *ll::uv_async_t) unsafe { fn high_level_tear_down(data: *global_loop_data) unsafe { log(debug, "high_level_tear_down() called, close async_handle"); // call user-suppled before_tear_down cb - alt (*data).before_tear_down { - gdc_callback(cb) { - cb(data); - } - } let async_handle = (*data).async_handle; + (*data).before_tear_down(async_handle); ll::close(async_handle as *libc::c_void, tear_down_close_cb); } @@ -329,187 +302,3 @@ unsafe fn high_level_unref(data: *global_loop_data, handle: *libc::c_void, } } - -enum high_level_msg { - interaction (fn~(*libc::c_void)), - auto_ref_handle (*libc::c_void), - auto_unref_handle (*libc::c_void, *u8), - tear_down -} - -unsafe fn get_global_async_handle_native_representation() - -> *libc::uintptr_t { - ret rustrt::rust_uv_get_kernel_global_async_handle(); -} - -unsafe fn get_global_async_handle() -> *ll::uv_async_t { - ret (*get_global_async_handle_native_representation()) as *ll::uv_async_t; -} - -unsafe fn set_global_async_handle(old: *ll::uv_async_t, - new_ptr: *ll::uv_async_t) { - rustrt::rust_compare_and_swap_ptr( - get_global_async_handle_native_representation(), - old as libc::uintptr_t, - new_ptr as libc::uintptr_t); -} - -enum global_data_callback { - gdc_callback(fn~(*global_loop_data)) -} - -type global_loop_data = { - async_handle: *ll::uv_async_t, - mut active: bool, - before_msg_drain: fn~() -> bool, - before_tear_down: global_data_callback, - msg_po_ptr: *comm::port, - mut refd_handles: [mut *libc::c_void], - mut unrefd_handles: [mut *libc::c_void] -}; - -unsafe fn outer_global_loop_body(msg_po: comm::port) { - // we're going to use a single libuv-generated loop ptr - // for the duration of the process - let loop_ptr = ll::loop_new(); - - // data structure for loop goes here.. - - // immediately weaken the task this is running in. - priv::weaken_task() {|weak_exit_po| - // when we first enter this loop, we're going - // to wait on stand-by to receive a request to - // fire-up the libuv loop - let mut continue = true; - while continue { - log(debug, "in outer_loop..."); - continue = either::either( - {|left_val| - // bail out.. - // if we catch this msg at this point, - // we should just be able to exit because - // the loop isn't active - log(debug, "got msg on weak_exit_po in outer loop"); - false - }, {|right_val| - log(debug, "about to enter inner loop"); - inner_global_loop_body(weak_exit_po, msg_po, loop_ptr, - copy(right_val)) - }, comm::select2(weak_exit_po, msg_po)); - log(debug,#fmt("GLOBAL LOOP EXITED, WAITING TO RESTART? %?", - continue)); - } - }; - - ll::loop_delete(loop_ptr); -} - -unsafe fn inner_global_loop_body(weak_exit_po_in: comm::port<()>, - msg_po_in: comm::port, - loop_ptr: *libc::c_void, - -first_interaction: high_level_msg) -> bool { - // resend the msg - comm::send(comm::chan(msg_po_in), first_interaction); - - // black magic - let weak_exit_po_ptr = ptr::addr_of(weak_exit_po_in); - run_high_level_loop( - loop_ptr, - msg_po_in, - // before_run - {|data| - // set the handle as the global - set_global_async_handle(0u as *ll::uv_async_t, - (*data).async_handle); - // when this is ran, our async_handle is set up, so let's - // do an async_send with it - ll::async_send((*data).async_handle); - }, - // before_msg_drain - {|| - log(debug,"entering before_msg_drain for the global loop"); - let weak_exit_po = *weak_exit_po_ptr; - if(comm::peek(weak_exit_po)) { - // if this is true, immediately bail and return false, causing - // the libuv loop to start tearing down - log(debug,"got weak_exit meg inside libuv loop"); - comm::recv(weak_exit_po); - false - } - // if no weak_exit_po msg is received, then we'll let the - // loop continue - else { - true - } - }, - // before_tear_down - {|data| - set_global_async_handle((*data).async_handle, - 0 as *ll::uv_async_t); - }); - // supposed to return a bool to indicate to the enclosing loop whether - // it should continue or not.. - ret true; -} - -#[cfg(test)] -mod test { - crust fn simple_timer_close_cb(timer_ptr: *ll::uv_timer_t) unsafe { - log(debug, "user close cb for timer_ptr"); - let exit_ch_ptr = ll::get_data_for_uv_handle( - timer_ptr as *libc::c_void) as *comm::chan; - let exit_ch = *exit_ch_ptr; - comm::send(exit_ch, true); - } - crust fn simple_timer_cb(timer_ptr: *ll::uv_timer_t, - status: libc::c_int) unsafe { - log(debug, "in simple timer cb"); - ll::timer_stop(timer_ptr); - let hl_loop = get_global_loop(); - interact(hl_loop) {|loop_ptr| - log(debug, "closing timer"); - //ll::close(timer_ptr as *libc::c_void, simple_timer_close_cb); - unref_handle(hl_loop, timer_ptr, simple_timer_close_cb); - log(debug, "about to deref exit_ch_ptr"); - log(debug, "after msg sent on deref'd exit_ch"); - }; - log(debug, "exiting simple timer cb"); - } - - fn impl_uv_hl_simple_timer(hl_loop: high_level_loop) unsafe { - let exit_po = comm::port::(); - let exit_ch = comm::chan(exit_po); - let exit_ch_ptr = ptr::addr_of(exit_ch); - let timer_handle = ll::timer_t(); - let timer_ptr = ptr::addr_of(timer_handle); - interact(hl_loop) {|loop_ptr| - log(debug, "user code inside interact loop!!!"); - let init_status = ll::timer_init(loop_ptr, timer_ptr); - if(init_status == 0i32) { - ref_handle(hl_loop, timer_ptr); - ll::set_data_for_uv_handle( - timer_ptr as *libc::c_void, - exit_ch_ptr as *libc::c_void); - let start_status = ll::timer_start(timer_ptr, simple_timer_cb, - 1u, 0u); - if(start_status == 0i32) { - } - else { - fail "failure on ll::timer_start()"; - } - } - else { - fail "failure on ll::timer_init()"; - } - }; - comm::recv(exit_po); - log(debug, "test_uv_hl_simple_timer: msg recv on exit_po, done.."); - } - #[test] - #[ignore(cfg(target_os = "freebsd"))] - fn test_uv_hl_high_level_global_timer() unsafe { - let hl_loop = get_global_loop(); - impl_uv_hl_simple_timer(hl_loop); - impl_uv_hl_simple_timer(hl_loop); - } -} \ No newline at end of file