std:: adding tcp::read fn as simple, blocking read operation, akin to write

also: read_future ala write_future .. woooooohooooooooo
This commit is contained in:
Jeff Olson 2012-05-21 09:58:33 -07:00 committed by Brian Anderson
parent c2ae062e90
commit c7656f67ad

View File

@ -217,7 +217,7 @@ Write binary data to tcp stream; Returns a `future::future` value immediately
This function can produce unsafe results if the call to `write_future` is
made, the `future::future` value returned is never resolved via
`future::get`, and then the `tcp_socket` passed in to `write_future` leaves
scope and is destructured before the task that runs the libuv write
scope and is destructed before the task that runs the libuv write
operation completes.
As such: If using `write_future`, always be sure to resolve the returned
@ -261,34 +261,7 @@ fn read_start(sock: tcp_socket)
-> result::result<comm::port<
result::result<[u8], tcp_err_data>>, tcp_err_data> unsafe {
let socket_data = ptr::addr_of(**sock);
let stream_handle_ptr = (*socket_data).stream_handle_ptr;
let start_po = comm::port::<option<uv::ll::uv_err_data>>();
let start_ch = comm::chan(start_po);
log(debug, "in tcp::read_start before interact loop");
uv::hl::interact((*socket_data).hl_loop) {|loop_ptr|
log(debug, #fmt("in tcp::read_start interact cb %?", loop_ptr));
alt uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t,
on_alloc_cb,
on_tcp_read_cb) {
0i32 {
log(debug, "success doing uv_read_start");
comm::send(start_ch, none);
}
_ {
log(debug, "error attempting uv_read_start");
let err_data = uv::ll::get_last_err_data(loop_ptr);
comm::send(start_ch, some(err_data));
}
}
};
alt comm::recv(start_po) {
some(err_data) {
result::err(err_data.to_tcp_err())
}
none {
result::ok((*socket_data).reader_po)
}
}
read_start_common_impl(socket_data)
}
#[doc="
@ -301,30 +274,62 @@ Stop reading from an open TCP connection; used with `read_start`
fn read_stop(sock: tcp_socket) ->
result::result<(), tcp_err_data> unsafe {
let socket_data = ptr::addr_of(**sock);
let stream_handle_ptr = (*socket_data).stream_handle_ptr;
let stop_po = comm::port::<option<tcp_err_data>>();
let stop_ch = comm::chan(stop_po);
uv::hl::interact((*socket_data).hl_loop) {|loop_ptr|
log(debug, "in interact cb for tcp::read_stop");
alt uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) {
0i32 {
log(debug, "successfully called uv_read_stop");
comm::send(stop_ch, none);
}
_ {
log(debug, "failure in calling uv_read_stop");
let err_data = uv::ll::get_last_err_data(loop_ptr);
comm::send(stop_ch, some(err_data.to_tcp_err()));
}
}
};
alt comm::recv(stop_po) {
some(err_data) {
result::err(err_data.to_tcp_err())
}
none {
result::ok(())
}
read_stop_common_impl(socket_data)
}
#[doc="
Reads a single chunk of data from `tcp_socket`; block until data/error recv'd
Does a blocking read operation for a single chunk of data from a `tcp_socket`
until a data arrives or an error is received. The provided `timeout_msecs`
value is used to raise an error if the timeout period passes without any
data received.
# Arguments
* `sock` - a `net::tcp::tcp_socket` that you wish to read from
* `timeout_msecs` - a `uint` value, in msecs, to wait before dropping the
read attempt. Pass `0u` to wait indefinitely
"]
fn read(sock: tcp_socket, timeout_msecs: uint)
-> result::result<[u8],tcp_err_data> {
let socket_data = ptr::addr_of(**sock);
read_common_impl(socket_data, timeout_msecs)
}
#[doc="
Reads a single chunk of data; returns a `future::future<[u8]>` immediately
Does a non-blocking read operation for a single chunk of data from a
`tcp_socket` and immediately returns a `future` value representing the
result. When resolving the returned `future`, it will block until data
arrives or an error is received. The provided `timeout_msecs`
value is used to raise an error if the timeout period passes without any
data received.
# Safety
This function can produce unsafe results if the call to `read_future` is
made, the `future::future` value returned is never resolved via
`future::get`, and then the `tcp_socket` passed in to `read_future` leaves
scope and is destructed before the task that runs the libuv read
operation completes.
As such: If using `read_future`, always be sure to resolve the returned
`future` so as to ensure libuv doesn't try to access a released read handle.
Otherwise, use the blocking `tcp::read` function instead.
# Arguments
* `sock` - a `net::tcp::tcp_socket` that you wish to read from
* `timeout_msecs` - a `uint` value, in msecs, to wait before dropping the
read attempt. Pass `0u` to wait indefinitely
"]
fn read_future(sock: tcp_socket, timeout_msecs: uint)
-> future::future<result::result<[u8],tcp_err_data>> {
let socket_data = ptr::addr_of(**sock);
future::spawn {||
read_common_impl(socket_data, timeout_msecs)
}
}
@ -778,13 +783,166 @@ impl sock_methods for tcp_socket {
result::result<(), tcp_err_data> {
read_stop(self)
}
fn write(raw_write_data: [[u8]])
fn read(timeout_msecs: uint) ->
result::result<[u8], tcp_err_data> {
read(self, timeout_msecs)
}
fn read_future(timeout_msecs: uint) ->
future::future<result::result<[u8], tcp_err_data>> {
read_future(self, timeout_msecs)
}
fn write(raw_write_data: [u8])
-> result::result<(), tcp_err_data> {
write(self, raw_write_data)
}
fn write_future(raw_write_data: [u8])
-> future::future<result::result<(), tcp_err_data>> {
write_future(self, raw_write_data)
}
}
// INTERNAL API
// shared implementation for tcp::read
fn read_common_impl(socket_data: *tcp_socket_data, timeout_msecs: uint)
-> result::result<[u8],tcp_err_data> {
log(debug, "starting tcp::read");
let rs_result = read_start_common_impl(socket_data);
if result::is_failure(rs_result) {
let err_data = result::get_err(rs_result);
result::err(err_data)
}
else {
log(debug, "tcp::read before recv_timeout");
let read_result = if timeout_msecs > 0u {
timer::recv_timeout(
timeout_msecs, result::get(rs_result))
} else {
some(comm::recv(result::get(rs_result)))
};
log(debug, "tcp::read after recv_timeout");
alt read_result {
none {
log(debug, "tcp::read: timed out..");
let err_data = {
err_name: "TIMEOUT",
err_msg: "req timed out"
};
read_stop_common_impl(socket_data);
result::err(err_data)
}
some(data_result) {
log(debug, "tcp::read got data");
read_stop_common_impl(socket_data);
data_result
}
}
}
}
// shared impl for read_stop
fn read_stop_common_impl(socket_data: *tcp_socket_data) ->
result::result<(), tcp_err_data> unsafe {
let stream_handle_ptr = (*socket_data).stream_handle_ptr;
let stop_po = comm::port::<option<tcp_err_data>>();
let stop_ch = comm::chan(stop_po);
uv::hl::interact((*socket_data).hl_loop) {|loop_ptr|
log(debug, "in interact cb for tcp::read_stop");
alt uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) {
0i32 {
log(debug, "successfully called uv_read_stop");
comm::send(stop_ch, none);
}
_ {
log(debug, "failure in calling uv_read_stop");
let err_data = uv::ll::get_last_err_data(loop_ptr);
comm::send(stop_ch, some(err_data.to_tcp_err()));
}
}
};
alt comm::recv(stop_po) {
some(err_data) {
result::err(err_data.to_tcp_err())
}
none {
result::ok(())
}
}
}
// shared impl for read_start
fn read_start_common_impl(socket_data: *tcp_socket_data)
-> result::result<comm::port<
result::result<[u8], tcp_err_data>>, tcp_err_data> unsafe {
let stream_handle_ptr = (*socket_data).stream_handle_ptr;
let start_po = comm::port::<option<uv::ll::uv_err_data>>();
let start_ch = comm::chan(start_po);
log(debug, "in tcp::read_start before interact loop");
uv::hl::interact((*socket_data).hl_loop) {|loop_ptr|
log(debug, #fmt("in tcp::read_start interact cb %?", loop_ptr));
alt uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t,
on_alloc_cb,
on_tcp_read_cb) {
0i32 {
log(debug, "success doing uv_read_start");
comm::send(start_ch, none);
}
_ {
log(debug, "error attempting uv_read_start");
let err_data = uv::ll::get_last_err_data(loop_ptr);
comm::send(start_ch, some(err_data));
}
}
};
alt comm::recv(start_po) {
some(err_data) {
result::err(err_data.to_tcp_err())
}
none {
result::ok((*socket_data).reader_po)
}
}
}
// shared implementation used by write and write_future
fn write_common_impl(socket_data_ptr: *tcp_socket_data,
raw_write_data: [u8])
-> result::result<(), tcp_err_data> unsafe {
let write_req_ptr = ptr::addr_of((*socket_data_ptr).write_req);
let stream_handle_ptr =
(*socket_data_ptr).stream_handle_ptr;
let write_buf_vec = [ uv::ll::buf_init(
vec::unsafe::to_ptr(raw_write_data),
vec::len(raw_write_data)) ];
let write_buf_vec_ptr = ptr::addr_of(write_buf_vec);
let result_po = comm::port::<tcp_write_result>();
let write_data = {
result_ch: comm::chan(result_po)
};
let write_data_ptr = ptr::addr_of(write_data);
uv::hl::interact((*socket_data_ptr).hl_loop) {|loop_ptr|
log(debug, #fmt("in interact cb for tcp::write %?", loop_ptr));
alt uv::ll::write(write_req_ptr,
stream_handle_ptr,
write_buf_vec_ptr,
tcp_write_complete_cb) {
0i32 {
log(debug, "uv_write() invoked successfully");
uv::ll::set_data_for_req(write_req_ptr, write_data_ptr);
}
_ {
log(debug, "error invoking uv_write()");
let err_data = uv::ll::get_last_err_data(loop_ptr);
comm::send((*write_data_ptr).result_ch,
tcp_write_error(err_data.to_tcp_err()));
}
}
};
alt comm::recv(result_po) {
tcp_write_success { result::ok(()) }
tcp_write_error(err_data) { result::err(err_data.to_tcp_err()) }
}
}
// various recv_* can use a tcp_conn_port can re-use this..
fn conn_port_new_tcp_socket(
stream_handle_ptr: *uv::ll::uv_tcp_t,
@ -1266,8 +1424,7 @@ mod test {
let sock = result::unwrap(accept_result);
log(debug, "SERVER: successfully accepted"+
"connection!");
let received_req_bytes =
tcp_read_single(sock);
let received_req_bytes = sock.read(2000u);
alt received_req_bytes {
result::ok(data) {
server_ch.send(
@ -1278,6 +1435,8 @@ mod test {
comm::send(kill_ch, none);
}
result::err(err_data) {
log(debug, #fmt("SERVER: error recvd: %s %s",
err_data.err_name, err_data.err_msg));
comm::send(kill_ch, some(err_data));
server_ch.send("");
}
@ -1333,7 +1492,7 @@ mod test {
log(debug, "SERVER: successfully accepted"+
"connection!");
let received_req_bytes =
tcp_read_single(sock);
sock.read(2000u);
alt received_req_bytes {
result::ok(data) {
server_ch.send(
@ -1370,7 +1529,7 @@ mod test {
let sock = result::unwrap(connect_result);
let resp_bytes = str::bytes(resp);
tcp_write_single(sock, resp_bytes);
let read_result = tcp_read_single(sock);
let read_result = sock.read(2000u);
if read_result.is_failure() {
log(debug, "CLIENT: failure to read");
""
@ -1385,39 +1544,9 @@ mod test {
}
}
fn tcp_read_single(sock: tcp_socket)
-> result::result<[u8],tcp_err_data> {
log(debug, "starting tcp_read_single");
let rs_result = sock.read_start();
if result::is_failure(rs_result) {
let err_data = result::get_err(rs_result);
result::err(err_data)
}
else {
log(debug, "before recv_timeout");
let read_result = timer::recv_timeout(
2000u, result::get(rs_result));
log(debug, "after recv_timeout");
alt read_result {
none {
log(debug, "tcp_read_single: timed out..");
let err_data = {
err_name: "TIMEOUT",
err_msg: "req timed out"
};
result::err(err_data)
}
some(data_result) {
log(debug, "tcp_read_single: got data");
sock.read_stop();
data_result
}
}
}
}
fn tcp_write_single(sock: tcp_socket, val: [u8]) {
let write_result = sock.write([val]);
let write_result_future = sock.write_future(val);
let write_result = write_result_future.get();
if result::is_failure(write_result) {
log(debug, "tcp_write_single: write failed!");
let err_data = result::get_err(write_result);