From 1d3e08d8c6248c4e8c668bf53ff0a308873da31d Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Tue, 21 Feb 2012 11:29:36 -0800 Subject: [PATCH] finishing up simple uv_timer impl as it stands, basic async nad timer support is added --- src/libstd/uvtmp.rs | 182 +++++++++++++++++++++++++++++++++++++----- src/rt/rust_uvtmp.cpp | 59 ++++++++++++-- 2 files changed, 211 insertions(+), 30 deletions(-) diff --git a/src/libstd/uvtmp.rs b/src/libstd/uvtmp.rs index 75e962bd331..123cd8ac2a8 100644 --- a/src/libstd/uvtmp.rs +++ b/src/libstd/uvtmp.rs @@ -6,11 +6,15 @@ // process_operation() crust fn below enum uv_operation { op_async_init([u8]), - op_close(uv_handle, *ctypes::void) + op_close(uv_handle, *ctypes::void), + op_timer_init([u8]), + op_timer_start([u8], *ctypes::void, u32, u32), + op_timer_stop([u8], *ctypes::void, fn~(uv_handle)) } enum uv_handle { - uv_async([u8], uv_loop) + uv_async([u8], uv_loop), + uv_timer([u8], uv_loop) } enum uv_msg { @@ -20,11 +24,17 @@ enum uv_msg { msg_async_init(fn~(uv_handle), fn~(uv_handle)), msg_async_send([u8]), msg_close(uv_handle, fn~()), + msg_timer_init(fn~(uv_handle)), + msg_timer_start([u8], u32, u32, fn~(uv_handle)), + msg_timer_stop([u8], fn~(uv_handle)), // dispatches from libuv uv_async_init([u8], *ctypes::void), uv_async_send([u8]), uv_close([u8]), + uv_timer_init([u8], *ctypes::void), + uv_timer_call([u8]), + uv_timer_stop([u8], fn~(uv_handle)), uv_end() } @@ -74,16 +84,26 @@ native mod rustrt { fn rust_uvtmp_uv_run(loop_handle: *ctypes::void); fn rust_uvtmp_uv_close(handle: *ctypes::void, cb: *u8); fn rust_uvtmp_uv_close_async(handle: *ctypes::void); + fn rust_uvtmp_uv_close_timer(handle: *ctypes::void); fn rust_uvtmp_uv_async_send(handle: *ctypes::void); fn rust_uvtmp_uv_async_init( loop_handle: *ctypes::void, cb: *u8, id: *u8) -> *ctypes::void; + fn rust_uvtmp_uv_timer_init( + loop_handle: *ctypes::void, + cb: *u8, + id: *u8) -> *ctypes::void; + fn rust_uvtmp_uv_timer_start( + timer_handle: *ctypes::void, + timeout: ctypes::c_uint, + repeat: ctypes::c_uint); + fn rust_uvtmp_uv_timer_stop(handle: *ctypes::void); } mod uv { export loop_new, run, close, run_in_bg, async_init, async_send, - timer_init; + timer_init, timer_start, timer_stop; // public functions fn loop_new() -> uv_loop unsafe { @@ -92,9 +112,7 @@ mod uv { let ret_recv_chan: comm::chan = comm::chan(ret_recv_port); - let num_threads = 4u; // would be cool to tie this to - // the number of logical procs - task::spawn_sched(num_threads) {|| + task::spawn_sched(task::manual_threads(4u)) {|| // our beloved uv_loop_t ptr let loop_handle = rustrt:: rust_uvtmp_uv_loop_new(); @@ -140,13 +158,15 @@ mod uv { map::new_bytes_hash(); let id_to_handle: map::map<[u8], uv_handle> = map::new_bytes_hash(); - let async_cbs: map::map<[u8], fn~(uv_handle)> = - map::new_bytes_hash(); - let async_init_after_cbs: map::map<[u8], - fn~(uv_handle)> = + let after_cbs: map::map<[u8], fn~(uv_handle)> = map::new_bytes_hash(); let close_callbacks: map::map<[u8], fn~()> = map::new_bytes_hash(); + + let async_cbs: map::map<[u8], fn~(uv_handle)> = + map::new_bytes_hash(); + let timer_cbs: map::map<[u8], fn~(uv_handle)> = + map::new_bytes_hash(); // the main loop that this task blocks on. // should have the same lifetime as the C libuv @@ -160,7 +180,7 @@ mod uv { // the operation handle to have the // loop process any pending operations // once its up and running - task::spawn_sched(1u) {|| + task::spawn_sched(task::manual_threads(1u)) {|| // this call blocks rustrt::rust_uvtmp_uv_run(loop_handle); // when we're done, msg the @@ -172,7 +192,7 @@ mod uv { } msg_run_in_bg { - task::spawn_sched(1u) {|| + task::spawn_sched(task::manual_threads(1u)) {|| // this call blocks rustrt::rust_uvtmp_uv_run(loop_handle); }; @@ -194,6 +214,9 @@ mod uv { uv_async(id, _) { async_cbs.remove(id); } + uv_timer(id, _) { + timer_cbs.remove(id); + } _ { fail "unknown form of uv_handle encountered " + "in uv_close handler"; @@ -213,7 +236,7 @@ mod uv { // invocation on msg_async_send let id = gen_handle_id(); async_cbs.insert(id, callback); - async_init_after_cbs.insert(id, after_cb); + after_cbs.insert(id, after_cb); let op = op_async_init(id); pass_to_libuv(op_handle, operation_chan, op); } @@ -223,8 +246,8 @@ mod uv { // then invoke the supplied callback // for after completion handles.insert(id, async_handle); - let after_cb = async_init_after_cbs.get(id); - async_init_after_cbs.remove(id); + let after_cb = after_cbs.get(id); + after_cbs.remove(id); let async = uv_async(id, rust_loop_chan); id_to_handle.insert(id, copy(async)); task::spawn {|| @@ -242,6 +265,50 @@ mod uv { async_cb(uv_async(id, rust_loop_chan)); }; } + + msg_timer_init(after_cb) { + let id = gen_handle_id(); + after_cbs.insert(id, after_cb); + let op = op_timer_init(id); + pass_to_libuv(op_handle, operation_chan, op); + } + uv_timer_init(id, handle) { + handles.insert(id, handle); + let after_cb = after_cbs.get(id); + after_cbs.remove(id); + let new_timer = uv_timer(id, rust_loop_chan); + id_to_handle.insert(id, copy(new_timer)); + task::spawn {|| + after_cb(new_timer); + }; + } + + uv_timer_call(id) { + let cb = timer_cbs.get(id); + let the_timer = id_to_handle.get(id); + task::spawn {|| + cb(the_timer); + }; + } + + msg_timer_start(id, timeout, repeat, timer_call_cb) { + timer_cbs.insert(id, timer_call_cb); + let handle = handles.get(id); + let op = op_timer_start(id, handle, timeout, + repeat); + pass_to_libuv(op_handle, operation_chan, op); + } + + msg_timer_stop(id, after_cb) { + let handle = handles.get(id); + let op = op_timer_stop(id, handle, after_cb); + pass_to_libuv(op_handle, operation_chan, op); + } + uv_timer_stop(id, after_cb) { + let the_timer = id_to_handle.get(id); + after_cb(the_timer); + } + uv_end() { keep_going = false; } @@ -294,6 +361,33 @@ mod uv { comm::send(loop, msg); } + fn timer_start(the_timer: uv_handle, timeout: u32, repeat:u32, + timer_cb: fn~(uv_handle)) { + alt the_timer { + uv_timer(id, loop_chan) { + let msg = msg_timer_start(id, timeout, repeat, timer_cb); + comm::send(loop_chan, msg); + } + _ { + fail "can only pass a uv_timer form of uv_handle to "+ + " uv::timer_start()"; + } + } + } + + fn timer_stop(the_timer: uv_handle, after_cb: fn~(uv_handle)) { + alt the_timer { + uv_timer(id, loop_chan) { + let msg = msg_timer_stop(id, after_cb); + comm::send(loop_chan, msg); + } + _ { + fail "only uv_timer form is allowed in calls to "+ + " uv::timer_stop()"; + } + } + } + // internal functions fn pass_to_libuv( op_handle: *ctypes::void, @@ -320,7 +414,7 @@ mod uv { fn get_loop_chan_from_handle(handle: uv_handle) -> uv_loop { alt handle { - uv_async(id,loop) { + uv_async(id,loop) | uv_timer(id,loop) { ret loop; } _ { @@ -332,7 +426,7 @@ mod uv { fn get_id_from_handle(handle: uv_handle) -> [u8] { alt handle { - uv_async(id,loop) { + uv_async(id,loop) | uv_timer(id,loop) { ret id; } _ { @@ -363,6 +457,24 @@ mod uv { op_close(handle, handle_ptr) { handle_op_close(handle, handle_ptr); } + op_timer_init(id) { + let id_ptr = vec::unsafe::to_ptr(id); + let timer_handle = rustrt::rust_uvtmp_uv_timer_init( + loop, + process_timer_call, + id_ptr); + comm::send(loop_chan, uv_timer_init( + id, + timer_handle)); + } + op_timer_start(id, handle, timeout, repeat) { + rustrt::rust_uvtmp_uv_timer_start(handle, timeout, + repeat); + } + op_timer_stop(id, handle, after_cb) { + rustrt::rust_uvtmp_uv_timer_stop(handle); + comm::send(loop_chan, uv_timer_stop(id, after_cb)); + } _ { fail "unknown form of uv_operation received"; } } @@ -378,6 +490,11 @@ mod uv { rustrt::rust_uvtmp_uv_close( handle_ptr, cb); } + uv_timer(id, loop) { + let cb = process_close_timer; + rustrt::rust_uvtmp_uv_close( + handle_ptr, cb); + } _ { fail "unknown form of uv_handle encountered " + "in process_operation/op_close"; @@ -386,12 +503,19 @@ mod uv { } crust fn process_async_send(id_buf: *u8, data: *uv_loop_data) - unsafe { + unsafe { let handle_id = get_handle_id_from(id_buf); let loop_chan = get_loop_chan_from_data(data); comm::send(loop_chan, uv_async_send(handle_id)); } + crust fn process_timer_call(id_buf: *u8, data: *uv_loop_data) + unsafe { + let handle_id = get_handle_id_from(id_buf); + let loop_chan = get_loop_chan_from_data(data); + comm::send(loop_chan, uv_timer_call(handle_id)); + } + fn process_close_common(id: [u8], data: *uv_loop_data) unsafe { // notify the rust loop that their handle is closed, then @@ -414,6 +538,16 @@ mod uv { // close cb process_close_common(id, data); } + + crust fn process_close_timer( + id_buf: *u8, + handle_ptr: *ctypes::void, + data: *uv_loop_data) + unsafe { + let id = get_handle_id_from(id_buf); + rustrt::rust_uvtmp_uv_close_timer(handle_ptr); + process_close_common(id, data); + } } @@ -446,11 +580,15 @@ fn test_uvtmp_uv_timer() { let test_loop = uv::loop_new(); let exit_port = comm::port::(); let exit_chan = comm::chan::(exit_port); - uv::timer(test_loop, {|new_timer| - uv::timer_start(new_async) {|| - comm::send(exit_chan, true); + uv::timer_init(test_loop) {|new_timer| + uv::timer_start(new_timer, 1u32, 0u32) {|started_timer| + uv::timer_stop(started_timer) {|stopped_timer| + uv::close(stopped_timer) {|| + comm::send(exit_chan, true); + }; + }; }; - }); + }; uv::run(test_loop); assert comm::recv(exit_port); } diff --git a/src/rt/rust_uvtmp.cpp b/src/rt/rust_uvtmp.cpp index f97312eaf44..697231dff1b 100644 --- a/src/rt/rust_uvtmp.cpp +++ b/src/rt/rust_uvtmp.cpp @@ -59,7 +59,7 @@ struct timer_start_data { // crust fn pointers typedef void (*crust_async_op_cb)(uv_loop_t* loop, void* data); -typedef void (*crust_async_cb)(uint8_t* id_buf, void* loop_data); +typedef void (*crust_simple_cb)(uint8_t* id_buf, void* loop_data); typedef void (*crust_close_cb)(uint8_t* id_buf, void* handle, void* data); @@ -68,7 +68,7 @@ typedef void (*crust_close_cb)(uint8_t* id_buf, void* handle, struct handle_data { uint8_t id_buf[RUST_UV_HANDLE_LEN]; - crust_async_cb cb; + crust_simple_cb cb; crust_close_cb close_cb; }; @@ -84,6 +84,16 @@ current_kernel_free(void* ptr) { rust_task_thread::get_task()->kernel->free(ptr); } +static handle_data* +new_handle_data_from(uint8_t* buf, crust_simple_cb cb) { + handle_data* data = (handle_data*)current_kernel_malloc( + sizeof(handle_data), + "handle_data"); + memcpy(data->id_buf, buf, RUST_UV_HANDLE_LEN); + data->cb = cb; + return data; +} + // libuv callback impls static void native_crust_async_op_cb(uv_async_t* handle, int status) { @@ -99,6 +109,13 @@ native_async_cb(uv_async_t* handle, int status) { handle_d->cb(handle_d->id_buf, loop_data); } +static void +native_timer_cb(uv_timer_t* handle, int status) { + handle_data* handle_d = (handle_data*)handle->data; + void* loop_data = handle->loop->data; + handle_d->cb(handle_d->id_buf, loop_data); +} + static void native_close_cb(uv_handle_t* handle) { handle_data* data = (handle_data*)handle->data; @@ -172,28 +189,54 @@ rust_uvtmp_uv_close_async(uv_async_t* handle) { current_kernel_free(handle); } +extern "C" void +rust_uvtmp_uv_close_timer(uv_async_t* handle) { + current_kernel_free(handle->data); + current_kernel_free(handle); +} + extern "C" void rust_uvtmp_uv_async_send(uv_async_t* handle) { uv_async_send(handle); } extern "C" void* -rust_uvtmp_uv_async_init(uv_loop_t* loop, crust_async_cb cb, +rust_uvtmp_uv_async_init(uv_loop_t* loop, crust_simple_cb cb, uint8_t* buf) { uv_async_t* async = (uv_async_t*)current_kernel_malloc( sizeof(uv_async_t), "uv_async_t"); uv_async_init(loop, async, native_async_cb); - handle_data* data = (handle_data*)current_kernel_malloc( - sizeof(handle_data), - "handle_data"); - memcpy(data->id_buf, buf, RUST_UV_HANDLE_LEN); - data->cb = cb; + handle_data* data = new_handle_data_from(buf, cb); async->data = data; return async; } +extern "C" void* +rust_uvtmp_uv_timer_init(uv_loop_t* loop, crust_simple_cb cb, + uint8_t* buf) { + uv_timer_t* new_timer = (uv_timer_t*)current_kernel_malloc( + sizeof(uv_timer_t), + "uv_timer_t"); + uv_timer_init(loop, new_timer); + handle_data* data = new_handle_data_from(buf, cb); + new_timer->data = data; + + return new_timer; +} + +extern "C" void +rust_uvtmp_uv_timer_start(uv_timer_t* the_timer, uint32_t timeout, + uint32_t repeat) { + uv_timer_start(the_timer, native_timer_cb, timeout, repeat); +} + +extern "C" void +rust_uvtmp_uv_timer_stop(uv_timer_t* the_timer) { + uv_timer_stop(the_timer); +} + // UVTMP REWORK // FIXME: Copied from rust_builtins.cpp. Could bitrot easily