std: Don't spawn threads in `wait_with_output`

Semantically there's actually no reason for us to spawn threads as part of the
call to `wait_with_output`, and that's generally an incredibly heavyweight
operation for just reading a few bytes (especially when stderr probably rarely
has bytes!). An equivalent operation in terms of what's implemented today would
be to just drain both pipes of all contents and then call `wait` on the child
process itself.

On Unix we can implement this through some convenient use of the `select`
function, whereas on Windows we can make use of overlapped I/O. Note that on
Windows this requires us to use named pipes instead of anonymous pipes, but
they're semantically the same under the hood.
This commit is contained in:
Alex Crichton 2016-02-12 10:29:25 -08:00
parent 6afa32a250
commit 7c3038f824
10 changed files with 460 additions and 35 deletions

@ -1 +1 @@
Subproject commit e19309c8b4e8bbd11f4d84dfffd75e3d1ac477fe
Subproject commit 2278a549559c38872b4338cb002ecc2a80d860dc

View File

@ -20,10 +20,9 @@ use fmt;
use io;
use path::Path;
use str;
use sys::pipe::AnonPipe;
use sys::pipe::{read2, AnonPipe};
use sys::process as imp;
use sys_common::{AsInner, AsInnerMut, FromInner, IntoInner};
use thread::{self, JoinHandle};
/// Representation of a running or exited child process.
///
@ -503,24 +502,29 @@ impl Child {
#[stable(feature = "process", since = "1.0.0")]
pub fn wait_with_output(mut self) -> io::Result<Output> {
drop(self.stdin.take());
fn read<R>(mut input: R) -> JoinHandle<io::Result<Vec<u8>>>
where R: Read + Send + 'static
{
thread::spawn(move || {
let mut ret = Vec::new();
input.read_to_end(&mut ret).map(|_| ret)
})
}
let stdout = self.stdout.take().map(read);
let stderr = self.stderr.take().map(read);
let status = try!(self.wait());
let stdout = stdout.and_then(|t| t.join().unwrap().ok());
let stderr = stderr.and_then(|t| t.join().unwrap().ok());
let (mut stdout, mut stderr) = (Vec::new(), Vec::new());
match (self.stdout.take(), self.stderr.take()) {
(None, None) => {}
(Some(mut out), None) => {
let res = out.read_to_end(&mut stdout);
res.unwrap();
}
(None, Some(mut err)) => {
let res = err.read_to_end(&mut stderr);
res.unwrap();
}
(Some(out), Some(err)) => {
let res = read2(out.inner, &mut stdout, err.inner, &mut stderr);
res.unwrap();
}
}
let status = try!(self.wait());
Ok(Output {
status: status,
stdout: stdout.unwrap_or(Vec::new()),
stderr: stderr.unwrap_or(Vec::new()),
stdout: stdout,
stderr: stderr,
})
}
}

View File

@ -8,6 +8,8 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
#![unstable(reason = "not public", issue = "0", feature = "fd")]
use prelude::v1::*;
use io::{self, Read};
@ -75,6 +77,20 @@ impl FileDesc {
}
}
pub fn set_nonblocking(&self, nonblocking: bool) {
unsafe {
let previous = libc::fcntl(self.fd, libc::F_GETFL);
debug_assert!(previous != -1);
let new = if nonblocking {
previous | libc::O_NONBLOCK
} else {
previous & !libc::O_NONBLOCK
};
let ret = libc::fcntl(self.fd, libc::F_SETFL, new);
debug_assert!(ret != -1);
}
}
pub fn duplicate(&self) -> io::Result<FileDesc> {
// We want to atomically duplicate this file descriptor and set the
// CLOEXEC flag, and currently that's done via F_DUPFD_CLOEXEC. This
@ -126,7 +142,6 @@ impl FileDesc {
}
}
#[unstable(reason = "not public", issue = "0", feature = "fd_read")]
impl<'a> Read for &'a FileDesc {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
(**self).read(buf)

View File

@ -8,8 +8,12 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use prelude::v1::*;
use cmp;
use io;
use libc::{self, c_int};
use mem;
use sys::cvt_r;
use sys::fd::FileDesc;
@ -68,3 +72,54 @@ impl AnonPipe {
pub fn fd(&self) -> &FileDesc { &self.0 }
pub fn into_fd(self) -> FileDesc { self.0 }
}
pub fn read2(p1: AnonPipe,
v1: &mut Vec<u8>,
p2: AnonPipe,
v2: &mut Vec<u8>) -> io::Result<()> {
// Set both pipes into nonblocking mode as we're gonna be reading from both
// in the `select` loop below, and we wouldn't want one to block the other!
let p1 = p1.into_fd();
let p2 = p2.into_fd();
p1.set_nonblocking(true);
p2.set_nonblocking(true);
let max = cmp::max(p1.raw(), p2.raw());
loop {
// wait for either pipe to become readable using `select`
try!(cvt_r(|| unsafe {
let mut read: libc::fd_set = mem::zeroed();
libc::FD_SET(p1.raw(), &mut read);
libc::FD_SET(p2.raw(), &mut read);
libc::select(max + 1, &mut read, 0 as *mut _, 0 as *mut _,
0 as *mut _)
}));
// Read as much as we can from each pipe, ignoring EWOULDBLOCK or
// EAGAIN. If we hit EOF, then this will happen because the underlying
// reader will return Ok(0), in which case we'll see `Ok` ourselves. In
// this case we flip the other fd back into blocking mode and read
// whatever's leftover on that file descriptor.
let read = |fd: &FileDesc, dst: &mut Vec<u8>| {
match fd.read_to_end(dst) {
Ok(_) => Ok(true),
Err(e) => {
if e.raw_os_error() == Some(libc::EWOULDBLOCK) ||
e.raw_os_error() == Some(libc::EAGAIN) {
Ok(false)
} else {
Err(e)
}
}
}
};
if try!(read(&p1, v1)) {
p2.set_nonblocking(false);
return p2.read_to_end(v2).map(|_| ());
}
if try!(read(&p2, v2)) {
p1.set_nonblocking(false);
return p1.read_to_end(v1).map(|_| ());
}
}
}

View File

@ -651,7 +651,7 @@ mod tests {
cmd.stdin(Stdio::MakePipe);
cmd.stdout(Stdio::MakePipe);
let (mut cat, mut pipes) = t!(cmd.spawn(Stdio::Null));
let (mut cat, mut pipes) = t!(cmd.spawn(Stdio::Null, true));
let stdin_write = pipes.stdin.take().unwrap();
let stdout_read = pipes.stdout.take().unwrap();

View File

@ -12,6 +12,7 @@
#![allow(bad_style)]
#![cfg_attr(test, allow(dead_code))]
#![unstable(issue = "0", feature = "windows_c")]
use os::raw::{c_int, c_uint, c_ulong, c_long, c_longlong, c_ushort,};
use os::raw::{c_char, c_ulonglong};
@ -181,6 +182,7 @@ pub const ERROR_PATH_NOT_FOUND: DWORD = 3;
pub const ERROR_ACCESS_DENIED: DWORD = 5;
pub const ERROR_INVALID_HANDLE: DWORD = 6;
pub const ERROR_NO_MORE_FILES: DWORD = 18;
pub const ERROR_HANDLE_EOF: DWORD = 38;
pub const ERROR_BROKEN_PIPE: DWORD = 109;
pub const ERROR_CALL_NOT_IMPLEMENTED: DWORD = 120;
pub const ERROR_INSUFFICIENT_BUFFER: DWORD = 122;
@ -188,6 +190,7 @@ pub const ERROR_ALREADY_EXISTS: DWORD = 183;
pub const ERROR_NO_DATA: DWORD = 232;
pub const ERROR_ENVVAR_NOT_FOUND: DWORD = 203;
pub const ERROR_OPERATION_ABORTED: DWORD = 995;
pub const ERROR_IO_PENDING: DWORD = 997;
pub const ERROR_TIMEOUT: DWORD = 0x5B4;
pub const INVALID_HANDLE_VALUE: HANDLE = !0 as HANDLE;
@ -292,6 +295,14 @@ pub const EXCEPTION_UNWIND: DWORD = EXCEPTION_UNWINDING |
EXCEPTION_TARGET_UNWIND |
EXCEPTION_COLLIDED_UNWIND;
pub const PIPE_ACCESS_INBOUND: DWORD = 0x00000001;
pub const FILE_FLAG_FIRST_PIPE_INSTANCE: DWORD = 0x00080000;
pub const FILE_FLAG_OVERLAPPED: DWORD = 0x40000000;
pub const PIPE_WAIT: DWORD = 0x00000000;
pub const PIPE_TYPE_BYTE: DWORD = 0x00000000;
pub const PIPE_REJECT_REMOTE_CLIENTS: DWORD = 0x00000008;
pub const PIPE_READMODE_BYTE: DWORD = 0x00000000;
#[repr(C)]
#[cfg(target_arch = "x86")]
pub struct WSADATA {
@ -913,10 +924,6 @@ extern "system" {
nOutBufferSize: DWORD,
lpBytesReturned: LPDWORD,
lpOverlapped: LPOVERLAPPED) -> BOOL;
pub fn CreatePipe(hReadPipe: LPHANDLE,
hWritePipe: LPHANDLE,
lpPipeAttributes: LPSECURITY_ATTRIBUTES,
nSize: DWORD) -> BOOL;
pub fn CreateThread(lpThreadAttributes: LPSECURITY_ATTRIBUTES,
dwStackSize: SIZE_T,
lpStartAddress: extern "system" fn(*mut c_void)
@ -1129,6 +1136,29 @@ extern "system" {
OriginalContext: *const CONTEXT,
HistoryTable: *const UNWIND_HISTORY_TABLE);
pub fn GetSystemTimeAsFileTime(lpSystemTimeAsFileTime: LPFILETIME);
pub fn CreateEventW(lpEventAttributes: LPSECURITY_ATTRIBUTES,
bManualReset: BOOL,
bInitialState: BOOL,
lpName: LPCWSTR) -> HANDLE;
pub fn WaitForMultipleObjects(nCount: DWORD,
lpHandles: *const HANDLE,
bWaitAll: BOOL,
dwMilliseconds: DWORD) -> DWORD;
pub fn CreateNamedPipeW(lpName: LPCWSTR,
dwOpenMode: DWORD,
dwPipeMode: DWORD,
nMaxInstances: DWORD,
nOutBufferSize: DWORD,
nInBufferSize: DWORD,
nDefaultTimeOut: DWORD,
lpSecurityAttributes: LPSECURITY_ATTRIBUTES)
-> HANDLE;
pub fn CancelIo(handle: HANDLE) -> BOOL;
pub fn GetOverlappedResult(hFile: HANDLE,
lpOverlapped: LPOVERLAPPED,
lpNumberOfBytesTransferred: LPDWORD,
bWait: BOOL) -> BOOL;
}
// Functions that aren't available on Windows XP, but we still use them and just

View File

@ -8,6 +8,8 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
#![unstable(issue = "0", feature = "windows_handle")]
use prelude::v1::*;
use cmp;
@ -42,6 +44,20 @@ impl Handle {
Handle(RawHandle::new(handle))
}
pub fn new_event(manual: bool, init: bool) -> io::Result<Handle> {
unsafe {
let event = c::CreateEventW(0 as *mut _,
manual as c::BOOL,
init as c::BOOL,
0 as *const _);
if event.is_null() {
Err(io::Error::last_os_error())
} else {
Ok(Handle::new(event))
}
}
}
pub fn into_raw(self) -> c::HANDLE {
let ret = self.raw();
mem::forget(self);
@ -90,6 +106,59 @@ impl RawHandle {
}
}
pub unsafe fn read_overlapped(&self,
buf: &mut [u8],
overlapped: *mut c::OVERLAPPED)
-> io::Result<Option<usize>> {
let len = cmp::min(buf.len(), <c::DWORD>::max_value() as usize) as c::DWORD;
let mut amt = 0;
let res = cvt({
c::ReadFile(self.0, buf.as_ptr() as c::LPVOID,
len, &mut amt, overlapped)
});
match res {
Ok(_) => Ok(Some(amt as usize)),
Err(e) => {
if e.raw_os_error() == Some(c::ERROR_IO_PENDING as i32) {
Ok(None)
} else if e.raw_os_error() == Some(c::ERROR_BROKEN_PIPE as i32) {
Ok(Some(0))
} else {
Err(e)
}
}
}
}
pub fn overlapped_result(&self,
overlapped: *mut c::OVERLAPPED,
wait: bool) -> io::Result<usize> {
unsafe {
let mut bytes = 0;
let wait = if wait {c::TRUE} else {c::FALSE};
let res = cvt({
c::GetOverlappedResult(self.raw(), overlapped, &mut bytes, wait)
});
match res {
Ok(_) => Ok(bytes as usize),
Err(e) => {
if e.raw_os_error() == Some(c::ERROR_HANDLE_EOF as i32) ||
e.raw_os_error() == Some(c::ERROR_BROKEN_PIPE as i32) {
Ok(0)
} else {
Err(e)
}
}
}
}
}
pub fn cancel_io(&self) -> io::Result<()> {
unsafe {
cvt(c::CancelIo(self.raw())).map(|_| ())
}
}
pub fn read_to_end(&self, buf: &mut Vec<u8>) -> io::Result<usize> {
let mut me = self;
(&mut me).read_to_end(buf)
@ -120,7 +189,6 @@ impl RawHandle {
}
}
#[unstable(reason = "not public", issue = "0", feature = "fd_read")]
impl<'a> Read for &'a RawHandle {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
(**self).read(buf)

View File

@ -8,6 +8,8 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
#![unstable(issue = "0", feature = "windows_net")]
use prelude::v1::*;
use cmp;

View File

@ -9,11 +9,16 @@
// except according to those terms.
use prelude::v1::*;
use os::windows::prelude::*;
use ffi::OsStr;
use path::Path;
use io;
use ptr;
use sys::cvt;
use mem;
use rand::{self, Rng};
use slice;
use sys::c;
use sys::fs::{File, OpenOptions};
use sys::handle::Handle;
////////////////////////////////////////////////////////////////////////////////
@ -25,14 +30,76 @@ pub struct AnonPipe {
}
pub fn anon_pipe() -> io::Result<(AnonPipe, AnonPipe)> {
let mut reader = c::INVALID_HANDLE_VALUE;
let mut writer = c::INVALID_HANDLE_VALUE;
try!(cvt(unsafe {
c::CreatePipe(&mut reader, &mut writer, ptr::null_mut(), 0)
}));
let reader = Handle::new(reader);
let writer = Handle::new(writer);
Ok((AnonPipe { inner: reader }, AnonPipe { inner: writer }))
// Note that we specifically do *not* use `CreatePipe` here because
// unfortunately the anonymous pipes returned do not support overlapped
// operations.
//
// Instead, we create a "hopefully unique" name and create a named pipe
// which has overlapped operations enabled.
//
// Once we do this, we connect do it as usual via `CreateFileW`, and then we
// return those reader/writer halves.
unsafe {
let reader;
let mut name;
let mut tries = 0;
loop {
tries += 1;
let key: u64 = rand::thread_rng().gen();
name = format!(r"\\.\pipe\__rust_anonymous_pipe1__.{}.{}",
c::GetCurrentProcessId(),
key);
let wide_name = OsStr::new(&name)
.encode_wide()
.chain(Some(0))
.collect::<Vec<_>>();
let handle = c::CreateNamedPipeW(wide_name.as_ptr(),
c::PIPE_ACCESS_INBOUND |
c::FILE_FLAG_FIRST_PIPE_INSTANCE |
c::FILE_FLAG_OVERLAPPED,
c::PIPE_TYPE_BYTE |
c::PIPE_READMODE_BYTE |
c::PIPE_WAIT |
c::PIPE_REJECT_REMOTE_CLIENTS,
1,
4096,
4096,
0,
0 as *mut _);
// We pass the FILE_FLAG_FIRST_PIPE_INSTANCE flag above, and we're
// also just doing a best effort at selecting a unique name. If
// ERROR_ACCESS_DENIED is returned then it could mean that we
// accidentally conflicted with an already existing pipe, so we try
// again.
//
// Don't try again too much though as this could also perhaps be a
// legit error.
if handle == c::INVALID_HANDLE_VALUE {
let err = io::Error::last_os_error();
if tries < 10 &&
err.raw_os_error() == Some(c::ERROR_ACCESS_DENIED as i32) {
continue
}
return Err(err)
}
reader = Handle::new(handle);
break
}
// Connect to the named pipe we just created in write-only mode (also
// overlapped for async I/O below).
let mut opts = OpenOptions::new();
opts.write(true);
opts.read(false);
opts.share_mode(0);
opts.attributes(c::FILE_FLAG_OVERLAPPED);
let writer = try!(File::open(Path::new(&name), &opts));
let writer = AnonPipe { inner: writer.into_handle() };
Ok((AnonPipe { inner: reader }, AnonPipe { inner: writer.into_handle() }))
}
}
impl AnonPipe {
@ -51,3 +118,185 @@ impl AnonPipe {
self.inner.write(buf)
}
}
pub fn read2(p1: AnonPipe,
v1: &mut Vec<u8>,
p2: AnonPipe,
v2: &mut Vec<u8>) -> io::Result<()> {
let p1 = p1.into_handle();
let p2 = p2.into_handle();
let mut p1 = try!(AsyncPipe::new(p1, v1));
let mut p2 = try!(AsyncPipe::new(p2, v2));
let objs = [p1.event.raw(), p2.event.raw()];
// In a loop we wait for either pipe's scheduled read operation to complete.
// If the operation completes with 0 bytes, that means EOF was reached, in
// which case we just finish out the other pipe entirely.
//
// Note that overlapped I/O is in general super unsafe because we have to
// be careful to ensure that all pointers in play are valid for the entire
// duration of the I/O operation (where tons of operations can also fail).
// The destructor for `AsyncPipe` ends up taking care of most of this.
loop {
let res = unsafe {
c::WaitForMultipleObjects(2, objs.as_ptr(), c::FALSE, c::INFINITE)
};
if res == c::WAIT_OBJECT_0 {
if !try!(p1.result()) || !try!(p1.schedule_read()) {
return p2.finish()
}
} else if res == c::WAIT_OBJECT_0 + 1 {
if !try!(p2.result()) || !try!(p2.schedule_read()) {
return p1.finish()
}
} else {
return Err(io::Error::last_os_error())
}
}
}
struct AsyncPipe<'a> {
pipe: Handle,
event: Handle,
overlapped: Box<c::OVERLAPPED>, // needs a stable address
dst: &'a mut Vec<u8>,
state: State,
}
#[derive(PartialEq, Debug)]
enum State {
NotReading,
Reading,
Read(usize),
}
impl<'a> AsyncPipe<'a> {
fn new(pipe: Handle, dst: &'a mut Vec<u8>) -> io::Result<AsyncPipe<'a>> {
// Create an event which we'll use to coordinate our overlapped
// opreations, this event will be used in WaitForMultipleObjects
// and passed as part of the OVERLAPPED handle.
//
// Note that we do a somewhat clever thing here by flagging the
// event as being manually reset and setting it initially to the
// signaled state. This means that we'll naturally fall through the
// WaitForMultipleObjects call above for pipes created initially,
// and the only time an even will go back to "unset" will be once an
// I/O operation is successfully scheduled (what we want).
let event = try!(Handle::new_event(true, true));
let mut overlapped: Box<c::OVERLAPPED> = unsafe {
Box::new(mem::zeroed())
};
overlapped.hEvent = event.raw();
Ok(AsyncPipe {
pipe: pipe,
overlapped: overlapped,
event: event,
dst: dst,
state: State::NotReading,
})
}
/// Executes an overlapped read operation.
///
/// Must not currently be reading, and returns whether the pipe is currently
/// at EOF or not. If the pipe is not at EOF then `result()` must be called
/// to complete the read later on (may block), but if the pipe is at EOF
/// then `result()` should not be called as it will just block forever.
fn schedule_read(&mut self) -> io::Result<bool> {
assert_eq!(self.state, State::NotReading);
let amt = unsafe {
let slice = slice_to_end(self.dst);
try!(self.pipe.read_overlapped(slice, &mut *self.overlapped))
};
// If this read finished immediately then our overlapped event will
// remain signaled (it was signaled coming in here) and we'll progress
// down to the method below.
//
// Otherwise the I/O operation is scheduled and the system set our event
// to not signaled, so we flag ourselves into the reading state and move
// on.
self.state = match amt {
Some(0) => return Ok(false),
Some(amt) => State::Read(amt),
None => State::Reading,
};
Ok(true)
}
/// Wait for the result of the overlapped operation previously executed.
///
/// Takes a parameter `wait` which indicates if this pipe is currently being
/// read whether the function should block waiting for the read to complete.
///
/// Return values:
///
/// * `true` - finished any pending read and the pipe is not at EOF (keep
/// going)
/// * `false` - finished any pending read and pipe is at EOF (stop issuing
/// reads)
fn result(&mut self) -> io::Result<bool> {
let amt = match self.state {
State::NotReading => return Ok(true),
State::Reading => {
try!(self.pipe.overlapped_result(&mut *self.overlapped, true))
}
State::Read(amt) => amt,
};
self.state = State::NotReading;
unsafe {
let len = self.dst.len();
self.dst.set_len(len + amt);
}
Ok(amt != 0)
}
/// Finishes out reading this pipe entirely.
///
/// Waits for any pending and schedule read, and then calls `read_to_end`
/// if necessary to read all the remaining information.
fn finish(&mut self) -> io::Result<()> {
while try!(self.result()) && try!(self.schedule_read()) {
// ...
}
Ok(())
}
}
impl<'a> Drop for AsyncPipe<'a> {
fn drop(&mut self) {
match self.state {
State::Reading => {}
_ => return,
}
// If we have a pending read operation, then we have to make sure that
// it's *done* before we actually drop this type. The kernel requires
// that the `OVERLAPPED` and buffer pointers are valid for the entire
// I/O operation.
//
// To do that, we call `CancelIo` to cancel any pending operation, and
// if that succeeds we wait for the overlapped result.
//
// If anything here fails, there's not really much we can do, so we leak
// the buffer/OVERLAPPED pointers to ensure we're at least memory safe.
if self.pipe.cancel_io().is_err() || self.result().is_err() {
let buf = mem::replace(self.dst, Vec::new());
let overlapped = Box::new(unsafe { mem::zeroed() });
let overlapped = mem::replace(&mut self.overlapped, overlapped);
mem::forget((buf, overlapped));
}
}
}
unsafe fn slice_to_end(v: &mut Vec<u8>) -> &mut [u8] {
if v.capacity() == 0 {
v.reserve(16);
}
if v.capacity() == v.len() {
v.reserve(1);
}
slice::from_raw_parts_mut(v.as_mut_ptr().offset(v.len() as isize),
v.capacity() - v.len())
}

View File

@ -8,6 +8,8 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
#![unstable(issue = "0", feature = "windows_stdio")]
use prelude::v1::*;
use io::prelude::*;