From cb7756a81d3cbc48e79ffaa1a1f9d4934b581166 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 16 Sep 2013 15:28:56 -0700 Subject: [PATCH] Implement process bindings to libuv This is a re-landing of #8645, except that the bindings are *not* being used to power std::run just yet. Instead, this adds the bindings as standalone bindings inside the rt::io::process module. I made one major change from before, having to do with how pipes are created/bound. It's much clearer now when you can read/write to a pipe, as there's an explicit difference (different types) between an unbound and a bound pipe. The process configuration now takes unbound pipes (and consumes ownership of them), and will return corresponding pipe structures back if spawning is successful (otherwise everything is destroyed normally). --- src/libstd/rt/io/mod.rs | 9 + src/libstd/rt/io/pipe.rs | 76 +++++++++ src/libstd/rt/io/process.rs | 278 ++++++++++++++++++++++++++++++ src/libstd/rt/rtio.rs | 19 +++ src/libstd/rt/uv/mod.rs | 12 +- src/libstd/rt/uv/pipe.rs | 66 +++++++ src/libstd/rt/uv/process.rs | 219 ++++++++++++++++++++++++ src/libstd/rt/uv/uvio.rs | 331 +++++++++++++++++++++++++++++------- src/libstd/rt/uv/uvll.rs | 91 ++++++++++ src/rt/rust_uv.cpp | 35 ++++ src/rt/rustrt.def.in | 7 + 11 files changed, 1076 insertions(+), 67 deletions(-) create mode 100644 src/libstd/rt/io/pipe.rs create mode 100644 src/libstd/rt/io/process.rs create mode 100644 src/libstd/rt/uv/pipe.rs create mode 100644 src/libstd/rt/uv/process.rs diff --git a/src/libstd/rt/io/mod.rs b/src/libstd/rt/io/mod.rs index bfec2e9bdf8..6b405b0948a 100644 --- a/src/libstd/rt/io/mod.rs +++ b/src/libstd/rt/io/mod.rs @@ -260,6 +260,9 @@ pub use self::net::ip::IpAddr; pub use self::net::tcp::TcpListener; pub use self::net::tcp::TcpStream; pub use self::net::udp::UdpStream; +pub use self::pipe::PipeStream; +pub use self::pipe::UnboundPipeStream; +pub use self::process::Process; // Some extension traits that all Readers and Writers get. pub use self::extensions::ReaderUtil; @@ -269,6 +272,12 @@ pub use self::extensions::WriterByteConversions; /// Synchronous, non-blocking file I/O. pub mod file; +/// Synchronous, in-memory I/O. +pub mod pipe; + +/// Child process management. +pub mod process; + /// Synchronous, non-blocking network I/O. pub mod net; diff --git a/src/libstd/rt/io/pipe.rs b/src/libstd/rt/io/pipe.rs new file mode 100644 index 00000000000..7e6c59ffd0b --- /dev/null +++ b/src/libstd/rt/io/pipe.rs @@ -0,0 +1,76 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Synchronous, in-memory pipes. +//! +//! Currently these aren't particularly useful, there only exists bindings +//! enough so that pipes can be created to child processes. + +use prelude::*; +use super::{Reader, Writer}; +use rt::io::{io_error, read_error, EndOfFile}; +use rt::local::Local; +use rt::rtio::{RtioPipe, RtioPipeObject, IoFactoryObject, IoFactory}; +use rt::rtio::RtioUnboundPipeObject; + +pub struct PipeStream(RtioPipeObject); +pub struct UnboundPipeStream(~RtioUnboundPipeObject); + +impl PipeStream { + /// Creates a new pipe initialized, but not bound to any particular + /// source/destination + pub fn new() -> Option { + let pipe = unsafe { + let io: *mut IoFactoryObject = Local::unsafe_borrow(); + (*io).pipe_init(false) + }; + match pipe { + Ok(p) => Some(UnboundPipeStream(p)), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } + } + } + + pub fn bind(inner: RtioPipeObject) -> PipeStream { + PipeStream(inner) + } +} + +impl Reader for PipeStream { + fn read(&mut self, buf: &mut [u8]) -> Option { + match (**self).read(buf) { + Ok(read) => Some(read), + Err(ioerr) => { + // EOF is indicated by returning None + if ioerr.kind != EndOfFile { + read_error::cond.raise(ioerr); + } + return None; + } + } + } + + fn eof(&mut self) -> bool { fail!() } +} + +impl Writer for PipeStream { + fn write(&mut self, buf: &[u8]) { + match (**self).write(buf) { + Ok(_) => (), + Err(ioerr) => { + io_error::cond.raise(ioerr); + } + } + } + + fn flush(&mut self) { fail!() } +} diff --git a/src/libstd/rt/io/process.rs b/src/libstd/rt/io/process.rs new file mode 100644 index 00000000000..e92b0d3b7b5 --- /dev/null +++ b/src/libstd/rt/io/process.rs @@ -0,0 +1,278 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Bindings for executing child processes + +use prelude::*; + +use libc; +use rt::io; +use rt::io::io_error; +use rt::local::Local; +use rt::rtio::{RtioProcess, RtioProcessObject, IoFactoryObject, IoFactory}; + +pub struct Process { + priv handle: ~RtioProcessObject, + io: ~[Option], +} + +/// This configuration describes how a new process should be spawned. This is +/// translated to libuv's own configuration +pub struct ProcessConfig<'self> { + /// Path to the program to run + program: &'self str, + + /// Arguments to pass to the program (doesn't include the program itself) + args: &'self [~str], + + /// Optional environment to specify for the program. If this is None, then + /// it will inherit the current process's environment. + env: Option<&'self [(~str, ~str)]>, + + /// Optional working directory for the new process. If this is None, then + /// the current directory of the running process is inherited. + cwd: Option<&'self str>, + + /// Any number of streams/file descriptors/pipes may be attached to this + /// process. This list enumerates the file descriptors and such for the + /// process to be spawned, and the file descriptors inherited will start at + /// 0 and go to the length of this array. + /// + /// Standard file descriptors are: + /// + /// 0 - stdin + /// 1 - stdout + /// 2 - stderr + io: ~[StdioContainer] +} + +/// Describes what to do with a standard io stream for a child process. +pub enum StdioContainer { + /// This stream will be ignored. This is the equivalent of attaching the + /// stream to `/dev/null` + Ignored, + + /// The specified file descriptor is inherited for the stream which it is + /// specified for. + InheritFd(libc::c_int), + + // XXX: these two shouldn't have libuv-specific implementation details + + /// The specified libuv stream is inherited for the corresponding file + /// descriptor it is assigned to. + // XXX: this needs to be thought out more. + //InheritStream(uv::net::StreamWatcher), + + /// Creates a pipe for the specified file descriptor which will be directed + /// into the previously-initialized pipe passed in. + /// + /// The first boolean argument is whether the pipe is readable, and the + /// second is whether it is writable. These properties are from the view of + /// the *child* process, not the parent process. + CreatePipe(io::UnboundPipeStream, + bool /* readable */, + bool /* writable */), +} + +impl Process { + /// Creates a new pipe initialized, but not bound to any particular + /// source/destination + pub fn new(config: ProcessConfig) -> Option { + let process = unsafe { + let io: *mut IoFactoryObject = Local::unsafe_borrow(); + (*io).spawn(config) + }; + match process { + Ok((p, io)) => Some(Process{ + handle: p, + io: io.move_iter().map(|p| + p.map_move(|p| io::PipeStream::bind(p)) + ).collect() + }), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } + } + } + + /// Returns the process id of this child process + pub fn id(&self) -> libc::pid_t { self.handle.id() } + + /// Sends the specified signal to the child process, returning whether the + /// signal could be delivered or not. + /// + /// Note that this is purely a wrapper around libuv's `uv_process_kill` + /// function. + /// + /// If the signal delivery fails, then the `io_error` condition is raised on + pub fn signal(&mut self, signal: int) { + match self.handle.kill(signal) { + Ok(()) => {} + Err(err) => { + io_error::cond.raise(err) + } + } + } + + /// Wait for the child to exit completely, returning the status that it + /// exited with. This function will continue to have the same return value + /// after it has been called at least once. + pub fn wait(&mut self) -> int { self.handle.wait() } +} + +impl Drop for Process { + fn drop(&mut self) { + // Close all I/O before exiting to ensure that the child doesn't wait + // forever to print some text or something similar. + for _ in range(0, self.io.len()) { + self.io.pop(); + } + + self.wait(); + } +} + +#[cfg(test)] +mod tests { + use prelude::*; + use super::*; + + use rt::io::{Reader, Writer}; + use rt::io::pipe::*; + use str; + + #[test] + #[cfg(unix, not(android))] + fn smoke() { + let io = ~[]; + let args = ProcessConfig { + program: "/bin/sh", + args: [~"-c", ~"true"], + env: None, + cwd: None, + io: io, + }; + let p = Process::new(args); + assert!(p.is_some()); + let mut p = p.unwrap(); + assert_eq!(p.wait(), 0); + } + + #[test] + #[cfg(unix, not(android))] + fn smoke_failure() { + let io = ~[]; + let args = ProcessConfig { + program: "if-this-is-a-binary-then-the-world-has-ended", + args: [], + env: None, + cwd: None, + io: io, + }; + let p = Process::new(args); + assert!(p.is_some()); + let mut p = p.unwrap(); + assert!(p.wait() != 0); + } + + #[test] + #[cfg(unix, not(android))] + fn exit_reported_right() { + let io = ~[]; + let args = ProcessConfig { + program: "/bin/sh", + args: [~"-c", ~"exit 1"], + env: None, + cwd: None, + io: io, + }; + let p = Process::new(args); + assert!(p.is_some()); + let mut p = p.unwrap(); + assert_eq!(p.wait(), 1); + } + + fn read_all(input: &mut Reader) -> ~str { + let mut ret = ~""; + let mut buf = [0, ..1024]; + loop { + match input.read(buf) { + None | Some(0) => { break } + Some(n) => { ret = ret + str::from_utf8(buf.slice_to(n)); } + } + } + return ret; + } + + fn run_output(args: ProcessConfig) -> ~str { + let p = Process::new(args); + assert!(p.is_some()); + let mut p = p.unwrap(); + assert!(p.io[0].is_none()); + assert!(p.io[1].is_some()); + let ret = read_all(p.io[1].get_mut_ref() as &mut Reader); + assert_eq!(p.wait(), 0); + return ret; + } + + #[test] + #[cfg(unix, not(android))] + fn stdout_works() { + let pipe = PipeStream::new().unwrap(); + let io = ~[Ignored, CreatePipe(pipe, false, true)]; + let args = ProcessConfig { + program: "/bin/sh", + args: [~"-c", ~"echo foobar"], + env: None, + cwd: None, + io: io, + }; + assert_eq!(run_output(args), ~"foobar\n"); + } + + #[test] + #[cfg(unix, not(android))] + fn set_cwd_works() { + let pipe = PipeStream::new().unwrap(); + let io = ~[Ignored, CreatePipe(pipe, false, true)]; + let cwd = Some("/"); + let args = ProcessConfig { + program: "/bin/sh", + args: [~"-c", ~"pwd"], + env: None, + cwd: cwd, + io: io, + }; + assert_eq!(run_output(args), ~"/\n"); + } + + #[test] + #[cfg(unix, not(android))] + fn stdin_works() { + let input = PipeStream::new().unwrap(); + let output = PipeStream::new().unwrap(); + let io = ~[CreatePipe(input, true, false), + CreatePipe(output, false, true)]; + let args = ProcessConfig { + program: "/bin/sh", + args: [~"-c", ~"read line; echo $line"], + env: None, + cwd: None, + io: io, + }; + let mut p = Process::new(args).expect("didn't create a proces?!"); + p.io[0].get_mut_ref().write("foobar".as_bytes()); + p.io[0] = None; // close stdin; + let out = read_all(p.io[1].get_mut_ref() as &mut Reader); + assert_eq!(p.wait(), 0); + assert_eq!(out, ~"foobar\n"); + } +} diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index d05a3a26169..ca521c792dc 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -8,11 +8,13 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use libc; use option::*; use result::*; use libc::c_int; use rt::io::IoError; +use super::io::process::ProcessConfig; use super::io::net::ip::{IpAddr, SocketAddr}; use rt::uv::uvio; use path::Path; @@ -31,6 +33,9 @@ pub type RtioTcpListenerObject = uvio::UvTcpListener; pub type RtioUdpSocketObject = uvio::UvUdpSocket; pub type RtioTimerObject = uvio::UvTimer; pub type PausibleIdleCallback = uvio::UvPausibleIdleCallback; +pub type RtioPipeObject = uvio::UvPipeStream; +pub type RtioUnboundPipeObject = uvio::UvUnboundPipe; +pub type RtioProcessObject = uvio::UvProcess; pub trait EventLoop { fn run(&mut self); @@ -79,6 +84,9 @@ pub trait IoFactory { fn fs_rmdir(&mut self, path: &P) -> Result<(), IoError>; fn fs_readdir(&mut self, path: &P, flags: c_int) -> Result<~[Path], IoError>; + fn pipe_init(&mut self, ipc: bool) -> Result<~RtioUnboundPipeObject, IoError>; + fn spawn(&mut self, config: ProcessConfig) + -> Result<(~RtioProcessObject, ~[Option]), IoError>; } pub trait RtioTcpListener : RtioSocket { @@ -135,3 +143,14 @@ pub trait RtioFileStream { fn tell(&self) -> Result; fn flush(&mut self) -> Result<(), IoError>; } + +pub trait RtioProcess { + fn id(&self) -> libc::pid_t; + fn kill(&mut self, signal: int) -> Result<(), IoError>; + fn wait(&mut self) -> int; +} + +pub trait RtioPipe { + fn read(&mut self, buf: &mut [u8]) -> Result; + fn write(&mut self, buf: &[u8]) -> Result<(), IoError>; +} diff --git a/src/libstd/rt/uv/mod.rs b/src/libstd/rt/uv/mod.rs index 451d454d2d8..95b2059d538 100644 --- a/src/libstd/rt/uv/mod.rs +++ b/src/libstd/rt/uv/mod.rs @@ -58,6 +58,8 @@ pub use self::net::{StreamWatcher, TcpWatcher, UdpWatcher}; pub use self::idle::IdleWatcher; pub use self::timer::TimerWatcher; pub use self::async::AsyncWatcher; +pub use self::process::Process; +pub use self::pipe::Pipe; /// The implementation of `rtio` for libuv pub mod uvio; @@ -71,6 +73,8 @@ pub mod idle; pub mod timer; pub mod async; pub mod addrinfo; +pub mod process; +pub mod pipe; /// XXX: Loop(*handle) is buggy with destructors. Normal structs /// with dtors may not be destructured, but tuple structs can, @@ -127,6 +131,8 @@ pub type NullCallback = ~fn(); pub type IdleCallback = ~fn(IdleWatcher, Option); pub type ConnectionCallback = ~fn(StreamWatcher, Option); pub type FsCallback = ~fn(&mut FsRequest, Option); +// first int is exit_status, second is term_signal +pub type ExitCallback = ~fn(Process, int, int, Option); pub type TimerCallback = ~fn(TimerWatcher, Option); pub type AsyncCallback = ~fn(AsyncWatcher, Option); pub type UdpReceiveCallback = ~fn(UdpWatcher, int, Buf, SocketAddr, uint, Option); @@ -145,7 +151,8 @@ struct WatcherData { timer_cb: Option, async_cb: Option, udp_recv_cb: Option, - udp_send_cb: Option + udp_send_cb: Option, + exit_cb: Option, } pub trait WatcherInterop { @@ -177,7 +184,8 @@ impl> WatcherInterop for W { timer_cb: None, async_cb: None, udp_recv_cb: None, - udp_send_cb: None + udp_send_cb: None, + exit_cb: None, }; let data = transmute::<~WatcherData, *c_void>(data); uvll::set_data_for_uv_handle(self.native_handle(), data); diff --git a/src/libstd/rt/uv/pipe.rs b/src/libstd/rt/uv/pipe.rs new file mode 100644 index 00000000000..1147c731a60 --- /dev/null +++ b/src/libstd/rt/uv/pipe.rs @@ -0,0 +1,66 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use prelude::*; +use libc; + +use rt::uv; +use rt::uv::net; +use rt::uv::uvll; + +pub struct Pipe(*uvll::uv_pipe_t); + +impl uv::Watcher for Pipe {} + +impl Pipe { + pub fn new(loop_: &uv::Loop, ipc: bool) -> Pipe { + unsafe { + let handle = uvll::malloc_handle(uvll::UV_NAMED_PIPE); + assert!(handle.is_not_null()); + let ipc = ipc as libc::c_int; + assert_eq!(uvll::pipe_init(loop_.native_handle(), handle, ipc), 0); + let mut ret: Pipe = + uv::NativeHandle::from_native_handle(handle); + ret.install_watcher_data(); + ret + } + } + + pub fn as_stream(&self) -> net::StreamWatcher { + net::StreamWatcher(**self as *uvll::uv_stream_t) + } + + pub fn close(self, cb: uv::NullCallback) { + { + let mut this = self; + let data = this.get_watcher_data(); + assert!(data.close_cb.is_none()); + data.close_cb = Some(cb); + } + + unsafe { uvll::close(self.native_handle(), close_cb); } + + extern fn close_cb(handle: *uvll::uv_pipe_t) { + let mut process: Pipe = uv::NativeHandle::from_native_handle(handle); + process.get_watcher_data().close_cb.take_unwrap()(); + process.drop_watcher_data(); + unsafe { uvll::free_handle(handle as *libc::c_void) } + } + } +} + +impl uv::NativeHandle<*uvll::uv_pipe_t> for Pipe { + fn from_native_handle(handle: *uvll::uv_pipe_t) -> Pipe { + Pipe(handle) + } + fn native_handle(&self) -> *uvll::uv_pipe_t { + match self { &Pipe(ptr) => ptr } + } +} diff --git a/src/libstd/rt/uv/process.rs b/src/libstd/rt/uv/process.rs new file mode 100644 index 00000000000..ccfa1ff87db --- /dev/null +++ b/src/libstd/rt/uv/process.rs @@ -0,0 +1,219 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use prelude::*; +use cell::Cell; +use libc; +use ptr; +use util; +use vec; + +use rt::io::process::*; +use rt::uv; +use rt::uv::uvio::UvPipeStream; +use rt::uv::uvll; + +/// A process wraps the handle of the underlying uv_process_t. +pub struct Process(*uvll::uv_process_t); + +impl uv::Watcher for Process {} + +impl Process { + /// Creates a new process, ready to spawn inside an event loop + pub fn new() -> Process { + let handle = unsafe { uvll::malloc_handle(uvll::UV_PROCESS) }; + assert!(handle.is_not_null()); + let mut ret: Process = uv::NativeHandle::from_native_handle(handle); + ret.install_watcher_data(); + return ret; + } + + /// Spawn a new process inside the specified event loop. + /// + /// The `config` variable will be passed down to libuv, and the `exit_cb` + /// will be run only once, when the process exits. + /// + /// Returns either the corresponding process object or an error which + /// occurred. + pub fn spawn(&mut self, loop_: &uv::Loop, mut config: ProcessConfig, + exit_cb: uv::ExitCallback) + -> Result<~[Option], uv::UvError> + { + let cwd = config.cwd.map_move(|s| s.to_c_str()); + + extern fn on_exit(p: *uvll::uv_process_t, + exit_status: libc::c_int, + term_signal: libc::c_int) { + let mut p: Process = uv::NativeHandle::from_native_handle(p); + let err = match exit_status { + 0 => None, + _ => uv::status_to_maybe_uv_error(-1) + }; + p.get_watcher_data().exit_cb.take_unwrap()(p, + exit_status as int, + term_signal as int, + err); + } + + let io = util::replace(&mut config.io, ~[]); + let mut stdio = vec::with_capacity::(io.len()); + let mut ret_io = vec::with_capacity(io.len()); + unsafe { + vec::raw::set_len(&mut stdio, io.len()); + for (slot, other) in stdio.iter().zip(io.move_iter()) { + let io = set_stdio(slot as *uvll::uv_stdio_container_t, other); + ret_io.push(io); + } + } + + let exit_cb = Cell::new(exit_cb); + let ret_io = Cell::new(ret_io); + do with_argv(config.program, config.args) |argv| { + do with_env(config.env) |envp| { + let options = uvll::uv_process_options_t { + exit_cb: on_exit, + file: unsafe { *argv }, + args: argv, + env: envp, + cwd: match cwd { + Some(ref cwd) => cwd.with_ref(|p| p), + None => ptr::null(), + }, + flags: 0, + stdio_count: stdio.len() as libc::c_int, + stdio: stdio.as_imm_buf(|p, _| p), + uid: 0, + gid: 0, + }; + + match unsafe { + uvll::spawn(loop_.native_handle(), **self, options) + } { + 0 => { + (*self).get_watcher_data().exit_cb = Some(exit_cb.take()); + Ok(ret_io.take()) + } + err => Err(uv::UvError(err)) + } + } + } + } + + /// Sends a signal to this process. + /// + /// This is a wrapper around `uv_process_kill` + pub fn kill(&self, signum: int) -> Result<(), uv::UvError> { + match unsafe { + uvll::process_kill(self.native_handle(), signum as libc::c_int) + } { + 0 => Ok(()), + err => Err(uv::UvError(err)) + } + } + + /// Returns the process id of a spawned process + pub fn pid(&self) -> libc::pid_t { + unsafe { uvll::process_pid(**self) as libc::pid_t } + } + + /// Closes this handle, invoking the specified callback once closed + pub fn close(self, cb: uv::NullCallback) { + { + let mut this = self; + let data = this.get_watcher_data(); + assert!(data.close_cb.is_none()); + data.close_cb = Some(cb); + } + + unsafe { uvll::close(self.native_handle(), close_cb); } + + extern fn close_cb(handle: *uvll::uv_process_t) { + let mut process: Process = uv::NativeHandle::from_native_handle(handle); + process.get_watcher_data().close_cb.take_unwrap()(); + process.drop_watcher_data(); + unsafe { uvll::free_handle(handle as *libc::c_void) } + } + } +} + +unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t, + io: StdioContainer) -> Option { + match io { + Ignored => { + uvll::set_stdio_container_flags(dst, uvll::STDIO_IGNORE); + None + } + InheritFd(fd) => { + uvll::set_stdio_container_flags(dst, uvll::STDIO_INHERIT_FD); + uvll::set_stdio_container_fd(dst, fd); + None + } + CreatePipe(pipe, readable, writable) => { + let mut flags = uvll::STDIO_CREATE_PIPE as libc::c_int; + if readable { + flags |= uvll::STDIO_READABLE_PIPE as libc::c_int; + } + if writable { + flags |= uvll::STDIO_WRITABLE_PIPE as libc::c_int; + } + let handle = pipe.pipe.as_stream().native_handle(); + uvll::set_stdio_container_flags(dst, flags); + uvll::set_stdio_container_stream(dst, handle); + Some(pipe.bind()) + } + } +} + +/// Converts the program and arguments to the argv array expected by libuv +fn with_argv(prog: &str, args: &[~str], f: &fn(**libc::c_char) -> T) -> T { + // First, allocation space to put all the C-strings (we need to have + // ownership of them somewhere + let mut c_strs = vec::with_capacity(args.len() + 1); + c_strs.push(prog.to_c_str()); + for arg in args.iter() { + c_strs.push(arg.to_c_str()); + } + + // Next, create the char** array + let mut c_args = vec::with_capacity(c_strs.len() + 1); + for s in c_strs.iter() { + c_args.push(s.with_ref(|p| p)); + } + c_args.push(ptr::null()); + c_args.as_imm_buf(|buf, _| f(buf)) +} + +/// Converts the environment to the env array expected by libuv +fn with_env(env: Option<&[(~str, ~str)]>, f: &fn(**libc::c_char) -> T) -> T { + let env = match env { + Some(s) => s, + None => { return f(ptr::null()); } + }; + // As with argv, create some temporary storage and then the actual array + let mut envp = vec::with_capacity(env.len()); + for &(ref key, ref value) in env.iter() { + envp.push(fmt!("%s=%s", *key, *value).to_c_str()); + } + let mut c_envp = vec::with_capacity(envp.len() + 1); + for s in envp.iter() { + c_envp.push(s.with_ref(|p| p)); + } + c_envp.push(ptr::null()); + c_envp.as_imm_buf(|buf, _| f(buf)) +} + +impl uv::NativeHandle<*uvll::uv_process_t> for Process { + fn from_native_handle(handle: *uvll::uv_process_t) -> Process { + Process(handle) + } + fn native_handle(&self) -> *uvll::uv_process_t { + match self { &Process(ptr) => ptr } + } +} diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index 76dcf6daae6..ed6e16c8fdb 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -13,7 +13,7 @@ use cast::transmute; use cast; use cell::Cell; use clone::Clone; -use libc::{c_int, c_uint, c_void}; +use libc::{c_int, c_uint, c_void, pid_t}; use ops::Drop; use option::*; use ptr; @@ -22,6 +22,8 @@ use result::*; use rt::io::IoError; use rt::io::net::ip::{SocketAddr, IpAddr}; use rt::io::{standard_error, OtherIoError, SeekStyle, SeekSet, SeekCur, SeekEnd}; +use rt::io::process::ProcessConfig; +use rt::kill::BlockedTask; use rt::local::Local; use rt::rtio::*; use rt::sched::{Scheduler, SchedHandle}; @@ -735,6 +737,64 @@ impl IoFactory for UvIoFactory { assert!(!result_cell.is_empty()); return result_cell.take(); } + + fn pipe_init(&mut self, ipc: bool) -> Result<~RtioUnboundPipeObject, IoError> { + let home = get_handle_to_current_scheduler!(); + Ok(~UvUnboundPipe { pipe: Pipe::new(self.uv_loop(), ipc), home: home }) + } + + fn spawn(&mut self, config: ProcessConfig) + -> Result<(~RtioProcessObject, ~[Option]), IoError> + { + // Sadly, we must create the UvProcess before we actually call uv_spawn + // so that the exit_cb can close over it and notify it when the process + // has exited. + let mut ret = ~UvProcess { + process: Process::new(), + home: None, + exit_status: None, + term_signal: None, + exit_error: None, + descheduled: None, + }; + let ret_ptr = unsafe { + *cast::transmute::<&~UvProcess, &*mut UvProcess>(&ret) + }; + + // The purpose of this exit callback is to record the data about the + // exit and then wake up the task which may be waiting for the process + // to exit. This is all performed in the current io-loop, and the + // implementation of UvProcess ensures that reading these fields always + // occurs on the current io-loop. + let exit_cb: ExitCallback = |_, exit_status, term_signal, error| { + unsafe { + assert!((*ret_ptr).exit_status.is_none()); + (*ret_ptr).exit_status = Some(exit_status); + (*ret_ptr).term_signal = Some(term_signal); + (*ret_ptr).exit_error = error; + match (*ret_ptr).descheduled.take() { + Some(task) => { + let scheduler: ~Scheduler = Local::take(); + scheduler.resume_blocked_task_immediately(task); + } + None => {} + } + } + }; + + match ret.process.spawn(self.uv_loop(), config, exit_cb) { + Ok(io) => { + // Only now do we actually get a handle to this scheduler. + ret.home = Some(get_handle_to_current_scheduler!()); + Ok((ret, io)) + } + Err(uverr) => { + // We still need to close the process handle we created, but + // that's taken care for us in the destructor of UvProcess + Err(uv_error_to_io_error(uverr)) + } + } + } } pub struct UvTcpListener { @@ -856,6 +916,126 @@ impl RtioTcpAcceptor for UvTcpAcceptor { } } +fn read_stream(mut watcher: StreamWatcher, + scheduler: ~Scheduler, + buf: &mut [u8]) -> Result { + let result_cell = Cell::new_empty(); + let result_cell_ptr: *Cell> = &result_cell; + + let buf_ptr: *&mut [u8] = &buf; + do scheduler.deschedule_running_task_and_then |_sched, task| { + let task_cell = Cell::new(task); + // XXX: We shouldn't reallocate these callbacks every + // call to read + let alloc: AllocCallback = |_| unsafe { + slice_to_uv_buf(*buf_ptr) + }; + do watcher.read_start(alloc) |mut watcher, nread, _buf, status| { + + // Stop reading so that no read callbacks are + // triggered before the user calls `read` again. + // XXX: Is there a performance impact to calling + // stop here? + watcher.read_stop(); + + let result = if status.is_none() { + assert!(nread >= 0); + Ok(nread as uint) + } else { + Err(uv_error_to_io_error(status.unwrap())) + }; + + unsafe { (*result_cell_ptr).put_back(result); } + + let scheduler: ~Scheduler = Local::take(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } + + assert!(!result_cell.is_empty()); + result_cell.take() +} + +fn write_stream(mut watcher: StreamWatcher, + scheduler: ~Scheduler, + buf: &[u8]) -> Result<(), IoError> { + let result_cell = Cell::new_empty(); + let result_cell_ptr: *Cell> = &result_cell; + let buf_ptr: *&[u8] = &buf; + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; + do watcher.write(buf) |_watcher, status| { + let result = if status.is_none() { + Ok(()) + } else { + Err(uv_error_to_io_error(status.unwrap())) + }; + + unsafe { (*result_cell_ptr).put_back(result); } + + let scheduler: ~Scheduler = Local::take(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } + + assert!(!result_cell.is_empty()); + result_cell.take() +} + +pub struct UvUnboundPipe { + pipe: Pipe, + home: SchedHandle, +} + +impl HomingIO for UvUnboundPipe { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } +} + +impl Drop for UvUnboundPipe { + fn drop(&mut self) { + do self.home_for_io |self_| { + let scheduler: ~Scheduler = Local::take(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + do self_.pipe.close { + let scheduler: ~Scheduler = Local::take(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } + } + } +} + +impl UvUnboundPipe { + pub unsafe fn bind(~self) -> UvPipeStream { + UvPipeStream { inner: self } + } +} + +pub struct UvPipeStream { + priv inner: ~UvUnboundPipe, +} + +impl UvPipeStream { + pub fn new(inner: ~UvUnboundPipe) -> UvPipeStream { + UvPipeStream { inner: inner } + } +} + +impl RtioPipe for UvPipeStream { + fn read(&mut self, buf: &mut [u8]) -> Result { + do self.inner.home_for_io_with_sched |self_, scheduler| { + read_stream(self_.pipe.as_stream(), scheduler, buf) + } + } + fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { + do self.inner.home_for_io_with_sched |self_, scheduler| { + write_stream(self_.pipe.as_stream(), scheduler, buf) + } + } +} + pub struct UvTcpStream { watcher: TcpWatcher, home: SchedHandle, @@ -890,70 +1070,13 @@ impl RtioSocket for UvTcpStream { impl RtioTcpStream for UvTcpStream { fn read(&mut self, buf: &mut [u8]) -> Result { do self.home_for_io_with_sched |self_, scheduler| { - let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell> = &result_cell; - - let buf_ptr: *&mut [u8] = &buf; - do scheduler.deschedule_running_task_and_then |_sched, task| { - let task_cell = Cell::new(task); - // XXX: We shouldn't reallocate these callbacks every - // call to read - let alloc: AllocCallback = |_| unsafe { - slice_to_uv_buf(*buf_ptr) - }; - let mut watcher = self_.watcher.as_stream(); - do watcher.read_start(alloc) |mut watcher, nread, _buf, status| { - - // Stop reading so that no read callbacks are - // triggered before the user calls `read` again. - // XXX: Is there a performance impact to calling - // stop here? - watcher.read_stop(); - - let result = if status.is_none() { - assert!(nread >= 0); - Ok(nread as uint) - } else { - Err(uv_error_to_io_error(status.unwrap())) - }; - - unsafe { (*result_cell_ptr).put_back(result); } - - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } - } - - assert!(!result_cell.is_empty()); - result_cell.take() + read_stream(self_.watcher.as_stream(), scheduler, buf) } } fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { do self.home_for_io_with_sched |self_, scheduler| { - let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell> = &result_cell; - let buf_ptr: *&[u8] = &buf; - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; - let mut watcher = self_.watcher.as_stream(); - do watcher.write(buf) |_watcher, status| { - let result = if status.is_none() { - Ok(()) - } else { - Err(uv_error_to_io_error(status.unwrap())) - }; - - unsafe { (*result_cell_ptr).put_back(result); } - - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } - } - - assert!(!result_cell.is_empty()); - result_cell.take() + write_stream(self_.watcher.as_stream(), scheduler, buf) } } @@ -1240,8 +1363,7 @@ impl UvTimer { impl Drop for UvTimer { fn drop(&mut self) { - let self_ = unsafe { transmute::<&UvTimer, &mut UvTimer>(self) }; - do self_.home_for_io_with_sched |self_, scheduler| { + do self.home_for_io_with_sched |self_, scheduler| { rtdebug!("closing UvTimer"); do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); @@ -1356,13 +1478,12 @@ impl UvFileStream { impl Drop for UvFileStream { fn drop(&mut self) { - let self_ = unsafe { transmute::<&UvFileStream, &mut UvFileStream>(self) }; if self.close_on_drop { - do self_.home_for_io_with_sched |self_, scheduler| { + do self.home_for_io_with_sched |self_, scheduler| { do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); let close_req = file::FsRequest::new(); - do close_req.close(&self.loop_, self_.fd) |_,_| { + do close_req.close(&self_.loop_, self_.fd) |_,_| { let scheduler: ~Scheduler = Local::take(); scheduler.resume_blocked_task_immediately(task_cell.take()); }; @@ -1405,6 +1526,86 @@ impl RtioFileStream for UvFileStream { } } +pub struct UvProcess { + process: process::Process, + + // Sadly, this structure must be created before we return it, so in that + // brief interim the `home` is None. + home: Option, + + // All None until the process exits (exit_error may stay None) + priv exit_status: Option, + priv term_signal: Option, + priv exit_error: Option, + + // Used to store which task to wake up from the exit_cb + priv descheduled: Option, +} + +impl HomingIO for UvProcess { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.home.get_mut_ref() } +} + +impl Drop for UvProcess { + fn drop(&mut self) { + let close = |self_: &mut UvProcess| { + let scheduler: ~Scheduler = Local::take(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task = Cell::new(task); + do self_.process.close { + let scheduler: ~Scheduler = Local::take(); + scheduler.resume_blocked_task_immediately(task.take()); + } + } + }; + + // If home is none, then this process never actually successfully + // spawned, so there's no need to switch event loops + if self.home.is_none() { + close(self) + } else { + self.home_for_io(close) + } + } +} + +impl RtioProcess for UvProcess { + fn id(&self) -> pid_t { + self.process.pid() + } + + fn kill(&mut self, signal: int) -> Result<(), IoError> { + do self.home_for_io |self_| { + match self_.process.kill(signal) { + Ok(()) => Ok(()), + Err(uverr) => Err(uv_error_to_io_error(uverr)) + } + } + } + + fn wait(&mut self) -> int { + // Make sure (on the home scheduler) that we have an exit status listed + do self.home_for_io |self_| { + match self_.exit_status { + Some(*) => {} + None => { + // If there's no exit code previously listed, then the + // process's exit callback has yet to be invoked. We just + // need to deschedule ourselves and wait to be reawoken. + let scheduler: ~Scheduler = Local::take(); + do scheduler.deschedule_running_task_and_then |_, task| { + assert!(self_.descheduled.is_none()); + self_.descheduled = Some(task); + } + assert!(self_.exit_status.is_some()); + } + } + } + + self.exit_status.unwrap() + } +} + #[test] fn test_simple_io_no_connect() { do run_in_mt_newsched_task { diff --git a/src/libstd/rt/uv/uvll.rs b/src/libstd/rt/uv/uvll.rs index 9591fc82df4..790bf53a291 100644 --- a/src/libstd/rt/uv/uvll.rs +++ b/src/libstd/rt/uv/uvll.rs @@ -66,6 +66,19 @@ pub mod errors { pub static EPIPE: c_int = -libc::EPIPE; } +pub static PROCESS_SETUID: c_int = 1 << 0; +pub static PROCESS_SETGID: c_int = 1 << 1; +pub static PROCESS_WINDOWS_VERBATIM_ARGUMENTS: c_int = 1 << 2; +pub static PROCESS_DETACHED: c_int = 1 << 3; +pub static PROCESS_WINDOWS_HIDE: c_int = 1 << 4; + +pub static STDIO_IGNORE: c_int = 0x00; +pub static STDIO_CREATE_PIPE: c_int = 0x01; +pub static STDIO_INHERIT_FD: c_int = 0x02; +pub static STDIO_INHERIT_STREAM: c_int = 0x04; +pub static STDIO_READABLE_PIPE: c_int = 0x10; +pub static STDIO_WRITABLE_PIPE: c_int = 0x20; + // see libuv/include/uv-unix.h #[cfg(unix)] pub struct uv_buf_t { @@ -80,6 +93,26 @@ pub struct uv_buf_t { base: *u8, } +pub struct uv_process_options_t { + exit_cb: uv_exit_cb, + file: *libc::c_char, + args: **libc::c_char, + env: **libc::c_char, + cwd: *libc::c_char, + flags: libc::c_uint, + stdio_count: libc::c_int, + stdio: *uv_stdio_container_t, + uid: uv_uid_t, + gid: uv_gid_t, +} + +// These fields are private because they must be interfaced with through the +// functions below. +pub struct uv_stdio_container_t { + priv flags: libc::c_int, + priv stream: *uv_stream_t, +} + pub type uv_handle_t = c_void; pub type uv_loop_t = c_void; pub type uv_idle_t = c_void; @@ -94,6 +127,8 @@ pub type uv_stream_t = c_void; pub type uv_fs_t = c_void; pub type uv_udp_send_t = c_void; pub type uv_getaddrinfo_t = c_void; +pub type uv_process_t = c_void; +pub type uv_pipe_t = c_void; pub struct uv_timespec_t { tv_sec: libc::c_long, @@ -178,6 +213,9 @@ pub type uv_write_cb = extern "C" fn(handle: *uv_write_t, pub type uv_getaddrinfo_cb = extern "C" fn(req: *uv_getaddrinfo_t, status: c_int, res: *addrinfo); +pub type uv_exit_cb = extern "C" fn(handle: *uv_process_t, + exit_status: c_int, + term_signal: c_int); pub type sockaddr = c_void; pub type sockaddr_in = c_void; @@ -226,6 +264,11 @@ pub struct addrinfo { ai_next: *addrinfo } +#[cfg(unix)] pub type uv_uid_t = libc::types::os::arch::posix88::uid_t; +#[cfg(unix)] pub type uv_gid_t = libc::types::os::arch::posix88::gid_t; +#[cfg(windows)] pub type uv_uid_t = libc::c_uchar; +#[cfg(windows)] pub type uv_gid_t = libc::c_uchar; + #[deriving(Eq)] pub enum uv_handle_type { UV_UNKNOWN_HANDLE, @@ -787,6 +830,45 @@ pub unsafe fn fs_req_cleanup(req: *uv_fs_t) { rust_uv_fs_req_cleanup(req); } +pub unsafe fn spawn(loop_ptr: *c_void, result: *uv_process_t, + options: uv_process_options_t) -> c_int { + #[fixed_stack_segment]; #[inline(never)]; + return rust_uv_spawn(loop_ptr, result, options); +} + +pub unsafe fn process_kill(p: *uv_process_t, signum: c_int) -> c_int { + #[fixed_stack_segment]; #[inline(never)]; + return rust_uv_process_kill(p, signum); +} + +pub unsafe fn process_pid(p: *uv_process_t) -> c_int { + #[fixed_stack_segment]; #[inline(never)]; + return rust_uv_process_pid(p); +} + +pub unsafe fn set_stdio_container_flags(c: *uv_stdio_container_t, + flags: libc::c_int) { + #[fixed_stack_segment]; #[inline(never)]; + rust_set_stdio_container_flags(c, flags); +} + +pub unsafe fn set_stdio_container_fd(c: *uv_stdio_container_t, + fd: libc::c_int) { + #[fixed_stack_segment]; #[inline(never)]; + rust_set_stdio_container_fd(c, fd); +} + +pub unsafe fn set_stdio_container_stream(c: *uv_stdio_container_t, + stream: *uv_stream_t) { + #[fixed_stack_segment]; #[inline(never)]; + rust_set_stdio_container_stream(c, stream); +} + +pub unsafe fn pipe_init(loop_ptr: *c_void, p: *uv_pipe_t, ipc: c_int) -> c_int { + #[fixed_stack_segment]; #[inline(never)]; + rust_uv_pipe_init(loop_ptr, p, ipc) +} + // data access helpers pub unsafe fn get_result_from_fs_req(req: *uv_fs_t) -> c_int { #[fixed_stack_segment]; #[inline(never)]; @@ -1009,4 +1091,13 @@ extern { node: *c_char, service: *c_char, hints: *addrinfo) -> c_int; fn rust_uv_freeaddrinfo(ai: *addrinfo); + fn rust_uv_spawn(loop_ptr: *c_void, outptr: *uv_process_t, + options: uv_process_options_t) -> c_int; + fn rust_uv_process_kill(p: *uv_process_t, signum: c_int) -> c_int; + fn rust_uv_process_pid(p: *uv_process_t) -> c_int; + fn rust_set_stdio_container_flags(c: *uv_stdio_container_t, flags: c_int); + fn rust_set_stdio_container_fd(c: *uv_stdio_container_t, fd: c_int); + fn rust_set_stdio_container_stream(c: *uv_stdio_container_t, + stream: *uv_stream_t); + fn rust_uv_pipe_init(loop_ptr: *c_void, p: *uv_pipe_t, ipc: c_int) -> c_int; } diff --git a/src/rt/rust_uv.cpp b/src/rt/rust_uv.cpp index 9b460cffd74..3e9b8ba136e 100644 --- a/src/rt/rust_uv.cpp +++ b/src/rt/rust_uv.cpp @@ -602,3 +602,38 @@ extern "C" int rust_uv_fs_readdir(uv_loop_t* loop, uv_fs_t* req, const char* path, int flags, uv_fs_cb cb) { return uv_fs_readdir(loop, req, path, flags, cb); } + +extern "C" int +rust_uv_spawn(uv_loop_t *loop, uv_process_t *p, uv_process_options_t options) { + return uv_spawn(loop, p, options); +} + +extern "C" int +rust_uv_process_kill(uv_process_t *p, int signum) { + return uv_process_kill(p, signum); +} + +extern "C" void +rust_set_stdio_container_flags(uv_stdio_container_t *c, int flags) { + c->flags = (uv_stdio_flags) flags; +} + +extern "C" void +rust_set_stdio_container_fd(uv_stdio_container_t *c, int fd) { + c->data.fd = fd; +} + +extern "C" void +rust_set_stdio_container_stream(uv_stdio_container_t *c, uv_stream_t *stream) { + c->data.stream = stream; +} + +extern "C" int +rust_uv_process_pid(uv_process_t* p) { + return p->pid; +} + +extern "C" int +rust_uv_pipe_init(uv_loop_t *loop, uv_pipe_t* p, int ipc) { + return uv_pipe_init(loop, p, ipc); +} diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index 3be958837dc..6059d647614 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -198,3 +198,10 @@ rust_drop_linenoise_lock rust_get_test_int rust_get_task rust_uv_get_loop_from_getaddrinfo_req +rust_uv_spawn +rust_uv_process_kill +rust_set_stdio_container_flags +rust_set_stdio_container_fd +rust_set_stdio_container_stream +rust_uv_process_pid +rust_uv_pipe_init