From 04eced750e78770e16354c07fddf7ecaaab6ef43 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 9 Jun 2014 13:23:49 -0700 Subject: [PATCH] std: Improve pipe() functionality * os::pipe() now returns IoResult * os::pipe() is now unsafe because it does not arrange for deallocation of file descriptors * os::Pipe fields are renamed from input to reader and out to write. * PipeStream::pair() has been added. This is a safe method to get a pair of pipes. * Dealing with pipes in native process bindings have been improved to be more robust in the face of failure and intermittent errors. This converts a few fail!() situations to Err situations. Closes #9458 cc #13538 Closes #14724 [breaking-change] --- src/liblibc/lib.rs | 4 +- src/libnative/io/file_unix.rs | 6 +- src/libnative/io/helper_thread.rs | 4 +- src/libnative/io/process.rs | 203 +++++++++++------- src/libstd/io/pipe.rs | 45 +++- src/libstd/os.rs | 57 +++-- .../run-pass/sigpipe-should-be-ignored.rs | 10 +- 7 files changed, 211 insertions(+), 118 deletions(-) diff --git a/src/liblibc/lib.rs b/src/liblibc/lib.rs index 9ed0d50a03e..8f245f1d5b4 100644 --- a/src/liblibc/lib.rs +++ b/src/liblibc/lib.rs @@ -172,7 +172,7 @@ pub use funcs::bsd43::{shutdown}; #[cfg(unix)] pub use consts::os::posix88::{ENOTCONN, ECONNABORTED, EADDRNOTAVAIL, EINTR}; #[cfg(unix)] pub use consts::os::posix88::{EADDRINUSE, ENOENT, EISDIR, EAGAIN, EWOULDBLOCK}; #[cfg(unix)] pub use consts::os::posix88::{ECANCELED, SIGINT, EINPROGRESS}; -#[cfg(unix)] pub use consts::os::posix88::{ENOSYS, ENOTTY, ETIMEDOUT}; +#[cfg(unix)] pub use consts::os::posix88::{ENOSYS, ENOTTY, ETIMEDOUT, EMFILE}; #[cfg(unix)] pub use consts::os::posix88::{SIGTERM, SIGKILL, SIGPIPE, PROT_NONE}; #[cfg(unix)] pub use consts::os::posix01::{SIG_IGN}; #[cfg(unix)] pub use consts::os::bsd44::{AF_UNIX}; @@ -196,7 +196,7 @@ pub use funcs::bsd43::{shutdown}; #[cfg(windows)] pub use consts::os::c95::{WSAECONNREFUSED, WSAECONNRESET, WSAEACCES}; #[cfg(windows)] pub use consts::os::c95::{WSAEWOULDBLOCK, WSAENOTCONN, WSAECONNABORTED}; #[cfg(windows)] pub use consts::os::c95::{WSAEADDRNOTAVAIL, WSAEADDRINUSE, WSAEINTR}; -#[cfg(windows)] pub use consts::os::c95::{WSAEINPROGRESS, WSAEINVAL}; +#[cfg(windows)] pub use consts::os::c95::{WSAEINPROGRESS, WSAEINVAL, WSAEMFILE}; #[cfg(windows)] pub use consts::os::extra::{ERROR_INSUFFICIENT_BUFFER}; #[cfg(windows)] pub use consts::os::extra::{O_BINARY, O_NOINHERIT, PAGE_NOACCESS}; #[cfg(windows)] pub use consts::os::extra::{PAGE_READONLY, PAGE_READWRITE, PAGE_EXECUTE}; diff --git a/src/libnative/io/file_unix.rs b/src/libnative/io/file_unix.rs index 6472356ea16..93938e3d5b8 100644 --- a/src/libnative/io/file_unix.rs +++ b/src/libnative/io/file_unix.rs @@ -525,9 +525,9 @@ mod tests { fn test_file_desc() { // Run this test with some pipes so we don't have to mess around with // opening or closing files. - let os::Pipe { input, out } = os::pipe(); - let mut reader = FileDesc::new(input, true); - let mut writer = FileDesc::new(out, true); + let os::Pipe { reader, writer } = unsafe { os::pipe().unwrap() }; + let mut reader = FileDesc::new(reader, true); + let mut writer = FileDesc::new(writer, true); writer.inner_write(bytes!("test")).ok().unwrap(); let mut buf = [0u8, ..4]; diff --git a/src/libnative/io/helper_thread.rs b/src/libnative/io/helper_thread.rs index 443c82c6a54..d18e92866bf 100644 --- a/src/libnative/io/helper_thread.rs +++ b/src/libnative/io/helper_thread.rs @@ -158,8 +158,8 @@ mod imp { pub type signal = libc::c_int; pub fn new() -> (signal, signal) { - let pipe = os::pipe(); - (pipe.input, pipe.out) + let os::Pipe { reader, writer } = unsafe { os::pipe().unwrap() }; + (reader, writer) } pub fn signal(fd: libc::c_int) { diff --git a/src/libnative/io/process.rs b/src/libnative/io/process.rs index f26d87ba1b5..c421dada205 100644 --- a/src/libnative/io/process.rs +++ b/src/libnative/io/process.rs @@ -10,12 +10,13 @@ use libc::{pid_t, c_void, c_int}; use libc; +use std::c_str::CString; +use std::io; use std::mem; use std::os; use std::ptr; -use std::rt::rtio; use std::rt::rtio::{ProcessConfig, IoResult, IoError}; -use std::c_str::CString; +use std::rt::rtio; use super::file; use super::util; @@ -73,47 +74,43 @@ impl Process { fn get_io(io: rtio::StdioContainer, ret: &mut Vec>) - -> (Option, c_int) + -> IoResult> { match io { - rtio::Ignored => { ret.push(None); (None, -1) } - rtio::InheritFd(fd) => { ret.push(None); (None, fd) } + rtio::Ignored => { ret.push(None); Ok(None) } + rtio::InheritFd(fd) => { + ret.push(None); + Ok(Some(file::FileDesc::new(fd, true))) + } rtio::CreatePipe(readable, _writable) => { - let pipe = os::pipe(); + let (reader, writer) = try!(pipe()); let (theirs, ours) = if readable { - (pipe.input, pipe.out) + (reader, writer) } else { - (pipe.out, pipe.input) + (writer, reader) }; - ret.push(Some(file::FileDesc::new(ours, true))); - (Some(pipe), theirs) + ret.push(Some(ours)); + Ok(Some(theirs)) } } } let mut ret_io = Vec::new(); - let (in_pipe, in_fd) = get_io(cfg.stdin, &mut ret_io); - let (out_pipe, out_fd) = get_io(cfg.stdout, &mut ret_io); - let (err_pipe, err_fd) = get_io(cfg.stderr, &mut ret_io); - - let res = spawn_process_os(cfg, in_fd, out_fd, err_fd); - - unsafe { - for pipe in in_pipe.iter() { let _ = libc::close(pipe.input); } - for pipe in out_pipe.iter() { let _ = libc::close(pipe.out); } - for pipe in err_pipe.iter() { let _ = libc::close(pipe.out); } - } + let res = spawn_process_os(cfg, + try!(get_io(cfg.stdin, &mut ret_io)), + try!(get_io(cfg.stdout, &mut ret_io)), + try!(get_io(cfg.stderr, &mut ret_io))); match res { Ok(res) => { - Ok((Process { - pid: res.pid, - handle: res.handle, - exit_code: None, - exit_signal: None, - deadline: 0, - }, - ret_io)) + let p = Process { + pid: res.pid, + handle: res.handle, + exit_code: None, + exit_signal: None, + deadline: 0, + }; + Ok((p, ret_io)) } Err(e) => Err(e) } @@ -194,6 +191,37 @@ impl Drop for Process { } } +fn pipe() -> IoResult<(file::FileDesc, file::FileDesc)> { + #[cfg(unix)] use ERROR = libc::EMFILE; + #[cfg(windows)] use ERROR = libc::WSAEMFILE; + struct Closer { fd: libc::c_int } + + let os::Pipe { reader, writer } = match unsafe { os::pipe() } { + Ok(p) => p, + Err(io::IoError { detail, .. }) => return Err(IoError { + code: ERROR as uint, + extra: 0, + detail: detail, + }) + }; + let mut reader = Closer { fd: reader }; + let mut writer = Closer { fd: writer }; + + let native_reader = file::FileDesc::new(reader.fd, true); + reader.fd = -1; + let native_writer = file::FileDesc::new(writer.fd, true); + writer.fd = -1; + return Ok((native_reader, native_writer)); + + impl Drop for Closer { + fn drop(&mut self) { + if self.fd != -1 { + let _ = unsafe { libc::close(self.fd) }; + } + } + } +} + #[cfg(windows)] unsafe fn killpid(pid: pid_t, signal: int) -> IoResult<()> { let handle = libc::OpenProcess(libc::PROCESS_TERMINATE | @@ -246,7 +274,9 @@ struct SpawnProcessResult { #[cfg(windows)] fn spawn_process_os(cfg: ProcessConfig, - in_fd: c_int, out_fd: c_int, err_fd: c_int) + in_fd: Option, + out_fd: Option, + err_fd: Option) -> IoResult { use libc::types::os::arch::extra::{DWORD, HANDLE, STARTUPINFO}; use libc::consts::os::extra::{ @@ -283,47 +313,51 @@ fn spawn_process_os(cfg: ProcessConfig, // Similarly to unix, we don't actually leave holes for the stdio file // descriptors, but rather open up /dev/null equivalents. These // equivalents are drawn from libuv's windows process spawning. - let set_fd = |fd: c_int, slot: &mut HANDLE, is_stdin: bool| { - if fd == -1 { - let access = if is_stdin { - libc::FILE_GENERIC_READ - } else { - libc::FILE_GENERIC_WRITE | libc::FILE_READ_ATTRIBUTES - }; - let size = mem::size_of::(); - let mut sa = libc::SECURITY_ATTRIBUTES { - nLength: size as libc::DWORD, - lpSecurityDescriptor: ptr::mut_null(), - bInheritHandle: 1, - }; - let filename = "NUL".to_utf16().append_one(0); - *slot = libc::CreateFileW(filename.as_ptr(), - access, - libc::FILE_SHARE_READ | - libc::FILE_SHARE_WRITE, - &mut sa, - libc::OPEN_EXISTING, - 0, - ptr::mut_null()); - if *slot == INVALID_HANDLE_VALUE as libc::HANDLE { - return Err(super::last_error()) + let set_fd = |fd: &Option, slot: &mut HANDLE, + is_stdin: bool| { + match *fd { + None => { + let access = if is_stdin { + libc::FILE_GENERIC_READ + } else { + libc::FILE_GENERIC_WRITE | libc::FILE_READ_ATTRIBUTES + }; + let size = mem::size_of::(); + let mut sa = libc::SECURITY_ATTRIBUTES { + nLength: size as libc::DWORD, + lpSecurityDescriptor: ptr::mut_null(), + bInheritHandle: 1, + }; + let filename = "NUL".to_utf16().append_one(0); + *slot = libc::CreateFileW(filename.as_ptr(), + access, + libc::FILE_SHARE_READ | + libc::FILE_SHARE_WRITE, + &mut sa, + libc::OPEN_EXISTING, + 0, + ptr::mut_null()); + if *slot == INVALID_HANDLE_VALUE as libc::HANDLE { + return Err(super::last_error()) + } } - } else { - let orig = get_osfhandle(fd) as HANDLE; - if orig == INVALID_HANDLE_VALUE as HANDLE { - return Err(super::last_error()) - } - if DuplicateHandle(cur_proc, orig, cur_proc, slot, - 0, TRUE, DUPLICATE_SAME_ACCESS) == FALSE { - return Err(super::last_error()) + Some(ref fd) => { + let orig = get_osfhandle(fd.fd()) as HANDLE; + if orig == INVALID_HANDLE_VALUE as HANDLE { + return Err(super::last_error()) + } + if DuplicateHandle(cur_proc, orig, cur_proc, slot, + 0, TRUE, DUPLICATE_SAME_ACCESS) == FALSE { + return Err(super::last_error()) + } } } Ok(()) }; - try!(set_fd(in_fd, &mut si.hStdInput, true)); - try!(set_fd(out_fd, &mut si.hStdOutput, false)); - try!(set_fd(err_fd, &mut si.hStdError, false)); + try!(set_fd(&in_fd, &mut si.hStdInput, true)); + try!(set_fd(&out_fd, &mut si.hStdOutput, false)); + try!(set_fd(&err_fd, &mut si.hStdError, false)); let cmd_str = make_command_line(cfg.program, cfg.args); let mut pi = zeroed_process_information(); @@ -464,7 +498,10 @@ fn make_command_line(prog: &CString, args: &[CString]) -> String { } #[cfg(unix)] -fn spawn_process_os(cfg: ProcessConfig, in_fd: c_int, out_fd: c_int, err_fd: c_int) +fn spawn_process_os(cfg: ProcessConfig, + in_fd: Option, + out_fd: Option, + err_fd: Option) -> IoResult { use libc::funcs::posix88::unistd::{fork, dup2, close, chdir, execvp}; @@ -498,9 +535,7 @@ fn spawn_process_os(cfg: ProcessConfig, in_fd: c_int, out_fd: c_int, err_fd: c_i with_envp(cfg.env, proc(envp) { with_argv(cfg.program, cfg.args, proc(argv) unsafe { - let pipe = os::pipe(); - let mut input = file::FileDesc::new(pipe.input, true); - let mut output = file::FileDesc::new(pipe.out, true); + let (mut input, mut output) = try!(pipe()); // We may use this in the child, so perform allocations before the // fork @@ -510,7 +545,7 @@ fn spawn_process_os(cfg: ProcessConfig, in_fd: c_int, out_fd: c_int, err_fd: c_i let pid = fork(); if pid < 0 { - fail!("failure in fork: {}", os::last_os_error()); + return Err(super::last_error()) } else if pid > 0 { drop(output); let mut bytes = [0, ..4]; @@ -586,16 +621,24 @@ fn spawn_process_os(cfg: ProcessConfig, in_fd: c_int, out_fd: c_int, err_fd: c_i // up /dev/null into that file descriptor. Otherwise, the first file // descriptor opened up in the child would be numbered as one of the // stdio file descriptors, which is likely to wreak havoc. - let setup = |src: c_int, dst: c_int| { - let src = if src == -1 { - let flags = if dst == libc::STDIN_FILENO { - libc::O_RDONLY - } else { - libc::O_RDWR - }; - devnull.with_ref(|p| libc::open(p, flags, 0)) - } else { - src + let setup = |src: Option, dst: c_int| { + let src = match src { + None => { + let flags = if dst == libc::STDIN_FILENO { + libc::O_RDONLY + } else { + libc::O_RDWR + }; + devnull.with_ref(|p| libc::open(p, flags, 0)) + } + Some(obj) => { + let fd = obj.fd(); + // Leak the memory and the file descriptor. We're in the + // child now an all our resources are going to be + // cleaned up very soon + mem::forget(obj); + fd + } }; src != -1 && retry(|| dup2(src, dst)) != -1 }; diff --git a/src/libstd/io/pipe.rs b/src/libstd/io/pipe.rs index 6e2009545aa..84d388c1136 100644 --- a/src/libstd/io/pipe.rs +++ b/src/libstd/io/pipe.rs @@ -16,8 +16,10 @@ #![allow(missing_doc)] use prelude::*; + use io::{IoResult, IoError}; use libc; +use os; use owned::Box; use rt::rtio::{RtioPipe, LocalIo}; @@ -27,6 +29,11 @@ pub struct PipeStream { obj: Box, } +pub struct PipePair { + pub reader: PipeStream, + pub writer: PipeStream, +} + impl PipeStream { /// Consumes a file descriptor to return a pipe stream that will have /// synchronous, but non-blocking reads/writes. This is useful if the file @@ -58,6 +65,38 @@ impl PipeStream { pub fn new(inner: Box) -> PipeStream { PipeStream { obj: inner } } + + /// Creates a pair of in-memory OS pipes for a unidirectional communication + /// stream. + /// + /// The structure returned contains a reader and writer I/O object. Data + /// written to the writer can be read from the reader. + /// + /// # Errors + /// + /// This function can fail to succeed if the underlying OS has run out of + /// available resources to allocate a new pipe. + pub fn pair() -> IoResult { + struct Closer { fd: libc::c_int } + + let os::Pipe { reader, writer } = try!(unsafe { os::pipe() }); + let mut reader = Closer { fd: reader }; + let mut writer = Closer { fd: writer }; + + let io_reader = try!(PipeStream::open(reader.fd)); + reader.fd = -1; + let io_writer = try!(PipeStream::open(writer.fd)); + writer.fd = -1; + return Ok(PipePair { reader: io_reader, writer: io_writer }); + + impl Drop for Closer { + fn drop(&mut self) { + if self.fd != -1 { + let _ = unsafe { libc::close(self.fd) }; + } + } + } + } } impl Clone for PipeStream { @@ -84,9 +123,9 @@ mod test { use os; use io::pipe::PipeStream; - let os::Pipe { input, out } = os::pipe(); - let out = PipeStream::open(out); - let mut input = PipeStream::open(input); + let os::Pipe { reader, writer } = unsafe { os::pipe().unwrap() }; + let out = PipeStream::open(writer); + let mut input = PipeStream::open(reader); let (tx, rx) = channel(); spawn(proc() { let mut out = out; diff --git a/src/libstd/os.rs b/src/libstd/os.rs index f6b1c04dd34..0747e7ccbe3 100644 --- a/src/libstd/os.rs +++ b/src/libstd/os.rs @@ -32,6 +32,7 @@ use clone::Clone; use collections::Collection; use fmt; +use io::{IoResult, IoError}; use iter::Iterator; use libc::{c_void, c_int}; use libc; @@ -513,40 +514,50 @@ pub fn split_paths(unparsed: T) -> Vec { pub struct Pipe { /// A file descriptor representing the reading end of the pipe. Data written /// on the `out` file descriptor can be read from this file descriptor. - pub input: c_int, + pub reader: c_int, /// A file descriptor representing the write end of the pipe. Data written /// to this file descriptor can be read from the `input` file descriptor. - pub out: c_int, + pub writer: c_int, } -/// Creates a new low-level OS in-memory pipe represented as a Pipe struct. -#[cfg(unix)] -pub fn pipe() -> Pipe { - unsafe { - let mut fds = Pipe {input: 0, - out: 0}; - assert_eq!(libc::pipe(&mut fds.input), 0); - return Pipe {input: fds.input, out: fds.out}; +/// Creates a new low-level OS in-memory pipe. +/// +/// This function can fail to succeed if there are no more resources available +/// to allocate a pipe. +/// +/// This function is also unsafe as there is no destructor associated with the +/// `Pipe` structure will return. If it is not arranged for the returned file +/// descriptors to be closed, the file descriptors will leak. For safe handling +/// of this scenario, use `std::io::PipeStream` instead. +pub unsafe fn pipe() -> IoResult { + return _pipe(); + + #[cfg(unix)] + unsafe fn _pipe() -> IoResult { + let mut fds = [0, ..2]; + match libc::pipe(fds.as_mut_ptr()) { + 0 => Ok(Pipe { reader: fds[0], writer: fds[1] }), + _ => Err(IoError::last_error()), + } } -} -/// Creates a new low-level OS in-memory pipe represented as a Pipe struct. -#[cfg(windows)] -pub fn pipe() -> Pipe { - unsafe { + #[cfg(windows)] + unsafe fn _pipe() -> IoResult { // Windows pipes work subtly differently than unix pipes, and their // inheritance has to be handled in a different way that I do not // fully understand. Here we explicitly make the pipe non-inheritable, // which means to pass it to a subprocess they need to be duplicated // first, as in std::run. - let mut fds = Pipe {input: 0, - out: 0}; - let res = libc::pipe(&mut fds.input, 1024 as ::libc::c_uint, - (libc::O_BINARY | libc::O_NOINHERIT) as c_int); - assert_eq!(res, 0); - assert!((fds.input != -1 && fds.input != 0 )); - assert!((fds.out != -1 && fds.input != 0)); - return Pipe {input: fds.input, out: fds.out}; + let mut fds = [0, ..2]; + match libc::pipe(fds.as_mut_ptr(), 1024 as ::libc::c_uint, + (libc::O_BINARY | libc::O_NOINHERIT) as c_int) { + 0 => { + assert!(fds[0] != -1 && fds[0] != 0); + assert!(fds[1] != -1 && fds[1] != 0); + Ok(Pipe { reader: fds[0], writer: fds[1] }) + } + _ => Err(IoError::last_error()), + } } } diff --git a/src/test/run-pass/sigpipe-should-be-ignored.rs b/src/test/run-pass/sigpipe-should-be-ignored.rs index 8e2cfa30066..8c68ef173a5 100644 --- a/src/test/run-pass/sigpipe-should-be-ignored.rs +++ b/src/test/run-pass/sigpipe-should-be-ignored.rs @@ -16,12 +16,12 @@ use std::io::PipeStream; use std::io::Command; fn test() { - let os::Pipe { input, out } = os::pipe(); - let input = PipeStream::open(input); - let mut out = PipeStream::open(out); - drop(input); + let os::Pipe { reader, writer } = unsafe { os::pipe().unwrap() }; + let reader = PipeStream::open(reader); + let mut writer = PipeStream::open(writer); + drop(reader); - let _ = out.write([1]); + let _ = writer.write([1]); } fn main() {