auto merge of #5409 : brson/rust/rt, r=brson

r?

There are a lot of commits here, but not all that much substance. Mostly just refactoring.

I started sketching out the beginnings of a very simple I/O API in `core::rt::io` that represents I/O streams as a single `Stream` trait instead of `Reader` / `Writer` pairs. This seems to be the more common pattern (at least this is how the .NET BCL does it) and it seems to me that separate readers and writers would make duplex streams very awkward. Regardless, I don't intend to go very far down the I/O API design road without some mailing list discussion.

I've also started on the uv bindings for file I/O but haven't gotten very far.

Also hooked up the new scheduler to `rust_start` and the compiletest driver. 70% of run-pass test cases already pass, but I wouldn't read too much into that.

I also split the direct, low-level uv bindings in two so that the scheduler can have its own set, leaving `std::net` on its own.
This commit is contained in:
bors 2013-03-25 13:01:11 -07:00
commit ef282dbe2a
29 changed files with 1815 additions and 1086 deletions

View File

@ -238,7 +238,7 @@ $(foreach target,$(CFG_TARGET_TRIPLES),\
CORELIB_CRATE := $(S)src/libcore/core.rc
CORELIB_INPUTS := $(wildcard $(addprefix $(S)src/libcore/, \
core.rc *.rs */*.rs))
core.rc *.rs */*.rs */*/*rs))
######################################################################
# Standard library variables

View File

@ -244,21 +244,29 @@ $(foreach host,$(CFG_HOST_TRIPLES), \
define TEST_RUNNER
# If NO_REBUILD is set then break the dependencies on std so we can
# test crates without rebuilding core and std first
ifeq ($(NO_REBUILD),)
STDTESTDEP_$(1)_$(2)_$(3) = $$(TLIB$(1)_T_$(2)_H_$(3))/$$(CFG_STDLIB_$(2))
else
STDTESTDEP_$(1)_$(2)_$(3) =
endif
$(3)/test/coretest.stage$(1)-$(2)$$(X_$(2)): \
$$(CORELIB_CRATE) $$(CORELIB_INPUTS) \
$$(TLIB$(1)_T_$(2)_H_$(3))/$$(CFG_STDLIB_$(2))
$$(STDTESTDEP_$(1)_$(2)_$(3))
@$$(call E, compile_and_link: $$@)
$$(STAGE$(1)_T_$(2)_H_$(3)) -o $$@ $$< --test
$(3)/test/stdtest.stage$(1)-$(2)$$(X_$(2)): \
$$(STDLIB_CRATE) $$(STDLIB_INPUTS) \
$$(TLIB$(1)_T_$(2)_H_$(3))/$$(CFG_STDLIB_$(2))
$$(STDTESTDEP_$(1)_$(2)_$(3))
@$$(call E, compile_and_link: $$@)
$$(STAGE$(1)_T_$(2)_H_$(3)) -o $$@ $$< --test
$(3)/test/syntaxtest.stage$(1)-$(2)$$(X_$(2)): \
$$(LIBSYNTAX_CRATE) $$(LIBSYNTAX_INPUTS) \
$$(TLIB$(1)_T_$(2)_H_$(3))/$$(CFG_STDLIB_$(2))
$$(STDTESTDEP_$(1)_$(2)_$(3))
@$$(call E, compile_and_link: $$@)
$$(STAGE$(1)_T_$(2)_H_$(3)) -o $$@ $$< --test

View File

@ -63,6 +63,9 @@ pub struct config {
// Run tests using the JIT
jit: bool,
// Run tests using the new runtime
newrt: bool,
// Explain what's going on
verbose: bool

View File

@ -61,7 +61,8 @@ pub fn parse_config(args: ~[~str]) -> config {
getopts::optopt(~"runtool"), getopts::optopt(~"rustcflags"),
getopts::optflag(~"verbose"),
getopts::optopt(~"logfile"),
getopts::optflag(~"jit")];
getopts::optflag(~"jit"),
getopts::optflag(~"newrt")];
fail_unless!(!args.is_empty());
let args_ = vec::tail(args);
@ -95,6 +96,7 @@ pub fn parse_config(args: ~[~str]) -> config {
runtool: getopts::opt_maybe_str(matches, ~"runtool"),
rustcflags: getopts::opt_maybe_str(matches, ~"rustcflags"),
jit: getopts::opt_present(matches, ~"jit"),
newrt: getopts::opt_present(matches, ~"newrt"),
verbose: getopts::opt_present(matches, ~"verbose")
}
}
@ -114,6 +116,7 @@ pub fn log_config(config: config) {
logv(c, fmt!("runtool: %s", opt_str(config.runtool)));
logv(c, fmt!("rustcflags: %s", opt_str(config.rustcflags)));
logv(c, fmt!("jit: %b", config.jit));
logv(c, fmt!("newrt: %b", config.newrt));
logv(c, fmt!("verbose: %b", config.verbose));
logv(c, fmt!("\n"));
}

View File

@ -484,9 +484,17 @@ fn compile_test_(config: config, props: TestProps,
fn exec_compiled_test(config: config, props: TestProps,
testfile: &Path) -> ProcRes {
// If testing the new runtime then set the RUST_NEWRT env var
let env = if config.newrt {
props.exec_env + ~[(~"RUST_NEWRT", ~"1")]
} else {
props.exec_env
};
compose_and_run(config, testfile,
make_run_args(config, props, testfile),
props.exec_env,
env,
config.run_lib_path, None)
}

View File

@ -8,6 +8,7 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use option::*;
use super::stack::StackSegment;
use libc::c_void;
use cast::{transmute, transmute_mut_unsafe,
@ -16,17 +17,30 @@ use cast::{transmute, transmute_mut_unsafe,
// XXX: Registers is boxed so that it is 16-byte aligned, for storing
// SSE regs. It would be marginally better not to do this. In C++ we
// use an attribute on a struct.
pub struct Context(~Registers);
// XXX: It would be nice to define regs as `~Option<Registers>` since
// the registers are sometimes empty, but the discriminant would
// then misalign the regs again.
pub struct Context {
/// The context entry point, saved here for later destruction
start: Option<~~fn()>,
/// Hold the registers while the task or scheduler is suspended
regs: ~Registers
}
pub impl Context {
fn empty() -> Context {
Context(new_regs())
Context {
start: None,
regs: new_regs()
}
}
/// Create a new context that will resume execution by running ~fn()
/// # Safety Note
/// The `start` closure must remain valid for the life of the Task
fn new(start: &~fn(), stack: &mut StackSegment) -> Context {
fn new(start: ~fn(), stack: &mut StackSegment) -> Context {
// XXX: Putting main into a ~ so it's a thin pointer and can
// be passed to the spawn function. Another unfortunate
// allocation
let start = ~start;
// The C-ABI function that is the task entry point
extern fn task_start_wrapper(f: &~fn()) { (*f)() }
@ -40,21 +54,29 @@ pub impl Context {
// which we will then modify to call the given function when restored
let mut regs = new_regs();
unsafe {
swap_registers(transmute_mut_region(&mut *regs),
transmute_region(&*regs))
swap_registers(transmute_mut_region(&mut *regs), transmute_region(&*regs))
};
initialize_call_frame(&mut *regs, fp, argp, sp);
return Context(regs);
return Context {
start: Some(start),
regs: regs
}
}
/* Switch contexts
Suspend the current execution context and resume another by
saving the registers values of the executing thread to a Context
then loading the registers from a previously saved Context.
*/
fn swap(out_context: &mut Context, in_context: &Context) {
let out_regs: &mut Registers = match out_context {
&Context(~ref mut r) => r
&Context { regs: ~ref mut r, _ } => r
};
let in_regs: &Registers = match in_context {
&Context(~ref r) => r
&Context { regs: ~ref r, _ } => r
};
unsafe { swap_registers(out_regs, in_regs) };
@ -84,11 +106,10 @@ fn new_regs() -> ~Registers {
}
#[cfg(target_arch = "x86")]
fn initialize_call_frame(regs: &mut Registers,
fptr: *c_void, arg: *c_void, sp: *mut uint) {
fn initialize_call_frame(regs: &mut Registers, fptr: *c_void, arg: *c_void, sp: *mut uint) {
let sp = align_down(sp);
let sp = mut_offset(sp, -4); // XXX: -4 words? Needs this be done at all?
let sp = mut_offset(sp, -4);
unsafe { *sp = arg as uint; }
let sp = mut_offset(sp, -1);
@ -108,8 +129,7 @@ type Registers = [uint * 22];
fn new_regs() -> ~Registers { ~[0, .. 22] }
#[cfg(target_arch = "x86_64")]
fn initialize_call_frame(regs: &mut Registers,
fptr: *c_void, arg: *c_void, sp: *mut uint) {
fn initialize_call_frame(regs: &mut Registers, fptr: *c_void, arg: *c_void, sp: *mut uint) {
// Redefinitions from regs.h
static RUSTRT_ARG0: uint = 3;
@ -143,8 +163,7 @@ type Registers = [uint * 32];
fn new_regs() -> ~Registers { ~[0, .. 32] }
#[cfg(target_arch = "arm")]
fn initialize_call_frame(regs: &mut Registers,
fptr: *c_void, arg: *c_void, sp: *mut uint) {
fn initialize_call_frame(regs: &mut Registers, fptr: *c_void, arg: *c_void, sp: *mut uint) {
let sp = mut_offset(sp, -1);
// The final return address. 0 indicates the bottom of the stack
@ -162,8 +181,7 @@ type Registers = [uint * 32];
fn new_regs() -> ~Registers { ~[0, .. 32] }
#[cfg(target_arch = "mips")]
fn initialize_call_frame(regs: &mut Registers,
fptr: *c_void, arg: *c_void, sp: *mut uint) {
fn initialize_call_frame(regs: &mut Registers, fptr: *c_void, arg: *c_void, sp: *mut uint) {
let sp = mut_offset(sp, -1);
// The final return address. 0 indicates the bottom of the stack

45
src/libcore/rt/io/file.rs Normal file
View File

@ -0,0 +1,45 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use prelude::*;
use super::super::sched::*;
use super::super::rtio::*;
use super::Stream;
pub struct FileStream;
pub impl FileStream {
fn new(_path: Path) -> FileStream {
fail!()
}
}
impl Stream for FileStream {
fn read(&mut self, _buf: &mut [u8]) -> uint {
fail!()
}
fn eof(&mut self) -> bool {
fail!()
}
fn write(&mut self, _v: &const [u8]) {
fail!()
}
}
#[test]
#[ignore]
fn super_simple_smoke_test_lets_go_read_some_files_and_have_a_good_time() {
let message = "it's alright. have a good time";
let filename = Path("test.txt");
let mut outstream = FileStream::new(filename);
outstream.write(message.to_bytes());
}

45
src/libcore/rt/io/mod.rs Normal file
View File

@ -0,0 +1,45 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use option::*;
use comm::{GenericPort, GenericChan};
pub mod file;
// FIXME #5370 Strongly want this to be StreamError(&mut Stream)
pub struct StreamError;
// XXX: Can't put doc comments on macros
// Raised by `Stream` instances on error. Returning `true` from the handler
// indicates that the `Stream` should continue, `false` that it should fail.
condition! {
stream_error: super::StreamError -> bool;
}
pub trait Stream {
/// Read bytes, up to the length of `buf` and place them in `buf`,
/// returning the number of bytes read or an `IoError`. Reads
/// 0 bytes on EOF.
///
/// # Failure
///
/// Raises the `reader_error` condition on error
fn read(&mut self, buf: &mut [u8]) -> uint;
/// Return whether the Reader has reached the end of the stream
fn eof(&mut self) -> bool;
/// Write the given buffer
///
/// # Failure
///
/// Raises the `writer_error` condition on error
fn write(&mut self, v: &const [u8]);
}

View File

@ -8,6 +8,7 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use libc::c_char;
// Some basic logging
macro_rules! rtdebug_ (
@ -15,16 +16,10 @@ macro_rules! rtdebug_ (
dumb_println(fmt!( $($arg),+ ));
fn dumb_println(s: &str) {
use str::as_c_str;
use libc::c_char;
extern {
fn printf(s: *c_char);
}
do as_c_str(s.to_str() + "\n") |s| {
unsafe { printf(s); }
}
use io::WriterUtil;
let dbg = ::libc::STDERR_FILENO as ::io::fd_t;
dbg.write_str(s);
dbg.write_str("\n");
}
} )
@ -36,9 +31,13 @@ macro_rules! rtdebug (
)
mod sched;
mod io;
mod rtio;
pub mod uvll;
mod uvio;
#[path = "uv/mod.rs"]
mod uv;
#[path = "io/mod.rs"]
mod io;
// FIXME #5248: The import in `sched` doesn't resolve unless this is pub!
pub mod thread_local_storage;
mod work_queue;
@ -46,3 +45,23 @@ mod stack;
mod context;
mod thread;
pub mod env;
pub fn start(main: *u8, _argc: int, _argv: *c_char, _crate_map: *u8) -> int {
use self::sched::{Scheduler, Task};
use self::uvio::UvEventLoop;
let loop_ = ~UvEventLoop::new();
let mut sched = ~Scheduler::new(loop_);
let main_task = ~do Task::new(&mut sched.stack_pool) {
// XXX: Can't call a C function pointer from Rust yet
unsafe { rust_call_nullary_fn(main) };
};
sched.task_queue.push_back(main_task);
sched.run();
return 0;
extern {
fn rust_call_nullary_fn(f: *u8);
}
}

View File

@ -16,7 +16,7 @@ use ptr::mut_null;
use super::work_queue::WorkQueue;
use super::stack::{StackPool, StackSegment};
use super::io::{EventLoop, EventLoopObject};
use super::rtio::{EventLoop, EventLoopObject};
use super::context::Context;
use tls = super::thread_local_storage;
@ -70,7 +70,14 @@ enum CleanupJob {
pub impl Scheduler {
pub fn new(event_loop: ~EventLoopObject) -> Scheduler {
fn new(event_loop: ~EventLoopObject) -> Scheduler {
// Lazily initialize the global state, currently the scheduler TLS key
unsafe { rust_initialize_global_state(); }
extern {
fn rust_initialize_global_state();
}
Scheduler {
event_loop: event_loop,
task_queue: WorkQueue::new(),
@ -183,8 +190,7 @@ pub impl Scheduler {
let blocked_task = self.current_task.swap_unwrap();
let f_fake_region = unsafe {
transmute::<&fn(&mut Scheduler, ~Task),
&fn(&mut Scheduler, ~Task)>(f)
transmute::<&fn(&mut Scheduler, ~Task), &fn(&mut Scheduler, ~Task)>(f)
};
let f_opaque = HackAroundBorrowCk::from_fn(f_fake_region);
self.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
@ -233,8 +239,7 @@ pub impl Scheduler {
Context::swap(task_context, scheduler_context);
}
priv fn swap_in_task_from_running_task(&mut self,
running_task: &mut Task) {
priv fn swap_in_task_from_running_task(&mut self, running_task: &mut Task) {
let running_task_context = &mut running_task.saved_context;
let next_context = &self.current_task.get_ref().saved_context;
Context::swap(running_task_context, next_context);
@ -285,8 +290,6 @@ pub impl Scheduler {
static TASK_MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack
pub struct Task {
/// The task entry point, saved here for later destruction
priv start: ~~fn(),
/// The segment of stack on which the task is currently running or,
/// if the task is blocked, on which the task will resume execution
priv current_stack_segment: StackSegment,
@ -295,17 +298,13 @@ pub struct Task {
priv saved_context: Context,
}
impl Task {
pub fn new(stack_pool: &mut StackPool, start: ~fn()) -> Task {
// XXX: Putting main into a ~ so it's a thin pointer and can
// be passed to the spawn function. Another unfortunate
// allocation
let start = ~Task::build_start_wrapper(start);
pub impl Task {
fn new(stack_pool: &mut StackPool, start: ~fn()) -> Task {
let start = Task::build_start_wrapper(start);
let mut stack = stack_pool.take_segment(TASK_MIN_STACK_SIZE);
// NB: Context holds a pointer to that ~fn
let initial_context = Context::new(&*start, &mut stack);
let initial_context = Context::new(start, &mut stack);
return Task {
start: start,
current_stack_segment: stack,
saved_context: initial_context,
};
@ -350,8 +349,7 @@ impl ThreadLocalScheduler {
fn put_scheduler(&mut self, scheduler: ~Scheduler) {
unsafe {
let key = match self { &ThreadLocalScheduler(key) => key };
let value: *mut c_void =
transmute::<~Scheduler, *mut c_void>(scheduler);
let value: *mut c_void = transmute::<~Scheduler, *mut c_void>(scheduler);
tls::set(key, value);
}
}
@ -363,8 +361,9 @@ impl ThreadLocalScheduler {
fail_unless!(value.is_not_null());
{
let value_ptr = &mut value;
let sched: &mut ~Scheduler =
transmute::<&mut *mut c_void, &mut ~Scheduler>(value_ptr);
let sched: &mut ~Scheduler = {
transmute::<&mut *mut c_void, &mut ~Scheduler>(value_ptr)
};
let sched: &mut Scheduler = &mut **sched;
return sched;
}

View File

@ -27,6 +27,7 @@ pub impl StackSegment {
}
}
/// Point one word beyond the high end of the allocated stack
fn end(&self) -> *uint {
unsafe {
vec::raw::to_ptr(self.buf).offset(self.buf.len()) as *uint

View File

@ -14,13 +14,13 @@ use ops::Drop;
#[allow(non_camel_case_types)] // runtime type
type raw_thread = libc::c_void;
struct Thread {
pub struct Thread {
main: ~fn(),
raw_thread: *raw_thread
}
impl Thread {
pub fn start(main: ~fn()) -> Thread {
pub impl Thread {
fn start(main: ~fn()) -> Thread {
fn substart(main: &fn()) -> *raw_thread {
unsafe { rust_raw_thread_start(&main) }
}

View File

@ -1,919 +0,0 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
/*!
Bindings to libuv.
UV types consist of the event loop (Loop), Watchers, Requests and
Callbacks.
Watchers and Requests encapsulate pointers to uv *handles*, which have
subtyping relationships with each other. This subtyping is reflected
in the bindings with explicit or implicit coercions. For example, an
upcast from TcpWatcher to StreamWatcher is done with
`tcp_watcher.as_stream()`. In other cases a callback on a specific
type of watcher will be passed a watcher of a supertype.
Currently all use of Request types (connect/write requests) are
encapsulated in the bindings and don't need to be dealt with by the
caller.
# Safety note
Due to the complex lifecycle of uv handles, as well as compiler bugs,
this module is not memory safe and requires explicit memory management,
via `close` and `delete` methods.
*/
use option::*;
use str::raw::from_c_str;
use to_str::ToStr;
use vec;
use ptr;
use libc::{c_void, c_int, size_t, malloc, free, ssize_t};
use cast::{transmute, transmute_mut_region};
use ptr::null;
use sys::size_of;
use unstable::uvll;
use super::io::{IpAddr, Ipv4, Ipv6};
#[cfg(test)] use unstable::run_in_bare_thread;
#[cfg(test)] use super::thread::Thread;
#[cfg(test)] use cell::Cell;
fn ip4_to_uv_ip4(addr: IpAddr) -> uvll::sockaddr_in {
match addr {
Ipv4(a, b, c, d, p) => {
unsafe {
uvll::ip4_addr(fmt!("%u.%u.%u.%u",
a as uint,
b as uint,
c as uint,
d as uint), p as int)
}
}
Ipv6 => fail!()
}
}
/// A trait for callbacks to implement. Provides a little extra type safety
/// for generic, unsafe interop functions like `set_watcher_callback`.
trait Callback { }
type NullCallback = ~fn();
impl Callback for NullCallback { }
/// A type that wraps a native handle
trait NativeHandle<T> {
pub fn from_native_handle(T) -> Self;
pub fn native_handle(&self) -> T;
}
/// XXX: Loop(*handle) is buggy with destructors. Normal structs
/// with dtors may not be destructured, but tuple structs can,
/// but the results are not correct.
pub struct Loop {
handle: *uvll::uv_loop_t
}
pub impl Loop {
fn new() -> Loop {
let handle = unsafe { uvll::loop_new() };
fail_unless!(handle.is_not_null());
NativeHandle::from_native_handle(handle)
}
fn run(&mut self) {
unsafe { uvll::run(self.native_handle()) };
}
fn close(&mut self) {
unsafe { uvll::loop_delete(self.native_handle()) };
}
}
impl NativeHandle<*uvll::uv_loop_t> for Loop {
fn from_native_handle(handle: *uvll::uv_loop_t) -> Loop {
Loop { handle: handle }
}
fn native_handle(&self) -> *uvll::uv_loop_t {
self.handle
}
}
/// The trait implemented by uv 'watchers' (handles). Watchers are
/// non-owning wrappers around the uv handles and are not completely
/// safe - there may be multiple instances for a single underlying
/// handle. Watchers are generally created, then `start`ed, `stop`ed
/// and `close`ed, but due to their complex life cycle may not be
/// entirely memory safe if used in unanticipated patterns.
trait Watcher {
fn event_loop(&self) -> Loop;
}
pub struct IdleWatcher(*uvll::uv_idle_t);
impl Watcher for IdleWatcher {
fn event_loop(&self) -> Loop {
loop_from_watcher(self)
}
}
type IdleCallback = ~fn(IdleWatcher, Option<UvError>);
impl Callback for IdleCallback { }
pub impl IdleWatcher {
fn new(loop_: &mut Loop) -> IdleWatcher {
unsafe {
let handle = uvll::idle_new();
fail_unless!(handle.is_not_null());
fail_unless!(0 == uvll::idle_init(loop_.native_handle(), handle));
uvll::set_data_for_uv_handle(handle, null::<()>());
NativeHandle::from_native_handle(handle)
}
}
fn start(&mut self, cb: IdleCallback) {
set_watcher_callback(self, cb);
unsafe {
fail_unless!(0 == uvll::idle_start(self.native_handle(), idle_cb))
};
extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) {
let idle_watcher: IdleWatcher =
NativeHandle::from_native_handle(handle);
let cb: &IdleCallback =
borrow_callback_from_watcher(&idle_watcher);
let status = status_to_maybe_uv_error(handle, status);
(*cb)(idle_watcher, status);
}
}
fn stop(&mut self) {
unsafe { fail_unless!(0 == uvll::idle_stop(self.native_handle())); }
}
fn close(self) {
unsafe { uvll::close(self.native_handle(), close_cb) };
extern fn close_cb(handle: *uvll::uv_idle_t) {
let mut idle_watcher = NativeHandle::from_native_handle(handle);
drop_watcher_callback::<uvll::uv_idle_t,
IdleWatcher,
IdleCallback>(&mut idle_watcher);
unsafe { uvll::idle_delete(handle) };
}
}
}
impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher {
fn from_native_handle(handle: *uvll::uv_idle_t) -> IdleWatcher {
IdleWatcher(handle)
}
fn native_handle(&self) -> *uvll::uv_idle_t {
match self { &IdleWatcher(ptr) => ptr }
}
}
// uv_stream t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t
// and uv_file_t
pub struct StreamWatcher(*uvll::uv_stream_t);
impl Watcher for StreamWatcher {
fn event_loop(&self) -> Loop {
loop_from_watcher(self)
}
}
type ReadCallback = ~fn(StreamWatcher, int, Buf, Option<UvError>);
impl Callback for ReadCallback { }
// XXX: The uv alloc callback also has a *uv_handle_t arg
pub type AllocCallback = ~fn(uint) -> Buf;
impl Callback for AllocCallback { }
pub impl StreamWatcher {
fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) {
// XXX: Borrowchk problems
let data = get_watcher_data(unsafe { transmute_mut_region(self) });
data.alloc_cb = Some(alloc);
data.read_cb = Some(cb);
let handle = self.native_handle();
unsafe { uvll::read_start(handle, alloc_cb, read_cb); }
extern fn alloc_cb(stream: *uvll::uv_stream_t,
suggested_size: size_t) -> Buf {
let mut stream_watcher: StreamWatcher =
NativeHandle::from_native_handle(stream);
let data = get_watcher_data(&mut stream_watcher);
let alloc_cb = data.alloc_cb.get_ref();
return (*alloc_cb)(suggested_size as uint);
}
extern fn read_cb(stream: *uvll::uv_stream_t,
nread: ssize_t, ++buf: Buf) {
rtdebug!("buf addr: %x", buf.base as uint);
rtdebug!("buf len: %d", buf.len as int);
let mut stream_watcher: StreamWatcher =
NativeHandle::from_native_handle(stream);
let data = get_watcher_data(&mut stream_watcher);
let cb = data.read_cb.get_ref();
let status = status_to_maybe_uv_error(stream, nread as c_int);
(*cb)(stream_watcher, nread as int, buf, status);
}
}
fn read_stop(&mut self) {
// It would be nice to drop the alloc and read callbacks here,
// but read_stop may be called from inside one of them and we
// would end up freeing the in-use environment
let handle = self.native_handle();
unsafe { uvll::read_stop(handle); }
}
// XXX: Needs to take &[u8], not ~[u8]
fn write(&mut self, msg: ~[u8], cb: ConnectionCallback) {
// XXX: Borrowck
let data = get_watcher_data(unsafe { transmute_mut_region(self) });
fail_unless!(data.write_cb.is_none());
data.write_cb = Some(cb);
let req = WriteRequest::new();
let buf = vec_to_uv_buf(msg);
// XXX: Allocation
let bufs = ~[buf];
unsafe {
fail_unless!(0 == uvll::write(req.native_handle(),
self.native_handle(),
&bufs, write_cb));
}
// XXX: Freeing immediately after write. Is this ok?
let _v = vec_from_uv_buf(buf);
extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
let write_request: WriteRequest =
NativeHandle::from_native_handle(req);
let mut stream_watcher = write_request.stream();
write_request.delete();
let cb = get_watcher_data(&mut stream_watcher)
.write_cb.swap_unwrap();
let status = status_to_maybe_uv_error(
stream_watcher.native_handle(), status);
cb(stream_watcher, status);
}
}
fn accept(&mut self, stream: StreamWatcher) {
let self_handle = self.native_handle() as *c_void;
let stream_handle = stream.native_handle() as *c_void;
unsafe {
fail_unless!(0 == uvll::accept(self_handle, stream_handle));
}
}
fn close(self, cb: NullCallback) {
{
let mut self = self;
let data = get_watcher_data(&mut self);
fail_unless!(data.close_cb.is_none());
data.close_cb = Some(cb);
}
unsafe { uvll::close(self.native_handle(), close_cb); }
extern fn close_cb(handle: *uvll::uv_stream_t) {
let mut stream_watcher: StreamWatcher =
NativeHandle::from_native_handle(handle);
{
let mut data = get_watcher_data(&mut stream_watcher);
data.close_cb.swap_unwrap()();
}
drop_watcher_data(&mut stream_watcher);
unsafe { free(handle as *c_void) }
}
}
}
impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
fn from_native_handle(
handle: *uvll::uv_stream_t) -> StreamWatcher {
StreamWatcher(handle)
}
fn native_handle(&self) -> *uvll::uv_stream_t {
match self { &StreamWatcher(ptr) => ptr }
}
}
pub struct TcpWatcher(*uvll::uv_tcp_t);
impl Watcher for TcpWatcher {
fn event_loop(&self) -> Loop {
loop_from_watcher(self)
}
}
type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
impl Callback for ConnectionCallback { }
pub impl TcpWatcher {
fn new(loop_: &mut Loop) -> TcpWatcher {
unsafe {
let size = size_of::<uvll::uv_tcp_t>() as size_t;
let handle = malloc(size) as *uvll::uv_tcp_t;
fail_unless!(handle.is_not_null());
fail_unless!(0 == uvll::tcp_init(loop_.native_handle(), handle));
let mut watcher = NativeHandle::from_native_handle(handle);
install_watcher_data(&mut watcher);
return watcher;
}
}
fn bind(&mut self, address: IpAddr) {
match address {
Ipv4(*) => {
let addr = ip4_to_uv_ip4(address);
let result = unsafe {
uvll::tcp_bind(self.native_handle(), &addr)
};
// XXX: bind is likely to fail. need real error handling
fail_unless!(result == 0);
}
_ => fail!()
}
}
fn connect(&mut self, address: IpAddr, cb: ConnectionCallback) {
unsafe {
fail_unless!(get_watcher_data(self).connect_cb.is_none());
get_watcher_data(self).connect_cb = Some(cb);
let mut connect_watcher = ConnectRequest::new();
let connect_handle = connect_watcher.native_handle();
match address {
Ipv4(*) => {
let addr = ip4_to_uv_ip4(address);
rtdebug!("connect_t: %x", connect_handle as uint);
fail_unless!(0 == uvll::tcp_connect(connect_handle,
self.native_handle(),
&addr, connect_cb));
}
_ => fail!()
}
extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
rtdebug!("connect_t: %x", req as uint);
let connect_request: ConnectRequest =
NativeHandle::from_native_handle(req);
let mut stream_watcher = connect_request.stream();
connect_request.delete();
let cb: ConnectionCallback = {
let data = get_watcher_data(&mut stream_watcher);
data.connect_cb.swap_unwrap()
};
let status = status_to_maybe_uv_error(
stream_watcher.native_handle(), status);
cb(stream_watcher, status);
}
}
}
fn listen(&mut self, cb: ConnectionCallback) {
// XXX: Borrowck
let data = get_watcher_data(unsafe { transmute_mut_region(self) });
fail_unless!(data.connect_cb.is_none());
data.connect_cb = Some(cb);
unsafe {
static BACKLOG: c_int = 128; // XXX should be configurable
// XXX: This can probably fail
fail_unless!(0 == uvll::listen(self.native_handle(),
BACKLOG, connection_cb));
}
extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
rtdebug!("connection_cb");
let mut stream_watcher: StreamWatcher =
NativeHandle::from_native_handle(handle);
let cb = get_watcher_data(&mut stream_watcher)
.connect_cb.swap_unwrap();
let status = status_to_maybe_uv_error(
stream_watcher.native_handle(), status);
cb(stream_watcher, status);
}
}
fn as_stream(&self) -> StreamWatcher {
NativeHandle::from_native_handle(
self.native_handle() as *uvll::uv_stream_t)
}
}
impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher {
fn from_native_handle(handle: *uvll::uv_tcp_t) -> TcpWatcher {
TcpWatcher(handle)
}
fn native_handle(&self) -> *uvll::uv_tcp_t {
match self { &TcpWatcher(ptr) => ptr }
}
}
trait Request { }
type ConnectCallback = ~fn(ConnectRequest, Option<UvError>);
impl Callback for ConnectCallback { }
// uv_connect_t is a subclass of uv_req_t
struct ConnectRequest(*uvll::uv_connect_t);
impl Request for ConnectRequest { }
impl ConnectRequest {
fn new() -> ConnectRequest {
let connect_handle = unsafe {
malloc(size_of::<uvll::uv_connect_t>() as size_t)
};
fail_unless!(connect_handle.is_not_null());
let connect_handle = connect_handle as *uvll::uv_connect_t;
ConnectRequest(connect_handle)
}
fn stream(&self) -> StreamWatcher {
unsafe {
let stream_handle =
uvll::get_stream_handle_from_connect_req(
self.native_handle());
NativeHandle::from_native_handle(stream_handle)
}
}
fn delete(self) {
unsafe { free(self.native_handle() as *c_void) }
}
}
impl NativeHandle<*uvll::uv_connect_t> for ConnectRequest {
fn from_native_handle(
handle: *uvll:: uv_connect_t) -> ConnectRequest {
ConnectRequest(handle)
}
fn native_handle(&self) -> *uvll::uv_connect_t {
match self { &ConnectRequest(ptr) => ptr }
}
}
pub struct WriteRequest(*uvll::uv_write_t);
impl Request for WriteRequest { }
impl WriteRequest {
fn new() -> WriteRequest {
let write_handle = unsafe {
malloc(size_of::<uvll::uv_write_t>() as size_t)
};
fail_unless!(write_handle.is_not_null());
let write_handle = write_handle as *uvll::uv_write_t;
WriteRequest(write_handle)
}
fn stream(&self) -> StreamWatcher {
unsafe {
let stream_handle =
uvll::get_stream_handle_from_write_req(self.native_handle());
NativeHandle::from_native_handle(stream_handle)
}
}
fn delete(self) {
unsafe { free(self.native_handle() as *c_void) }
}
}
impl NativeHandle<*uvll::uv_write_t> for WriteRequest {
fn from_native_handle(handle: *uvll:: uv_write_t) -> WriteRequest {
WriteRequest(handle)
}
fn native_handle(&self) -> *uvll::uv_write_t {
match self { &WriteRequest(ptr) => ptr }
}
}
// XXX: Need to define the error constants like EOF so they can be
// compared to the UvError type
struct UvError(uvll::uv_err_t);
impl UvError {
fn name(&self) -> ~str {
unsafe {
let inner = match self { &UvError(ref a) => a };
let name_str = uvll::err_name(inner);
fail_unless!(name_str.is_not_null());
from_c_str(name_str)
}
}
fn desc(&self) -> ~str {
unsafe {
let inner = match self { &UvError(ref a) => a };
let desc_str = uvll::strerror(inner);
fail_unless!(desc_str.is_not_null());
from_c_str(desc_str)
}
}
}
impl ToStr for UvError {
fn to_str(&self) -> ~str {
fmt!("%s: %s", self.name(), self.desc())
}
}
#[test]
fn error_smoke_test() {
let err = uvll::uv_err_t { code: 1, sys_errno_: 1 };
let err: UvError = UvError(err);
fail_unless!(err.to_str() == ~"EOF: end of file");
}
/// Given a uv handle, convert a callback status to a UvError
// XXX: Follow the pattern below by parameterizing over T: Watcher, not T
fn status_to_maybe_uv_error<T>(handle: *T, status: c_int) -> Option<UvError> {
if status != -1 {
None
} else {
unsafe {
rtdebug!("handle: %x", handle as uint);
let loop_ = uvll::get_loop_for_uv_handle(handle);
rtdebug!("loop: %x", loop_ as uint);
let err = uvll::last_error(loop_);
Some(UvError(err))
}
}
}
/// Get the uv event loop from a Watcher
pub fn loop_from_watcher<H, W: Watcher + NativeHandle<*H>>(
watcher: &W) -> Loop {
let handle = watcher.native_handle();
let loop_ = unsafe { uvll::get_loop_for_uv_handle(handle) };
NativeHandle::from_native_handle(loop_)
}
/// Set the custom data on a handle to a callback Note: This is only
/// suitable for watchers that make just one type of callback. For
/// others use WatcherData
fn set_watcher_callback<H, W: Watcher + NativeHandle<*H>, CB: Callback>(
watcher: &mut W, cb: CB) {
drop_watcher_callback::<H, W, CB>(watcher);
// XXX: Boxing the callback so it fits into a
// pointer. Unfortunate extra allocation
let boxed_cb = ~cb;
let data = unsafe { transmute::<~CB, *c_void>(boxed_cb) };
unsafe { uvll::set_data_for_uv_handle(watcher.native_handle(), data) };
}
/// Delete a callback from a handle's custom data
fn drop_watcher_callback<H, W: Watcher + NativeHandle<*H>, CB: Callback>(
watcher: &mut W) {
unsafe {
let handle = watcher.native_handle();
let handle_data: *c_void = uvll::get_data_for_uv_handle(handle);
if handle_data.is_not_null() {
// Take ownership of the callback and drop it
let _cb = transmute::<*c_void, ~CB>(handle_data);
// Make sure the pointer is zeroed
uvll::set_data_for_uv_handle(
watcher.native_handle(), null::<()>());
}
}
}
/// Take a pointer to the callback installed as custom data
fn borrow_callback_from_watcher<H, W: Watcher + NativeHandle<*H>,
CB: Callback>(watcher: &W) -> &CB {
unsafe {
let handle = watcher.native_handle();
let handle_data: *c_void = uvll::get_data_for_uv_handle(handle);
fail_unless!(handle_data.is_not_null());
let cb = transmute::<&*c_void, &~CB>(&handle_data);
return &**cb;
}
}
/// Take ownership of the callback installed as custom data
fn take_callback_from_watcher<H, W: Watcher + NativeHandle<*H>, CB: Callback>(
watcher: &mut W) -> CB {
unsafe {
let handle = watcher.native_handle();
let handle_data: *c_void = uvll::get_data_for_uv_handle(handle);
fail_unless!(handle_data.is_not_null());
uvll::set_data_for_uv_handle(handle, null::<()>());
let cb: ~CB = transmute::<*c_void, ~CB>(handle_data);
let cb = match cb { ~cb => cb };
return cb;
}
}
/// Callbacks used by StreamWatchers, set as custom data on the foreign handle
struct WatcherData {
read_cb: Option<ReadCallback>,
write_cb: Option<ConnectionCallback>,
connect_cb: Option<ConnectionCallback>,
close_cb: Option<NullCallback>,
alloc_cb: Option<AllocCallback>
}
fn install_watcher_data<H, W: Watcher + NativeHandle<*H>>(watcher: &mut W) {
unsafe {
let data = ~WatcherData {
read_cb: None,
write_cb: None,
connect_cb: None,
close_cb: None,
alloc_cb: None
};
let data = transmute::<~WatcherData, *c_void>(data);
uvll::set_data_for_uv_handle(watcher.native_handle(), data);
}
}
fn get_watcher_data<H, W: Watcher + NativeHandle<*H>>(
watcher: &'r mut W) -> &'r mut WatcherData {
unsafe {
let data = uvll::get_data_for_uv_handle(watcher.native_handle());
let data = transmute::<&*c_void, &mut ~WatcherData>(&data);
return &mut **data;
}
}
fn drop_watcher_data<H, W: Watcher + NativeHandle<*H>>(watcher: &mut W) {
unsafe {
let data = uvll::get_data_for_uv_handle(watcher.native_handle());
let _data = transmute::<*c_void, ~WatcherData>(data);
uvll::set_data_for_uv_handle(watcher.native_handle(), null::<()>());
}
}
#[test]
fn test_slice_to_uv_buf() {
let slice = [0, .. 20];
let buf = slice_to_uv_buf(slice);
fail_unless!(buf.len == 20);
unsafe {
let base = transmute::<*u8, *mut u8>(buf.base);
(*base) = 1;
(*ptr::mut_offset(base, 1)) = 2;
}
fail_unless!(slice[0] == 1);
fail_unless!(slice[1] == 2);
}
/// The uv buffer type
pub type Buf = uvll::uv_buf_t;
/// Borrow a slice to a Buf
pub fn slice_to_uv_buf(v: &[u8]) -> Buf {
let data = unsafe { vec::raw::to_ptr(v) };
unsafe { uvll::buf_init(data, v.len()) }
}
// XXX: Do these conversions without copying
/// Transmute an owned vector to a Buf
fn vec_to_uv_buf(v: ~[u8]) -> Buf {
let data = unsafe { malloc(v.len() as size_t) } as *u8;
fail_unless!(data.is_not_null());
do vec::as_imm_buf(v) |b, l| {
let data = data as *mut u8;
unsafe { ptr::copy_memory(data, b, l) }
}
let buf = unsafe { uvll::buf_init(data, v.len()) };
return buf;
}
/// Transmute a Buf that was once a ~[u8] back to ~[u8]
fn vec_from_uv_buf(buf: Buf) -> Option<~[u8]> {
if !(buf.len == 0 && buf.base.is_null()) {
let v = unsafe { vec::from_buf(buf.base, buf.len as uint) };
unsafe { free(buf.base as *c_void) };
return Some(v);
} else {
// No buffer
return None;
}
}
#[test]
fn loop_smoke_test() {
do run_in_bare_thread {
let mut loop_ = Loop::new();
loop_.run();
loop_.close();
}
}
#[test]
#[ignore(reason = "valgrind - loop destroyed before watcher?")]
fn idle_new_then_close() {
do run_in_bare_thread {
let mut loop_ = Loop::new();
let mut idle_watcher = { IdleWatcher::new(&mut loop_) };
idle_watcher.close();
}
}
#[test]
fn idle_smoke_test() {
do run_in_bare_thread {
let mut loop_ = Loop::new();
let mut idle_watcher = { IdleWatcher::new(&mut loop_) };
let mut count = 10;
let count_ptr: *mut int = &mut count;
do idle_watcher.start |idle_watcher, status| {
let mut idle_watcher = idle_watcher;
fail_unless!(status.is_none());
if unsafe { *count_ptr == 10 } {
idle_watcher.stop();
idle_watcher.close();
} else {
unsafe { *count_ptr = *count_ptr + 1; }
}
}
loop_.run();
loop_.close();
fail_unless!(count == 10);
}
}
#[test]
fn idle_start_stop_start() {
do run_in_bare_thread {
let mut loop_ = Loop::new();
let mut idle_watcher = { IdleWatcher::new(&mut loop_) };
do idle_watcher.start |idle_watcher, status| {
let mut idle_watcher = idle_watcher;
fail_unless!(status.is_none());
idle_watcher.stop();
do idle_watcher.start |idle_watcher, status| {
fail_unless!(status.is_none());
let mut idle_watcher = idle_watcher;
idle_watcher.stop();
idle_watcher.close();
}
}
loop_.run();
loop_.close();
}
}
#[test]
#[ignore(reason = "ffi struct issues")]
fn connect_close() {
do run_in_bare_thread() {
let mut loop_ = Loop::new();
let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
// Connect to a port where nobody is listening
let addr = Ipv4(127, 0, 0, 1, 2923);
do tcp_watcher.connect(addr) |stream_watcher, status| {
rtdebug!("tcp_watcher.connect!");
fail_unless!(status.is_some());
fail_unless!(status.get().name() == ~"ECONNREFUSED");
stream_watcher.close(||());
}
loop_.run();
loop_.close();
}
}
#[test]
#[ignore(reason = "need a server to connect to")]
fn connect_read() {
do run_in_bare_thread() {
let mut loop_ = Loop::new();
let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
let addr = Ipv4(127, 0, 0, 1, 2924);
do tcp_watcher.connect(addr) |stream_watcher, status| {
let mut stream_watcher = stream_watcher;
rtdebug!("tcp_watcher.connect!");
fail_unless!(status.is_none());
let alloc: AllocCallback = |size| {
vec_to_uv_buf(vec::from_elem(size, 0))
};
do stream_watcher.read_start(alloc)
|stream_watcher, nread, buf, status| {
let buf = vec_from_uv_buf(buf);
rtdebug!("read cb!");
if status.is_none() {
let bytes = buf.unwrap();
rtdebug!("%s", bytes.slice(0, nread as uint).to_str());
} else {
rtdebug!("status after read: %s", status.get().to_str());
rtdebug!("closing");
stream_watcher.close(||());
}
}
}
loop_.run();
loop_.close();
}
}
#[test]
#[ignore(reason = "ffi struct issues")]
fn listen() {
do run_in_bare_thread() {
static MAX: int = 10;
let mut loop_ = Loop::new();
let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
let addr = Ipv4(127, 0, 0, 1, 2925);
server_tcp_watcher.bind(addr);
let loop_ = loop_;
rtdebug!("listening");
do server_tcp_watcher.listen |server_stream_watcher, status| {
rtdebug!("listened!");
fail_unless!(status.is_none());
let mut server_stream_watcher = server_stream_watcher;
let mut loop_ = loop_;
let mut client_tcp_watcher = TcpWatcher::new(&mut loop_);
let mut client_tcp_watcher = client_tcp_watcher.as_stream();
server_stream_watcher.accept(client_tcp_watcher);
let count_cell = Cell(0);
let server_stream_watcher = server_stream_watcher;
rtdebug!("starting read");
let alloc: AllocCallback = |size| {
vec_to_uv_buf(vec::from_elem(size, 0))
};
do client_tcp_watcher.read_start(alloc)
|stream_watcher, nread, buf, status| {
rtdebug!("i'm reading!");
let buf = vec_from_uv_buf(buf);
let mut count = count_cell.take();
if status.is_none() {
rtdebug!("got %d bytes", nread);
let buf = buf.unwrap();
for buf.slice(0, nread as uint).each |byte| {
fail_unless!(*byte == count as u8);
rtdebug!("%u", *byte as uint);
count += 1;
}
} else {
fail_unless!(count == MAX);
do stream_watcher.close {
server_stream_watcher.close(||());
}
}
count_cell.put_back(count);
}
}
let _client_thread = do Thread::start {
rtdebug!("starting client thread");
let mut loop_ = Loop::new();
let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
do tcp_watcher.connect(addr) |stream_watcher, status| {
rtdebug!("connecting");
fail_unless!(status.is_none());
let mut stream_watcher = stream_watcher;
let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
do stream_watcher.write(msg) |stream_watcher, status| {
rtdebug!("writing");
fail_unless!(status.is_none());
stream_watcher.close(||());
}
}
loop_.run();
loop_.close();
};
let mut loop_ = loop_;
loop_.run();
loop_.close();
}
}

52
src/libcore/rt/uv/file.rs Normal file
View File

@ -0,0 +1,52 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use prelude::*;
use ptr::null;
use libc::c_void;
use super::{UvError, Callback, Request, NativeHandle, Loop};
use super::super::uvll;
use super::super::uvll::*;
pub type FsCallback = ~fn(FsRequest, Option<UvError>);
impl Callback for FsCallback { }
pub struct FsRequest(*uvll::uv_fs_t);
impl Request for FsRequest;
impl FsRequest {
fn new() -> FsRequest {
let fs_req = unsafe { malloc_req(UV_FS) };
fail_unless!(fs_req.is_not_null());
let fs_req = fs_req as *uvll::uv_write_t;
unsafe { uvll::set_data_for_req(fs_req, null::<()>()); }
NativeHandle::from_native_handle(fs_req)
}
fn delete(self) {
unsafe { free_req(self.native_handle() as *c_void) }
}
fn open(&mut self, _loop_: &Loop, _cb: FsCallback) {
}
fn close(&mut self, _loop_: &Loop, _cb: FsCallback) {
}
}
impl NativeHandle<*uvll::uv_fs_t> for FsRequest {
fn from_native_handle(handle: *uvll:: uv_fs_t) -> FsRequest {
FsRequest(handle)
}
fn native_handle(&self) -> *uvll::uv_fs_t {
match self { &FsRequest(ptr) => ptr }
}
}

456
src/libcore/rt/uv/mod.rs Normal file
View File

@ -0,0 +1,456 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
/*!
Bindings to libuv.
UV types consist of the event loop (Loop), Watchers, Requests and
Callbacks.
Watchers and Requests encapsulate pointers to uv *handles*, which have
subtyping relationships with each other. This subtyping is reflected
in the bindings with explicit or implicit coercions. For example, an
upcast from TcpWatcher to StreamWatcher is done with
`tcp_watcher.as_stream()`. In other cases a callback on a specific
type of watcher will be passed a watcher of a supertype.
Currently all use of Request types (connect/write requests) are
encapsulated in the bindings and don't need to be dealt with by the
caller.
# Safety note
Due to the complex lifecycle of uv handles, as well as compiler bugs,
this module is not memory safe and requires explicit memory management,
via `close` and `delete` methods.
*/
use option::*;
use str::raw::from_c_str;
use to_str::ToStr;
use vec;
use ptr;
use libc::{c_void, c_int, size_t, malloc, free, ssize_t};
use cast::{transmute, transmute_mut_region};
use ptr::null;
use sys::size_of;
use super::uvll;
use super::uvll::*;
use unstable::finally::Finally;
#[cfg(test)] use unstable::run_in_bare_thread;
#[cfg(test)] use super::thread::Thread;
#[cfg(test)] use cell::Cell;
pub use self::file::{FsRequest, FsCallback};
pub use self::net::{StreamWatcher, TcpWatcher};
pub use self::net::{ReadCallback, AllocCallback, ConnectionCallback, ConnectCallback};
pub mod file;
pub mod net;
/// A trait for callbacks to implement. Provides a little extra type safety
/// for generic, unsafe interop functions like `set_watcher_callback`.
pub trait Callback { }
pub trait Request { }
/// The trait implemented by uv 'watchers' (handles). Watchers are
/// non-owning wrappers around the uv handles and are not completely
/// safe - there may be multiple instances for a single underlying
/// handle. Watchers are generally created, then `start`ed, `stop`ed
/// and `close`ed, but due to their complex life cycle may not be
/// entirely memory safe if used in unanticipated patterns.
pub trait Watcher {
fn event_loop(&self) -> Loop;
}
pub type NullCallback = ~fn();
impl Callback for NullCallback { }
/// A type that wraps a native handle
pub trait NativeHandle<T> {
pub fn from_native_handle(T) -> Self;
pub fn native_handle(&self) -> T;
}
/// XXX: Loop(*handle) is buggy with destructors. Normal structs
/// with dtors may not be destructured, but tuple structs can,
/// but the results are not correct.
pub struct Loop {
handle: *uvll::uv_loop_t
}
pub impl Loop {
fn new() -> Loop {
let handle = unsafe { uvll::loop_new() };
fail_unless!(handle.is_not_null());
NativeHandle::from_native_handle(handle)
}
fn run(&mut self) {
unsafe { uvll::run(self.native_handle()) };
}
fn close(&mut self) {
unsafe { uvll::loop_delete(self.native_handle()) };
}
}
impl NativeHandle<*uvll::uv_loop_t> for Loop {
fn from_native_handle(handle: *uvll::uv_loop_t) -> Loop {
Loop { handle: handle }
}
fn native_handle(&self) -> *uvll::uv_loop_t {
self.handle
}
}
pub struct IdleWatcher(*uvll::uv_idle_t);
impl Watcher for IdleWatcher {
fn event_loop(&self) -> Loop {
loop_from_watcher(self)
}
}
pub type IdleCallback = ~fn(IdleWatcher, Option<UvError>);
impl Callback for IdleCallback { }
pub impl IdleWatcher {
fn new(loop_: &mut Loop) -> IdleWatcher {
unsafe {
let handle = uvll::idle_new();
fail_unless!(handle.is_not_null());
fail_unless!(0 == uvll::idle_init(loop_.native_handle(), handle));
uvll::set_data_for_uv_handle(handle, null::<()>());
NativeHandle::from_native_handle(handle)
}
}
fn start(&mut self, cb: IdleCallback) {
set_watcher_callback(self, cb);
unsafe {
fail_unless!(0 == uvll::idle_start(self.native_handle(), idle_cb))
};
extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) {
let idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
let cb: &IdleCallback = borrow_callback_from_watcher(&idle_watcher);
let status = status_to_maybe_uv_error(handle, status);
(*cb)(idle_watcher, status);
}
}
fn stop(&mut self) {
unsafe { fail_unless!(0 == uvll::idle_stop(self.native_handle())); }
}
fn close(self) {
unsafe { uvll::close(self.native_handle(), close_cb) };
extern fn close_cb(handle: *uvll::uv_idle_t) {
let mut idle_watcher = NativeHandle::from_native_handle(handle);
drop_watcher_callback::<uvll::uv_idle_t, IdleWatcher, IdleCallback>(&mut idle_watcher);
unsafe { uvll::idle_delete(handle) };
}
}
}
impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher {
fn from_native_handle(handle: *uvll::uv_idle_t) -> IdleWatcher {
IdleWatcher(handle)
}
fn native_handle(&self) -> *uvll::uv_idle_t {
match self { &IdleWatcher(ptr) => ptr }
}
}
// XXX: Need to define the error constants like EOF so they can be
// compared to the UvError type
pub struct UvError(uvll::uv_err_t);
pub impl UvError {
fn name(&self) -> ~str {
unsafe {
let inner = match self { &UvError(ref a) => a };
let name_str = uvll::err_name(inner);
fail_unless!(name_str.is_not_null());
from_c_str(name_str)
}
}
fn desc(&self) -> ~str {
unsafe {
let inner = match self { &UvError(ref a) => a };
let desc_str = uvll::strerror(inner);
fail_unless!(desc_str.is_not_null());
from_c_str(desc_str)
}
}
}
impl ToStr for UvError {
fn to_str(&self) -> ~str {
fmt!("%s: %s", self.name(), self.desc())
}
}
#[test]
fn error_smoke_test() {
let err = uvll::uv_err_t { code: 1, sys_errno_: 1 };
let err: UvError = UvError(err);
fail_unless!(err.to_str() == ~"EOF: end of file");
}
/// Given a uv handle, convert a callback status to a UvError
// XXX: Follow the pattern below by parameterizing over T: Watcher, not T
pub fn status_to_maybe_uv_error<T>(handle: *T, status: c_int) -> Option<UvError> {
if status != -1 {
None
} else {
unsafe {
rtdebug!("handle: %x", handle as uint);
let loop_ = uvll::get_loop_for_uv_handle(handle);
rtdebug!("loop: %x", loop_ as uint);
let err = uvll::last_error(loop_);
Some(UvError(err))
}
}
}
/// Get the uv event loop from a Watcher
pub fn loop_from_watcher<H, W: Watcher + NativeHandle<*H>>(
watcher: &W) -> Loop {
let handle = watcher.native_handle();
let loop_ = unsafe { uvll::get_loop_for_uv_handle(handle) };
NativeHandle::from_native_handle(loop_)
}
/// Set the custom data on a handle to a callback Note: This is only
/// suitable for watchers that make just one type of callback. For
/// others use WatcherData
pub fn set_watcher_callback<H, W: Watcher + NativeHandle<*H>, CB: Callback>(
watcher: &mut W, cb: CB) {
drop_watcher_callback::<H, W, CB>(watcher);
// XXX: Boxing the callback so it fits into a
// pointer. Unfortunate extra allocation
let boxed_cb = ~cb;
let data = unsafe { transmute::<~CB, *c_void>(boxed_cb) };
unsafe { uvll::set_data_for_uv_handle(watcher.native_handle(), data) };
}
/// Delete a callback from a handle's custom data
pub fn drop_watcher_callback<H, W: Watcher + NativeHandle<*H>, CB: Callback>(
watcher: &mut W) {
unsafe {
let handle = watcher.native_handle();
let handle_data: *c_void = uvll::get_data_for_uv_handle(handle);
if handle_data.is_not_null() {
// Take ownership of the callback and drop it
let _cb = transmute::<*c_void, ~CB>(handle_data);
// Make sure the pointer is zeroed
uvll::set_data_for_uv_handle(watcher.native_handle(), null::<()>());
}
}
}
/// Take a pointer to the callback installed as custom data
pub fn borrow_callback_from_watcher<H, W: Watcher + NativeHandle<*H>,
CB: Callback>(watcher: &W) -> &CB {
unsafe {
let handle = watcher.native_handle();
let handle_data: *c_void = uvll::get_data_for_uv_handle(handle);
fail_unless!(handle_data.is_not_null());
let cb = transmute::<&*c_void, &~CB>(&handle_data);
return &**cb;
}
}
/// Take ownership of the callback installed as custom data
pub fn take_callback_from_watcher<H, W: Watcher + NativeHandle<*H>, CB: Callback>(
watcher: &mut W) -> CB {
unsafe {
let handle = watcher.native_handle();
let handle_data: *c_void = uvll::get_data_for_uv_handle(handle);
fail_unless!(handle_data.is_not_null());
uvll::set_data_for_uv_handle(handle, null::<()>());
let cb: ~CB = transmute::<*c_void, ~CB>(handle_data);
let cb = match cb { ~cb => cb };
return cb;
}
}
/// Callbacks used by StreamWatchers, set as custom data on the foreign handle
struct WatcherData {
read_cb: Option<ReadCallback>,
write_cb: Option<ConnectionCallback>,
connect_cb: Option<ConnectionCallback>,
close_cb: Option<NullCallback>,
alloc_cb: Option<AllocCallback>
}
pub fn install_watcher_data<H, W: Watcher + NativeHandle<*H>>(watcher: &mut W) {
unsafe {
let data = ~WatcherData {
read_cb: None,
write_cb: None,
connect_cb: None,
close_cb: None,
alloc_cb: None
};
let data = transmute::<~WatcherData, *c_void>(data);
uvll::set_data_for_uv_handle(watcher.native_handle(), data);
}
}
pub fn get_watcher_data<H, W: Watcher + NativeHandle<*H>>(
watcher: &'r mut W) -> &'r mut WatcherData {
unsafe {
let data = uvll::get_data_for_uv_handle(watcher.native_handle());
let data = transmute::<&*c_void, &mut ~WatcherData>(&data);
return &mut **data;
}
}
pub fn drop_watcher_data<H, W: Watcher + NativeHandle<*H>>(watcher: &mut W) {
unsafe {
let data = uvll::get_data_for_uv_handle(watcher.native_handle());
let _data = transmute::<*c_void, ~WatcherData>(data);
uvll::set_data_for_uv_handle(watcher.native_handle(), null::<()>());
}
}
#[test]
fn test_slice_to_uv_buf() {
let slice = [0, .. 20];
let buf = slice_to_uv_buf(slice);
fail_unless!(buf.len == 20);
unsafe {
let base = transmute::<*u8, *mut u8>(buf.base);
(*base) = 1;
(*ptr::mut_offset(base, 1)) = 2;
}
fail_unless!(slice[0] == 1);
fail_unless!(slice[1] == 2);
}
/// The uv buffer type
pub type Buf = uvll::uv_buf_t;
/// Borrow a slice to a Buf
pub fn slice_to_uv_buf(v: &[u8]) -> Buf {
let data = unsafe { vec::raw::to_ptr(v) };
unsafe { uvll::buf_init(data, v.len()) }
}
// XXX: Do these conversions without copying
/// Transmute an owned vector to a Buf
pub fn vec_to_uv_buf(v: ~[u8]) -> Buf {
let data = unsafe { malloc(v.len() as size_t) } as *u8;
fail_unless!(data.is_not_null());
do vec::as_imm_buf(v) |b, l| {
let data = data as *mut u8;
unsafe { ptr::copy_memory(data, b, l) }
}
let buf = unsafe { uvll::buf_init(data, v.len()) };
return buf;
}
/// Transmute a Buf that was once a ~[u8] back to ~[u8]
pub fn vec_from_uv_buf(buf: Buf) -> Option<~[u8]> {
if !(buf.len == 0 && buf.base.is_null()) {
let v = unsafe { vec::from_buf(buf.base, buf.len as uint) };
unsafe { free(buf.base as *c_void) };
return Some(v);
} else {
// No buffer
return None;
}
}
#[test]
fn loop_smoke_test() {
do run_in_bare_thread {
let mut loop_ = Loop::new();
loop_.run();
loop_.close();
}
}
#[test]
#[ignore(reason = "valgrind - loop destroyed before watcher?")]
fn idle_new_then_close() {
do run_in_bare_thread {
let mut loop_ = Loop::new();
let mut idle_watcher = { IdleWatcher::new(&mut loop_) };
idle_watcher.close();
}
}
#[test]
fn idle_smoke_test() {
do run_in_bare_thread {
let mut loop_ = Loop::new();
let mut idle_watcher = { IdleWatcher::new(&mut loop_) };
let mut count = 10;
let count_ptr: *mut int = &mut count;
do idle_watcher.start |idle_watcher, status| {
let mut idle_watcher = idle_watcher;
fail_unless!(status.is_none());
if unsafe { *count_ptr == 10 } {
idle_watcher.stop();
idle_watcher.close();
} else {
unsafe { *count_ptr = *count_ptr + 1; }
}
}
loop_.run();
loop_.close();
fail_unless!(count == 10);
}
}
#[test]
fn idle_start_stop_start() {
do run_in_bare_thread {
let mut loop_ = Loop::new();
let mut idle_watcher = { IdleWatcher::new(&mut loop_) };
do idle_watcher.start |idle_watcher, status| {
let mut idle_watcher = idle_watcher;
fail_unless!(status.is_none());
idle_watcher.stop();
do idle_watcher.start |idle_watcher, status| {
fail_unless!(status.is_none());
let mut idle_watcher = idle_watcher;
idle_watcher.stop();
idle_watcher.close();
}
}
loop_.run();
loop_.close();
}
}

483
src/libcore/rt/uv/net.rs Normal file
View File

@ -0,0 +1,483 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use prelude::*;
use libc::{size_t, ssize_t, c_int, c_void};
use cast::{transmute, transmute_mut_region};
use super::super::uvll;
use super::super::uvll::*;
use super::{Loop, Watcher, Request, UvError, Buf, Callback, NativeHandle, NullCallback,
loop_from_watcher, status_to_maybe_uv_error,
install_watcher_data, get_watcher_data, drop_watcher_data,
vec_to_uv_buf, vec_from_uv_buf};
use super::super::rtio::{IpAddr, Ipv4, Ipv6};
#[cfg(test)]
use unstable::run_in_bare_thread;
#[cfg(test)]
use super::super::thread::Thread;
#[cfg(test)]
use cell::Cell;
fn ip4_as_uv_ip4(addr: IpAddr, f: &fn(*sockaddr_in)) {
match addr {
Ipv4(a, b, c, d, p) => {
unsafe {
let addr = malloc_ip4_addr(fmt!("%u.%u.%u.%u",
a as uint,
b as uint,
c as uint,
d as uint), p as int);
do (|| {
f(addr);
}).finally {
free_ip4_addr(addr);
}
}
}
Ipv6 => fail!()
}
}
// uv_stream t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t
// and uv_file_t
pub struct StreamWatcher(*uvll::uv_stream_t);
impl Watcher for StreamWatcher {
fn event_loop(&self) -> Loop {
loop_from_watcher(self)
}
}
pub type ReadCallback = ~fn(StreamWatcher, int, Buf, Option<UvError>);
impl Callback for ReadCallback { }
// XXX: The uv alloc callback also has a *uv_handle_t arg
pub type AllocCallback = ~fn(uint) -> Buf;
impl Callback for AllocCallback { }
pub impl StreamWatcher {
fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) {
// XXX: Borrowchk problems
let data = get_watcher_data(unsafe { transmute_mut_region(self) });
data.alloc_cb = Some(alloc);
data.read_cb = Some(cb);
let handle = self.native_handle();
unsafe { uvll::read_start(handle, alloc_cb, read_cb); }
extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf {
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
let data = get_watcher_data(&mut stream_watcher);
let alloc_cb = data.alloc_cb.get_ref();
return (*alloc_cb)(suggested_size as uint);
}
extern fn read_cb(stream: *uvll::uv_stream_t, nread: ssize_t, ++buf: Buf) {
rtdebug!("buf addr: %x", buf.base as uint);
rtdebug!("buf len: %d", buf.len as int);
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
let data = get_watcher_data(&mut stream_watcher);
let cb = data.read_cb.get_ref();
let status = status_to_maybe_uv_error(stream, nread as c_int);
(*cb)(stream_watcher, nread as int, buf, status);
}
}
fn read_stop(&mut self) {
// It would be nice to drop the alloc and read callbacks here,
// but read_stop may be called from inside one of them and we
// would end up freeing the in-use environment
let handle = self.native_handle();
unsafe { uvll::read_stop(handle); }
}
// XXX: Needs to take &[u8], not ~[u8]
fn write(&mut self, msg: ~[u8], cb: ConnectionCallback) {
// XXX: Borrowck
let data = get_watcher_data(unsafe { transmute_mut_region(self) });
fail_unless!(data.write_cb.is_none());
data.write_cb = Some(cb);
let req = WriteRequest::new();
let buf = vec_to_uv_buf(msg);
// XXX: Allocation
let bufs = ~[buf];
unsafe {
fail_unless!(0 == uvll::write(req.native_handle(),
self.native_handle(),
&bufs, write_cb));
}
// XXX: Freeing immediately after write. Is this ok?
let _v = vec_from_uv_buf(buf);
extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
let write_request: WriteRequest = NativeHandle::from_native_handle(req);
let mut stream_watcher = write_request.stream();
write_request.delete();
let cb = get_watcher_data(&mut stream_watcher).write_cb.swap_unwrap();
let status = status_to_maybe_uv_error(stream_watcher.native_handle(), status);
cb(stream_watcher, status);
}
}
fn accept(&mut self, stream: StreamWatcher) {
let self_handle = self.native_handle() as *c_void;
let stream_handle = stream.native_handle() as *c_void;
unsafe {
fail_unless!(0 == uvll::accept(self_handle, stream_handle));
}
}
fn close(self, cb: NullCallback) {
{
let mut self = self;
let data = get_watcher_data(&mut self);
fail_unless!(data.close_cb.is_none());
data.close_cb = Some(cb);
}
unsafe { uvll::close(self.native_handle(), close_cb); }
extern fn close_cb(handle: *uvll::uv_stream_t) {
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
{
let mut data = get_watcher_data(&mut stream_watcher);
data.close_cb.swap_unwrap()();
}
drop_watcher_data(&mut stream_watcher);
unsafe { free_handle(handle as *c_void) }
}
}
}
impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
fn from_native_handle(
handle: *uvll::uv_stream_t) -> StreamWatcher {
StreamWatcher(handle)
}
fn native_handle(&self) -> *uvll::uv_stream_t {
match self { &StreamWatcher(ptr) => ptr }
}
}
pub struct TcpWatcher(*uvll::uv_tcp_t);
impl Watcher for TcpWatcher {
fn event_loop(&self) -> Loop {
loop_from_watcher(self)
}
}
pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
impl Callback for ConnectionCallback { }
pub impl TcpWatcher {
fn new(loop_: &mut Loop) -> TcpWatcher {
unsafe {
let handle = malloc_handle(UV_TCP);
fail_unless!(handle.is_not_null());
fail_unless!(0 == uvll::tcp_init(loop_.native_handle(), handle));
let mut watcher = NativeHandle::from_native_handle(handle);
install_watcher_data(&mut watcher);
return watcher;
}
}
fn bind(&mut self, address: IpAddr) {
match address {
Ipv4(*) => {
do ip4_as_uv_ip4(address) |addr| {
let result = unsafe {
uvll::tcp_bind(self.native_handle(), addr)
};
// XXX: bind is likely to fail. need real error handling
fail_unless!(result == 0);
}
}
_ => fail!()
}
}
fn connect(&mut self, address: IpAddr, cb: ConnectionCallback) {
unsafe {
fail_unless!(get_watcher_data(self).connect_cb.is_none());
get_watcher_data(self).connect_cb = Some(cb);
let mut connect_watcher = ConnectRequest::new();
let connect_handle = connect_watcher.native_handle();
match address {
Ipv4(*) => {
do ip4_as_uv_ip4(address) |addr| {
rtdebug!("connect_t: %x", connect_handle as uint);
fail_unless!(0 == uvll::tcp_connect(connect_handle,
self.native_handle(),
addr, connect_cb));
}
}
_ => fail!()
}
extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
rtdebug!("connect_t: %x", req as uint);
let connect_request: ConnectRequest = NativeHandle::from_native_handle(req);
let mut stream_watcher = connect_request.stream();
connect_request.delete();
let cb: ConnectionCallback = {
let data = get_watcher_data(&mut stream_watcher);
data.connect_cb.swap_unwrap()
};
let status = status_to_maybe_uv_error(stream_watcher.native_handle(), status);
cb(stream_watcher, status);
}
}
}
fn listen(&mut self, cb: ConnectionCallback) {
// XXX: Borrowck
let data = get_watcher_data(unsafe { transmute_mut_region(self) });
fail_unless!(data.connect_cb.is_none());
data.connect_cb = Some(cb);
unsafe {
static BACKLOG: c_int = 128; // XXX should be configurable
// XXX: This can probably fail
fail_unless!(0 == uvll::listen(self.native_handle(),
BACKLOG, connection_cb));
}
extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
rtdebug!("connection_cb");
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
let cb = get_watcher_data(&mut stream_watcher).connect_cb.swap_unwrap();
let status = status_to_maybe_uv_error(stream_watcher.native_handle(), status);
cb(stream_watcher, status);
}
}
fn as_stream(&self) -> StreamWatcher {
NativeHandle::from_native_handle(self.native_handle() as *uvll::uv_stream_t)
}
}
impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher {
fn from_native_handle(handle: *uvll::uv_tcp_t) -> TcpWatcher {
TcpWatcher(handle)
}
fn native_handle(&self) -> *uvll::uv_tcp_t {
match self { &TcpWatcher(ptr) => ptr }
}
}
pub type ConnectCallback = ~fn(ConnectRequest, Option<UvError>);
impl Callback for ConnectCallback { }
// uv_connect_t is a subclass of uv_req_t
struct ConnectRequest(*uvll::uv_connect_t);
impl Request for ConnectRequest { }
impl ConnectRequest {
fn new() -> ConnectRequest {
let connect_handle = unsafe {
malloc_req(UV_CONNECT)
};
fail_unless!(connect_handle.is_not_null());
let connect_handle = connect_handle as *uvll::uv_connect_t;
ConnectRequest(connect_handle)
}
fn stream(&self) -> StreamWatcher {
unsafe {
let stream_handle = uvll::get_stream_handle_from_connect_req(self.native_handle());
NativeHandle::from_native_handle(stream_handle)
}
}
fn delete(self) {
unsafe { free_req(self.native_handle() as *c_void) }
}
}
impl NativeHandle<*uvll::uv_connect_t> for ConnectRequest {
fn from_native_handle(
handle: *uvll:: uv_connect_t) -> ConnectRequest {
ConnectRequest(handle)
}
fn native_handle(&self) -> *uvll::uv_connect_t {
match self { &ConnectRequest(ptr) => ptr }
}
}
pub struct WriteRequest(*uvll::uv_write_t);
impl Request for WriteRequest { }
pub impl WriteRequest {
fn new() -> WriteRequest {
let write_handle = unsafe {
malloc_req(UV_WRITE)
};
fail_unless!(write_handle.is_not_null());
let write_handle = write_handle as *uvll::uv_write_t;
WriteRequest(write_handle)
}
fn stream(&self) -> StreamWatcher {
unsafe {
let stream_handle = uvll::get_stream_handle_from_write_req(self.native_handle());
NativeHandle::from_native_handle(stream_handle)
}
}
fn delete(self) {
unsafe { free_req(self.native_handle() as *c_void) }
}
}
impl NativeHandle<*uvll::uv_write_t> for WriteRequest {
fn from_native_handle(handle: *uvll:: uv_write_t) -> WriteRequest {
WriteRequest(handle)
}
fn native_handle(&self) -> *uvll::uv_write_t {
match self { &WriteRequest(ptr) => ptr }
}
}
#[test]
#[ignore(reason = "ffi struct issues")]
fn connect_close() {
do run_in_bare_thread() {
let mut loop_ = Loop::new();
let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
// Connect to a port where nobody is listening
let addr = Ipv4(127, 0, 0, 1, 2923);
do tcp_watcher.connect(addr) |stream_watcher, status| {
rtdebug!("tcp_watcher.connect!");
fail_unless!(status.is_some());
fail_unless!(status.get().name() == ~"ECONNREFUSED");
stream_watcher.close(||());
}
loop_.run();
loop_.close();
}
}
#[test]
#[ignore(reason = "need a server to connect to")]
fn connect_read() {
do run_in_bare_thread() {
let mut loop_ = Loop::new();
let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
let addr = Ipv4(127, 0, 0, 1, 2924);
do tcp_watcher.connect(addr) |stream_watcher, status| {
let mut stream_watcher = stream_watcher;
rtdebug!("tcp_watcher.connect!");
fail_unless!(status.is_none());
let alloc: AllocCallback = |size| {
vec_to_uv_buf(vec::from_elem(size, 0))
};
do stream_watcher.read_start(alloc)
|stream_watcher, nread, buf, status| {
let buf = vec_from_uv_buf(buf);
rtdebug!("read cb!");
if status.is_none() {
let bytes = buf.unwrap();
rtdebug!("%s", bytes.slice(0, nread as uint).to_str());
} else {
rtdebug!("status after read: %s", status.get().to_str());
rtdebug!("closing");
stream_watcher.close(||());
}
}
}
loop_.run();
loop_.close();
}
}
#[test]
#[ignore(reason = "ffi struct issues")]
fn listen() {
do run_in_bare_thread() {
static MAX: int = 10;
let mut loop_ = Loop::new();
let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
let addr = Ipv4(127, 0, 0, 1, 2925);
server_tcp_watcher.bind(addr);
let loop_ = loop_;
rtdebug!("listening");
do server_tcp_watcher.listen |server_stream_watcher, status| {
rtdebug!("listened!");
fail_unless!(status.is_none());
let mut server_stream_watcher = server_stream_watcher;
let mut loop_ = loop_;
let mut client_tcp_watcher = TcpWatcher::new(&mut loop_);
let mut client_tcp_watcher = client_tcp_watcher.as_stream();
server_stream_watcher.accept(client_tcp_watcher);
let count_cell = Cell(0);
let server_stream_watcher = server_stream_watcher;
rtdebug!("starting read");
let alloc: AllocCallback = |size| {
vec_to_uv_buf(vec::from_elem(size, 0))
};
do client_tcp_watcher.read_start(alloc)
|stream_watcher, nread, buf, status| {
rtdebug!("i'm reading!");
let buf = vec_from_uv_buf(buf);
let mut count = count_cell.take();
if status.is_none() {
rtdebug!("got %d bytes", nread);
let buf = buf.unwrap();
for buf.slice(0, nread as uint).each |byte| {
fail_unless!(*byte == count as u8);
rtdebug!("%u", *byte as uint);
count += 1;
}
} else {
fail_unless!(count == MAX);
do stream_watcher.close {
server_stream_watcher.close(||());
}
}
count_cell.put_back(count);
}
}
let _client_thread = do Thread::start {
rtdebug!("starting client thread");
let mut loop_ = Loop::new();
let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
do tcp_watcher.connect(addr) |stream_watcher, status| {
rtdebug!("connecting");
fail_unless!(status.is_none());
let mut stream_watcher = stream_watcher;
let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
do stream_watcher.write(msg) |stream_watcher, status| {
rtdebug!("writing");
fail_unless!(status.is_none());
stream_watcher.close(||());
}
}
loop_.run();
loop_.close();
};
let mut loop_ = loop_;
loop_.run();
loop_.close();
}
}

View File

@ -12,13 +12,11 @@ use option::*;
use result::*;
use super::uv::*;
use super::io::*;
use super::rtio::*;
use ops::Drop;
use cell::{Cell, empty_cell};
use cast::transmute;
use super::StreamObject;
use super::sched::Scheduler;
use super::IoFactoryObject;
#[cfg(test)] use super::sched::Task;
#[cfg(test)] use unstable::run_in_bare_thread;
@ -189,12 +187,9 @@ impl TcpListener for UvTcpListener {
do server_tcp_watcher.listen |server_stream_watcher, status| {
let maybe_stream = if status.is_none() {
let mut server_stream_watcher = server_stream_watcher;
let mut loop_ =
loop_from_watcher(&server_stream_watcher);
let mut client_tcp_watcher =
TcpWatcher::new(&mut loop_);
let mut client_tcp_watcher =
client_tcp_watcher.as_stream();
let mut loop_ = loop_from_watcher(&server_stream_watcher);
let mut client_tcp_watcher = TcpWatcher::new(&mut loop_);
let mut client_tcp_watcher = client_tcp_watcher.as_stream();
// XXX: Need's to be surfaced in interface
server_stream_watcher.accept(client_tcp_watcher);
Some(~UvStream::new(client_tcp_watcher))
@ -425,8 +420,7 @@ fn test_read_and_block() {
// Yield to the other task in hopes that it
// will trigger a read callback while we are
// not ready for it
do scheduler.block_running_task_and_then
|scheduler, task| {
do scheduler.block_running_task_and_then |scheduler, task| {
scheduler.task_queue.push_back(task);
}
}

442
src/libcore/rt/uvll.rs Normal file
View File

@ -0,0 +1,442 @@
// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
/*!
* Low-level bindings to the libuv library.
*
* This module contains a set of direct, 'bare-metal' wrappers around
* the libuv C-API.
*
* We're not bothering yet to redefine uv's structs as Rust structs
* because they are quite large and change often between versions.
* The maintenance burden is just too high. Instead we use the uv's
* `uv_handle_size` and `uv_req_size` to find the correct size of the
* structs and allocate them on the heap. This can be revisited later.
*
* There are also a collection of helper functions to ease interacting
* with the low-level API.
*
* As new functionality, existant in uv.h, is added to the rust stdlib,
* the mappings should be added in this module.
*/
#[allow(non_camel_case_types)]; // C types
use libc::{size_t, c_int, c_uint, c_void, c_char, uintptr_t};
use libc::{malloc, free};
use prelude::*;
use ptr::to_unsafe_ptr;
pub struct uv_err_t {
code: c_int,
sys_errno_: c_int
}
pub struct uv_buf_t {
base: *u8,
len: libc::size_t,
}
pub type uv_handle_t = c_void;
pub type uv_loop_t = c_void;
pub type uv_idle_t = c_void;
pub type uv_tcp_t = c_void;
pub type uv_connect_t = c_void;
pub type uv_write_t = c_void;
pub type uv_async_t = c_void;
pub type uv_timer_t = c_void;
pub type uv_stream_t = c_void;
pub type uv_fs_t = c_void;
pub type uv_idle_cb = *u8;
pub type sockaddr_in = c_void;
pub type sockaddr_in6 = c_void;
#[deriving(Eq)]
pub enum uv_handle_type {
UV_UNKNOWN_HANDLE,
UV_ASYNC,
UV_CHECK,
UV_FS_EVENT,
UV_FS_POLL,
UV_HANDLE,
UV_IDLE,
UV_NAMED_PIPE,
UV_POLL,
UV_PREPARE,
UV_PROCESS,
UV_STREAM,
UV_TCP,
UV_TIMER,
UV_TTY,
UV_UDP,
UV_SIGNAL,
UV_FILE,
UV_HANDLE_TYPE_MAX
}
#[deriving(Eq)]
pub enum uv_req_type {
UV_UNKNOWN_REQ,
UV_REQ,
UV_CONNECT,
UV_WRITE,
UV_SHUTDOWN,
UV_UDP_SEND,
UV_FS,
UV_WORK,
UV_GETADDRINFO,
UV_REQ_TYPE_MAX
}
pub unsafe fn malloc_handle(handle: uv_handle_type) -> *c_void {
fail_unless!(handle != UV_UNKNOWN_HANDLE && handle != UV_HANDLE_TYPE_MAX);
let size = unsafe { rust_uv_handle_size(handle as uint) };
let p = malloc(size);
fail_unless!(p.is_not_null());
return p;
}
pub unsafe fn free_handle(v: *c_void) {
free(v)
}
pub unsafe fn malloc_req(req: uv_req_type) -> *c_void {
fail_unless!(req != UV_UNKNOWN_REQ && req != UV_REQ_TYPE_MAX);
let size = unsafe { rust_uv_req_size(req as uint) };
let p = malloc(size);
fail_unless!(p.is_not_null());
return p;
}
pub unsafe fn free_req(v: *c_void) {
free(v)
}
#[test]
fn handle_sanity_check() {
unsafe {
fail_unless!(UV_HANDLE_TYPE_MAX as uint == rust_uv_handle_type_max());
}
}
#[test]
fn request_sanity_check() {
unsafe {
fail_unless!(UV_REQ_TYPE_MAX as uint == rust_uv_req_type_max());
}
}
pub unsafe fn loop_new() -> *c_void {
return rust_uv_loop_new();
}
pub unsafe fn loop_delete(loop_handle: *c_void) {
rust_uv_loop_delete(loop_handle);
}
pub unsafe fn run(loop_handle: *c_void) {
rust_uv_run(loop_handle);
}
pub unsafe fn close<T>(handle: *T, cb: *u8) {
rust_uv_close(handle as *c_void, cb);
}
pub unsafe fn walk(loop_handle: *c_void, cb: *u8, arg: *c_void) {
rust_uv_walk(loop_handle, cb, arg);
}
pub unsafe fn idle_new() -> *uv_idle_t {
rust_uv_idle_new()
}
pub unsafe fn idle_delete(handle: *uv_idle_t) {
rust_uv_idle_delete(handle)
}
pub unsafe fn idle_init(loop_handle: *uv_loop_t, handle: *uv_idle_t) -> c_int {
rust_uv_idle_init(loop_handle, handle)
}
pub unsafe fn idle_start(handle: *uv_idle_t, cb: uv_idle_cb) -> c_int {
rust_uv_idle_start(handle, cb)
}
pub unsafe fn idle_stop(handle: *uv_idle_t) -> c_int {
rust_uv_idle_stop(handle)
}
pub unsafe fn tcp_init(loop_handle: *c_void, handle: *uv_tcp_t) -> c_int {
return rust_uv_tcp_init(loop_handle, handle);
}
// FIXME ref #2064
pub unsafe fn tcp_connect(connect_ptr: *uv_connect_t,
tcp_handle_ptr: *uv_tcp_t,
addr_ptr: *sockaddr_in,
after_connect_cb: *u8) -> c_int {
return rust_uv_tcp_connect(connect_ptr, tcp_handle_ptr,
after_connect_cb, addr_ptr);
}
// FIXME ref #2064
pub unsafe fn tcp_connect6(connect_ptr: *uv_connect_t,
tcp_handle_ptr: *uv_tcp_t,
addr_ptr: *sockaddr_in6,
after_connect_cb: *u8) -> c_int {
return rust_uv_tcp_connect6(connect_ptr, tcp_handle_ptr,
after_connect_cb, addr_ptr);
}
// FIXME ref #2064
pub unsafe fn tcp_bind(tcp_server_ptr: *uv_tcp_t, addr_ptr: *sockaddr_in) -> c_int {
return rust_uv_tcp_bind(tcp_server_ptr, addr_ptr);
}
// FIXME ref #2064
pub unsafe fn tcp_bind6(tcp_server_ptr: *uv_tcp_t, addr_ptr: *sockaddr_in6) -> c_int {
return rust_uv_tcp_bind6(tcp_server_ptr, addr_ptr);
}
pub unsafe fn tcp_getpeername(tcp_handle_ptr: *uv_tcp_t, name: *sockaddr_in) -> c_int {
return rust_uv_tcp_getpeername(tcp_handle_ptr, name);
}
pub unsafe fn tcp_getpeername6(tcp_handle_ptr: *uv_tcp_t, name: *sockaddr_in6) ->c_int {
return rust_uv_tcp_getpeername6(tcp_handle_ptr, name);
}
pub unsafe fn listen<T>(stream: *T, backlog: c_int, cb: *u8) -> c_int {
return rust_uv_listen(stream as *c_void, backlog, cb);
}
pub unsafe fn accept(server: *c_void, client: *c_void) -> c_int {
return rust_uv_accept(server as *c_void, client as *c_void);
}
pub unsafe fn write<T>(req: *uv_write_t, stream: *T, buf_in: *~[uv_buf_t], cb: *u8) -> c_int {
let buf_ptr = vec::raw::to_ptr(*buf_in);
let buf_cnt = vec::len(*buf_in) as i32;
return rust_uv_write(req as *c_void, stream as *c_void, buf_ptr, buf_cnt, cb);
}
pub unsafe fn read_start(stream: *uv_stream_t, on_alloc: *u8, on_read: *u8) -> c_int {
return rust_uv_read_start(stream as *c_void, on_alloc, on_read);
}
pub unsafe fn read_stop(stream: *uv_stream_t) -> c_int {
return rust_uv_read_stop(stream as *c_void);
}
pub unsafe fn last_error(loop_handle: *c_void) -> uv_err_t {
return rust_uv_last_error(loop_handle);
}
pub unsafe fn strerror(err: *uv_err_t) -> *c_char {
return rust_uv_strerror(err);
}
pub unsafe fn err_name(err: *uv_err_t) -> *c_char {
return rust_uv_err_name(err);
}
pub unsafe fn async_init(loop_handle: *c_void, async_handle: *uv_async_t, cb: *u8) -> c_int {
return rust_uv_async_init(loop_handle, async_handle, cb);
}
pub unsafe fn async_send(async_handle: *uv_async_t) {
return rust_uv_async_send(async_handle);
}
pub unsafe fn buf_init(input: *u8, len: uint) -> uv_buf_t {
let out_buf = uv_buf_t { base: ptr::null(), len: 0 as size_t };
let out_buf_ptr = ptr::addr_of(&out_buf);
rust_uv_buf_init(out_buf_ptr, input, len as size_t);
return out_buf;
}
pub unsafe fn timer_init(loop_ptr: *c_void, timer_ptr: *uv_timer_t) -> c_int {
return rust_uv_timer_init(loop_ptr, timer_ptr);
}
pub unsafe fn timer_start(timer_ptr: *uv_timer_t, cb: *u8, timeout: uint,
repeat: uint) -> c_int {
return rust_uv_timer_start(timer_ptr, cb, timeout as c_uint, repeat as c_uint);
}
pub unsafe fn timer_stop(timer_ptr: *uv_timer_t) -> c_int {
return rust_uv_timer_stop(timer_ptr);
}
pub unsafe fn malloc_ip4_addr(ip: &str, port: int) -> *sockaddr_in {
do str::as_c_str(ip) |ip_buf| {
rust_uv_ip4_addrp(ip_buf as *u8, port as libc::c_int)
}
}
pub unsafe fn malloc_ip6_addr(ip: &str, port: int) -> *sockaddr_in6 {
do str::as_c_str(ip) |ip_buf| {
rust_uv_ip6_addrp(ip_buf as *u8, port as libc::c_int)
}
}
pub unsafe fn free_ip4_addr(addr: *sockaddr_in) {
rust_uv_free_ip4_addr(addr);
}
pub unsafe fn free_ip6_addr(addr: *sockaddr_in6) {
rust_uv_free_ip6_addr(addr);
}
// data access helpers
pub unsafe fn get_loop_for_uv_handle<T>(handle: *T) -> *c_void {
return rust_uv_get_loop_for_uv_handle(handle as *c_void);
}
pub unsafe fn get_stream_handle_from_connect_req(connect: *uv_connect_t) -> *uv_stream_t {
return rust_uv_get_stream_handle_from_connect_req(connect);
}
pub unsafe fn get_stream_handle_from_write_req(write_req: *uv_write_t) -> *uv_stream_t {
return rust_uv_get_stream_handle_from_write_req(write_req);
}
pub unsafe fn get_data_for_uv_loop(loop_ptr: *c_void) -> *c_void {
rust_uv_get_data_for_uv_loop(loop_ptr)
}
pub unsafe fn set_data_for_uv_loop(loop_ptr: *c_void, data: *c_void) {
rust_uv_set_data_for_uv_loop(loop_ptr, data);
}
pub unsafe fn get_data_for_uv_handle<T>(handle: *T) -> *c_void {
return rust_uv_get_data_for_uv_handle(handle as *c_void);
}
pub unsafe fn set_data_for_uv_handle<T, U>(handle: *T, data: *U) {
rust_uv_set_data_for_uv_handle(handle as *c_void, data as *c_void);
}
pub unsafe fn get_data_for_req<T>(req: *T) -> *c_void {
return rust_uv_get_data_for_req(req as *c_void);
}
pub unsafe fn set_data_for_req<T, U>(req: *T, data: *U) {
rust_uv_set_data_for_req(req as *c_void, data as *c_void);
}
pub unsafe fn get_base_from_buf(buf: uv_buf_t) -> *u8 {
return rust_uv_get_base_from_buf(buf);
}
pub unsafe fn get_len_from_buf(buf: uv_buf_t) -> size_t {
return rust_uv_get_len_from_buf(buf);
}
pub unsafe fn malloc_buf_base_of(suggested_size: size_t) -> *u8 {
return rust_uv_malloc_buf_base_of(suggested_size);
}
pub unsafe fn free_base_of_buf(buf: uv_buf_t) {
rust_uv_free_base_of_buf(buf);
}
pub unsafe fn get_last_err_info(uv_loop: *c_void) -> ~str {
let err = last_error(uv_loop);
let err_ptr = ptr::addr_of(&err);
let err_name = str::raw::from_c_str(err_name(err_ptr));
let err_msg = str::raw::from_c_str(strerror(err_ptr));
return fmt!("LIBUV ERROR: name: %s msg: %s",
err_name, err_msg);
}
pub unsafe fn get_last_err_data(uv_loop: *c_void) -> uv_err_data {
let err = last_error(uv_loop);
let err_ptr = ptr::addr_of(&err);
let err_name = str::raw::from_c_str(err_name(err_ptr));
let err_msg = str::raw::from_c_str(strerror(err_ptr));
uv_err_data { err_name: err_name, err_msg: err_msg }
}
pub struct uv_err_data {
err_name: ~str,
err_msg: ~str,
}
extern {
fn rust_uv_handle_size(type_: uintptr_t) -> size_t;
fn rust_uv_req_size(type_: uintptr_t) -> size_t;
fn rust_uv_handle_type_max() -> uintptr_t;
fn rust_uv_req_type_max() -> uintptr_t;
// libuv public API
fn rust_uv_loop_new() -> *c_void;
fn rust_uv_loop_delete(lp: *c_void);
fn rust_uv_run(loop_handle: *c_void);
fn rust_uv_close(handle: *c_void, cb: *u8);
fn rust_uv_walk(loop_handle: *c_void, cb: *u8, arg: *c_void);
fn rust_uv_idle_new() -> *uv_idle_t;
fn rust_uv_idle_delete(handle: *uv_idle_t);
fn rust_uv_idle_init(loop_handle: *uv_loop_t, handle: *uv_idle_t) -> c_int;
fn rust_uv_idle_start(handle: *uv_idle_t, cb: uv_idle_cb) -> c_int;
fn rust_uv_idle_stop(handle: *uv_idle_t) -> c_int;
fn rust_uv_async_send(handle: *uv_async_t);
fn rust_uv_async_init(loop_handle: *c_void,
async_handle: *uv_async_t,
cb: *u8) -> c_int;
fn rust_uv_tcp_init(loop_handle: *c_void, handle_ptr: *uv_tcp_t) -> c_int;
// FIXME ref #2604 .. ?
fn rust_uv_buf_init(out_buf: *uv_buf_t, base: *u8, len: size_t);
fn rust_uv_last_error(loop_handle: *c_void) -> uv_err_t;
// FIXME ref #2064
fn rust_uv_strerror(err: *uv_err_t) -> *c_char;
// FIXME ref #2064
fn rust_uv_err_name(err: *uv_err_t) -> *c_char;
fn rust_uv_ip4_addrp(ip: *u8, port: c_int) -> *sockaddr_in;
fn rust_uv_ip6_addrp(ip: *u8, port: c_int) -> *sockaddr_in6;
fn rust_uv_free_ip4_addr(addr: *sockaddr_in);
fn rust_uv_free_ip6_addr(addr: *sockaddr_in6);
fn rust_uv_ip4_name(src: *sockaddr_in, dst: *u8, size: size_t) -> c_int;
fn rust_uv_ip6_name(src: *sockaddr_in6, dst: *u8, size: size_t) -> c_int;
fn rust_uv_ip4_port(src: *sockaddr_in) -> c_uint;
fn rust_uv_ip6_port(src: *sockaddr_in6) -> c_uint;
// FIXME ref #2064
fn rust_uv_tcp_connect(connect_ptr: *uv_connect_t,
tcp_handle_ptr: *uv_tcp_t,
++after_cb: *u8,
++addr: *sockaddr_in) -> c_int;
// FIXME ref #2064
fn rust_uv_tcp_bind(tcp_server: *uv_tcp_t, ++addr: *sockaddr_in) -> c_int;
// FIXME ref #2064
fn rust_uv_tcp_connect6(connect_ptr: *uv_connect_t,
tcp_handle_ptr: *uv_tcp_t,
++after_cb: *u8,
++addr: *sockaddr_in6) -> c_int;
// FIXME ref #2064
fn rust_uv_tcp_bind6(tcp_server: *uv_tcp_t, ++addr: *sockaddr_in6) -> c_int;
fn rust_uv_tcp_getpeername(tcp_handle_ptr: *uv_tcp_t, ++name: *sockaddr_in) -> c_int;
fn rust_uv_tcp_getpeername6(tcp_handle_ptr: *uv_tcp_t, ++name: *sockaddr_in6) ->c_int;
fn rust_uv_listen(stream: *c_void, backlog: c_int, cb: *u8) -> c_int;
fn rust_uv_accept(server: *c_void, client: *c_void) -> c_int;
fn rust_uv_write(req: *c_void,
stream: *c_void,
++buf_in: *uv_buf_t,
buf_cnt: c_int,
cb: *u8) -> c_int;
fn rust_uv_read_start(stream: *c_void,
on_alloc: *u8,
on_read: *u8) -> c_int;
fn rust_uv_read_stop(stream: *c_void) -> c_int;
fn rust_uv_timer_init(loop_handle: *c_void,
timer_handle: *uv_timer_t) -> c_int;
fn rust_uv_timer_start(timer_handle: *uv_timer_t,
cb: *u8,
timeout: c_uint,
repeat: c_uint) -> c_int;
fn rust_uv_timer_stop(handle: *uv_timer_t) -> c_int;
fn rust_uv_malloc_buf_base_of(sug_size: size_t) -> *u8;
fn rust_uv_free_base_of_buf(++buf: uv_buf_t);
fn rust_uv_get_stream_handle_from_connect_req(connect_req: *uv_connect_t) -> *uv_stream_t;
fn rust_uv_get_stream_handle_from_write_req(write_req: *uv_write_t) -> *uv_stream_t;
fn rust_uv_get_loop_for_uv_handle(handle: *c_void) -> *c_void;
fn rust_uv_get_data_for_uv_loop(loop_ptr: *c_void) -> *c_void;
fn rust_uv_set_data_for_uv_loop(loop_ptr: *c_void, data: *c_void);
fn rust_uv_get_data_for_uv_handle(handle: *c_void) -> *c_void;
fn rust_uv_set_data_for_uv_handle(handle: *c_void, data: *c_void);
fn rust_uv_get_data_for_req(req: *c_void) -> *c_void;
fn rust_uv_set_data_for_req(req: *c_void, data: *c_void);
fn rust_uv_get_base_from_buf(++buf: uv_buf_t) -> *u8;
fn rust_uv_get_len_from_buf(++buf: uv_buf_t) -> size_t;
}

View File

@ -35,8 +35,6 @@ pub mod extfmt;
#[path = "unstable/lang.rs"]
#[cfg(notest)]
pub mod lang;
#[path = "unstable/uvll.rs"]
pub mod uvll;
mod rustrt {
use unstable::{raw_thread, rust_little_lock};

View File

@ -120,16 +120,25 @@ pub unsafe fn strdup_uniq(ptr: *c_uchar, len: uint) -> ~str {
#[lang="start"]
pub fn start(main: *u8, argc: int, argv: *c_char,
crate_map: *u8) -> int {
use libc::getenv;
use rt::start;
unsafe {
let use_new_rt = do str::as_c_str("RUST_NEWRT") |s| {
getenv(s).is_null()
};
if use_new_rt {
return rust_start(main as *c_void, argc as c_int, argv,
crate_map as *c_void) as int;
} else {
return start(main, argc, argv, crate_map);
}
}
extern {
fn rust_start(main: *c_void, argc: c_int, argv: *c_char,
crate_map: *c_void) -> c_int;
}
unsafe {
return rust_start(main as *c_void, argc as c_int, argv,
crate_map as *c_void) as int;
}
}
// Local Variables:

View File

@ -21,20 +21,20 @@ use core::vec;
use iotask = uv::iotask::IoTask;
use interact = uv::iotask::interact;
use sockaddr_in = core::unstable::uvll::sockaddr_in;
use sockaddr_in6 = core::unstable::uvll::sockaddr_in6;
use addrinfo = core::unstable::uvll::addrinfo;
use uv_getaddrinfo_t = core::unstable::uvll::uv_getaddrinfo_t;
use uv_ip4_name = core::unstable::uvll::ip4_name;
use uv_ip4_port = core::unstable::uvll::ip4_port;
use uv_ip6_name = core::unstable::uvll::ip6_name;
use uv_ip6_port = core::unstable::uvll::ip6_port;
use uv_getaddrinfo = core::unstable::uvll::getaddrinfo;
use uv_freeaddrinfo = core::unstable::uvll::freeaddrinfo;
use create_uv_getaddrinfo_t = core::unstable::uvll::getaddrinfo_t;
use set_data_for_req = core::unstable::uvll::set_data_for_req;
use get_data_for_req = core::unstable::uvll::get_data_for_req;
use ll = core::unstable::uvll;
use sockaddr_in = super::uv_ll::sockaddr_in;
use sockaddr_in6 = super::uv_ll::sockaddr_in6;
use addrinfo = super::uv_ll::addrinfo;
use uv_getaddrinfo_t = super::uv_ll::uv_getaddrinfo_t;
use uv_ip4_name = super::uv_ll::ip4_name;
use uv_ip4_port = super::uv_ll::ip4_port;
use uv_ip6_name = super::uv_ll::ip6_name;
use uv_ip6_port = super::uv_ll::ip6_port;
use uv_getaddrinfo = super::uv_ll::getaddrinfo;
use uv_freeaddrinfo = super::uv_ll::freeaddrinfo;
use create_uv_getaddrinfo_t = super::uv_ll::getaddrinfo_t;
use set_data_for_req = super::uv_ll::set_data_for_req;
use get_data_for_req = super::uv_ll::get_data_for_req;
use ll = super::uv_ll;
/// An IP address
pub enum IpAddr {

View File

@ -36,7 +36,7 @@ not required in or otherwise suitable for the core library.
extern mod core(vers = "0.6");
use core::*;
pub use uv_ll = core::unstable::uvll;
pub mod uv_ll;
// General io and system-services modules

View File

@ -33,6 +33,6 @@
* facilities.
*/
pub use ll = core::unstable::uvll;
pub use ll = super::uv_ll;
pub use iotask = uv_iotask;
pub use global_loop = uv_global_loop;

View File

@ -32,10 +32,10 @@
#[allow(non_camel_case_types)]; // C types
use libc::size_t;
use libc::c_void;
use prelude::*;
use ptr::to_unsafe_ptr;
use core::libc::size_t;
use core::libc::c_void;
use core::prelude::*;
use core::ptr::to_unsafe_ptr;
pub type uv_handle_t = c_void;
pub type uv_loop_t = c_void;
@ -357,7 +357,7 @@ pub struct uv_getaddrinfo_t {
pub mod uv_ll_struct_stubgen {
use ptr;
use core::ptr;
use super::{
uv_async_t,
@ -930,8 +930,6 @@ pub unsafe fn tcp_connect(connect_ptr: *uv_connect_t,
addr_ptr: *sockaddr_in,
after_connect_cb: *u8)
-> libc::c_int {
debug!("b4 foreign tcp_connect--addr port: %u cb: %u",
(*addr_ptr).sin_port as uint, after_connect_cb as uint);
return rustrt::rust_uv_tcp_connect(connect_ptr, tcp_handle_ptr,
after_connect_cb, addr_ptr);
}
@ -1021,22 +1019,8 @@ pub unsafe fn async_send(async_handle: *uv_async_t) {
pub unsafe fn buf_init(input: *u8, len: uint) -> uv_buf_t {
let out_buf = uv_buf_t { base: ptr::null(), len: 0 as libc::size_t };
let out_buf_ptr = ptr::addr_of(&out_buf);
debug!("buf_init - input %u len %u out_buf: %u",
input as uint,
len as uint,
out_buf_ptr as uint);
// yuck :/
rustrt::rust_uv_buf_init(out_buf_ptr, input, len as size_t);
//let result = rustrt::rust_uv_buf_init_2(input, len as size_t);
debug!("after rust_uv_buf_init");
let res_base = get_base_from_buf(out_buf);
let res_len = get_len_from_buf(out_buf);
//let res_base = get_base_from_buf(result);
debug!("buf_init - result %u len %u",
res_base as uint,
res_len as uint);
return out_buf;
//return result;
}
pub unsafe fn ip4_addr(ip: &str, port: int)
-> sockaddr_in {
@ -1078,8 +1062,6 @@ pub unsafe fn ip6_name(src: &sockaddr_in6) -> ~str {
0u8,0u8,0u8,0u8,0u8,0u8];
do vec::as_imm_buf(dst) |dst_buf, size| {
let src_unsafe_ptr = to_unsafe_ptr(src);
debug!("val of src *sockaddr_in6: %? sockaddr_in6: %?",
src_unsafe_ptr, src);
let result = rustrt::rust_uv_ip6_name(src_unsafe_ptr,
dst_buf, size as libc::size_t);
match result {
@ -1240,9 +1222,9 @@ pub unsafe fn addrinfo_as_sockaddr_in6(input: *addrinfo) -> *sockaddr_in6 {
#[cfg(test)]
pub mod test {
use prelude::*;
use core::prelude::*;
use core::comm::{SharedChan, stream, GenericChan, GenericPort};
use super::*;
use comm::{SharedChan, stream, GenericChan, GenericPort};
enum tcp_read_data {
tcp_read_eof,
@ -1265,7 +1247,7 @@ pub mod test {
suggested_size: libc::size_t)
-> uv_buf_t {
unsafe {
debug!("on_alloc_cb!");
debug!(~"on_alloc_cb!");
let char_ptr = malloc_buf_base_of(suggested_size);
debug!("on_alloc_cb h: %? char_ptr: %u sugsize: %u",
handle,
@ -1298,15 +1280,15 @@ pub mod test {
}
else if (nread == -1) {
// err .. possibly EOF
debug!("read: eof!");
debug!(~"read: eof!");
}
else {
// nread == 0 .. do nothing, just free buf as below
debug!("read: do nothing!");
debug!(~"read: do nothing!");
}
// when we're done
free_base_of_buf(buf);
debug!("CLIENT exiting on_read_cb");
debug!(~"CLIENT exiting on_read_cb");
}
}
@ -1321,8 +1303,7 @@ pub mod test {
"CLIENT on_write_complete_cb: tcp:%d write_handle:%d",
stream as int, write_req as int);
let result = read_start(stream, on_alloc_cb, on_read_cb);
debug!(
"CLIENT ending on_write_complete_cb .. status: %d",
debug!("CLIENT ending on_write_complete_cb .. status: %d",
result as int);
}
}
@ -1335,7 +1316,7 @@ pub mod test {
let stream =
get_stream_handle_from_connect_req(connect_req_ptr);
if (status == 0i32) {
debug!("on_connect_cb: in status=0 if..");
debug!(~"on_connect_cb: in status=0 if..");
let client_data = get_data_for_req(
connect_req_ptr as *libc::c_void)
as *request_wrapper;
@ -1353,10 +1334,10 @@ pub mod test {
let test_loop = get_loop_for_uv_handle(
stream as *libc::c_void);
let err_msg = get_last_err_info(test_loop);
debug!("%?", err_msg);
debug!(err_msg);
fail_unless!(false);
}
debug!("finishing on_connect_cb");
debug!(~"finishing on_connect_cb");
}
}
@ -1396,9 +1377,9 @@ pub mod test {
let tcp_init_result = tcp_init(
test_loop as *libc::c_void, tcp_handle_ptr);
if (tcp_init_result == 0i32) {
debug!("sucessful tcp_init_result");
debug!(~"sucessful tcp_init_result");
debug!("building addr...");
debug!(~"building addr...");
let addr = ip4_addr(ip, port);
// FIXME ref #2064
let addr_ptr = ptr::addr_of(&addr);
@ -1420,17 +1401,17 @@ pub mod test {
set_data_for_uv_handle(
tcp_handle_ptr as *libc::c_void,
ptr::addr_of(&client_data) as *libc::c_void);
debug!("before run tcp req loop");
debug!(~"before run tcp req loop");
run(test_loop);
debug!("after run tcp req loop");
debug!(~"after run tcp req loop");
}
else {
debug!("tcp_connect() failure");
debug!(~"tcp_connect() failure");
fail_unless!(false);
}
}
else {
debug!("tcp_init() failure");
debug!(~"tcp_init() failure");
fail_unless!(false);
}
loop_delete(test_loop);
@ -1446,8 +1427,7 @@ pub mod test {
extern fn client_stream_after_close_cb(handle: *libc::c_void) {
unsafe {
debug!(
"SERVER: closed client stream, now closing server stream");
debug!(~"SERVER: closed client stream, now closing server stream");
let client_data = get_data_for_uv_handle(
handle) as
*tcp_server_data;
@ -1460,7 +1440,7 @@ pub mod test {
unsafe {
let client_stream_ptr =
get_stream_handle_from_write_req(req);
debug!("SERVER: resp sent... closing client stream");
debug!(~"SERVER: resp sent... closing client stream");
close(client_stream_ptr as *libc::c_void,
client_stream_after_close_cb)
}
@ -1491,8 +1471,8 @@ pub mod test {
let server_kill_msg = copy (*client_data).server_kill_msg;
let write_req = (*client_data).server_write_req;
if str::contains(request_str, server_kill_msg) {
debug!("SERVER: client req contains kill_msg!");
debug!("SERVER: sending response to client");
debug!(~"SERVER: client req contains kill_msg!");
debug!(~"SERVER: sending response to client");
read_stop(client_stream_ptr);
let server_chan = (*client_data).server_chan.clone();
server_chan.send(request_str);
@ -1504,28 +1484,28 @@ pub mod test {
debug!("SERVER: resp write result: %d",
write_result as int);
if (write_result != 0i32) {
debug!("bad result for server resp write()");
debug!("%s", get_last_err_info(
debug!(~"bad result for server resp write()");
debug!(get_last_err_info(
get_loop_for_uv_handle(client_stream_ptr
as *libc::c_void)));
fail_unless!(false);
}
}
else {
debug!("SERVER: client req !contain kill_msg!");
debug!(~"SERVER: client req !contain kill_msg!");
}
}
else if (nread == -1) {
// err .. possibly EOF
debug!("read: eof!");
debug!(~"read: eof!");
}
else {
// nread == 0 .. do nothing, just free buf as below
debug!("read: do nothing!");
debug!(~"read: do nothing!");
}
// when we're done
free_base_of_buf(buf);
debug!("SERVER exiting on_read_cb");
debug!(~"SERVER exiting on_read_cb");
}
}
@ -1533,7 +1513,7 @@ pub mod test {
*uv_stream_t,
status: libc::c_int) {
unsafe {
debug!("client connecting!");
debug!(~"client connecting!");
let test_loop = get_loop_for_uv_handle(
server_stream_ptr as *libc::c_void);
if status != 0i32 {
@ -1551,7 +1531,7 @@ pub mod test {
client_stream_ptr as *libc::c_void,
server_data as *libc::c_void);
if (client_init_result == 0i32) {
debug!("successfully initialized client stream");
debug!(~"successfully initialized client stream");
let accept_result = accept(server_stream_ptr as
*libc::c_void,
client_stream_ptr as
@ -1563,7 +1543,7 @@ pub mod test {
on_alloc_cb,
on_server_read_cb);
if (read_result == 0i32) {
debug!("successful server read start");
debug!(~"successful server read start");
}
else {
debug!("server_connection_cb: bad read:%d",
@ -1674,7 +1654,7 @@ pub mod test {
let bind_result = tcp_bind(tcp_server_ptr,
server_addr_ptr);
if (bind_result == 0i32) {
debug!("successful uv_tcp_bind, listening");
debug!(~"successful uv_tcp_bind, listening");
// uv_listen()
let listen_result = listen(tcp_server_ptr as
@ -1694,7 +1674,7 @@ pub mod test {
async_send(continue_async_handle_ptr);
// uv_run()
run(test_loop);
debug!("server uv::run() has returned");
debug!(~"server uv::run() has returned");
}
else {
debug!("uv_async_init failure: %d",
@ -1751,9 +1731,9 @@ pub mod test {
};
// block until the server up is.. possibly a race?
debug!("before receiving on server continue_port");
debug!(~"before receiving on server continue_port");
continue_port.recv();
debug!("received on continue port, set up tcp client");
debug!(~"received on continue port, set up tcp client");
let kill_server_msg_copy = copy kill_server_msg;
do task::spawn_sched(task::ManualThreads(1u)) {
@ -1808,7 +1788,7 @@ pub mod test {
let output = fmt!(
"STRUCT_SIZE FAILURE: %s -- actual: %u expected: %u",
t_name, rust_size, foreign_size as uint);
debug!("%s", output);
debug!(output);
}
fail_unless!(sizes_match);
}

View File

@ -21,17 +21,6 @@
void* global_crate_map = NULL;
#ifndef _WIN32
pthread_key_t sched_key;
#else
DWORD sched_key;
#endif
extern "C" void*
rust_get_sched_tls_key() {
return &sched_key;
}
/**
The runtime entrypoint. The (C ABI) main function generated by rustc calls
`rust_start`, providing the address of the Rust ABI main function, the
@ -41,10 +30,6 @@ rust_get_sched_tls_key() {
extern "C" CDECL int
rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) {
#ifndef _WIN32
pthread_key_create(&sched_key, NULL);
#endif
// Load runtime configuration options from the environment.
// FIXME #1497: Should provide a way to get these from the command
// line as well.

View File

@ -882,6 +882,46 @@ rust_get_rt_env() {
return task->kernel->env;
}
typedef void *(*nullary_fn)();
extern "C" CDECL void
rust_call_nullary_fn(nullary_fn f) {
f();
}
#ifndef _WIN32
pthread_key_t sched_key;
#else
DWORD sched_key;
#endif
extern "C" void*
rust_get_sched_tls_key() {
return &sched_key;
}
// Initialize the global state required by the new scheduler
extern "C" CDECL void
rust_initialize_global_state() {
static lock_and_signal init_lock;
static bool initialized = false;
scoped_lock with(init_lock);
if (!initialized) {
#ifndef _WIN32
assert(!pthread_key_create(&sched_key, NULL));
#else
sched_key = TlsAlloc();
assert(sched_key != TLS_OUT_OF_INDEXES);
#endif
initialized = true;
}
}
//
// Local Variables:
// mode: C++

View File

@ -479,6 +479,34 @@ extern "C" struct sockaddr_in6
rust_uv_ip6_addr(const char* ip, int port) {
return uv_ip6_addr(ip, port);
}
extern "C" struct sockaddr_in*
rust_uv_ip4_addrp(const char* ip, int port) {
struct sockaddr_in addr = uv_ip4_addr(ip, port);
struct sockaddr_in *addrp = (sockaddr_in*)malloc(sizeof(struct sockaddr_in));
assert(addrp);
memcpy(addrp, &addr, sizeof(struct sockaddr_in));
return addrp;
}
extern "C" struct sockaddr_in6*
rust_uv_ip6_addrp(const char* ip, int port) {
struct sockaddr_in6 addr = uv_ip6_addr(ip, port);
struct sockaddr_in6 *addrp = (sockaddr_in6*)malloc(sizeof(struct sockaddr_in6));
assert(addrp);
memcpy(addrp, &addr, sizeof(struct sockaddr_in6));
return addrp;
}
extern "C" void
rust_uv_free_ip4_addr(sockaddr_in *addrp) {
free(addrp);
}
extern "C" void
rust_uv_free_ip6_addr(sockaddr_in6 *addrp) {
free(addrp);
}
extern "C" int
rust_uv_ip4_name(struct sockaddr_in* src, char* dst, size_t size) {
return uv_ip4_name(src, dst, size);
@ -563,3 +591,23 @@ extern "C" int
rust_uv_idle_stop(uv_idle_t* idle) {
return uv_idle_stop(idle);
}
extern "C" size_t
rust_uv_handle_size(uintptr_t type) {
return uv_handle_size((uv_handle_type)type);
}
extern "C" size_t
rust_uv_req_size(uintptr_t type) {
return uv_req_size((uv_req_type)type);
}
extern "C" uintptr_t
rust_uv_handle_type_max() {
return UV_HANDLE_TYPE_MAX;
}
extern "C" uintptr_t
rust_uv_req_type_max() {
return UV_REQ_TYPE_MAX;
}

View File

@ -202,3 +202,15 @@ rust_dbg_extern_identity_TwoU64s
rust_dbg_extern_identity_double
rust_dbg_extern_identity_u8
rust_get_rt_env
rust_uv_handle_size
rust_uv_req_size
rust_uv_handle_type_max
rust_uv_req_type_max
rust_uv_ip4_addrp
rust_uv_ip6_addrp
rust_uv_free_ip4_addr
rust_uv_free_ip6_addr
rust_call_nullary_fn
rust_initialize_global_state