std: Rename uv::hl to uv::iotask. Additional cleanup

This commit is contained in:
Brian Anderson 2012-05-24 23:42:12 -07:00
parent 59262dfc62
commit 81b8e20f31
6 changed files with 156 additions and 158 deletions

View File

@ -4,6 +4,8 @@ High-level interface to libuv's TCP functionality
// FIXME: Fewer import *'s
import ip = net_ip;
import uv::iotask;
import uv::iotask::iotask;
import comm::*;
import result::*;
import str::*;
@ -44,7 +46,7 @@ resource tcp_socket(socket_data: @tcp_socket_data)
};
let close_data_ptr = ptr::addr_of(close_data);
let stream_handle_ptr = (*socket_data).stream_handle_ptr;
uv::hl::interact((*socket_data).hl_loop) {|loop_ptr|
iotask::interact((*socket_data).iotask) {|loop_ptr|
log(debug, #fmt("interact dtor for tcp_socket stream %? loop %?",
stream_handle_ptr, loop_ptr));
uv::ll::set_data_for_uv_handle(stream_handle_ptr,
@ -62,8 +64,8 @@ resource tcp_conn_port(conn_data: @tcp_conn_port_data) unsafe {
let conn_data_ptr = ptr::addr_of(*conn_data);
let server_stream_ptr = ptr::addr_of((*conn_data_ptr).server_stream);
let stream_closed_po = (*conn_data).stream_closed_po;
let hl_loop = (*conn_data_ptr).hl_loop;
uv::hl::interact(hl_loop) {|loop_ptr|
let iotask = (*conn_data_ptr).iotask;
iotask::interact(iotask) {|loop_ptr|
log(debug, #fmt("dtor for tcp_conn_port loop: %?",
loop_ptr));
uv::ll::close(server_stream_ptr, tcp_nl_close_cb);
@ -86,7 +88,7 @@ Initiate a client connection over TCP/IP
* `ip` - The IP address (versions 4 or 6) of the remote host
* `port` - the unsigned integer of the desired remote host port
* `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on
* `iotask` - a `uv::iotask` that the tcp request will run on
# Returns
@ -95,7 +97,7 @@ can be used to send and receive data to/from the remote host. In the event
of failure, a `tcp_err_data` will be returned
"]
fn connect(input_ip: ip::ip_addr, port: uint,
hl_loop: uv::hl::high_level_loop)
iotask: iotask)
-> result::result<tcp_socket, tcp_err_data> unsafe {
let result_po = comm::port::<conn_attempt>();
let closed_signal_po = comm::port::<()>();
@ -113,7 +115,7 @@ fn connect(input_ip: ip::ip_addr, port: uint,
stream_handle_ptr: stream_handle_ptr,
connect_req: uv::ll::connect_t(),
write_req: uv::ll::write_t(),
hl_loop: hl_loop
iotask: iotask
};
let socket_data_ptr = ptr::addr_of(*socket_data);
log(debug, #fmt("tcp_connect result_ch %?", conn_data.result_ch));
@ -121,7 +123,7 @@ fn connect(input_ip: ip::ip_addr, port: uint,
// we can send into the interact cb to be handled in libuv..
log(debug, #fmt("stream_handle_ptr outside interact %?",
stream_handle_ptr));
uv::hl::interact(hl_loop) {|loop_ptr|
iotask::interact(iotask) {|loop_ptr|
log(debug, "in interact cb for tcp client connect..");
log(debug, #fmt("stream_handle_ptr in interact %?",
stream_handle_ptr));
@ -354,7 +356,7 @@ to listen for, and accept, new connections, or a `tcp_err_data` if
failure to create the tcp listener occurs
"]
fn new_listener(host_ip: ip::ip_addr, port: uint, backlog: uint,
hl_loop: uv::hl::high_level_loop)
iotask: iotask)
-> result::result<tcp_conn_port, tcp_err_data> unsafe {
let stream_closed_po = comm::port::<()>();
let stream_closed_ch = comm::chan(stream_closed_po);
@ -367,7 +369,7 @@ fn new_listener(host_ip: ip::ip_addr, port: uint, backlog: uint,
server_stream: uv::ll::tcp_t(),
stream_closed_po: stream_closed_po,
stream_closed_ch: stream_closed_ch,
hl_loop: hl_loop,
iotask: iotask,
new_conn_po: new_conn_po,
new_conn_ch: new_conn_ch
};
@ -377,7 +379,7 @@ fn new_listener(host_ip: ip::ip_addr, port: uint, backlog: uint,
let setup_po = comm::port::<option<tcp_err_data>>();
let setup_ch = comm::chan(setup_po);
uv::hl::interact(hl_loop) {|loop_ptr|
iotask::interact(iotask) {|loop_ptr|
let tcp_addr = ipv4_ip_addr_to_sockaddr_in(host_ip,
port);
alt uv::ll::tcp_init(loop_ptr, server_stream_ptr) {
@ -445,11 +447,11 @@ variant
fn conn_recv(server_port: tcp_conn_port)
-> result::result<tcp_socket, tcp_err_data> {
let new_conn_po = (**server_port).new_conn_po;
let hl_loop = (**server_port).hl_loop;
let iotask = (**server_port).iotask;
let new_conn_result = comm::recv(new_conn_po);
alt new_conn_result {
ok(client_stream_ptr) {
conn_port_new_tcp_socket(client_stream_ptr, hl_loop)
conn_port_new_tcp_socket(client_stream_ptr, iotask)
}
err(err_data) {
result::err(err_data)
@ -476,12 +478,12 @@ once a new connection is recv'd. Its parameter:
fn conn_recv_spawn(server_port: tcp_conn_port,
cb: fn~(result::result<tcp_socket, tcp_err_data>)) {
let new_conn_po = (**server_port).new_conn_po;
let hl_loop = (**server_port).hl_loop;
let iotask = (**server_port).iotask;
let new_conn_result = comm::recv(new_conn_po);
task::spawn {||
let sock_create_result = alt new_conn_result {
ok(client_stream_ptr) {
conn_port_new_tcp_socket(client_stream_ptr, hl_loop)
conn_port_new_tcp_socket(client_stream_ptr, iotask)
}
err(err_data) {
result::err(err_data)
@ -582,7 +584,7 @@ fn accept(new_conn: tcp_new_connection)
let server_data_ptr = uv::ll::get_data_for_uv_handle(
server_handle_ptr) as *tcp_listen_fc_data;
let reader_po = comm::port::<result::result<[u8], tcp_err_data>>();
let hl_loop = (*server_data_ptr).hl_loop;
let iotask = (*server_data_ptr).iotask;
let stream_handle_ptr = malloc_uv_tcp_t();
*(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
let client_socket_data = @{
@ -591,7 +593,7 @@ fn accept(new_conn: tcp_new_connection)
stream_handle_ptr : stream_handle_ptr,
connect_req : uv::ll::connect_t(),
write_req : uv::ll::write_t(),
hl_loop: hl_loop
iotask : iotask
};
let client_socket_data_ptr = ptr::addr_of(*client_socket_data);
let client_stream_handle_ptr =
@ -677,7 +679,7 @@ successful/normal shutdown, and a `tcp_err_data` record in the event
of listen exiting because of an error
"]
fn listen_for_conn(host_ip: ip::ip_addr, port: uint, backlog: uint,
hl_loop: uv::hl::high_level_loop,
iotask: iotask,
on_establish_cb: fn~(comm::chan<option<tcp_err_data>>),
new_connect_cb: fn~(tcp_new_connection,
comm::chan<option<tcp_err_data>>))
@ -692,14 +694,14 @@ fn listen_for_conn(host_ip: ip::ip_addr, port: uint, backlog: uint,
stream_closed_ch: comm::chan(stream_closed_po),
kill_ch: kill_ch,
new_connect_cb: new_connect_cb,
hl_loop: hl_loop,
iotask: iotask,
mut active: true
};
let server_data_ptr = ptr::addr_of(server_data);
let setup_po = comm::port::<option<tcp_err_data>>();
let setup_ch = comm::chan(setup_po);
uv::hl::interact(hl_loop) {|loop_ptr|
iotask::interact(iotask) {|loop_ptr|
let tcp_addr = ipv4_ip_addr_to_sockaddr_in(host_ip,
port);
alt uv::ll::tcp_init(loop_ptr, server_stream_ptr) {
@ -745,7 +747,7 @@ fn listen_for_conn(host_ip: ip::ip_addr, port: uint, backlog: uint,
none {
on_establish_cb(kill_ch);
let kill_result = comm::recv(kill_po);
uv::hl::interact(hl_loop) {|loop_ptr|
iotask::interact(iotask) {|loop_ptr|
log(debug, #fmt("tcp::listen post-kill recv hl interact %?",
loop_ptr));
(*server_data_ptr).active = false;
@ -811,7 +813,7 @@ impl sock_methods for tcp_socket {
fn read_common_impl(socket_data: *tcp_socket_data, timeout_msecs: uint)
-> result::result<[u8],tcp_err_data> unsafe {
log(debug, "starting tcp::read");
let hl_loop = (*socket_data).hl_loop;
let iotask = (*socket_data).iotask;
let rs_result = read_start_common_impl(socket_data);
if result::is_failure(rs_result) {
let err_data = result::get_err(rs_result);
@ -821,7 +823,7 @@ fn read_common_impl(socket_data: *tcp_socket_data, timeout_msecs: uint)
log(debug, "tcp::read before recv_timeout");
let read_result = if timeout_msecs > 0u {
timer::recv_timeout(
hl_loop, timeout_msecs, result::get(rs_result))
iotask, timeout_msecs, result::get(rs_result))
} else {
some(comm::recv(result::get(rs_result)))
};
@ -851,7 +853,7 @@ fn read_stop_common_impl(socket_data: *tcp_socket_data) ->
let stream_handle_ptr = (*socket_data).stream_handle_ptr;
let stop_po = comm::port::<option<tcp_err_data>>();
let stop_ch = comm::chan(stop_po);
uv::hl::interact((*socket_data).hl_loop) {|loop_ptr|
iotask::interact((*socket_data).iotask) {|loop_ptr|
log(debug, "in interact cb for tcp::read_stop");
alt uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) {
0i32 {
@ -883,7 +885,7 @@ fn read_start_common_impl(socket_data: *tcp_socket_data)
let start_po = comm::port::<option<uv::ll::uv_err_data>>();
let start_ch = comm::chan(start_po);
log(debug, "in tcp::read_start before interact loop");
uv::hl::interact((*socket_data).hl_loop) {|loop_ptr|
iotask::interact((*socket_data).iotask) {|loop_ptr|
log(debug, #fmt("in tcp::read_start interact cb %?", loop_ptr));
alt uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t,
on_alloc_cb,
@ -925,7 +927,7 @@ fn write_common_impl(socket_data_ptr: *tcp_socket_data,
result_ch: comm::chan(result_po)
};
let write_data_ptr = ptr::addr_of(write_data);
uv::hl::interact((*socket_data_ptr).hl_loop) {|loop_ptr|
iotask::interact((*socket_data_ptr).iotask) {|loop_ptr|
log(debug, #fmt("in interact cb for tcp::write %?", loop_ptr));
alt uv::ll::write(write_req_ptr,
stream_handle_ptr,
@ -956,7 +958,7 @@ fn write_common_impl(socket_data_ptr: *tcp_socket_data,
// various recv_* can use a tcp_conn_port can re-use this..
fn conn_port_new_tcp_socket(
stream_handle_ptr: *uv::ll::uv_tcp_t,
hl_loop: uv::hl::high_level_loop)
iotask: iotask)
-> result::result<tcp_socket,tcp_err_data> unsafe {
// tcp_nl_on_connection_cb
let reader_po = comm::port::<result::result<[u8], tcp_err_data>>();
@ -966,11 +968,11 @@ fn conn_port_new_tcp_socket(
stream_handle_ptr : stream_handle_ptr,
connect_req : uv::ll::connect_t(),
write_req : uv::ll::write_t(),
hl_loop : hl_loop
iotask : iotask
};
let client_socket_data_ptr = ptr::addr_of(*client_socket_data);
comm::listen {|cont_ch|
uv::hl::interact(hl_loop) {|loop_ptr|
iotask::interact(iotask) {|loop_ptr|
log(debug, #fmt("in interact cb 4 conn_port_new_tcp.. loop %?",
loop_ptr));
uv::ll::set_data_for_uv_handle(stream_handle_ptr,
@ -990,7 +992,7 @@ type tcp_conn_port_data = {
server_stream: uv::ll::uv_tcp_t,
stream_closed_po: comm::port<()>,
stream_closed_ch: comm::chan<()>,
hl_loop: uv::hl::high_level_loop,
iotask: iotask,
new_conn_po: comm::port<result::result<*uv::ll::uv_tcp_t,
tcp_err_data>>,
new_conn_ch: comm::chan<result::result<*uv::ll::uv_tcp_t,
@ -1003,7 +1005,7 @@ type tcp_listen_fc_data = {
kill_ch: comm::chan<option<tcp_err_data>>,
new_connect_cb: fn~(tcp_new_connection,
comm::chan<option<tcp_err_data>>),
hl_loop: uv::hl::high_level_loop,
iotask: iotask,
mut active: bool
};
@ -1264,7 +1266,7 @@ type tcp_socket_data = {
stream_handle_ptr: *uv::ll::uv_tcp_t,
connect_req: uv::ll::uv_connect_t,
write_req: uv::ll::uv_write_t,
hl_loop: uv::hl::high_level_loop
iotask: iotask
};
// convert rust ip_addr to libuv's native representation
@ -1405,13 +1407,13 @@ mod test {
fn run_tcp_test_server(server_ip: str, server_port: uint, resp: str,
server_ch: comm::chan<str>,
cont_ch: comm::chan<()>,
hl_loop: uv::hl::high_level_loop) -> str {
iotask: iotask) -> str {
task::spawn_sched(task::manual_threads(1u)) {||
let server_ip_addr = ip::v4::parse_addr(server_ip);
let listen_result =
listen_for_conn(server_ip_addr, server_port, 128u,
hl_loop,
iotask,
// on_establish_cb -- called when listener is set up
{|kill_ch|
log(debug, #fmt("establish_cb %?",
@ -1484,12 +1486,12 @@ mod test {
server_port: uint, resp: str,
server_ch: comm::chan<str>,
cont_ch: comm::chan<()>,
hl_loop: uv::hl::high_level_loop) -> str {
iotask: iotask) -> str {
task::spawn_sched(task::manual_threads(1u)) {||
let server_ip_addr = ip::v4::parse_addr(server_ip);
let new_listener_result =
new_listener(server_ip_addr, server_port, 128u, hl_loop);
new_listener(server_ip_addr, server_port, 128u, iotask);
if result::is_failure(new_listener_result) {
let err_data = result::get_err(new_listener_result);
log(debug, #fmt("SERVER: exited abnormally name %s msg %s",
@ -1533,12 +1535,12 @@ mod test {
fn run_tcp_test_client(server_ip: str, server_port: uint, resp: str,
client_ch: comm::chan<str>,
hl_loop: uv::hl::high_level_loop) -> str {
iotask: iotask) -> str {
let server_ip_addr = ip::v4::parse_addr(server_ip);
log(debug, "CLIENT: starting..");
let connect_result = connect(server_ip_addr, server_port, hl_loop);
let connect_result = connect(server_ip_addr, server_port, iotask);
if result::is_failure(connect_result) {
log(debug, "CLIENT: failed to connect");
let err_data = result::get_err(connect_result);

View File

@ -14,7 +14,7 @@ use core(vers = "0.2");
import core::*;
export net, net_tcp;
export uv, uv_ll, uv_hl, uv_global_loop;
export uv, uv_ll, uv_iotask, uv_global_loop;
export c_vec, util, timer;
export bitv, deque, fun_treemap, list, map, smallintmap, sort, treemap;
export rope, arena, arc;
@ -30,7 +30,7 @@ mod net_tcp;
// libuv modules
mod uv;
mod uv_ll;
mod uv_hl;
mod uv_iotask;
mod uv_global_loop;

View File

@ -3,6 +3,8 @@ Utilities that leverage libuv's `uv_timer_*` API
"];
import uv = uv;
import uv::iotask;
import iotask::iotask;
export delayed_send, sleep, recv_timeout;
#[doc = "
@ -21,7 +23,7 @@ for *at least* that period of time.
* ch - a channel of type T to send a `val` on
* val - a value of type T to send over the provided `ch`
"]
fn delayed_send<T: copy send>(hl_loop: uv::hl::high_level_loop,
fn delayed_send<T: copy send>(iotask: iotask,
msecs: uint, ch: comm::chan<T>, val: T) {
// FIME: Looks like we don't need to spawn here
task::spawn() {||
@ -31,7 +33,7 @@ fn delayed_send<T: copy send>(hl_loop: uv::hl::high_level_loop,
let timer_done_ch_ptr = ptr::addr_of(timer_done_ch);
let timer = uv::ll::timer_t();
let timer_ptr = ptr::addr_of(timer);
uv::hl::interact(hl_loop) {|loop_ptr|
iotask::interact(iotask) {|loop_ptr|
let init_result = uv::ll::timer_init(loop_ptr, timer_ptr);
if (init_result == 0i32) {
let start_result = uv::ll::timer_start(
@ -69,13 +71,13 @@ for *at least* that period of time.
# Arguments
* `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on
* `iotask` - a `uv::iotask` that the tcp request will run on
* msecs - an amount of time, in milliseconds, for the current task to block
"]
fn sleep(hl_loop: uv::hl::high_level_loop, msecs: uint) {
fn sleep(iotask: iotask, msecs: uint) {
let exit_po = comm::port::<()>();
let exit_ch = comm::chan(exit_po);
delayed_send(hl_loop, msecs, exit_ch, ());
delayed_send(iotask, msecs, exit_ch, ());
comm::recv(exit_po);
}
@ -88,7 +90,7 @@ timeout. Depending on whether the provided port receives in that time period,
# Arguments
* `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on
* `iotask' - `uv::iotask` that the tcp request will run on
* msecs - an mount of time, in milliseconds, to wait to receive
* wait_port - a `comm::port<T>` to receive on
@ -98,12 +100,12 @@ An `option<T>` representing the outcome of the call. If the call `recv`'d on
the provided port in the allotted timeout period, then the result will be a
`some(T)`. If not, then `none` will be returned.
"]
fn recv_timeout<T: copy send>(hl_loop: uv::hl::high_level_loop,
fn recv_timeout<T: copy send>(iotask: iotask,
msecs: uint,
wait_po: comm::port<T>) -> option<T> {
let timeout_po = comm::port::<()>();
let timeout_ch = comm::chan(timeout_po);
delayed_send(hl_loop, msecs, timeout_ch, ());
delayed_send(iotask, msecs, timeout_ch, ());
// FIXME: This could be written clearer
either::either(
{|left_val|

View File

@ -26,8 +26,8 @@ facilities.
import ll = uv_ll;
export ll;
import hl = uv_hl;
export hl;
import iotask = uv_iotask;
export iotask;
import global_loop = uv_global_loop;
export global_loop;

View File

@ -5,8 +5,9 @@ A process-wide libuv event loop for library use.
export get, get_monitor_task_gl;
import ll = uv_ll;
import hl = uv_hl;
import iotask = uv_iotask;
import get_gl = get;
import iotask::{iotask, spawn_iotask};
import priv::{chan_from_global_ptr, weaken_task};
import comm::{port, chan, methods, select2, listen};
import either::{left, right};
@ -27,12 +28,12 @@ loop that this function returns.
* A `hl::high_level_loop` that encapsulates communication with the global
loop.
"]
fn get() -> hl::high_level_loop {
fn get() -> iotask {
ret get_monitor_task_gl();
}
#[doc(hidden)]
fn get_monitor_task_gl() -> hl::high_level_loop unsafe {
fn get_monitor_task_gl() -> iotask unsafe {
let monitor_loop_chan_ptr = rustrt::rust_uv_get_kernel_global_chan_ptr();
@ -53,7 +54,7 @@ fn get_monitor_task_gl() -> hl::high_level_loop unsafe {
};
#debug("before priv::chan_from_global_ptr");
type monchan = chan<hl::high_level_loop>;
type monchan = chan<iotask>;
let monitor_ch = chan_from_global_ptr::<monchan>(monitor_loop_chan_ptr,
builder_fn) {|msg_po|
@ -70,7 +71,7 @@ fn get_monitor_task_gl() -> hl::high_level_loop unsafe {
// all normal tasks have ended, tell the
// libuv loop to tear_down, then exit
#debug("weak_exit_po recv'd msg: %?", weak_exit);
hl::exit(hl_loop);
iotask::exit(hl_loop);
break;
}
right(fetch_ch) {
@ -92,7 +93,7 @@ fn get_monitor_task_gl() -> hl::high_level_loop unsafe {
}
}
fn spawn_loop() -> hl::high_level_loop unsafe {
fn spawn_loop() -> iotask unsafe {
let builder = task::builder();
task::add_wrapper(builder) {|task_body|
fn~(move task_body) {
@ -110,7 +111,7 @@ fn spawn_loop() -> hl::high_level_loop unsafe {
}
}
}
hl::spawn_high_level_loop(builder)
spawn_iotask(builder)
}
#[cfg(test)]
@ -128,7 +129,7 @@ mod test {
log(debug, "in simple timer cb");
ll::timer_stop(timer_ptr);
let hl_loop = get_gl();
hl::interact(hl_loop) {|_loop_ptr|
iotask::interact(hl_loop) {|_loop_ptr|
log(debug, "closing timer");
ll::close(timer_ptr, simple_timer_close_cb);
log(debug, "about to deref exit_ch_ptr");
@ -137,7 +138,7 @@ mod test {
log(debug, "exiting simple timer cb");
}
fn impl_uv_hl_simple_timer(hl_loop: hl::high_level_loop) unsafe {
fn impl_uv_hl_simple_timer(iotask: iotask) unsafe {
let exit_po = comm::port::<bool>();
let exit_ch = comm::chan(exit_po);
let exit_ch_ptr = ptr::addr_of(exit_ch);
@ -145,7 +146,7 @@ mod test {
exit_ch_ptr));
let timer_handle = ll::timer_t();
let timer_ptr = ptr::addr_of(timer_handle);
hl::interact(hl_loop) {|loop_ptr|
iotask::interact(iotask) {|loop_ptr|
log(debug, "user code inside interact loop!!!");
let init_status = ll::timer_init(loop_ptr, timer_ptr);
if(init_status == 0i32) {

View File

@ -1,13 +1,14 @@
#[doc = "
High-level bindings to work with the libuv library.
This module is geared towards library developers who want to
provide a high-level, abstracted interface to some set of
libuv functionality.
A task-based interface to the uv loop
The I/O task runs in its own single-threaded scheduler. By using the
`interact` function you can execute code in a uv callback.
"];
export high_level_loop;
export spawn_high_level_loop;
export iotask::{};
export spawn_iotask;
export interact;
export exit;
@ -19,18 +20,19 @@ import ll = uv_ll;
#[doc = "
Used to abstract-away direct interaction with a libuv loop.
"]
enum high_level_loop = {
async_handle: *ll::uv_async_t,
op_chan: chan<high_level_msg>
};
enum iotask {
iotask_({
async_handle: *ll::uv_async_t,
op_chan: chan<iotask_msg>
})
}
fn spawn_high_level_loop(-builder: task::builder
) -> high_level_loop unsafe {
fn spawn_iotask(-builder: task::builder) -> iotask {
import task::{set_opts, get_opts, single_threaded, run};
let hll_po = port::<high_level_loop>();
let hll_ch = hll_po.chan();
let iotask_po = port::<iotask>();
let iotask_ch = iotask_po.chan();
set_opts(builder, {
sched: some({
@ -42,45 +44,75 @@ fn spawn_high_level_loop(-builder: task::builder
run(builder) {||
#debug("entering libuv task");
run_high_level_loop(hll_ch);
run_loop(iotask_ch);
#debug("libuv task exiting");
};
hll_po.recv()
iotask_po.recv()
}
#[doc = "
Provide a callback to be processed by `iotask`
The primary way to do operations again a running `iotask` that
doesn't involve creating a uv handle via `safe_handle`
# Warning
This function is the only safe way to interact with _any_ `iotask`.
Using functions in the `uv::ll` module outside of the `cb` passed into
this function is _very dangerous_.
# Arguments
* iotask - a uv I/O task that you want to do operations against
* cb - a function callback to be processed on the running loop's
thread. The only parameter passed in is an opaque pointer representing the
running `uv_loop_t*`. In the context of this callback, it is safe to use
this pointer to do various uv_* API calls contained within the `uv::ll`
module. It is not safe to send the `loop_ptr` param to this callback out
via ports/chans.
"]
unsafe fn interact(iotask: iotask,
-cb: fn~(*c_void)) {
send_msg(iotask, interaction(cb));
}
#[doc="
Represents the range of interactions with a `high_level_loop`
"]
enum high_level_msg {
interaction (fn~(*libc::c_void)),
#[doc="
For use in libraries that roll their own `high_level_loop` (like
`std::uv::global_loop`)
Shut down the I/O task
Is used to signal to the loop that it should close the internally-held
async handle and do a sanity check to make sure that all other handles are
closed, causing a failure otherwise. This should not be sent/used from
'normal' user code.
"]
closed, causing a failure otherwise.
"]
fn exit(iotask: iotask) unsafe {
send_msg(iotask, teardown_loop);
}
// INTERNAL API
enum iotask_msg {
interaction (fn~(*libc::c_void)),
teardown_loop
}
#[doc = "
Useful for anyone who wants to roll their own `high_level_loop`.
Run the loop and begin handling messages
"]
unsafe fn run_high_level_loop(hll_ch: chan<high_level_loop>) {
let msg_po = port::<high_level_msg>();
fn run_loop(iotask_ch: chan<iotask>) unsafe {
let msg_po = port::<iotask_msg>();
let loop_ptr = ll::loop_new();
// set up the special async handle we'll use to allow multi-task
// communication with this loop
let async = ll::async_t();
let async_handle = addr_of(async);
// associate the async handle with the loop
ll::async_init(loop_ptr, async_handle, high_level_wake_up_cb);
ll::async_init(loop_ptr, async_handle, wake_up_cb);
// initialize our loop data and store it in the loop
let data: hl_loop_data = {
let data: iotask_loop_data = {
async_handle: async_handle,
mut active: true,
msg_po_ptr: addr_of(msg_po)
@ -89,63 +121,30 @@ unsafe fn run_high_level_loop(hll_ch: chan<high_level_loop>) {
// Send out a handle through which folks can talk to us
// while we dwell in the I/O loop
let hll = high_level_loop({
let iotask = iotask_({
async_handle: async_handle,
op_chan: msg_po.chan()
});
hll_ch.send(hll);
iotask_ch.send(iotask);
log(debug, "about to run high level loop");
log(debug, "about to run uv loop");
// enter the loop... this blocks until the loop is done..
ll::run(loop_ptr);
log(debug, "high-level loop ended");
log(debug, "uv loop ended");
ll::loop_delete(loop_ptr);
}
#[doc = "
Provide a callback to be processed by `a_loop`
The primary way to do operations again a running `high_level_loop` that
doesn't involve creating a uv handle via `safe_handle`
# Warning
This function is the only safe way to interact with _any_ `high_level_loop`.
Using functions in the `uv::ll` module outside of the `cb` passed into
this function is _very dangerous_.
# Arguments
* hl_loop - a `uv::hl::high_level_loop` that you want to do operations against
* cb - a function callback to be processed on the running loop's
thread. The only parameter passed in is an opaque pointer representing the
running `uv_loop_t*`. In the context of this callback, it is safe to use
this pointer to do various uv_* API calls contained within the `uv::ll`
module. It is not safe to send the `loop_ptr` param to this callback out
via ports/chans.
"]
unsafe fn interact(hl_loop: high_level_loop,
-cb: fn~(*c_void)) {
send_high_level_msg(hl_loop, interaction(cb));
}
fn exit(hl_loop: high_level_loop) unsafe {
send_high_level_msg(hl_loop, teardown_loop);
}
// INTERNAL API
// data that lives for the lifetime of the high-evel oo
type hl_loop_data = {
type iotask_loop_data = {
async_handle: *ll::uv_async_t,
mut active: bool,
msg_po_ptr: *port<high_level_msg>
msg_po_ptr: *port<iotask_msg>
};
unsafe fn send_high_level_msg(hl_loop: high_level_loop,
-msg: high_level_msg) {
comm::send(hl_loop.op_chan, msg);
ll::async_send(hl_loop.async_handle);
fn send_msg(iotask: iotask,
-msg: iotask_msg) unsafe {
iotask.op_chan.send(msg);
ll::async_send(iotask.async_handle);
}
// this will be invoked by a call to uv::hl::interact() with
@ -153,12 +152,12 @@ unsafe fn send_high_level_msg(hl_loop: high_level_loop,
// simply check if the loop is active and, if so, invoke the
// user-supplied on_wake callback that is stored in the loop's
// data member
crust fn high_level_wake_up_cb(async_handle: *ll::uv_async_t,
status: int) unsafe {
log(debug, #fmt("high_level_wake_up_cb crust.. handle: %? status: %?",
crust fn wake_up_cb(async_handle: *ll::uv_async_t,
status: int) unsafe {
log(debug, #fmt("wake_up_cb crust.. handle: %? status: %?",
async_handle, status));
let loop_ptr = ll::get_loop_for_uv_handle(async_handle);
let data = ll::get_data_for_uv_handle(async_handle) as *hl_loop_data;
let data = ll::get_data_for_uv_handle(async_handle) as *iotask_loop_data;
// FIXME: What is this checking?
if (*data).active {
let msg_po = *((*data).msg_po_ptr);
@ -190,8 +189,8 @@ crust fn tear_down_close_cb(handle: *ll::uv_async_t) unsafe {
assert loop_refs == 1i32;
}
fn begin_teardown(data: *hl_loop_data) unsafe {
log(debug, "high_level_tear_down() called, close async_handle");
fn begin_teardown(data: *iotask_loop_data) unsafe {
log(debug, "iotask begin_teardown() called, close async_handle");
// call user-suppled before_tear_down cb
let async_handle = (*data).async_handle;
ll::close(async_handle as *c_void, tear_down_close_cb);
@ -211,20 +210,20 @@ mod test {
ll::close(handle, async_close_cb);
}
type ah_data = {
hl_loop: high_level_loop,
iotask: iotask,
exit_ch: comm::chan<()>
};
fn impl_uv_hl_async(hl_loop: high_level_loop) unsafe {
fn impl_uv_iotask_async(iotask: iotask) unsafe {
let async_handle = ll::async_t();
let ah_ptr = ptr::addr_of(async_handle);
let exit_po = comm::port::<()>();
let exit_ch = comm::chan(exit_po);
let ah_data = {
hl_loop: hl_loop,
iotask: iotask,
exit_ch: exit_ch
};
let ah_data_ptr = ptr::addr_of(ah_data);
interact(hl_loop) {|loop_ptr|
interact(iotask) {|loop_ptr|
ll::async_init(loop_ptr, ah_ptr, async_handle_cb);
ll::set_data_for_uv_handle(ah_ptr, ah_data_ptr as *libc::c_void);
ll::async_send(ah_ptr);
@ -234,14 +233,14 @@ mod test {
// this fn documents the bear minimum neccesary to roll your own
// high_level_loop
unsafe fn spawn_test_loop(exit_ch: comm::chan<()>) -> high_level_loop {
let hl_loop_port = comm::port::<high_level_loop>();
let hl_loop_ch = comm::chan(hl_loop_port);
unsafe fn spawn_test_loop(exit_ch: comm::chan<()>) -> iotask {
let iotask_port = comm::port::<iotask>();
let iotask_ch = comm::chan(iotask_port);
task::spawn_sched(task::manual_threads(1u)) {||
run_high_level_loop(hl_loop_ch);
run_loop(iotask_ch);
exit_ch.send(());
};
ret comm::recv(hl_loop_port);
ret comm::recv(iotask_port);
}
crust fn lifetime_handle_close(handle: *libc::c_void) unsafe {
@ -255,10 +254,10 @@ mod test {
}
#[test]
fn test_uv_hl_async() unsafe {
fn test_uv_iotask_async() unsafe {
let exit_po = comm::port::<()>();
let exit_ch = comm::chan(exit_po);
let hl_loop = spawn_test_loop(exit_ch);
let iotask = spawn_test_loop(exit_ch);
// using this handle to manage the lifetime of the high_level_loop,
// as it will exit the first time one of the impl_uv_hl_async() is
@ -270,7 +269,7 @@ mod test {
let work_exit_ch = comm::chan(work_exit_po);
iter::repeat(7u) {||
task::spawn_sched(task::manual_threads(1u), {||
impl_uv_hl_async(hl_loop);
impl_uv_iotask_async(iotask);
comm::send(work_exit_ch, ());
});
};
@ -278,13 +277,7 @@ mod test {
comm::recv(work_exit_po);
};
log(debug, "sending teardown_loop msg..");
// the teardown msg usually comes, in the case of the global loop,
// as a result of receiving a msg on the weaken_task port. but,
// anyone rolling their own high_level_loop can decide when to
// send the msg. it's assert and barf, though, if all of your
// handles aren't uv_close'd first
comm::send(hl_loop.op_chan, teardown_loop);
ll::async_send(hl_loop.async_handle);
exit(iotask);
comm::recv(exit_po);
log(debug, "after recv on exit_po.. exiting..");
}