From 92e9e736fab57a169882897337cef344a48c0c2d Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Tue, 22 May 2012 16:33:33 -0700 Subject: [PATCH] std: high-level libuv-leverage APIs now take a hl_loop as arg (tcp/timer) --- src/libstd/net_tcp.rs | 52 +++++++++++++++++++++++++++---------------- src/libstd/timer.rs | 37 +++++++++++++++++------------- 2 files changed, 55 insertions(+), 34 deletions(-) diff --git a/src/libstd/net_tcp.rs b/src/libstd/net_tcp.rs index 7ac0b3f0015..2fe24715083 100644 --- a/src/libstd/net_tcp.rs +++ b/src/libstd/net_tcp.rs @@ -83,8 +83,9 @@ Initiate a client connection over TCP/IP # Arguments -* ip - The IP address (versions 4 or 6) of the remote host -* port - the unsigned integer of the desired remote host port +* `ip` - The IP address (versions 4 or 6) of the remote host +* `port` - the unsigned integer of the desired remote host port +* `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on # Returns @@ -92,7 +93,8 @@ A `result` that, if the operation succeeds, contains a `tcp_socket` that can be used to send and receive data to/from the remote host. In the event of failure, a `tcp_err_data` will be returned "] -fn connect(input_ip: ip::ip_addr, port: uint) +fn connect(input_ip: ip::ip_addr, port: uint, + hl_loop: uv::hl::high_level_loop) -> result::result unsafe { let result_po = comm::port::(); let closed_signal_po = comm::port::<()>(); @@ -101,7 +103,6 @@ fn connect(input_ip: ip::ip_addr, port: uint) closed_signal_ch: comm::chan(closed_signal_po) }; let conn_data_ptr = ptr::addr_of(conn_data); - let hl_loop = uv::global_loop::get(); let reader_po = comm::port::>(); let stream_handle_ptr = malloc_uv_tcp_t(); *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t(); @@ -343,6 +344,7 @@ Bind to a given IP/port and listen for new connections * `port` - a uint representing the port to listen on * `backlog` - a uint representing the number of incoming connections to cache in memory +* `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on # Returns @@ -350,11 +352,11 @@ A `result` instance containing either a `tcp_conn_port` which can used to listen for, and accept, new connections, or a `tcp_err_data` if failure to create the tcp listener occurs "] -fn new_listener(host_ip: ip::ip_addr, port: uint, backlog: uint) +fn new_listener(host_ip: ip::ip_addr, port: uint, backlog: uint, + hl_loop: uv::hl::high_level_loop) -> result::result unsafe { let stream_closed_po = comm::port::<()>(); let stream_closed_ch = comm::chan(stream_closed_po); - let hl_loop = uv::global_loop::get(); let new_conn_po = comm::port::>(); let new_conn_ch = comm::chan(new_conn_po); @@ -653,6 +655,7 @@ Bind to a given IP/port and listen for new connections * `port` - a uint representing the port to listen on * `backlog` - a uint representing the number of incoming connections to cache in memory +* `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on * `on_establish_cb` - a callback that is evaluated if/when the listener is successfully established. it takes no parameters * `new_connect_cb` - a callback to be evaluated, on the libuv thread, @@ -671,6 +674,7 @@ successful/normal shutdown, and a `tcp_err_data` record in the event of listen exiting because of an error "] fn listen_for_conn(host_ip: ip::ip_addr, port: uint, backlog: uint, + hl_loop: uv::hl::high_level_loop, on_establish_cb: fn~(comm::chan>), new_connect_cb: fn~(tcp_new_connection, comm::chan>)) @@ -680,7 +684,6 @@ fn listen_for_conn(host_ip: ip::ip_addr, port: uint, backlog: uint, let kill_ch = comm::chan(kill_po); let server_stream = uv::ll::tcp_t(); let server_stream_ptr = ptr::addr_of(server_stream); - let hl_loop = uv::global_loop::get(); let server_data = { server_stream_ptr: server_stream_ptr, stream_closed_ch: comm::chan(stream_closed_po), @@ -804,8 +807,9 @@ impl sock_methods for tcp_socket { // shared implementation for tcp::read fn read_common_impl(socket_data: *tcp_socket_data, timeout_msecs: uint) - -> result::result<[u8],tcp_err_data> { + -> result::result<[u8],tcp_err_data> unsafe { log(debug, "starting tcp::read"); + let hl_loop = (*socket_data).hl_loop; let rs_result = read_start_common_impl(socket_data); if result::is_failure(rs_result) { let err_data = result::get_err(rs_result); @@ -815,7 +819,7 @@ fn read_common_impl(socket_data: *tcp_socket_data, timeout_msecs: uint) log(debug, "tcp::read before recv_timeout"); let read_result = if timeout_msecs > 0u { timer::recv_timeout( - timeout_msecs, result::get(rs_result)) + hl_loop, timeout_msecs, result::get(rs_result)) } else { some(comm::recv(result::get(rs_result))) }; @@ -1270,7 +1274,7 @@ fn ipv4_ip_addr_to_sockaddr_in(input_ip: ip::ip_addr, } } -//#[cfg(test)] +#[cfg(test)] mod test { // FIXME don't run on fbsd or linux 32 bit(#2064) #[cfg(target_os="win32")] @@ -1303,6 +1307,7 @@ mod test { } } fn impl_gl_tcp_ipv4_server_and_client() { + let hl_loop = uv::global_loop::get(); let server_ip = "127.0.0.1"; let server_port = 8888u; let expected_req = "ping"; @@ -1321,7 +1326,8 @@ mod test { server_port, expected_resp, server_ch, - cont_ch) + cont_ch, + hl_loop) }; server_result_ch.send(actual_req); }; @@ -1333,7 +1339,8 @@ mod test { server_ip, server_port, expected_req, - client_ch) + client_ch, + hl_loop) }; let actual_req = comm::recv(server_result_po); log(debug, #fmt("REQ: expected: '%s' actual: '%s'", @@ -1344,6 +1351,7 @@ mod test { assert str::contains(actual_resp, expected_resp); } fn impl_gl_tcp_ipv4_server_listener_and_client() { + let hl_loop = uv::global_loop::get(); let server_ip = "127.0.0.1"; let server_port = 8889u; let expected_req = "ping"; @@ -1362,7 +1370,8 @@ mod test { server_port, expected_resp, server_ch, - cont_ch) + cont_ch, + hl_loop) }; server_result_ch.send(actual_req); }; @@ -1374,7 +1383,8 @@ mod test { server_ip, server_port, expected_req, - client_ch) + client_ch, + hl_loop) }; let actual_req = comm::recv(server_result_po); log(debug, #fmt("REQ: expected: '%s' actual: '%s'", @@ -1387,12 +1397,14 @@ mod test { fn run_tcp_test_server(server_ip: str, server_port: uint, resp: str, server_ch: comm::chan, - cont_ch: comm::chan<()>) -> str { + cont_ch: comm::chan<()>, + hl_loop: uv::hl::high_level_loop) -> str { task::spawn_sched(task::manual_threads(1u)) {|| let server_ip_addr = ip::v4::parse_addr(server_ip); let listen_result = listen_for_conn(server_ip_addr, server_port, 128u, + hl_loop, // on_establish_cb -- called when listener is set up {|kill_ch| log(debug, #fmt("establish_cb %?", @@ -1464,12 +1476,13 @@ mod test { fn run_tcp_test_server_listener(server_ip: str, server_port: uint, resp: str, server_ch: comm::chan, - cont_ch: comm::chan<()>) -> str { + cont_ch: comm::chan<()>, + hl_loop: uv::hl::high_level_loop) -> str { task::spawn_sched(task::manual_threads(1u)) {|| let server_ip_addr = ip::v4::parse_addr(server_ip); let new_listener_result = - new_listener(server_ip_addr, server_port, 128u); + new_listener(server_ip_addr, server_port, 128u, hl_loop); if result::is_failure(new_listener_result) { let err_data = result::get_err(new_listener_result); log(debug, #fmt("SERVER: exited abnormally name %s msg %s", @@ -1512,12 +1525,13 @@ mod test { } fn run_tcp_test_client(server_ip: str, server_port: uint, resp: str, - client_ch: comm::chan) -> str { + client_ch: comm::chan, + hl_loop: uv::hl::high_level_loop) -> str { let server_ip_addr = ip::v4::parse_addr(server_ip); log(debug, "CLIENT: starting.."); - let connect_result = connect(server_ip_addr, server_port); + let connect_result = connect(server_ip_addr, server_port, hl_loop); if result::is_failure(connect_result) { log(debug, "CLIENT: failed to connect"); let err_data = result::get_err(connect_result); diff --git a/src/libstd/timer.rs b/src/libstd/timer.rs index d130326cd00..7e5a458a158 100644 --- a/src/libstd/timer.rs +++ b/src/libstd/timer.rs @@ -16,11 +16,13 @@ for *at least* that period of time. # Arguments +* `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on * msecs - a timeout period, in milliseconds, to wait * ch - a channel of type T to send a `val` on * val - a value of type T to send over the provided `ch` "] -fn delayed_send(msecs: uint, ch: comm::chan, val: T) { +fn delayed_send(hl_loop: uv::hl::high_level_loop, + msecs: uint, ch: comm::chan, val: T) { task::spawn() {|| unsafe { let timer_done_po = comm::port::<()>(); @@ -28,7 +30,6 @@ fn delayed_send(msecs: uint, ch: comm::chan, val: T) { let timer_done_ch_ptr = ptr::addr_of(timer_done_ch); let timer = uv::ll::timer_t(); let timer_ptr = ptr::addr_of(timer); - let hl_loop = uv::global_loop::get(); uv::hl::interact(hl_loop) {|loop_ptr| let init_result = uv::ll::timer_init(loop_ptr, timer_ptr); if (init_result == 0i32) { @@ -67,12 +68,13 @@ for *at least* that period of time. # Arguments +* `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on * msecs - an amount of time, in milliseconds, for the current task to block "] -fn sleep(msecs: uint) { +fn sleep(hl_loop: uv::hl::high_level_loop, msecs: uint) { let exit_po = comm::port::<()>(); let exit_ch = comm::chan(exit_po); - delayed_send(msecs, exit_ch, ()); + delayed_send(hl_loop, msecs, exit_ch, ()); comm::recv(exit_po); } @@ -85,6 +87,7 @@ timeout. Depending on whether the provided port receives in that time period, # Arguments +* `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on * msecs - an mount of time, in milliseconds, to wait to receive * wait_port - a `comm::port` to receive on @@ -94,12 +97,11 @@ An `option` representing the outcome of the call. If the call `recv`'d on the provided port in the allotted timeout period, then the result will be a `some(T)`. If not, then `none` will be returned. "] -fn recv_timeout(msecs: uint, wait_po: comm::port) - -> option { - +fn recv_timeout(hl_loop: uv::hl::high_level_loop, + msecs: uint, wait_po: comm::port) -> option { let timeout_po = comm::port::<()>(); let timeout_ch = comm::chan(timeout_po); - delayed_send(msecs, timeout_ch, ()); + delayed_send(hl_loop, msecs, timeout_ch, ()); either::either( {|left_val| log(debug, #fmt("recv_time .. left_val %?", @@ -140,13 +142,15 @@ crust fn delayed_send_close_cb(handle: *uv::ll::uv_timer_t) unsafe { mod test { #[test] fn test_gl_timer_simple_sleep_test() { - sleep(1u); + let hl_loop = uv::global_loop::get(); + sleep(hl_loop, 1u); } #[test] fn test_gl_timer_sleep_stress1() { + let hl_loop = uv::global_loop::get(); iter::repeat(200u) {|| - sleep(1u); + sleep(hl_loop, 1u); } } @@ -154,6 +158,7 @@ mod test { fn test_gl_timer_sleep_stress2() { let po = comm::port(); let ch = comm::chan(po); + let hl_loop = uv::global_loop::get(); let repeat = 20u; let spec = { @@ -172,7 +177,7 @@ mod test { import rand::*; let rng = rng(); iter::repeat(times) {|| - sleep(rng.next() as uint % maxms); + sleep(hl_loop, rng.next() as uint % maxms); } comm::send(ch, ()); } @@ -195,6 +200,7 @@ mod test { let times = 100; let mut successes = 0; let mut failures = 0; + let hl_loop = uv::global_loop::get(); iter::repeat(times as uint) {|| task::yield(); @@ -204,10 +210,10 @@ mod test { let test_ch = comm::chan(test_po); task::spawn() {|| - delayed_send(1u, test_ch, expected); + delayed_send(hl_loop, 1u, test_ch, expected); }; - alt recv_timeout(10u, test_po) { + alt recv_timeout(hl_loop, 10u, test_po) { some(val) { assert val == expected; successes += 1; } _ { failures += 1; } }; @@ -221,6 +227,7 @@ mod test { let times = 100; let mut successes = 0; let mut failures = 0; + let hl_loop = uv::global_loop::get(); iter::repeat(times as uint) {|| let expected = rand::rng().gen_str(16u); @@ -228,10 +235,10 @@ mod test { let test_ch = comm::chan(test_po); task::spawn() {|| - delayed_send(1000u, test_ch, expected); + delayed_send(hl_loop, 1000u, test_ch, expected); }; - let actual = alt recv_timeout(1u, test_po) { + let actual = alt recv_timeout(hl_loop, 1u, test_po) { none { successes += 1; } _ { failures += 1; } };