From 9ad67e8c1454f65331d28f9a371f7742ec675093 Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Mon, 26 Mar 2012 22:51:18 -0700 Subject: [PATCH] test_uv_tcp_request() fully working on linux .. up next: windows! .. impl'd uv::direct::read_stop() and uv::direct::close() to wrap things up .. demonstrated sending data out of the uv_read_cb via a channel (which we block on to recv all of it, complete w/ EOF notification) that is read from after the loop exits. .. helpers to read the guts of a uv_buf_t .. an idea im kicking around: starting to pile up all of these hideous data accessor functions in uv::direct .. I might make impl/iface pairs for the various uv_* types that I'm using, in order to encapsulate those data access functions and, perhaps, make the access look a little cleaner (it still won't be straight field access, but it'll be a lot better) .. formatting cleanup to satisfy make check --- src/libstd/uv.rs | 84 +++++++++++++++++++++++++++++++++++----------- src/rt/rust_uv.cpp | 5 +++ 2 files changed, 70 insertions(+), 19 deletions(-) diff --git a/src/libstd/uv.rs b/src/libstd/uv.rs index fe22df30ae1..1539fbb392b 100644 --- a/src/libstd/uv.rs +++ b/src/libstd/uv.rs @@ -82,7 +82,7 @@ type uv_handle_fields = { loop_handle: *libc::c_void, type_: handle_type, close_cb: *u8, - mutable data: *libc::c_void, + mut data: *libc::c_void, }; // unix size: 8 @@ -124,7 +124,7 @@ type uv_tcp_t = { fn gen_stub_uv_tcp_t() -> uv_tcp_t { ret { fields: { loop_handle: ptr::null(), type_: 0u32, close_cb: ptr::null(), - mutable data: ptr::null() }, + mut data: ptr::null() }, a00: 0 as *u8, a01: 0 as *u8, a02: 0 as *u8, a03: 0 as *u8, a04: 0 as *u8, a05: 0 as *u8, a06: 0 as *u8, a07: 0 as *u8, a08: 0 as *u8, a09: 0 as *u8, a10: 0 as *u8, a11: 0 as *u8, @@ -157,13 +157,13 @@ type uv_connect_t = { #[cfg(target_os = "macos")] #[cfg(target_os = "freebsd")] fn gen_stub_uv_connect_t() -> uv_connect_t { - ret { + ret { a00: 0 as *u8, a01: 0 as *u8, a02: 0 as *u8, a03: 0 as *u8, a04: 0 as *u8, a05: 0 as *u8 }; } -// ref #1402 .. don't use this, like sockaddr_in +// ref #1402 .. don't use this, like sockaddr_in // unix size: 16 #[cfg(target_os = "linux")] #[cfg(target_os = "macos")] @@ -201,7 +201,7 @@ type uv_write_t = { fn gen_stub_uv_write_t() -> uv_write_t { ret { fields: { loop_handle: ptr::null(), type_: 0u32, close_cb: ptr::null(), - mutable data: ptr::null() }, + mut data: ptr::null() }, a00: 0 as *u8, a01: 0 as *u8, a02: 0 as *u8, a03: 0 as *u8, a04: 0 as *u8, a05: 0 as *u8, a06: 0 as *u8, a07: 0 as *u8, a08: 0 as *u8, a09: 0 as *u8, a10: 0 as *u8, a11: 0 as *u8, @@ -287,6 +287,7 @@ native mod rustrt { cb: *u8) -> libc::c_int; fn rust_uv_read_start(stream: *libc::c_void, on_alloc: *u8, on_read: *u8) -> libc::c_int; + fn rust_uv_read_stop(stream: *libc::c_void) -> libc::c_int; fn rust_uv_malloc_buf_base_of(sug_size: libc::size_t) -> *u8; fn rust_uv_free_base_of_buf(++buf: uv_buf_t); @@ -314,6 +315,8 @@ native mod rustrt { fn rust_uv_get_data_for_req(req: *libc::c_void) -> *libc::c_void; fn rust_uv_set_data_for_req(req: *libc::c_void, data: *libc::c_void); + fn rust_uv_get_base_from_buf(++buf: uv_buf_t) -> *u8; + fn rust_uv_get_len_from_buf(++buf: uv_buf_t) -> libc::size_t; } // this module is structured around functions that directly @@ -332,6 +335,10 @@ mod direct { rustrt::rust_uv_run(loop_handle); } + unsafe fn close(handle: *libc::c_void, cb: *u8) { + rustrt::rust_uv_close(handle, cb); + } + unsafe fn tcp_init(loop_handle: *libc::c_void, handle: *uv_tcp_t) -> libc::c_int { ret rustrt::rust_uv_tcp_init(loop_handle, handle); @@ -341,7 +348,8 @@ mod direct { address: sockaddr_in, after_connect_cb: *u8) -> libc::c_int { - io::println(#fmt("before native tcp_connect -- addr port: %u", address.sin_port as uint)); + io::println(#fmt("b4 native tcp_connect--addr port: %u", + address.sin_port as uint)); ret rustrt::rust_uv_tcp_connect(connect_ptr, tcp_handle_ptr, address, after_connect_cb); } @@ -361,6 +369,10 @@ mod direct { on_alloc, on_read); } + unsafe fn read_stop(stream: *uv_stream_t) -> libc::c_int { + ret rustrt::rust_uv_read_stop(stream as *libc::c_void); + } + unsafe fn uv_last_error(loop_handle: *libc::c_void) -> uv_err_t { ret rustrt::rust_uv_last_error(loop_handle); } @@ -391,6 +403,13 @@ mod direct { write_req); } + unsafe fn get_data_for_uv_handle(handle: *libc::c_void) -> *libc::c_void { + ret rustrt::rust_uv_get_data_for_uv_handle(handle); + } + unsafe fn set_data_for_uv_handle(handle: *libc::c_void, + data: *libc::c_void) { + rustrt::rust_uv_set_data_for_uv_handle(handle, data); + } unsafe fn get_data_for_req(req: *libc::c_void) -> *libc::c_void { ret rustrt::rust_uv_get_data_for_req(req); } @@ -398,7 +417,12 @@ mod direct { data: *libc::c_void) { rustrt::rust_uv_set_data_for_req(req, data); } - // TODO: see github issue #1402 + unsafe fn get_base_from_buf(buf: uv_buf_t) -> *u8 { + ret rustrt::rust_uv_get_base_from_buf(buf); + } + unsafe fn get_len_from_buf(buf: uv_buf_t) -> libc::size_t { + ret rustrt::rust_uv_get_len_from_buf(buf); + } unsafe fn buf_init(input: *u8, len: uint) -> uv_buf_t { ret rustrt::rust_uv_buf_init(input, len); } @@ -408,7 +432,8 @@ mod direct { addr_vec += [0u8]; // add null terminator let addr_vec_ptr = vec::unsafe::to_ptr(addr_vec); let ip_back = str::from_bytes(addr_vec); - io::println(#fmt("vec val: '%s' length: %u",ip_back, vec::len(addr_vec))); + io::println(#fmt("vec val: '%s' length: %u", + ip_back, vec::len(addr_vec))); ret rustrt::rust_uv_ip4_addr(addr_vec_ptr, port as libc::c_int); } @@ -982,6 +1007,10 @@ type request_wrapper = { read_chan: comm::chan }; +crust fn after_close_cb(handle: *libc::c_void) { + io::println("after uv_close!"); +} + crust fn on_alloc_cb(handle: *libc::c_void, suggested_size: libc::size_t) -> uv_buf_t unsafe { @@ -990,12 +1019,22 @@ crust fn on_alloc_cb(handle: *libc::c_void, ret direct::buf_init(char_ptr, suggested_size); } -// do I need the explicit copy on the buf param? crust fn on_read_cb(stream: *uv_stream_t, nread: libc::ssize_t, ++buf: uv_buf_t) unsafe { if (nread > 0) { // we have data io::println(#fmt("read: data! nread: %d", nread)); + direct::read_stop(stream); + let client_data = direct:: + get_data_for_uv_handle(stream as *libc::c_void) + as *request_wrapper; + let buf_base = direct::get_base_from_buf(buf); + let buf_len = direct::get_len_from_buf(buf); + let bytes = vec::unsafe::from_buf(buf_base, buf_len); + let read_chan = (*client_data).read_chan; + comm::send(read_chan, tcp_read_more(bytes)); + comm::send(read_chan, tcp_read_eof); + direct::close(stream as *libc::c_void, after_close_cb) } else if (nread == -1) { // err .. possibly EOF @@ -1015,10 +1054,11 @@ crust fn on_write_complete_cb(write_req: *uv_write_t, io::println(#fmt("beginning on_write_complete_cb status: %d", status as int)); let stream = direct::get_stream_handle_from_write_req(write_req); - io::println(#fmt("on_write_complete_cb: tcp stream: %d write_handle addr %d", + io::println(#fmt("on_write_complete_cb: tcp:%d write_handle:%d", stream as int, write_req as int)); let result = direct::read_start(stream, on_alloc_cb, on_read_cb); - io::println(#fmt("ending on_write_complete_cb .. uv_read_start status: %d", result as int)); + io::println(#fmt("ending on_write_complete_cb .. status: %d", + result as int)); } crust fn on_connect_cb(connect_req_ptr: *uv_connect_t, @@ -1078,19 +1118,19 @@ fn impl_uv_tcp_request() unsafe { let client_data = { writer_handle: write_handle_ptr, req_buf: ptr::addr_of(req_msg), read_chan: read_chan }; - + let tcp_init_result = direct::tcp_init( test_loop as *libc::c_void, tcp_handle_ptr); if (tcp_init_result == 0i32) { io::println("sucessful tcp_init_result"); - + io::println("building addr..."); let addr = direct::ip4_addr("173.194.33.40", 80); io::println(#fmt("after build addr in rust. port: %u", addr.sin_port as uint)); //let addr: *libc::c_void = ptr::addr_of(addr_val) as // *libc::c_void; - + // this should set up the connection request.. let tcp_connect_result = direct::tcp_connect( connect_req_ptr, tcp_handle_ptr, @@ -1101,6 +1141,9 @@ fn impl_uv_tcp_request() unsafe { direct::set_data_for_req( connect_req_ptr as *libc::c_void, ptr::addr_of(client_data) as *libc::c_void); + direct::set_data_for_uv_handle( + tcp_handle_ptr as *libc::c_void, + ptr::addr_of(client_data) as *libc::c_void); io::println("before run tcp req loop"); direct::run(test_loop); io::println("after run tcp req loop"); @@ -1120,10 +1163,11 @@ fn impl_uv_tcp_request() unsafe { // the stubbed out vec above // with our initial set of read // data + read_bytes = new_bytes; } else { // otherwise append - read_bytes = new_bytes; + read_bytes += new_bytes; } } _ { @@ -1131,10 +1175,12 @@ fn impl_uv_tcp_request() unsafe { } } } - io::println("finished reading data"); + io::println("finished reading data, output to follow:"); let read_str = str::from_bytes(read_bytes); - - + + io::println(read_str); + io::println(">>>>EOF<<<<"); + direct::loop_delete(test_loop); } else { io::println("direct::tcp_connect() failure"); @@ -1145,7 +1191,7 @@ fn impl_uv_tcp_request() unsafe { io::println("direct::tcp_init() failure"); assert false; } - + } // START HERE AND WORK YOUR WAY UP VIA CALLBACKS #[test] diff --git a/src/rt/rust_uv.cpp b/src/rt/rust_uv.cpp index f753c55c69e..90832f7c3cd 100644 --- a/src/rt/rust_uv.cpp +++ b/src/rt/rust_uv.cpp @@ -322,6 +322,11 @@ rust_uv_read_start(uv_stream_t* stream, uv_alloc_cb on_alloc, return uv_read_start(stream, on_alloc, on_read); } +extern "C" int +rust_uv_read_stop(uv_stream_t* stream) { + return uv_read_stop(stream); +} + extern "C" char* rust_uv_malloc_buf_base_of(size_t suggested_size) { return (char*) current_kernel_malloc(sizeof(char)*suggested_size,