From c7656f67ad6c6ee687332f613a6074f76107c1b8 Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Mon, 21 May 2012 09:58:33 -0700 Subject: [PATCH] std:: adding tcp::read fn as simple, blocking read operation, akin to write also: read_future ala write_future .. woooooohooooooooo --- src/libstd/net_tcp.rs | 309 ++++++++++++++++++++++++++++++------------ 1 file changed, 219 insertions(+), 90 deletions(-) diff --git a/src/libstd/net_tcp.rs b/src/libstd/net_tcp.rs index f19919ceebf..806fe6944aa 100644 --- a/src/libstd/net_tcp.rs +++ b/src/libstd/net_tcp.rs @@ -217,7 +217,7 @@ Write binary data to tcp stream; Returns a `future::future` value immediately This function can produce unsafe results if the call to `write_future` is made, the `future::future` value returned is never resolved via `future::get`, and then the `tcp_socket` passed in to `write_future` leaves -scope and is destructured before the task that runs the libuv write +scope and is destructed before the task that runs the libuv write operation completes. As such: If using `write_future`, always be sure to resolve the returned @@ -261,34 +261,7 @@ fn read_start(sock: tcp_socket) -> result::result>, tcp_err_data> unsafe { let socket_data = ptr::addr_of(**sock); - let stream_handle_ptr = (*socket_data).stream_handle_ptr; - let start_po = comm::port::>(); - let start_ch = comm::chan(start_po); - log(debug, "in tcp::read_start before interact loop"); - uv::hl::interact((*socket_data).hl_loop) {|loop_ptr| - log(debug, #fmt("in tcp::read_start interact cb %?", loop_ptr)); - alt uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t, - on_alloc_cb, - on_tcp_read_cb) { - 0i32 { - log(debug, "success doing uv_read_start"); - comm::send(start_ch, none); - } - _ { - log(debug, "error attempting uv_read_start"); - let err_data = uv::ll::get_last_err_data(loop_ptr); - comm::send(start_ch, some(err_data)); - } - } - }; - alt comm::recv(start_po) { - some(err_data) { - result::err(err_data.to_tcp_err()) - } - none { - result::ok((*socket_data).reader_po) - } - } + read_start_common_impl(socket_data) } #[doc=" @@ -301,30 +274,62 @@ Stop reading from an open TCP connection; used with `read_start` fn read_stop(sock: tcp_socket) -> result::result<(), tcp_err_data> unsafe { let socket_data = ptr::addr_of(**sock); - let stream_handle_ptr = (*socket_data).stream_handle_ptr; - let stop_po = comm::port::>(); - let stop_ch = comm::chan(stop_po); - uv::hl::interact((*socket_data).hl_loop) {|loop_ptr| - log(debug, "in interact cb for tcp::read_stop"); - alt uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) { - 0i32 { - log(debug, "successfully called uv_read_stop"); - comm::send(stop_ch, none); - } - _ { - log(debug, "failure in calling uv_read_stop"); - let err_data = uv::ll::get_last_err_data(loop_ptr); - comm::send(stop_ch, some(err_data.to_tcp_err())); - } - } - }; - alt comm::recv(stop_po) { - some(err_data) { - result::err(err_data.to_tcp_err()) - } - none { - result::ok(()) - } + read_stop_common_impl(socket_data) +} + +#[doc=" +Reads a single chunk of data from `tcp_socket`; block until data/error recv'd + +Does a blocking read operation for a single chunk of data from a `tcp_socket` +until a data arrives or an error is received. The provided `timeout_msecs` +value is used to raise an error if the timeout period passes without any +data received. + +# Arguments + +* `sock` - a `net::tcp::tcp_socket` that you wish to read from +* `timeout_msecs` - a `uint` value, in msecs, to wait before dropping the +read attempt. Pass `0u` to wait indefinitely +"] +fn read(sock: tcp_socket, timeout_msecs: uint) + -> result::result<[u8],tcp_err_data> { + let socket_data = ptr::addr_of(**sock); + read_common_impl(socket_data, timeout_msecs) +} + +#[doc=" +Reads a single chunk of data; returns a `future::future<[u8]>` immediately + +Does a non-blocking read operation for a single chunk of data from a +`tcp_socket` and immediately returns a `future` value representing the +result. When resolving the returned `future`, it will block until data +arrives or an error is received. The provided `timeout_msecs` +value is used to raise an error if the timeout period passes without any +data received. + +# Safety + +This function can produce unsafe results if the call to `read_future` is +made, the `future::future` value returned is never resolved via +`future::get`, and then the `tcp_socket` passed in to `read_future` leaves +scope and is destructed before the task that runs the libuv read +operation completes. + +As such: If using `read_future`, always be sure to resolve the returned +`future` so as to ensure libuv doesn't try to access a released read handle. +Otherwise, use the blocking `tcp::read` function instead. + +# Arguments + +* `sock` - a `net::tcp::tcp_socket` that you wish to read from +* `timeout_msecs` - a `uint` value, in msecs, to wait before dropping the +read attempt. Pass `0u` to wait indefinitely +"] +fn read_future(sock: tcp_socket, timeout_msecs: uint) + -> future::future> { + let socket_data = ptr::addr_of(**sock); + future::spawn {|| + read_common_impl(socket_data, timeout_msecs) } } @@ -778,13 +783,166 @@ impl sock_methods for tcp_socket { result::result<(), tcp_err_data> { read_stop(self) } - fn write(raw_write_data: [[u8]]) + fn read(timeout_msecs: uint) -> + result::result<[u8], tcp_err_data> { + read(self, timeout_msecs) + } + fn read_future(timeout_msecs: uint) -> + future::future> { + read_future(self, timeout_msecs) + } + fn write(raw_write_data: [u8]) -> result::result<(), tcp_err_data> { write(self, raw_write_data) } + fn write_future(raw_write_data: [u8]) + -> future::future> { + write_future(self, raw_write_data) + } } // INTERNAL API +// shared implementation for tcp::read +fn read_common_impl(socket_data: *tcp_socket_data, timeout_msecs: uint) + -> result::result<[u8],tcp_err_data> { + log(debug, "starting tcp::read"); + let rs_result = read_start_common_impl(socket_data); + if result::is_failure(rs_result) { + let err_data = result::get_err(rs_result); + result::err(err_data) + } + else { + log(debug, "tcp::read before recv_timeout"); + let read_result = if timeout_msecs > 0u { + timer::recv_timeout( + timeout_msecs, result::get(rs_result)) + } else { + some(comm::recv(result::get(rs_result))) + }; + log(debug, "tcp::read after recv_timeout"); + alt read_result { + none { + log(debug, "tcp::read: timed out.."); + let err_data = { + err_name: "TIMEOUT", + err_msg: "req timed out" + }; + read_stop_common_impl(socket_data); + result::err(err_data) + } + some(data_result) { + log(debug, "tcp::read got data"); + read_stop_common_impl(socket_data); + data_result + } + } + } +} + +// shared impl for read_stop +fn read_stop_common_impl(socket_data: *tcp_socket_data) -> + result::result<(), tcp_err_data> unsafe { + let stream_handle_ptr = (*socket_data).stream_handle_ptr; + let stop_po = comm::port::>(); + let stop_ch = comm::chan(stop_po); + uv::hl::interact((*socket_data).hl_loop) {|loop_ptr| + log(debug, "in interact cb for tcp::read_stop"); + alt uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) { + 0i32 { + log(debug, "successfully called uv_read_stop"); + comm::send(stop_ch, none); + } + _ { + log(debug, "failure in calling uv_read_stop"); + let err_data = uv::ll::get_last_err_data(loop_ptr); + comm::send(stop_ch, some(err_data.to_tcp_err())); + } + } + }; + alt comm::recv(stop_po) { + some(err_data) { + result::err(err_data.to_tcp_err()) + } + none { + result::ok(()) + } + } +} + +// shared impl for read_start +fn read_start_common_impl(socket_data: *tcp_socket_data) + -> result::result>, tcp_err_data> unsafe { + let stream_handle_ptr = (*socket_data).stream_handle_ptr; + let start_po = comm::port::>(); + let start_ch = comm::chan(start_po); + log(debug, "in tcp::read_start before interact loop"); + uv::hl::interact((*socket_data).hl_loop) {|loop_ptr| + log(debug, #fmt("in tcp::read_start interact cb %?", loop_ptr)); + alt uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t, + on_alloc_cb, + on_tcp_read_cb) { + 0i32 { + log(debug, "success doing uv_read_start"); + comm::send(start_ch, none); + } + _ { + log(debug, "error attempting uv_read_start"); + let err_data = uv::ll::get_last_err_data(loop_ptr); + comm::send(start_ch, some(err_data)); + } + } + }; + alt comm::recv(start_po) { + some(err_data) { + result::err(err_data.to_tcp_err()) + } + none { + result::ok((*socket_data).reader_po) + } + } +} + +// shared implementation used by write and write_future +fn write_common_impl(socket_data_ptr: *tcp_socket_data, + raw_write_data: [u8]) + -> result::result<(), tcp_err_data> unsafe { + let write_req_ptr = ptr::addr_of((*socket_data_ptr).write_req); + let stream_handle_ptr = + (*socket_data_ptr).stream_handle_ptr; + let write_buf_vec = [ uv::ll::buf_init( + vec::unsafe::to_ptr(raw_write_data), + vec::len(raw_write_data)) ]; + let write_buf_vec_ptr = ptr::addr_of(write_buf_vec); + let result_po = comm::port::(); + let write_data = { + result_ch: comm::chan(result_po) + }; + let write_data_ptr = ptr::addr_of(write_data); + uv::hl::interact((*socket_data_ptr).hl_loop) {|loop_ptr| + log(debug, #fmt("in interact cb for tcp::write %?", loop_ptr)); + alt uv::ll::write(write_req_ptr, + stream_handle_ptr, + write_buf_vec_ptr, + tcp_write_complete_cb) { + 0i32 { + log(debug, "uv_write() invoked successfully"); + uv::ll::set_data_for_req(write_req_ptr, write_data_ptr); + } + _ { + log(debug, "error invoking uv_write()"); + let err_data = uv::ll::get_last_err_data(loop_ptr); + comm::send((*write_data_ptr).result_ch, + tcp_write_error(err_data.to_tcp_err())); + } + } + }; + alt comm::recv(result_po) { + tcp_write_success { result::ok(()) } + tcp_write_error(err_data) { result::err(err_data.to_tcp_err()) } + } +} + // various recv_* can use a tcp_conn_port can re-use this.. fn conn_port_new_tcp_socket( stream_handle_ptr: *uv::ll::uv_tcp_t, @@ -1266,8 +1424,7 @@ mod test { let sock = result::unwrap(accept_result); log(debug, "SERVER: successfully accepted"+ "connection!"); - let received_req_bytes = - tcp_read_single(sock); + let received_req_bytes = sock.read(2000u); alt received_req_bytes { result::ok(data) { server_ch.send( @@ -1278,6 +1435,8 @@ mod test { comm::send(kill_ch, none); } result::err(err_data) { + log(debug, #fmt("SERVER: error recvd: %s %s", + err_data.err_name, err_data.err_msg)); comm::send(kill_ch, some(err_data)); server_ch.send(""); } @@ -1333,7 +1492,7 @@ mod test { log(debug, "SERVER: successfully accepted"+ "connection!"); let received_req_bytes = - tcp_read_single(sock); + sock.read(2000u); alt received_req_bytes { result::ok(data) { server_ch.send( @@ -1370,7 +1529,7 @@ mod test { let sock = result::unwrap(connect_result); let resp_bytes = str::bytes(resp); tcp_write_single(sock, resp_bytes); - let read_result = tcp_read_single(sock); + let read_result = sock.read(2000u); if read_result.is_failure() { log(debug, "CLIENT: failure to read"); "" @@ -1385,39 +1544,9 @@ mod test { } } - fn tcp_read_single(sock: tcp_socket) - -> result::result<[u8],tcp_err_data> { - log(debug, "starting tcp_read_single"); - let rs_result = sock.read_start(); - if result::is_failure(rs_result) { - let err_data = result::get_err(rs_result); - result::err(err_data) - } - else { - log(debug, "before recv_timeout"); - let read_result = timer::recv_timeout( - 2000u, result::get(rs_result)); - log(debug, "after recv_timeout"); - alt read_result { - none { - log(debug, "tcp_read_single: timed out.."); - let err_data = { - err_name: "TIMEOUT", - err_msg: "req timed out" - }; - result::err(err_data) - } - some(data_result) { - log(debug, "tcp_read_single: got data"); - sock.read_stop(); - data_result - } - } - } - } - fn tcp_write_single(sock: tcp_socket, val: [u8]) { - let write_result = sock.write([val]); + let write_result_future = sock.write_future(val); + let write_result = write_result_future.get(); if result::is_failure(write_result) { log(debug, "tcp_write_single: write failed!"); let err_data = result::get_err(write_result);