Implement process bindings to libuv

This is a re-landing of #8645, except that the bindings are *not* being used to
power std::run just yet. Instead, this adds the bindings as standalone bindings
inside the rt::io::process module.

I made one major change from before, having to do with how pipes are
created/bound. It's much clearer now when you can read/write to a pipe, as
there's an explicit difference (different types) between an unbound and a bound
pipe. The process configuration now takes unbound pipes (and consumes ownership
of them), and will return corresponding pipe structures back if spawning is
successful (otherwise everything is destroyed normally).
This commit is contained in:
Alex Crichton 2013-09-16 15:28:56 -07:00
parent d2b0b11aeb
commit cb7756a81d
11 changed files with 1076 additions and 67 deletions

View File

@ -260,6 +260,9 @@ pub use self::net::ip::IpAddr;
pub use self::net::tcp::TcpListener;
pub use self::net::tcp::TcpStream;
pub use self::net::udp::UdpStream;
pub use self::pipe::PipeStream;
pub use self::pipe::UnboundPipeStream;
pub use self::process::Process;
// Some extension traits that all Readers and Writers get.
pub use self::extensions::ReaderUtil;
@ -269,6 +272,12 @@ pub use self::extensions::WriterByteConversions;
/// Synchronous, non-blocking file I/O.
pub mod file;
/// Synchronous, in-memory I/O.
pub mod pipe;
/// Child process management.
pub mod process;
/// Synchronous, non-blocking network I/O.
pub mod net;

76
src/libstd/rt/io/pipe.rs Normal file
View File

@ -0,0 +1,76 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Synchronous, in-memory pipes.
//!
//! Currently these aren't particularly useful, there only exists bindings
//! enough so that pipes can be created to child processes.
use prelude::*;
use super::{Reader, Writer};
use rt::io::{io_error, read_error, EndOfFile};
use rt::local::Local;
use rt::rtio::{RtioPipe, RtioPipeObject, IoFactoryObject, IoFactory};
use rt::rtio::RtioUnboundPipeObject;
pub struct PipeStream(RtioPipeObject);
pub struct UnboundPipeStream(~RtioUnboundPipeObject);
impl PipeStream {
/// Creates a new pipe initialized, but not bound to any particular
/// source/destination
pub fn new() -> Option<UnboundPipeStream> {
let pipe = unsafe {
let io: *mut IoFactoryObject = Local::unsafe_borrow();
(*io).pipe_init(false)
};
match pipe {
Ok(p) => Some(UnboundPipeStream(p)),
Err(ioerr) => {
io_error::cond.raise(ioerr);
None
}
}
}
pub fn bind(inner: RtioPipeObject) -> PipeStream {
PipeStream(inner)
}
}
impl Reader for PipeStream {
fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
match (**self).read(buf) {
Ok(read) => Some(read),
Err(ioerr) => {
// EOF is indicated by returning None
if ioerr.kind != EndOfFile {
read_error::cond.raise(ioerr);
}
return None;
}
}
}
fn eof(&mut self) -> bool { fail!() }
}
impl Writer for PipeStream {
fn write(&mut self, buf: &[u8]) {
match (**self).write(buf) {
Ok(_) => (),
Err(ioerr) => {
io_error::cond.raise(ioerr);
}
}
}
fn flush(&mut self) { fail!() }
}

278
src/libstd/rt/io/process.rs Normal file
View File

@ -0,0 +1,278 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Bindings for executing child processes
use prelude::*;
use libc;
use rt::io;
use rt::io::io_error;
use rt::local::Local;
use rt::rtio::{RtioProcess, RtioProcessObject, IoFactoryObject, IoFactory};
pub struct Process {
priv handle: ~RtioProcessObject,
io: ~[Option<io::PipeStream>],
}
/// This configuration describes how a new process should be spawned. This is
/// translated to libuv's own configuration
pub struct ProcessConfig<'self> {
/// Path to the program to run
program: &'self str,
/// Arguments to pass to the program (doesn't include the program itself)
args: &'self [~str],
/// Optional environment to specify for the program. If this is None, then
/// it will inherit the current process's environment.
env: Option<&'self [(~str, ~str)]>,
/// Optional working directory for the new process. If this is None, then
/// the current directory of the running process is inherited.
cwd: Option<&'self str>,
/// Any number of streams/file descriptors/pipes may be attached to this
/// process. This list enumerates the file descriptors and such for the
/// process to be spawned, and the file descriptors inherited will start at
/// 0 and go to the length of this array.
///
/// Standard file descriptors are:
///
/// 0 - stdin
/// 1 - stdout
/// 2 - stderr
io: ~[StdioContainer]
}
/// Describes what to do with a standard io stream for a child process.
pub enum StdioContainer {
/// This stream will be ignored. This is the equivalent of attaching the
/// stream to `/dev/null`
Ignored,
/// The specified file descriptor is inherited for the stream which it is
/// specified for.
InheritFd(libc::c_int),
// XXX: these two shouldn't have libuv-specific implementation details
/// The specified libuv stream is inherited for the corresponding file
/// descriptor it is assigned to.
// XXX: this needs to be thought out more.
//InheritStream(uv::net::StreamWatcher),
/// Creates a pipe for the specified file descriptor which will be directed
/// into the previously-initialized pipe passed in.
///
/// The first boolean argument is whether the pipe is readable, and the
/// second is whether it is writable. These properties are from the view of
/// the *child* process, not the parent process.
CreatePipe(io::UnboundPipeStream,
bool /* readable */,
bool /* writable */),
}
impl Process {
/// Creates a new pipe initialized, but not bound to any particular
/// source/destination
pub fn new(config: ProcessConfig) -> Option<Process> {
let process = unsafe {
let io: *mut IoFactoryObject = Local::unsafe_borrow();
(*io).spawn(config)
};
match process {
Ok((p, io)) => Some(Process{
handle: p,
io: io.move_iter().map(|p|
p.map_move(|p| io::PipeStream::bind(p))
).collect()
}),
Err(ioerr) => {
io_error::cond.raise(ioerr);
None
}
}
}
/// Returns the process id of this child process
pub fn id(&self) -> libc::pid_t { self.handle.id() }
/// Sends the specified signal to the child process, returning whether the
/// signal could be delivered or not.
///
/// Note that this is purely a wrapper around libuv's `uv_process_kill`
/// function.
///
/// If the signal delivery fails, then the `io_error` condition is raised on
pub fn signal(&mut self, signal: int) {
match self.handle.kill(signal) {
Ok(()) => {}
Err(err) => {
io_error::cond.raise(err)
}
}
}
/// Wait for the child to exit completely, returning the status that it
/// exited with. This function will continue to have the same return value
/// after it has been called at least once.
pub fn wait(&mut self) -> int { self.handle.wait() }
}
impl Drop for Process {
fn drop(&mut self) {
// Close all I/O before exiting to ensure that the child doesn't wait
// forever to print some text or something similar.
for _ in range(0, self.io.len()) {
self.io.pop();
}
self.wait();
}
}
#[cfg(test)]
mod tests {
use prelude::*;
use super::*;
use rt::io::{Reader, Writer};
use rt::io::pipe::*;
use str;
#[test]
#[cfg(unix, not(android))]
fn smoke() {
let io = ~[];
let args = ProcessConfig {
program: "/bin/sh",
args: [~"-c", ~"true"],
env: None,
cwd: None,
io: io,
};
let p = Process::new(args);
assert!(p.is_some());
let mut p = p.unwrap();
assert_eq!(p.wait(), 0);
}
#[test]
#[cfg(unix, not(android))]
fn smoke_failure() {
let io = ~[];
let args = ProcessConfig {
program: "if-this-is-a-binary-then-the-world-has-ended",
args: [],
env: None,
cwd: None,
io: io,
};
let p = Process::new(args);
assert!(p.is_some());
let mut p = p.unwrap();
assert!(p.wait() != 0);
}
#[test]
#[cfg(unix, not(android))]
fn exit_reported_right() {
let io = ~[];
let args = ProcessConfig {
program: "/bin/sh",
args: [~"-c", ~"exit 1"],
env: None,
cwd: None,
io: io,
};
let p = Process::new(args);
assert!(p.is_some());
let mut p = p.unwrap();
assert_eq!(p.wait(), 1);
}
fn read_all(input: &mut Reader) -> ~str {
let mut ret = ~"";
let mut buf = [0, ..1024];
loop {
match input.read(buf) {
None | Some(0) => { break }
Some(n) => { ret = ret + str::from_utf8(buf.slice_to(n)); }
}
}
return ret;
}
fn run_output(args: ProcessConfig) -> ~str {
let p = Process::new(args);
assert!(p.is_some());
let mut p = p.unwrap();
assert!(p.io[0].is_none());
assert!(p.io[1].is_some());
let ret = read_all(p.io[1].get_mut_ref() as &mut Reader);
assert_eq!(p.wait(), 0);
return ret;
}
#[test]
#[cfg(unix, not(android))]
fn stdout_works() {
let pipe = PipeStream::new().unwrap();
let io = ~[Ignored, CreatePipe(pipe, false, true)];
let args = ProcessConfig {
program: "/bin/sh",
args: [~"-c", ~"echo foobar"],
env: None,
cwd: None,
io: io,
};
assert_eq!(run_output(args), ~"foobar\n");
}
#[test]
#[cfg(unix, not(android))]
fn set_cwd_works() {
let pipe = PipeStream::new().unwrap();
let io = ~[Ignored, CreatePipe(pipe, false, true)];
let cwd = Some("/");
let args = ProcessConfig {
program: "/bin/sh",
args: [~"-c", ~"pwd"],
env: None,
cwd: cwd,
io: io,
};
assert_eq!(run_output(args), ~"/\n");
}
#[test]
#[cfg(unix, not(android))]
fn stdin_works() {
let input = PipeStream::new().unwrap();
let output = PipeStream::new().unwrap();
let io = ~[CreatePipe(input, true, false),
CreatePipe(output, false, true)];
let args = ProcessConfig {
program: "/bin/sh",
args: [~"-c", ~"read line; echo $line"],
env: None,
cwd: None,
io: io,
};
let mut p = Process::new(args).expect("didn't create a proces?!");
p.io[0].get_mut_ref().write("foobar".as_bytes());
p.io[0] = None; // close stdin;
let out = read_all(p.io[1].get_mut_ref() as &mut Reader);
assert_eq!(p.wait(), 0);
assert_eq!(out, ~"foobar\n");
}
}

View File

@ -8,11 +8,13 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use libc;
use option::*;
use result::*;
use libc::c_int;
use rt::io::IoError;
use super::io::process::ProcessConfig;
use super::io::net::ip::{IpAddr, SocketAddr};
use rt::uv::uvio;
use path::Path;
@ -31,6 +33,9 @@ pub type RtioTcpListenerObject = uvio::UvTcpListener;
pub type RtioUdpSocketObject = uvio::UvUdpSocket;
pub type RtioTimerObject = uvio::UvTimer;
pub type PausibleIdleCallback = uvio::UvPausibleIdleCallback;
pub type RtioPipeObject = uvio::UvPipeStream;
pub type RtioUnboundPipeObject = uvio::UvUnboundPipe;
pub type RtioProcessObject = uvio::UvProcess;
pub trait EventLoop {
fn run(&mut self);
@ -79,6 +84,9 @@ pub trait IoFactory {
fn fs_rmdir<P: PathLike>(&mut self, path: &P) -> Result<(), IoError>;
fn fs_readdir<P: PathLike>(&mut self, path: &P, flags: c_int) ->
Result<~[Path], IoError>;
fn pipe_init(&mut self, ipc: bool) -> Result<~RtioUnboundPipeObject, IoError>;
fn spawn(&mut self, config: ProcessConfig)
-> Result<(~RtioProcessObject, ~[Option<RtioPipeObject>]), IoError>;
}
pub trait RtioTcpListener : RtioSocket {
@ -135,3 +143,14 @@ pub trait RtioFileStream {
fn tell(&self) -> Result<u64, IoError>;
fn flush(&mut self) -> Result<(), IoError>;
}
pub trait RtioProcess {
fn id(&self) -> libc::pid_t;
fn kill(&mut self, signal: int) -> Result<(), IoError>;
fn wait(&mut self) -> int;
}
pub trait RtioPipe {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>;
fn write(&mut self, buf: &[u8]) -> Result<(), IoError>;
}

View File

@ -58,6 +58,8 @@ pub use self::net::{StreamWatcher, TcpWatcher, UdpWatcher};
pub use self::idle::IdleWatcher;
pub use self::timer::TimerWatcher;
pub use self::async::AsyncWatcher;
pub use self::process::Process;
pub use self::pipe::Pipe;
/// The implementation of `rtio` for libuv
pub mod uvio;
@ -71,6 +73,8 @@ pub mod idle;
pub mod timer;
pub mod async;
pub mod addrinfo;
pub mod process;
pub mod pipe;
/// XXX: Loop(*handle) is buggy with destructors. Normal structs
/// with dtors may not be destructured, but tuple structs can,
@ -127,6 +131,8 @@ pub type NullCallback = ~fn();
pub type IdleCallback = ~fn(IdleWatcher, Option<UvError>);
pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
pub type FsCallback = ~fn(&mut FsRequest, Option<UvError>);
// first int is exit_status, second is term_signal
pub type ExitCallback = ~fn(Process, int, int, Option<UvError>);
pub type TimerCallback = ~fn(TimerWatcher, Option<UvError>);
pub type AsyncCallback = ~fn(AsyncWatcher, Option<UvError>);
pub type UdpReceiveCallback = ~fn(UdpWatcher, int, Buf, SocketAddr, uint, Option<UvError>);
@ -145,7 +151,8 @@ struct WatcherData {
timer_cb: Option<TimerCallback>,
async_cb: Option<AsyncCallback>,
udp_recv_cb: Option<UdpReceiveCallback>,
udp_send_cb: Option<UdpSendCallback>
udp_send_cb: Option<UdpSendCallback>,
exit_cb: Option<ExitCallback>,
}
pub trait WatcherInterop {
@ -177,7 +184,8 @@ impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W {
timer_cb: None,
async_cb: None,
udp_recv_cb: None,
udp_send_cb: None
udp_send_cb: None,
exit_cb: None,
};
let data = transmute::<~WatcherData, *c_void>(data);
uvll::set_data_for_uv_handle(self.native_handle(), data);

66
src/libstd/rt/uv/pipe.rs Normal file
View File

@ -0,0 +1,66 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use prelude::*;
use libc;
use rt::uv;
use rt::uv::net;
use rt::uv::uvll;
pub struct Pipe(*uvll::uv_pipe_t);
impl uv::Watcher for Pipe {}
impl Pipe {
pub fn new(loop_: &uv::Loop, ipc: bool) -> Pipe {
unsafe {
let handle = uvll::malloc_handle(uvll::UV_NAMED_PIPE);
assert!(handle.is_not_null());
let ipc = ipc as libc::c_int;
assert_eq!(uvll::pipe_init(loop_.native_handle(), handle, ipc), 0);
let mut ret: Pipe =
uv::NativeHandle::from_native_handle(handle);
ret.install_watcher_data();
ret
}
}
pub fn as_stream(&self) -> net::StreamWatcher {
net::StreamWatcher(**self as *uvll::uv_stream_t)
}
pub fn close(self, cb: uv::NullCallback) {
{
let mut this = self;
let data = this.get_watcher_data();
assert!(data.close_cb.is_none());
data.close_cb = Some(cb);
}
unsafe { uvll::close(self.native_handle(), close_cb); }
extern fn close_cb(handle: *uvll::uv_pipe_t) {
let mut process: Pipe = uv::NativeHandle::from_native_handle(handle);
process.get_watcher_data().close_cb.take_unwrap()();
process.drop_watcher_data();
unsafe { uvll::free_handle(handle as *libc::c_void) }
}
}
}
impl uv::NativeHandle<*uvll::uv_pipe_t> for Pipe {
fn from_native_handle(handle: *uvll::uv_pipe_t) -> Pipe {
Pipe(handle)
}
fn native_handle(&self) -> *uvll::uv_pipe_t {
match self { &Pipe(ptr) => ptr }
}
}

219
src/libstd/rt/uv/process.rs Normal file
View File

@ -0,0 +1,219 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use prelude::*;
use cell::Cell;
use libc;
use ptr;
use util;
use vec;
use rt::io::process::*;
use rt::uv;
use rt::uv::uvio::UvPipeStream;
use rt::uv::uvll;
/// A process wraps the handle of the underlying uv_process_t.
pub struct Process(*uvll::uv_process_t);
impl uv::Watcher for Process {}
impl Process {
/// Creates a new process, ready to spawn inside an event loop
pub fn new() -> Process {
let handle = unsafe { uvll::malloc_handle(uvll::UV_PROCESS) };
assert!(handle.is_not_null());
let mut ret: Process = uv::NativeHandle::from_native_handle(handle);
ret.install_watcher_data();
return ret;
}
/// Spawn a new process inside the specified event loop.
///
/// The `config` variable will be passed down to libuv, and the `exit_cb`
/// will be run only once, when the process exits.
///
/// Returns either the corresponding process object or an error which
/// occurred.
pub fn spawn(&mut self, loop_: &uv::Loop, mut config: ProcessConfig,
exit_cb: uv::ExitCallback)
-> Result<~[Option<UvPipeStream>], uv::UvError>
{
let cwd = config.cwd.map_move(|s| s.to_c_str());
extern fn on_exit(p: *uvll::uv_process_t,
exit_status: libc::c_int,
term_signal: libc::c_int) {
let mut p: Process = uv::NativeHandle::from_native_handle(p);
let err = match exit_status {
0 => None,
_ => uv::status_to_maybe_uv_error(-1)
};
p.get_watcher_data().exit_cb.take_unwrap()(p,
exit_status as int,
term_signal as int,
err);
}
let io = util::replace(&mut config.io, ~[]);
let mut stdio = vec::with_capacity::<uvll::uv_stdio_container_t>(io.len());
let mut ret_io = vec::with_capacity(io.len());
unsafe {
vec::raw::set_len(&mut stdio, io.len());
for (slot, other) in stdio.iter().zip(io.move_iter()) {
let io = set_stdio(slot as *uvll::uv_stdio_container_t, other);
ret_io.push(io);
}
}
let exit_cb = Cell::new(exit_cb);
let ret_io = Cell::new(ret_io);
do with_argv(config.program, config.args) |argv| {
do with_env(config.env) |envp| {
let options = uvll::uv_process_options_t {
exit_cb: on_exit,
file: unsafe { *argv },
args: argv,
env: envp,
cwd: match cwd {
Some(ref cwd) => cwd.with_ref(|p| p),
None => ptr::null(),
},
flags: 0,
stdio_count: stdio.len() as libc::c_int,
stdio: stdio.as_imm_buf(|p, _| p),
uid: 0,
gid: 0,
};
match unsafe {
uvll::spawn(loop_.native_handle(), **self, options)
} {
0 => {
(*self).get_watcher_data().exit_cb = Some(exit_cb.take());
Ok(ret_io.take())
}
err => Err(uv::UvError(err))
}
}
}
}
/// Sends a signal to this process.
///
/// This is a wrapper around `uv_process_kill`
pub fn kill(&self, signum: int) -> Result<(), uv::UvError> {
match unsafe {
uvll::process_kill(self.native_handle(), signum as libc::c_int)
} {
0 => Ok(()),
err => Err(uv::UvError(err))
}
}
/// Returns the process id of a spawned process
pub fn pid(&self) -> libc::pid_t {
unsafe { uvll::process_pid(**self) as libc::pid_t }
}
/// Closes this handle, invoking the specified callback once closed
pub fn close(self, cb: uv::NullCallback) {
{
let mut this = self;
let data = this.get_watcher_data();
assert!(data.close_cb.is_none());
data.close_cb = Some(cb);
}
unsafe { uvll::close(self.native_handle(), close_cb); }
extern fn close_cb(handle: *uvll::uv_process_t) {
let mut process: Process = uv::NativeHandle::from_native_handle(handle);
process.get_watcher_data().close_cb.take_unwrap()();
process.drop_watcher_data();
unsafe { uvll::free_handle(handle as *libc::c_void) }
}
}
}
unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t,
io: StdioContainer) -> Option<UvPipeStream> {
match io {
Ignored => {
uvll::set_stdio_container_flags(dst, uvll::STDIO_IGNORE);
None
}
InheritFd(fd) => {
uvll::set_stdio_container_flags(dst, uvll::STDIO_INHERIT_FD);
uvll::set_stdio_container_fd(dst, fd);
None
}
CreatePipe(pipe, readable, writable) => {
let mut flags = uvll::STDIO_CREATE_PIPE as libc::c_int;
if readable {
flags |= uvll::STDIO_READABLE_PIPE as libc::c_int;
}
if writable {
flags |= uvll::STDIO_WRITABLE_PIPE as libc::c_int;
}
let handle = pipe.pipe.as_stream().native_handle();
uvll::set_stdio_container_flags(dst, flags);
uvll::set_stdio_container_stream(dst, handle);
Some(pipe.bind())
}
}
}
/// Converts the program and arguments to the argv array expected by libuv
fn with_argv<T>(prog: &str, args: &[~str], f: &fn(**libc::c_char) -> T) -> T {
// First, allocation space to put all the C-strings (we need to have
// ownership of them somewhere
let mut c_strs = vec::with_capacity(args.len() + 1);
c_strs.push(prog.to_c_str());
for arg in args.iter() {
c_strs.push(arg.to_c_str());
}
// Next, create the char** array
let mut c_args = vec::with_capacity(c_strs.len() + 1);
for s in c_strs.iter() {
c_args.push(s.with_ref(|p| p));
}
c_args.push(ptr::null());
c_args.as_imm_buf(|buf, _| f(buf))
}
/// Converts the environment to the env array expected by libuv
fn with_env<T>(env: Option<&[(~str, ~str)]>, f: &fn(**libc::c_char) -> T) -> T {
let env = match env {
Some(s) => s,
None => { return f(ptr::null()); }
};
// As with argv, create some temporary storage and then the actual array
let mut envp = vec::with_capacity(env.len());
for &(ref key, ref value) in env.iter() {
envp.push(fmt!("%s=%s", *key, *value).to_c_str());
}
let mut c_envp = vec::with_capacity(envp.len() + 1);
for s in envp.iter() {
c_envp.push(s.with_ref(|p| p));
}
c_envp.push(ptr::null());
c_envp.as_imm_buf(|buf, _| f(buf))
}
impl uv::NativeHandle<*uvll::uv_process_t> for Process {
fn from_native_handle(handle: *uvll::uv_process_t) -> Process {
Process(handle)
}
fn native_handle(&self) -> *uvll::uv_process_t {
match self { &Process(ptr) => ptr }
}
}

View File

@ -13,7 +13,7 @@ use cast::transmute;
use cast;
use cell::Cell;
use clone::Clone;
use libc::{c_int, c_uint, c_void};
use libc::{c_int, c_uint, c_void, pid_t};
use ops::Drop;
use option::*;
use ptr;
@ -22,6 +22,8 @@ use result::*;
use rt::io::IoError;
use rt::io::net::ip::{SocketAddr, IpAddr};
use rt::io::{standard_error, OtherIoError, SeekStyle, SeekSet, SeekCur, SeekEnd};
use rt::io::process::ProcessConfig;
use rt::kill::BlockedTask;
use rt::local::Local;
use rt::rtio::*;
use rt::sched::{Scheduler, SchedHandle};
@ -735,6 +737,64 @@ impl IoFactory for UvIoFactory {
assert!(!result_cell.is_empty());
return result_cell.take();
}
fn pipe_init(&mut self, ipc: bool) -> Result<~RtioUnboundPipeObject, IoError> {
let home = get_handle_to_current_scheduler!();
Ok(~UvUnboundPipe { pipe: Pipe::new(self.uv_loop(), ipc), home: home })
}
fn spawn(&mut self, config: ProcessConfig)
-> Result<(~RtioProcessObject, ~[Option<RtioPipeObject>]), IoError>
{
// Sadly, we must create the UvProcess before we actually call uv_spawn
// so that the exit_cb can close over it and notify it when the process
// has exited.
let mut ret = ~UvProcess {
process: Process::new(),
home: None,
exit_status: None,
term_signal: None,
exit_error: None,
descheduled: None,
};
let ret_ptr = unsafe {
*cast::transmute::<&~UvProcess, &*mut UvProcess>(&ret)
};
// The purpose of this exit callback is to record the data about the
// exit and then wake up the task which may be waiting for the process
// to exit. This is all performed in the current io-loop, and the
// implementation of UvProcess ensures that reading these fields always
// occurs on the current io-loop.
let exit_cb: ExitCallback = |_, exit_status, term_signal, error| {
unsafe {
assert!((*ret_ptr).exit_status.is_none());
(*ret_ptr).exit_status = Some(exit_status);
(*ret_ptr).term_signal = Some(term_signal);
(*ret_ptr).exit_error = error;
match (*ret_ptr).descheduled.take() {
Some(task) => {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task);
}
None => {}
}
}
};
match ret.process.spawn(self.uv_loop(), config, exit_cb) {
Ok(io) => {
// Only now do we actually get a handle to this scheduler.
ret.home = Some(get_handle_to_current_scheduler!());
Ok((ret, io))
}
Err(uverr) => {
// We still need to close the process handle we created, but
// that's taken care for us in the destructor of UvProcess
Err(uv_error_to_io_error(uverr))
}
}
}
}
pub struct UvTcpListener {
@ -856,6 +916,126 @@ impl RtioTcpAcceptor for UvTcpAcceptor {
}
}
fn read_stream(mut watcher: StreamWatcher,
scheduler: ~Scheduler,
buf: &mut [u8]) -> Result<uint, IoError> {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
let buf_ptr: *&mut [u8] = &buf;
do scheduler.deschedule_running_task_and_then |_sched, task| {
let task_cell = Cell::new(task);
// XXX: We shouldn't reallocate these callbacks every
// call to read
let alloc: AllocCallback = |_| unsafe {
slice_to_uv_buf(*buf_ptr)
};
do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
// Stop reading so that no read callbacks are
// triggered before the user calls `read` again.
// XXX: Is there a performance impact to calling
// stop here?
watcher.read_stop();
let result = if status.is_none() {
assert!(nread >= 0);
Ok(nread as uint)
} else {
Err(uv_error_to_io_error(status.unwrap()))
};
unsafe { (*result_cell_ptr).put_back(result); }
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
assert!(!result_cell.is_empty());
result_cell.take()
}
fn write_stream(mut watcher: StreamWatcher,
scheduler: ~Scheduler,
buf: &[u8]) -> Result<(), IoError> {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
let buf_ptr: *&[u8] = &buf;
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
do watcher.write(buf) |_watcher, status| {
let result = if status.is_none() {
Ok(())
} else {
Err(uv_error_to_io_error(status.unwrap()))
};
unsafe { (*result_cell_ptr).put_back(result); }
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
assert!(!result_cell.is_empty());
result_cell.take()
}
pub struct UvUnboundPipe {
pipe: Pipe,
home: SchedHandle,
}
impl HomingIO for UvUnboundPipe {
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
}
impl Drop for UvUnboundPipe {
fn drop(&mut self) {
do self.home_for_io |self_| {
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self_.pipe.close {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
}
}
}
impl UvUnboundPipe {
pub unsafe fn bind(~self) -> UvPipeStream {
UvPipeStream { inner: self }
}
}
pub struct UvPipeStream {
priv inner: ~UvUnboundPipe,
}
impl UvPipeStream {
pub fn new(inner: ~UvUnboundPipe) -> UvPipeStream {
UvPipeStream { inner: inner }
}
}
impl RtioPipe for UvPipeStream {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
do self.inner.home_for_io_with_sched |self_, scheduler| {
read_stream(self_.pipe.as_stream(), scheduler, buf)
}
}
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
do self.inner.home_for_io_with_sched |self_, scheduler| {
write_stream(self_.pipe.as_stream(), scheduler, buf)
}
}
}
pub struct UvTcpStream {
watcher: TcpWatcher,
home: SchedHandle,
@ -890,70 +1070,13 @@ impl RtioSocket for UvTcpStream {
impl RtioTcpStream for UvTcpStream {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
do self.home_for_io_with_sched |self_, scheduler| {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
let buf_ptr: *&mut [u8] = &buf;
do scheduler.deschedule_running_task_and_then |_sched, task| {
let task_cell = Cell::new(task);
// XXX: We shouldn't reallocate these callbacks every
// call to read
let alloc: AllocCallback = |_| unsafe {
slice_to_uv_buf(*buf_ptr)
};
let mut watcher = self_.watcher.as_stream();
do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
// Stop reading so that no read callbacks are
// triggered before the user calls `read` again.
// XXX: Is there a performance impact to calling
// stop here?
watcher.read_stop();
let result = if status.is_none() {
assert!(nread >= 0);
Ok(nread as uint)
} else {
Err(uv_error_to_io_error(status.unwrap()))
};
unsafe { (*result_cell_ptr).put_back(result); }
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
assert!(!result_cell.is_empty());
result_cell.take()
read_stream(self_.watcher.as_stream(), scheduler, buf)
}
}
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
do self.home_for_io_with_sched |self_, scheduler| {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
let buf_ptr: *&[u8] = &buf;
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
let mut watcher = self_.watcher.as_stream();
do watcher.write(buf) |_watcher, status| {
let result = if status.is_none() {
Ok(())
} else {
Err(uv_error_to_io_error(status.unwrap()))
};
unsafe { (*result_cell_ptr).put_back(result); }
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
assert!(!result_cell.is_empty());
result_cell.take()
write_stream(self_.watcher.as_stream(), scheduler, buf)
}
}
@ -1240,8 +1363,7 @@ impl UvTimer {
impl Drop for UvTimer {
fn drop(&mut self) {
let self_ = unsafe { transmute::<&UvTimer, &mut UvTimer>(self) };
do self_.home_for_io_with_sched |self_, scheduler| {
do self.home_for_io_with_sched |self_, scheduler| {
rtdebug!("closing UvTimer");
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
@ -1356,13 +1478,12 @@ impl UvFileStream {
impl Drop for UvFileStream {
fn drop(&mut self) {
let self_ = unsafe { transmute::<&UvFileStream, &mut UvFileStream>(self) };
if self.close_on_drop {
do self_.home_for_io_with_sched |self_, scheduler| {
do self.home_for_io_with_sched |self_, scheduler| {
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
let close_req = file::FsRequest::new();
do close_req.close(&self.loop_, self_.fd) |_,_| {
do close_req.close(&self_.loop_, self_.fd) |_,_| {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
};
@ -1405,6 +1526,86 @@ impl RtioFileStream for UvFileStream {
}
}
pub struct UvProcess {
process: process::Process,
// Sadly, this structure must be created before we return it, so in that
// brief interim the `home` is None.
home: Option<SchedHandle>,
// All None until the process exits (exit_error may stay None)
priv exit_status: Option<int>,
priv term_signal: Option<int>,
priv exit_error: Option<UvError>,
// Used to store which task to wake up from the exit_cb
priv descheduled: Option<BlockedTask>,
}
impl HomingIO for UvProcess {
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.home.get_mut_ref() }
}
impl Drop for UvProcess {
fn drop(&mut self) {
let close = |self_: &mut UvProcess| {
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
let task = Cell::new(task);
do self_.process.close {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task.take());
}
}
};
// If home is none, then this process never actually successfully
// spawned, so there's no need to switch event loops
if self.home.is_none() {
close(self)
} else {
self.home_for_io(close)
}
}
}
impl RtioProcess for UvProcess {
fn id(&self) -> pid_t {
self.process.pid()
}
fn kill(&mut self, signal: int) -> Result<(), IoError> {
do self.home_for_io |self_| {
match self_.process.kill(signal) {
Ok(()) => Ok(()),
Err(uverr) => Err(uv_error_to_io_error(uverr))
}
}
}
fn wait(&mut self) -> int {
// Make sure (on the home scheduler) that we have an exit status listed
do self.home_for_io |self_| {
match self_.exit_status {
Some(*) => {}
None => {
// If there's no exit code previously listed, then the
// process's exit callback has yet to be invoked. We just
// need to deschedule ourselves and wait to be reawoken.
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
assert!(self_.descheduled.is_none());
self_.descheduled = Some(task);
}
assert!(self_.exit_status.is_some());
}
}
}
self.exit_status.unwrap()
}
}
#[test]
fn test_simple_io_no_connect() {
do run_in_mt_newsched_task {

View File

@ -66,6 +66,19 @@ pub mod errors {
pub static EPIPE: c_int = -libc::EPIPE;
}
pub static PROCESS_SETUID: c_int = 1 << 0;
pub static PROCESS_SETGID: c_int = 1 << 1;
pub static PROCESS_WINDOWS_VERBATIM_ARGUMENTS: c_int = 1 << 2;
pub static PROCESS_DETACHED: c_int = 1 << 3;
pub static PROCESS_WINDOWS_HIDE: c_int = 1 << 4;
pub static STDIO_IGNORE: c_int = 0x00;
pub static STDIO_CREATE_PIPE: c_int = 0x01;
pub static STDIO_INHERIT_FD: c_int = 0x02;
pub static STDIO_INHERIT_STREAM: c_int = 0x04;
pub static STDIO_READABLE_PIPE: c_int = 0x10;
pub static STDIO_WRITABLE_PIPE: c_int = 0x20;
// see libuv/include/uv-unix.h
#[cfg(unix)]
pub struct uv_buf_t {
@ -80,6 +93,26 @@ pub struct uv_buf_t {
base: *u8,
}
pub struct uv_process_options_t {
exit_cb: uv_exit_cb,
file: *libc::c_char,
args: **libc::c_char,
env: **libc::c_char,
cwd: *libc::c_char,
flags: libc::c_uint,
stdio_count: libc::c_int,
stdio: *uv_stdio_container_t,
uid: uv_uid_t,
gid: uv_gid_t,
}
// These fields are private because they must be interfaced with through the
// functions below.
pub struct uv_stdio_container_t {
priv flags: libc::c_int,
priv stream: *uv_stream_t,
}
pub type uv_handle_t = c_void;
pub type uv_loop_t = c_void;
pub type uv_idle_t = c_void;
@ -94,6 +127,8 @@ pub type uv_stream_t = c_void;
pub type uv_fs_t = c_void;
pub type uv_udp_send_t = c_void;
pub type uv_getaddrinfo_t = c_void;
pub type uv_process_t = c_void;
pub type uv_pipe_t = c_void;
pub struct uv_timespec_t {
tv_sec: libc::c_long,
@ -178,6 +213,9 @@ pub type uv_write_cb = extern "C" fn(handle: *uv_write_t,
pub type uv_getaddrinfo_cb = extern "C" fn(req: *uv_getaddrinfo_t,
status: c_int,
res: *addrinfo);
pub type uv_exit_cb = extern "C" fn(handle: *uv_process_t,
exit_status: c_int,
term_signal: c_int);
pub type sockaddr = c_void;
pub type sockaddr_in = c_void;
@ -226,6 +264,11 @@ pub struct addrinfo {
ai_next: *addrinfo
}
#[cfg(unix)] pub type uv_uid_t = libc::types::os::arch::posix88::uid_t;
#[cfg(unix)] pub type uv_gid_t = libc::types::os::arch::posix88::gid_t;
#[cfg(windows)] pub type uv_uid_t = libc::c_uchar;
#[cfg(windows)] pub type uv_gid_t = libc::c_uchar;
#[deriving(Eq)]
pub enum uv_handle_type {
UV_UNKNOWN_HANDLE,
@ -787,6 +830,45 @@ pub unsafe fn fs_req_cleanup(req: *uv_fs_t) {
rust_uv_fs_req_cleanup(req);
}
pub unsafe fn spawn(loop_ptr: *c_void, result: *uv_process_t,
options: uv_process_options_t) -> c_int {
#[fixed_stack_segment]; #[inline(never)];
return rust_uv_spawn(loop_ptr, result, options);
}
pub unsafe fn process_kill(p: *uv_process_t, signum: c_int) -> c_int {
#[fixed_stack_segment]; #[inline(never)];
return rust_uv_process_kill(p, signum);
}
pub unsafe fn process_pid(p: *uv_process_t) -> c_int {
#[fixed_stack_segment]; #[inline(never)];
return rust_uv_process_pid(p);
}
pub unsafe fn set_stdio_container_flags(c: *uv_stdio_container_t,
flags: libc::c_int) {
#[fixed_stack_segment]; #[inline(never)];
rust_set_stdio_container_flags(c, flags);
}
pub unsafe fn set_stdio_container_fd(c: *uv_stdio_container_t,
fd: libc::c_int) {
#[fixed_stack_segment]; #[inline(never)];
rust_set_stdio_container_fd(c, fd);
}
pub unsafe fn set_stdio_container_stream(c: *uv_stdio_container_t,
stream: *uv_stream_t) {
#[fixed_stack_segment]; #[inline(never)];
rust_set_stdio_container_stream(c, stream);
}
pub unsafe fn pipe_init(loop_ptr: *c_void, p: *uv_pipe_t, ipc: c_int) -> c_int {
#[fixed_stack_segment]; #[inline(never)];
rust_uv_pipe_init(loop_ptr, p, ipc)
}
// data access helpers
pub unsafe fn get_result_from_fs_req(req: *uv_fs_t) -> c_int {
#[fixed_stack_segment]; #[inline(never)];
@ -1009,4 +1091,13 @@ extern {
node: *c_char, service: *c_char,
hints: *addrinfo) -> c_int;
fn rust_uv_freeaddrinfo(ai: *addrinfo);
fn rust_uv_spawn(loop_ptr: *c_void, outptr: *uv_process_t,
options: uv_process_options_t) -> c_int;
fn rust_uv_process_kill(p: *uv_process_t, signum: c_int) -> c_int;
fn rust_uv_process_pid(p: *uv_process_t) -> c_int;
fn rust_set_stdio_container_flags(c: *uv_stdio_container_t, flags: c_int);
fn rust_set_stdio_container_fd(c: *uv_stdio_container_t, fd: c_int);
fn rust_set_stdio_container_stream(c: *uv_stdio_container_t,
stream: *uv_stream_t);
fn rust_uv_pipe_init(loop_ptr: *c_void, p: *uv_pipe_t, ipc: c_int) -> c_int;
}

View File

@ -602,3 +602,38 @@ extern "C" int
rust_uv_fs_readdir(uv_loop_t* loop, uv_fs_t* req, const char* path, int flags, uv_fs_cb cb) {
return uv_fs_readdir(loop, req, path, flags, cb);
}
extern "C" int
rust_uv_spawn(uv_loop_t *loop, uv_process_t *p, uv_process_options_t options) {
return uv_spawn(loop, p, options);
}
extern "C" int
rust_uv_process_kill(uv_process_t *p, int signum) {
return uv_process_kill(p, signum);
}
extern "C" void
rust_set_stdio_container_flags(uv_stdio_container_t *c, int flags) {
c->flags = (uv_stdio_flags) flags;
}
extern "C" void
rust_set_stdio_container_fd(uv_stdio_container_t *c, int fd) {
c->data.fd = fd;
}
extern "C" void
rust_set_stdio_container_stream(uv_stdio_container_t *c, uv_stream_t *stream) {
c->data.stream = stream;
}
extern "C" int
rust_uv_process_pid(uv_process_t* p) {
return p->pid;
}
extern "C" int
rust_uv_pipe_init(uv_loop_t *loop, uv_pipe_t* p, int ipc) {
return uv_pipe_init(loop, p, ipc);
}

View File

@ -198,3 +198,10 @@ rust_drop_linenoise_lock
rust_get_test_int
rust_get_task
rust_uv_get_loop_from_getaddrinfo_req
rust_uv_spawn
rust_uv_process_kill
rust_set_stdio_container_flags
rust_set_stdio_container_fd
rust_set_stdio_container_stream
rust_uv_process_pid
rust_uv_pipe_init