std: Improve pipe() functionality

* os::pipe() now returns IoResult<os::Pipe>
* os::pipe() is now unsafe because it does not arrange for deallocation of file
  descriptors
* os::Pipe fields are renamed from input to reader and out to write.
* PipeStream::pair() has been added. This is a safe method to get a pair of
  pipes.
* Dealing with pipes in native process bindings have been improved to be more
  robust in the face of failure and intermittent errors. This converts a few
  fail!() situations to Err situations.

Closes #9458
cc #13538
Closes #14724
[breaking-change]
This commit is contained in:
Alex Crichton 2014-06-09 13:23:49 -07:00
parent 0b32d42a5d
commit 04eced750e
7 changed files with 211 additions and 118 deletions

View File

@ -172,7 +172,7 @@ pub use funcs::bsd43::{shutdown};
#[cfg(unix)] pub use consts::os::posix88::{ENOTCONN, ECONNABORTED, EADDRNOTAVAIL, EINTR};
#[cfg(unix)] pub use consts::os::posix88::{EADDRINUSE, ENOENT, EISDIR, EAGAIN, EWOULDBLOCK};
#[cfg(unix)] pub use consts::os::posix88::{ECANCELED, SIGINT, EINPROGRESS};
#[cfg(unix)] pub use consts::os::posix88::{ENOSYS, ENOTTY, ETIMEDOUT};
#[cfg(unix)] pub use consts::os::posix88::{ENOSYS, ENOTTY, ETIMEDOUT, EMFILE};
#[cfg(unix)] pub use consts::os::posix88::{SIGTERM, SIGKILL, SIGPIPE, PROT_NONE};
#[cfg(unix)] pub use consts::os::posix01::{SIG_IGN};
#[cfg(unix)] pub use consts::os::bsd44::{AF_UNIX};
@ -196,7 +196,7 @@ pub use funcs::bsd43::{shutdown};
#[cfg(windows)] pub use consts::os::c95::{WSAECONNREFUSED, WSAECONNRESET, WSAEACCES};
#[cfg(windows)] pub use consts::os::c95::{WSAEWOULDBLOCK, WSAENOTCONN, WSAECONNABORTED};
#[cfg(windows)] pub use consts::os::c95::{WSAEADDRNOTAVAIL, WSAEADDRINUSE, WSAEINTR};
#[cfg(windows)] pub use consts::os::c95::{WSAEINPROGRESS, WSAEINVAL};
#[cfg(windows)] pub use consts::os::c95::{WSAEINPROGRESS, WSAEINVAL, WSAEMFILE};
#[cfg(windows)] pub use consts::os::extra::{ERROR_INSUFFICIENT_BUFFER};
#[cfg(windows)] pub use consts::os::extra::{O_BINARY, O_NOINHERIT, PAGE_NOACCESS};
#[cfg(windows)] pub use consts::os::extra::{PAGE_READONLY, PAGE_READWRITE, PAGE_EXECUTE};

View File

@ -525,9 +525,9 @@ mod tests {
fn test_file_desc() {
// Run this test with some pipes so we don't have to mess around with
// opening or closing files.
let os::Pipe { input, out } = os::pipe();
let mut reader = FileDesc::new(input, true);
let mut writer = FileDesc::new(out, true);
let os::Pipe { reader, writer } = unsafe { os::pipe().unwrap() };
let mut reader = FileDesc::new(reader, true);
let mut writer = FileDesc::new(writer, true);
writer.inner_write(bytes!("test")).ok().unwrap();
let mut buf = [0u8, ..4];

View File

@ -158,8 +158,8 @@ mod imp {
pub type signal = libc::c_int;
pub fn new() -> (signal, signal) {
let pipe = os::pipe();
(pipe.input, pipe.out)
let os::Pipe { reader, writer } = unsafe { os::pipe().unwrap() };
(reader, writer)
}
pub fn signal(fd: libc::c_int) {

View File

@ -10,12 +10,13 @@
use libc::{pid_t, c_void, c_int};
use libc;
use std::c_str::CString;
use std::io;
use std::mem;
use std::os;
use std::ptr;
use std::rt::rtio;
use std::rt::rtio::{ProcessConfig, IoResult, IoError};
use std::c_str::CString;
use std::rt::rtio;
use super::file;
use super::util;
@ -73,47 +74,43 @@ impl Process {
fn get_io(io: rtio::StdioContainer,
ret: &mut Vec<Option<file::FileDesc>>)
-> (Option<os::Pipe>, c_int)
-> IoResult<Option<file::FileDesc>>
{
match io {
rtio::Ignored => { ret.push(None); (None, -1) }
rtio::InheritFd(fd) => { ret.push(None); (None, fd) }
rtio::Ignored => { ret.push(None); Ok(None) }
rtio::InheritFd(fd) => {
ret.push(None);
Ok(Some(file::FileDesc::new(fd, true)))
}
rtio::CreatePipe(readable, _writable) => {
let pipe = os::pipe();
let (reader, writer) = try!(pipe());
let (theirs, ours) = if readable {
(pipe.input, pipe.out)
(reader, writer)
} else {
(pipe.out, pipe.input)
(writer, reader)
};
ret.push(Some(file::FileDesc::new(ours, true)));
(Some(pipe), theirs)
ret.push(Some(ours));
Ok(Some(theirs))
}
}
}
let mut ret_io = Vec::new();
let (in_pipe, in_fd) = get_io(cfg.stdin, &mut ret_io);
let (out_pipe, out_fd) = get_io(cfg.stdout, &mut ret_io);
let (err_pipe, err_fd) = get_io(cfg.stderr, &mut ret_io);
let res = spawn_process_os(cfg, in_fd, out_fd, err_fd);
unsafe {
for pipe in in_pipe.iter() { let _ = libc::close(pipe.input); }
for pipe in out_pipe.iter() { let _ = libc::close(pipe.out); }
for pipe in err_pipe.iter() { let _ = libc::close(pipe.out); }
}
let res = spawn_process_os(cfg,
try!(get_io(cfg.stdin, &mut ret_io)),
try!(get_io(cfg.stdout, &mut ret_io)),
try!(get_io(cfg.stderr, &mut ret_io)));
match res {
Ok(res) => {
Ok((Process {
pid: res.pid,
handle: res.handle,
exit_code: None,
exit_signal: None,
deadline: 0,
},
ret_io))
let p = Process {
pid: res.pid,
handle: res.handle,
exit_code: None,
exit_signal: None,
deadline: 0,
};
Ok((p, ret_io))
}
Err(e) => Err(e)
}
@ -194,6 +191,37 @@ impl Drop for Process {
}
}
fn pipe() -> IoResult<(file::FileDesc, file::FileDesc)> {
#[cfg(unix)] use ERROR = libc::EMFILE;
#[cfg(windows)] use ERROR = libc::WSAEMFILE;
struct Closer { fd: libc::c_int }
let os::Pipe { reader, writer } = match unsafe { os::pipe() } {
Ok(p) => p,
Err(io::IoError { detail, .. }) => return Err(IoError {
code: ERROR as uint,
extra: 0,
detail: detail,
})
};
let mut reader = Closer { fd: reader };
let mut writer = Closer { fd: writer };
let native_reader = file::FileDesc::new(reader.fd, true);
reader.fd = -1;
let native_writer = file::FileDesc::new(writer.fd, true);
writer.fd = -1;
return Ok((native_reader, native_writer));
impl Drop for Closer {
fn drop(&mut self) {
if self.fd != -1 {
let _ = unsafe { libc::close(self.fd) };
}
}
}
}
#[cfg(windows)]
unsafe fn killpid(pid: pid_t, signal: int) -> IoResult<()> {
let handle = libc::OpenProcess(libc::PROCESS_TERMINATE |
@ -246,7 +274,9 @@ struct SpawnProcessResult {
#[cfg(windows)]
fn spawn_process_os(cfg: ProcessConfig,
in_fd: c_int, out_fd: c_int, err_fd: c_int)
in_fd: Option<file::FileDesc>,
out_fd: Option<file::FileDesc>,
err_fd: Option<file::FileDesc>)
-> IoResult<SpawnProcessResult> {
use libc::types::os::arch::extra::{DWORD, HANDLE, STARTUPINFO};
use libc::consts::os::extra::{
@ -283,47 +313,51 @@ fn spawn_process_os(cfg: ProcessConfig,
// Similarly to unix, we don't actually leave holes for the stdio file
// descriptors, but rather open up /dev/null equivalents. These
// equivalents are drawn from libuv's windows process spawning.
let set_fd = |fd: c_int, slot: &mut HANDLE, is_stdin: bool| {
if fd == -1 {
let access = if is_stdin {
libc::FILE_GENERIC_READ
} else {
libc::FILE_GENERIC_WRITE | libc::FILE_READ_ATTRIBUTES
};
let size = mem::size_of::<libc::SECURITY_ATTRIBUTES>();
let mut sa = libc::SECURITY_ATTRIBUTES {
nLength: size as libc::DWORD,
lpSecurityDescriptor: ptr::mut_null(),
bInheritHandle: 1,
};
let filename = "NUL".to_utf16().append_one(0);
*slot = libc::CreateFileW(filename.as_ptr(),
access,
libc::FILE_SHARE_READ |
libc::FILE_SHARE_WRITE,
&mut sa,
libc::OPEN_EXISTING,
0,
ptr::mut_null());
if *slot == INVALID_HANDLE_VALUE as libc::HANDLE {
return Err(super::last_error())
let set_fd = |fd: &Option<file::FileDesc>, slot: &mut HANDLE,
is_stdin: bool| {
match *fd {
None => {
let access = if is_stdin {
libc::FILE_GENERIC_READ
} else {
libc::FILE_GENERIC_WRITE | libc::FILE_READ_ATTRIBUTES
};
let size = mem::size_of::<libc::SECURITY_ATTRIBUTES>();
let mut sa = libc::SECURITY_ATTRIBUTES {
nLength: size as libc::DWORD,
lpSecurityDescriptor: ptr::mut_null(),
bInheritHandle: 1,
};
let filename = "NUL".to_utf16().append_one(0);
*slot = libc::CreateFileW(filename.as_ptr(),
access,
libc::FILE_SHARE_READ |
libc::FILE_SHARE_WRITE,
&mut sa,
libc::OPEN_EXISTING,
0,
ptr::mut_null());
if *slot == INVALID_HANDLE_VALUE as libc::HANDLE {
return Err(super::last_error())
}
}
} else {
let orig = get_osfhandle(fd) as HANDLE;
if orig == INVALID_HANDLE_VALUE as HANDLE {
return Err(super::last_error())
}
if DuplicateHandle(cur_proc, orig, cur_proc, slot,
0, TRUE, DUPLICATE_SAME_ACCESS) == FALSE {
return Err(super::last_error())
Some(ref fd) => {
let orig = get_osfhandle(fd.fd()) as HANDLE;
if orig == INVALID_HANDLE_VALUE as HANDLE {
return Err(super::last_error())
}
if DuplicateHandle(cur_proc, orig, cur_proc, slot,
0, TRUE, DUPLICATE_SAME_ACCESS) == FALSE {
return Err(super::last_error())
}
}
}
Ok(())
};
try!(set_fd(in_fd, &mut si.hStdInput, true));
try!(set_fd(out_fd, &mut si.hStdOutput, false));
try!(set_fd(err_fd, &mut si.hStdError, false));
try!(set_fd(&in_fd, &mut si.hStdInput, true));
try!(set_fd(&out_fd, &mut si.hStdOutput, false));
try!(set_fd(&err_fd, &mut si.hStdError, false));
let cmd_str = make_command_line(cfg.program, cfg.args);
let mut pi = zeroed_process_information();
@ -464,7 +498,10 @@ fn make_command_line(prog: &CString, args: &[CString]) -> String {
}
#[cfg(unix)]
fn spawn_process_os(cfg: ProcessConfig, in_fd: c_int, out_fd: c_int, err_fd: c_int)
fn spawn_process_os(cfg: ProcessConfig,
in_fd: Option<file::FileDesc>,
out_fd: Option<file::FileDesc>,
err_fd: Option<file::FileDesc>)
-> IoResult<SpawnProcessResult>
{
use libc::funcs::posix88::unistd::{fork, dup2, close, chdir, execvp};
@ -498,9 +535,7 @@ fn spawn_process_os(cfg: ProcessConfig, in_fd: c_int, out_fd: c_int, err_fd: c_i
with_envp(cfg.env, proc(envp) {
with_argv(cfg.program, cfg.args, proc(argv) unsafe {
let pipe = os::pipe();
let mut input = file::FileDesc::new(pipe.input, true);
let mut output = file::FileDesc::new(pipe.out, true);
let (mut input, mut output) = try!(pipe());
// We may use this in the child, so perform allocations before the
// fork
@ -510,7 +545,7 @@ fn spawn_process_os(cfg: ProcessConfig, in_fd: c_int, out_fd: c_int, err_fd: c_i
let pid = fork();
if pid < 0 {
fail!("failure in fork: {}", os::last_os_error());
return Err(super::last_error())
} else if pid > 0 {
drop(output);
let mut bytes = [0, ..4];
@ -586,16 +621,24 @@ fn spawn_process_os(cfg: ProcessConfig, in_fd: c_int, out_fd: c_int, err_fd: c_i
// up /dev/null into that file descriptor. Otherwise, the first file
// descriptor opened up in the child would be numbered as one of the
// stdio file descriptors, which is likely to wreak havoc.
let setup = |src: c_int, dst: c_int| {
let src = if src == -1 {
let flags = if dst == libc::STDIN_FILENO {
libc::O_RDONLY
} else {
libc::O_RDWR
};
devnull.with_ref(|p| libc::open(p, flags, 0))
} else {
src
let setup = |src: Option<file::FileDesc>, dst: c_int| {
let src = match src {
None => {
let flags = if dst == libc::STDIN_FILENO {
libc::O_RDONLY
} else {
libc::O_RDWR
};
devnull.with_ref(|p| libc::open(p, flags, 0))
}
Some(obj) => {
let fd = obj.fd();
// Leak the memory and the file descriptor. We're in the
// child now an all our resources are going to be
// cleaned up very soon
mem::forget(obj);
fd
}
};
src != -1 && retry(|| dup2(src, dst)) != -1
};

View File

@ -16,8 +16,10 @@
#![allow(missing_doc)]
use prelude::*;
use io::{IoResult, IoError};
use libc;
use os;
use owned::Box;
use rt::rtio::{RtioPipe, LocalIo};
@ -27,6 +29,11 @@ pub struct PipeStream {
obj: Box<RtioPipe + Send>,
}
pub struct PipePair {
pub reader: PipeStream,
pub writer: PipeStream,
}
impl PipeStream {
/// Consumes a file descriptor to return a pipe stream that will have
/// synchronous, but non-blocking reads/writes. This is useful if the file
@ -58,6 +65,38 @@ impl PipeStream {
pub fn new(inner: Box<RtioPipe + Send>) -> PipeStream {
PipeStream { obj: inner }
}
/// Creates a pair of in-memory OS pipes for a unidirectional communication
/// stream.
///
/// The structure returned contains a reader and writer I/O object. Data
/// written to the writer can be read from the reader.
///
/// # Errors
///
/// This function can fail to succeed if the underlying OS has run out of
/// available resources to allocate a new pipe.
pub fn pair() -> IoResult<PipePair> {
struct Closer { fd: libc::c_int }
let os::Pipe { reader, writer } = try!(unsafe { os::pipe() });
let mut reader = Closer { fd: reader };
let mut writer = Closer { fd: writer };
let io_reader = try!(PipeStream::open(reader.fd));
reader.fd = -1;
let io_writer = try!(PipeStream::open(writer.fd));
writer.fd = -1;
return Ok(PipePair { reader: io_reader, writer: io_writer });
impl Drop for Closer {
fn drop(&mut self) {
if self.fd != -1 {
let _ = unsafe { libc::close(self.fd) };
}
}
}
}
}
impl Clone for PipeStream {
@ -84,9 +123,9 @@ mod test {
use os;
use io::pipe::PipeStream;
let os::Pipe { input, out } = os::pipe();
let out = PipeStream::open(out);
let mut input = PipeStream::open(input);
let os::Pipe { reader, writer } = unsafe { os::pipe().unwrap() };
let out = PipeStream::open(writer);
let mut input = PipeStream::open(reader);
let (tx, rx) = channel();
spawn(proc() {
let mut out = out;

View File

@ -32,6 +32,7 @@
use clone::Clone;
use collections::Collection;
use fmt;
use io::{IoResult, IoError};
use iter::Iterator;
use libc::{c_void, c_int};
use libc;
@ -513,40 +514,50 @@ pub fn split_paths<T: BytesContainer>(unparsed: T) -> Vec<Path> {
pub struct Pipe {
/// A file descriptor representing the reading end of the pipe. Data written
/// on the `out` file descriptor can be read from this file descriptor.
pub input: c_int,
pub reader: c_int,
/// A file descriptor representing the write end of the pipe. Data written
/// to this file descriptor can be read from the `input` file descriptor.
pub out: c_int,
pub writer: c_int,
}
/// Creates a new low-level OS in-memory pipe represented as a Pipe struct.
#[cfg(unix)]
pub fn pipe() -> Pipe {
unsafe {
let mut fds = Pipe {input: 0,
out: 0};
assert_eq!(libc::pipe(&mut fds.input), 0);
return Pipe {input: fds.input, out: fds.out};
/// Creates a new low-level OS in-memory pipe.
///
/// This function can fail to succeed if there are no more resources available
/// to allocate a pipe.
///
/// This function is also unsafe as there is no destructor associated with the
/// `Pipe` structure will return. If it is not arranged for the returned file
/// descriptors to be closed, the file descriptors will leak. For safe handling
/// of this scenario, use `std::io::PipeStream` instead.
pub unsafe fn pipe() -> IoResult<Pipe> {
return _pipe();
#[cfg(unix)]
unsafe fn _pipe() -> IoResult<Pipe> {
let mut fds = [0, ..2];
match libc::pipe(fds.as_mut_ptr()) {
0 => Ok(Pipe { reader: fds[0], writer: fds[1] }),
_ => Err(IoError::last_error()),
}
}
}
/// Creates a new low-level OS in-memory pipe represented as a Pipe struct.
#[cfg(windows)]
pub fn pipe() -> Pipe {
unsafe {
#[cfg(windows)]
unsafe fn _pipe() -> IoResult<Pipe> {
// Windows pipes work subtly differently than unix pipes, and their
// inheritance has to be handled in a different way that I do not
// fully understand. Here we explicitly make the pipe non-inheritable,
// which means to pass it to a subprocess they need to be duplicated
// first, as in std::run.
let mut fds = Pipe {input: 0,
out: 0};
let res = libc::pipe(&mut fds.input, 1024 as ::libc::c_uint,
(libc::O_BINARY | libc::O_NOINHERIT) as c_int);
assert_eq!(res, 0);
assert!((fds.input != -1 && fds.input != 0 ));
assert!((fds.out != -1 && fds.input != 0));
return Pipe {input: fds.input, out: fds.out};
let mut fds = [0, ..2];
match libc::pipe(fds.as_mut_ptr(), 1024 as ::libc::c_uint,
(libc::O_BINARY | libc::O_NOINHERIT) as c_int) {
0 => {
assert!(fds[0] != -1 && fds[0] != 0);
assert!(fds[1] != -1 && fds[1] != 0);
Ok(Pipe { reader: fds[0], writer: fds[1] })
}
_ => Err(IoError::last_error()),
}
}
}

View File

@ -16,12 +16,12 @@ use std::io::PipeStream;
use std::io::Command;
fn test() {
let os::Pipe { input, out } = os::pipe();
let input = PipeStream::open(input);
let mut out = PipeStream::open(out);
drop(input);
let os::Pipe { reader, writer } = unsafe { os::pipe().unwrap() };
let reader = PipeStream::open(reader);
let mut writer = PipeStream::open(writer);
drop(reader);
let _ = out.write([1]);
let _ = writer.write([1]);
}
fn main() {