io: Implement process wait timeouts

This implements set_timeout() for std::io::Process which will affect wait()
operations on the process. This follows the same pattern as the rest of the
timeouts emerging in std::io::net.

The implementation was super easy for everything except libnative on unix
(backwards from usual!), which required a good bit of signal handling. There's a
doc comment explaining the strategy in libnative. Internally, this also required
refactoring the "helper thread" implementation used by libnative to allow for an
extra helper thread (not just the timer).

This is a breaking change in terms of the io::Process API. It is now possible
for wait() to fail, and subsequently wait_with_output(). These two functions now
return IoResult<T> due to the fact that they can time out.

Additionally, the wait_with_output() function has moved from taking `&mut self`
to taking `self`. If a timeout occurs while waiting with output, the semantics
are undesirable in almost all cases if attempting to re-wait on the process.
Equivalent functionality can still be achieved by dealing with the output
handles manually.

[breaking-change]

cc #13523
This commit is contained in:
Alex Crichton 2014-05-05 16:58:42 -07:00
parent 9f7caed202
commit f09592a5d1
23 changed files with 878 additions and 328 deletions

View File

@ -68,7 +68,7 @@ pub fn run(lib_path: &str,
input: Option<~str>) -> Option<Result> {
let env = env.clone().append(target_env(lib_path, prog).as_slice());
let mut opt_process = Process::configure(ProcessConfig {
let opt_process = Process::configure(ProcessConfig {
program: prog,
args: args,
env: Some(env.as_slice()),
@ -76,11 +76,12 @@ pub fn run(lib_path: &str,
});
match opt_process {
Ok(ref mut process) => {
Ok(mut process) => {
for input in input.iter() {
process.stdin.get_mut_ref().write(input.as_bytes()).unwrap();
}
let ProcessOutput { status, output, error } = process.wait_with_output();
let ProcessOutput { status, output, error } =
process.wait_with_output().unwrap();
Some(Result {
status: status,

View File

@ -502,7 +502,7 @@ fn run_debuginfo_lldb_test(config: &Config, props: &TestProps, testfile: &Path)
let args = &[lldb_batchmode_script, test_executable_str, debugger_script_str];
let env = &[("PYTHONPATH".to_owned(), config.lldb_python_dir.clone().unwrap())];
let mut opt_process = Process::configure(ProcessConfig {
let opt_process = Process::configure(ProcessConfig {
program: "python",
args: args,
env: Some(env),
@ -510,8 +510,9 @@ fn run_debuginfo_lldb_test(config: &Config, props: &TestProps, testfile: &Path)
});
let (status, out, err) = match opt_process {
Ok(ref mut process) => {
let ProcessOutput { status, output, error } = process.wait_with_output();
Ok(process) => {
let ProcessOutput { status, output, error } =
process.wait_with_output().unwrap();
(status,
str::from_utf8(output.as_slice()).unwrap().to_owned(),

View File

@ -1725,6 +1725,7 @@ impl<'a> StrSlice<'a> for &'a str {
#[inline]
fn is_char_boundary(&self, index: uint) -> bool {
if index == self.len() { return true; }
if index > self.len() { return false; }
let b = self[index];
return b < 128u8 || b >= 192u8;
}

View File

@ -173,7 +173,7 @@ pub use funcs::bsd43::{shutdown};
#[cfg(unix)] pub use consts::os::posix88::{EADDRINUSE, ENOENT, EISDIR, EAGAIN, EWOULDBLOCK};
#[cfg(unix)] pub use consts::os::posix88::{ECANCELED, SIGINT, EINPROGRESS};
#[cfg(unix)] pub use consts::os::posix88::{SIGTERM, SIGKILL, SIGPIPE, PROT_NONE};
#[cfg(unix)] pub use consts::os::posix01::{SIG_IGN, WNOHANG};
#[cfg(unix)] pub use consts::os::posix01::{SIG_IGN};
#[cfg(unix)] pub use consts::os::bsd44::{AF_UNIX};
#[cfg(unix)] pub use types::os::common::posix01::{pthread_t, timespec, timezone};
@ -2473,8 +2473,6 @@ pub mod consts {
pub static CLOCK_REALTIME: c_int = 0;
pub static CLOCK_MONOTONIC: c_int = 1;
pub static WNOHANG: c_int = 1;
}
pub mod posix08 {
}
@ -2924,8 +2922,6 @@ pub mod consts {
pub static CLOCK_REALTIME: c_int = 0;
pub static CLOCK_MONOTONIC: c_int = 4;
pub static WNOHANG: c_int = 1;
}
pub mod posix08 {
}
@ -3313,8 +3309,6 @@ pub mod consts {
pub static PTHREAD_CREATE_JOINABLE: c_int = 1;
pub static PTHREAD_CREATE_DETACHED: c_int = 2;
pub static PTHREAD_STACK_MIN: size_t = 8192;
pub static WNOHANG: c_int = 1;
}
pub mod posix08 {
}
@ -3980,16 +3974,6 @@ pub mod funcs {
}
}
pub mod wait {
use types::os::arch::c95::{c_int};
use types::os::arch::posix88::{pid_t};
extern {
pub fn waitpid(pid: pid_t, status: *mut c_int, options: c_int)
-> pid_t;
}
}
pub mod glob {
use types::os::arch::c95::{c_char, c_int};
use types::os::common::posix01::{glob_t};

View File

@ -10,7 +10,12 @@
//! C definitions used by libnative that don't belong in liblibc
#![allow(dead_code)]
pub use self::select::fd_set;
pub use self::signal::{sigaction, siginfo, sigset_t};
pub use self::signal::{SA_ONSTACK, SA_RESTART, SA_RESETHAND, SA_NOCLDSTOP};
pub use self::signal::{SA_NODEFER, SA_NOCLDWAIT, SA_SIGINFO, SIGCHLD};
use libc;
@ -34,6 +39,8 @@ pub static MSG_DONTWAIT: libc::c_int = 0x80;
#[cfg(target_os = "android")]
pub static MSG_DONTWAIT: libc::c_int = 0x40;
pub static WNOHANG: libc::c_int = 1;
extern {
pub fn gettimeofday(timeval: *mut libc::timeval,
tzp: *libc::c_void) -> libc::c_int;
@ -49,6 +56,17 @@ extern {
optlen: *mut libc::socklen_t) -> libc::c_int;
pub fn ioctl(fd: libc::c_int, req: libc::c_ulong, ...) -> libc::c_int;
pub fn waitpid(pid: libc::pid_t, status: *mut libc::c_int,
options: libc::c_int) -> libc::pid_t;
pub fn sigaction(signum: libc::c_int,
act: *sigaction,
oldact: *mut sigaction) -> libc::c_int;
pub fn sigaddset(set: *mut sigset_t, signum: libc::c_int) -> libc::c_int;
pub fn sigdelset(set: *mut sigset_t, signum: libc::c_int) -> libc::c_int;
pub fn sigemptyset(set: *mut sigset_t) -> libc::c_int;
}
#[cfg(target_os = "macos")]
@ -81,3 +99,94 @@ mod select {
set.fds_bits[fd / uint::BITS] |= 1 << (fd % uint::BITS);
}
}
#[cfg(target_os = "linux")]
#[cfg(target_os = "android")]
mod signal {
use libc;
pub static SA_NOCLDSTOP: libc::c_ulong = 0x00000001;
pub static SA_NOCLDWAIT: libc::c_ulong = 0x00000002;
pub static SA_NODEFER: libc::c_ulong = 0x40000000;
pub static SA_ONSTACK: libc::c_ulong = 0x08000000;
pub static SA_RESETHAND: libc::c_ulong = 0x80000000;
pub static SA_RESTART: libc::c_ulong = 0x10000000;
pub static SA_SIGINFO: libc::c_ulong = 0x00000004;
pub static SIGCHLD: libc::c_int = 17;
// This definition is not as accurate as it could be, {pid, uid, status} is
// actually a giant union. Currently we're only interested in these fields,
// however.
pub struct siginfo {
si_signo: libc::c_int,
si_errno: libc::c_int,
si_code: libc::c_int,
pub pid: libc::pid_t,
pub uid: libc::uid_t,
pub status: libc::c_int,
}
pub struct sigaction {
pub sa_handler: extern fn(libc::c_int),
pub sa_mask: sigset_t,
pub sa_flags: libc::c_ulong,
sa_restorer: *mut libc::c_void,
}
#[cfg(target_word_size = "32")]
pub struct sigset_t {
__val: [libc::c_ulong, ..32],
}
#[cfg(target_word_size = "64")]
pub struct sigset_t {
__val: [libc::c_ulong, ..16],
}
}
#[cfg(target_os = "macos")]
#[cfg(target_os = "freebsd")]
mod signal {
use libc;
pub static SA_ONSTACK: libc::c_int = 0x0001;
pub static SA_RESTART: libc::c_int = 0x0002;
pub static SA_RESETHAND: libc::c_int = 0x0004;
pub static SA_NOCLDSTOP: libc::c_int = 0x0008;
pub static SA_NODEFER: libc::c_int = 0x0010;
pub static SA_NOCLDWAIT: libc::c_int = 0x0020;
pub static SA_SIGINFO: libc::c_int = 0x0040;
pub static SIGCHLD: libc::c_int = 20;
#[cfg(target_os = "macos")]
pub type sigset_t = u32;
#[cfg(target_os = "freebsd")]
pub struct sigset_t {
bits: [u32, ..4],
}
// This structure has more fields, but we're not all that interested in
// them.
pub struct siginfo {
pub si_signo: libc::c_int,
pub si_errno: libc::c_int,
pub si_code: libc::c_int,
pub pid: libc::pid_t,
pub uid: libc::uid_t,
pub status: libc::c_int,
}
#[cfg(target_os = "macos")]
pub struct sigaction {
pub sa_handler: extern fn(libc::c_int),
sa_tramp: *mut libc::c_void,
pub sa_mask: sigset_t,
pub sa_flags: libc::c_int,
}
#[cfg(target_os = "freebsd")]
pub struct sigaction {
pub sa_handler: extern fn(libc::c_int),
pub sa_flags: libc::c_int,
pub sa_mask: sigset_t,
}
}

View File

@ -0,0 +1,205 @@
// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Implementation of the helper thread for the timer module
//!
//! This module contains the management necessary for the timer worker thread.
//! This thread is responsible for performing the send()s on channels for timers
//! that are using channels instead of a blocking call.
//!
//! The timer thread is lazily initialized, and it's shut down via the
//! `shutdown` function provided. It must be maintained as an invariant that
//! `shutdown` is only called when the entire program is finished. No new timers
//! can be created in the future and there must be no active timers at that
//! time.
#![macro_escape]
use std::mem;
use std::rt::bookkeeping;
use std::rt;
use std::ty::Unsafe;
use std::unstable::mutex::StaticNativeMutex;
use task;
/// A structure for management of a helper thread.
///
/// This is generally a static structure which tracks the lifetime of a helper
/// thread.
///
/// The fields of this helper are all public, but they should not be used, this
/// is for static initialization.
pub struct Helper<M> {
/// Internal lock which protects the remaining fields
pub lock: StaticNativeMutex,
// You'll notice that the remaining fields are Unsafe<T>, and this is
// because all helper thread operations are done through &self, but we need
// these to be mutable (once `lock` is held).
/// Lazily allocated channel to send messages to the helper thread.
pub chan: Unsafe<*mut Sender<M>>,
/// OS handle used to wake up a blocked helper thread
pub signal: Unsafe<uint>,
/// Flag if this helper thread has booted and been initialized yet.
pub initialized: Unsafe<bool>,
}
macro_rules! helper_init( (static mut $name:ident: Helper<$m:ty>) => (
static mut $name: Helper<$m> = Helper {
lock: ::std::unstable::mutex::NATIVE_MUTEX_INIT,
chan: ::std::ty::Unsafe {
value: 0 as *mut Sender<$m>,
marker1: ::std::kinds::marker::InvariantType,
},
signal: ::std::ty::Unsafe {
value: 0,
marker1: ::std::kinds::marker::InvariantType,
},
initialized: ::std::ty::Unsafe {
value: false,
marker1: ::std::kinds::marker::InvariantType,
},
};
) )
impl<M: Send> Helper<M> {
/// Lazily boots a helper thread, becoming a no-op if the helper has already
/// been spawned.
///
/// This function will check to see if the thread has been initialized, and
/// if it has it returns quickly. If initialization has not happened yet,
/// the closure `f` will be run (inside of the initialization lock) and
/// passed to the helper thread in a separate task.
///
/// This function is safe to be called many times.
pub fn boot<T: Send>(&'static self,
f: || -> T,
helper: fn(imp::signal, Receiver<M>, T)) {
unsafe {
let _guard = self.lock.lock();
if !*self.initialized.get() {
let (tx, rx) = channel();
*self.chan.get() = mem::transmute(box tx);
let (receive, send) = imp::new();
*self.signal.get() = send as uint;
let t = f();
task::spawn(proc() {
bookkeeping::decrement();
helper(receive, rx, t);
self.lock.lock().signal()
});
rt::at_exit(proc() { self.shutdown() });
*self.initialized.get() = true;
}
}
}
/// Sends a message to a spawned worker thread.
///
/// This is only valid if the worker thread has previously booted
pub fn send(&'static self, msg: M) {
unsafe {
let _guard = self.lock.lock();
// Must send and *then* signal to ensure that the child receives the
// message. Otherwise it could wake up and go to sleep before we
// send the message.
assert!(!self.chan.get().is_null());
(**self.chan.get()).send(msg);
imp::signal(*self.signal.get() as imp::signal);
}
}
fn shutdown(&'static self) {
unsafe {
// Shut down, but make sure this is done inside our lock to ensure
// that we'll always receive the exit signal when the thread
// returns.
let guard = self.lock.lock();
// Close the channel by destroying it
let chan: Box<Sender<M>> = mem::transmute(*self.chan.get());
*self.chan.get() = 0 as *mut Sender<M>;
drop(chan);
imp::signal(*self.signal.get() as imp::signal);
// Wait for the child to exit
guard.wait();
drop(guard);
// Clean up after ourselves
self.lock.destroy();
imp::close(*self.signal.get() as imp::signal);
*self.signal.get() = 0;
}
}
}
#[cfg(unix)]
mod imp {
use libc;
use std::os;
use io::file::FileDesc;
pub type signal = libc::c_int;
pub fn new() -> (signal, signal) {
let pipe = os::pipe();
(pipe.input, pipe.out)
}
pub fn signal(fd: libc::c_int) {
FileDesc::new(fd, false).inner_write([0]).unwrap();
}
pub fn close(fd: libc::c_int) {
let _fd = FileDesc::new(fd, true);
}
}
#[cfg(windows)]
mod imp {
use libc::{BOOL, LPCSTR, HANDLE, LPSECURITY_ATTRIBUTES, CloseHandle};
use std::ptr;
use libc;
pub type signal = HANDLE;
pub fn new() -> (HANDLE, HANDLE) {
unsafe {
let handle = CreateEventA(ptr::mut_null(), libc::FALSE, libc::FALSE,
ptr::null());
(handle, handle)
}
}
pub fn signal(handle: HANDLE) {
assert!(unsafe { SetEvent(handle) != 0 });
}
pub fn close(handle: HANDLE) {
assert!(unsafe { CloseHandle(handle) != 0 });
}
extern "system" {
fn CreateEventA(lpSecurityAttributes: LPSECURITY_ATTRIBUTES,
bManualReset: BOOL,
bInitialState: BOOL,
lpName: LPCSTR) -> HANDLE;
fn SetEvent(hEvent: HANDLE) -> BOOL;
}
}

View File

@ -40,6 +40,8 @@ use ai = std::io::net::addrinfo;
pub use self::file::FileDesc;
pub use self::process::Process;
mod helper_thread;
// Native I/O implementations
pub mod addrinfo;
pub mod net;
@ -75,8 +77,6 @@ pub mod pipe;
#[cfg(unix)] #[path = "c_unix.rs"] mod c;
#[cfg(windows)] #[path = "c_win32.rs"] mod c;
mod timer_helper;
pub type IoResult<T> = Result<T, IoError>;
fn unimpl() -> IoError {

View File

@ -8,20 +8,27 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use std::io;
use libc::{pid_t, c_void, c_int};
use libc;
use std::io;
use std::mem;
use std::os;
use std::ptr;
use std::rt::rtio;
use p = std::io::process;
use super::IoResult;
use super::file;
use super::util;
#[cfg(windows)] use std::mem;
#[cfg(windows)] use std::strbuf::StrBuf;
#[cfg(not(windows))] use super::retry;
#[cfg(unix)] use super::c;
#[cfg(unix)] use super::retry;
#[cfg(unix)] use io::helper_thread::Helper;
#[cfg(unix)]
helper_init!(static mut HELPER: Helper<Req>)
/**
* A value representing a child process.
@ -44,6 +51,14 @@ pub struct Process {
/// Manually delivered signal
exit_signal: Option<int>,
/// Deadline after which wait() will return
deadline: u64,
}
#[cfg(unix)]
enum Req {
NewChild(libc::pid_t, Sender<p::ProcessExit>, u64),
}
impl Process {
@ -116,6 +131,7 @@ impl Process {
handle: res.handle,
exit_code: None,
exit_signal: None,
deadline: 0,
},
ret_io))
}
@ -131,11 +147,15 @@ impl Process {
impl rtio::RtioProcess for Process {
fn id(&self) -> pid_t { self.pid }
fn wait(&mut self) -> p::ProcessExit {
fn set_timeout(&mut self, timeout: Option<u64>) {
self.deadline = timeout.map(|i| i + ::io::timer::now()).unwrap_or(0);
}
fn wait(&mut self) -> IoResult<p::ProcessExit> {
match self.exit_code {
Some(code) => code,
Some(code) => Ok(code),
None => {
let code = waitpid(self.pid);
let code = try!(waitpid(self.pid, self.deadline));
// On windows, waitpid will never return a signal. If a signal
// was successfully delivered to the process, however, we can
// consider it as having died via a signal.
@ -145,7 +165,7 @@ impl rtio::RtioProcess for Process {
Some(..) => code,
};
self.exit_code = Some(code);
code
Ok(code)
}
}
}
@ -762,61 +782,301 @@ fn translate_status(status: c_int) -> p::ProcessExit {
* operate on a none-existent process or, even worse, on a newer process
* with the same id.
*/
fn waitpid(pid: pid_t) -> p::ProcessExit {
return waitpid_os(pid);
#[cfg(windows)]
fn waitpid(pid: pid_t, deadline: u64) -> IoResult<p::ProcessExit> {
use libc::types::os::arch::extra::DWORD;
use libc::consts::os::extra::{
SYNCHRONIZE,
PROCESS_QUERY_INFORMATION,
FALSE,
STILL_ACTIVE,
INFINITE,
WAIT_TIMEOUT,
WAIT_OBJECT_0,
};
use libc::funcs::extra::kernel32::{
OpenProcess,
GetExitCodeProcess,
CloseHandle,
WaitForSingleObject,
};
#[cfg(windows)]
fn waitpid_os(pid: pid_t) -> p::ProcessExit {
use libc::types::os::arch::extra::DWORD;
use libc::consts::os::extra::{
SYNCHRONIZE,
PROCESS_QUERY_INFORMATION,
FALSE,
STILL_ACTIVE,
INFINITE,
WAIT_FAILED
};
use libc::funcs::extra::kernel32::{
OpenProcess,
GetExitCodeProcess,
CloseHandle,
WaitForSingleObject
};
unsafe {
let process = OpenProcess(SYNCHRONIZE | PROCESS_QUERY_INFORMATION,
FALSE,
pid as DWORD);
if process.is_null() {
return Err(super::last_error())
}
unsafe {
let process = OpenProcess(SYNCHRONIZE | PROCESS_QUERY_INFORMATION,
FALSE,
pid as DWORD);
if process.is_null() {
fail!("failure in OpenProcess: {}", os::last_os_error());
loop {
let mut status = 0;
if GetExitCodeProcess(process, &mut status) == FALSE {
let err = Err(super::last_error());
assert!(CloseHandle(process) != 0);
return err;
}
loop {
let mut status = 0;
if GetExitCodeProcess(process, &mut status) == FALSE {
if status != STILL_ACTIVE {
assert!(CloseHandle(process) != 0);
return Ok(p::ExitStatus(status as int));
}
let interval = if deadline == 0 {
INFINITE
} else {
let now = ::io::timer::now();
if deadline < now {0} else {(deadline - now) as u32}
};
match WaitForSingleObject(process, interval) {
WAIT_OBJECT_0 => {}
WAIT_TIMEOUT => {
assert!(CloseHandle(process) != 0);
fail!("failure in GetExitCodeProcess: {}", os::last_os_error());
return Err(util::timeout("process wait timed out"))
}
if status != STILL_ACTIVE {
_ => {
let err = Err(super::last_error());
assert!(CloseHandle(process) != 0);
return p::ExitStatus(status as int);
}
if WaitForSingleObject(process, INFINITE) == WAIT_FAILED {
assert!(CloseHandle(process) != 0);
fail!("failure in WaitForSingleObject: {}", os::last_os_error());
return err
}
}
}
}
}
#[cfg(unix)]
fn waitpid_os(pid: pid_t) -> p::ProcessExit {
use libc::funcs::posix01::wait;
let mut status = 0 as c_int;
match retry(|| unsafe { wait::waitpid(pid, &mut status, 0) }) {
#[cfg(unix)]
fn waitpid(pid: pid_t, deadline: u64) -> IoResult<p::ProcessExit> {
use std::cmp;
use std::comm;
static mut WRITE_FD: libc::c_int = 0;
let mut status = 0 as c_int;
if deadline == 0 {
return match retry(|| unsafe { c::waitpid(pid, &mut status, 0) }) {
-1 => fail!("unknown waitpid error: {}", super::last_error()),
_ => translate_status(status),
_ => Ok(translate_status(status)),
}
}
// On unix, wait() and its friends have no timeout parameters, so there is
// no way to time out a thread in wait(). From some googling and some
// thinking, it appears that there are a few ways to handle timeouts in
// wait(), but the only real reasonable one for a multi-threaded program is
// to listen for SIGCHLD.
//
// With this in mind, the waiting mechanism with a timeout barely uses
// waitpid() at all. There are a few times that waitpid() is invoked with
// WNOHANG, but otherwise all the necessary blocking is done by waiting for
// a SIGCHLD to arrive (and that blocking has a timeout). Note, however,
// that waitpid() is still used to actually reap the child.
//
// Signal handling is super tricky in general, and this is no exception. Due
// to the async nature of SIGCHLD, we use the self-pipe trick to transmit
// data out of the signal handler to the rest of the application. The first
// idea would be to have each thread waiting with a timeout to read this
// output file descriptor, but a write() is akin to a signal(), not a
// broadcast(), so it would only wake up one thread, and possibly the wrong
// thread. Hence a helper thread is used.
//
// The helper thread here is responsible for farming requests for a
// waitpid() with a timeout, and then processing all of the wait requests.
// By guaranteeing that only this helper thread is reading half of the
// self-pipe, we're sure that we'll never lose a SIGCHLD. This helper thread
// is also responsible for select() to wait for incoming messages or
// incoming SIGCHLD messages, along with passing an appropriate timeout to
// select() to wake things up as necessary.
//
// The ordering of the following statements is also very purposeful. First,
// we must be guaranteed that the helper thread is booted and available to
// receive SIGCHLD signals, and then we must also ensure that we do a
// nonblocking waitpid() at least once before we go ask the sigchld helper.
// This prevents the race where the child exits, we boot the helper, and
// then we ask for the child's exit status (never seeing a sigchld).
//
// The actual communication between the helper thread and this thread is
// quite simple, just a channel moving data around.
unsafe { HELPER.boot(register_sigchld, waitpid_helper) }
match waitpid_nowait(pid) {
Some(ret) => return Ok(ret),
None => {}
}
let (tx, rx) = channel();
unsafe { HELPER.send(NewChild(pid, tx, deadline)); }
return match rx.recv_opt() {
Ok(e) => Ok(e),
Err(()) => Err(util::timeout("wait timed out")),
};
// Register a new SIGCHLD handler, returning the reading half of the
// self-pipe plus the old handler registered (return value of sigaction).
fn register_sigchld() -> (libc::c_int, c::sigaction) {
unsafe {
let mut old: c::sigaction = mem::init();
let mut new: c::sigaction = mem::init();
new.sa_handler = sigchld_handler;
new.sa_flags = c::SA_NOCLDSTOP;
assert_eq!(c::sigaction(c::SIGCHLD, &new, &mut old), 0);
let mut pipes = [0, ..2];
assert_eq!(libc::pipe(pipes.as_mut_ptr()), 0);
util::set_nonblocking(pipes[0], true).unwrap();
util::set_nonblocking(pipes[1], true).unwrap();
WRITE_FD = pipes[1];
(pipes[0], old)
}
}
// Helper thread for processing SIGCHLD messages
fn waitpid_helper(input: libc::c_int,
messages: Receiver<Req>,
(read_fd, old): (libc::c_int, c::sigaction)) {
util::set_nonblocking(input, true).unwrap();
let mut set: c::fd_set = unsafe { mem::init() };
let mut tv: libc::timeval;
let mut active = Vec::<(libc::pid_t, Sender<p::ProcessExit>, u64)>::new();
let max = cmp::max(input, read_fd) + 1;
'outer: loop {
// Figure out the timeout of our syscall-to-happen. If we're waiting
// for some processes, then they'll have a timeout, otherwise we
// wait indefinitely for a message to arrive.
//
// FIXME: sure would be nice to not have to scan the entire array
let min = active.iter().map(|a| *a.ref2()).enumerate().min_by(|p| {
p.val1()
});
let (p, idx) = match min {
Some((idx, deadline)) => {
let now = ::io::timer::now();
let ms = if now < deadline {deadline - now} else {0};
tv = util::ms_to_timeval(ms);
(&tv as *_, idx)
}
None => (ptr::null(), -1),
};
// Wait for something to happen
c::fd_set(&mut set, input);
c::fd_set(&mut set, read_fd);
match unsafe { c::select(max, &set, ptr::null(), ptr::null(), p) } {
// interrupted, retry
-1 if os::errno() == libc::EINTR as int => continue,
// We read something, break out and process
1 | 2 => {}
// Timeout, the pending request is removed
0 => {
drop(active.remove(idx));
continue
}
n => fail!("error in select {} ({})", os::errno(), n),
}
// Process any pending messages
if drain(input) {
loop {
match messages.try_recv() {
Ok(NewChild(pid, tx, deadline)) => {
active.push((pid, tx, deadline));
}
Err(comm::Disconnected) => {
assert!(active.len() == 0);
break 'outer;
}
Err(comm::Empty) => break,
}
}
}
// If a child exited (somehow received SIGCHLD), then poll all
// children to see if any of them exited.
//
// We also attempt to be responsible netizens when dealing with
// SIGCHLD by invoking any previous SIGCHLD handler instead of just
// ignoring any previous SIGCHLD handler. Note that we don't provide
// a 1:1 mapping of our handler invocations to the previous handler
// invocations because we drain the `read_fd` entirely. This is
// probably OK because the kernel is already allowed to coalesce
// simultaneous signals, we're just doing some extra coalescing.
//
// Another point of note is that this likely runs the signal handler
// on a different thread than the one that received the signal. I
// *think* this is ok at this time.
//
// The main reason for doing this is to allow stdtest to run native
// tests as well. Both libgreen and libnative are running around
// with process timeouts, but libgreen should get there first
// (currently libuv doesn't handle old signal handlers).
if drain(read_fd) {
let i: uint = unsafe { mem::transmute(old.sa_handler) };
if i != 0 {
assert!(old.sa_flags & c::SA_SIGINFO == 0);
(old.sa_handler)(c::SIGCHLD);
}
// FIXME: sure would be nice to not have to scan the entire
// array...
active.retain(|&(pid, ref tx, _)| {
match waitpid_nowait(pid) {
Some(msg) => { tx.send(msg); false }
None => true,
}
});
}
}
// Once this helper thread is done, we re-register the old sigchld
// handler and close our intermediate file descriptors.
unsafe {
assert_eq!(c::sigaction(c::SIGCHLD, &old, ptr::mut_null()), 0);
let _ = libc::close(read_fd);
let _ = libc::close(WRITE_FD);
WRITE_FD = -1;
}
}
// Drain all pending data from the file descriptor, returning if any data
// could be drained. This requires that the file descriptor is in
// nonblocking mode.
fn drain(fd: libc::c_int) -> bool {
let mut ret = false;
loop {
let mut buf = [0u8, ..1];
match unsafe {
libc::read(fd, buf.as_mut_ptr() as *mut libc::c_void,
buf.len() as libc::size_t)
} {
n if n > 0 => { ret = true; }
0 => return true,
-1 if util::wouldblock() => return ret,
n => fail!("bad read {} ({})", os::last_os_error(), n),
}
}
}
// Signal handler for SIGCHLD signals, must be async-signal-safe!
//
// This function will write to the writing half of the "self pipe" to wake
// up the helper thread if it's waiting. Note that this write must be
// nonblocking because if it blocks and the reader is the thread we
// interrupted, then we'll deadlock.
//
// When writing, if the write returns EWOULDBLOCK then we choose to ignore
// it. At that point we're guaranteed that there's something in the pipe
// which will wake up the other end at some point, so we just allow this
// signal to be coalesced with the pending signals on the pipe.
extern fn sigchld_handler(_signum: libc::c_int) {
let mut msg = 1;
match unsafe {
libc::write(WRITE_FD, &mut msg as *mut _ as *libc::c_void, 1)
} {
1 => {}
-1 if util::wouldblock() => {} // see above comments
n => fail!("bad error on write fd: {} {}", n, os::errno()),
}
}
}
@ -830,10 +1090,9 @@ fn waitpid_nowait(pid: pid_t) -> Option<p::ProcessExit> {
#[cfg(unix)]
fn waitpid_os(pid: pid_t) -> Option<p::ProcessExit> {
use libc::funcs::posix01::wait;
let mut status = 0 as c_int;
match retry(|| unsafe {
wait::waitpid(pid, &mut status, libc::WNOHANG)
c::waitpid(pid, &mut status, c::WNOHANG)
}) {
n if n == pid => Some(translate_status(status)),
0 => None,

View File

@ -1,149 +0,0 @@
// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Implementation of the helper thread for the timer module
//!
//! This module contains the management necessary for the timer worker thread.
//! This thread is responsible for performing the send()s on channels for timers
//! that are using channels instead of a blocking call.
//!
//! The timer thread is lazily initialized, and it's shut down via the
//! `shutdown` function provided. It must be maintained as an invariant that
//! `shutdown` is only called when the entire program is finished. No new timers
//! can be created in the future and there must be no active timers at that
//! time.
use std::mem;
use std::rt::bookkeeping;
use std::rt;
use std::unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
use io::timer::{Req, Shutdown};
use task;
// You'll note that these variables are *not* protected by a lock. These
// variables are initialized with a Once before any Timer is created and are
// only torn down after everything else has exited. This means that these
// variables are read-only during use (after initialization) and both of which
// are safe to use concurrently.
static mut HELPER_CHAN: *mut Sender<Req> = 0 as *mut Sender<Req>;
static mut HELPER_SIGNAL: imp::signal = 0 as imp::signal;
static mut TIMER_HELPER_EXIT: StaticNativeMutex = NATIVE_MUTEX_INIT;
pub fn boot(helper: fn(imp::signal, Receiver<Req>)) {
static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
static mut INITIALIZED: bool = false;
unsafe {
let mut _guard = LOCK.lock();
if !INITIALIZED {
let (tx, rx) = channel();
// promote this to a shared channel
drop(tx.clone());
HELPER_CHAN = mem::transmute(box tx);
let (receive, send) = imp::new();
HELPER_SIGNAL = send;
task::spawn(proc() {
bookkeeping::decrement();
helper(receive, rx);
TIMER_HELPER_EXIT.lock().signal()
});
rt::at_exit(proc() { shutdown() });
INITIALIZED = true;
}
}
}
pub fn send(req: Req) {
unsafe {
assert!(!HELPER_CHAN.is_null());
(*HELPER_CHAN).send(req);
imp::signal(HELPER_SIGNAL);
}
}
fn shutdown() {
// Request a shutdown, and then wait for the task to exit
unsafe {
let guard = TIMER_HELPER_EXIT.lock();
send(Shutdown);
guard.wait();
drop(guard);
TIMER_HELPER_EXIT.destroy();
}
// Clean up after ther helper thread
unsafe {
imp::close(HELPER_SIGNAL);
let _chan: Box<Sender<Req>> = mem::transmute(HELPER_CHAN);
HELPER_CHAN = 0 as *mut Sender<Req>;
HELPER_SIGNAL = 0 as imp::signal;
}
}
#[cfg(unix)]
mod imp {
use libc;
use std::os;
use io::file::FileDesc;
pub type signal = libc::c_int;
pub fn new() -> (signal, signal) {
let pipe = os::pipe();
(pipe.input, pipe.out)
}
pub fn signal(fd: libc::c_int) {
FileDesc::new(fd, false).inner_write([0]).unwrap();
}
pub fn close(fd: libc::c_int) {
let _fd = FileDesc::new(fd, true);
}
}
#[cfg(windows)]
mod imp {
use libc::{BOOL, LPCSTR, HANDLE, LPSECURITY_ATTRIBUTES, CloseHandle};
use std::ptr;
use libc;
pub type signal = HANDLE;
pub fn new() -> (HANDLE, HANDLE) {
unsafe {
let handle = CreateEventA(ptr::mut_null(), libc::FALSE, libc::FALSE,
ptr::null());
(handle, handle)
}
}
pub fn signal(handle: HANDLE) {
assert!(unsafe { SetEvent(handle) != 0 });
}
pub fn close(handle: HANDLE) {
assert!(unsafe { CloseHandle(handle) != 0 });
}
extern "system" {
fn CreateEventA(lpSecurityAttributes: LPSECURITY_ATTRIBUTES,
bManualReset: BOOL,
bInitialState: BOOL,
lpName: LPCSTR) -> HANDLE;
fn SetEvent(hEvent: HANDLE) -> BOOL;
}
}

View File

@ -52,11 +52,14 @@ use std::os;
use std::ptr;
use std::rt::rtio;
use std::sync::atomics;
use std::comm;
use io::IoResult;
use io::c;
use io::file::FileDesc;
use io::timer_helper;
use io::helper_thread::Helper;
helper_init!(static mut HELPER: Helper<Req>)
pub struct Timer {
id: uint,
@ -79,9 +82,6 @@ pub enum Req {
// Remove a timer based on its id and then send it back on the channel
// provided
RemoveTimer(uint, Sender<Box<Inner>>),
// Shut down the loop and then ACK this channel once it's shut down
Shutdown,
}
// returns the current time (in milliseconds)
@ -93,7 +93,7 @@ pub fn now() -> u64 {
}
}
fn helper(input: libc::c_int, messages: Receiver<Req>) {
fn helper(input: libc::c_int, messages: Receiver<Req>, _: ()) {
let mut set: c::fd_set = unsafe { mem::init() };
let mut fd = FileDesc::new(input, true);
@ -163,7 +163,7 @@ fn helper(input: libc::c_int, messages: Receiver<Req>) {
1 => {
loop {
match messages.try_recv() {
Ok(Shutdown) => {
Err(comm::Disconnected) => {
assert!(active.len() == 0);
break 'outer;
}
@ -202,7 +202,7 @@ fn helper(input: libc::c_int, messages: Receiver<Req>) {
impl Timer {
pub fn new() -> IoResult<Timer> {
timer_helper::boot(helper);
unsafe { HELPER.boot(|| {}, helper); }
static mut ID: atomics::AtomicUint = atomics::INIT_ATOMIC_UINT;
let id = unsafe { ID.fetch_add(1, atomics::Relaxed) };
@ -235,7 +235,7 @@ impl Timer {
Some(i) => i,
None => {
let (tx, rx) = channel();
timer_helper::send(RemoveTimer(self.id, tx));
unsafe { HELPER.send(RemoveTimer(self.id, tx)); }
rx.recv()
}
}
@ -261,7 +261,7 @@ impl rtio::RtioTimer for Timer {
inner.interval = msecs;
inner.target = now + msecs;
timer_helper::send(NewTimer(inner));
unsafe { HELPER.send(NewTimer(inner)); }
return rx;
}
@ -275,7 +275,7 @@ impl rtio::RtioTimer for Timer {
inner.interval = msecs;
inner.target = now + msecs;
timer_helper::send(NewTimer(inner));
unsafe { HELPER.send(NewTimer(inner)); }
return rx;
}
}

View File

@ -23,10 +23,13 @@
use libc;
use std::ptr;
use std::rt::rtio;
use std::comm;
use io::timer_helper;
use io::helper_thread::Helper;
use io::IoResult;
helper_init!(static mut HELPER: Helper<Req>)
pub struct Timer {
obj: libc::HANDLE,
on_worker: bool,
@ -35,10 +38,9 @@ pub struct Timer {
pub enum Req {
NewTimer(libc::HANDLE, Sender<()>, bool),
RemoveTimer(libc::HANDLE, Sender<()>),
Shutdown,
}
fn helper(input: libc::HANDLE, messages: Receiver<Req>) {
fn helper(input: libc::HANDLE, messages: Receiver<Req>, _: ()) {
let mut objs = vec![input];
let mut chans = vec![];
@ -67,12 +69,12 @@ fn helper(input: libc::HANDLE, messages: Receiver<Req>) {
None => {}
}
}
Ok(Shutdown) => {
Err(comm::Disconnected) => {
assert_eq!(objs.len(), 1);
assert_eq!(chans.len(), 0);
break 'outer;
}
_ => break
Err(..) => break
}
}
} else {
@ -102,7 +104,7 @@ pub fn now() -> u64 {
impl Timer {
pub fn new() -> IoResult<Timer> {
timer_helper::boot(helper);
unsafe { HELPER.boot(|| {}, helper) }
let obj = unsafe {
imp::CreateWaitableTimerA(ptr::mut_null(), 0, ptr::null())
@ -124,7 +126,7 @@ impl Timer {
if !self.on_worker { return }
let (tx, rx) = channel();
timer_helper::send(RemoveTimer(self.obj, tx));
unsafe { HELPER.send(RemoveTimer(self.obj, tx)) }
rx.recv();
self.on_worker = false;
@ -157,7 +159,7 @@ impl rtio::RtioTimer for Timer {
ptr::mut_null(), 0)
}, 1);
timer_helper::send(NewTimer(self.obj, tx, true));
unsafe { HELPER.send(NewTimer(self.obj, tx, true)) }
self.on_worker = true;
return rx;
}
@ -173,7 +175,7 @@ impl rtio::RtioTimer for Timer {
ptr::null(), ptr::mut_null(), 0)
}, 1);
timer_helper::send(NewTimer(self.obj, tx, false));
unsafe { HELPER.send(NewTimer(self.obj, tx, false)) }
self.on_worker = true;
return rx;

View File

@ -55,6 +55,7 @@
// NB this crate explicitly does *not* allow glob imports, please seriously
// consider whether they're needed before adding that feature here (the
// answer is that you don't need them)
#![feature(macro_rules)]
extern crate libc;

View File

@ -54,8 +54,8 @@ fn run_ar(sess: &Session, args: &str, cwd: Option<&Path>,
cwd: cwd.map(|a| &*a),
.. ProcessConfig::new()
}) {
Ok(mut prog) => {
let o = prog.wait_with_output();
Ok(prog) => {
let o = prog.wait_with_output().unwrap();
if !o.status.success() {
sess.err(format!("{} {} failed with: {}", ar, args.connect(" "),
o.status));

View File

@ -19,7 +19,8 @@ use std::rt::task::BlockedTask;
use homing::{HomingIO, HomeHandle};
use pipe::PipeWatcher;
use super::{UvHandle, UvError, uv_error_to_io_error,
wait_until_woken_after, wakeup};
wait_until_woken_after, wakeup, Loop};
use timer::TimerWatcher;
use uvio::UvIoFactory;
use uvll;
@ -32,6 +33,16 @@ pub struct Process {
/// Collected from the exit_cb
exit_status: Option<process::ProcessExit>,
/// Lazily initialized timeout timer
timer: Option<Box<TimerWatcher>>,
timeout_state: TimeoutState,
}
enum TimeoutState {
NoTimeout,
TimeoutPending,
TimeoutElapsed,
}
impl Process {
@ -92,6 +103,8 @@ impl Process {
home: io_loop.make_handle(),
to_wake: None,
exit_status: None,
timer: None,
timeout_state: NoTimeout,
};
match unsafe {
uvll::uv_spawn(io_loop.uv_loop(), handle, &options)
@ -223,21 +236,71 @@ impl RtioProcess for Process {
}
}
fn wait(&mut self) -> process::ProcessExit {
fn wait(&mut self) -> Result<process::ProcessExit, IoError> {
// Make sure (on the home scheduler) that we have an exit status listed
let _m = self.fire_homing_missile();
match self.exit_status {
Some(..) => {}
None => {
// If there's no exit code previously listed, then the
// process's exit callback has yet to be invoked. We just
// need to deschedule ourselves and wait to be reawoken.
wait_until_woken_after(&mut self.to_wake, &self.uv_loop(), || {});
assert!(self.exit_status.is_some());
}
Some(status) => return Ok(status),
None => {}
}
self.exit_status.unwrap()
// If there's no exit code previously listed, then the process's exit
// callback has yet to be invoked. We just need to deschedule ourselves
// and wait to be reawoken.
match self.timeout_state {
NoTimeout | TimeoutPending => {
wait_until_woken_after(&mut self.to_wake, &self.uv_loop(), || {});
}
TimeoutElapsed => {}
}
// If there's still no exit status listed, then we timed out, and we
// need to return.
match self.exit_status {
Some(status) => Ok(status),
None => Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
}
}
fn set_timeout(&mut self, timeout: Option<u64>) {
let _m = self.fire_homing_missile();
self.timeout_state = NoTimeout;
let ms = match timeout {
Some(ms) => ms,
None => {
match self.timer {
Some(ref mut timer) => timer.stop(),
None => {}
}
return
}
};
if self.timer.is_none() {
let loop_ = Loop::wrap(unsafe {
uvll::get_loop_for_uv_handle(self.uv_handle())
});
let mut timer = box TimerWatcher::new_home(&loop_, self.home().clone());
unsafe {
timer.set_data(self as *mut _ as *Process);
}
self.timer = Some(timer);
}
let timer = self.timer.get_mut_ref();
timer.stop();
timer.start(timer_cb, ms, 0);
self.timeout_state = TimeoutPending;
extern fn timer_cb(timer: *uvll::uv_timer_t) {
let p: &mut Process = unsafe {
&mut *(uvll::get_data_for_uv_handle(timer) as *mut Process)
};
p.timeout_state = TimeoutElapsed;
match p.to_wake.take() {
Some(task) => { let _t = task.wake().map(|t| t.reawaken()); }
None => {}
}
}
}
}

View File

@ -10,6 +10,8 @@
//! Bindings for executing child processes
#![allow(experimental)]
use prelude::*;
use fmt;
@ -50,7 +52,7 @@ use rt::rtio::{RtioProcess, IoFactory, LocalIo};
/// };
///
/// let contents = child.stdout.get_mut_ref().read_to_end();
/// assert!(child.wait().success());
/// assert!(child.wait().unwrap().success());
/// ```
pub struct Process {
handle: Box<RtioProcess:Send>,
@ -284,7 +286,7 @@ impl Process {
/// println!("stderr: {}", str::from_utf8_lossy(output.error.as_slice()));
/// ```
pub fn output(prog: &str, args: &[~str]) -> IoResult<ProcessOutput> {
Process::new(prog, args).map(|mut p| p.wait_with_output())
Process::new(prog, args).and_then(|p| p.wait_with_output())
}
/// Executes a child process and collects its exit status. This will block
@ -303,7 +305,7 @@ impl Process {
/// println!("process exited with: {}", status);
/// ```
pub fn status(prog: &str, args: &[~str]) -> IoResult<ProcessExit> {
Process::new(prog, args).map(|mut p| p.wait())
Process::new(prog, args).and_then(|mut p| p.wait())
}
/// Creates a new process with the specified configuration.
@ -378,17 +380,72 @@ impl Process {
/// after it has been called at least once.
///
/// The stdin handle to the child process will be closed before waiting.
pub fn wait(&mut self) -> ProcessExit {
///
/// # Errors
///
/// This function can fail if a timeout was previously specified via
/// `set_timeout` and the timeout expires before the child exits.
pub fn wait(&mut self) -> IoResult<ProcessExit> {
drop(self.stdin.take());
self.handle.wait()
}
/// Sets a timeout, in milliseconds, for future calls to wait().
///
/// The argument specified is a relative distance into the future, in
/// milliseconds, after which any call to wait() will return immediately
/// with a timeout error, and all future calls to wait() will not block.
///
/// A value of `None` will clear any previous timeout, and a value of `Some`
/// will override any previously set timeout.
///
/// # Example
///
/// ```no_run
/// # #![allow(experimental)]
/// use std::io::process::{Process, ProcessExit};
/// use std::io::IoResult;
///
/// fn run_gracefully(prog: &str) -> IoResult<ProcessExit> {
/// let mut p = try!(Process::new("long-running-process", []));
///
/// // give the process 10 seconds to finish completely
/// p.set_timeout(Some(10_000));
/// match p.wait() {
/// Ok(status) => return Ok(status),
/// Err(..) => {}
/// }
///
/// // Attempt to exit gracefully, but don't wait for it too long
/// try!(p.signal_exit());
/// p.set_timeout(Some(1_000));
/// match p.wait() {
/// Ok(status) => return Ok(status),
/// Err(..) => {}
/// }
///
/// // Well, we did our best, forcefully kill the process
/// try!(p.signal_kill());
/// p.set_timeout(None);
/// p.wait()
/// }
/// ```
#[experimental = "the type of the timeout is likely to change"]
pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
self.handle.set_timeout(timeout_ms)
}
/// Simultaneously wait for the child to exit and collect all remaining
/// output on the stdout/stderr handles, returning a `ProcessOutput`
/// instance.
///
/// The stdin handle to the child is closed before waiting.
pub fn wait_with_output(&mut self) -> ProcessOutput {
///
/// # Errors
///
/// This function can fail for any of the same reasons that `wait()` can
/// fail.
pub fn wait_with_output(mut self) -> IoResult<ProcessOutput> {
drop(self.stdin.take());
fn read(stream: Option<io::PipeStream>) -> Receiver<IoResult<Vec<u8>>> {
let (tx, rx) = channel();
@ -404,11 +461,13 @@ impl Process {
let stdout = read(self.stdout.take());
let stderr = read(self.stderr.take());
let status = self.wait();
let status = try!(self.wait());
ProcessOutput { status: status,
output: stdout.recv().ok().unwrap_or(Vec::new()),
error: stderr.recv().ok().unwrap_or(Vec::new()) }
Ok(ProcessOutput {
status: status,
output: stdout.recv().ok().unwrap_or(Vec::new()),
error: stderr.recv().ok().unwrap_or(Vec::new()),
})
}
}
@ -421,7 +480,8 @@ impl Drop for Process {
drop(self.stderr.take());
drop(mem::replace(&mut self.extra_io, Vec::new()));
self.wait();
self.set_timeout(None);
let _ = self.wait().unwrap();
}
}
@ -441,7 +501,7 @@ mod tests {
let p = Process::configure(args);
assert!(p.is_ok());
let mut p = p.unwrap();
assert!(p.wait().success());
assert!(p.wait().unwrap().success());
})
#[cfg(not(target_os="android"))]
@ -465,7 +525,7 @@ mod tests {
let p = Process::configure(args);
assert!(p.is_ok());
let mut p = p.unwrap();
assert!(p.wait().matches_exit_status(1));
assert!(p.wait().unwrap().matches_exit_status(1));
drop(p.wait().clone());
})
@ -479,7 +539,7 @@ mod tests {
let p = Process::configure(args);
assert!(p.is_ok());
let mut p = p.unwrap();
match p.wait() {
match p.wait().unwrap() {
process::ExitSignal(1) => {},
result => fail!("not terminated by signal 1 (instead, {})", result),
}
@ -495,7 +555,7 @@ mod tests {
let mut p = p.unwrap();
assert!(p.stdout.is_some());
let ret = read_all(p.stdout.get_mut_ref() as &mut Reader);
assert!(p.wait().success());
assert!(p.wait().unwrap().success());
return ret;
}
@ -536,7 +596,7 @@ mod tests {
p.stdin.get_mut_ref().write("foobar".as_bytes()).unwrap();
drop(p.stdin.take());
let out = read_all(p.stdout.get_mut_ref() as &mut Reader);
assert!(p.wait().success());
assert!(p.wait().unwrap().success());
assert_eq!(out, "foobar\n".to_owned());
})
@ -548,7 +608,7 @@ mod tests {
.. ProcessConfig::new()
};
let mut p = Process::configure(args).unwrap();
assert!(p.wait().success());
assert!(p.wait().unwrap().success());
})
#[cfg(windows)]
@ -572,7 +632,7 @@ mod tests {
.. ProcessConfig::new()
};
let mut p = Process::configure(args).unwrap();
assert!(p.wait().success());
assert!(p.wait().unwrap().success());
})
#[cfg(unix, not(target_os="android"))]
@ -635,21 +695,21 @@ mod tests {
#[cfg(not(target_os="android"))]
iotest!(fn test_finish_once() {
let mut prog = Process::new("false", []).unwrap();
assert!(prog.wait().matches_exit_status(1));
assert!(prog.wait().unwrap().matches_exit_status(1));
})
#[cfg(not(target_os="android"))]
iotest!(fn test_finish_twice() {
let mut prog = Process::new("false", []).unwrap();
assert!(prog.wait().matches_exit_status(1));
assert!(prog.wait().matches_exit_status(1));
assert!(prog.wait().unwrap().matches_exit_status(1));
assert!(prog.wait().unwrap().matches_exit_status(1));
})
#[cfg(not(target_os="android"))]
iotest!(fn test_wait_with_output_once() {
let mut prog = Process::new("echo", ["hello".to_owned()]).unwrap();
let ProcessOutput {status, output, error} = prog.wait_with_output();
let prog = Process::new("echo", ["hello".to_owned()]).unwrap();
let ProcessOutput {status, output, error} = prog.wait_with_output().unwrap();
let output_str = str::from_utf8(output.as_slice()).unwrap();
assert!(status.success());
@ -660,30 +720,6 @@ mod tests {
}
})
#[cfg(not(target_os="android"))]
iotest!(fn test_wait_with_output_twice() {
let mut prog = Process::new("echo", ["hello".to_owned()]).unwrap();
let ProcessOutput {status, output, error} = prog.wait_with_output();
let output_str = str::from_utf8(output.as_slice()).unwrap();
assert!(status.success());
assert_eq!(output_str.trim().to_owned(), "hello".to_owned());
// FIXME #7224
if !running_on_valgrind() {
assert_eq!(error, Vec::new());
}
let ProcessOutput {status, output, error} = prog.wait_with_output();
assert!(status.success());
assert_eq!(output, Vec::new());
// FIXME #7224
if !running_on_valgrind() {
assert_eq!(error, Vec::new());
}
})
#[cfg(unix,not(target_os="android"))]
pub fn run_pwd(dir: Option<&Path>) -> Process {
Process::configure(ProcessConfig {
@ -714,9 +750,10 @@ mod tests {
iotest!(fn test_keep_current_working_dir() {
use os;
let mut prog = run_pwd(None);
let prog = run_pwd(None);
let output = str::from_utf8(prog.wait_with_output().output.as_slice()).unwrap().to_owned();
let output = str::from_utf8(prog.wait_with_output().unwrap()
.output.as_slice()).unwrap().to_owned();
let parent_dir = os::getcwd();
let child_dir = Path::new(output.trim());
@ -732,9 +769,10 @@ mod tests {
// test changing to the parent of os::getcwd() because we know
// the path exists (and os::getcwd() is not expected to be root)
let parent_dir = os::getcwd().dir_path();
let mut prog = run_pwd(Some(&parent_dir));
let prog = run_pwd(Some(&parent_dir));
let output = str::from_utf8(prog.wait_with_output().output.as_slice()).unwrap().to_owned();
let output = str::from_utf8(prog.wait_with_output().unwrap()
.output.as_slice()).unwrap().to_owned();
let child_dir = Path::new(output.trim());
let parent_stat = parent_dir.stat().unwrap();
@ -777,8 +815,9 @@ mod tests {
use os;
if running_on_valgrind() { return; }
let mut prog = run_env(None);
let output = str::from_utf8(prog.wait_with_output().output.as_slice()).unwrap().to_owned();
let prog = run_env(None);
let output = str::from_utf8(prog.wait_with_output().unwrap()
.output.as_slice()).unwrap().to_owned();
let r = os::env();
for &(ref k, ref v) in r.iter() {
@ -791,8 +830,10 @@ mod tests {
use os;
if running_on_valgrind() { return; }
let mut prog = run_env(None);
let output = str::from_utf8(prog.wait_with_output().output.as_slice()).unwrap().to_owned();
let prog = run_env(None);
let output = str::from_utf8(prog.wait_with_output()
.unwrap().output.as_slice())
.unwrap().to_owned();
let r = os::env();
for &(ref k, ref v) in r.iter() {
@ -807,8 +848,8 @@ mod tests {
iotest!(fn test_add_to_env() {
let new_env = box [("RUN_TEST_NEW_ENV".to_owned(), "123".to_owned())];
let mut prog = run_env(Some(new_env));
let result = prog.wait_with_output();
let prog = run_env(Some(new_env));
let result = prog.wait_with_output().unwrap();
let output = str::from_utf8_lossy(result.output.as_slice()).into_owned();
assert!(output.contains("RUN_TEST_NEW_ENV=123"),
@ -830,14 +871,14 @@ mod tests {
iotest!(fn test_kill() {
let mut p = sleeper();
Process::kill(p.id(), PleaseExitSignal).unwrap();
assert!(!p.wait().success());
assert!(!p.wait().unwrap().success());
})
iotest!(fn test_exists() {
let mut p = sleeper();
assert!(Process::kill(p.id(), 0).is_ok());
p.signal_kill().unwrap();
assert!(!p.wait().success());
assert!(!p.wait().unwrap().success());
})
iotest!(fn test_zero() {
@ -845,11 +886,42 @@ mod tests {
p.signal_kill().unwrap();
for _ in range(0, 20) {
if p.signal(0).is_err() {
assert!(!p.wait().success());
assert!(!p.wait().unwrap().success());
return
}
timer::sleep(100);
}
fail!("never saw the child go away");
})
iotest!(fn wait_timeout() {
let mut p = sleeper();
p.set_timeout(Some(10));
assert_eq!(p.wait().err().unwrap().kind, TimedOut);
assert_eq!(p.wait().err().unwrap().kind, TimedOut);
p.signal_kill().unwrap();
p.set_timeout(None);
assert!(p.wait().is_ok());
})
iotest!(fn wait_timeout2() {
let (tx, rx) = channel();
let tx2 = tx.clone();
spawn(proc() {
let mut p = sleeper();
p.set_timeout(Some(10));
assert_eq!(p.wait().err().unwrap().kind, TimedOut);
p.signal_kill().unwrap();
tx.send(());
});
spawn(proc() {
let mut p = sleeper();
p.set_timeout(Some(10));
assert_eq!(p.wait().err().unwrap().kind, TimedOut);
p.signal_kill().unwrap();
tx2.send(());
});
rx.recv();
rx.recv();
})
}

View File

@ -275,7 +275,8 @@ pub trait RtioFileStream {
pub trait RtioProcess {
fn id(&self) -> libc::pid_t;
fn kill(&mut self, signal: int) -> IoResult<()>;
fn wait(&mut self) -> ProcessExit;
fn wait(&mut self) -> IoResult<ProcessExit>;
fn set_timeout(&mut self, timeout: Option<u64>);
}
pub trait RtioPipe {

View File

@ -50,7 +50,7 @@ fn runtest(me: &str) {
env: Some(env.as_slice()),
.. ProcessConfig::new()
}).unwrap();
let out = p.wait_with_output();
let out = p.wait_with_output().unwrap();
assert!(!out.status.success());
let s = str::from_utf8(out.error.as_slice()).unwrap();
assert!(s.contains("stack backtrace") && s.contains("foo::h"),
@ -62,7 +62,7 @@ fn runtest(me: &str) {
args: ["fail".to_owned()],
.. ProcessConfig::new()
}).unwrap();
let out = p.wait_with_output();
let out = p.wait_with_output().unwrap();
assert!(!out.status.success());
let s = str::from_utf8(out.error.as_slice()).unwrap();
assert!(!s.contains("stack backtrace") && !s.contains("foo::h"),
@ -74,7 +74,7 @@ fn runtest(me: &str) {
args: ["double-fail".to_owned()],
.. ProcessConfig::new()
}).unwrap();
let out = p.wait_with_output();
let out = p.wait_with_output().unwrap();
assert!(!out.status.success());
let s = str::from_utf8(out.error.as_slice()).unwrap();
assert!(s.contains("stack backtrace") && s.contains("double::h"),
@ -87,7 +87,7 @@ fn runtest(me: &str) {
env: Some(env.as_slice()),
.. ProcessConfig::new()
}).unwrap();
let out = p.wait_with_output();
let out = p.wait_with_output().unwrap();
assert!(!out.status.success());
let s = str::from_utf8(out.error.as_slice()).unwrap();
let mut i = 0;

View File

@ -120,7 +120,7 @@ pub fn test_destroy_actually_kills(force: bool) {
() = rx1.recv() => {}
}
});
match p.wait() {
match p.wait().unwrap() {
ExitStatus(..) => fail!("expected a signal"),
ExitSignal(..) => tx.send(()),
}

View File

@ -52,7 +52,7 @@ fn parent(flavor: ~str) {
let args = args.as_slice();
let mut p = io::Process::new(args[0].as_slice(), ["child".to_owned(), flavor]).unwrap();
p.stdin.get_mut_ref().write_str("test1\ntest2\ntest3").unwrap();
let out = p.wait_with_output();
let out = p.wait_with_output().unwrap();
assert!(out.status.success());
let s = str::from_utf8(out.output.as_slice()).unwrap();
assert_eq!(s, "test1\n\ntest2\n\ntest3\n");

View File

@ -36,7 +36,7 @@ fn main() {
env: Some(env.as_slice()),
..ProcessConfig::new()
};
let p = Process::configure(config).unwrap().wait_with_output();
let p = Process::configure(config).unwrap().wait_with_output().unwrap();
assert!(p.status.success());
let mut lines = str::from_utf8(p.error.as_slice()).unwrap().lines();
assert!(lines.next().unwrap().contains("foo"));

View File

@ -54,7 +54,7 @@ fn main() {
// Wait for the child process to die (terminate it's stdin and the read
// should fail).
drop(p.stdin.take());
match p.wait() {
match p.wait().unwrap() {
process::ExitStatus(..) => {}
process::ExitSignal(..) => fail!()
}

View File

@ -62,7 +62,7 @@ fn main() {
cwd: Some(&cwd),
env: Some(my_env.append_one(env).as_slice()),
.. ProcessConfig::new()
}).unwrap().wait_with_output();
}).unwrap().wait_with_output().unwrap();
// display the output
assert!(io::stdout().write(p.output.as_slice()).is_ok());

View File

@ -31,5 +31,5 @@ fn main() {
}
let mut p = Process::new(args[0], ["test".to_owned()]).unwrap();
assert!(p.wait().success());
assert!(p.wait().unwrap().success());
}