diff --git a/src/libstd/net.rs b/src/libstd/net.rs index 3707851bedd..11425c2b612 100644 --- a/src/libstd/net.rs +++ b/src/libstd/net.rs @@ -6,4 +6,4 @@ import tcp = net_tcp; export tcp; import ip = net_ip; -export ip; +export ip; diff --git a/src/libstd/net_tcp.rs b/src/libstd/net_tcp.rs index 379c6d1936c..a43ff0c8152 100644 --- a/src/libstd/net_tcp.rs +++ b/src/libstd/net_tcp.rs @@ -5,7 +5,7 @@ High-level interface to libuv's TCP functionality import ip = net_ip; export tcp_socket, tcp_err_data; -export connect, write, read_start, read_stop; +export connect, write, read_start, read_stop, listen, accept; #[doc=" Encapsulates an open TCP/IP connection through libuv @@ -279,8 +279,246 @@ fn read_stop(sock: tcp_socket) -> } } +#[doc=" +Bind to a given IP/port and listen for new connections + +# Arguments + +* `host_ip` - a `net::ip::ip_addr` representing a unique IP +(versions 4 or 6) +* `port` - a uint representing the port to listen on +* `backlog` - a uint representing the number of incoming connections +to cache in memory +* `new_connect_cb` - a callback to be evaluated, on the libuv thread, +whenever a client attempts to conect on the provided ip/port +"] +fn listen(host_ip: ip::ip_addr, port: uint, backlog: uint, + new_connect_cb: fn~(tcp_new_connection, + comm::chan<option<tcp_err_data>>)) + -> result::result<(), tcp_err_data> unsafe { + let stream_closed_po = comm::port::<()>(); + let kill_po = comm::port::<option<tcp_err_data>>(); + let server_stream = uv::ll::tcp_t(); + let server_stream_ptr = ptr::addr_of(server_stream); + let hl_loop = uv::global_loop::get(); + let server_data = { + server_stream_ptr: server_stream_ptr, + stream_closed_ch: comm::chan(stream_closed_po), + kill_ch: comm::chan(kill_po), + new_connect_cb: new_connect_cb, + hl_loop: hl_loop, + mut active: true + }; + let server_data_ptr = ptr::addr_of(server_data); + + let setup_po = comm::port::<option<tcp_err_data>>(); + let setup_ch = comm::chan(setup_po); + uv::hl::interact(hl_loop) {|loop_ptr| + let tcp_addr = ipv4_ip_addr_to_sockaddr_in(host_ip, + port); + alt uv::ll::tcp_init(loop_ptr, server_stream_ptr) { + 0i32 { + alt uv::ll::tcp_bind(server_stream_ptr, + ptr::addr_of(tcp_addr)) { + 0i32 { + alt uv::ll::listen(server_stream_ptr, + backlog as libc::c_int, + tcp_listen_on_connection_cb) { + 0i32 { + uv::ll::set_data_for_uv_handle( + server_stream_ptr, + server_data_ptr); + comm::send(setup_ch, none); + } + _ { + log(debug, "failure to uv_listen()"); + let err_data = uv::ll::get_last_err_data(loop_ptr); + comm::send(setup_ch, some(err_data)); + } + } + } + _ { + log(debug, "failure to uv_tcp_bind"); + let err_data = uv::ll::get_last_err_data(loop_ptr); + comm::send(setup_ch, some(err_data)); + } + } + } + _ { + log(debug, "failure to uv_tcp_init"); + let err_data = uv::ll::get_last_err_data(loop_ptr); + comm::send(setup_ch, some(err_data)); + } + } + }; + let mut kill_result: option<tcp_err_data> = none; + alt comm::recv(setup_po) { + some(err_data) { + // we failed to bind/list w/ libuv + result::err(err_data.to_tcp_err()) + } + none { + kill_result = comm::recv(kill_po); + uv::hl::interact(hl_loop) {|loop_ptr| + log(debug, #fmt("tcp::listen post-kill recv hl interact %?", + loop_ptr)); + (*server_data_ptr).active = false; + uv::ll::close(server_stream_ptr, tcp_listen_close_cb); + }; + comm::recv(stream_closed_po); + alt kill_result { + // some failure post bind/listen + some(err_data) { + result::err(err_data) + } + // clean exit + none { + result::ok(()) + } + } + } + } +} + +#[doc=" +Bind an incoming client connection to a `net::tcp::tcp_socket` + +# Notes + +It is safe to call `net::tcp::accept` _only_ within the callback +provided as the final argument of the `net::tcp::listen` function. + +The `new_conn` opaque value provided _only_ as the first argument to the +`new_connect_cb`. It can be safely sent to another task but it _must_ be +used (via `net::tcp::accept`) before the `new_connect_cb` call it was +provided within returns. + +This means that a port/chan pair must be used to make sure that the +`new_connect_cb` call blocks until an attempt to create a +`net::tcp::tcp_socket` is completed. + +# Arguments + +* `new_conn` - an opaque value used to create a new `tcp_socket` + +# Returns + +* Success + * On success, this function will return a `net::tcp::tcp_socket` as the + `ok` variant of a `result`. The `net::tcp::tcp_socket` is anchored within + the task that `accept` was called within for its lifetime. +* Failure + * On failure, this function will return a `net::tcp::tcp_err_data` record + as the `err` variant of a `result`. +"] +fn accept(new_conn: tcp_new_connection) + -> result::result<tcp_socket, tcp_err_data> unsafe { + + alt new_conn{ + new_tcp_conn(server_handle_ptr) { + let server_data_ptr = uv::ll::get_data_for_uv_handle( + server_handle_ptr) as *tcp_server_data; + let hl_loop = (*server_data_ptr).hl_loop;// FIXME + let reader_po = comm::port::<result::result<[u8], tcp_err_data>>(); + let client_socket_data = @{ + reader_po: reader_po, + reader_ch: comm::chan(reader_po), + stream_handle : uv::ll::tcp_t(), + connect_req : uv::ll::connect_t(), + write_req : uv::ll::write_t(), + hl_loop: hl_loop + }; + let client_socket_data_ptr = ptr::addr_of(*client_socket_data); + let client_stream_handle_ptr = ptr::addr_of( + (*client_socket_data_ptr).stream_handle); + + let result_po = comm::port::<option<tcp_err_data>>(); + let result_ch = comm::chan(result_po); + uv::hl::interact(hl_loop) {|loop_ptr| + log(debug, "in interact cb for tcp::accept"); + alt uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) { + 0i32 { + log(debug, "uv_tcp_init successful for client stream"); + alt uv::ll::accept(server_handle_ptr, + client_stream_handle_ptr) { + 0i32 { + log(debug, "successfully accepted client connection"); + comm::send(result_ch, none); + } + _ { + log(debug, "failed to accept client conn"); + comm::send(result_ch, some( + uv::ll::get_last_err_data(loop_ptr).to_tcp_err())); + } + } + } + _ { + log(debug, "failed to init client stream"); + comm::send(result_ch, some( + uv::ll::get_last_err_data(loop_ptr).to_tcp_err())); + } + } + }; + alt comm::recv(result_po) { + some(err_data) { + result::err(err_data) + } + none { + result::ok(tcp_socket(client_socket_data)) + } + } + } + } +} + // INTERNAL API +enum tcp_new_connection { + new_tcp_conn(*uv::ll::uv_tcp_t) +} + +type tcp_server_data = { + server_stream_ptr: *uv::ll::uv_tcp_t, + stream_closed_ch: comm::chan<()>, + kill_ch: comm::chan<option<tcp_err_data>>, + new_connect_cb: fn~(tcp_new_connection, + comm::chan<option<tcp_err_data>>), + hl_loop: uv::hl::high_level_loop, + mut active: bool +}; + +crust fn tcp_listen_close_cb(handle: *uv::ll::uv_tcp_t) unsafe { + let server_data_ptr = uv::ll::get_data_for_uv_handle( + handle) as *tcp_server_data; + comm::send((*server_data_ptr).stream_closed_ch, ()); +} + +crust fn tcp_listen_on_connection_cb(handle: *uv::ll::uv_tcp_t, + status: libc::c_int) unsafe { + let server_data_ptr = uv::ll::get_data_for_uv_handle(handle) + as *tcp_server_data; + let kill_ch = (*server_data_ptr).kill_ch; + alt (*server_data_ptr).active { + true { + alt status { + 0i32 { + let new_conn = new_tcp_conn(handle); + (*server_data_ptr).new_connect_cb(new_conn, kill_ch); + } + _ { + let loop_ptr = uv::ll::get_loop_for_uv_handle(handle); + comm::send(kill_ch, + some(uv::ll::get_last_err_data(loop_ptr) + .to_tcp_err())); + (*server_data_ptr).active = false; + } + } + } + _ { + } + } +} + enum tcp_connect_result { tcp_connected(tcp_socket), tcp_connect_error(tcp_err_data) @@ -315,6 +553,7 @@ impl of to_tcp_err_iface for uv::ll::uv_err_data { crust fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t, nread: libc::ssize_t, ++buf: uv::ll::uv_buf_t) unsafe { + log(debug, "entering on_tcp_read_cb"); let loop_ptr = uv::ll::get_loop_for_uv_handle(stream); let socket_data_ptr = uv::ll::get_data_for_uv_handle(stream) as *tcp_socket_data; @@ -338,6 +577,7 @@ crust fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t, } } uv::ll::free_base_of_buf(buf); + log(debug, "exiting on_tcp_read_cb"); } crust fn on_alloc_cb(handle: *libc::c_void, @@ -447,27 +687,108 @@ type tcp_socket_data = { }; // convert rust ip_addr to libuv's native representation -fn ipv4_ip_addr_to_sockaddr_in(input: ip::ip_addr, +fn ipv4_ip_addr_to_sockaddr_in(input_ip: ip::ip_addr, port: uint) -> uv::ll::sockaddr_in unsafe { - uv::ll::ip4_addr(ip::format_addr(input), port as int) + // FIXME ipv6 + alt input_ip { + ip::ipv4(_,_,_,_) { + uv::ll::ip4_addr(ip::format_addr(input_ip), port as int) + } + _ { + fail "FIXME ipv6 not yet supported"; + } + } } //#[cfg(test)] mod test { #[test] - fn test_gl_tcp_ipv4_request() { - impl_gl_tcp_ipv4_request(); - } - fn impl_gl_tcp_ipv4_request() { - // pre-connection/input data + fn test_gl_tcp_ipv4_client() { let ip_str = "173.194.79.99"; let port = 80u; - let expected_read_msg = "foo"; - let actual_write_msg = "GET / HTTP/1.1\r\n\r\n"; - let host_ip = ip::v4::parse_addr(ip_str); + let write_input = "GET / HTTP/1.1\r\n\r\n"; + let read_output = + impl_gl_tcp_ipv4_client(ip_str, port, write_input); + log(debug, "DATA RECEIVED: "+read_output); + } - let data_po = comm::port::<[u8]>(); - let data_ch = comm::chan(data_po); + #[test] + fn test_gl_tcp_ipv4_server() { + let server_ip = "127.0.0.1"; + let server_port = 8888u; + let kill_str = "asdf"; + let resp_str = "hw"; + + let result_po = comm::port::<str>(); + let result_ch = comm::chan(result_po); + task::spawn_sched(task::manual_threads(4u)) {|| + let inner_result_po = comm::port::<str>(); + let inner_result_ch = comm::chan(inner_result_po); + + impl_gl_tcp_ipv4_server(server_ip, server_port, + kill_str, resp_str, + inner_result_ch); + let result_str = comm::recv(inner_result_po); + comm::send(result_ch, result_str); + }; + let output = comm::recv(result_po); + log(debug, #fmt("RECEIVED REQ %? FROM USER", output)); + } + + fn impl_gl_tcp_ipv4_server(host_str: str, port: uint, + kill_str: str, resp_str: str, + output_ch: comm::chan<str>) { + let host_ip = ip::v4::parse_addr(host_str); + log(debug, "about to enter listen() call for test server"); + listen(host_ip, port, 128u) {|new_conn, kill_ch| + // this is a callback that is going to be invoked on the + // loop's thread (can't be avoided). + let cont_po = comm::port::<()>(); + let cont_ch = comm::chan(cont_po); + task::spawn {|| + log(debug, "starting worker for incoming req"); + + // work loop + let accept_result = accept(new_conn); + if result::is_failure(accept_result) { + // accept failed.. + log(debug,"accept in worker task failed"); + comm::send(kill_ch, + some(result::get_err(accept_result) + .to_tcp_err())); + } + // accept() succeeded, let the task that is + // listen()'ing know so it can continue and + // unblock libuv.. + comm::send(cont_ch, ()); + + // our precious sock.. from here on out, things + // match the tcp request/client api, as they're + // both on tcp_sockets at this point.. + let sock = result::unwrap(accept_result); + let req_bytes = single_read_bytes_from(sock); + let req_str = str::from_bytes(req_bytes); + if str::contains(req_str, kill_str) { + // our signal to shut down the tcp + // server was received. shut it down. + comm::send(kill_ch, none); + } + write_single_str(sock, resp_str); + + comm::send(output_ch, req_str); + // work's complete, let socket close.. + log(debug, "exiting worker"); + }; + + comm::recv(cont_po); + }; + log(debug, "exiting listen() block for test server"); + } + + fn impl_gl_tcp_ipv4_client(ip_str: str, port: uint, + write_input: str) -> str { + // pre-connection/input data + let host_ip = ip::v4::parse_addr(ip_str); // connect to remote host let connect_result = connect(host_ip, port); @@ -486,7 +807,7 @@ mod test { log(debug, "successful tcp connect"); // set up write data - let write_data = [str::as_bytes(actual_write_msg) {|str_bytes| + let write_data = [str::as_bytes(write_input) {|str_bytes| str_bytes }]; @@ -538,9 +859,64 @@ mod test { } break; } - comm::send(data_ch, total_read_data); - let actual_data = comm::recv(data_po); - let resp = str::from_bytes(actual_data); - log(debug, "DATA RECEIVED: "+resp); + str::from_bytes(total_read_data) } -} \ No newline at end of file + + fn single_read_bytes_from(sock: tcp_socket) -> [u8] { + let mut total_read_data: [u8] = []; + let read_start_result = read_start(sock); + if result::is_failure(read_start_result) { + let err_data = result::get_err(read_start_result); + log(debug, "srbf tcp read_start err received.."); + log(debug, #fmt("srbf read_start error: %? %?", + err_data.err_name, + err_data.err_msg)); + assert false; + } + let reader_po = result::get(read_start_result); + + let read_data_result = comm::recv(reader_po); + if result::is_failure(read_data_result) { + let err_data = result::get_err(read_data_result); + log(debug, "srbf read error data recv'd"); + log(debug, #fmt("srbf read error: %? %?", + err_data.err_name, + err_data.err_msg)); + assert false; + } + let new_data = result::unwrap(read_data_result); + total_read_data += new_data; + // theoretically, we could keep iterating, if + // we expect the server on the other end to keep + // streaming/chunking data to us, but.. + let read_stop_result = read_stop(sock); + if result::is_failure(read_stop_result) { + let err_data = result::get_err(read_stop_result); + log(debug, "srbf error while calling read_stop"); + log(debug, #fmt("srbf read_stop error: %? %?", + err_data.err_name, + err_data.err_msg)); + assert false; + } + total_read_data + } + + fn write_single_str(sock: tcp_socket, write_input: str) { + // set up write data + let write_data = [str::as_bytes(write_input) {|str_bytes| + str_bytes + }]; + + // write data to tcp socket + let write_result = write(sock, write_data); + if result::is_failure(write_result) { + let err_data = result::get_err(write_result); + log(debug, "wss tcp_write_error received.."); + log(debug, #fmt("wss tcp write error: %? %?", + err_data.err_name, + err_data.err_msg)); + assert false; + } + log(debug, "wss tcp::write successful"); + } +}