removed hello world and added uv_async_*

This commit is contained in:
Jeff Olson 2012-02-17 23:11:02 -08:00 committed by Brian Anderson
parent ffad8d7f0c
commit 974c23cbeb
2 changed files with 154 additions and 42 deletions

View File

@ -2,20 +2,25 @@
// UV2
enum uv_operation {
op_hw()
op_async_init([u8])
}
type uv_async = {
id: [u8],
loop: uv_loop
};
enum uv_msg {
// requests from library users
msg_run(comm::chan<bool>),
msg_run_in_bg(),
msg_loop_delete(),
msg_async_init([u8], fn~()),
msg_async_init(fn~(uv_async), fn~(uv_async)),
msg_async_send([u8]),
msg_hw(),
// dispatches from libuv
uv_hw()
uv_async_init([u8], *ctypes::void),
uv_async_send([u8])
}
type uv_loop_data = {
@ -25,10 +30,6 @@ type uv_loop_data = {
type uv_loop = comm::chan<uv_msg>;
enum uv_handle {
handle([u8], *ctypes::void)
}
#[nolink]
native mod rustrt {
fn rust_uvtmp_create_thread() -> thread;
@ -66,10 +67,14 @@ native mod rustrt {
fn rust_uvtmp_uv_bind_op_cb(loop: *ctypes::void, cb: *u8) -> *ctypes::void;
fn rust_uvtmp_uv_run(loop_handle: *ctypes::void);
fn rust_uvtmp_uv_async_send(handle: *ctypes::void);
fn rust_uvtmp_uv_async_init(
loop_handle: *ctypes::void,
cb: *u8,
id: *u8) -> *ctypes::void;
}
mod uv {
export loop_new, run, run_in_bg, hw;
export loop_new, run, run_in_bg, async_init, async_send;
// public functions
fn loop_new() -> uv_loop unsafe {
@ -78,7 +83,9 @@ mod uv {
let ret_recv_chan: comm::chan<uv_loop> =
comm::chan(ret_recv_port);
task::spawn_sched(3u) {||
let num_threads = 4u; // would be cool to tie this to
// the number of logical procs
task::spawn_sched(num_threads) {||
// our beloved uv_loop_t ptr
let loop_handle = rustrt::
rust_uvtmp_uv_loop_new();
@ -115,12 +122,17 @@ mod uv {
// to libuv, this will be
// in the process_operation
// crust fn
let async_handle = rustrt::rust_uvtmp_uv_bind_op_cb(
let op_handle = rustrt::rust_uvtmp_uv_bind_op_cb(
loop_handle,
process_operation);
// all state goes here
let handles: map::map<[u8], uv_handle> =
let handles: map::map<[u8], *ctypes::void> =
map::new_bytes_hash();
let async_cbs: map::map<[u8], fn~(uv_async)> =
map::new_bytes_hash();
let async_init_after_cbs: map::map<[u8],
fn~(uv_async)> =
map::new_bytes_hash();
// the main loop that this task blocks on.
@ -143,36 +155,51 @@ mod uv {
comm::send(end_chan, true);
};
}
msg_run_in_bg {
task::spawn_sched(1u) {||
// this call blocks
rustrt::rust_uvtmp_uv_run(loop_handle);
};
}
msg_hw() {
comm::send(operation_chan, op_hw);
io::println("CALLING ASYNC_SEND FOR HW");
rustrt::rust_uvtmp_uv_async_send(async_handle);
}
uv_hw() {
io::println("HELLO WORLD!!!");
}
////// STUBS ///////
msg_loop_delete {
// delete the event loop's c ptr
// this will of course stop any
// further processing
}
msg_async_init(id, callback) {
msg_async_init(callback, after_cb) {
// create a new async handle
// with the id as the handle's
// data and save the callback for
// invocation on msg_async_send
let id = gen_handle_id();
async_cbs.insert(id, callback);
async_init_after_cbs.insert(id, after_cb);
let op = op_async_init(id);
comm::send(operation_chan, op);
rustrt::rust_uvtmp_uv_async_send(op_handle);
io::println("MSG_ASYNC_INIT");
}
uv_async_init(id, async_handle) {
// libuv created a handle, which is
// passed back to us. save it and
// then invoke the supplied callback
// for after completion
handles.insert(id, async_handle);
let after_cb = async_init_after_cbs.get(id);
async_init_after_cbs.remove(id);
task::spawn {||
let async: uv_async = {
id: id,
loop: rust_loop_chan
};
after_cb(async);
};
}
msg_async_send(id) {
// get the callback matching the
// supplied id and invoke it
let async_handle = handles.get(id);
rustrt::rust_uvtmp_uv_async_send(async_handle);
}
uv_async_send(id) {
let async_cb = async_cbs.get(id);
async_cb({id: id, loop: rust_loop_chan});
}
_ { fail "unknown form of uv_msg received"; }
@ -193,37 +220,88 @@ mod uv {
comm::send(loop, msg_run_in_bg);
}
fn hw(loop: uv_loop) {
comm::send(loop, msg_hw);
fn async_init (
loop: uv_loop,
async_cb: fn~(uv_async),
after_cb: fn~(uv_async)) {
let msg = msg_async_init(async_cb, after_cb);
comm::send(loop, msg);
}
fn async_send(async: uv_async) {
comm::send(async.loop, msg_async_send(async.id));
}
// internal functions
fn gen_handle_id() -> [u8] {
ret rand::mk_rng().gen_bytes(16u);
}
fn get_handle_id_from(buf: *u8) -> [u8] unsafe {
ret vec::unsafe::from_buf(buf, 16u);
}
fn get_loop_chan_from(data: *uv_loop_data)
-> comm::chan<uv_msg> unsafe {
ret (*data).rust_loop_chan;
}
// crust
crust fn process_operation(data: *uv_loop_data) unsafe {
crust fn process_operation(
loop: *ctypes::void,
data: *uv_loop_data) unsafe {
io::println("IN PROCESS_OPERATION");
let op_port = (*data).operation_port;
let loop_chan = (*data).rust_loop_chan;
let loop_chan = get_loop_chan_from(data);
let op_pending = comm::peek(op_port);
while(op_pending) {
io::println("OPERATION PENDING!");
alt comm::recv(op_port) {
op_hw() {
io::println("GOT OP_HW IN CRUST");
comm::send(loop_chan, uv_hw);
op_async_init(id) {
io::println("OP_ASYNC_INIT");
let id_ptr = vec::unsafe::to_ptr(id);
let async_handle = rustrt::rust_uvtmp_uv_async_init(
loop,
process_async_send,
id_ptr);
comm::send(loop_chan, uv_async_init(
id,
async_handle));
}
_ { fail "unknown form of uv_operation received"; }
}
op_pending = comm::peek(op_port);
}
io::println("NO MORE OPERATIONS PENDING!");
}
crust fn process_async_send(id_buf: *u8, data: *uv_loop_data)
unsafe {
let handle_id = get_handle_id_from(id_buf);
let loop_chan = get_loop_chan_from(data);
comm::send(loop_chan, uv_async_send(handle_id));
}
}
#[test]
fn uvtmp_uv_test_hello_world() {
fn test_uvtmp_uv_new_loop_no_handles() {
let test_loop = uv::loop_new();
uv::hw(test_loop);
uv::run(test_loop); // this should return immediately
// since there aren't any handles..
}
#[test]
fn test_uvtmp_uv_simple_async() {
let test_loop = uv::loop_new();
let cb: fn~(uv_async) = fn~(h: uv_async) {
io::println("HELLO FROM ASYNC CALLBACK!");
};
uv::async_init(test_loop, cb) {|new_async|
io::println("NEW_ASYNC CREATED!");
uv::async_send(new_async);
};
uv::run(test_loop);
}

View File

@ -57,6 +57,9 @@ struct timer_start_data {
// UVTMP REWORK
typedef void (*async_op_cb)(uv_loop_t* loop, void* data);
typedef void (*rust_async_cb)(uint8_t* id_buf, void* loop_data);
static void*
current_kernel_malloc(size_t size, const char* tag) {
return rust_task_thread::get_task()->malloc(size, tag);
@ -68,6 +71,11 @@ current_kernel_free(void* ptr) {
rust_task_thread::get_task()->free(ptr);
}
*/
#define RUST_UV_HANDLE_LEN 16
struct async_data {
uint8_t id_buf[RUST_UV_HANDLE_LEN];
rust_async_cb cb;
};
extern "C" void*
rust_uvtmp_uv_loop_new() {
@ -79,11 +87,11 @@ rust_uvtmp_uv_loop_set_data(uv_loop_t* loop, void* data) {
loop->data = data;
}
typedef void (*async_op_cb)(void* data);
void native_async_op_cb(uv_async_t* handle, int status) {
static void
native_async_op_cb(uv_async_t* handle, int status) {
async_op_cb cb = (async_op_cb)handle->data;
void* loop_data = handle->loop->data;
cb(loop_data);
cb(handle->loop, loop_data);
}
extern "C" void*
@ -92,6 +100,8 @@ rust_uvtmp_uv_bind_op_cb(uv_loop_t* loop, async_op_cb cb) {
sizeof(uv_async_t),
"uv_async_t");
uv_async_init(loop, async, native_async_op_cb);
// decrement the ref count, so that our async bind
// does count towards keeping the loop alive
async->data = (void*)cb;
return async;
}
@ -105,6 +115,30 @@ rust_uvtmp_uv_async_send(uv_async_t* handle) {
uv_async_send(handle);
}
static void
native_async_cb(uv_async_t* handle, int status) {
async_data* handle_data = (async_data*)handle->data;
void* loop_data = handle->loop->data;
handle_data->cb(handle_data->id_buf, loop_data);
}
extern "C" void*
rust_uvtmp_uv_async_init(uv_loop_t* loop, rust_async_cb cb,
uint8_t* buf) {
uv_async_t* async = (uv_async_t*)current_kernel_malloc(
sizeof(uv_async_t),
"uv_async_t");
uv_async_init(loop, async, native_async_cb);
async_data* data = (async_data*)current_kernel_malloc(
sizeof(async_data),
"async_data");
memcpy(data->id_buf, buf, RUST_UV_HANDLE_LEN);
data->cb = cb;
async->data = data;
return async;
}
// UVTMP REWORK
// FIXME: Copied from rust_builtins.cpp. Could bitrot easily