std: first-pass at a tcp server API, with a basic (non-robust) test
also whitespace cleanup .. for now, the test just spins up the server and listens for messages, echoing them back to an output port. there's a "kill" msg that it will listen for. need to point the tcp client and server test impls at each other for a loopback server/client test, like how its done in uv::ll once ipv6 parse/format lands, i can add another test using the entirely same codebase, but substituting an ip_addr ipv6 varient for the ipv4 varient used in the existing code still need some other plumbing to get the client/server tests to work together.
This commit is contained in:
parent
f2fd3bcf17
commit
465412aeff
@ -6,4 +6,4 @@ import tcp = net_tcp;
|
||||
export tcp;
|
||||
|
||||
import ip = net_ip;
|
||||
export ip;
|
||||
export ip;
|
||||
|
@ -5,7 +5,7 @@ High-level interface to libuv's TCP functionality
|
||||
import ip = net_ip;
|
||||
|
||||
export tcp_socket, tcp_err_data;
|
||||
export connect, write, read_start, read_stop;
|
||||
export connect, write, read_start, read_stop, listen, accept;
|
||||
|
||||
#[doc="
|
||||
Encapsulates an open TCP/IP connection through libuv
|
||||
@ -279,8 +279,246 @@ fn read_stop(sock: tcp_socket) ->
|
||||
}
|
||||
}
|
||||
|
||||
#[doc="
|
||||
Bind to a given IP/port and listen for new connections
|
||||
|
||||
# Arguments
|
||||
|
||||
* `host_ip` - a `net::ip::ip_addr` representing a unique IP
|
||||
(versions 4 or 6)
|
||||
* `port` - a uint representing the port to listen on
|
||||
* `backlog` - a uint representing the number of incoming connections
|
||||
to cache in memory
|
||||
* `new_connect_cb` - a callback to be evaluated, on the libuv thread,
|
||||
whenever a client attempts to conect on the provided ip/port
|
||||
"]
|
||||
fn listen(host_ip: ip::ip_addr, port: uint, backlog: uint,
|
||||
new_connect_cb: fn~(tcp_new_connection,
|
||||
comm::chan<option<tcp_err_data>>))
|
||||
-> result::result<(), tcp_err_data> unsafe {
|
||||
let stream_closed_po = comm::port::<()>();
|
||||
let kill_po = comm::port::<option<tcp_err_data>>();
|
||||
let server_stream = uv::ll::tcp_t();
|
||||
let server_stream_ptr = ptr::addr_of(server_stream);
|
||||
let hl_loop = uv::global_loop::get();
|
||||
let server_data = {
|
||||
server_stream_ptr: server_stream_ptr,
|
||||
stream_closed_ch: comm::chan(stream_closed_po),
|
||||
kill_ch: comm::chan(kill_po),
|
||||
new_connect_cb: new_connect_cb,
|
||||
hl_loop: hl_loop,
|
||||
mut active: true
|
||||
};
|
||||
let server_data_ptr = ptr::addr_of(server_data);
|
||||
|
||||
let setup_po = comm::port::<option<tcp_err_data>>();
|
||||
let setup_ch = comm::chan(setup_po);
|
||||
uv::hl::interact(hl_loop) {|loop_ptr|
|
||||
let tcp_addr = ipv4_ip_addr_to_sockaddr_in(host_ip,
|
||||
port);
|
||||
alt uv::ll::tcp_init(loop_ptr, server_stream_ptr) {
|
||||
0i32 {
|
||||
alt uv::ll::tcp_bind(server_stream_ptr,
|
||||
ptr::addr_of(tcp_addr)) {
|
||||
0i32 {
|
||||
alt uv::ll::listen(server_stream_ptr,
|
||||
backlog as libc::c_int,
|
||||
tcp_listen_on_connection_cb) {
|
||||
0i32 {
|
||||
uv::ll::set_data_for_uv_handle(
|
||||
server_stream_ptr,
|
||||
server_data_ptr);
|
||||
comm::send(setup_ch, none);
|
||||
}
|
||||
_ {
|
||||
log(debug, "failure to uv_listen()");
|
||||
let err_data = uv::ll::get_last_err_data(loop_ptr);
|
||||
comm::send(setup_ch, some(err_data));
|
||||
}
|
||||
}
|
||||
}
|
||||
_ {
|
||||
log(debug, "failure to uv_tcp_bind");
|
||||
let err_data = uv::ll::get_last_err_data(loop_ptr);
|
||||
comm::send(setup_ch, some(err_data));
|
||||
}
|
||||
}
|
||||
}
|
||||
_ {
|
||||
log(debug, "failure to uv_tcp_init");
|
||||
let err_data = uv::ll::get_last_err_data(loop_ptr);
|
||||
comm::send(setup_ch, some(err_data));
|
||||
}
|
||||
}
|
||||
};
|
||||
let mut kill_result: option<tcp_err_data> = none;
|
||||
alt comm::recv(setup_po) {
|
||||
some(err_data) {
|
||||
// we failed to bind/list w/ libuv
|
||||
result::err(err_data.to_tcp_err())
|
||||
}
|
||||
none {
|
||||
kill_result = comm::recv(kill_po);
|
||||
uv::hl::interact(hl_loop) {|loop_ptr|
|
||||
log(debug, #fmt("tcp::listen post-kill recv hl interact %?",
|
||||
loop_ptr));
|
||||
(*server_data_ptr).active = false;
|
||||
uv::ll::close(server_stream_ptr, tcp_listen_close_cb);
|
||||
};
|
||||
comm::recv(stream_closed_po);
|
||||
alt kill_result {
|
||||
// some failure post bind/listen
|
||||
some(err_data) {
|
||||
result::err(err_data)
|
||||
}
|
||||
// clean exit
|
||||
none {
|
||||
result::ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[doc="
|
||||
Bind an incoming client connection to a `net::tcp::tcp_socket`
|
||||
|
||||
# Notes
|
||||
|
||||
It is safe to call `net::tcp::accept` _only_ within the callback
|
||||
provided as the final argument of the `net::tcp::listen` function.
|
||||
|
||||
The `new_conn` opaque value provided _only_ as the first argument to the
|
||||
`new_connect_cb`. It can be safely sent to another task but it _must_ be
|
||||
used (via `net::tcp::accept`) before the `new_connect_cb` call it was
|
||||
provided within returns.
|
||||
|
||||
This means that a port/chan pair must be used to make sure that the
|
||||
`new_connect_cb` call blocks until an attempt to create a
|
||||
`net::tcp::tcp_socket` is completed.
|
||||
|
||||
# Arguments
|
||||
|
||||
* `new_conn` - an opaque value used to create a new `tcp_socket`
|
||||
|
||||
# Returns
|
||||
|
||||
* Success
|
||||
* On success, this function will return a `net::tcp::tcp_socket` as the
|
||||
`ok` variant of a `result`. The `net::tcp::tcp_socket` is anchored within
|
||||
the task that `accept` was called within for its lifetime.
|
||||
* Failure
|
||||
* On failure, this function will return a `net::tcp::tcp_err_data` record
|
||||
as the `err` variant of a `result`.
|
||||
"]
|
||||
fn accept(new_conn: tcp_new_connection)
|
||||
-> result::result<tcp_socket, tcp_err_data> unsafe {
|
||||
|
||||
alt new_conn{
|
||||
new_tcp_conn(server_handle_ptr) {
|
||||
let server_data_ptr = uv::ll::get_data_for_uv_handle(
|
||||
server_handle_ptr) as *tcp_server_data;
|
||||
let hl_loop = (*server_data_ptr).hl_loop;// FIXME
|
||||
let reader_po = comm::port::<result::result<[u8], tcp_err_data>>();
|
||||
let client_socket_data = @{
|
||||
reader_po: reader_po,
|
||||
reader_ch: comm::chan(reader_po),
|
||||
stream_handle : uv::ll::tcp_t(),
|
||||
connect_req : uv::ll::connect_t(),
|
||||
write_req : uv::ll::write_t(),
|
||||
hl_loop: hl_loop
|
||||
};
|
||||
let client_socket_data_ptr = ptr::addr_of(*client_socket_data);
|
||||
let client_stream_handle_ptr = ptr::addr_of(
|
||||
(*client_socket_data_ptr).stream_handle);
|
||||
|
||||
let result_po = comm::port::<option<tcp_err_data>>();
|
||||
let result_ch = comm::chan(result_po);
|
||||
uv::hl::interact(hl_loop) {|loop_ptr|
|
||||
log(debug, "in interact cb for tcp::accept");
|
||||
alt uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) {
|
||||
0i32 {
|
||||
log(debug, "uv_tcp_init successful for client stream");
|
||||
alt uv::ll::accept(server_handle_ptr,
|
||||
client_stream_handle_ptr) {
|
||||
0i32 {
|
||||
log(debug, "successfully accepted client connection");
|
||||
comm::send(result_ch, none);
|
||||
}
|
||||
_ {
|
||||
log(debug, "failed to accept client conn");
|
||||
comm::send(result_ch, some(
|
||||
uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
|
||||
}
|
||||
}
|
||||
}
|
||||
_ {
|
||||
log(debug, "failed to init client stream");
|
||||
comm::send(result_ch, some(
|
||||
uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
|
||||
}
|
||||
}
|
||||
};
|
||||
alt comm::recv(result_po) {
|
||||
some(err_data) {
|
||||
result::err(err_data)
|
||||
}
|
||||
none {
|
||||
result::ok(tcp_socket(client_socket_data))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// INTERNAL API
|
||||
|
||||
enum tcp_new_connection {
|
||||
new_tcp_conn(*uv::ll::uv_tcp_t)
|
||||
}
|
||||
|
||||
type tcp_server_data = {
|
||||
server_stream_ptr: *uv::ll::uv_tcp_t,
|
||||
stream_closed_ch: comm::chan<()>,
|
||||
kill_ch: comm::chan<option<tcp_err_data>>,
|
||||
new_connect_cb: fn~(tcp_new_connection,
|
||||
comm::chan<option<tcp_err_data>>),
|
||||
hl_loop: uv::hl::high_level_loop,
|
||||
mut active: bool
|
||||
};
|
||||
|
||||
crust fn tcp_listen_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
|
||||
let server_data_ptr = uv::ll::get_data_for_uv_handle(
|
||||
handle) as *tcp_server_data;
|
||||
comm::send((*server_data_ptr).stream_closed_ch, ());
|
||||
}
|
||||
|
||||
crust fn tcp_listen_on_connection_cb(handle: *uv::ll::uv_tcp_t,
|
||||
status: libc::c_int) unsafe {
|
||||
let server_data_ptr = uv::ll::get_data_for_uv_handle(handle)
|
||||
as *tcp_server_data;
|
||||
let kill_ch = (*server_data_ptr).kill_ch;
|
||||
alt (*server_data_ptr).active {
|
||||
true {
|
||||
alt status {
|
||||
0i32 {
|
||||
let new_conn = new_tcp_conn(handle);
|
||||
(*server_data_ptr).new_connect_cb(new_conn, kill_ch);
|
||||
}
|
||||
_ {
|
||||
let loop_ptr = uv::ll::get_loop_for_uv_handle(handle);
|
||||
comm::send(kill_ch,
|
||||
some(uv::ll::get_last_err_data(loop_ptr)
|
||||
.to_tcp_err()));
|
||||
(*server_data_ptr).active = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum tcp_connect_result {
|
||||
tcp_connected(tcp_socket),
|
||||
tcp_connect_error(tcp_err_data)
|
||||
@ -315,6 +553,7 @@ impl of to_tcp_err_iface for uv::ll::uv_err_data {
|
||||
crust fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t,
|
||||
nread: libc::ssize_t,
|
||||
++buf: uv::ll::uv_buf_t) unsafe {
|
||||
log(debug, "entering on_tcp_read_cb");
|
||||
let loop_ptr = uv::ll::get_loop_for_uv_handle(stream);
|
||||
let socket_data_ptr = uv::ll::get_data_for_uv_handle(stream)
|
||||
as *tcp_socket_data;
|
||||
@ -338,6 +577,7 @@ crust fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t,
|
||||
}
|
||||
}
|
||||
uv::ll::free_base_of_buf(buf);
|
||||
log(debug, "exiting on_tcp_read_cb");
|
||||
}
|
||||
|
||||
crust fn on_alloc_cb(handle: *libc::c_void,
|
||||
@ -447,27 +687,108 @@ type tcp_socket_data = {
|
||||
};
|
||||
|
||||
// convert rust ip_addr to libuv's native representation
|
||||
fn ipv4_ip_addr_to_sockaddr_in(input: ip::ip_addr,
|
||||
fn ipv4_ip_addr_to_sockaddr_in(input_ip: ip::ip_addr,
|
||||
port: uint) -> uv::ll::sockaddr_in unsafe {
|
||||
uv::ll::ip4_addr(ip::format_addr(input), port as int)
|
||||
// FIXME ipv6
|
||||
alt input_ip {
|
||||
ip::ipv4(_,_,_,_) {
|
||||
uv::ll::ip4_addr(ip::format_addr(input_ip), port as int)
|
||||
}
|
||||
_ {
|
||||
fail "FIXME ipv6 not yet supported";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//#[cfg(test)]
|
||||
mod test {
|
||||
#[test]
|
||||
fn test_gl_tcp_ipv4_request() {
|
||||
impl_gl_tcp_ipv4_request();
|
||||
}
|
||||
fn impl_gl_tcp_ipv4_request() {
|
||||
// pre-connection/input data
|
||||
fn test_gl_tcp_ipv4_client() {
|
||||
let ip_str = "173.194.79.99";
|
||||
let port = 80u;
|
||||
let expected_read_msg = "foo";
|
||||
let actual_write_msg = "GET / HTTP/1.1\r\n\r\n";
|
||||
let host_ip = ip::v4::parse_addr(ip_str);
|
||||
let write_input = "GET / HTTP/1.1\r\n\r\n";
|
||||
let read_output =
|
||||
impl_gl_tcp_ipv4_client(ip_str, port, write_input);
|
||||
log(debug, "DATA RECEIVED: "+read_output);
|
||||
}
|
||||
|
||||
let data_po = comm::port::<[u8]>();
|
||||
let data_ch = comm::chan(data_po);
|
||||
#[test]
|
||||
fn test_gl_tcp_ipv4_server() {
|
||||
let server_ip = "127.0.0.1";
|
||||
let server_port = 8888u;
|
||||
let kill_str = "asdf";
|
||||
let resp_str = "hw";
|
||||
|
||||
let result_po = comm::port::<str>();
|
||||
let result_ch = comm::chan(result_po);
|
||||
task::spawn_sched(task::manual_threads(4u)) {||
|
||||
let inner_result_po = comm::port::<str>();
|
||||
let inner_result_ch = comm::chan(inner_result_po);
|
||||
|
||||
impl_gl_tcp_ipv4_server(server_ip, server_port,
|
||||
kill_str, resp_str,
|
||||
inner_result_ch);
|
||||
let result_str = comm::recv(inner_result_po);
|
||||
comm::send(result_ch, result_str);
|
||||
};
|
||||
let output = comm::recv(result_po);
|
||||
log(debug, #fmt("RECEIVED REQ %? FROM USER", output));
|
||||
}
|
||||
|
||||
fn impl_gl_tcp_ipv4_server(host_str: str, port: uint,
|
||||
kill_str: str, resp_str: str,
|
||||
output_ch: comm::chan<str>) {
|
||||
let host_ip = ip::v4::parse_addr(host_str);
|
||||
log(debug, "about to enter listen() call for test server");
|
||||
listen(host_ip, port, 128u) {|new_conn, kill_ch|
|
||||
// this is a callback that is going to be invoked on the
|
||||
// loop's thread (can't be avoided).
|
||||
let cont_po = comm::port::<()>();
|
||||
let cont_ch = comm::chan(cont_po);
|
||||
task::spawn {||
|
||||
log(debug, "starting worker for incoming req");
|
||||
|
||||
// work loop
|
||||
let accept_result = accept(new_conn);
|
||||
if result::is_failure(accept_result) {
|
||||
// accept failed..
|
||||
log(debug,"accept in worker task failed");
|
||||
comm::send(kill_ch,
|
||||
some(result::get_err(accept_result)
|
||||
.to_tcp_err()));
|
||||
}
|
||||
// accept() succeeded, let the task that is
|
||||
// listen()'ing know so it can continue and
|
||||
// unblock libuv..
|
||||
comm::send(cont_ch, ());
|
||||
|
||||
// our precious sock.. from here on out, things
|
||||
// match the tcp request/client api, as they're
|
||||
// both on tcp_sockets at this point..
|
||||
let sock = result::unwrap(accept_result);
|
||||
let req_bytes = single_read_bytes_from(sock);
|
||||
let req_str = str::from_bytes(req_bytes);
|
||||
if str::contains(req_str, kill_str) {
|
||||
// our signal to shut down the tcp
|
||||
// server was received. shut it down.
|
||||
comm::send(kill_ch, none);
|
||||
}
|
||||
write_single_str(sock, resp_str);
|
||||
|
||||
comm::send(output_ch, req_str);
|
||||
// work's complete, let socket close..
|
||||
log(debug, "exiting worker");
|
||||
};
|
||||
|
||||
comm::recv(cont_po);
|
||||
};
|
||||
log(debug, "exiting listen() block for test server");
|
||||
}
|
||||
|
||||
fn impl_gl_tcp_ipv4_client(ip_str: str, port: uint,
|
||||
write_input: str) -> str {
|
||||
// pre-connection/input data
|
||||
let host_ip = ip::v4::parse_addr(ip_str);
|
||||
|
||||
// connect to remote host
|
||||
let connect_result = connect(host_ip, port);
|
||||
@ -486,7 +807,7 @@ mod test {
|
||||
log(debug, "successful tcp connect");
|
||||
|
||||
// set up write data
|
||||
let write_data = [str::as_bytes(actual_write_msg) {|str_bytes|
|
||||
let write_data = [str::as_bytes(write_input) {|str_bytes|
|
||||
str_bytes
|
||||
}];
|
||||
|
||||
@ -538,9 +859,64 @@ mod test {
|
||||
}
|
||||
break;
|
||||
}
|
||||
comm::send(data_ch, total_read_data);
|
||||
let actual_data = comm::recv(data_po);
|
||||
let resp = str::from_bytes(actual_data);
|
||||
log(debug, "DATA RECEIVED: "+resp);
|
||||
str::from_bytes(total_read_data)
|
||||
}
|
||||
}
|
||||
|
||||
fn single_read_bytes_from(sock: tcp_socket) -> [u8] {
|
||||
let mut total_read_data: [u8] = [];
|
||||
let read_start_result = read_start(sock);
|
||||
if result::is_failure(read_start_result) {
|
||||
let err_data = result::get_err(read_start_result);
|
||||
log(debug, "srbf tcp read_start err received..");
|
||||
log(debug, #fmt("srbf read_start error: %? %?",
|
||||
err_data.err_name,
|
||||
err_data.err_msg));
|
||||
assert false;
|
||||
}
|
||||
let reader_po = result::get(read_start_result);
|
||||
|
||||
let read_data_result = comm::recv(reader_po);
|
||||
if result::is_failure(read_data_result) {
|
||||
let err_data = result::get_err(read_data_result);
|
||||
log(debug, "srbf read error data recv'd");
|
||||
log(debug, #fmt("srbf read error: %? %?",
|
||||
err_data.err_name,
|
||||
err_data.err_msg));
|
||||
assert false;
|
||||
}
|
||||
let new_data = result::unwrap(read_data_result);
|
||||
total_read_data += new_data;
|
||||
// theoretically, we could keep iterating, if
|
||||
// we expect the server on the other end to keep
|
||||
// streaming/chunking data to us, but..
|
||||
let read_stop_result = read_stop(sock);
|
||||
if result::is_failure(read_stop_result) {
|
||||
let err_data = result::get_err(read_stop_result);
|
||||
log(debug, "srbf error while calling read_stop");
|
||||
log(debug, #fmt("srbf read_stop error: %? %?",
|
||||
err_data.err_name,
|
||||
err_data.err_msg));
|
||||
assert false;
|
||||
}
|
||||
total_read_data
|
||||
}
|
||||
|
||||
fn write_single_str(sock: tcp_socket, write_input: str) {
|
||||
// set up write data
|
||||
let write_data = [str::as_bytes(write_input) {|str_bytes|
|
||||
str_bytes
|
||||
}];
|
||||
|
||||
// write data to tcp socket
|
||||
let write_result = write(sock, write_data);
|
||||
if result::is_failure(write_result) {
|
||||
let err_data = result::get_err(write_result);
|
||||
log(debug, "wss tcp_write_error received..");
|
||||
log(debug, #fmt("wss tcp write error: %? %?",
|
||||
err_data.err_name,
|
||||
err_data.err_msg));
|
||||
assert false;
|
||||
}
|
||||
log(debug, "wss tcp::write successful");
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user