test_uv_tcp_request() fully working on linux
.. up next: windows! .. impl'd uv::direct::read_stop() and uv::direct::close() to wrap things up .. demonstrated sending data out of the uv_read_cb via a channel (which we block on to recv all of it, complete w/ EOF notification) that is read from after the loop exits. .. helpers to read the guts of a uv_buf_t .. an idea im kicking around: starting to pile up all of these hideous data accessor functions in uv::direct .. I might make impl/iface pairs for the various uv_* types that I'm using, in order to encapsulate those data access functions and, perhaps, make the access look a little cleaner (it still won't be straight field access, but it'll be a lot better) .. formatting cleanup to satisfy make check
This commit is contained in:
parent
877747d0ac
commit
9ad67e8c14
@ -82,7 +82,7 @@ type uv_handle_fields = {
|
|||||||
loop_handle: *libc::c_void,
|
loop_handle: *libc::c_void,
|
||||||
type_: handle_type,
|
type_: handle_type,
|
||||||
close_cb: *u8,
|
close_cb: *u8,
|
||||||
mutable data: *libc::c_void,
|
mut data: *libc::c_void,
|
||||||
};
|
};
|
||||||
|
|
||||||
// unix size: 8
|
// unix size: 8
|
||||||
@ -124,7 +124,7 @@ type uv_tcp_t = {
|
|||||||
fn gen_stub_uv_tcp_t() -> uv_tcp_t {
|
fn gen_stub_uv_tcp_t() -> uv_tcp_t {
|
||||||
ret { fields: { loop_handle: ptr::null(), type_: 0u32,
|
ret { fields: { loop_handle: ptr::null(), type_: 0u32,
|
||||||
close_cb: ptr::null(),
|
close_cb: ptr::null(),
|
||||||
mutable data: ptr::null() },
|
mut data: ptr::null() },
|
||||||
a00: 0 as *u8, a01: 0 as *u8, a02: 0 as *u8, a03: 0 as *u8,
|
a00: 0 as *u8, a01: 0 as *u8, a02: 0 as *u8, a03: 0 as *u8,
|
||||||
a04: 0 as *u8, a05: 0 as *u8, a06: 0 as *u8, a07: 0 as *u8,
|
a04: 0 as *u8, a05: 0 as *u8, a06: 0 as *u8, a07: 0 as *u8,
|
||||||
a08: 0 as *u8, a09: 0 as *u8, a10: 0 as *u8, a11: 0 as *u8,
|
a08: 0 as *u8, a09: 0 as *u8, a10: 0 as *u8, a11: 0 as *u8,
|
||||||
@ -201,7 +201,7 @@ type uv_write_t = {
|
|||||||
fn gen_stub_uv_write_t() -> uv_write_t {
|
fn gen_stub_uv_write_t() -> uv_write_t {
|
||||||
ret { fields: { loop_handle: ptr::null(), type_: 0u32,
|
ret { fields: { loop_handle: ptr::null(), type_: 0u32,
|
||||||
close_cb: ptr::null(),
|
close_cb: ptr::null(),
|
||||||
mutable data: ptr::null() },
|
mut data: ptr::null() },
|
||||||
a00: 0 as *u8, a01: 0 as *u8, a02: 0 as *u8, a03: 0 as *u8,
|
a00: 0 as *u8, a01: 0 as *u8, a02: 0 as *u8, a03: 0 as *u8,
|
||||||
a04: 0 as *u8, a05: 0 as *u8, a06: 0 as *u8, a07: 0 as *u8,
|
a04: 0 as *u8, a05: 0 as *u8, a06: 0 as *u8, a07: 0 as *u8,
|
||||||
a08: 0 as *u8, a09: 0 as *u8, a10: 0 as *u8, a11: 0 as *u8,
|
a08: 0 as *u8, a09: 0 as *u8, a10: 0 as *u8, a11: 0 as *u8,
|
||||||
@ -287,6 +287,7 @@ native mod rustrt {
|
|||||||
cb: *u8) -> libc::c_int;
|
cb: *u8) -> libc::c_int;
|
||||||
fn rust_uv_read_start(stream: *libc::c_void, on_alloc: *u8,
|
fn rust_uv_read_start(stream: *libc::c_void, on_alloc: *u8,
|
||||||
on_read: *u8) -> libc::c_int;
|
on_read: *u8) -> libc::c_int;
|
||||||
|
fn rust_uv_read_stop(stream: *libc::c_void) -> libc::c_int;
|
||||||
fn rust_uv_malloc_buf_base_of(sug_size: libc::size_t) -> *u8;
|
fn rust_uv_malloc_buf_base_of(sug_size: libc::size_t) -> *u8;
|
||||||
fn rust_uv_free_base_of_buf(++buf: uv_buf_t);
|
fn rust_uv_free_base_of_buf(++buf: uv_buf_t);
|
||||||
|
|
||||||
@ -314,6 +315,8 @@ native mod rustrt {
|
|||||||
fn rust_uv_get_data_for_req(req: *libc::c_void) -> *libc::c_void;
|
fn rust_uv_get_data_for_req(req: *libc::c_void) -> *libc::c_void;
|
||||||
fn rust_uv_set_data_for_req(req: *libc::c_void,
|
fn rust_uv_set_data_for_req(req: *libc::c_void,
|
||||||
data: *libc::c_void);
|
data: *libc::c_void);
|
||||||
|
fn rust_uv_get_base_from_buf(++buf: uv_buf_t) -> *u8;
|
||||||
|
fn rust_uv_get_len_from_buf(++buf: uv_buf_t) -> libc::size_t;
|
||||||
}
|
}
|
||||||
|
|
||||||
// this module is structured around functions that directly
|
// this module is structured around functions that directly
|
||||||
@ -332,6 +335,10 @@ mod direct {
|
|||||||
rustrt::rust_uv_run(loop_handle);
|
rustrt::rust_uv_run(loop_handle);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
unsafe fn close(handle: *libc::c_void, cb: *u8) {
|
||||||
|
rustrt::rust_uv_close(handle, cb);
|
||||||
|
}
|
||||||
|
|
||||||
unsafe fn tcp_init(loop_handle: *libc::c_void, handle: *uv_tcp_t)
|
unsafe fn tcp_init(loop_handle: *libc::c_void, handle: *uv_tcp_t)
|
||||||
-> libc::c_int {
|
-> libc::c_int {
|
||||||
ret rustrt::rust_uv_tcp_init(loop_handle, handle);
|
ret rustrt::rust_uv_tcp_init(loop_handle, handle);
|
||||||
@ -341,7 +348,8 @@ mod direct {
|
|||||||
address: sockaddr_in,
|
address: sockaddr_in,
|
||||||
after_connect_cb: *u8)
|
after_connect_cb: *u8)
|
||||||
-> libc::c_int {
|
-> libc::c_int {
|
||||||
io::println(#fmt("before native tcp_connect -- addr port: %u", address.sin_port as uint));
|
io::println(#fmt("b4 native tcp_connect--addr port: %u",
|
||||||
|
address.sin_port as uint));
|
||||||
ret rustrt::rust_uv_tcp_connect(connect_ptr, tcp_handle_ptr,
|
ret rustrt::rust_uv_tcp_connect(connect_ptr, tcp_handle_ptr,
|
||||||
address, after_connect_cb);
|
address, after_connect_cb);
|
||||||
}
|
}
|
||||||
@ -361,6 +369,10 @@ mod direct {
|
|||||||
on_alloc, on_read);
|
on_alloc, on_read);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
unsafe fn read_stop(stream: *uv_stream_t) -> libc::c_int {
|
||||||
|
ret rustrt::rust_uv_read_stop(stream as *libc::c_void);
|
||||||
|
}
|
||||||
|
|
||||||
unsafe fn uv_last_error(loop_handle: *libc::c_void) -> uv_err_t {
|
unsafe fn uv_last_error(loop_handle: *libc::c_void) -> uv_err_t {
|
||||||
ret rustrt::rust_uv_last_error(loop_handle);
|
ret rustrt::rust_uv_last_error(loop_handle);
|
||||||
}
|
}
|
||||||
@ -391,6 +403,13 @@ mod direct {
|
|||||||
write_req);
|
write_req);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
unsafe fn get_data_for_uv_handle(handle: *libc::c_void) -> *libc::c_void {
|
||||||
|
ret rustrt::rust_uv_get_data_for_uv_handle(handle);
|
||||||
|
}
|
||||||
|
unsafe fn set_data_for_uv_handle(handle: *libc::c_void,
|
||||||
|
data: *libc::c_void) {
|
||||||
|
rustrt::rust_uv_set_data_for_uv_handle(handle, data);
|
||||||
|
}
|
||||||
unsafe fn get_data_for_req(req: *libc::c_void) -> *libc::c_void {
|
unsafe fn get_data_for_req(req: *libc::c_void) -> *libc::c_void {
|
||||||
ret rustrt::rust_uv_get_data_for_req(req);
|
ret rustrt::rust_uv_get_data_for_req(req);
|
||||||
}
|
}
|
||||||
@ -398,7 +417,12 @@ mod direct {
|
|||||||
data: *libc::c_void) {
|
data: *libc::c_void) {
|
||||||
rustrt::rust_uv_set_data_for_req(req, data);
|
rustrt::rust_uv_set_data_for_req(req, data);
|
||||||
}
|
}
|
||||||
// TODO: see github issue #1402
|
unsafe fn get_base_from_buf(buf: uv_buf_t) -> *u8 {
|
||||||
|
ret rustrt::rust_uv_get_base_from_buf(buf);
|
||||||
|
}
|
||||||
|
unsafe fn get_len_from_buf(buf: uv_buf_t) -> libc::size_t {
|
||||||
|
ret rustrt::rust_uv_get_len_from_buf(buf);
|
||||||
|
}
|
||||||
unsafe fn buf_init(input: *u8, len: uint) -> uv_buf_t {
|
unsafe fn buf_init(input: *u8, len: uint) -> uv_buf_t {
|
||||||
ret rustrt::rust_uv_buf_init(input, len);
|
ret rustrt::rust_uv_buf_init(input, len);
|
||||||
}
|
}
|
||||||
@ -408,7 +432,8 @@ mod direct {
|
|||||||
addr_vec += [0u8]; // add null terminator
|
addr_vec += [0u8]; // add null terminator
|
||||||
let addr_vec_ptr = vec::unsafe::to_ptr(addr_vec);
|
let addr_vec_ptr = vec::unsafe::to_ptr(addr_vec);
|
||||||
let ip_back = str::from_bytes(addr_vec);
|
let ip_back = str::from_bytes(addr_vec);
|
||||||
io::println(#fmt("vec val: '%s' length: %u",ip_back, vec::len(addr_vec)));
|
io::println(#fmt("vec val: '%s' length: %u",
|
||||||
|
ip_back, vec::len(addr_vec)));
|
||||||
ret rustrt::rust_uv_ip4_addr(addr_vec_ptr,
|
ret rustrt::rust_uv_ip4_addr(addr_vec_ptr,
|
||||||
port as libc::c_int);
|
port as libc::c_int);
|
||||||
}
|
}
|
||||||
@ -982,6 +1007,10 @@ type request_wrapper = {
|
|||||||
read_chan: comm::chan<tcp_read_data>
|
read_chan: comm::chan<tcp_read_data>
|
||||||
};
|
};
|
||||||
|
|
||||||
|
crust fn after_close_cb(handle: *libc::c_void) {
|
||||||
|
io::println("after uv_close!");
|
||||||
|
}
|
||||||
|
|
||||||
crust fn on_alloc_cb(handle: *libc::c_void,
|
crust fn on_alloc_cb(handle: *libc::c_void,
|
||||||
suggested_size: libc::size_t) -> uv_buf_t
|
suggested_size: libc::size_t) -> uv_buf_t
|
||||||
unsafe {
|
unsafe {
|
||||||
@ -990,12 +1019,22 @@ crust fn on_alloc_cb(handle: *libc::c_void,
|
|||||||
ret direct::buf_init(char_ptr, suggested_size);
|
ret direct::buf_init(char_ptr, suggested_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
// do I need the explicit copy on the buf param?
|
|
||||||
crust fn on_read_cb(stream: *uv_stream_t, nread: libc::ssize_t,
|
crust fn on_read_cb(stream: *uv_stream_t, nread: libc::ssize_t,
|
||||||
++buf: uv_buf_t) unsafe {
|
++buf: uv_buf_t) unsafe {
|
||||||
if (nread > 0) {
|
if (nread > 0) {
|
||||||
// we have data
|
// we have data
|
||||||
io::println(#fmt("read: data! nread: %d", nread));
|
io::println(#fmt("read: data! nread: %d", nread));
|
||||||
|
direct::read_stop(stream);
|
||||||
|
let client_data = direct::
|
||||||
|
get_data_for_uv_handle(stream as *libc::c_void)
|
||||||
|
as *request_wrapper;
|
||||||
|
let buf_base = direct::get_base_from_buf(buf);
|
||||||
|
let buf_len = direct::get_len_from_buf(buf);
|
||||||
|
let bytes = vec::unsafe::from_buf(buf_base, buf_len);
|
||||||
|
let read_chan = (*client_data).read_chan;
|
||||||
|
comm::send(read_chan, tcp_read_more(bytes));
|
||||||
|
comm::send(read_chan, tcp_read_eof);
|
||||||
|
direct::close(stream as *libc::c_void, after_close_cb)
|
||||||
}
|
}
|
||||||
else if (nread == -1) {
|
else if (nread == -1) {
|
||||||
// err .. possibly EOF
|
// err .. possibly EOF
|
||||||
@ -1015,10 +1054,11 @@ crust fn on_write_complete_cb(write_req: *uv_write_t,
|
|||||||
io::println(#fmt("beginning on_write_complete_cb status: %d",
|
io::println(#fmt("beginning on_write_complete_cb status: %d",
|
||||||
status as int));
|
status as int));
|
||||||
let stream = direct::get_stream_handle_from_write_req(write_req);
|
let stream = direct::get_stream_handle_from_write_req(write_req);
|
||||||
io::println(#fmt("on_write_complete_cb: tcp stream: %d write_handle addr %d",
|
io::println(#fmt("on_write_complete_cb: tcp:%d write_handle:%d",
|
||||||
stream as int, write_req as int));
|
stream as int, write_req as int));
|
||||||
let result = direct::read_start(stream, on_alloc_cb, on_read_cb);
|
let result = direct::read_start(stream, on_alloc_cb, on_read_cb);
|
||||||
io::println(#fmt("ending on_write_complete_cb .. uv_read_start status: %d", result as int));
|
io::println(#fmt("ending on_write_complete_cb .. status: %d",
|
||||||
|
result as int));
|
||||||
}
|
}
|
||||||
|
|
||||||
crust fn on_connect_cb(connect_req_ptr: *uv_connect_t,
|
crust fn on_connect_cb(connect_req_ptr: *uv_connect_t,
|
||||||
@ -1101,6 +1141,9 @@ fn impl_uv_tcp_request() unsafe {
|
|||||||
direct::set_data_for_req(
|
direct::set_data_for_req(
|
||||||
connect_req_ptr as *libc::c_void,
|
connect_req_ptr as *libc::c_void,
|
||||||
ptr::addr_of(client_data) as *libc::c_void);
|
ptr::addr_of(client_data) as *libc::c_void);
|
||||||
|
direct::set_data_for_uv_handle(
|
||||||
|
tcp_handle_ptr as *libc::c_void,
|
||||||
|
ptr::addr_of(client_data) as *libc::c_void);
|
||||||
io::println("before run tcp req loop");
|
io::println("before run tcp req loop");
|
||||||
direct::run(test_loop);
|
direct::run(test_loop);
|
||||||
io::println("after run tcp req loop");
|
io::println("after run tcp req loop");
|
||||||
@ -1120,10 +1163,11 @@ fn impl_uv_tcp_request() unsafe {
|
|||||||
// the stubbed out vec above
|
// the stubbed out vec above
|
||||||
// with our initial set of read
|
// with our initial set of read
|
||||||
// data
|
// data
|
||||||
|
read_bytes = new_bytes;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
// otherwise append
|
// otherwise append
|
||||||
read_bytes = new_bytes;
|
read_bytes += new_bytes;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ {
|
_ {
|
||||||
@ -1131,10 +1175,12 @@ fn impl_uv_tcp_request() unsafe {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
io::println("finished reading data");
|
io::println("finished reading data, output to follow:");
|
||||||
let read_str = str::from_bytes(read_bytes);
|
let read_str = str::from_bytes(read_bytes);
|
||||||
|
|
||||||
|
io::println(read_str);
|
||||||
|
io::println(">>>>EOF<<<<");
|
||||||
|
direct::loop_delete(test_loop);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
io::println("direct::tcp_connect() failure");
|
io::println("direct::tcp_connect() failure");
|
||||||
|
@ -322,6 +322,11 @@ rust_uv_read_start(uv_stream_t* stream, uv_alloc_cb on_alloc,
|
|||||||
return uv_read_start(stream, on_alloc, on_read);
|
return uv_read_start(stream, on_alloc, on_read);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
extern "C" int
|
||||||
|
rust_uv_read_stop(uv_stream_t* stream) {
|
||||||
|
return uv_read_stop(stream);
|
||||||
|
}
|
||||||
|
|
||||||
extern "C" char*
|
extern "C" char*
|
||||||
rust_uv_malloc_buf_base_of(size_t suggested_size) {
|
rust_uv_malloc_buf_base_of(size_t suggested_size) {
|
||||||
return (char*) current_kernel_malloc(sizeof(char)*suggested_size,
|
return (char*) current_kernel_malloc(sizeof(char)*suggested_size,
|
||||||
|
Loading…
Reference in New Issue
Block a user