From f4b87c749fc5dc085cd31ba3b5f91f11d863e0fa Mon Sep 17 00:00:00 2001 From: Rob Arnold Date: Wed, 15 Jun 2011 22:04:31 -0700 Subject: [PATCH] Basic async IO module using libuv --- mk/rt.mk | 1 + src/lib/aio.rs | 184 ++++++++++++++++++++ src/lib/sio.rs | 104 +++++++++++ src/lib/std.rc | 2 + src/rt/rust_log.cpp | 2 + src/rt/rust_log.h | 1 + src/rt/rust_upcall.cpp | 15 +- src/rt/rust_upcall.h | 17 ++ src/rt/rust_uv.cpp | 300 ++++++++++++++++++++++++++++++++ src/rt/rustrt.def.in | 10 ++ src/test/run-pass/ivec-tag.rs | 14 ++ src/test/run-pass/sio-client.rs | 21 +++ src/test/run-pass/sio-ctx.rs | 7 + src/test/run-pass/sio-read.rs | 25 +++ src/test/run-pass/sio-srv.rs | 9 + src/test/run-pass/sio-write.rs | 24 +++ 16 files changed, 722 insertions(+), 14 deletions(-) create mode 100644 src/lib/aio.rs create mode 100644 src/lib/sio.rs create mode 100644 src/rt/rust_upcall.h create mode 100644 src/rt/rust_uv.cpp create mode 100644 src/test/run-pass/ivec-tag.rs create mode 100644 src/test/run-pass/sio-client.rs create mode 100644 src/test/run-pass/sio-ctx.rs create mode 100644 src/test/run-pass/sio-read.rs create mode 100644 src/test/run-pass/sio-srv.rs create mode 100644 src/test/run-pass/sio-write.rs diff --git a/mk/rt.mk b/mk/rt.mk index 8f6f96b17e8..ed3e5422923 100644 --- a/mk/rt.mk +++ b/mk/rt.mk @@ -17,6 +17,7 @@ RUNTIME_CS := rt/sync/timer.cpp \ rt/rust_chan.cpp \ rt/rust_port.cpp \ rt/rust_upcall.cpp \ + rt/rust_uv.cpp \ rt/rust_log.cpp \ rt/rust_timer.cpp \ rt/circular_buffer.cpp \ diff --git a/src/lib/aio.rs b/src/lib/aio.rs new file mode 100644 index 00000000000..b6f5730d850 --- /dev/null +++ b/src/lib/aio.rs @@ -0,0 +1,184 @@ +import str::sbuf; +import vec::vbuf; + +native "rust" mod rustrt { + type socket; + type server; + fn aio_init(); + fn aio_run(); + fn aio_stop(); + fn aio_connect(host: sbuf, port: int, connected: chan[socket]); + fn aio_serve(host: sbuf, port: int, acceptChan: chan[socket]) -> server; + fn aio_writedata(s: socket, buf: *u8, size: uint, status: chan[bool]); + fn aio_read(s: socket, reader: chan[u8[]]); + fn aio_close_server(s: server, status: chan[bool]); + fn aio_close_socket(s: socket); + fn aio_is_null_client(s: socket) -> bool; +} + +type server = rustrt::server; +type client = rustrt::socket; +tag pending_connection { + remote(str,int); + incoming(server); +} + +tag socket_event { + connected(client); + closed; + received(u8[]); +} + +tag server_event { + pending(chan[chan[socket_event]]); +} + +tag request { + quit; + connect(pending_connection,chan[socket_event]); + serve(str,int,chan[server_event],chan[server]); + write(client,u8[],chan[bool]); + close_server(server, chan[bool]); + close_client(client); +} + +type ctx = chan[request]; + +fn connect_task(ip: str, portnum: int, evt: chan[socket_event]) { + let connecter: port[client] = port(); + rustrt::aio_connect(str::buf(ip), portnum, chan(connecter)); + let client: client; + connecter |> client; + new_client(client, evt); +} + +fn new_client(client: client, evt: chan[socket_event]) { + // Start the read before notifying about the connect. This avoids a race + // condition where the receiver can close the socket before we start + // reading. + let reader: port[u8[]] = port(); + rustrt::aio_read(client, chan(reader)); + + evt <| connected(client); + + while (true) { + log "waiting for bytes"; + let data: u8[]; + reader |> data; + log "got some bytes"; + log ivec::len[u8](data); + if (ivec::len[u8](data) == 0u) { + log "got empty buffer, bailing"; + break; + } + log "got non-empty buffer, sending"; + evt <| received(data); + log "sent non-empty buffer"; + } + log "done reading"; + evt <| closed; + log "close message sent"; +} + +fn accept_task(client: client, events: chan[server_event]) { + log "accept task was spawned"; + let p: port[chan[socket_event]] = port(); + events <| pending(chan(p)); + let evt: chan[socket_event]; + p |> evt; + new_client(client, evt); + log "done accepting"; +} + +fn server_task(ip: str, portnum: int, events: chan[server_event], + server: chan[server]) { + let accepter: port[client] = port(); + server <| rustrt::aio_serve(str::buf(ip), portnum, chan(accepter)); + + let client: client; + while (true) { + log "preparing to accept a client"; + accepter |> client; + if (rustrt::aio_is_null_client(client)) { + log "client was actually null, returning"; + ret; + } else { + spawn accept_task(client, events); + } + } +} + +fn request_task(c: chan[ctx]) { + // Create a port to accept IO requests on + let p: port[request] = port(); + // Hand of its channel to our spawner + c <| chan(p); + log "uv run task spawned"; + // Spin for requests + let req: request; + while (true) { + p |> req; + alt req { + quit. { + log "got quit message"; + + log "stopping libuv"; + rustrt::aio_stop(); + ret; + } + connect(remote(ip,portnum),client) { + spawn connect_task(ip, portnum, client); + } + serve(ip,portnum,events,server) { + spawn server_task(ip, portnum, events, server); + } + write(socket,v,status) { + rustrt::aio_writedata(socket, + ivec::to_ptr[u8](v), ivec::len[u8](v), + status); + } + close_server(server,status) { + log "closing server"; + rustrt::aio_close_server(server,status); + } + close_client(client) { + log "closing client"; + rustrt::aio_close_socket(client); + } + } + } +} + +fn iotask(c: chan[ctx]) { + log "io task spawned"; + // Initialize before accepting requests + rustrt::aio_init(); + + log "io task init"; + // Spawn our request task + let reqtask: task = spawn request_task(c); + + log "uv run task init"; + // Enter IO loop. This never returns until aio_stop is called. + rustrt::aio_run(); + log "waiting for request task to finish"; + + task::join(reqtask); +} + +fn new() -> ctx { + let p: port[ctx] = port(); + let t: task = spawn iotask(chan(p)); + let cx: ctx; + p |> cx; + ret cx; +} + +// Local Variables: +// mode: rust; +// fill-column: 78; +// indent-tabs-mode: nil +// c-basic-offset: 4 +// buffer-file-coding-system: utf-8-unix +// compile-command: "make -k -C .. 2>&1 | sed -e 's/\\/x\\//x:\\//g'"; +// End: diff --git a/src/lib/sio.rs b/src/lib/sio.rs new file mode 100644 index 00000000000..1ba432c0e8f --- /dev/null +++ b/src/lib/sio.rs @@ -0,0 +1,104 @@ +type ctx = aio::ctx; +type client = { ctx: ctx, client: aio::client, evt: port[aio::socket_event] }; +type server = { ctx: ctx, server: aio::server, evt: port[aio::server_event] }; + +fn new() -> ctx { + ret aio::new(); +} + +fn destroy(ctx: ctx) { + ctx <| aio::quit; +} + +fn make_socket(ctx: ctx, p: port[aio::socket_event]) -> client { + let evt: aio::socket_event; + p |> evt; + alt evt { + aio::connected(client) { + ret { ctx: ctx, client: client, evt: p }; + } + } + log_err ("Could not connect to client"); + fail; +} + +fn connect_to(ctx: ctx, ip: str, portnum: int) -> client { + let p: port[aio::socket_event] = port(); + ctx <| aio::connect(aio::remote(ip, portnum), chan(p)); + ret make_socket(ctx, p); +} + +fn read(c: client) -> u8[] { + let evt: aio::socket_event; + c.evt |> evt; + alt evt { + aio::closed. { + ret ~[]; + } + aio::received(buf) { + ret buf; + } + } +} + +fn create_server(ctx: ctx, ip: str, portnum: int) -> server { + let evt: port[aio::server_event] = port(); + let p: port[aio::server] = port(); + ctx <| aio::serve(ip, portnum, chan(evt), chan(p)); + let srv: aio::server; + p |> srv; + ret { ctx: ctx, server: srv, evt: evt }; +} + +fn accept_from(server: server) -> client { + let evt: aio::server_event; + server.evt |> evt; + alt evt { + aio::pending(callback) { + let p: port[aio::socket_event] = port(); + callback <| chan(p); + ret make_socket(server.ctx, p); + } + } +} + +fn write_data(c: client, data: u8[]) -> bool { + let p: port[bool] = port(); + c.ctx <| aio::write(c.client, data, chan(p)); + let success: bool; + p |> success; + ret success; +} + +fn close_server(server: server) { + // TODO: make this unit once we learn to send those from native code + let p: port[bool] = port(); + server.ctx <| aio::close_server(server.server, chan(p)); + let success: bool; + log "Waiting for close"; + p |> success; + log "Got close"; +} + +fn close_client(client: client) { + client.ctx <| aio::close_client(client.client); + let evt: aio::socket_event; + do { + client.evt |> evt; + alt evt { + aio::closed. { + ret; + } + _ {} + } + } while (true); +} + +// Local Variables: +// mode: rust; +// fill-column: 78; +// indent-tabs-mode: nil +// c-basic-offset: 4 +// buffer-file-coding-system: utf-8-unix +// compile-command: "make -k -C .. 2>&1 | sed -e 's/\\/x\\//x:\\//g'"; +// End: diff --git a/src/lib/std.rc b/src/lib/std.rc index 79903102d7e..bd864f3973a 100644 --- a/src/lib/std.rc +++ b/src/lib/std.rc @@ -70,6 +70,8 @@ mod os_fs = "posix_fs.rs"; mod run = "run_program.rs"; mod fs; +mod aio; +mod sio; // FIXME: parametric mod map; diff --git a/src/rt/rust_log.cpp b/src/rt/rust_log.cpp index c23a38bc1da..b4d26c780e6 100644 --- a/src/rt/rust_log.cpp +++ b/src/rt/rust_log.cpp @@ -197,6 +197,7 @@ size_t log_rt_gc; size_t log_rt_stdlib; size_t log_rt_kern; size_t log_rt_backtrace; +size_t log_rt_callback; static const mod_entry _rt_module_map[] = {{"::rt::mem", &log_rt_mem}, @@ -211,6 +212,7 @@ static const mod_entry _rt_module_map[] = {"::rt::stdlib", &log_rt_stdlib}, {"::rt::kern", &log_rt_kern}, {"::rt::backtrace", &log_rt_backtrace}, + {"::rt::callback", &log_rt_callback}, {NULL, NULL}}; void update_log_settings(void* crate_map, char* settings) { diff --git a/src/rt/rust_log.h b/src/rt/rust_log.h index 6021125705b..8770fc2f758 100644 --- a/src/rt/rust_log.h +++ b/src/rt/rust_log.h @@ -66,5 +66,6 @@ extern size_t log_rt_gc; extern size_t log_rt_stdlib; extern size_t log_rt_kern; extern size_t log_rt_backtrace; +extern size_t log_rt_callback; #endif /* RUST_LOG_H */ diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp index 2554868c986..02fa7bc2846 100644 --- a/src/rt/rust_upcall.cpp +++ b/src/rt/rust_upcall.cpp @@ -1,21 +1,8 @@ #include "rust_internal.h" +#include "rust_upcall.h" // Upcalls. -#ifdef __GNUC__ -#define LOG_UPCALL_ENTRY(task) \ - LOG(task, upcall, \ - "> UPCALL %s - task: %s 0x%" PRIxPTR \ - " retpc: x%" PRIxPTR, \ - __FUNCTION__, \ - (task)->name, (task), \ - __builtin_return_address(0)); -#else -#define LOG_UPCALL_ENTRY(task) \ - LOG(task, upcall, "> UPCALL task: %s @x%" PRIxPTR, \ - (task)->name, (task)); -#endif - extern "C" CDECL char const * str_buf(rust_task *task, rust_str *s); diff --git a/src/rt/rust_upcall.h b/src/rt/rust_upcall.h new file mode 100644 index 00000000000..e5ee5bb6948 --- /dev/null +++ b/src/rt/rust_upcall.h @@ -0,0 +1,17 @@ +#pragma once + +#ifdef __GNUC__ +#define LOG_UPCALL_ENTRY(task) \ + LOG(task, upcall, \ + "> UPCALL %s - task: %s 0x%" PRIxPTR \ + " retpc: x%" PRIxPTR, \ + __FUNCTION__, \ + (task)->name, (task), \ + __builtin_return_address(0)); +#else +#define LOG_UPCALL_ENTRY(task) \ + LOG(task, upcall, "> UPCALL task: %s @x%" PRIxPTR, \ + (task)->name, (task)); +#endif + + diff --git a/src/rt/rust_uv.cpp b/src/rt/rust_uv.cpp new file mode 100644 index 00000000000..9e4e15b61aa --- /dev/null +++ b/src/rt/rust_uv.cpp @@ -0,0 +1,300 @@ +#include "rust_internal.h" +#include "rust_upcall.h" +// Disable libev prototypes - they will make inline compatability functions +// which are unused and so trigger a warning in gcc since -Wall is on. +#define EV_PROTOTYPES 0 +#include "libuv/uv.h" + +#ifdef __GNUC__ +#define LOG_CALLBACK_ENTRY(p) \ + LOG(iotask, callback, "> IO CALLBACK %s %p", __FUNCTION__, p) +#else +#define LOG_CALLBACK_ENTRY(p) \ + LOG(iotask, callback, "> IO CALLBACK %s:%d %p", __FILE__, __LINE__, p) +#endif + +// The primary task which is running the event loop. This is used to dispatch +// all the notifications back to rust so we clone all passed in channels to +// this task. +static rust_task *iotask = NULL; + +struct socket_data : public task_owned { + // Either the task that the connection attempt was made from or the task + // that the server was spawned on. + rust_task *task; + // Channel for reporting the status of a connection attempt + // For connections from servers, this is always null + // For server sockets, this is used to send the notification that the server + // was closed. + rust_chan *chan; + // Channel to a port which receives bytes from this socket + rust_chan *reader; + uv_tcp_t socket; + + ~socket_data() { + if (chan) + chan->deref(); + if (reader) + reader->deref(); + } +}; + +struct request : public uv_req_t, public task_owned { + rust_task *task; + // Used for notifying about completion of connections, writes + rust_chan *chan; + request(socket_data *data, rust_chan *chan, + void (*cb)(request *req, int status)) { + uv_req_init(this, (uv_handle_t*)&data->socket, (void*)cb); + this->data = data; + this->task = data->task; + this->chan = chan->clone(iotask); + } + socket_data *socket() { + return (socket_data*)data; + } + void send_result(void *data) { + chan->send(&data); + chan->deref(); + chan = NULL; + } +}; + +extern "C" CDECL void aio_close_socket(rust_task *task, socket_data *); + +static uv_idle_s idle_handler; + +static void idle_callback(uv_handle_t* handle, int status) { + rust_task *task = reinterpret_cast(handle->data); + task->yield(); +} + +extern "C" CDECL void aio_init(rust_task *task) { + LOG_UPCALL_ENTRY(task); + iotask = task; + uv_init(); + uv_idle_init(&idle_handler); + uv_idle_start(&idle_handler, idle_callback); +} + +extern "C" CDECL void aio_run(rust_task *task) { + LOG_UPCALL_ENTRY(task); + idle_handler.data = task; + uv_run(); +} + +void nop_close(uv_handle_t* handle) {} + +extern "C" CDECL void aio_stop(rust_task *task) { + LOG_UPCALL_ENTRY(task); + uv_close((uv_handle_t*)&idle_handler, nop_close); +} + +static socket_data *make_socket(rust_task *task, rust_chan *chan) { + socket_data *data = new (task, "make_socket") socket_data; + if (!data || + uv_tcp_init(&data->socket)) { + return NULL; + } + data->task = task; + // Connections from servers don't have a channel + if (chan) { + data->chan = chan->clone(iotask); + } else { + data->chan = NULL; + } + data->socket.data = data; + data->reader = NULL; + return data; +} + +// We allocate the requested space + rust_vec but return a pointer at a +// +rust_vec offset so that it writes the bytes to the correct location. +static uv_buf_t alloc_buffer(uv_tcp_t *socket, size_t suggested_size) { + LOG_CALLBACK_ENTRY(socket); + uv_buf_t buf; + size_t actual_size = suggested_size + sizeof (rust_ivec_heap); + socket_data *data = (socket_data*)socket->data; + char *base = + reinterpret_cast(data->task->kernel->malloc(actual_size, + "read buffer")); + buf.base = base + sizeof (rust_ivec_heap); + buf.len = suggested_size; + return buf; +} + +static void read_progress(uv_tcp_t *socket, ssize_t nread, uv_buf_t buf) { + LOG_CALLBACK_ENTRY(socket); + socket_data *data = (socket_data*)socket->data; + I(data->task->sched, data->reader != NULL); + I(data->task->sched, nread <= ssize_t(buf.len)); + + rust_ivec_heap *base = reinterpret_cast( + reinterpret_cast(buf.base) - sizeof (rust_ivec_heap)); + rust_ivec v; + v.fill = 0; + v.alloc = buf.len; + v.payload.ptr = base; + + switch (nread) { + case -1: // End of stream + base->fill = 0; + uv_read_stop(socket); + break; + case 0: // Nothing read + data->task->kernel->free(base); + return; + default: // Got nread bytes + base->fill = nread; + break; + } + data->reader->send(&v); +} + +static void new_connection(uv_tcp_t *socket, int status) { + LOG_CALLBACK_ENTRY(socket); + socket_data *server = (socket_data*)socket->data; + I(server->task->sched, socket == &server->socket); + // Connections from servers don't have a channel + socket_data *client = make_socket(server->task, NULL); + if (!client) { + server->task->fail(); + return; + } + if (uv_accept(socket, &client->socket)) { + aio_close_socket(client->task, client); + server->task->fail(); + return; + } + server->chan->send(&client); +} + +extern "C" CDECL socket_data *aio_serve(rust_task *task, const char *ip, + int port, rust_chan *chan) { + LOG_UPCALL_ENTRY(task); + struct sockaddr_in addr = uv_ip4_addr(const_cast(ip), port); + socket_data *server = make_socket(iotask, chan); + if (!server) + goto oom; + if (uv_bind(&server->socket, addr) || + uv_listen(&server->socket, 128, new_connection)) { + aio_close_socket(task, server); + return NULL; + } + return server; +oom: + task->fail(); + return NULL; +} + +static void free_socket(uv_handle_t *handle) { + LOG_CALLBACK_ENTRY(socket); + uv_tcp_t *socket = (uv_tcp_t*)handle; + socket_data *data = (socket_data*)socket->data; + I(data->task->sched, socket == &data->socket); + // For client sockets, send a 0-size buffer to indicate that we're done + // reading and should send the close notification. + if (data->reader) { + if (data->reader->is_associated()) { + uv_buf_t buf = alloc_buffer(socket, 0); + read_progress(socket, -1, buf); + uv_read_stop(socket); + } + } else { + // This is a server socket + bool closed = true; + I(data->task->sched, data->chan != NULL); + data->task->kill(); + data->chan->send(&closed); + } + delete data; +} + +extern "C" CDECL void aio_close_socket(rust_task *task, socket_data *client) { + LOG_UPCALL_ENTRY(task); + if (uv_close((uv_handle_t*)&client->socket, free_socket)) { + task->fail(); + } +} + +extern "C" CDECL void aio_close_server(rust_task *task, socket_data *server, + rust_chan *chan) { + LOG_UPCALL_ENTRY(task); + // XXX: hax until rust_task::kill + // send null and the receiver knows to call back into native code to check + void* null_client = NULL; + server->chan->send(&null_client); + server->chan->deref(); + server->chan = chan->clone(iotask); + aio_close_socket(task, server); +} + +extern "C" CDECL bool aio_is_null_client(rust_task *task, + socket_data *server) { + LOG_UPCALL_ENTRY(task); + return server == NULL; +} + +static void connection_complete(request *req, int status) { + LOG_CALLBACK_ENTRY(socket); + socket_data *client = req->socket(); + req->send_result(client); + delete req; +} + +extern "C" CDECL void aio_connect(rust_task *task, const char *host, + int port, rust_chan *chan) { + LOG_UPCALL_ENTRY(task); + struct sockaddr_in addr = uv_ip4_addr(const_cast(host), port); + request *req; + socket_data *client = make_socket(iotask, NULL); + if (!client) { + goto oom_client; + } + req = new (client->task, "connection request") + request(client, chan, connection_complete); + if (!req) { + goto oom_req; + } + if (0 == uv_connect(req, addr)) { + return; + } +oom_req: + aio_close_socket(task, client); +oom_client: + task->fail(); + return; +} + +static void write_complete(request *req, int status) { + LOG_CALLBACK_ENTRY(socket); + bool success = status == 0; + req->send_result(&success); + delete req; +} + +extern "C" CDECL void aio_writedata(rust_task *task, socket_data *data, + char *buf, size_t size, rust_chan *chan) { + LOG_UPCALL_ENTRY(task); + uv_buf_t buffer = { buf, size }; + request *req = new (data->task, "write request") + request(data, chan, write_complete); + if (!req) { + goto fail; + } + if (uv_write(req, &buffer, 1)) { + delete req; + goto fail; + } + return; +fail: + task->fail(); +} + +extern "C" CDECL void aio_read(rust_task *task, socket_data *data, + rust_chan *reader) { + LOG_UPCALL_ENTRY(task); + I(task->sched, data->reader == NULL); + data->reader = reader->clone(iotask); + uv_read_start(&data->socket, alloc_buffer, read_progress); +} diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index e818b9ac83d..6860dbee8d6 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -1,3 +1,13 @@ +aio_close_socket +aio_close_server +aio_connect +aio_init +aio_is_null_client +aio_read +aio_run +aio_serve +aio_stop +aio_writedata align_of chan_send check_claims diff --git a/src/test/run-pass/ivec-tag.rs b/src/test/run-pass/ivec-tag.rs new file mode 100644 index 00000000000..4d59964f6c6 --- /dev/null +++ b/src/test/run-pass/ivec-tag.rs @@ -0,0 +1,14 @@ +use std; + +fn producer(c: chan[u8[]]) { + c <| ~[1u8, 2u8, 3u8, 4u8, 5u8, 6u8, 7u8, + 8u8, 9u8, 10u8, 11u8, 12u8, 13u8 ]; +} + +fn main() { + let p: port[u8[]] = port(); + let prod: task = spawn producer(chan(p)); + + let data: u8[]; + p |> data; +} diff --git a/src/test/run-pass/sio-client.rs b/src/test/run-pass/sio-client.rs new file mode 100644 index 00000000000..847d04c8776 --- /dev/null +++ b/src/test/run-pass/sio-client.rs @@ -0,0 +1,21 @@ +use std; +import std::sio; +import std::task; + +fn connectTask(cx: sio::ctx, ip: str, portnum: int) { + let client: sio::client; + client = sio::connect_to(cx, ip, portnum); + sio::close_client(client); +} + +fn main() { + let cx: sio::ctx = sio::new(); + let srv: sio::server = sio::create_server(cx, "0.0.0.0", 9090); + let child: task = spawn connectTask(cx, "127.0.0.1", 9090); + let client: sio::client = sio::accept_from(srv); + task::join(child); + sio::close_client(client); + sio::close_server(srv); + sio::destroy(cx); +} + diff --git a/src/test/run-pass/sio-ctx.rs b/src/test/run-pass/sio-ctx.rs new file mode 100644 index 00000000000..4cbc71016a6 --- /dev/null +++ b/src/test/run-pass/sio-ctx.rs @@ -0,0 +1,7 @@ +use std; +import std::sio; + +fn main() { + let cx: sio::ctx = sio::new(); + sio::destroy(cx); +} diff --git a/src/test/run-pass/sio-read.rs b/src/test/run-pass/sio-read.rs new file mode 100644 index 00000000000..a2fc95b2c1a --- /dev/null +++ b/src/test/run-pass/sio-read.rs @@ -0,0 +1,25 @@ +use std; +import std::sio; +import std::task; +import std::str; + +fn connectTask(cx: sio::ctx, ip: str, portnum: int) { + let client: sio::client; + client = sio::connect_to(cx, ip, portnum); + let data = sio::read(client); + sio::close_client(client); +} + +fn main() { + let cx: sio::ctx = sio::new(); + let srv: sio::server = sio::create_server(cx, "0.0.0.0", 9090); + let child: task = spawn connectTask(cx, "127.0.0.1", 9090); + let client: sio::client = sio::accept_from(srv); + sio::write_data(client, str::bytes_ivec("hello, world\n")); + task::join(child); + sio::close_client(client); + sio::close_server(srv); + sio::destroy(cx); +} + + diff --git a/src/test/run-pass/sio-srv.rs b/src/test/run-pass/sio-srv.rs new file mode 100644 index 00000000000..8a6fdccf484 --- /dev/null +++ b/src/test/run-pass/sio-srv.rs @@ -0,0 +1,9 @@ +use std; +import std::sio; + +fn main() { + let cx: sio::ctx = sio::new(); + let srv: sio::server = sio::create_server(cx, "0.0.0.0", 9090); + sio::close_server(srv); + sio::destroy(cx); +} diff --git a/src/test/run-pass/sio-write.rs b/src/test/run-pass/sio-write.rs new file mode 100644 index 00000000000..1094db7885c --- /dev/null +++ b/src/test/run-pass/sio-write.rs @@ -0,0 +1,24 @@ +use std; +import std::sio; +import std::task; +import std::str; + +fn connectTask(cx: sio::ctx, ip: str, portnum: int) { + let client: sio::client; + client = sio::connect_to(cx, ip, portnum); + sio::close_client(client); +} + +fn main() { + let cx: sio::ctx = sio::new(); + let srv: sio::server = sio::create_server(cx, "0.0.0.0", 9090); + let child: task = spawn connectTask(cx, "127.0.0.1", 9090); + let client: sio::client = sio::accept_from(srv); + sio::write_data(client, str::bytes_ivec("hello, world\n")); + task::join(child); + sio::close_client(client); + sio::close_server(srv); + sio::destroy(cx); +} + +