From bb5960aa57a04920614fce021c6792ad8a9a0305 Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Wed, 22 Feb 2012 14:00:34 -0800 Subject: [PATCH] moving new uv stuff into uv.rs and rust_uv.cpp - removing the remains of uvtmp.rs and rust_uvtmp.rs - removing the displaced, low-level libuv bindings in uv.rs and rust_uv.cpp --- src/libstd/std.rc | 3 +- src/libstd/uv.rs | 682 ++++++++++++++++++++++++++++++--------- src/libstd/uvtmp.rs | 730 ------------------------------------------ src/rt/rust_uv.cpp | 192 +++++++++-- src/rt/rust_uvtmp.cpp | 612 ----------------------------------- src/rt/rustrt.def.in | 41 +-- 6 files changed, 700 insertions(+), 1560 deletions(-) delete mode 100644 src/libstd/uvtmp.rs delete mode 100644 src/rt/rust_uvtmp.cpp diff --git a/src/libstd/std.rc b/src/libstd/std.rc index 478dbb69513..83e4a04f0df 100644 --- a/src/libstd/std.rc +++ b/src/libstd/std.rc @@ -7,7 +7,7 @@ #[license = "MIT"]; #[crate_type = "lib"]; -export fs, io, net, run, uv, uvtmp; +export fs, io, net, run, uv; export c_vec, four, tri, util; export bitv, deque, fun_treemap, list, map, smallintmap, sort, treemap, ufind; export rope; @@ -25,7 +25,6 @@ mod net; #[path = "run_program.rs"] mod run; mod uv; -mod uvtmp; // Utility modules diff --git a/src/libstd/uv.rs b/src/libstd/uv.rs index 27f2a3c54a8..90ac5ce4b1c 100644 --- a/src/libstd/uv.rs +++ b/src/libstd/uv.rs @@ -1,188 +1,558 @@ -/* -This is intended to be a low-level binding to libuv that very closely mimics -the C libuv API. Does very little right now pending scheduler improvements. -*/ +export loop_new, run, close, run_in_bg, async_init, async_send, + timer_init, timer_start, timer_stop; -export sanity_check; -export loop_t, idle_t; -export loop_new, loop_delete, default_loop, run, unref; -export idle_init, idle_start; -export idle_new; - -import core::ctypes; - -#[link_name = "rustrt"] -native mod uv { - fn rust_uv_loop_new() -> *loop_t; - fn rust_uv_loop_delete(loop: *loop_t); - fn rust_uv_default_loop() -> *loop_t; - fn rust_uv_run(loop: *loop_t) -> ctypes::c_int; - fn rust_uv_unref(loop: *loop_t); - fn rust_uv_idle_init(loop: *loop_t, idle: *idle_t) -> ctypes::c_int; - fn rust_uv_idle_start(idle: *idle_t, cb: idle_cb) -> ctypes::c_int; +// these are processed solely in the +// process_operation() crust fn below +enum uv_operation { + op_async_init([u8]), + op_close(uv_handle, *ctypes::void), + op_timer_init([u8]), + op_timer_start([u8], *ctypes::void, u32, u32), + op_timer_stop([u8], *ctypes::void, fn~(uv_handle)) } -#[link_name = "rustrt"] -native mod helpers { - fn rust_uv_size_of_idle_t() -> ctypes::size_t; +enum uv_handle { + uv_async([u8], uv_loop), + uv_timer([u8], uv_loop) } -type opaque_cb = *ctypes::void; +enum uv_msg { + // requests from library users + msg_run(comm::chan), + msg_run_in_bg(), + msg_async_init(fn~(uv_handle), fn~(uv_handle)), + msg_async_send([u8]), + msg_close(uv_handle, fn~()), + msg_timer_init(fn~(uv_handle)), + msg_timer_start([u8], u32, u32, fn~(uv_handle)), + msg_timer_stop([u8], fn~(uv_handle)), -type handle_type = ctypes::enum; + // dispatches from libuv + uv_async_init([u8], *ctypes::void), + uv_async_send([u8]), + uv_close([u8]), + uv_timer_init([u8], *ctypes::void), + uv_timer_call([u8]), + uv_timer_stop([u8], fn~(uv_handle)), + uv_end() +} -type close_cb = opaque_cb; -type idle_cb = opaque_cb; - -type handle_private_fields = { - a00: ctypes::c_int, - a01: ctypes::c_int, - a02: ctypes::c_int, - a03: ctypes::c_int, - a04: ctypes::c_int, - a05: ctypes::c_int, - a06: int, - a07: int, - a08: int, - a09: int, - a10: int, - a11: int, - a12: int +type uv_loop_data = { + operation_port: comm::port, + rust_loop_chan: comm::chan }; -type handle_fields = { - loop: *loop_t, - type_: handle_type, - close_cb: close_cb, - data: *ctypes::void, - private: handle_private_fields -}; +type uv_loop = comm::chan; -type handle_t = { - fields: handle_fields -}; - -type loop_t = int; - - - - -type idle_t = { - fields: handle_fields - /* private: idle_private_fields */ -}; - -fn idle_init(loop: *loop_t, idle: *idle_t) -> ctypes::c_int { - uv::rust_uv_idle_init(loop, idle) +#[nolink] +native mod rustrt { + fn rust_uv_loop_new() -> *ctypes::void; + fn rust_uv_loop_set_data( + loop: *ctypes::void, + data: *uv_loop_data); + fn rust_uv_bind_op_cb(loop: *ctypes::void, cb: *u8) + -> *ctypes::void; + fn rust_uv_stop_op_cb(handle: *ctypes::void); + fn rust_uv_run(loop_handle: *ctypes::void); + fn rust_uv_close(handle: *ctypes::void, cb: *u8); + fn rust_uv_close_async(handle: *ctypes::void); + fn rust_uv_close_timer(handle: *ctypes::void); + fn rust_uv_async_send(handle: *ctypes::void); + fn rust_uv_async_init( + loop_handle: *ctypes::void, + cb: *u8, + id: *u8) -> *ctypes::void; + fn rust_uv_timer_init( + loop_handle: *ctypes::void, + cb: *u8, + id: *u8) -> *ctypes::void; + fn rust_uv_timer_start( + timer_handle: *ctypes::void, + timeout: ctypes::c_uint, + repeat: ctypes::c_uint); + fn rust_uv_timer_stop(handle: *ctypes::void); } -fn idle_start(idle: *idle_t, cb: idle_cb) -> ctypes::c_int { - uv::rust_uv_idle_start(idle, cb) -} +// public functions +fn loop_new() -> uv_loop unsafe { + let ret_recv_port: comm::port = + comm::port(); + let ret_recv_chan: comm::chan = + comm::chan(ret_recv_port); + task::spawn_sched(task::manual_threads(4u)) {|| + // our beloved uv_loop_t ptr + let loop_handle = rustrt:: + rust_uv_loop_new(); + // this port/chan pair are used to send messages to + // libuv. libuv processes any pending messages on the + // port (via crust) after receiving an async "wakeup" + // on a special uv_async_t handle created below + let operation_port = comm::port::(); + let operation_chan = comm::chan::( + operation_port); + // this port/chan pair as used in the while() loop + // below. It takes dispatches, originating from libuv + // callbacks, to invoke handles registered by the + // user + let rust_loop_port = comm::port::(); + let rust_loop_chan = + comm::chan::(rust_loop_port); + // let the task-spawner return + comm::send(ret_recv_chan, copy(rust_loop_chan)); -fn default_loop() -> *loop_t { - uv::rust_uv_default_loop() -} + // create our "special" async handle that will + // allow all operations against libuv to be + // "buffered" in the operation_port, for processing + // from the thread that libuv runs on + let loop_data: uv_loop_data = { + operation_port: operation_port, + rust_loop_chan: rust_loop_chan + }; + rustrt::rust_uv_loop_set_data( + loop_handle, + ptr::addr_of(loop_data)); // pass an opaque C-ptr + // to libuv, this will be + // in the process_operation + // crust fn + let op_handle = rustrt::rust_uv_bind_op_cb( + loop_handle, + process_operation); -fn loop_new() -> *loop_t { - uv::rust_uv_loop_new() -} + // all state goes here + let handles: map::map<[u8], *ctypes::void> = + map::new_bytes_hash(); + let id_to_handle: map::map<[u8], uv_handle> = + map::new_bytes_hash(); + let after_cbs: map::map<[u8], fn~(uv_handle)> = + map::new_bytes_hash(); + let close_callbacks: map::map<[u8], fn~()> = + map::new_bytes_hash(); + let async_cbs: map::map<[u8], fn~(uv_handle)> = + map::new_bytes_hash(); + let timer_cbs: map::map<[u8], fn~(uv_handle)> = + map::new_bytes_hash(); -fn loop_delete(loop: *loop_t) { - uv::rust_uv_loop_delete(loop) -} + // the main loop that this task blocks on. + // should have the same lifetime as the C libuv + // event loop. + let keep_going = true; + while (keep_going) { + alt comm::recv(rust_loop_port) { + msg_run(end_chan) { + // start the libuv event loop + // we'll also do a uv_async_send with + // the operation handle to have the + // loop process any pending operations + // once its up and running + task::spawn_sched(task::manual_threads(1u)) {|| + // this call blocks + rustrt::rust_uv_run(loop_handle); + // when we're done, msg the + // end chan + rustrt::rust_uv_stop_op_cb(op_handle); + comm::send(end_chan, true); + comm::send(rust_loop_chan, uv_end); + }; + } -fn run(loop: *loop_t) -> ctypes::c_int { - uv::rust_uv_run(loop) -} + msg_run_in_bg { + task::spawn_sched(task::manual_threads(1u)) {|| + // this call blocks + rustrt::rust_uv_run(loop_handle); + }; + } -fn unref(loop: *loop_t) { - uv::rust_uv_unref(loop) -} + msg_close(handle, cb) { + let id = get_id_from_handle(handle); + close_callbacks.insert(id, cb); + let handle_ptr = handles.get(id); + let op = op_close(handle, handle_ptr); + pass_to_libuv(op_handle, operation_chan, op); + } + uv_close(id) { + handles.remove(id); + let handle = id_to_handle.get(id); + id_to_handle.remove(id); + alt handle { + uv_async(id, _) { + async_cbs.remove(id); + } + uv_timer(id, _) { + timer_cbs.remove(id); + } + _ { + fail "unknown form of uv_handle encountered " + + "in uv_close handler"; + } + } + let cb = close_callbacks.get(id); + close_callbacks.remove(id); + task::spawn {|| + cb(); + }; + } -fn sanity_check() { - fn check_size(t: str, uv: ctypes::size_t, rust: ctypes::size_t) { - #debug("size of %s: uv: %u, rust: %u", t, uv, rust); - assert uv <= rust; - } - check_size("idle_t", - helpers::rust_uv_size_of_idle_t(), - sys::size_of::()); -} + msg_async_init(callback, after_cb) { + // create a new async handle + // with the id as the handle's + // data and save the callback for + // invocation on msg_async_send + let id = gen_handle_id(); + async_cbs.insert(id, callback); + after_cbs.insert(id, after_cb); + let op = op_async_init(id); + pass_to_libuv(op_handle, operation_chan, op); + } + uv_async_init(id, async_handle) { + // libuv created a handle, which is + // passed back to us. save it and + // then invoke the supplied callback + // for after completion + handles.insert(id, async_handle); + let after_cb = after_cbs.get(id); + after_cbs.remove(id); + let async = uv_async(id, rust_loop_chan); + id_to_handle.insert(id, copy(async)); + task::spawn {|| + after_cb(async); + }; + } -fn handle_fields_new() -> handle_fields { - { - loop: ptr::null(), - type_: 0u32, - close_cb: ptr::null(), - data: ptr::null(), - private: { - a00: 0i32, - a01: 0i32, - a02: 0i32, - a03: 0i32, - a04: 0i32, - a05: 0i32, - a06: 0, - a07: 0, - a08: 0, - a09: 0, - a10: 0, - a11: 0, - a12: 0 + msg_async_send(id) { + let async_handle = handles.get(id); + do_send(async_handle); + } + uv_async_send(id) { + let async_cb = async_cbs.get(id); + task::spawn {|| + async_cb(uv_async(id, rust_loop_chan)); + }; + } + + msg_timer_init(after_cb) { + let id = gen_handle_id(); + after_cbs.insert(id, after_cb); + let op = op_timer_init(id); + pass_to_libuv(op_handle, operation_chan, op); + } + uv_timer_init(id, handle) { + handles.insert(id, handle); + let after_cb = after_cbs.get(id); + after_cbs.remove(id); + let new_timer = uv_timer(id, rust_loop_chan); + id_to_handle.insert(id, copy(new_timer)); + task::spawn {|| + after_cb(new_timer); + }; + } + + uv_timer_call(id) { + let cb = timer_cbs.get(id); + let the_timer = id_to_handle.get(id); + task::spawn {|| + cb(the_timer); + }; + } + + msg_timer_start(id, timeout, repeat, timer_call_cb) { + timer_cbs.insert(id, timer_call_cb); + let handle = handles.get(id); + let op = op_timer_start(id, handle, timeout, + repeat); + pass_to_libuv(op_handle, operation_chan, op); + } + + msg_timer_stop(id, after_cb) { + let handle = handles.get(id); + let op = op_timer_stop(id, handle, after_cb); + pass_to_libuv(op_handle, operation_chan, op); + } + uv_timer_stop(id, after_cb) { + let the_timer = id_to_handle.get(id); + after_cb(the_timer); + } + + uv_end() { + keep_going = false; + } + + _ { fail "unknown form of uv_msg received"; } + } } + }; + ret comm::recv(ret_recv_port); +} + +fn run(loop: uv_loop) { + let end_port = comm::port::(); + let end_chan = comm::chan::(end_port); + comm::send(loop, msg_run(end_chan)); + comm::recv(end_port); +} + +fn run_in_bg(loop: uv_loop) { + comm::send(loop, msg_run_in_bg); +} + +fn async_init ( + loop: uv_loop, + async_cb: fn~(uv_handle), + after_cb: fn~(uv_handle)) { + let msg = msg_async_init(async_cb, after_cb); + comm::send(loop, msg); +} + +fn async_send(async: uv_handle) { + alt async { + uv_async(id, loop) { + comm::send(loop, msg_async_send(id)); + } + _ { + fail "attempting to call async_send() with a" + + " uv_async uv_handle"; + } } } -fn idle_new() -> idle_t { - { - fields: handle_fields_new() +fn close(h: uv_handle, cb: fn~()) { + let loop_chan = get_loop_chan_from_handle(h); + comm::send(loop_chan, msg_close(h, cb)); +} + +fn timer_init(loop: uv_loop, after_cb: fn~(uv_handle)) { + let msg = msg_timer_init(after_cb); + comm::send(loop, msg); +} + +fn timer_start(the_timer: uv_handle, timeout: u32, repeat:u32, + timer_cb: fn~(uv_handle)) { + alt the_timer { + uv_timer(id, loop_chan) { + let msg = msg_timer_start(id, timeout, repeat, timer_cb); + comm::send(loop_chan, msg); + } + _ { + fail "can only pass a uv_timer form of uv_handle to "+ + " uv::timer_start()"; + } } } -#[cfg(test)] -mod tests { - - #[test] - fn test_sanity_check() { - sanity_check(); - } - - // From test-ref.c - mod test_ref { - - #[test] - fn ref() { - let loop = loop_new(); - run(loop); - loop_delete(loop); - } - - #[test] - fn idle_ref() { - let loop = loop_new(); - let h = idle_new(); - idle_init(loop, ptr::addr_of(h)); - idle_start(ptr::addr_of(h), ptr::null()); - unref(loop); - run(loop); - loop_delete(loop); - } - - #[test] - fn async_ref() { - /* - let loop = loop_new(); - let h = async_new(); - async_init(loop, ptr::addr_of(h), ptr::null()); - unref(loop); - run(loop); - loop_delete(loop); - */ - } +fn timer_stop(the_timer: uv_handle, after_cb: fn~(uv_handle)) { + alt the_timer { + uv_timer(id, loop_chan) { + let msg = msg_timer_stop(id, after_cb); + comm::send(loop_chan, msg); + } + _ { + fail "only uv_timer form is allowed in calls to "+ + " uv::timer_stop()"; + } } } + +// internal functions +fn pass_to_libuv( + op_handle: *ctypes::void, + operation_chan: comm::chan, + op: uv_operation) unsafe { + comm::send(operation_chan, copy(op)); + do_send(op_handle); +} +fn do_send(h: *ctypes::void) { + rustrt::rust_uv_async_send(h); +} +fn gen_handle_id() -> [u8] { + ret rand::mk_rng().gen_bytes(16u); +} +fn get_handle_id_from(buf: *u8) -> [u8] unsafe { + ret vec::unsafe::from_buf(buf, 16u); +} + +fn get_loop_chan_from_data(data: *uv_loop_data) + -> uv_loop unsafe { + ret (*data).rust_loop_chan; +} + +fn get_loop_chan_from_handle(handle: uv_handle) + -> uv_loop { + alt handle { + uv_async(id,loop) | uv_timer(id,loop) { + ret loop; + } + _ { + fail "unknown form of uv_handle for get_loop_chan_from " + + " handle"; + } + } +} + +fn get_id_from_handle(handle: uv_handle) -> [u8] { + alt handle { + uv_async(id,loop) | uv_timer(id,loop) { + ret id; + } + _ { + fail "unknown form of uv_handle for get_id_from handle"; + } + } +} + +// crust +crust fn process_operation( + loop: *ctypes::void, + data: *uv_loop_data) unsafe { + let op_port = (*data).operation_port; + let loop_chan = get_loop_chan_from_data(data); + let op_pending = comm::peek(op_port); + while(op_pending) { + alt comm::recv(op_port) { + op_async_init(id) { + let id_ptr = vec::unsafe::to_ptr(id); + let async_handle = rustrt::rust_uv_async_init( + loop, + process_async_send, + id_ptr); + comm::send(loop_chan, uv_async_init( + id, + async_handle)); + } + op_close(handle, handle_ptr) { + handle_op_close(handle, handle_ptr); + } + op_timer_init(id) { + let id_ptr = vec::unsafe::to_ptr(id); + let timer_handle = rustrt::rust_uv_timer_init( + loop, + process_timer_call, + id_ptr); + comm::send(loop_chan, uv_timer_init( + id, + timer_handle)); + } + op_timer_start(id, handle, timeout, repeat) { + rustrt::rust_uv_timer_start(handle, timeout, + repeat); + } + op_timer_stop(id, handle, after_cb) { + rustrt::rust_uv_timer_stop(handle); + comm::send(loop_chan, uv_timer_stop(id, after_cb)); + } + _ { fail "unknown form of uv_operation received"; } + } + op_pending = comm::peek(op_port); + } +} + +fn handle_op_close(handle: uv_handle, handle_ptr: *ctypes::void) { + // it's just like im doing C + alt handle { + uv_async(id, loop) { + let cb = process_close_async; + rustrt::rust_uv_close( + handle_ptr, cb); + } + uv_timer(id, loop) { + let cb = process_close_timer; + rustrt::rust_uv_close( + handle_ptr, cb); + } + _ { + fail "unknown form of uv_handle encountered " + + "in process_operation/op_close"; + } + } +} + +crust fn process_async_send(id_buf: *u8, data: *uv_loop_data) + unsafe { + let handle_id = get_handle_id_from(id_buf); + let loop_chan = get_loop_chan_from_data(data); + comm::send(loop_chan, uv_async_send(handle_id)); +} + +crust fn process_timer_call(id_buf: *u8, data: *uv_loop_data) + unsafe { + let handle_id = get_handle_id_from(id_buf); + let loop_chan = get_loop_chan_from_data(data); + comm::send(loop_chan, uv_timer_call(handle_id)); +} + +fn process_close_common(id: [u8], data: *uv_loop_data) + unsafe { + // notify the rust loop that their handle is closed, then + // the caller will invoke a per-handle-type c++ func to + // free allocated memory + let loop_chan = get_loop_chan_from_data(data); + comm::send(loop_chan, uv_close(id)); +} + +crust fn process_close_async( + id_buf: *u8, + handle_ptr: *ctypes::void, + data: *uv_loop_data) + unsafe { + let id = get_handle_id_from(id_buf); + rustrt::rust_uv_close_async(handle_ptr); + // at this point, the handle and its data has been + // released. notify the rust loop to remove the + // handle and its data and call the user-supplied + // close cb + process_close_common(id, data); +} + +crust fn process_close_timer( + id_buf: *u8, + handle_ptr: *ctypes::void, + data: *uv_loop_data) + unsafe { + let id = get_handle_id_from(id_buf); + rustrt::rust_uv_close_timer(handle_ptr); + process_close_common(id, data); +} + + +#[test] +fn test_uv_new_loop_no_handles() { + let test_loop = uv::loop_new(); + run(test_loop); // this should return immediately + // since there aren't any handles.. +} + +#[test] +fn test_uv_simple_async() { + let test_loop = loop_new(); + let exit_port = comm::port::(); + let exit_chan = comm::chan::(exit_port); + async_init(test_loop, {|new_async| + close(new_async) {|| + comm::send(exit_chan, true); + }; + }, {|new_async| + async_send(new_async); + }); + run(test_loop); + assert comm::recv(exit_port); +} + +#[test] +fn test_uv_timer() { + let test_loop = loop_new(); + let exit_port = comm::port::(); + let exit_chan = comm::chan::(exit_port); + timer_init(test_loop) {|new_timer| + timer_start(new_timer, 1u32, 0u32) {|started_timer| + timer_stop(started_timer) {|stopped_timer| + close(stopped_timer) {|| + comm::send(exit_chan, true); + }; + }; + }; + }; + run(test_loop); + assert comm::recv(exit_port); +} \ No newline at end of file diff --git a/src/libstd/uvtmp.rs b/src/libstd/uvtmp.rs deleted file mode 100644 index b64de837b09..00000000000 --- a/src/libstd/uvtmp.rs +++ /dev/null @@ -1,730 +0,0 @@ -// Some temporary libuv hacks for servo - -// UV2 - -// these are processed solely in the -// process_operation() crust fn below -enum uv_operation { - op_async_init([u8]), - op_close(uv_handle, *ctypes::void), - op_timer_init([u8]), - op_timer_start([u8], *ctypes::void, u32, u32), - op_timer_stop([u8], *ctypes::void, fn~(uv_handle)) -} - -enum uv_handle { - uv_async([u8], uv_loop), - uv_timer([u8], uv_loop) -} - -enum uv_msg { - // requests from library users - msg_run(comm::chan), - msg_run_in_bg(), - msg_async_init(fn~(uv_handle), fn~(uv_handle)), - msg_async_send([u8]), - msg_close(uv_handle, fn~()), - msg_timer_init(fn~(uv_handle)), - msg_timer_start([u8], u32, u32, fn~(uv_handle)), - msg_timer_stop([u8], fn~(uv_handle)), - - // dispatches from libuv - uv_async_init([u8], *ctypes::void), - uv_async_send([u8]), - uv_close([u8]), - uv_timer_init([u8], *ctypes::void), - uv_timer_call([u8]), - uv_timer_stop([u8], fn~(uv_handle)), - uv_end() -} - -type uv_loop_data = { - operation_port: comm::port, - rust_loop_chan: comm::chan -}; - -type uv_loop = comm::chan; - -#[nolink] -native mod rustrt { - fn rust_uvtmp_create_thread() -> thread; - fn rust_uvtmp_start_thread(thread: thread); - fn rust_uvtmp_join_thread(thread: thread); - fn rust_uvtmp_delete_thread(thread: thread); - fn rust_uvtmp_connect( - thread: thread, - req_id: u32, - ip: str::sbuf, - chan: comm::chan) -> connect_data; - fn rust_uvtmp_close_connection(thread: thread, req_id: u32); - fn rust_uvtmp_write( - thread: thread, - req_id: u32, - buf: *u8, - len: ctypes::size_t, - chan: comm::chan); - fn rust_uvtmp_read_start( - thread: thread, - req_id: u32, - chan: comm::chan); - fn rust_uvtmp_timer( - thread: thread, - timeout: u32, - req_id: u32, - chan: comm::chan); - fn rust_uvtmp_delete_buf(buf: *u8); - fn rust_uvtmp_get_req_id(cd: connect_data) -> u32; - - fn rust_uvtmp_uv_loop_new() -> *ctypes::void; - fn rust_uvtmp_uv_loop_set_data( - loop: *ctypes::void, - data: *uv_loop_data); - fn rust_uvtmp_uv_bind_op_cb(loop: *ctypes::void, cb: *u8) - -> *ctypes::void; - fn rust_uvtmp_uv_stop_op_cb(handle: *ctypes::void); - fn rust_uvtmp_uv_run(loop_handle: *ctypes::void); - fn rust_uvtmp_uv_close(handle: *ctypes::void, cb: *u8); - fn rust_uvtmp_uv_close_async(handle: *ctypes::void); - fn rust_uvtmp_uv_close_timer(handle: *ctypes::void); - fn rust_uvtmp_uv_async_send(handle: *ctypes::void); - fn rust_uvtmp_uv_async_init( - loop_handle: *ctypes::void, - cb: *u8, - id: *u8) -> *ctypes::void; - fn rust_uvtmp_uv_timer_init( - loop_handle: *ctypes::void, - cb: *u8, - id: *u8) -> *ctypes::void; - fn rust_uvtmp_uv_timer_start( - timer_handle: *ctypes::void, - timeout: ctypes::c_uint, - repeat: ctypes::c_uint); - fn rust_uvtmp_uv_timer_stop(handle: *ctypes::void); -} - -mod uv { - export loop_new, run, close, run_in_bg, async_init, async_send, - timer_init, timer_start, timer_stop; - - // public functions - fn loop_new() -> uv_loop unsafe { - let ret_recv_port: comm::port = - comm::port(); - let ret_recv_chan: comm::chan = - comm::chan(ret_recv_port); - - task::spawn_sched(task::manual_threads(4u)) {|| - // our beloved uv_loop_t ptr - let loop_handle = rustrt:: - rust_uvtmp_uv_loop_new(); - - // this port/chan pair are used to send messages to - // libuv. libuv processes any pending messages on the - // port (via crust) after receiving an async "wakeup" - // on a special uv_async_t handle created below - let operation_port = comm::port::(); - let operation_chan = comm::chan::( - operation_port); - - // this port/chan pair as used in the while() loop - // below. It takes dispatches, originating from libuv - // callbacks, to invoke handles registered by the - // user - let rust_loop_port = comm::port::(); - let rust_loop_chan = - comm::chan::(rust_loop_port); - // let the task-spawner return - comm::send(ret_recv_chan, copy(rust_loop_chan)); - - // create our "special" async handle that will - // allow all operations against libuv to be - // "buffered" in the operation_port, for processing - // from the thread that libuv runs on - let loop_data: uv_loop_data = { - operation_port: operation_port, - rust_loop_chan: rust_loop_chan - }; - rustrt::rust_uvtmp_uv_loop_set_data( - loop_handle, - ptr::addr_of(loop_data)); // pass an opaque C-ptr - // to libuv, this will be - // in the process_operation - // crust fn - let op_handle = rustrt::rust_uvtmp_uv_bind_op_cb( - loop_handle, - process_operation); - - // all state goes here - let handles: map::map<[u8], *ctypes::void> = - map::new_bytes_hash(); - let id_to_handle: map::map<[u8], uv_handle> = - map::new_bytes_hash(); - let after_cbs: map::map<[u8], fn~(uv_handle)> = - map::new_bytes_hash(); - let close_callbacks: map::map<[u8], fn~()> = - map::new_bytes_hash(); - let async_cbs: map::map<[u8], fn~(uv_handle)> = - map::new_bytes_hash(); - let timer_cbs: map::map<[u8], fn~(uv_handle)> = - map::new_bytes_hash(); - - // the main loop that this task blocks on. - // should have the same lifetime as the C libuv - // event loop. - let keep_going = true; - while (keep_going) { - alt comm::recv(rust_loop_port) { - msg_run(end_chan) { - // start the libuv event loop - // we'll also do a uv_async_send with - // the operation handle to have the - // loop process any pending operations - // once its up and running - task::spawn_sched(task::manual_threads(1u)) {|| - // this call blocks - rustrt::rust_uvtmp_uv_run(loop_handle); - // when we're done, msg the - // end chan - rustrt::rust_uvtmp_uv_stop_op_cb(op_handle); - comm::send(end_chan, true); - comm::send(rust_loop_chan, uv_end); - }; - } - - msg_run_in_bg { - task::spawn_sched(task::manual_threads(1u)) {|| - // this call blocks - rustrt::rust_uvtmp_uv_run(loop_handle); - }; - } - - msg_close(handle, cb) { - let id = get_id_from_handle(handle); - close_callbacks.insert(id, cb); - let handle_ptr = handles.get(id); - let op = op_close(handle, handle_ptr); - - pass_to_libuv(op_handle, operation_chan, op); - } - uv_close(id) { - handles.remove(id); - let handle = id_to_handle.get(id); - id_to_handle.remove(id); - alt handle { - uv_async(id, _) { - async_cbs.remove(id); - } - uv_timer(id, _) { - timer_cbs.remove(id); - } - _ { - fail "unknown form of uv_handle encountered " - + "in uv_close handler"; - } - } - let cb = close_callbacks.get(id); - close_callbacks.remove(id); - task::spawn {|| - cb(); - }; - } - - msg_async_init(callback, after_cb) { - // create a new async handle - // with the id as the handle's - // data and save the callback for - // invocation on msg_async_send - let id = gen_handle_id(); - async_cbs.insert(id, callback); - after_cbs.insert(id, after_cb); - let op = op_async_init(id); - pass_to_libuv(op_handle, operation_chan, op); - } - uv_async_init(id, async_handle) { - // libuv created a handle, which is - // passed back to us. save it and - // then invoke the supplied callback - // for after completion - handles.insert(id, async_handle); - let after_cb = after_cbs.get(id); - after_cbs.remove(id); - let async = uv_async(id, rust_loop_chan); - id_to_handle.insert(id, copy(async)); - task::spawn {|| - after_cb(async); - }; - } - - msg_async_send(id) { - let async_handle = handles.get(id); - do_send(async_handle); - } - uv_async_send(id) { - let async_cb = async_cbs.get(id); - task::spawn {|| - async_cb(uv_async(id, rust_loop_chan)); - }; - } - - msg_timer_init(after_cb) { - let id = gen_handle_id(); - after_cbs.insert(id, after_cb); - let op = op_timer_init(id); - pass_to_libuv(op_handle, operation_chan, op); - } - uv_timer_init(id, handle) { - handles.insert(id, handle); - let after_cb = after_cbs.get(id); - after_cbs.remove(id); - let new_timer = uv_timer(id, rust_loop_chan); - id_to_handle.insert(id, copy(new_timer)); - task::spawn {|| - after_cb(new_timer); - }; - } - - uv_timer_call(id) { - let cb = timer_cbs.get(id); - let the_timer = id_to_handle.get(id); - task::spawn {|| - cb(the_timer); - }; - } - - msg_timer_start(id, timeout, repeat, timer_call_cb) { - timer_cbs.insert(id, timer_call_cb); - let handle = handles.get(id); - let op = op_timer_start(id, handle, timeout, - repeat); - pass_to_libuv(op_handle, operation_chan, op); - } - - msg_timer_stop(id, after_cb) { - let handle = handles.get(id); - let op = op_timer_stop(id, handle, after_cb); - pass_to_libuv(op_handle, operation_chan, op); - } - uv_timer_stop(id, after_cb) { - let the_timer = id_to_handle.get(id); - after_cb(the_timer); - } - - uv_end() { - keep_going = false; - } - - _ { fail "unknown form of uv_msg received"; } - } - } - }; - ret comm::recv(ret_recv_port); - } - - fn run(loop: uv_loop) { - let end_port = comm::port::(); - let end_chan = comm::chan::(end_port); - comm::send(loop, msg_run(end_chan)); - comm::recv(end_port); - } - - fn run_in_bg(loop: uv_loop) { - comm::send(loop, msg_run_in_bg); - } - - fn async_init ( - loop: uv_loop, - async_cb: fn~(uv_handle), - after_cb: fn~(uv_handle)) { - let msg = msg_async_init(async_cb, after_cb); - comm::send(loop, msg); - } - - fn async_send(async: uv_handle) { - alt async { - uv_async(id, loop) { - comm::send(loop, msg_async_send(id)); - } - _ { - fail "attempting to call async_send() with a" + - " uv_async uv_handle"; - } - } - } - - fn close(h: uv_handle, cb: fn~()) { - let loop_chan = get_loop_chan_from_handle(h); - comm::send(loop_chan, msg_close(h, cb)); - } - - fn timer_init(loop: uv_loop, after_cb: fn~(uv_handle)) { - let msg = msg_timer_init(after_cb); - comm::send(loop, msg); - } - - fn timer_start(the_timer: uv_handle, timeout: u32, repeat:u32, - timer_cb: fn~(uv_handle)) { - alt the_timer { - uv_timer(id, loop_chan) { - let msg = msg_timer_start(id, timeout, repeat, timer_cb); - comm::send(loop_chan, msg); - } - _ { - fail "can only pass a uv_timer form of uv_handle to "+ - " uv::timer_start()"; - } - } - } - - fn timer_stop(the_timer: uv_handle, after_cb: fn~(uv_handle)) { - alt the_timer { - uv_timer(id, loop_chan) { - let msg = msg_timer_stop(id, after_cb); - comm::send(loop_chan, msg); - } - _ { - fail "only uv_timer form is allowed in calls to "+ - " uv::timer_stop()"; - } - } - } - - // internal functions - fn pass_to_libuv( - op_handle: *ctypes::void, - operation_chan: comm::chan, - op: uv_operation) unsafe { - comm::send(operation_chan, copy(op)); - do_send(op_handle); - } - fn do_send(h: *ctypes::void) { - rustrt::rust_uvtmp_uv_async_send(h); - } - fn gen_handle_id() -> [u8] { - ret rand::mk_rng().gen_bytes(16u); - } - fn get_handle_id_from(buf: *u8) -> [u8] unsafe { - ret vec::unsafe::from_buf(buf, 16u); - } - - fn get_loop_chan_from_data(data: *uv_loop_data) - -> uv_loop unsafe { - ret (*data).rust_loop_chan; - } - - fn get_loop_chan_from_handle(handle: uv_handle) - -> uv_loop { - alt handle { - uv_async(id,loop) | uv_timer(id,loop) { - ret loop; - } - _ { - fail "unknown form of uv_handle for get_loop_chan_from " - + " handle"; - } - } - } - - fn get_id_from_handle(handle: uv_handle) -> [u8] { - alt handle { - uv_async(id,loop) | uv_timer(id,loop) { - ret id; - } - _ { - fail "unknown form of uv_handle for get_id_from handle"; - } - } - } - - // crust - crust fn process_operation( - loop: *ctypes::void, - data: *uv_loop_data) unsafe { - let op_port = (*data).operation_port; - let loop_chan = get_loop_chan_from_data(data); - let op_pending = comm::peek(op_port); - while(op_pending) { - alt comm::recv(op_port) { - op_async_init(id) { - let id_ptr = vec::unsafe::to_ptr(id); - let async_handle = rustrt::rust_uvtmp_uv_async_init( - loop, - process_async_send, - id_ptr); - comm::send(loop_chan, uv_async_init( - id, - async_handle)); - } - op_close(handle, handle_ptr) { - handle_op_close(handle, handle_ptr); - } - op_timer_init(id) { - let id_ptr = vec::unsafe::to_ptr(id); - let timer_handle = rustrt::rust_uvtmp_uv_timer_init( - loop, - process_timer_call, - id_ptr); - comm::send(loop_chan, uv_timer_init( - id, - timer_handle)); - } - op_timer_start(id, handle, timeout, repeat) { - rustrt::rust_uvtmp_uv_timer_start(handle, timeout, - repeat); - } - op_timer_stop(id, handle, after_cb) { - rustrt::rust_uvtmp_uv_timer_stop(handle); - comm::send(loop_chan, uv_timer_stop(id, after_cb)); - } - _ { fail "unknown form of uv_operation received"; } - } - op_pending = comm::peek(op_port); - } - } - - fn handle_op_close(handle: uv_handle, handle_ptr: *ctypes::void) { - // it's just like im doing C - alt handle { - uv_async(id, loop) { - let cb = process_close_async; - rustrt::rust_uvtmp_uv_close( - handle_ptr, cb); - } - uv_timer(id, loop) { - let cb = process_close_timer; - rustrt::rust_uvtmp_uv_close( - handle_ptr, cb); - } - _ { - fail "unknown form of uv_handle encountered " + - "in process_operation/op_close"; - } - } - } - - crust fn process_async_send(id_buf: *u8, data: *uv_loop_data) - unsafe { - let handle_id = get_handle_id_from(id_buf); - let loop_chan = get_loop_chan_from_data(data); - comm::send(loop_chan, uv_async_send(handle_id)); - } - - crust fn process_timer_call(id_buf: *u8, data: *uv_loop_data) - unsafe { - let handle_id = get_handle_id_from(id_buf); - let loop_chan = get_loop_chan_from_data(data); - comm::send(loop_chan, uv_timer_call(handle_id)); - } - - fn process_close_common(id: [u8], data: *uv_loop_data) - unsafe { - // notify the rust loop that their handle is closed, then - // the caller will invoke a per-handle-type c++ func to - // free allocated memory - let loop_chan = get_loop_chan_from_data(data); - comm::send(loop_chan, uv_close(id)); - } - - crust fn process_close_async( - id_buf: *u8, - handle_ptr: *ctypes::void, - data: *uv_loop_data) - unsafe { - let id = get_handle_id_from(id_buf); - rustrt::rust_uvtmp_uv_close_async(handle_ptr); - // at this point, the handle and its data has been - // released. notify the rust loop to remove the - // handle and its data and call the user-supplied - // close cb - process_close_common(id, data); - } - - crust fn process_close_timer( - id_buf: *u8, - handle_ptr: *ctypes::void, - data: *uv_loop_data) - unsafe { - let id = get_handle_id_from(id_buf); - rustrt::rust_uvtmp_uv_close_timer(handle_ptr); - process_close_common(id, data); - } - -} - -#[test] -fn test_uvtmp_uv_new_loop_no_handles() { - let test_loop = uv::loop_new(); - uv::run(test_loop); // this should return immediately - // since there aren't any handles.. -} - -#[test] -fn test_uvtmp_uv_simple_async() { - let test_loop = uv::loop_new(); - let exit_port = comm::port::(); - let exit_chan = comm::chan::(exit_port); - uv::async_init(test_loop, {|new_async| - uv::close(new_async) {|| - comm::send(exit_chan, true); - }; - }, {|new_async| - uv::async_send(new_async); - }); - uv::run(test_loop); - assert comm::recv(exit_port); -} - -#[test] -fn test_uvtmp_uv_timer() { - let test_loop = uv::loop_new(); - let exit_port = comm::port::(); - let exit_chan = comm::chan::(exit_port); - uv::timer_init(test_loop) {|new_timer| - uv::timer_start(new_timer, 1u32, 0u32) {|started_timer| - uv::timer_stop(started_timer) {|stopped_timer| - uv::close(stopped_timer) {|| - comm::send(exit_chan, true); - }; - }; - }; - }; - uv::run(test_loop); - assert comm::recv(exit_port); -} - -// END OF UV2 - -type thread = *ctypes::void; - -type connect_data = *ctypes::void; - -enum iomsg { - whatever, - connected(connect_data), - wrote(connect_data), - read(connect_data, *u8, ctypes::ssize_t), - timer(u32), - exit -} - -fn create_thread() -> thread { - rustrt::rust_uvtmp_create_thread() -} - -fn start_thread(thread: thread) { - rustrt::rust_uvtmp_start_thread(thread) -} - -fn join_thread(thread: thread) { - rustrt::rust_uvtmp_join_thread(thread) -} - -fn delete_thread(thread: thread) { - rustrt::rust_uvtmp_delete_thread(thread) -} - -fn connect(thread: thread, req_id: u32, - ip: str, ch: comm::chan) -> connect_data { - str::as_buf(ip) {|ipbuf| - rustrt::rust_uvtmp_connect(thread, req_id, ipbuf, ch) - } -} - -fn close_connection(thread: thread, req_id: u32) { - rustrt::rust_uvtmp_close_connection(thread, req_id); -} - -fn write(thread: thread, req_id: u32, bytes: [u8], - chan: comm::chan) unsafe { - rustrt::rust_uvtmp_write( - thread, req_id, vec::to_ptr(bytes), vec::len(bytes), chan); -} - -fn read_start(thread: thread, req_id: u32, - chan: comm::chan) { - rustrt::rust_uvtmp_read_start(thread, req_id, chan); -} - -fn timer_start(thread: thread, timeout: u32, req_id: u32, - chan: comm::chan) { - rustrt::rust_uvtmp_timer(thread, timeout, req_id, chan); -} - -fn delete_buf(buf: *u8) { - rustrt::rust_uvtmp_delete_buf(buf); -} - -fn get_req_id(cd: connect_data) -> u32 { - ret rustrt::rust_uvtmp_get_req_id(cd); -} - -#[test] -fn test_start_stop() { - let thread = create_thread(); - start_thread(thread); - join_thread(thread); - delete_thread(thread); -} - -#[test] -#[ignore] -fn test_connect() { - let thread = create_thread(); - start_thread(thread); - let port = comm::port(); - let chan = comm::chan(port); - connect(thread, 0u32, "74.125.224.146", chan); - alt comm::recv(port) { - connected(cd) { - close_connection(thread, 0u32); - } - _ { fail "test_connect: port isn't connected"; } - } - join_thread(thread); - delete_thread(thread); -} - -#[test] -#[ignore] -fn test_http() { - let thread = create_thread(); - start_thread(thread); - let port = comm::port(); - let chan = comm::chan(port); - connect(thread, 0u32, "74.125.224.146", chan); - alt comm::recv(port) { - connected(cd) { - write(thread, 0u32, str::bytes("GET / HTTP/1.0\n\n"), chan); - alt comm::recv(port) { - wrote(cd) { - read_start(thread, 0u32, chan); - let keep_going = true; - while keep_going { - alt comm::recv(port) { - read(_, buf, -1) { - keep_going = false; - delete_buf(buf); - } - read(_, buf, len) { - unsafe { - log(error, len); - let buf = vec::unsafe::from_buf(buf, - len as uint); - let str = str::from_bytes(buf); - #error("read something"); - io::println(str); - } - delete_buf(buf); - } - _ { fail "test_http: protocol error"; } - } - } - close_connection(thread, 0u32); - } - _ { fail "test_http: expected `wrote`"; } - } - } - _ { fail "test_http: port not connected"; } - } - join_thread(thread); - delete_thread(thread); -} diff --git a/src/rt/rust_uv.cpp b/src/rt/rust_uv.cpp index 9ee1b844add..03cd75d4805 100644 --- a/src/rt/rust_uv.cpp +++ b/src/rt/rust_uv.cpp @@ -1,51 +1,183 @@ #include "rust_internal.h" #include "uv.h" -/* - Wrappers of uv_* functions. These can be eliminated by figuring - out how to build static uv with externs, or by just using dynamic libuv - */ +// crust fn pointers +typedef void (*crust_async_op_cb)(uv_loop_t* loop, void* data); +typedef void (*crust_simple_cb)(uint8_t* id_buf, void* loop_data); +typedef void (*crust_close_cb)(uint8_t* id_buf, void* handle, + void* data); -extern "C" CDECL uv_loop_t* -rust_uv_default_loop() { - return uv_default_loop(); +// data types +#define RUST_UV_HANDLE_LEN 16 + +struct handle_data { + uint8_t id_buf[RUST_UV_HANDLE_LEN]; + crust_simple_cb cb; + crust_close_cb close_cb; +}; + +// helpers +static void* +current_kernel_malloc(size_t size, const char* tag) { + void* ptr = rust_task_thread::get_task()->kernel->malloc(size, tag); + return ptr; } -extern "C" CDECL uv_loop_t* +static void +current_kernel_free(void* ptr) { + rust_task_thread::get_task()->kernel->free(ptr); +} + +static handle_data* +new_handle_data_from(uint8_t* buf, crust_simple_cb cb) { + handle_data* data = (handle_data*)current_kernel_malloc( + sizeof(handle_data), + "handle_data"); + memcpy(data->id_buf, buf, RUST_UV_HANDLE_LEN); + data->cb = cb; + return data; +} + +// libuv callback impls +static void +native_crust_async_op_cb(uv_async_t* handle, int status) { + crust_async_op_cb cb = (crust_async_op_cb)handle->data; + void* loop_data = handle->loop->data; + cb(handle->loop, loop_data); +} + +static void +native_async_cb(uv_async_t* handle, int status) { + handle_data* handle_d = (handle_data*)handle->data; + void* loop_data = handle->loop->data; + handle_d->cb(handle_d->id_buf, loop_data); +} + +static void +native_timer_cb(uv_timer_t* handle, int status) { + handle_data* handle_d = (handle_data*)handle->data; + void* loop_data = handle->loop->data; + handle_d->cb(handle_d->id_buf, loop_data); +} + +static void +native_close_cb(uv_handle_t* handle) { + handle_data* data = (handle_data*)handle->data; + data->close_cb(data->id_buf, handle, handle->loop->data); +} + +static void +native_close_op_cb(uv_handle_t* op_handle) { + uv_loop_t* loop = op_handle->loop; + current_kernel_free(op_handle); + loop->data = 0; // a ptr to some stack-allocated rust mem + uv_loop_delete(loop); +} + +// native fns bound in rust +extern "C" void* rust_uv_loop_new() { - return uv_loop_new(); + return (void*)uv_loop_new(); } -extern "C" CDECL void -rust_uv_loop_delete(uv_loop_t *loop) { - return uv_loop_delete(loop); +extern "C" void +rust_uv_loop_set_data(uv_loop_t* loop, void* data) { + loop->data = data; } -extern "C" CDECL int -rust_uv_run(uv_loop_t *loop) { - return uv_run(loop); +extern "C" void* +rust_uv_bind_op_cb(uv_loop_t* loop, crust_async_op_cb cb) { + uv_async_t* async = (uv_async_t*)current_kernel_malloc( + sizeof(uv_async_t), + "uv_async_t"); + uv_async_init(loop, async, native_crust_async_op_cb); + async->data = (void*)cb; + // decrement the ref count, so that our async bind + // doesn't count towards keeping the loop alive + uv_unref(loop); + return async; } -extern "C" CDECL void -rust_uv_unref(uv_loop_t *loop) { - return uv_unref(loop); +extern "C" void +rust_uv_stop_op_cb(uv_handle_t* op_handle) { + /* // this is a hack to get libuv to cleanup a + // handle that was made to not prevent the loop + // from exiting via uv_unref(). + uv_ref(op_handle->loop); + uv_close(op_handle, native_close_op_cb); + uv_run(op_handle->loop); // this should process the handle's + // close event and then return + */ + // the above code is supposed to work to cleanly close + // a handler that was uv_unref()'d. but it causes much spew + // instead. this is the ugly/quick way to deal w/ it for now. + uv_close(op_handle, native_close_op_cb); + native_close_op_cb(op_handle); } -extern "C" CDECL int -rust_uv_idle_init(uv_loop_t* loop, uv_idle_t* idle) { - return uv_idle_init(loop, idle); +extern "C" void +rust_uv_run(uv_loop_t* loop) { + uv_run(loop); } -extern "C" CDECL int -rust_uv_idle_start(uv_idle_t* idle, uv_idle_cb cb) { - return uv_idle_start(idle, cb); +extern "C" void +rust_uv_close(uv_handle_t* handle, crust_close_cb cb) { + handle_data* data = (handle_data*)handle->data; + data->close_cb = cb; + uv_close(handle, native_close_cb); } - - - -extern "C" CDECL size_t -rust_uv_size_of_idle_t() { - return sizeof(uv_idle_t); +extern "C" void +rust_uv_close_async(uv_async_t* handle) { + current_kernel_free(handle->data); + current_kernel_free(handle); +} + +extern "C" void +rust_uv_close_timer(uv_async_t* handle) { + current_kernel_free(handle->data); + current_kernel_free(handle); +} + +extern "C" void +rust_uv_async_send(uv_async_t* handle) { + uv_async_send(handle); +} + +extern "C" void* +rust_uv_async_init(uv_loop_t* loop, crust_simple_cb cb, + uint8_t* buf) { + uv_async_t* async = (uv_async_t*)current_kernel_malloc( + sizeof(uv_async_t), + "uv_async_t"); + uv_async_init(loop, async, native_async_cb); + handle_data* data = new_handle_data_from(buf, cb); + async->data = data; + + return async; +} + +extern "C" void* +rust_uv_timer_init(uv_loop_t* loop, crust_simple_cb cb, + uint8_t* buf) { + uv_timer_t* new_timer = (uv_timer_t*)current_kernel_malloc( + sizeof(uv_timer_t), + "uv_timer_t"); + uv_timer_init(loop, new_timer); + handle_data* data = new_handle_data_from(buf, cb); + new_timer->data = data; + + return new_timer; +} + +extern "C" void +rust_uv_timer_start(uv_timer_t* the_timer, uint32_t timeout, + uint32_t repeat) { + uv_timer_start(the_timer, native_timer_cb, timeout, repeat); +} + +extern "C" void +rust_uv_timer_stop(uv_timer_t* the_timer) { + uv_timer_stop(the_timer); } diff --git a/src/rt/rust_uvtmp.cpp b/src/rt/rust_uvtmp.cpp deleted file mode 100644 index 697231dff1b..00000000000 --- a/src/rt/rust_uvtmp.cpp +++ /dev/null @@ -1,612 +0,0 @@ -#include -#include -#include -#include "rust_internal.h" -#include "uv.h" - -class rust_uvtmp_thread; - -struct connect_data { - uint32_t req_id; - rust_uvtmp_thread *thread; - char * ip_addr; - uv_connect_t connect; - uv_tcp_t tcp; - chan_handle chan; -}; - -const intptr_t whatever_tag = 0; -const intptr_t connected_tag = 1; -const intptr_t wrote_tag = 2; -const intptr_t read_tag = 3; -const intptr_t timer_tag = 4; -const intptr_t exit_tag = 5; - -struct iomsg { - intptr_t tag; - union { - connect_data *connected_val; - connect_data *wrote_val; - struct { - connect_data *cd; - uint8_t *buf; - ssize_t nread; - } read_val; - uint32_t timer_req_id; - } val; -}; - -struct write_data { - connect_data *cd; - uint8_t *buf; - size_t len; - chan_handle chan; -}; - -struct read_start_data { - connect_data *cd; - chan_handle chan; -}; - -struct timer_start_data { - rust_uvtmp_thread *thread; - uint32_t timeout; - uint32_t req_id; - chan_handle chan; -}; - -// UVTMP REWORK - -// crust fn pointers -typedef void (*crust_async_op_cb)(uv_loop_t* loop, void* data); -typedef void (*crust_simple_cb)(uint8_t* id_buf, void* loop_data); -typedef void (*crust_close_cb)(uint8_t* id_buf, void* handle, - void* data); - -// data types -#define RUST_UV_HANDLE_LEN 16 - -struct handle_data { - uint8_t id_buf[RUST_UV_HANDLE_LEN]; - crust_simple_cb cb; - crust_close_cb close_cb; -}; - -// helpers -static void* -current_kernel_malloc(size_t size, const char* tag) { - void* ptr = rust_task_thread::get_task()->kernel->malloc(size, tag); - return ptr; -} - -static void -current_kernel_free(void* ptr) { - rust_task_thread::get_task()->kernel->free(ptr); -} - -static handle_data* -new_handle_data_from(uint8_t* buf, crust_simple_cb cb) { - handle_data* data = (handle_data*)current_kernel_malloc( - sizeof(handle_data), - "handle_data"); - memcpy(data->id_buf, buf, RUST_UV_HANDLE_LEN); - data->cb = cb; - return data; -} - -// libuv callback impls -static void -native_crust_async_op_cb(uv_async_t* handle, int status) { - crust_async_op_cb cb = (crust_async_op_cb)handle->data; - void* loop_data = handle->loop->data; - cb(handle->loop, loop_data); -} - -static void -native_async_cb(uv_async_t* handle, int status) { - handle_data* handle_d = (handle_data*)handle->data; - void* loop_data = handle->loop->data; - handle_d->cb(handle_d->id_buf, loop_data); -} - -static void -native_timer_cb(uv_timer_t* handle, int status) { - handle_data* handle_d = (handle_data*)handle->data; - void* loop_data = handle->loop->data; - handle_d->cb(handle_d->id_buf, loop_data); -} - -static void -native_close_cb(uv_handle_t* handle) { - handle_data* data = (handle_data*)handle->data; - data->close_cb(data->id_buf, handle, handle->loop->data); -} - -static void -native_close_op_cb(uv_handle_t* op_handle) { - uv_loop_t* loop = op_handle->loop; - current_kernel_free(op_handle); - loop->data = 0; // a ptr to some stack-allocated rust mem - uv_loop_delete(loop); -} - -// native fns bound in rust -extern "C" void* -rust_uvtmp_uv_loop_new() { - return (void*)uv_loop_new(); -} - -extern "C" void -rust_uvtmp_uv_loop_set_data(uv_loop_t* loop, void* data) { - loop->data = data; -} - -extern "C" void* -rust_uvtmp_uv_bind_op_cb(uv_loop_t* loop, crust_async_op_cb cb) { - uv_async_t* async = (uv_async_t*)current_kernel_malloc( - sizeof(uv_async_t), - "uv_async_t"); - uv_async_init(loop, async, native_crust_async_op_cb); - async->data = (void*)cb; - // decrement the ref count, so that our async bind - // doesn't count towards keeping the loop alive - uv_unref(loop); - return async; -} - -extern "C" void -rust_uvtmp_uv_stop_op_cb(uv_handle_t* op_handle) { - /* // this is a hack to get libuv to cleanup a - // handle that was made to not prevent the loop - // from exiting via uv_unref(). - uv_ref(op_handle->loop); - uv_close(op_handle, native_close_op_cb); - uv_run(op_handle->loop); // this should process the handle's - // close event and then return - */ - // the above code is supposed to work to cleanly close - // a handler that was uv_unref()'d. but it causes much spew - // instead. this is the ugly/quick way to deal w/ it for now. - uv_close(op_handle, native_close_op_cb); - native_close_op_cb(op_handle); -} - -extern "C" void -rust_uvtmp_uv_run(uv_loop_t* loop) { - uv_run(loop); -} - -extern "C" void -rust_uvtmp_uv_close(uv_handle_t* handle, crust_close_cb cb) { - handle_data* data = (handle_data*)handle->data; - data->close_cb = cb; - uv_close(handle, native_close_cb); -} - -extern "C" void -rust_uvtmp_uv_close_async(uv_async_t* handle) { - current_kernel_free(handle->data); - current_kernel_free(handle); -} - -extern "C" void -rust_uvtmp_uv_close_timer(uv_async_t* handle) { - current_kernel_free(handle->data); - current_kernel_free(handle); -} - -extern "C" void -rust_uvtmp_uv_async_send(uv_async_t* handle) { - uv_async_send(handle); -} - -extern "C" void* -rust_uvtmp_uv_async_init(uv_loop_t* loop, crust_simple_cb cb, - uint8_t* buf) { - uv_async_t* async = (uv_async_t*)current_kernel_malloc( - sizeof(uv_async_t), - "uv_async_t"); - uv_async_init(loop, async, native_async_cb); - handle_data* data = new_handle_data_from(buf, cb); - async->data = data; - - return async; -} - -extern "C" void* -rust_uvtmp_uv_timer_init(uv_loop_t* loop, crust_simple_cb cb, - uint8_t* buf) { - uv_timer_t* new_timer = (uv_timer_t*)current_kernel_malloc( - sizeof(uv_timer_t), - "uv_timer_t"); - uv_timer_init(loop, new_timer); - handle_data* data = new_handle_data_from(buf, cb); - new_timer->data = data; - - return new_timer; -} - -extern "C" void -rust_uvtmp_uv_timer_start(uv_timer_t* the_timer, uint32_t timeout, - uint32_t repeat) { - uv_timer_start(the_timer, native_timer_cb, timeout, repeat); -} - -extern "C" void -rust_uvtmp_uv_timer_stop(uv_timer_t* the_timer) { - uv_timer_stop(the_timer); -} - -// UVTMP REWORK - -// FIXME: Copied from rust_builtins.cpp. Could bitrot easily -static void -send(rust_task *task, chan_handle chan, void *data) { - rust_task *target_task = task->kernel->get_task_by_id(chan.task); - if(target_task) { - rust_port *port = target_task->get_port_by_id(chan.port); - if(port) { - port->send(data); - scoped_lock with(target_task->lock); - port->deref(); - } - target_task->deref(); - } -} - -class rust_uvtmp_thread : public rust_thread { - -private: - std::map req_map; - rust_task *task; - uv_loop_t *loop; - uv_idle_t idle; - lock_and_signal lock; - bool stop_flag; - std::queue > connect_queue; - std::queue close_connection_queue; - std::queue write_queue; - std::queue read_start_queue; - std::queue timer_start_queue; -public: - - rust_uvtmp_thread() { - task = rust_task_thread::get_task(); - stop_flag = false; - loop = uv_loop_new(); - uv_idle_init(loop, &idle); - idle.data = this; - uv_idle_start(&idle, idle_cb); - } - - ~rust_uvtmp_thread() { - uv_loop_delete(loop); - } - - void stop() { - scoped_lock with(lock); - stop_flag = true; - } - - connect_data *connect(uint32_t req_id, char *ip, chan_handle chan) { - scoped_lock with(lock); - if (req_map.count(req_id)) return NULL; - connect_data *cd = new connect_data(); - req_map[req_id] = cd; - cd->req_id = req_id; - cd->ip_addr = ip; - connect_queue.push( - std::pair(cd, chan)); - return cd; - } - - void - close_connection(uint32_t req_id) { - scoped_lock with(lock); - connect_data *cd = req_map[req_id]; - close_connection_queue.push(cd); - req_map.erase(req_id); - } - - void - write(uint32_t req_id, uint8_t *buf, size_t len, chan_handle chan) { - scoped_lock with(lock); - connect_data *cd = req_map[req_id]; - write_data *wd = new write_data(); - wd->cd = cd; - wd->buf = new uint8_t[len]; - wd->len = len; - wd->chan = chan; - - memcpy(wd->buf, buf, len); - - write_queue.push(wd); - } - - void - read_start(uint32_t req_id, chan_handle chan) { - scoped_lock with(lock); - connect_data *cd = req_map[req_id]; - read_start_data *rd = new read_start_data(); - rd->cd = cd; - rd->chan = chan; - - read_start_queue.push(rd); - } - - void - timer(uint32_t timeout, uint32_t req_id, chan_handle chan) { - scoped_lock with(lock); - - timer_start_data *td = new timer_start_data(); - td->timeout = timeout; - td->req_id = req_id; - td->chan = chan; - timer_start_queue.push(td); - } - -private: - - virtual void - run() { - uv_run(loop); - } - - static void - idle_cb(uv_idle_t* handle, int status) { - rust_uvtmp_thread *self = (rust_uvtmp_thread*) handle->data; - self->on_idle(); - } - - void - on_idle() { - scoped_lock with(lock); - make_new_connections(); - close_connections(); - write_buffers(); - start_reads(); - start_timers(); - close_idle_if_stop(); - } - - void - make_new_connections() { - assert(lock.lock_held_by_current_thread()); - while (!connect_queue.empty()) { - std::pair pair = connect_queue.front(); - connect_queue.pop(); - connect_data *cd = pair.first; - struct sockaddr_in client_addr = uv_ip4_addr("0.0.0.0", 0); - struct sockaddr_in server_addr = uv_ip4_addr(cd->ip_addr, 80); - - cd->thread = this; - cd->chan = pair.second; - cd->connect.data = cd; - - uv_tcp_init(loop, &cd->tcp); - uv_tcp_bind(&cd->tcp, client_addr); - - uv_tcp_connect(&cd->connect, &cd->tcp, server_addr, connect_cb); - } - } - - static void - connect_cb(uv_connect_t *handle, int status) { - connect_data *cd = (connect_data*)handle->data; - cd->thread->on_connect(cd); - } - - void - on_connect(connect_data *cd) { - iomsg msg; - msg.tag = connected_tag; - msg.val.connected_val = cd; - - send(task, cd->chan, &msg); - } - - void - close_connections() { - assert(lock.lock_held_by_current_thread()); - while (!close_connection_queue.empty()) { - connect_data *cd = close_connection_queue.front(); - close_connection_queue.pop(); - - cd->tcp.data = cd; - - uv_close((uv_handle_t*)&cd->tcp, tcp_close_cb); - } - } - - static void - tcp_close_cb(uv_handle_t *handle) { - connect_data *cd = (connect_data*)handle->data; - delete cd; - } - - void - write_buffers() { - assert(lock.lock_held_by_current_thread()); - while (!write_queue.empty()) { - write_data *wd = write_queue.front(); - write_queue.pop(); - - uv_write_t *write = new uv_write_t(); - - write->data = wd; - - uv_buf_t buf; - buf.base = (char*)wd->buf; - buf.len = wd->len; - - uv_write(write, (uv_stream_t*)&wd->cd->tcp, &buf, 1, write_cb); - } - } - - static void - write_cb(uv_write_t *handle, int status) { - write_data *wd = (write_data*)handle->data; - rust_uvtmp_thread *self = wd->cd->thread; - self->on_write(handle, wd); - } - - void - on_write(uv_write_t *handle, write_data *wd) { - iomsg msg; - msg.tag = timer_tag; - msg.val.wrote_val = wd->cd; - - send(task, wd->chan, &msg); - - delete [] wd->buf; - delete wd; - delete handle; - } - - void - start_reads() { - assert (lock.lock_held_by_current_thread()); - while (!read_start_queue.empty()) { - read_start_data *rd = read_start_queue.front(); - read_start_queue.pop(); - - connect_data *cd = rd->cd; - cd->tcp.data = rd; - - uv_read_start((uv_stream_t*)&cd->tcp, alloc_cb, read_cb); - } - } - - static uv_buf_t - alloc_cb(uv_handle_t* handle, size_t size) { - uv_buf_t buf; - buf.base = new char[size]; - buf.len = size; - return buf; - } - - static void - read_cb(uv_stream_t *handle, ssize_t nread, uv_buf_t buf) { - read_start_data *rd = (read_start_data*)handle->data; - rust_uvtmp_thread *self = rd->cd->thread; - self->on_read(rd, nread, buf); - } - - void - on_read(read_start_data *rd, ssize_t nread, uv_buf_t buf) { - iomsg msg; - msg.tag = read_tag; - msg.val.read_val.cd = rd->cd; - msg.val.read_val.buf = (uint8_t*)buf.base; - msg.val.read_val.nread = nread; - - send(task, rd->chan, &msg); - if (nread == -1) { - delete rd; - } - } - - void - start_timers() { - assert (lock.lock_held_by_current_thread()); - while (!timer_start_queue.empty()) { - timer_start_data *td = timer_start_queue.front(); - timer_start_queue.pop(); - - td->thread = this; - - uv_timer_t *timer = (uv_timer_t *)malloc(sizeof(uv_timer_t)); - timer->data = td; - uv_timer_init(loop, timer); - uv_timer_start(timer, timer_cb, td->timeout, 0); - } - } - - static void - timer_cb(uv_timer_t *handle, int what) { - timer_start_data *td = (timer_start_data*)handle->data; - rust_uvtmp_thread *self = td->thread; - self->on_timer(td); - free(handle); - } - - void - on_timer(timer_start_data *rd) { - iomsg msg; - msg.tag = timer_tag; - msg.val.timer_req_id = rd->req_id; - - send(task, rd->chan, &msg); - delete rd; - } - - void - close_idle_if_stop() { - assert(lock.lock_held_by_current_thread()); - if (stop_flag) { - uv_close((uv_handle_t*)&idle, NULL); - } - } - -}; - -extern "C" rust_uvtmp_thread * -rust_uvtmp_create_thread() { - rust_uvtmp_thread *thread = new rust_uvtmp_thread(); - return thread; -} - -extern "C" void -rust_uvtmp_start_thread(rust_uvtmp_thread *thread) { - thread->start(); -} - -extern "C" void -rust_uvtmp_join_thread(rust_uvtmp_thread *thread) { - thread->stop(); - thread->join(); -} - -extern "C" void -rust_uvtmp_delete_thread(rust_uvtmp_thread *thread) { - delete thread; -} - -extern "C" connect_data * -rust_uvtmp_connect(rust_uvtmp_thread *thread, uint32_t req_id, char *ip, chan_handle *chan) { - return thread->connect(req_id, ip, *chan); -} - -extern "C" void -rust_uvtmp_close_connection(rust_uvtmp_thread *thread, uint32_t req_id) { - thread->close_connection(req_id); -} - -extern "C" void -rust_uvtmp_write(rust_uvtmp_thread *thread, uint32_t req_id, - uint8_t *buf, size_t len, chan_handle *chan) { - thread->write(req_id, buf, len, *chan); -} - -extern "C" void -rust_uvtmp_read_start(rust_uvtmp_thread *thread, uint32_t req_id, - chan_handle *chan) { - thread->read_start(req_id, *chan); -} - -extern "C" void -rust_uvtmp_timer(rust_uvtmp_thread *thread, uint32_t timeout, uint32_t req_id, chan_handle *chan) { - thread->timer(timeout, req_id, *chan); -} - -extern "C" void -rust_uvtmp_delete_buf(uint8_t *buf) { - delete [] buf; -} - -extern "C" uint32_t -rust_uvtmp_get_req_id(connect_data *cd) { - return cd->req_id; -} - - diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index 2c7265ff58a..12bb6937748 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -80,38 +80,19 @@ upcall_call_shim_on_rust_stack upcall_new_stack upcall_del_stack upcall_reset_stack_limit -rust_uv_default_loop rust_uv_loop_new -rust_uv_loop_delete +rust_uv_loop_set_data +rust_uv_bind_op_cb +rust_uv_stop_op_cb rust_uv_run -rust_uv_unref -rust_uv_idle_init -rust_uv_idle_start -rust_uv_size_of_idle_t -rust_uvtmp_create_thread -rust_uvtmp_start_thread -rust_uvtmp_join_thread -rust_uvtmp_delete_thread -rust_uvtmp_connect -rust_uvtmp_close_connection -rust_uvtmp_write -rust_uvtmp_read_start -rust_uvtmp_timer -rust_uvtmp_delete_buf -rust_uvtmp_get_req_id -rust_uvtmp_uv_loop_new -rust_uvtmp_uv_loop_set_data -rust_uvtmp_uv_bind_op_cb -rust_uvtmp_uv_stop_op_cb -rust_uvtmp_uv_run -rust_uvtmp_uv_close -rust_uvtmp_uv_close_async -rust_uvtmp_uv_close_timer -rust_uvtmp_uv_async_send -rust_uvtmp_uv_async_init -rust_uvtmp_uv_timer_init -rust_uvtmp_uv_timer_start -rust_uvtmp_uv_timer_stop +rust_uv_close +rust_uv_close_async +rust_uv_close_timer +rust_uv_async_send +rust_uv_async_init +rust_uv_timer_init +rust_uv_timer_start +rust_uv_timer_stop rust_dbg_lock_create rust_dbg_lock_destroy rust_dbg_lock_lock