rustuv: Reimplement without using std::rt::sched
This reimplements librustuv without using the interfaces provided by the scheduler in libstd. This solely uses the new Runtime trait in order to interface with the local task and perform the necessary scheduling operations. The largest snag in this refactoring is reimplementing homing. The new runtime trait exposes no concept of "homing" a task or forcibly sending a task to a remote scheduler (there is no concept of a scheduler). In order to reimplement homing, the transferrence of tasks is now done at the librustuv level instead of the scheduler level. This means that all I/O loops now have a concurrent queue which receives homing messages and requests. This allows the entire implementation of librustuv to be only dependent on the runtime trait, severing all dependence of librustuv on the scheduler and related green-thread functions. This is all in preparation of the introduction of libgreen and libnative. At the same time, I also took the liberty of removing all glob imports from librustuv.
This commit is contained in:
parent
1ca77268d9
commit
429313de69
@ -11,12 +11,10 @@
|
||||
use ai = std::io::net::addrinfo;
|
||||
use std::libc::c_int;
|
||||
use std::ptr::null;
|
||||
use std::rt::BlockedTask;
|
||||
use std::rt::local::Local;
|
||||
use std::rt::sched::Scheduler;
|
||||
use std::rt::task::BlockedTask;
|
||||
|
||||
use net;
|
||||
use super::{Loop, UvError, Request, wait_until_woken_after};
|
||||
use super::{Loop, UvError, Request, wait_until_woken_after, wakeup};
|
||||
use uvll;
|
||||
|
||||
struct Addrinfo {
|
||||
@ -108,8 +106,7 @@ impl GetAddrInfoRequest {
|
||||
cx.status = status;
|
||||
cx.addrinfo = Some(Addrinfo { handle: res });
|
||||
|
||||
let sched: ~Scheduler = Local::take();
|
||||
sched.resume_blocked_task_immediately(cx.slot.take_unwrap());
|
||||
wakeup(&mut cx.slot);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -188,7 +185,6 @@ pub fn accum_addrinfo(addr: &Addrinfo) -> ~[ai::Info] {
|
||||
#[cfg(test, not(target_os="android"))]
|
||||
mod test {
|
||||
use std::io::net::ip::{SocketAddr, Ipv4Addr};
|
||||
use super::*;
|
||||
use super::super::local_loop;
|
||||
|
||||
#[test]
|
||||
|
@ -129,7 +129,6 @@ mod test_remote {
|
||||
use std::rt::thread::Thread;
|
||||
use std::rt::tube::Tube;
|
||||
|
||||
use super::*;
|
||||
use super::super::local_loop;
|
||||
|
||||
// Make sure that we can fire watchers in remote threads and that they
|
||||
|
@ -14,15 +14,15 @@ use std::cast::transmute;
|
||||
use std::cast;
|
||||
use std::libc::{c_int, c_char, c_void, size_t};
|
||||
use std::libc;
|
||||
use std::rt::BlockedTask;
|
||||
use std::rt::task::BlockedTask;
|
||||
use std::io::{FileStat, IoError};
|
||||
use std::io;
|
||||
use std::rt::local::Local;
|
||||
use std::rt::rtio;
|
||||
use std::rt::sched::{Scheduler, SchedHandle};
|
||||
use std::vec;
|
||||
|
||||
use super::{Loop, UvError, uv_error_to_io_error, wait_until_woken_after};
|
||||
use uvio::HomingIO;
|
||||
use homing::{HomingIO, HomeHandle};
|
||||
use super::{Loop, UvError, uv_error_to_io_error, wait_until_woken_after, wakeup};
|
||||
use uvio::UvIoFactory;
|
||||
use uvll;
|
||||
|
||||
pub struct FsRequest {
|
||||
@ -34,19 +34,19 @@ pub struct FileWatcher {
|
||||
priv loop_: Loop,
|
||||
priv fd: c_int,
|
||||
priv close: rtio::CloseBehavior,
|
||||
priv home: SchedHandle,
|
||||
priv home: HomeHandle,
|
||||
}
|
||||
|
||||
impl FsRequest {
|
||||
pub fn open(loop_: &Loop, path: &CString, flags: int, mode: int)
|
||||
pub fn open(io: &mut UvIoFactory, path: &CString, flags: int, mode: int)
|
||||
-> Result<FileWatcher, UvError>
|
||||
{
|
||||
execute(|req, cb| unsafe {
|
||||
uvll::uv_fs_open(loop_.handle,
|
||||
uvll::uv_fs_open(io.uv_loop(),
|
||||
req, path.with_ref(|p| p), flags as c_int,
|
||||
mode as c_int, cb)
|
||||
}).map(|req|
|
||||
FileWatcher::new(*loop_, req.get_result() as c_int,
|
||||
FileWatcher::new(io, req.get_result() as c_int,
|
||||
rtio::CloseSynchronously)
|
||||
)
|
||||
}
|
||||
@ -320,8 +320,7 @@ fn execute(f: |*uvll::uv_fs_t, uvll::uv_fs_cb| -> c_int)
|
||||
let slot: &mut Option<BlockedTask> = unsafe {
|
||||
cast::transmute(uvll::get_data_for_req(req))
|
||||
};
|
||||
let sched: ~Scheduler = Local::take();
|
||||
sched.resume_blocked_task_immediately(slot.take_unwrap());
|
||||
wakeup(slot);
|
||||
}
|
||||
}
|
||||
|
||||
@ -331,16 +330,17 @@ fn execute_nop(f: |*uvll::uv_fs_t, uvll::uv_fs_cb| -> c_int)
|
||||
}
|
||||
|
||||
impl HomingIO for FileWatcher {
|
||||
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
|
||||
fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
|
||||
}
|
||||
|
||||
impl FileWatcher {
|
||||
pub fn new(loop_: Loop, fd: c_int, close: rtio::CloseBehavior) -> FileWatcher {
|
||||
pub fn new(io: &mut UvIoFactory, fd: c_int,
|
||||
close: rtio::CloseBehavior) -> FileWatcher {
|
||||
FileWatcher {
|
||||
loop_: loop_,
|
||||
loop_: Loop::wrap(io.uv_loop()),
|
||||
fd: fd,
|
||||
close: close,
|
||||
home: get_handle_to_current_scheduler!()
|
||||
home: io.make_handle(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -448,7 +448,6 @@ mod test {
|
||||
use std::io;
|
||||
use std::str;
|
||||
use std::vec;
|
||||
use super::*;
|
||||
use l = super::super::local_loop;
|
||||
|
||||
#[test]
|
||||
|
144
src/librustuv/homing.rs
Normal file
144
src/librustuv/homing.rs
Normal file
@ -0,0 +1,144 @@
|
||||
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
|
||||
// file at the top-level directory of this distribution and at
|
||||
// http://rust-lang.org/COPYRIGHT.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
|
||||
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
|
||||
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
|
||||
// option. This file may not be copied, modified, or distributed
|
||||
// except according to those terms.
|
||||
|
||||
//! Homing I/O implementation
|
||||
//!
|
||||
//! In libuv, whenever a handle is created on an I/O loop it is illegal to use
|
||||
//! that handle outside of that I/O loop. We use libuv I/O with our green
|
||||
//! scheduler, and each green scheduler corresponds to a different I/O loop on a
|
||||
//! different OS thread. Green tasks are also free to roam among schedulers,
|
||||
//! which implies that it is possible to create an I/O handle on one event loop
|
||||
//! and then attempt to use it on another.
|
||||
//!
|
||||
//! In order to solve this problem, this module implements the notion of a
|
||||
//! "homing operation" which will transplant a task from its currently running
|
||||
//! scheduler back onto the original I/O loop. This is accomplished entirely at
|
||||
//! the librustuv layer with very little cooperation from the scheduler (which
|
||||
//! we don't even know exists technically).
|
||||
//!
|
||||
//! These homing operations are completed by first realizing that we're on the
|
||||
//! wrong I/O loop, then descheduling ourselves, sending ourselves to the
|
||||
//! correct I/O loop, and then waking up the I/O loop in order to process its
|
||||
//! local queue of tasks which need to run.
|
||||
//!
|
||||
//! This enqueueing is done with a concurrent queue from libstd, and the
|
||||
//! signalling is achieved with an async handle.
|
||||
|
||||
use std::rt::local::Local;
|
||||
use std::rt::rtio::LocalIo;
|
||||
use std::rt::task::{Task, BlockedTask};
|
||||
|
||||
use ForbidUnwind;
|
||||
use queue::{Queue, QueuePool};
|
||||
|
||||
/// A handle to a remote libuv event loop. This handle will keep the event loop
|
||||
/// alive while active in order to ensure that a homing operation can always be
|
||||
/// completed.
|
||||
///
|
||||
/// Handles are clone-able in order to derive new handles from existing handles
|
||||
/// (very useful for when accepting a socket from a server).
|
||||
pub struct HomeHandle {
|
||||
priv queue: Queue,
|
||||
priv id: uint,
|
||||
}
|
||||
|
||||
impl HomeHandle {
|
||||
pub fn new(id: uint, pool: &mut QueuePool) -> HomeHandle {
|
||||
HomeHandle { queue: pool.queue(), id: id }
|
||||
}
|
||||
|
||||
fn send(&mut self, task: BlockedTask) {
|
||||
self.queue.push(task);
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for HomeHandle {
|
||||
fn clone(&self) -> HomeHandle {
|
||||
HomeHandle {
|
||||
queue: self.queue.clone(),
|
||||
id: self.id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait HomingIO {
|
||||
fn home<'r>(&'r mut self) -> &'r mut HomeHandle;
|
||||
|
||||
/// This function will move tasks to run on their home I/O scheduler. Note
|
||||
/// that this function does *not* pin the task to the I/O scheduler, but
|
||||
/// rather it simply moves it to running on the I/O scheduler.
|
||||
fn go_to_IO_home(&mut self) -> uint {
|
||||
let _f = ForbidUnwind::new("going home");
|
||||
|
||||
let mut cur_task: ~Task = Local::take();
|
||||
let cur_loop_id = {
|
||||
let mut io = cur_task.local_io().expect("libuv must have I/O");
|
||||
io.get().id()
|
||||
};
|
||||
|
||||
// Try at all costs to avoid the homing operation because it is quite
|
||||
// expensive. Hence, we only deschedule/send if we're not on the correct
|
||||
// event loop. If we're already on the home event loop, then we're good
|
||||
// to go (remember we have no preemption, so we're guaranteed to stay on
|
||||
// this event loop as long as we avoid the scheduler).
|
||||
if cur_loop_id != self.home().id {
|
||||
cur_task.deschedule(1, |task| {
|
||||
self.home().send(task);
|
||||
Ok(())
|
||||
});
|
||||
|
||||
// Once we wake up, assert that we're in the right location
|
||||
let cur_loop_id = {
|
||||
let mut io = LocalIo::borrow().expect("libuv must have I/O");
|
||||
io.get().id()
|
||||
};
|
||||
assert_eq!(cur_loop_id, self.home().id);
|
||||
|
||||
cur_loop_id
|
||||
} else {
|
||||
Local::put(cur_task);
|
||||
cur_loop_id
|
||||
}
|
||||
}
|
||||
|
||||
/// Fires a single homing missile, returning another missile targeted back
|
||||
/// at the original home of this task. In other words, this function will
|
||||
/// move the local task to its I/O scheduler and then return an RAII wrapper
|
||||
/// which will return the task home.
|
||||
fn fire_homing_missile(&mut self) -> HomingMissile {
|
||||
HomingMissile { io_home: self.go_to_IO_home() }
|
||||
}
|
||||
}
|
||||
|
||||
/// After a homing operation has been completed, this will return the current
|
||||
/// task back to its appropriate home (if applicable). The field is used to
|
||||
/// assert that we are where we think we are.
|
||||
struct HomingMissile {
|
||||
priv io_home: uint,
|
||||
}
|
||||
|
||||
impl HomingMissile {
|
||||
/// Check at runtime that the task has *not* transplanted itself to a
|
||||
/// different I/O loop while executing.
|
||||
pub fn check(&self, msg: &'static str) {
|
||||
let mut io = LocalIo::borrow().expect("libuv must have I/O");
|
||||
assert!(io.get().id() == self.io_home, "{}", msg);
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for HomingMissile {
|
||||
fn drop(&mut self) {
|
||||
let _f = ForbidUnwind::new("leaving home");
|
||||
|
||||
// It would truly be a sad day if we had moved off the home I/O
|
||||
// scheduler while we were doing I/O.
|
||||
self.check("task moved away from the home scheduler");
|
||||
}
|
||||
}
|
@ -97,7 +97,6 @@ impl Drop for IdleWatcher {
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use std::rt::tube::Tube;
|
||||
use std::rt::rtio::{Callback, PausableIdleCallback};
|
||||
use super::super::local_loop;
|
||||
|
@ -41,23 +41,22 @@ via `close` and `delete` methods.
|
||||
#[crate_type = "rlib"];
|
||||
#[crate_type = "dylib"];
|
||||
|
||||
#[feature(macro_rules, globs)];
|
||||
#[feature(macro_rules)];
|
||||
|
||||
use std::cast::transmute;
|
||||
use std::cast;
|
||||
use std::io;
|
||||
use std::io::IoError;
|
||||
use std::libc::{c_int, malloc};
|
||||
use std::ptr::null;
|
||||
use std::ptr;
|
||||
use std::rt::BlockedTask;
|
||||
use std::rt::local::Local;
|
||||
use std::rt::sched::Scheduler;
|
||||
use std::rt::task::{BlockedTask, Task};
|
||||
use std::rt::rtio::LocalIo;
|
||||
use std::str::raw::from_c_str;
|
||||
use std::str;
|
||||
use std::task;
|
||||
use std::unstable::finally::Finally;
|
||||
|
||||
use std::io::IoError;
|
||||
|
||||
pub use self::async::AsyncWatcher;
|
||||
pub use self::file::{FsRequest, FileWatcher};
|
||||
pub use self::idle::IdleWatcher;
|
||||
@ -70,6 +69,9 @@ pub use self::tty::TtyWatcher;
|
||||
|
||||
mod macros;
|
||||
|
||||
mod queue;
|
||||
mod homing;
|
||||
|
||||
/// The implementation of `rtio` for libuv
|
||||
pub mod uvio;
|
||||
|
||||
@ -144,32 +146,31 @@ pub trait UvHandle<T> {
|
||||
uvll::free_handle(handle);
|
||||
if data == ptr::null() { return }
|
||||
let slot: &mut Option<BlockedTask> = cast::transmute(data);
|
||||
let sched: ~Scheduler = Local::take();
|
||||
sched.resume_blocked_task_immediately(slot.take_unwrap());
|
||||
wakeup(slot);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ForbidSwitch {
|
||||
msg: &'static str,
|
||||
sched: uint,
|
||||
priv msg: &'static str,
|
||||
priv io: uint,
|
||||
}
|
||||
|
||||
impl ForbidSwitch {
|
||||
fn new(s: &'static str) -> ForbidSwitch {
|
||||
let mut sched = Local::borrow(None::<Scheduler>);
|
||||
let mut io = LocalIo::borrow().expect("libuv must have local I/O");
|
||||
ForbidSwitch {
|
||||
msg: s,
|
||||
sched: sched.get().sched_id(),
|
||||
io: io.get().id(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ForbidSwitch {
|
||||
fn drop(&mut self) {
|
||||
let mut sched = Local::borrow(None::<Scheduler>);
|
||||
assert!(self.sched == sched.get().sched_id(),
|
||||
let mut io = LocalIo::borrow().expect("libuv must have local I/O");
|
||||
assert!(self.io == io.get().id(),
|
||||
"didnt want a scheduler switch: {}",
|
||||
self.msg);
|
||||
}
|
||||
@ -199,14 +200,20 @@ fn wait_until_woken_after(slot: *mut Option<BlockedTask>, f: ||) {
|
||||
let _f = ForbidUnwind::new("wait_until_woken_after");
|
||||
unsafe {
|
||||
assert!((*slot).is_none());
|
||||
let sched: ~Scheduler = Local::take();
|
||||
sched.deschedule_running_task_and_then(|_, task| {
|
||||
f();
|
||||
let task: ~Task = Local::take();
|
||||
task.deschedule(1, |task| {
|
||||
*slot = Some(task);
|
||||
})
|
||||
f();
|
||||
Ok(())
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn wakeup(slot: &mut Option<BlockedTask>) {
|
||||
assert!(slot.is_some());
|
||||
slot.take_unwrap().wake().map(|t| t.reawaken(true));
|
||||
}
|
||||
|
||||
pub struct Request {
|
||||
handle: *uvll::uv_req_t,
|
||||
priv defused: bool,
|
||||
@ -325,28 +332,26 @@ fn error_smoke_test() {
|
||||
pub fn uv_error_to_io_error(uverr: UvError) -> IoError {
|
||||
unsafe {
|
||||
// Importing error constants
|
||||
use uvll::*;
|
||||
use std::io::*;
|
||||
|
||||
// uv error descriptions are static
|
||||
let c_desc = uvll::uv_strerror(*uverr);
|
||||
let desc = str::raw::c_str_to_static_slice(c_desc);
|
||||
|
||||
let kind = match *uverr {
|
||||
UNKNOWN => OtherIoError,
|
||||
OK => OtherIoError,
|
||||
EOF => EndOfFile,
|
||||
EACCES => PermissionDenied,
|
||||
ECONNREFUSED => ConnectionRefused,
|
||||
ECONNRESET => ConnectionReset,
|
||||
ENOENT => FileNotFound,
|
||||
ENOTCONN => NotConnected,
|
||||
EPIPE => BrokenPipe,
|
||||
ECONNABORTED => ConnectionAborted,
|
||||
uvll::UNKNOWN => io::OtherIoError,
|
||||
uvll::OK => io::OtherIoError,
|
||||
uvll::EOF => io::EndOfFile,
|
||||
uvll::EACCES => io::PermissionDenied,
|
||||
uvll::ECONNREFUSED => io::ConnectionRefused,
|
||||
uvll::ECONNRESET => io::ConnectionReset,
|
||||
uvll::ENOTCONN => io::NotConnected,
|
||||
uvll::ENOENT => io::FileNotFound,
|
||||
uvll::EPIPE => io::BrokenPipe,
|
||||
uvll::ECONNABORTED => io::ConnectionAborted,
|
||||
err => {
|
||||
uvdebug!("uverr.code {}", err as int);
|
||||
// XXX: Need to map remaining uv error types
|
||||
OtherIoError
|
||||
io::OtherIoError
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -27,18 +27,21 @@ macro_rules! uvdebug (
|
||||
})
|
||||
)
|
||||
|
||||
// get a handle for the current scheduler
|
||||
macro_rules! get_handle_to_current_scheduler(
|
||||
() => ({
|
||||
let mut sched = Local::borrow(None::<Scheduler>);
|
||||
sched.get().make_handle()
|
||||
})
|
||||
)
|
||||
|
||||
pub fn dumb_println(args: &fmt::Arguments) {
|
||||
use std::io::native::file::FileDesc;
|
||||
use std::io;
|
||||
use std::libc;
|
||||
let mut out = FileDesc::new(libc::STDERR_FILENO, false);
|
||||
fmt::writeln(&mut out as &mut io::Writer, args);
|
||||
use std::vec;
|
||||
|
||||
struct Stderr;
|
||||
impl io::Writer for Stderr {
|
||||
fn write(&mut self, data: &[u8]) {
|
||||
unsafe {
|
||||
libc::write(libc::STDERR_FILENO,
|
||||
vec::raw::to_ptr(data) as *libc::c_void,
|
||||
data.len() as libc::size_t);
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut w = Stderr;
|
||||
fmt::writeln(&mut w as &mut io::Writer, args);
|
||||
}
|
||||
|
@ -9,24 +9,22 @@
|
||||
// except according to those terms.
|
||||
|
||||
use std::cast;
|
||||
use std::libc;
|
||||
use std::libc::{size_t, ssize_t, c_int, c_void, c_uint, c_char};
|
||||
use std::ptr;
|
||||
use std::rt::BlockedTask;
|
||||
use std::io::IoError;
|
||||
use std::io::net::ip::{Ipv4Addr, Ipv6Addr, SocketAddr, IpAddr};
|
||||
use std::rt::local::Local;
|
||||
use std::libc::{size_t, ssize_t, c_int, c_void, c_uint, c_char};
|
||||
use std::libc;
|
||||
use std::ptr;
|
||||
use std::rt::rtio;
|
||||
use std::rt::sched::{Scheduler, SchedHandle};
|
||||
use std::rt::tube::Tube;
|
||||
use std::rt::task::BlockedTask;
|
||||
use std::str;
|
||||
use std::vec;
|
||||
|
||||
use homing::{HomingIO, HomeHandle};
|
||||
use stream::StreamWatcher;
|
||||
use super::{Loop, Request, UvError, Buf, status_to_io_result,
|
||||
uv_error_to_io_error, UvHandle, slice_to_uv_buf,
|
||||
wait_until_woken_after};
|
||||
use uvio::HomingIO;
|
||||
wait_until_woken_after, wakeup};
|
||||
use uvio::UvIoFactory;
|
||||
use uvll;
|
||||
use uvll::sockaddr;
|
||||
|
||||
@ -145,42 +143,47 @@ fn socket_name(sk: SocketNameKind, handle: *c_void) -> Result<SocketAddr, IoErro
|
||||
pub struct TcpWatcher {
|
||||
handle: *uvll::uv_tcp_t,
|
||||
stream: StreamWatcher,
|
||||
home: SchedHandle,
|
||||
home: HomeHandle,
|
||||
}
|
||||
|
||||
pub struct TcpListener {
|
||||
home: SchedHandle,
|
||||
home: HomeHandle,
|
||||
handle: *uvll::uv_pipe_t,
|
||||
priv closing_task: Option<BlockedTask>,
|
||||
priv outgoing: Tube<Result<~rtio::RtioTcpStream, IoError>>,
|
||||
priv outgoing: Chan<Result<~rtio::RtioTcpStream, IoError>>,
|
||||
priv incoming: Port<Result<~rtio::RtioTcpStream, IoError>>,
|
||||
}
|
||||
|
||||
pub struct TcpAcceptor {
|
||||
listener: ~TcpListener,
|
||||
priv incoming: Tube<Result<~rtio::RtioTcpStream, IoError>>,
|
||||
}
|
||||
|
||||
// TCP watchers (clients/streams)
|
||||
|
||||
impl TcpWatcher {
|
||||
pub fn new(loop_: &Loop) -> TcpWatcher {
|
||||
pub fn new(io: &mut UvIoFactory) -> TcpWatcher {
|
||||
let handle = io.make_handle();
|
||||
TcpWatcher::new_home(&io.loop_, handle)
|
||||
}
|
||||
|
||||
fn new_home(loop_: &Loop, home: HomeHandle) -> TcpWatcher {
|
||||
let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
|
||||
assert_eq!(unsafe {
|
||||
uvll::uv_tcp_init(loop_.handle, handle)
|
||||
}, 0);
|
||||
TcpWatcher {
|
||||
home: get_handle_to_current_scheduler!(),
|
||||
home: home,
|
||||
handle: handle,
|
||||
stream: StreamWatcher::new(handle),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn connect(loop_: &mut Loop, address: SocketAddr)
|
||||
pub fn connect(io: &mut UvIoFactory, address: SocketAddr)
|
||||
-> Result<TcpWatcher, UvError>
|
||||
{
|
||||
struct Ctx { status: c_int, task: Option<BlockedTask> }
|
||||
|
||||
let tcp = TcpWatcher::new(loop_);
|
||||
let tcp = TcpWatcher::new(io);
|
||||
let ret = socket_addr_as_sockaddr(address, |addr| {
|
||||
let mut req = Request::new(uvll::UV_CONNECT);
|
||||
let result = unsafe {
|
||||
@ -213,14 +216,13 @@ impl TcpWatcher {
|
||||
assert!(status != uvll::ECANCELED);
|
||||
let cx: &mut Ctx = unsafe { req.get_data() };
|
||||
cx.status = status;
|
||||
let scheduler: ~Scheduler = Local::take();
|
||||
scheduler.resume_blocked_task_immediately(cx.task.take_unwrap());
|
||||
wakeup(&mut cx.task);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl HomingIO for TcpWatcher {
|
||||
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
|
||||
fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
|
||||
}
|
||||
|
||||
impl rtio::RtioSocket for TcpWatcher {
|
||||
@ -290,17 +292,19 @@ impl Drop for TcpWatcher {
|
||||
// TCP listeners (unbound servers)
|
||||
|
||||
impl TcpListener {
|
||||
pub fn bind(loop_: &mut Loop, address: SocketAddr)
|
||||
pub fn bind(io: &mut UvIoFactory, address: SocketAddr)
|
||||
-> Result<~TcpListener, UvError> {
|
||||
let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
|
||||
assert_eq!(unsafe {
|
||||
uvll::uv_tcp_init(loop_.handle, handle)
|
||||
uvll::uv_tcp_init(io.uv_loop(), handle)
|
||||
}, 0);
|
||||
let (port, chan) = Chan::new();
|
||||
let l = ~TcpListener {
|
||||
home: get_handle_to_current_scheduler!(),
|
||||
home: io.make_handle(),
|
||||
handle: handle,
|
||||
closing_task: None,
|
||||
outgoing: Tube::new(),
|
||||
outgoing: chan,
|
||||
incoming: port,
|
||||
};
|
||||
let res = socket_addr_as_sockaddr(address, |addr| unsafe {
|
||||
uvll::uv_tcp_bind(l.handle, addr)
|
||||
@ -313,7 +317,7 @@ impl TcpListener {
|
||||
}
|
||||
|
||||
impl HomingIO for TcpListener {
|
||||
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
|
||||
fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
|
||||
}
|
||||
|
||||
impl UvHandle<uvll::uv_tcp_t> for TcpListener {
|
||||
@ -330,11 +334,7 @@ impl rtio::RtioSocket for TcpListener {
|
||||
impl rtio::RtioTcpListener for TcpListener {
|
||||
fn listen(mut ~self) -> Result<~rtio::RtioTcpAcceptor, IoError> {
|
||||
// create the acceptor object from ourselves
|
||||
let incoming = self.outgoing.clone();
|
||||
let mut acceptor = ~TcpAcceptor {
|
||||
listener: self,
|
||||
incoming: incoming,
|
||||
};
|
||||
let mut acceptor = ~TcpAcceptor { listener: self };
|
||||
|
||||
let _m = acceptor.fire_homing_missile();
|
||||
// XXX: the 128 backlog should be configurable
|
||||
@ -347,19 +347,18 @@ impl rtio::RtioTcpListener for TcpListener {
|
||||
|
||||
extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) {
|
||||
assert!(status != uvll::ECANCELED);
|
||||
let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
|
||||
let msg = match status {
|
||||
0 => {
|
||||
let loop_ = Loop::wrap(unsafe {
|
||||
uvll::get_loop_for_uv_handle(server)
|
||||
});
|
||||
let client = TcpWatcher::new(&loop_);
|
||||
let client = TcpWatcher::new_home(&loop_, tcp.home().clone());
|
||||
assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
|
||||
Ok(~client as ~rtio::RtioTcpStream)
|
||||
}
|
||||
n => Err(uv_error_to_io_error(UvError(n)))
|
||||
};
|
||||
|
||||
let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
|
||||
tcp.outgoing.send(msg);
|
||||
}
|
||||
|
||||
@ -373,7 +372,7 @@ impl Drop for TcpListener {
|
||||
// TCP acceptors (bound servers)
|
||||
|
||||
impl HomingIO for TcpAcceptor {
|
||||
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
|
||||
fn home<'r>(&'r mut self) -> &'r mut HomeHandle { self.listener.home() }
|
||||
}
|
||||
|
||||
impl rtio::RtioSocket for TcpAcceptor {
|
||||
@ -385,8 +384,7 @@ impl rtio::RtioSocket for TcpAcceptor {
|
||||
|
||||
impl rtio::RtioTcpAcceptor for TcpAcceptor {
|
||||
fn accept(&mut self) -> Result<~rtio::RtioTcpStream, IoError> {
|
||||
let _m = self.fire_homing_missile();
|
||||
self.incoming.recv()
|
||||
self.listener.incoming.recv()
|
||||
}
|
||||
|
||||
fn accept_simultaneously(&mut self) -> Result<(), IoError> {
|
||||
@ -410,18 +408,18 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
|
||||
|
||||
pub struct UdpWatcher {
|
||||
handle: *uvll::uv_udp_t,
|
||||
home: SchedHandle,
|
||||
home: HomeHandle,
|
||||
}
|
||||
|
||||
impl UdpWatcher {
|
||||
pub fn bind(loop_: &Loop, address: SocketAddr)
|
||||
pub fn bind(io: &mut UvIoFactory, address: SocketAddr)
|
||||
-> Result<UdpWatcher, UvError> {
|
||||
let udp = UdpWatcher {
|
||||
handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
|
||||
home: get_handle_to_current_scheduler!(),
|
||||
home: io.make_handle(),
|
||||
};
|
||||
assert_eq!(unsafe {
|
||||
uvll::uv_udp_init(loop_.handle, udp.handle)
|
||||
uvll::uv_udp_init(io.uv_loop(), udp.handle)
|
||||
}, 0);
|
||||
let result = socket_addr_as_sockaddr(address, |addr| unsafe {
|
||||
uvll::uv_udp_bind(udp.handle, addr, 0u32)
|
||||
@ -438,7 +436,7 @@ impl UvHandle<uvll::uv_udp_t> for UdpWatcher {
|
||||
}
|
||||
|
||||
impl HomingIO for UdpWatcher {
|
||||
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
|
||||
fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
|
||||
}
|
||||
|
||||
impl rtio::RtioSocket for UdpWatcher {
|
||||
@ -519,9 +517,7 @@ impl rtio::RtioUdpSocket for UdpWatcher {
|
||||
Some(sockaddr_to_socket_addr(addr))
|
||||
};
|
||||
cx.result = Some((nread, addr));
|
||||
|
||||
let sched: ~Scheduler = Local::take();
|
||||
sched.resume_blocked_task_immediately(cx.task.take_unwrap());
|
||||
wakeup(&mut cx.task);
|
||||
}
|
||||
}
|
||||
|
||||
@ -556,9 +552,7 @@ impl rtio::RtioUdpSocket for UdpWatcher {
|
||||
assert!(status != uvll::ECANCELED);
|
||||
let cx: &mut Ctx = unsafe { req.get_data() };
|
||||
cx.result = status;
|
||||
|
||||
let sched: ~Scheduler = Local::take();
|
||||
sched.resume_blocked_task_immediately(cx.task.take_unwrap());
|
||||
wakeup(&mut cx.task);
|
||||
}
|
||||
}
|
||||
|
||||
@ -646,12 +640,10 @@ impl Drop for UdpWatcher {
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::rt::test::*;
|
||||
use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
|
||||
RtioUdpSocket};
|
||||
use std::task;
|
||||
|
||||
use super::*;
|
||||
use super::super::local_loop;
|
||||
|
||||
#[test]
|
||||
@ -824,7 +816,6 @@ mod test {
|
||||
|
||||
#[test]
|
||||
fn test_read_read_read() {
|
||||
use std::rt::rtio::*;
|
||||
let addr = next_test_ip4();
|
||||
static MAX: uint = 5000;
|
||||
let (port, chan) = Chan::new();
|
||||
|
@ -9,35 +9,33 @@
|
||||
// except according to those terms.
|
||||
|
||||
use std::c_str::CString;
|
||||
use std::libc;
|
||||
use std::rt::BlockedTask;
|
||||
use std::io::IoError;
|
||||
use std::rt::local::Local;
|
||||
use std::libc;
|
||||
use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor};
|
||||
use std::rt::sched::{Scheduler, SchedHandle};
|
||||
use std::rt::tube::Tube;
|
||||
use std::rt::task::BlockedTask;
|
||||
|
||||
use homing::{HomingIO, HomeHandle};
|
||||
use stream::StreamWatcher;
|
||||
use super::{Loop, UvError, UvHandle, Request, uv_error_to_io_error,
|
||||
wait_until_woken_after};
|
||||
use uvio::HomingIO;
|
||||
wait_until_woken_after, wakeup};
|
||||
use uvio::UvIoFactory;
|
||||
use uvll;
|
||||
|
||||
pub struct PipeWatcher {
|
||||
stream: StreamWatcher,
|
||||
home: SchedHandle,
|
||||
home: HomeHandle,
|
||||
priv defused: bool,
|
||||
}
|
||||
|
||||
pub struct PipeListener {
|
||||
home: SchedHandle,
|
||||
home: HomeHandle,
|
||||
pipe: *uvll::uv_pipe_t,
|
||||
priv outgoing: Tube<Result<~RtioPipe, IoError>>,
|
||||
priv outgoing: Chan<Result<~RtioPipe, IoError>>,
|
||||
priv incoming: Port<Result<~RtioPipe, IoError>>,
|
||||
}
|
||||
|
||||
pub struct PipeAcceptor {
|
||||
listener: ~PipeListener,
|
||||
priv incoming: Tube<Result<~RtioPipe, IoError>>,
|
||||
}
|
||||
|
||||
// PipeWatcher implementation and traits
|
||||
@ -46,7 +44,12 @@ impl PipeWatcher {
|
||||
// Creates an uninitialized pipe watcher. The underlying uv pipe is ready to
|
||||
// get bound to some other source (this is normally a helper method paired
|
||||
// with another call).
|
||||
pub fn new(loop_: &Loop, ipc: bool) -> PipeWatcher {
|
||||
pub fn new(io: &mut UvIoFactory, ipc: bool) -> PipeWatcher {
|
||||
let home = io.make_handle();
|
||||
PipeWatcher::new_home(&io.loop_, home, ipc)
|
||||
}
|
||||
|
||||
pub fn new_home(loop_: &Loop, home: HomeHandle, ipc: bool) -> PipeWatcher {
|
||||
let handle = unsafe {
|
||||
let handle = uvll::malloc_handle(uvll::UV_NAMED_PIPE);
|
||||
assert!(!handle.is_null());
|
||||
@ -56,26 +59,28 @@ impl PipeWatcher {
|
||||
};
|
||||
PipeWatcher {
|
||||
stream: StreamWatcher::new(handle),
|
||||
home: get_handle_to_current_scheduler!(),
|
||||
home: home,
|
||||
defused: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn open(loop_: &Loop, file: libc::c_int) -> Result<PipeWatcher, UvError>
|
||||
pub fn open(io: &mut UvIoFactory, file: libc::c_int)
|
||||
-> Result<PipeWatcher, UvError>
|
||||
{
|
||||
let pipe = PipeWatcher::new(loop_, false);
|
||||
let pipe = PipeWatcher::new(io, false);
|
||||
match unsafe { uvll::uv_pipe_open(pipe.handle(), file) } {
|
||||
0 => Ok(pipe),
|
||||
n => Err(UvError(n))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn connect(loop_: &Loop, name: &CString) -> Result<PipeWatcher, UvError>
|
||||
pub fn connect(io: &mut UvIoFactory, name: &CString)
|
||||
-> Result<PipeWatcher, UvError>
|
||||
{
|
||||
struct Ctx { task: Option<BlockedTask>, result: libc::c_int, }
|
||||
let mut cx = Ctx { task: None, result: 0 };
|
||||
let mut req = Request::new(uvll::UV_CONNECT);
|
||||
let pipe = PipeWatcher::new(loop_, false);
|
||||
let pipe = PipeWatcher::new(io, false);
|
||||
|
||||
wait_until_woken_after(&mut cx.task, || {
|
||||
unsafe {
|
||||
@ -97,8 +102,7 @@ impl PipeWatcher {
|
||||
assert!(status != uvll::ECANCELED);
|
||||
let cx: &mut Ctx = unsafe { req.get_data() };
|
||||
cx.result = status;
|
||||
let sched: ~Scheduler = Local::take();
|
||||
sched.resume_blocked_task_immediately(cx.task.take_unwrap());
|
||||
wakeup(&mut cx.task);
|
||||
}
|
||||
}
|
||||
|
||||
@ -125,7 +129,7 @@ impl RtioPipe for PipeWatcher {
|
||||
}
|
||||
|
||||
impl HomingIO for PipeWatcher {
|
||||
fn home<'a>(&'a mut self) -> &'a mut SchedHandle { &mut self.home }
|
||||
fn home<'a>(&'a mut self) -> &'a mut HomeHandle { &mut self.home }
|
||||
}
|
||||
|
||||
impl UvHandle<uvll::uv_pipe_t> for PipeWatcher {
|
||||
@ -144,8 +148,10 @@ impl Drop for PipeWatcher {
|
||||
// PipeListener implementation and traits
|
||||
|
||||
impl PipeListener {
|
||||
pub fn bind(loop_: &Loop, name: &CString) -> Result<~PipeListener, UvError> {
|
||||
let pipe = PipeWatcher::new(loop_, false);
|
||||
pub fn bind(io: &mut UvIoFactory, name: &CString)
|
||||
-> Result<~PipeListener, UvError>
|
||||
{
|
||||
let pipe = PipeWatcher::new(io, false);
|
||||
match unsafe {
|
||||
uvll::uv_pipe_bind(pipe.handle(), name.with_ref(|p| p))
|
||||
} {
|
||||
@ -153,10 +159,12 @@ impl PipeListener {
|
||||
// If successful, unwrap the PipeWatcher because we control how
|
||||
// we close the pipe differently. We can't rely on
|
||||
// StreamWatcher's default close method.
|
||||
let (port, chan) = Chan::new();
|
||||
let p = ~PipeListener {
|
||||
home: get_handle_to_current_scheduler!(),
|
||||
home: io.make_handle(),
|
||||
pipe: pipe.unwrap(),
|
||||
outgoing: Tube::new(),
|
||||
incoming: port,
|
||||
outgoing: chan,
|
||||
};
|
||||
Ok(p.install())
|
||||
}
|
||||
@ -168,11 +176,7 @@ impl PipeListener {
|
||||
impl RtioUnixListener for PipeListener {
|
||||
fn listen(mut ~self) -> Result<~RtioUnixAcceptor, IoError> {
|
||||
// create the acceptor object from ourselves
|
||||
let incoming = self.outgoing.clone();
|
||||
let mut acceptor = ~PipeAcceptor {
|
||||
listener: self,
|
||||
incoming: incoming,
|
||||
};
|
||||
let mut acceptor = ~PipeAcceptor { listener: self };
|
||||
|
||||
let _m = acceptor.fire_homing_missile();
|
||||
// XXX: the 128 backlog should be configurable
|
||||
@ -184,7 +188,7 @@ impl RtioUnixListener for PipeListener {
|
||||
}
|
||||
|
||||
impl HomingIO for PipeListener {
|
||||
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
|
||||
fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
|
||||
}
|
||||
|
||||
impl UvHandle<uvll::uv_pipe_t> for PipeListener {
|
||||
@ -193,20 +197,20 @@ impl UvHandle<uvll::uv_pipe_t> for PipeListener {
|
||||
|
||||
extern fn listen_cb(server: *uvll::uv_stream_t, status: libc::c_int) {
|
||||
assert!(status != uvll::ECANCELED);
|
||||
|
||||
let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&server) };
|
||||
let msg = match status {
|
||||
0 => {
|
||||
let loop_ = Loop::wrap(unsafe {
|
||||
uvll::get_loop_for_uv_handle(server)
|
||||
});
|
||||
let client = PipeWatcher::new(&loop_, false);
|
||||
let client = PipeWatcher::new_home(&loop_, pipe.home().clone(), false);
|
||||
assert_eq!(unsafe { uvll::uv_accept(server, client.handle()) }, 0);
|
||||
Ok(~client as ~RtioPipe)
|
||||
}
|
||||
n => Err(uv_error_to_io_error(UvError(n)))
|
||||
};
|
||||
|
||||
let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&server) };
|
||||
pipe.outgoing.send(msg);
|
||||
pipe.outgoing.send_deferred(msg);
|
||||
}
|
||||
|
||||
impl Drop for PipeListener {
|
||||
@ -220,13 +224,12 @@ impl Drop for PipeListener {
|
||||
|
||||
impl RtioUnixAcceptor for PipeAcceptor {
|
||||
fn accept(&mut self) -> Result<~RtioPipe, IoError> {
|
||||
let _m = self.fire_homing_missile();
|
||||
self.incoming.recv()
|
||||
self.listener.incoming.recv()
|
||||
}
|
||||
}
|
||||
|
||||
impl HomingIO for PipeAcceptor {
|
||||
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
|
||||
fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.listener.home }
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@ -234,7 +237,6 @@ mod tests {
|
||||
use std::rt::rtio::{RtioUnixListener, RtioUnixAcceptor, RtioPipe};
|
||||
use std::rt::test::next_test_unix;
|
||||
|
||||
use super::*;
|
||||
use super::super::local_loop;
|
||||
|
||||
#[test]
|
||||
|
@ -8,32 +8,31 @@
|
||||
// option. This file may not be copied, modified, or distributed
|
||||
// except according to those terms.
|
||||
|
||||
use std::io::IoError;
|
||||
use std::io::process;
|
||||
use std::libc::c_int;
|
||||
use std::libc;
|
||||
use std::ptr;
|
||||
use std::rt::BlockedTask;
|
||||
use std::io::IoError;
|
||||
use std::io::process::*;
|
||||
use std::rt::local::Local;
|
||||
use std::rt::rtio::RtioProcess;
|
||||
use std::rt::sched::{Scheduler, SchedHandle};
|
||||
use std::rt::task::BlockedTask;
|
||||
use std::vec;
|
||||
|
||||
use super::{Loop, UvHandle, UvError, uv_error_to_io_error,
|
||||
wait_until_woken_after};
|
||||
use uvio::HomingIO;
|
||||
use uvll;
|
||||
use homing::{HomingIO, HomeHandle};
|
||||
use pipe::PipeWatcher;
|
||||
use super::{UvHandle, UvError, uv_error_to_io_error,
|
||||
wait_until_woken_after, wakeup};
|
||||
use uvio::UvIoFactory;
|
||||
use uvll;
|
||||
|
||||
pub struct Process {
|
||||
handle: *uvll::uv_process_t,
|
||||
home: SchedHandle,
|
||||
home: HomeHandle,
|
||||
|
||||
/// Task to wake up (may be null) for when the process exits
|
||||
to_wake: Option<BlockedTask>,
|
||||
|
||||
/// Collected from the exit_cb
|
||||
exit_status: Option<ProcessExit>,
|
||||
exit_status: Option<process::ProcessExit>,
|
||||
}
|
||||
|
||||
impl Process {
|
||||
@ -41,7 +40,7 @@ impl Process {
|
||||
///
|
||||
/// Returns either the corresponding process object or an error which
|
||||
/// occurred.
|
||||
pub fn spawn(loop_: &Loop, config: ProcessConfig)
|
||||
pub fn spawn(io_loop: &mut UvIoFactory, config: process::ProcessConfig)
|
||||
-> Result<(~Process, ~[Option<PipeWatcher>]), UvError>
|
||||
{
|
||||
let cwd = config.cwd.map(|s| s.to_c_str());
|
||||
@ -52,7 +51,7 @@ impl Process {
|
||||
stdio.set_len(io.len());
|
||||
for (slot, other) in stdio.iter().zip(io.iter()) {
|
||||
let io = set_stdio(slot as *uvll::uv_stdio_container_t, other,
|
||||
loop_);
|
||||
io_loop);
|
||||
ret_io.push(io);
|
||||
}
|
||||
}
|
||||
@ -78,12 +77,12 @@ impl Process {
|
||||
let handle = UvHandle::alloc(None::<Process>, uvll::UV_PROCESS);
|
||||
let process = ~Process {
|
||||
handle: handle,
|
||||
home: get_handle_to_current_scheduler!(),
|
||||
home: io_loop.make_handle(),
|
||||
to_wake: None,
|
||||
exit_status: None,
|
||||
};
|
||||
match unsafe {
|
||||
uvll::uv_spawn(loop_.handle, handle, &options)
|
||||
uvll::uv_spawn(io_loop.uv_loop(), handle, &options)
|
||||
} {
|
||||
0 => Ok(process.install()),
|
||||
err => Err(UvError(err)),
|
||||
@ -105,33 +104,28 @@ extern fn on_exit(handle: *uvll::uv_process_t,
|
||||
|
||||
assert!(p.exit_status.is_none());
|
||||
p.exit_status = Some(match term_signal {
|
||||
0 => ExitStatus(exit_status as int),
|
||||
n => ExitSignal(n as int),
|
||||
0 => process::ExitStatus(exit_status as int),
|
||||
n => process::ExitSignal(n as int),
|
||||
});
|
||||
|
||||
match p.to_wake.take() {
|
||||
Some(task) => {
|
||||
let scheduler: ~Scheduler = Local::take();
|
||||
scheduler.resume_blocked_task_immediately(task);
|
||||
}
|
||||
None => {}
|
||||
}
|
||||
if p.to_wake.is_none() { return }
|
||||
wakeup(&mut p.to_wake);
|
||||
}
|
||||
|
||||
unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t,
|
||||
io: &StdioContainer,
|
||||
loop_: &Loop) -> Option<PipeWatcher> {
|
||||
io: &process::StdioContainer,
|
||||
io_loop: &mut UvIoFactory) -> Option<PipeWatcher> {
|
||||
match *io {
|
||||
Ignored => {
|
||||
process::Ignored => {
|
||||
uvll::set_stdio_container_flags(dst, uvll::STDIO_IGNORE);
|
||||
None
|
||||
}
|
||||
InheritFd(fd) => {
|
||||
process::InheritFd(fd) => {
|
||||
uvll::set_stdio_container_flags(dst, uvll::STDIO_INHERIT_FD);
|
||||
uvll::set_stdio_container_fd(dst, fd);
|
||||
None
|
||||
}
|
||||
CreatePipe(readable, writable) => {
|
||||
process::CreatePipe(readable, writable) => {
|
||||
let mut flags = uvll::STDIO_CREATE_PIPE as libc::c_int;
|
||||
if readable {
|
||||
flags |= uvll::STDIO_READABLE_PIPE as libc::c_int;
|
||||
@ -139,7 +133,7 @@ unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t,
|
||||
if writable {
|
||||
flags |= uvll::STDIO_WRITABLE_PIPE as libc::c_int;
|
||||
}
|
||||
let pipe = PipeWatcher::new(loop_, false);
|
||||
let pipe = PipeWatcher::new(io_loop, false);
|
||||
uvll::set_stdio_container_flags(dst, flags);
|
||||
uvll::set_stdio_container_stream(dst, pipe.handle());
|
||||
Some(pipe)
|
||||
@ -186,7 +180,7 @@ fn with_env<T>(env: Option<&[(~str, ~str)]>, f: |**libc::c_char| -> T) -> T {
|
||||
}
|
||||
|
||||
impl HomingIO for Process {
|
||||
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
|
||||
fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
|
||||
}
|
||||
|
||||
impl UvHandle<uvll::uv_process_t> for Process {
|
||||
@ -208,7 +202,7 @@ impl RtioProcess for Process {
|
||||
}
|
||||
}
|
||||
|
||||
fn wait(&mut self) -> ProcessExit {
|
||||
fn wait(&mut self) -> process::ProcessExit {
|
||||
// Make sure (on the home scheduler) that we have an exit status listed
|
||||
let _m = self.fire_homing_missile();
|
||||
match self.exit_status {
|
||||
|
184
src/librustuv/queue.rs
Normal file
184
src/librustuv/queue.rs
Normal file
@ -0,0 +1,184 @@
|
||||
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
|
||||
// file at the top-level directory of this distribution and at
|
||||
// http://rust-lang.org/COPYRIGHT.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
|
||||
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
|
||||
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
|
||||
// option. This file may not be copied, modified, or distributed
|
||||
// except according to those terms.
|
||||
|
||||
//! A concurrent queue used to signal remote event loops
|
||||
//!
|
||||
//! This queue implementation is used to send tasks among event loops. This is
|
||||
//! backed by a multi-producer/single-consumer queue from libstd and uv_async_t
|
||||
//! handles (to wake up a remote event loop).
|
||||
//!
|
||||
//! The uv_async_t is stored next to the event loop, so in order to not keep the
|
||||
//! event loop alive we use uv_ref and uv_unref in order to control when the
|
||||
//! async handle is active or not.
|
||||
|
||||
use std::cast;
|
||||
use std::libc::{c_void, c_int};
|
||||
use std::rt::task::BlockedTask;
|
||||
use std::unstable::sync::LittleLock;
|
||||
use mpsc = std::sync::mpsc_queue;
|
||||
|
||||
use async::AsyncWatcher;
|
||||
use super::{Loop, UvHandle};
|
||||
use uvll;
|
||||
|
||||
enum Message {
|
||||
Task(BlockedTask),
|
||||
Increment,
|
||||
Decrement,
|
||||
}
|
||||
|
||||
struct State {
|
||||
handle: *uvll::uv_async_t,
|
||||
lock: LittleLock, // see comments in async_cb for why this is needed
|
||||
}
|
||||
|
||||
/// This structure is intended to be stored next to the event loop, and it is
|
||||
/// used to create new `Queue` structures.
|
||||
pub struct QueuePool {
|
||||
priv producer: mpsc::Producer<Message, State>,
|
||||
priv consumer: mpsc::Consumer<Message, State>,
|
||||
priv refcnt: uint,
|
||||
}
|
||||
|
||||
/// This type is used to send messages back to the original event loop.
|
||||
pub struct Queue {
|
||||
priv queue: mpsc::Producer<Message, State>,
|
||||
}
|
||||
|
||||
extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) {
|
||||
assert_eq!(status, 0);
|
||||
let state: &mut QueuePool = unsafe {
|
||||
cast::transmute(uvll::get_data_for_uv_handle(handle))
|
||||
};
|
||||
let packet = unsafe { state.consumer.packet() };
|
||||
|
||||
// Remember that there is no guarantee about how many times an async
|
||||
// callback is called with relation to the number of sends, so process the
|
||||
// entire queue in a loop.
|
||||
loop {
|
||||
match state.consumer.pop() {
|
||||
mpsc::Data(Task(task)) => {
|
||||
task.wake().map(|t| t.reawaken(true));
|
||||
}
|
||||
mpsc::Data(Increment) => unsafe {
|
||||
if state.refcnt == 0 {
|
||||
uvll::uv_ref((*packet).handle);
|
||||
}
|
||||
state.refcnt += 1;
|
||||
},
|
||||
mpsc::Data(Decrement) => unsafe {
|
||||
state.refcnt -= 1;
|
||||
if state.refcnt == 0 {
|
||||
uvll::uv_unref((*packet).handle);
|
||||
}
|
||||
},
|
||||
mpsc::Empty | mpsc::Inconsistent => break
|
||||
};
|
||||
}
|
||||
|
||||
// If the refcount is now zero after processing the queue, then there is no
|
||||
// longer a reference on the async handle and it is possible that this event
|
||||
// loop can exit. What we're not guaranteed, however, is that a producer in
|
||||
// the middle of dropping itself is yet done with the handle. It could be
|
||||
// possible that we saw their Decrement message but they have yet to signal
|
||||
// on the async handle. If we were to return immediately, the entire uv loop
|
||||
// could be destroyed meaning the call to uv_async_send would abort()
|
||||
//
|
||||
// In order to fix this, an OS mutex is used to wait for the other end to
|
||||
// finish before we continue. The drop block on a handle will acquire a
|
||||
// mutex and then drop it after both the push and send have been completed.
|
||||
// If we acquire the mutex here, then we are guaranteed that there are no
|
||||
// longer any senders which are holding on to their handles, so we can
|
||||
// safely allow the event loop to exit.
|
||||
if state.refcnt == 0 {
|
||||
unsafe {
|
||||
let _l = (*packet).lock.lock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl QueuePool {
|
||||
pub fn new(loop_: &mut Loop) -> ~QueuePool {
|
||||
let handle = UvHandle::alloc(None::<AsyncWatcher>, uvll::UV_ASYNC);
|
||||
let (c, p) = mpsc::queue(State {
|
||||
handle: handle,
|
||||
lock: LittleLock::new(),
|
||||
});
|
||||
let q = ~QueuePool {
|
||||
producer: p,
|
||||
consumer: c,
|
||||
refcnt: 0,
|
||||
};
|
||||
|
||||
unsafe {
|
||||
assert_eq!(uvll::uv_async_init(loop_.handle, handle, async_cb), 0);
|
||||
uvll::uv_unref(handle);
|
||||
let data: *c_void = *cast::transmute::<&~QueuePool, &*c_void>(&q);
|
||||
uvll::set_data_for_uv_handle(handle, data);
|
||||
}
|
||||
|
||||
return q;
|
||||
}
|
||||
|
||||
pub fn queue(&mut self) -> Queue {
|
||||
unsafe {
|
||||
if self.refcnt == 0 {
|
||||
uvll::uv_ref((*self.producer.packet()).handle);
|
||||
}
|
||||
self.refcnt += 1;
|
||||
}
|
||||
Queue { queue: self.producer.clone() }
|
||||
}
|
||||
}
|
||||
|
||||
impl Queue {
|
||||
pub fn push(&mut self, task: BlockedTask) {
|
||||
self.queue.push(Task(task));
|
||||
unsafe {
|
||||
uvll::uv_async_send((*self.queue.packet()).handle);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for Queue {
|
||||
fn clone(&self) -> Queue {
|
||||
// Push a request to increment on the queue, but there's no need to
|
||||
// signal the event loop to process it at this time. We're guaranteed
|
||||
// that the count is at least one (because we have a queue right here),
|
||||
// and if the queue is dropped later on it'll see the increment for the
|
||||
// decrement anyway.
|
||||
unsafe {
|
||||
cast::transmute_mut(self).queue.push(Increment);
|
||||
}
|
||||
Queue { queue: self.queue.clone() }
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Queue {
|
||||
fn drop(&mut self) {
|
||||
// See the comments in the async_cb function for why there is a lock
|
||||
// that is acquired only on a drop.
|
||||
unsafe {
|
||||
let state = self.queue.packet();
|
||||
let _l = (*state).lock.lock();
|
||||
self.queue.push(Decrement);
|
||||
uvll::uv_async_send((*state).handle);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for State {
|
||||
fn drop(&mut self) {
|
||||
unsafe {
|
||||
uvll::uv_close(self.handle, cast::transmute(0));
|
||||
uvll::free_handle(self.handle);
|
||||
}
|
||||
}
|
||||
}
|
@ -10,34 +10,33 @@
|
||||
|
||||
use std::libc::c_int;
|
||||
use std::io::signal::Signum;
|
||||
use std::rt::sched::{SchedHandle, Scheduler};
|
||||
use std::comm::SharedChan;
|
||||
use std::rt::local::Local;
|
||||
use std::rt::rtio::RtioSignal;
|
||||
|
||||
use super::{Loop, UvError, UvHandle};
|
||||
use homing::{HomingIO, HomeHandle};
|
||||
use super::{UvError, UvHandle};
|
||||
use uvll;
|
||||
use uvio::HomingIO;
|
||||
use uvio::UvIoFactory;
|
||||
|
||||
pub struct SignalWatcher {
|
||||
handle: *uvll::uv_signal_t,
|
||||
home: SchedHandle,
|
||||
home: HomeHandle,
|
||||
|
||||
channel: SharedChan<Signum>,
|
||||
signal: Signum,
|
||||
}
|
||||
|
||||
impl SignalWatcher {
|
||||
pub fn new(loop_: &mut Loop, signum: Signum,
|
||||
pub fn new(io: &mut UvIoFactory, signum: Signum,
|
||||
channel: SharedChan<Signum>) -> Result<~SignalWatcher, UvError> {
|
||||
let s = ~SignalWatcher {
|
||||
handle: UvHandle::alloc(None::<SignalWatcher>, uvll::UV_SIGNAL),
|
||||
home: get_handle_to_current_scheduler!(),
|
||||
home: io.make_handle(),
|
||||
channel: channel,
|
||||
signal: signum,
|
||||
};
|
||||
assert_eq!(unsafe {
|
||||
uvll::uv_signal_init(loop_.handle, s.handle)
|
||||
uvll::uv_signal_init(io.uv_loop(), s.handle)
|
||||
}, 0);
|
||||
|
||||
match unsafe {
|
||||
@ -57,7 +56,7 @@ extern fn signal_cb(handle: *uvll::uv_signal_t, signum: c_int) {
|
||||
}
|
||||
|
||||
impl HomingIO for SignalWatcher {
|
||||
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
|
||||
fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
|
||||
}
|
||||
|
||||
impl UvHandle<uvll::uv_signal_t> for SignalWatcher {
|
||||
@ -75,7 +74,6 @@ impl Drop for SignalWatcher {
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use super::super::local_loop;
|
||||
use std::io::signal;
|
||||
|
||||
|
@ -11,12 +11,10 @@
|
||||
use std::cast;
|
||||
use std::libc::{c_int, size_t, ssize_t};
|
||||
use std::ptr;
|
||||
use std::rt::BlockedTask;
|
||||
use std::rt::local::Local;
|
||||
use std::rt::sched::Scheduler;
|
||||
use std::rt::task::BlockedTask;
|
||||
|
||||
use super::{UvError, Buf, slice_to_uv_buf, Request, wait_until_woken_after,
|
||||
ForbidUnwind};
|
||||
ForbidUnwind, wakeup};
|
||||
use uvll;
|
||||
|
||||
// This is a helper structure which is intended to get embedded into other
|
||||
@ -164,8 +162,7 @@ extern fn read_cb(handle: *uvll::uv_stream_t, nread: ssize_t, _buf: *Buf) {
|
||||
unsafe { assert_eq!(uvll::uv_read_stop(handle), 0); }
|
||||
rcx.result = nread;
|
||||
|
||||
let scheduler: ~Scheduler = Local::take();
|
||||
scheduler.resume_blocked_task_immediately(rcx.task.take_unwrap());
|
||||
wakeup(&mut rcx.task);
|
||||
}
|
||||
|
||||
// Unlike reading, the WriteContext is stored in the uv_write_t request. Like
|
||||
@ -180,6 +177,5 @@ extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
|
||||
wcx.result = status;
|
||||
req.defuse();
|
||||
|
||||
let sched: ~Scheduler = Local::take();
|
||||
sched.resume_blocked_task_immediately(wcx.task.take_unwrap());
|
||||
wakeup(&mut wcx.task);
|
||||
}
|
||||
|
@ -9,19 +9,19 @@
|
||||
// except according to those terms.
|
||||
|
||||
use std::libc::c_int;
|
||||
use std::rt::BlockedTask;
|
||||
use std::rt::local::Local;
|
||||
use std::rt::rtio::RtioTimer;
|
||||
use std::rt::sched::{Scheduler, SchedHandle};
|
||||
use std::rt::task::{BlockedTask, Task};
|
||||
use std::util;
|
||||
|
||||
use homing::{HomeHandle, HomingIO};
|
||||
use super::{UvHandle, ForbidUnwind, ForbidSwitch};
|
||||
use uvio::UvIoFactory;
|
||||
use uvll;
|
||||
use super::{Loop, UvHandle, ForbidUnwind, ForbidSwitch};
|
||||
use uvio::HomingIO;
|
||||
|
||||
pub struct TimerWatcher {
|
||||
handle: *uvll::uv_timer_t,
|
||||
home: SchedHandle,
|
||||
home: HomeHandle,
|
||||
action: Option<NextAction>,
|
||||
id: uint, // see comments in timer_cb
|
||||
}
|
||||
@ -33,15 +33,15 @@ pub enum NextAction {
|
||||
}
|
||||
|
||||
impl TimerWatcher {
|
||||
pub fn new(loop_: &mut Loop) -> ~TimerWatcher {
|
||||
pub fn new(io: &mut UvIoFactory) -> ~TimerWatcher {
|
||||
let handle = UvHandle::alloc(None::<TimerWatcher>, uvll::UV_TIMER);
|
||||
assert_eq!(unsafe {
|
||||
uvll::uv_timer_init(loop_.handle, handle)
|
||||
uvll::uv_timer_init(io.uv_loop(), handle)
|
||||
}, 0);
|
||||
let me = ~TimerWatcher {
|
||||
handle: handle,
|
||||
action: None,
|
||||
home: get_handle_to_current_scheduler!(),
|
||||
home: io.make_handle(),
|
||||
id: 0,
|
||||
};
|
||||
return me.install();
|
||||
@ -59,7 +59,7 @@ impl TimerWatcher {
|
||||
}
|
||||
|
||||
impl HomingIO for TimerWatcher {
|
||||
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
|
||||
fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
|
||||
}
|
||||
|
||||
impl UvHandle<uvll::uv_timer_t> for TimerWatcher {
|
||||
@ -89,10 +89,11 @@ impl RtioTimer for TimerWatcher {
|
||||
// started, then we need to call stop on the timer.
|
||||
let _f = ForbidUnwind::new("timer");
|
||||
|
||||
let sched: ~Scheduler = Local::take();
|
||||
sched.deschedule_running_task_and_then(|_sched, task| {
|
||||
let task: ~Task = Local::take();
|
||||
task.deschedule(1, |task| {
|
||||
self.action = Some(WakeTask(task));
|
||||
self.start(msecs, 0);
|
||||
Ok(())
|
||||
});
|
||||
self.stop();
|
||||
}
|
||||
@ -137,8 +138,7 @@ extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) {
|
||||
|
||||
match timer.action.take_unwrap() {
|
||||
WakeTask(task) => {
|
||||
let sched: ~Scheduler = Local::take();
|
||||
sched.resume_blocked_task_immediately(task);
|
||||
task.wake().map(|t| t.reawaken(true));
|
||||
}
|
||||
SendOnce(chan) => { chan.try_send_deferred(()); }
|
||||
SendMany(chan, id) => {
|
||||
@ -177,7 +177,6 @@ impl Drop for TimerWatcher {
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use std::rt::rtio::RtioTimer;
|
||||
use super::super::local_loop;
|
||||
|
||||
|
@ -10,24 +10,23 @@
|
||||
|
||||
use std::libc;
|
||||
use std::io::IoError;
|
||||
use std::rt::local::Local;
|
||||
use std::rt::rtio::RtioTTY;
|
||||
use std::rt::sched::{Scheduler, SchedHandle};
|
||||
|
||||
use homing::{HomingIO, HomeHandle};
|
||||
use stream::StreamWatcher;
|
||||
use super::{Loop, UvError, UvHandle, uv_error_to_io_error};
|
||||
use uvio::HomingIO;
|
||||
use super::{UvError, UvHandle, uv_error_to_io_error};
|
||||
use uvio::UvIoFactory;
|
||||
use uvll;
|
||||
|
||||
pub struct TtyWatcher{
|
||||
tty: *uvll::uv_tty_t,
|
||||
stream: StreamWatcher,
|
||||
home: SchedHandle,
|
||||
home: HomeHandle,
|
||||
fd: libc::c_int,
|
||||
}
|
||||
|
||||
impl TtyWatcher {
|
||||
pub fn new(loop_: &Loop, fd: libc::c_int, readable: bool)
|
||||
pub fn new(io: &mut UvIoFactory, fd: libc::c_int, readable: bool)
|
||||
-> Result<TtyWatcher, UvError>
|
||||
{
|
||||
// libuv may succeed in giving us a handle (via uv_tty_init), but if the
|
||||
@ -56,14 +55,14 @@ impl TtyWatcher {
|
||||
// with attempting to open it as a tty.
|
||||
let handle = UvHandle::alloc(None::<TtyWatcher>, uvll::UV_TTY);
|
||||
match unsafe {
|
||||
uvll::uv_tty_init(loop_.handle, handle, fd as libc::c_int,
|
||||
uvll::uv_tty_init(io.uv_loop(), handle, fd as libc::c_int,
|
||||
readable as libc::c_int)
|
||||
} {
|
||||
0 => {
|
||||
Ok(TtyWatcher {
|
||||
tty: handle,
|
||||
stream: StreamWatcher::new(handle),
|
||||
home: get_handle_to_current_scheduler!(),
|
||||
home: io.make_handle(),
|
||||
fd: fd,
|
||||
})
|
||||
}
|
||||
@ -120,7 +119,7 @@ impl UvHandle<uvll::uv_tty_t> for TtyWatcher {
|
||||
}
|
||||
|
||||
impl HomingIO for TtyWatcher {
|
||||
fn home<'a>(&'a mut self) -> &'a mut SchedHandle { &mut self.home }
|
||||
fn home<'a>(&'a mut self) -> &'a mut HomeHandle { &mut self.home }
|
||||
}
|
||||
|
||||
impl Drop for TtyWatcher {
|
||||
|
@ -9,121 +9,41 @@
|
||||
// except according to those terms.
|
||||
|
||||
use std::c_str::CString;
|
||||
use std::cast;
|
||||
use std::comm::SharedChan;
|
||||
use std::libc::c_int;
|
||||
use std::libc;
|
||||
use std::path::Path;
|
||||
use std::io::IoError;
|
||||
use std::io::net::ip::SocketAddr;
|
||||
use std::io::process::ProcessConfig;
|
||||
use std::io;
|
||||
use std::rt::local::Local;
|
||||
use std::rt::rtio::*;
|
||||
use std::rt::sched::{Scheduler, SchedHandle};
|
||||
use std::rt::task::Task;
|
||||
use std::libc::{O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY,
|
||||
S_IRUSR, S_IWUSR};
|
||||
use std::io::{FileMode, FileAccess, Open, Append, Truncate, Read, Write,
|
||||
ReadWrite, FileStat};
|
||||
use std::io::signal::Signum;
|
||||
use std::io::{FileMode, FileAccess, Open, Append, Truncate, Read, Write,
|
||||
ReadWrite, FileStat};
|
||||
use std::io;
|
||||
use std::libc::c_int;
|
||||
use std::libc::{O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY, S_IRUSR,
|
||||
S_IWUSR};
|
||||
use std::libc;
|
||||
use std::path::Path;
|
||||
use std::rt::rtio;
|
||||
use std::rt::rtio::IoFactory;
|
||||
use ai = std::io::net::addrinfo;
|
||||
|
||||
#[cfg(test)] use std::unstable::run_in_bare_thread;
|
||||
|
||||
use super::*;
|
||||
use super::{uv_error_to_io_error, Loop};
|
||||
|
||||
use addrinfo::GetAddrInfoRequest;
|
||||
|
||||
pub trait HomingIO {
|
||||
|
||||
fn home<'r>(&'r mut self) -> &'r mut SchedHandle;
|
||||
|
||||
/// This function will move tasks to run on their home I/O scheduler. Note
|
||||
/// that this function does *not* pin the task to the I/O scheduler, but
|
||||
/// rather it simply moves it to running on the I/O scheduler.
|
||||
fn go_to_IO_home(&mut self) -> uint {
|
||||
use std::rt::sched::RunOnce;
|
||||
|
||||
let _f = ForbidUnwind::new("going home");
|
||||
|
||||
let current_sched_id = {
|
||||
let mut sched = Local::borrow(None::<Scheduler>);
|
||||
sched.get().sched_id()
|
||||
};
|
||||
|
||||
// Only need to invoke a context switch if we're not on the right
|
||||
// scheduler.
|
||||
if current_sched_id != self.home().sched_id {
|
||||
let scheduler: ~Scheduler = Local::take();
|
||||
scheduler.deschedule_running_task_and_then(|_, task| {
|
||||
task.wake().map(|task| {
|
||||
self.home().send(RunOnce(task));
|
||||
});
|
||||
})
|
||||
}
|
||||
let current_sched_id = {
|
||||
let mut sched = Local::borrow(None::<Scheduler>);
|
||||
sched.get().sched_id()
|
||||
};
|
||||
assert!(current_sched_id == self.home().sched_id);
|
||||
|
||||
self.home().sched_id
|
||||
}
|
||||
|
||||
/// Fires a single homing missile, returning another missile targeted back
|
||||
/// at the original home of this task. In other words, this function will
|
||||
/// move the local task to its I/O scheduler and then return an RAII wrapper
|
||||
/// which will return the task home.
|
||||
fn fire_homing_missile(&mut self) -> HomingMissile {
|
||||
HomingMissile { io_home: self.go_to_IO_home() }
|
||||
}
|
||||
|
||||
/// Same as `fire_homing_missile`, but returns the local I/O scheduler as
|
||||
/// well (the one that was homed to).
|
||||
fn fire_homing_missile_sched(&mut self) -> (HomingMissile, ~Scheduler) {
|
||||
// First, transplant ourselves to the home I/O scheduler
|
||||
let missile = self.fire_homing_missile();
|
||||
// Next (must happen next), grab the local I/O scheduler
|
||||
let io_sched: ~Scheduler = Local::take();
|
||||
|
||||
(missile, io_sched)
|
||||
}
|
||||
}
|
||||
|
||||
/// After a homing operation has been completed, this will return the current
|
||||
/// task back to its appropriate home (if applicable). The field is used to
|
||||
/// assert that we are where we think we are.
|
||||
struct HomingMissile {
|
||||
priv io_home: uint,
|
||||
}
|
||||
|
||||
impl HomingMissile {
|
||||
pub fn check(&self, msg: &'static str) {
|
||||
let mut sched = Local::borrow(None::<Scheduler>);
|
||||
let local_id = sched.get().sched_id();
|
||||
assert!(local_id == self.io_home, "{}", msg);
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for HomingMissile {
|
||||
fn drop(&mut self) {
|
||||
let _f = ForbidUnwind::new("leaving home");
|
||||
|
||||
// It would truly be a sad day if we had moved off the home I/O
|
||||
// scheduler while we were doing I/O.
|
||||
self.check("task moved away from the home scheduler");
|
||||
|
||||
// If we were a homed task, then we must send ourselves back to the
|
||||
// original scheduler. Otherwise, we can just return and keep running
|
||||
if !Task::on_appropriate_sched() {
|
||||
let scheduler: ~Scheduler = Local::take();
|
||||
scheduler.deschedule_running_task_and_then(|_, task| {
|
||||
task.wake().map(|task| {
|
||||
Scheduler::run_task(task);
|
||||
});
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
use async::AsyncWatcher;
|
||||
use file::{FsRequest, FileWatcher};
|
||||
use queue::QueuePool;
|
||||
use homing::HomeHandle;
|
||||
use idle::IdleWatcher;
|
||||
use net::{TcpWatcher, TcpListener, UdpWatcher};
|
||||
use pipe::{PipeWatcher, PipeListener};
|
||||
use process::Process;
|
||||
use signal::SignalWatcher;
|
||||
use timer::TimerWatcher;
|
||||
use tty::TtyWatcher;
|
||||
use uvll;
|
||||
|
||||
// Obviously an Event Loop is always home.
|
||||
pub struct UvEventLoop {
|
||||
@ -132,45 +52,52 @@ pub struct UvEventLoop {
|
||||
|
||||
impl UvEventLoop {
|
||||
pub fn new() -> UvEventLoop {
|
||||
let mut loop_ = Loop::new();
|
||||
let handle_pool = QueuePool::new(&mut loop_);
|
||||
UvEventLoop {
|
||||
uvio: UvIoFactory(Loop::new())
|
||||
uvio: UvIoFactory {
|
||||
loop_: loop_,
|
||||
handle_pool: handle_pool,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for UvEventLoop {
|
||||
fn drop(&mut self) {
|
||||
self.uvio.uv_loop().close();
|
||||
self.uvio.loop_.close();
|
||||
}
|
||||
}
|
||||
|
||||
impl EventLoop for UvEventLoop {
|
||||
impl rtio::EventLoop for UvEventLoop {
|
||||
fn run(&mut self) {
|
||||
self.uvio.uv_loop().run();
|
||||
self.uvio.loop_.run();
|
||||
}
|
||||
|
||||
fn callback(&mut self, f: proc()) {
|
||||
IdleWatcher::onetime(self.uvio.uv_loop(), f);
|
||||
IdleWatcher::onetime(&mut self.uvio.loop_, f);
|
||||
}
|
||||
|
||||
fn pausable_idle_callback(&mut self, cb: ~Callback) -> ~PausableIdleCallback {
|
||||
IdleWatcher::new(self.uvio.uv_loop(), cb) as ~PausableIdleCallback
|
||||
fn pausible_idle_callback(&mut self, cb: ~rtio::Callback)
|
||||
-> ~rtio::PausibleIdleCallback
|
||||
{
|
||||
IdleWatcher::new(&mut self.uvio.loop_, cb) as ~rtio::PausibleIdleCallback
|
||||
}
|
||||
|
||||
fn remote_callback(&mut self, f: ~Callback) -> ~RemoteCallback {
|
||||
~AsyncWatcher::new(self.uvio.uv_loop(), f) as ~RemoteCallback
|
||||
fn remote_callback(&mut self, f: ~rtio::Callback) -> ~rtio::RemoteCallback {
|
||||
~AsyncWatcher::new(&mut self.uvio.loop_, f) as ~rtio::RemoteCallback
|
||||
}
|
||||
|
||||
fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory> {
|
||||
let factory = &mut self.uvio as &mut IoFactory;
|
||||
fn io<'a>(&'a mut self) -> Option<&'a mut rtio::IoFactory> {
|
||||
let factory = &mut self.uvio as &mut rtio::IoFactory;
|
||||
Some(factory)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(test))]
|
||||
#[lang = "event_loop_factory"]
|
||||
pub extern "C" fn new_loop() -> ~EventLoop {
|
||||
~UvEventLoop::new() as ~EventLoop
|
||||
pub extern "C" fn new_loop() -> ~rtio::EventLoop {
|
||||
~UvEventLoop::new() as ~rtio::EventLoop
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -187,59 +114,65 @@ fn test_callback_run_once() {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct UvIoFactory(Loop);
|
||||
pub struct UvIoFactory {
|
||||
loop_: Loop,
|
||||
priv handle_pool: ~QueuePool,
|
||||
}
|
||||
|
||||
impl UvIoFactory {
|
||||
pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
|
||||
match self { &UvIoFactory(ref mut ptr) => ptr }
|
||||
pub fn uv_loop<'a>(&mut self) -> *uvll::uv_loop_t { self.loop_.handle }
|
||||
|
||||
pub fn make_handle(&mut self) -> HomeHandle {
|
||||
HomeHandle::new(self.id(), &mut *self.handle_pool)
|
||||
}
|
||||
}
|
||||
|
||||
impl IoFactory for UvIoFactory {
|
||||
fn id(&self) -> uint { unsafe { cast::transmute(self) } }
|
||||
|
||||
// Connect to an address and return a new stream
|
||||
// NB: This blocks the task waiting on the connection.
|
||||
// It would probably be better to return a future
|
||||
fn tcp_connect(&mut self, addr: SocketAddr)
|
||||
-> Result<~RtioTcpStream, IoError>
|
||||
-> Result<~rtio::RtioTcpStream, IoError>
|
||||
{
|
||||
match TcpWatcher::connect(self.uv_loop(), addr) {
|
||||
Ok(t) => Ok(~t as ~RtioTcpStream),
|
||||
match TcpWatcher::connect(self, addr) {
|
||||
Ok(t) => Ok(~t as ~rtio::RtioTcpStream),
|
||||
Err(e) => Err(uv_error_to_io_error(e)),
|
||||
}
|
||||
}
|
||||
|
||||
fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListener, IoError> {
|
||||
match TcpListener::bind(self.uv_loop(), addr) {
|
||||
Ok(t) => Ok(t as ~RtioTcpListener),
|
||||
fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~rtio::RtioTcpListener, IoError> {
|
||||
match TcpListener::bind(self, addr) {
|
||||
Ok(t) => Ok(t as ~rtio::RtioTcpListener),
|
||||
Err(e) => Err(uv_error_to_io_error(e)),
|
||||
}
|
||||
}
|
||||
|
||||
fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocket, IoError> {
|
||||
match UdpWatcher::bind(self.uv_loop(), addr) {
|
||||
Ok(u) => Ok(~u as ~RtioUdpSocket),
|
||||
fn udp_bind(&mut self, addr: SocketAddr) -> Result<~rtio::RtioUdpSocket, IoError> {
|
||||
match UdpWatcher::bind(self, addr) {
|
||||
Ok(u) => Ok(~u as ~rtio::RtioUdpSocket),
|
||||
Err(e) => Err(uv_error_to_io_error(e)),
|
||||
}
|
||||
}
|
||||
|
||||
fn timer_init(&mut self) -> Result<~RtioTimer, IoError> {
|
||||
Ok(TimerWatcher::new(self.uv_loop()) as ~RtioTimer)
|
||||
fn timer_init(&mut self) -> Result<~rtio::RtioTimer, IoError> {
|
||||
Ok(TimerWatcher::new(self) as ~rtio::RtioTimer)
|
||||
}
|
||||
|
||||
fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
|
||||
hint: Option<ai::Hint>) -> Result<~[ai::Info], IoError> {
|
||||
let r = GetAddrInfoRequest::run(self.uv_loop(), host, servname, hint);
|
||||
let r = GetAddrInfoRequest::run(&self.loop_, host, servname, hint);
|
||||
r.map_err(uv_error_to_io_error)
|
||||
}
|
||||
|
||||
fn fs_from_raw_fd(&mut self, fd: c_int,
|
||||
close: CloseBehavior) -> ~RtioFileStream {
|
||||
let loop_ = Loop::wrap(self.uv_loop().handle);
|
||||
~FileWatcher::new(loop_, fd, close) as ~RtioFileStream
|
||||
close: rtio::CloseBehavior) -> ~rtio::RtioFileStream {
|
||||
~FileWatcher::new(self, fd, close) as ~rtio::RtioFileStream
|
||||
}
|
||||
|
||||
fn fs_open(&mut self, path: &CString, fm: FileMode, fa: FileAccess)
|
||||
-> Result<~RtioFileStream, IoError> {
|
||||
-> Result<~rtio::RtioFileStream, IoError> {
|
||||
let flags = match fm {
|
||||
io::Open => 0,
|
||||
io::Append => libc::O_APPEND,
|
||||
@ -254,117 +187,117 @@ impl IoFactory for UvIoFactory {
|
||||
libc::S_IRUSR | libc::S_IWUSR),
|
||||
};
|
||||
|
||||
match FsRequest::open(self.uv_loop(), path, flags as int, mode as int) {
|
||||
Ok(fs) => Ok(~fs as ~RtioFileStream),
|
||||
match FsRequest::open(self, path, flags as int, mode as int) {
|
||||
Ok(fs) => Ok(~fs as ~rtio::RtioFileStream),
|
||||
Err(e) => Err(uv_error_to_io_error(e))
|
||||
}
|
||||
}
|
||||
|
||||
fn fs_unlink(&mut self, path: &CString) -> Result<(), IoError> {
|
||||
let r = FsRequest::unlink(self.uv_loop(), path);
|
||||
let r = FsRequest::unlink(&self.loop_, path);
|
||||
r.map_err(uv_error_to_io_error)
|
||||
}
|
||||
fn fs_lstat(&mut self, path: &CString) -> Result<FileStat, IoError> {
|
||||
let r = FsRequest::lstat(self.uv_loop(), path);
|
||||
let r = FsRequest::lstat(&self.loop_, path);
|
||||
r.map_err(uv_error_to_io_error)
|
||||
}
|
||||
fn fs_stat(&mut self, path: &CString) -> Result<FileStat, IoError> {
|
||||
let r = FsRequest::stat(self.uv_loop(), path);
|
||||
let r = FsRequest::stat(&self.loop_, path);
|
||||
r.map_err(uv_error_to_io_error)
|
||||
}
|
||||
fn fs_mkdir(&mut self, path: &CString,
|
||||
perm: io::FilePermission) -> Result<(), IoError> {
|
||||
let r = FsRequest::mkdir(self.uv_loop(), path, perm as c_int);
|
||||
let r = FsRequest::mkdir(&self.loop_, path, perm as c_int);
|
||||
r.map_err(uv_error_to_io_error)
|
||||
}
|
||||
fn fs_rmdir(&mut self, path: &CString) -> Result<(), IoError> {
|
||||
let r = FsRequest::rmdir(self.uv_loop(), path);
|
||||
let r = FsRequest::rmdir(&self.loop_, path);
|
||||
r.map_err(uv_error_to_io_error)
|
||||
}
|
||||
fn fs_rename(&mut self, path: &CString, to: &CString) -> Result<(), IoError> {
|
||||
let r = FsRequest::rename(self.uv_loop(), path, to);
|
||||
let r = FsRequest::rename(&self.loop_, path, to);
|
||||
r.map_err(uv_error_to_io_error)
|
||||
}
|
||||
fn fs_chmod(&mut self, path: &CString,
|
||||
perm: io::FilePermission) -> Result<(), IoError> {
|
||||
let r = FsRequest::chmod(self.uv_loop(), path, perm as c_int);
|
||||
let r = FsRequest::chmod(&self.loop_, path, perm as c_int);
|
||||
r.map_err(uv_error_to_io_error)
|
||||
}
|
||||
fn fs_readdir(&mut self, path: &CString, flags: c_int)
|
||||
-> Result<~[Path], IoError>
|
||||
{
|
||||
let r = FsRequest::readdir(self.uv_loop(), path, flags);
|
||||
let r = FsRequest::readdir(&self.loop_, path, flags);
|
||||
r.map_err(uv_error_to_io_error)
|
||||
}
|
||||
fn fs_link(&mut self, src: &CString, dst: &CString) -> Result<(), IoError> {
|
||||
let r = FsRequest::link(self.uv_loop(), src, dst);
|
||||
let r = FsRequest::link(&self.loop_, src, dst);
|
||||
r.map_err(uv_error_to_io_error)
|
||||
}
|
||||
fn fs_symlink(&mut self, src: &CString, dst: &CString) -> Result<(), IoError> {
|
||||
let r = FsRequest::symlink(self.uv_loop(), src, dst);
|
||||
let r = FsRequest::symlink(&self.loop_, src, dst);
|
||||
r.map_err(uv_error_to_io_error)
|
||||
}
|
||||
fn fs_chown(&mut self, path: &CString, uid: int, gid: int) -> Result<(), IoError> {
|
||||
let r = FsRequest::chown(self.uv_loop(), path, uid, gid);
|
||||
let r = FsRequest::chown(&self.loop_, path, uid, gid);
|
||||
r.map_err(uv_error_to_io_error)
|
||||
}
|
||||
fn fs_readlink(&mut self, path: &CString) -> Result<Path, IoError> {
|
||||
let r = FsRequest::readlink(self.uv_loop(), path);
|
||||
let r = FsRequest::readlink(&self.loop_, path);
|
||||
r.map_err(uv_error_to_io_error)
|
||||
}
|
||||
fn fs_utime(&mut self, path: &CString, atime: u64, mtime: u64)
|
||||
-> Result<(), IoError>
|
||||
{
|
||||
let r = FsRequest::utime(self.uv_loop(), path, atime, mtime);
|
||||
let r = FsRequest::utime(&self.loop_, path, atime, mtime);
|
||||
r.map_err(uv_error_to_io_error)
|
||||
}
|
||||
|
||||
fn spawn(&mut self, config: ProcessConfig)
|
||||
-> Result<(~RtioProcess, ~[Option<~RtioPipe>]), IoError>
|
||||
-> Result<(~rtio::RtioProcess, ~[Option<~rtio::RtioPipe>]), IoError>
|
||||
{
|
||||
match Process::spawn(self.uv_loop(), config) {
|
||||
match Process::spawn(self, config) {
|
||||
Ok((p, io)) => {
|
||||
Ok((p as ~RtioProcess,
|
||||
io.move_iter().map(|i| i.map(|p| ~p as ~RtioPipe)).collect()))
|
||||
Ok((p as ~rtio::RtioProcess,
|
||||
io.move_iter().map(|i| i.map(|p| ~p as ~rtio::RtioPipe)).collect()))
|
||||
}
|
||||
Err(e) => Err(uv_error_to_io_error(e)),
|
||||
}
|
||||
}
|
||||
|
||||
fn unix_bind(&mut self, path: &CString) -> Result<~RtioUnixListener, IoError>
|
||||
fn unix_bind(&mut self, path: &CString) -> Result<~rtio::RtioUnixListener, IoError>
|
||||
{
|
||||
match PipeListener::bind(self.uv_loop(), path) {
|
||||
Ok(p) => Ok(p as ~RtioUnixListener),
|
||||
match PipeListener::bind(self, path) {
|
||||
Ok(p) => Ok(p as ~rtio::RtioUnixListener),
|
||||
Err(e) => Err(uv_error_to_io_error(e)),
|
||||
}
|
||||
}
|
||||
|
||||
fn unix_connect(&mut self, path: &CString) -> Result<~RtioPipe, IoError> {
|
||||
match PipeWatcher::connect(self.uv_loop(), path) {
|
||||
Ok(p) => Ok(~p as ~RtioPipe),
|
||||
fn unix_connect(&mut self, path: &CString) -> Result<~rtio::RtioPipe, IoError> {
|
||||
match PipeWatcher::connect(self, path) {
|
||||
Ok(p) => Ok(~p as ~rtio::RtioPipe),
|
||||
Err(e) => Err(uv_error_to_io_error(e)),
|
||||
}
|
||||
}
|
||||
|
||||
fn tty_open(&mut self, fd: c_int, readable: bool)
|
||||
-> Result<~RtioTTY, IoError> {
|
||||
match TtyWatcher::new(self.uv_loop(), fd, readable) {
|
||||
Ok(tty) => Ok(~tty as ~RtioTTY),
|
||||
-> Result<~rtio::RtioTTY, IoError> {
|
||||
match TtyWatcher::new(self, fd, readable) {
|
||||
Ok(tty) => Ok(~tty as ~rtio::RtioTTY),
|
||||
Err(e) => Err(uv_error_to_io_error(e))
|
||||
}
|
||||
}
|
||||
|
||||
fn pipe_open(&mut self, fd: c_int) -> Result<~RtioPipe, IoError> {
|
||||
match PipeWatcher::open(self.uv_loop(), fd) {
|
||||
Ok(s) => Ok(~s as ~RtioPipe),
|
||||
fn pipe_open(&mut self, fd: c_int) -> Result<~rtio::RtioPipe, IoError> {
|
||||
match PipeWatcher::open(self, fd) {
|
||||
Ok(s) => Ok(~s as ~rtio::RtioPipe),
|
||||
Err(e) => Err(uv_error_to_io_error(e))
|
||||
}
|
||||
}
|
||||
|
||||
fn signal(&mut self, signum: Signum, channel: SharedChan<Signum>)
|
||||
-> Result<~RtioSignal, IoError> {
|
||||
match SignalWatcher::new(self.uv_loop(), signum, channel) {
|
||||
Ok(s) => Ok(s as ~RtioSignal),
|
||||
-> Result<~rtio::RtioSignal, IoError> {
|
||||
match SignalWatcher::new(self, signum, channel) {
|
||||
Ok(s) => Ok(s as ~rtio::RtioSignal),
|
||||
Err(e) => Err(uv_error_to_io_error(e)),
|
||||
}
|
||||
}
|
||||
|
@ -37,7 +37,8 @@ use std::libc;
|
||||
#[cfg(test)]
|
||||
use std::libc::uintptr_t;
|
||||
|
||||
pub use self::errors::*;
|
||||
pub use self::errors::{EACCES, ECONNREFUSED, ECONNRESET, EPIPE, ECONNABORTED,
|
||||
ECANCELED, EBADF, ENOTCONN};
|
||||
|
||||
pub static OK: c_int = 0;
|
||||
pub static EOF: c_int = -4095;
|
||||
@ -576,6 +577,8 @@ extern {
|
||||
|
||||
// generic uv functions
|
||||
pub fn uv_loop_delete(l: *uv_loop_t);
|
||||
pub fn uv_ref(t: *uv_handle_t);
|
||||
pub fn uv_unref(t: *uv_handle_t);
|
||||
pub fn uv_handle_size(ty: uv_handle_type) -> size_t;
|
||||
pub fn uv_req_size(ty: uv_req_type) -> size_t;
|
||||
pub fn uv_run(l: *uv_loop_t, mode: uv_run_mode) -> c_int;
|
||||
|
Loading…
Reference in New Issue
Block a user