From 3c5a43e5b6f857c71c6f66dbbd640a47d34a0e7e Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Thu, 29 Aug 2013 14:20:48 -0700 Subject: [PATCH] Revert "auto merge of #8645 : alexcrichton/rust/issue-6436-run-non-blocking, r=brson" This reverts commit b8d1fa399402c71331aefd634d710004e00b73a6, reversing changes made to f22b4b169854c8a4ba86c16ee43327d6bcf94562. Conflicts: mk/rt.mk src/libuv --- .gitmodules | 2 +- mk/rt.mk | 46 +- src/compiletest/procsrv.rs | 4 +- src/compiletest/runtest.rs | 29 +- src/librustdoc/markdown_writer.rs | 8 +- src/librustpkg/source_control.rs | 2 +- src/librustpkg/tests.rs | 14 +- src/libstd/rt/io/file.rs | 3 + src/libstd/rt/io/mod.rs | 3 - src/libstd/rt/io/net/tcp.rs | 8 +- src/libstd/rt/io/pipe.rs | 77 -- src/libstd/rt/rtio.rs | 22 +- src/libstd/rt/uv/async.rs | 2 +- src/libstd/rt/uv/file.rs | 25 +- src/libstd/rt/uv/idle.rs | 4 +- src/libstd/rt/uv/mod.rs | 65 +- src/libstd/rt/uv/net.rs | 17 +- src/libstd/rt/uv/pipe.rs | 66 -- src/libstd/rt/uv/process.rs | 264 ------ src/libstd/rt/uv/timer.rs | 2 +- src/libstd/rt/uv/uvio.rs | 294 ++----- src/libstd/rt/uv/uvll.rs | 162 +--- src/libstd/run.rs | 1107 ++++++++++++++++++++----- src/libuv | 2 +- src/rt/rust_uv.cpp | 45 +- src/rt/rustrt.def.in | 10 +- src/test/run-pass/core-run-destroy.rs | 9 +- 27 files changed, 1130 insertions(+), 1162 deletions(-) delete mode 100644 src/libstd/rt/io/pipe.rs delete mode 100644 src/libstd/rt/uv/pipe.rs delete mode 100644 src/libstd/rt/uv/process.rs diff --git a/.gitmodules b/.gitmodules index fa979b6d868..88ead6e608d 100644 --- a/.gitmodules +++ b/.gitmodules @@ -4,5 +4,5 @@ branch = master [submodule "src/libuv"] path = src/libuv - url = https://github.com/alexcrichton/libuv.git + url = https://github.com/brson/libuv.git branch = master diff --git a/mk/rt.mk b/mk/rt.mk index ed877ee818e..c260945cbc9 100644 --- a/mk/rt.mk +++ b/mk/rt.mk @@ -168,58 +168,36 @@ LIBUV_DEPS := $$(wildcard \ $$(S)src/libuv/*/*/*/*) endif -LIBUV_GYP := $$(S)src/libuv/build/gyp -LIBUV_MAKEFILE_$(1)_$(2) := $$(CFG_BUILD_DIR)rt/$(1)/stage$(2)/libuv/Makefile -LIBUV_NO_LOAD = run-benchmarks.target.mk run-tests.target.mk \ - uv_dtrace_header.target.mk uv_dtrace_provider.target.mk - -ifeq ($(OSTYPE_$(1)), linux-androideabi) -$$(LIBUV_MAKEFILE_$(1)_$(2)): $$(LIBUV_GYP) - (cd $(S)src/libuv/ && \ - $$(CFG_PYTHON) ./gyp_uv -f make -Dtarget_arch=$$(LIBUV_ARCH_$(1)) -D ninja -DOS=android \ - -Goutput_dir=$$(@D) --generator-output $$(@D)) -else -$$(LIBUV_MAKEFILE_$(1)_$(2)): $$(LIBUV_GYP) - (cd $(S)src/libuv/ && \ - $$(CFG_PYTHON) ./gyp_uv -f make -Dtarget_arch=$$(LIBUV_ARCH_$(1)) -D ninja \ - -Goutput_dir=$$(@D) --generator-output $$(@D)) -endif - # XXX: Shouldn't need platform-specific conditions here ifdef CFG_WINDOWSY_$(1) $$(LIBUV_LIB_$(1)_$(2)): $$(LIBUV_DEPS) - $$(Q)rm -f $$(S)src/libuv/libuv.a - $$(Q)$$(MAKE) -C $$(S)src/libuv -f Makefile.mingw \ - CFLAGS="$$(CFG_GCCISH_CFLAGS) $$(LIBUV_FLAGS_$$(HOST_$(1))) $$(SNAP_DEFINES)" \ - AR="$$(AR_$(1))" \ + $$(Q)$$(MAKE) -C $$(S)src/libuv/ \ + builddir_name="$$(CFG_BUILD_DIR)/rt/$(1)/stage$(2)/libuv" \ + OS=mingw \ V=$$(VERBOSE) - $$(Q)cp $$(S)src/libuv/libuv.a $$@ else ifeq ($(OSTYPE_$(1)), linux-androideabi) -$$(LIBUV_LIB_$(1)_$(2)): $$(LIBUV_DEPS) $$(LIBUV_MAKEFILE_$(1)_$(2)) - $$(Q)$$(MAKE) -C $$(@D) \ +$$(LIBUV_LIB_$(1)_$(2)): $$(LIBUV_DEPS) + $$(Q)$$(MAKE) -C $$(S)src/libuv/ \ CFLAGS="$$(CFG_GCCISH_CFLAGS) $$(LIBUV_FLAGS_$$(HOST_$(1))) $$(SNAP_DEFINES)" \ LDFLAGS="$$(CFG_GCCISH_LINK_FLAGS) $$(LIBUV_FLAGS_$$(HOST_$(1)))" \ CC="$$(CC_$(1))" \ CXX="$$(CXX_$(1))" \ LINK="$$(CXX_$(1))" \ AR="$$(AR_$(1))" \ - host=android OS=linux \ PLATFORM=android \ - builddir="." \ BUILDTYPE=Release \ - NO_LOAD="$$(LIBUV_NO_LOAD)" \ + builddir_name="$$(CFG_BUILD_DIR)/rt/$(1)/stage$(2)/libuv" \ + host=android OS=linux \ V=$$(VERBOSE) else -$$(LIBUV_LIB_$(1)_$(2)): $$(LIBUV_DEPS) $$(LIBUV_MAKEFILE_$(1)_$(2)) - $$(Q)$$(MAKE) -C $$(@D) \ +$$(LIBUV_LIB_$(1)_$(2)): $$(LIBUV_DEPS) + $$(Q)$$(MAKE) -C $$(S)src/libuv/ \ CFLAGS="$$(CFG_GCCISH_CFLAGS) $$(LIBUV_FLAGS_$$(HOST_$(1))) $$(SNAP_DEFINES)" \ LDFLAGS="$$(CFG_GCCISH_LINK_FLAGS) $$(LIBUV_FLAGS_$$(HOST_$(1)))" \ CC="$$(CC_$(1))" \ CXX="$$(CXX_$(1))" \ AR="$$(AR_$(1))" \ - builddir="." \ - BUILDTYPE=Release \ - NO_LOAD="$$(LIBUV_NO_LOAD)" \ + builddir_name="$$(CFG_BUILD_DIR)/rt/$(1)/stage$(2)/libuv" \ V=$$(VERBOSE) endif @@ -283,7 +261,3 @@ endef $(foreach stage,$(STAGES), \ $(foreach target,$(CFG_TARGET_TRIPLES), \ $(eval $(call DEF_RUNTIME_TARGETS,$(target),$(stage))))) - -$(LIBUV_GYP): - mkdir -p $(S)src/libuv/build - git clone https://git.chromium.org/external/gyp.git $(S)src/libuv/build/gyp diff --git a/src/compiletest/procsrv.rs b/src/compiletest/procsrv.rs index 74627829c60..45e4f756d7a 100644 --- a/src/compiletest/procsrv.rs +++ b/src/compiletest/procsrv.rs @@ -54,10 +54,10 @@ pub fn run(lib_path: &str, in_fd: None, out_fd: None, err_fd: None - }).unwrap(); + }); for input in input.iter() { - proc.input().write(input.as_bytes()); + proc.input().write_str(*input); } let output = proc.finish_with_output(); diff --git a/src/compiletest/runtest.rs b/src/compiletest/runtest.rs index 16de4f8e822..05fd621e597 100644 --- a/src/compiletest/runtest.rs +++ b/src/compiletest/runtest.rs @@ -20,16 +20,41 @@ use procsrv; use util; use util::logv; +use std::cell::Cell; use std::io; use std::os; use std::str; +use std::task::{spawn_sched, SingleThreaded}; use std::vec; +use std::unstable::running_on_valgrind; use extra::test::MetricMap; pub fn run(config: config, testfile: ~str) { - let mut _mm = MetricMap::new(); - run_metrics(config, testfile, &mut _mm); + let config = Cell::new(config); + let testfile = Cell::new(testfile); + // FIXME #6436: Creating another thread to run the test because this + // is going to call waitpid. The new scheduler has some strange + // interaction between the blocking tasks and 'friend' schedulers + // that destroys parallelism if we let normal schedulers block. + // It should be possible to remove this spawn once std::run is + // rewritten to be non-blocking. + // + // We do _not_ create another thread if we're running on V because + // it serializes all threads anyways. + if running_on_valgrind() { + let config = config.take(); + let testfile = testfile.take(); + let mut _mm = MetricMap::new(); + run_metrics(config, testfile, &mut _mm); + } else { + do spawn_sched(SingleThreaded) { + let config = config.take(); + let testfile = testfile.take(); + let mut _mm = MetricMap::new(); + run_metrics(config, testfile, &mut _mm); + } + } } pub fn run_metrics(config: config, testfile: ~str, mm: &mut MetricMap) { diff --git a/src/librustdoc/markdown_writer.rs b/src/librustdoc/markdown_writer.rs index 635d02196fe..c13e85ea716 100644 --- a/src/librustdoc/markdown_writer.rs +++ b/src/librustdoc/markdown_writer.rs @@ -104,14 +104,14 @@ fn pandoc_writer( ]; do generic_writer |markdown| { + use std::io::WriterUtil; + debug!("pandoc cmd: %s", pandoc_cmd); debug!("pandoc args: %s", pandoc_args.connect(" ")); - let proc = run::Process::new(pandoc_cmd, pandoc_args, - run::ProcessOptions::new()); - let mut proc = proc.unwrap(); + let mut proc = run::Process::new(pandoc_cmd, pandoc_args, run::ProcessOptions::new()); - proc.input().write(markdown.as_bytes()); + proc.input().write_str(markdown); let output = proc.finish_with_output(); debug!("pandoc result: %i", output.status); diff --git a/src/librustpkg/source_control.rs b/src/librustpkg/source_control.rs index c67a8158139..caa004a53b2 100644 --- a/src/librustpkg/source_control.rs +++ b/src/librustpkg/source_control.rs @@ -89,7 +89,7 @@ pub fn git_clone_general(source: &str, target: &Path, v: &Version) -> bool { fn process_output_in_cwd(prog: &str, args: &[~str], cwd: &Path) -> ProcessOutput { let mut prog = Process::new(prog, args, ProcessOptions{ dir: Some(cwd) - ,..ProcessOptions::new()}).unwrap(); + ,..ProcessOptions::new()}); prog.finish_with_output() } diff --git a/src/librustpkg/tests.rs b/src/librustpkg/tests.rs index b0d996ea0af..98999da41c8 100644 --- a/src/librustpkg/tests.rs +++ b/src/librustpkg/tests.rs @@ -112,14 +112,13 @@ fn mk_temp_workspace(short_name: &Path, version: &Version) -> Path { fn run_git(args: &[~str], env: Option<~[(~str, ~str)]>, cwd: &Path, err_msg: &str) { let cwd = (*cwd).clone(); - let prog = run::Process::new("git", args, run::ProcessOptions { + let mut prog = run::Process::new("git", args, run::ProcessOptions { env: env, dir: Some(&cwd), in_fd: None, out_fd: None, err_fd: None }); - let mut prog = prog.unwrap(); let rslt = prog.finish_with_output(); if rslt.status != 0 { fail!("%s [git returned %?, output = %s, error = %s]", err_msg, @@ -227,7 +226,7 @@ fn command_line_test_with_env(args: &[~str], cwd: &Path, env: Option<~[(~str, ~s in_fd: None, out_fd: None, err_fd: None - }).unwrap(); + }); let output = prog.finish_with_output(); debug!("Output from command %s with args %? was %s {%s}[%?]", cmd, args, str::from_bytes(output.output), @@ -1028,17 +1027,16 @@ fn test_extern_mod() { test_sysroot().to_str(), exec_file.to_str()); - let prog = run::Process::new(rustc.to_str(), [main_file.to_str(), - ~"--sysroot", test_sysroot().to_str(), - ~"-o", exec_file.to_str()], - run::ProcessOptions { + let mut prog = run::Process::new(rustc.to_str(), [main_file.to_str(), + ~"--sysroot", test_sysroot().to_str(), + ~"-o", exec_file.to_str()], + run::ProcessOptions { env: env, dir: Some(&dir), in_fd: None, out_fd: None, err_fd: None }); - let mut prog = prog.unwrap(); let outp = prog.finish_with_output(); if outp.status != 0 { fail!("output was %s, error was %s", diff --git a/src/libstd/rt/io/file.rs b/src/libstd/rt/io/file.rs index 534e308a1a6..f4e9c4d7c11 100644 --- a/src/libstd/rt/io/file.rs +++ b/src/libstd/rt/io/file.rs @@ -71,6 +71,9 @@ pub struct FileStream { last_nread: int, } +impl FileStream { +} + impl Reader for FileStream { fn read(&mut self, buf: &mut [u8]) -> Option { match self.fd.read(buf) { diff --git a/src/libstd/rt/io/mod.rs b/src/libstd/rt/io/mod.rs index 038fca9a1ad..116d240308a 100644 --- a/src/libstd/rt/io/mod.rs +++ b/src/libstd/rt/io/mod.rs @@ -268,9 +268,6 @@ pub use self::extensions::WriterByteConversions; /// Synchronous, non-blocking file I/O. pub mod file; -/// Synchronous, in-memory I/O. -pub mod pipe; - /// Synchronous, non-blocking network I/O. pub mod net { pub mod tcp; diff --git a/src/libstd/rt/io/net/tcp.rs b/src/libstd/rt/io/net/tcp.rs index dc7135f4a61..9be5540de48 100644 --- a/src/libstd/rt/io/net/tcp.rs +++ b/src/libstd/rt/io/net/tcp.rs @@ -16,7 +16,7 @@ use rt::io::{io_error, read_error, EndOfFile}; use rt::rtio::{IoFactory, IoFactoryObject, RtioSocket, RtioTcpListener, RtioTcpListenerObject, RtioTcpStream, - RtioTcpStreamObject, RtioStream}; + RtioTcpStreamObject}; use rt::local::Local; pub struct TcpStream(~RtioTcpStreamObject); @@ -69,7 +69,7 @@ impl TcpStream { impl Reader for TcpStream { fn read(&mut self, buf: &mut [u8]) -> Option { - match (***self).read(buf) { + match (**self).read(buf) { Ok(read) => Some(read), Err(ioerr) => { // EOF is indicated by returning None @@ -86,7 +86,7 @@ impl Reader for TcpStream { impl Writer for TcpStream { fn write(&mut self, buf: &[u8]) { - match (***self).write(buf) { + match (**self).write(buf) { Ok(_) => (), Err(ioerr) => io_error::cond.raise(ioerr), } @@ -166,7 +166,7 @@ mod test { do run_in_newsched_task { let mut called = false; do io_error::cond.trap(|e| { - assert_eq!(e.kind, ConnectionRefused); + assert!(e.kind == ConnectionRefused); called = true; }).inside { let addr = SocketAddr { ip: Ipv4Addr(0, 0, 0, 0), port: 1 }; diff --git a/src/libstd/rt/io/pipe.rs b/src/libstd/rt/io/pipe.rs deleted file mode 100644 index 02b3d0fe57d..00000000000 --- a/src/libstd/rt/io/pipe.rs +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! Synchronous, in-memory pipes. -//! -//! Currently these aren't particularly useful, there only exists bindings -//! enough so that pipes can be created to child processes. - -use prelude::*; -use super::{Reader, Writer}; -use rt::io::{io_error, read_error, EndOfFile}; -use rt::local::Local; -use rt::rtio::{RtioPipeObject, RtioStream, IoFactoryObject, IoFactory}; -use rt::uv::pipe; - -pub struct PipeStream(~RtioPipeObject); - -impl PipeStream { - /// Creates a new pipe initialized, but not bound to any particular - /// source/destination - pub fn new() -> Option { - let pipe = unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - (*io).pipe_init(false) - }; - match pipe { - Ok(p) => Some(PipeStream(p)), - Err(ioerr) => { - io_error::cond.raise(ioerr); - None - } - } - } - - /// Extracts the underlying libuv pipe to be bound to another source. - pub fn uv_pipe(&self) -> pipe::Pipe { - // Did someone say multiple layers of indirection? - (**self).uv_pipe() - } -} - -impl Reader for PipeStream { - fn read(&mut self, buf: &mut [u8]) -> Option { - match (***self).read(buf) { - Ok(read) => Some(read), - Err(ioerr) => { - // EOF is indicated by returning None - if ioerr.kind != EndOfFile { - read_error::cond.raise(ioerr); - } - return None; - } - } - } - - fn eof(&mut self) -> bool { fail!() } -} - -impl Writer for PipeStream { - fn write(&mut self, buf: &[u8]) { - match (***self).write(buf) { - Ok(_) => (), - Err(ioerr) => { - io_error::cond.raise(ioerr); - } - } - } - - fn flush(&mut self) { fail!() } -} diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 1a7ef6ea309..1788b7a04e3 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -8,14 +8,12 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use libc; use option::*; use result::*; use libc::c_int; use rt::io::IoError; use super::io::net::ip::{IpAddr, SocketAddr}; -use rt::uv; use rt::uv::uvio; use path::Path; use super::io::support::PathLike; @@ -32,9 +30,6 @@ pub type RtioTcpListenerObject = uvio::UvTcpListener; pub type RtioUdpSocketObject = uvio::UvUdpSocket; pub type RtioTimerObject = uvio::UvTimer; pub type PausibleIdleCallback = uvio::UvPausibleIdleCallback; -pub type RtioPipeObject = uvio::UvPipeStream; -pub type RtioProcessObject = uvio::UvProcess; -pub type RtioProcessConfig<'self> = uv::process::Config<'self>; pub trait EventLoop { fn run(&mut self); @@ -77,13 +72,6 @@ pub trait IoFactory { fn fs_open(&mut self, path: &P, fm: FileMode, fa: FileAccess) -> Result<~RtioFileStream, IoError>; fn fs_unlink(&mut self, path: &P) -> Result<(), IoError>; - fn pipe_init(&mut self, ipc: bool) -> Result<~RtioPipeObject, IoError>; - fn spawn(&mut self, config: &RtioProcessConfig) -> Result<~RtioProcessObject, IoError>; -} - -pub trait RtioStream { - fn read(&mut self, buf: &mut [u8]) -> Result; - fn write(&mut self, buf: &[u8]) -> Result<(), IoError>; } pub trait RtioTcpListener : RtioSocket { @@ -92,7 +80,9 @@ pub trait RtioTcpListener : RtioSocket { fn dont_accept_simultaneously(&mut self) -> Result<(), IoError>; } -pub trait RtioTcpStream : RtioSocket + RtioStream { +pub trait RtioTcpStream : RtioSocket { + fn read(&mut self, buf: &mut [u8]) -> Result; + fn write(&mut self, buf: &[u8]) -> Result<(), IoError>; fn peer_name(&mut self) -> Result; fn control_congestion(&mut self) -> Result<(), IoError>; fn nodelay(&mut self) -> Result<(), IoError>; @@ -134,9 +124,3 @@ pub trait RtioFileStream { fn tell(&self) -> Result; fn flush(&mut self) -> Result<(), IoError>; } - -pub trait RtioProcess { - fn id(&self) -> libc::pid_t; - fn kill(&mut self, signal: int) -> Result<(), IoError>; - fn wait(&mut self) -> int; -} diff --git a/src/libstd/rt/uv/async.rs b/src/libstd/rt/uv/async.rs index ff7bb9dd03a..d0ca38317cb 100644 --- a/src/libstd/rt/uv/async.rs +++ b/src/libstd/rt/uv/async.rs @@ -34,7 +34,7 @@ impl AsyncWatcher { extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) { let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle); - let status = status_to_maybe_uv_error(status); + let status = status_to_maybe_uv_error(watcher, status); let data = watcher.get_watcher_data(); let cb = data.async_cb.get_ref(); (*cb)(watcher, status); diff --git a/src/libstd/rt/uv/file.rs b/src/libstd/rt/uv/file.rs index 5c77181d7eb..405dfe0a7f0 100644 --- a/src/libstd/rt/uv/file.rs +++ b/src/libstd/rt/uv/file.rs @@ -11,8 +11,8 @@ use prelude::*; use ptr::null; use libc::c_void; -use rt::uv::{Request, NativeHandle, Loop, FsCallback, Buf, UvError}; -use rt::uv::status_to_maybe_uv_error; +use rt::uv::{Request, NativeHandle, Loop, FsCallback, Buf, + status_to_maybe_uv_error_with_loop, UvError}; use rt::uv::uvll; use rt::uv::uvll::*; use super::super::io::support::PathLike; @@ -62,7 +62,7 @@ impl FsRequest { pub fn open_sync(loop_: &Loop, path: &P, flags: int, mode: int) -> Result { let result = FsRequest::open_common(loop_, path, flags, mode, None); - sync_cleanup(result) + sync_cleanup(loop_, result) } fn unlink_common(loop_: &Loop, path: &P, cb: Option) -> int { @@ -83,11 +83,11 @@ impl FsRequest { } pub fn unlink(loop_: &Loop, path: &P, cb: FsCallback) { let result = FsRequest::unlink_common(loop_, path, Some(cb)); - sync_cleanup(result); + sync_cleanup(loop_, result); } pub fn unlink_sync(loop_: &Loop, path: &P) -> Result { let result = FsRequest::unlink_common(loop_, path, None); - sync_cleanup(result) + sync_cleanup(loop_, result) } pub fn install_req_data(&self, cb: Option) { @@ -139,8 +139,9 @@ impl NativeHandle<*uvll::uv_fs_t> for FsRequest { match self { &FsRequest(ptr) => ptr } } } - fn sync_cleanup(result: int) -> Result { - match status_to_maybe_uv_error(result as i32) { + fn sync_cleanup(loop_: &Loop, result: int) + -> Result { + match status_to_maybe_uv_error_with_loop(loop_.native_handle(), result as i32) { Some(err) => Err(err), None => Ok(result) } @@ -183,7 +184,7 @@ impl FileDescriptor { pub fn write_sync(&mut self, loop_: &Loop, buf: Buf, offset: i64) -> Result { let result = self.write_common(loop_, buf, offset, None); - sync_cleanup(result) + sync_cleanup(loop_, result) } fn read_common(&mut self, loop_: &Loop, buf: Buf, @@ -211,7 +212,7 @@ impl FileDescriptor { pub fn read_sync(&mut self, loop_: &Loop, buf: Buf, offset: i64) -> Result { let result = self.read_common(loop_, buf, offset, None); - sync_cleanup(result) + sync_cleanup(loop_, result) } fn close_common(self, loop_: &Loop, cb: Option) -> int { @@ -233,11 +234,12 @@ impl FileDescriptor { } pub fn close_sync(self, loop_: &Loop) -> Result { let result = self.close_common(loop_, None); - sync_cleanup(result) + sync_cleanup(loop_, result) } } extern fn compl_cb(req: *uv_fs_t) { let mut req: FsRequest = NativeHandle::from_native_handle(req); + let loop_ = req.get_loop(); // pull the user cb out of the req data let cb = { let data = req.get_req_data(); @@ -248,7 +250,8 @@ extern fn compl_cb(req: *uv_fs_t) { // in uv_fs_open calls, the result will be the fd in the // case of success, otherwise it's -1 indicating an error let result = req.get_result(); - let status = status_to_maybe_uv_error(result); + let status = status_to_maybe_uv_error_with_loop( + loop_.native_handle(), result); // we have a req and status, call the user cb.. // only giving the user a ref to the FsRequest, as we // have to clean it up, afterwards (and they aren't really diff --git a/src/libstd/rt/uv/idle.rs b/src/libstd/rt/uv/idle.rs index 8cbcd7b77c0..a21146620ca 100644 --- a/src/libstd/rt/uv/idle.rs +++ b/src/libstd/rt/uv/idle.rs @@ -43,7 +43,7 @@ impl IdleWatcher { let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle); let data = idle_watcher.get_watcher_data(); let cb: &IdleCallback = data.idle_cb.get_ref(); - let status = status_to_maybe_uv_error(status); + let status = status_to_maybe_uv_error(idle_watcher, status); (*cb)(idle_watcher, status); } } @@ -57,7 +57,7 @@ impl IdleWatcher { let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle); let data = idle_watcher.get_watcher_data(); let cb: &IdleCallback = data.idle_cb.get_ref(); - let status = status_to_maybe_uv_error(status); + let status = status_to_maybe_uv_error(idle_watcher, status); (*cb)(idle_watcher, status); } } diff --git a/src/libstd/rt/uv/mod.rs b/src/libstd/rt/uv/mod.rs index 700b80c7398..75b9a5ac553 100644 --- a/src/libstd/rt/uv/mod.rs +++ b/src/libstd/rt/uv/mod.rs @@ -58,8 +58,6 @@ pub use self::net::{StreamWatcher, TcpWatcher, UdpWatcher}; pub use self::idle::IdleWatcher; pub use self::timer::TimerWatcher; pub use self::async::AsyncWatcher; -pub use self::process::Process; -pub use self::pipe::Pipe; /// The implementation of `rtio` for libuv pub mod uvio; @@ -72,8 +70,6 @@ pub mod net; pub mod idle; pub mod timer; pub mod async; -pub mod process; -pub mod pipe; /// XXX: Loop(*handle) is buggy with destructors. Normal structs /// with dtors may not be destructured, but tuple structs can, @@ -130,8 +126,6 @@ pub type NullCallback = ~fn(); pub type IdleCallback = ~fn(IdleWatcher, Option); pub type ConnectionCallback = ~fn(StreamWatcher, Option); pub type FsCallback = ~fn(&mut FsRequest, Option); -// first int is exit_status, second is term_signal -pub type ExitCallback = ~fn(Process, int, int, Option); pub type TimerCallback = ~fn(TimerWatcher, Option); pub type AsyncCallback = ~fn(AsyncWatcher, Option); pub type UdpReceiveCallback = ~fn(UdpWatcher, int, Buf, SocketAddr, uint, Option); @@ -149,8 +143,7 @@ struct WatcherData { timer_cb: Option, async_cb: Option, udp_recv_cb: Option, - udp_send_cb: Option, - exit_cb: Option, + udp_send_cb: Option } pub trait WatcherInterop { @@ -182,8 +175,7 @@ impl> WatcherInterop for W { timer_cb: None, async_cb: None, udp_recv_cb: None, - udp_send_cb: None, - exit_cb: None, + udp_send_cb: None }; let data = transmute::<~WatcherData, *c_void>(data); uvll::set_data_for_uv_handle(self.native_handle(), data); @@ -210,12 +202,12 @@ impl> WatcherInterop for W { // XXX: Need to define the error constants like EOF so they can be // compared to the UvError type -pub struct UvError(c_int); +pub struct UvError(uvll::uv_err_t); impl UvError { pub fn name(&self) -> ~str { unsafe { - let inner = match self { &UvError(a) => a }; + let inner = match self { &UvError(ref a) => a }; let name_str = uvll::err_name(inner); assert!(name_str.is_not_null()); from_c_str(name_str) @@ -224,7 +216,7 @@ impl UvError { pub fn desc(&self) -> ~str { unsafe { - let inner = match self { &UvError(a) => a }; + let inner = match self { &UvError(ref a) => a }; let desc_str = uvll::strerror(inner); assert!(desc_str.is_not_null()); from_c_str(desc_str) @@ -232,7 +224,7 @@ impl UvError { } pub fn is_eof(&self) -> bool { - **self == uvll::EOF + self.code == uvll::EOF } } @@ -244,10 +236,18 @@ impl ToStr for UvError { #[test] fn error_smoke_test() { - let err: UvError = UvError(uvll::EOF); + let err = uvll::uv_err_t { code: 1, sys_errno_: 1 }; + let err: UvError = UvError(err); assert_eq!(err.to_str(), ~"EOF: end of file"); } +pub fn last_uv_error>(watcher: &W) -> UvError { + unsafe { + let loop_ = watcher.event_loop(); + UvError(uvll::last_error(loop_.native_handle())) + } +} + pub fn uv_error_to_io_error(uverr: UvError) -> IoError { unsafe { // Importing error constants @@ -255,10 +255,10 @@ pub fn uv_error_to_io_error(uverr: UvError) -> IoError { use rt::io::*; // uv error descriptions are static - let c_desc = uvll::strerror(*uverr); + let c_desc = uvll::strerror(&*uverr); let desc = str::raw::c_str_to_static_slice(c_desc); - let kind = match *uverr { + let kind = match uverr.code { UNKNOWN => OtherIoError, OK => OtherIoError, EOF => EndOfFile, @@ -266,8 +266,8 @@ pub fn uv_error_to_io_error(uverr: UvError) -> IoError { ECONNREFUSED => ConnectionRefused, ECONNRESET => ConnectionReset, EPIPE => BrokenPipe, - err => { - rtdebug!("uverr.code %d", err as int); + _ => { + rtdebug!("uverr.code %u", uverr.code as uint); // XXX: Need to map remaining uv error types OtherIoError } @@ -281,12 +281,31 @@ pub fn uv_error_to_io_error(uverr: UvError) -> IoError { } } -/// Convert a callback status to a UvError -pub fn status_to_maybe_uv_error(status: c_int) -> Option { - if status >= 0 { +/// Given a uv handle, convert a callback status to a UvError +pub fn status_to_maybe_uv_error_with_loop( + loop_: *uvll::uv_loop_t, + status: c_int) -> Option { + if status != -1 { None } else { - Some(UvError(status)) + unsafe { + rtdebug!("loop: %x", loop_ as uint); + let err = uvll::last_error(loop_); + Some(UvError(err)) + } + } +} +/// Given a uv handle, convert a callback status to a UvError +pub fn status_to_maybe_uv_error>(handle: U, + status: c_int) -> Option { + if status != -1 { + None + } else { + unsafe { + rtdebug!("handle: %x", handle.native_handle() as uint); + let loop_ = uvll::get_loop_for_uv_handle(handle.native_handle()); + status_to_maybe_uv_error_with_loop(loop_, status) + } } } diff --git a/src/libstd/rt/uv/net.rs b/src/libstd/rt/uv/net.rs index 1581b017087..e8d0296e543 100644 --- a/src/libstd/rt/uv/net.rs +++ b/src/libstd/rt/uv/net.rs @@ -16,6 +16,7 @@ use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle, NullCallback, status_to_maybe_uv_error}; use rt::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr}; +use rt::uv::last_uv_error; use vec; use str; use from_str::{FromStr}; @@ -136,7 +137,7 @@ impl StreamWatcher { rtdebug!("buf len: %d", buf.len as int); let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream); let cb = stream_watcher.get_watcher_data().read_cb.get_ref(); - let status = status_to_maybe_uv_error(nread as c_int); + let status = status_to_maybe_uv_error(stream_watcher, nread as c_int); (*cb)(stream_watcher, nread as int, buf, status); } } @@ -166,7 +167,7 @@ impl StreamWatcher { let mut stream_watcher = write_request.stream(); write_request.delete(); let cb = stream_watcher.get_watcher_data().write_cb.take_unwrap(); - let status = status_to_maybe_uv_error(status); + let status = status_to_maybe_uv_error(stream_watcher, status); cb(stream_watcher, status); } } @@ -231,7 +232,7 @@ impl TcpWatcher { }; match result { 0 => Ok(()), - _ => Err(UvError(result)), + _ => Err(last_uv_error(self)), } } } @@ -259,7 +260,7 @@ impl TcpWatcher { let mut stream_watcher = connect_request.stream(); connect_request.delete(); let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap(); - let status = status_to_maybe_uv_error(status); + let status = status_to_maybe_uv_error(stream_watcher, status); cb(stream_watcher, status); } } @@ -282,7 +283,7 @@ impl TcpWatcher { rtdebug!("connection_cb"); let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle); let cb = stream_watcher.get_watcher_data().connect_cb.get_ref(); - let status = status_to_maybe_uv_error(status); + let status = status_to_maybe_uv_error(stream_watcher, status); (*cb)(stream_watcher, status); } } @@ -326,7 +327,7 @@ impl UdpWatcher { }; match result { 0 => Ok(()), - _ => Err(UvError(result)), + _ => Err(last_uv_error(self)), } } } @@ -359,7 +360,7 @@ impl UdpWatcher { rtdebug!("buf len: %d", buf.len as int); let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle); let cb = udp_watcher.get_watcher_data().udp_recv_cb.get_ref(); - let status = status_to_maybe_uv_error(nread as c_int); + let status = status_to_maybe_uv_error(udp_watcher, nread as c_int); let addr = uv_socket_addr_to_socket_addr(sockaddr_to_UvSocketAddr(addr)); (*cb)(udp_watcher, nread as int, buf, addr, flags as uint, status); } @@ -394,7 +395,7 @@ impl UdpWatcher { let mut udp_watcher = send_request.handle(); send_request.delete(); let cb = udp_watcher.get_watcher_data().udp_send_cb.take_unwrap(); - let status = status_to_maybe_uv_error(status); + let status = status_to_maybe_uv_error(udp_watcher, status); cb(udp_watcher, status); } } diff --git a/src/libstd/rt/uv/pipe.rs b/src/libstd/rt/uv/pipe.rs deleted file mode 100644 index 1147c731a60..00000000000 --- a/src/libstd/rt/uv/pipe.rs +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -use prelude::*; -use libc; - -use rt::uv; -use rt::uv::net; -use rt::uv::uvll; - -pub struct Pipe(*uvll::uv_pipe_t); - -impl uv::Watcher for Pipe {} - -impl Pipe { - pub fn new(loop_: &uv::Loop, ipc: bool) -> Pipe { - unsafe { - let handle = uvll::malloc_handle(uvll::UV_NAMED_PIPE); - assert!(handle.is_not_null()); - let ipc = ipc as libc::c_int; - assert_eq!(uvll::pipe_init(loop_.native_handle(), handle, ipc), 0); - let mut ret: Pipe = - uv::NativeHandle::from_native_handle(handle); - ret.install_watcher_data(); - ret - } - } - - pub fn as_stream(&self) -> net::StreamWatcher { - net::StreamWatcher(**self as *uvll::uv_stream_t) - } - - pub fn close(self, cb: uv::NullCallback) { - { - let mut this = self; - let data = this.get_watcher_data(); - assert!(data.close_cb.is_none()); - data.close_cb = Some(cb); - } - - unsafe { uvll::close(self.native_handle(), close_cb); } - - extern fn close_cb(handle: *uvll::uv_pipe_t) { - let mut process: Pipe = uv::NativeHandle::from_native_handle(handle); - process.get_watcher_data().close_cb.take_unwrap()(); - process.drop_watcher_data(); - unsafe { uvll::free_handle(handle as *libc::c_void) } - } - } -} - -impl uv::NativeHandle<*uvll::uv_pipe_t> for Pipe { - fn from_native_handle(handle: *uvll::uv_pipe_t) -> Pipe { - Pipe(handle) - } - fn native_handle(&self) -> *uvll::uv_pipe_t { - match self { &Pipe(ptr) => ptr } - } -} diff --git a/src/libstd/rt/uv/process.rs b/src/libstd/rt/uv/process.rs deleted file mode 100644 index a02cf67ec26..00000000000 --- a/src/libstd/rt/uv/process.rs +++ /dev/null @@ -1,264 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -use prelude::*; -use libc; -use ptr; -use vec; -use cell::Cell; - -use rt::uv; -use rt::uv::net; -use rt::uv::pipe; -use rt::uv::uvll; - -/// A process wraps the handle of the underlying uv_process_t. -pub struct Process(*uvll::uv_process_t); - -/// This configuration describes how a new process should be spawned. This is -/// translated to libuv's own configuration -pub struct Config<'self> { - /// Path to the program to run - program: &'self str, - - /// Arguments to pass to the program (doesn't include the program itself) - args: &'self [~str], - - /// Optional environment to specify for the program. If this is None, then - /// it will inherit the current process's environment. - env: Option<&'self [(~str, ~str)]>, - - /// Optional working directory for the new process. If this is None, then - /// the current directory of the running process is inherited. - cwd: Option<&'self str>, - - /// Any number of streams/file descriptors/pipes may be attached to this - /// process. This list enumerates the file descriptors and such for the - /// process to be spawned, and the file descriptors inherited will start at - /// 0 and go to the length of this array. - /// - /// Standard file descriptors are: - /// - /// 0 - stdin - /// 1 - stdout - /// 2 - stderr - io: &'self [StdioContainer] -} - -/// Describes what to do with a standard io stream for a child process. -pub enum StdioContainer { - /// This stream will be ignored. This is the equivalent of attaching the - /// stream to `/dev/null` - Ignored, - - /// The specified file descriptor is inherited for the stream which it is - /// specified for. - InheritFd(libc::c_int), - - /// The specified libuv stream is inherited for the corresponding file - /// descriptor it is assigned to. - InheritStream(net::StreamWatcher), - - /// Creates a pipe for the specified file descriptor which will be directed - /// into the previously-initialized pipe passed in. - /// - /// The first boolean argument is whether the pipe is readable, and the - /// second is whether it is writable. These properties are from the view of - /// the *child* process, not the parent process. - CreatePipe(pipe::Pipe, bool /* readable */, bool /* writable */), -} - -impl uv::Watcher for Process {} - -impl Process { - /// Creates a new process, ready to spawn inside an event loop - pub fn new() -> Process { - let handle = unsafe { uvll::malloc_handle(uvll::UV_PROCESS) }; - assert!(handle.is_not_null()); - let mut ret: Process = uv::NativeHandle::from_native_handle(handle); - ret.install_watcher_data(); - return ret; - } - - /// Spawn a new process inside the specified event loop. - /// - /// The `config` variable will be passed down to libuv, and the `exit_cb` - /// will be run only once, when the process exits. - /// - /// Returns either the corresponding process object or an error which - /// occurred. - pub fn spawn(&mut self, loop_: &uv::Loop, config: &Config, - exit_cb: uv::ExitCallback) -> Result<(), uv::UvError> { - let cwd = config.cwd.map_move(|s| s.to_c_str()); - - extern fn on_exit(p: *uvll::uv_process_t, - exit_status: libc::c_int, - term_signal: libc::c_int) { - let mut p: Process = uv::NativeHandle::from_native_handle(p); - let err = match exit_status { - 0 => None, - _ => uv::status_to_maybe_uv_error(-1) - }; - p.get_watcher_data().exit_cb.take_unwrap()(p, - exit_status as int, - term_signal as int, - err); - } - - let mut stdio = vec::with_capacity::( - config.io.len()); - unsafe { - vec::raw::set_len(&mut stdio, config.io.len()); - for (slot, &other) in stdio.iter().zip(config.io.iter()) { - set_stdio(slot as *uvll::uv_stdio_container_t, other); - } - } - - let exit_cb = Cell::new(exit_cb); - do with_argv(config.program, config.args) |argv| { - do with_env(config.env) |envp| { - let options = uvll::uv_process_options_t { - exit_cb: on_exit, - file: unsafe { *argv }, - args: argv, - env: envp, - cwd: match cwd { - Some(ref cwd) => cwd.with_ref(|p| p), - None => ptr::null(), - }, - flags: 0, - stdio_count: stdio.len() as libc::c_int, - stdio: stdio.as_imm_buf(|p, _| p), - uid: 0, - gid: 0, - }; - - match unsafe { - uvll::spawn(loop_.native_handle(), **self, options) - } { - 0 => { - (*self).get_watcher_data().exit_cb = Some(exit_cb.take()); - Ok(()) - } - err => Err(uv::UvError(err)) - } - } - } - } - - /// Sends a signal to this process. - /// - /// This is a wrapper around `uv_process_kill` - pub fn kill(&self, signum: int) -> Result<(), uv::UvError> { - match unsafe { - uvll::process_kill(self.native_handle(), signum as libc::c_int) - } { - 0 => Ok(()), - err => Err(uv::UvError(err)) - } - } - - /// Returns the process id of a spawned process - pub fn pid(&self) -> libc::pid_t { - unsafe { uvll::process_pid(**self) as libc::pid_t } - } - - /// Closes this handle, invoking the specified callback once closed - pub fn close(self, cb: uv::NullCallback) { - { - let mut this = self; - let data = this.get_watcher_data(); - assert!(data.close_cb.is_none()); - data.close_cb = Some(cb); - } - - unsafe { uvll::close(self.native_handle(), close_cb); } - - extern fn close_cb(handle: *uvll::uv_process_t) { - let mut process: Process = uv::NativeHandle::from_native_handle(handle); - process.get_watcher_data().close_cb.take_unwrap()(); - process.drop_watcher_data(); - unsafe { uvll::free_handle(handle as *libc::c_void) } - } - } -} - -unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t, io: StdioContainer) { - match io { - Ignored => { uvll::set_stdio_container_flags(dst, uvll::STDIO_IGNORE); } - InheritFd(fd) => { - uvll::set_stdio_container_flags(dst, uvll::STDIO_INHERIT_FD); - uvll::set_stdio_container_fd(dst, fd); - } - InheritStream(stream) => { - uvll::set_stdio_container_flags(dst, uvll::STDIO_INHERIT_STREAM); - uvll::set_stdio_container_stream(dst, stream.native_handle()); - } - CreatePipe(pipe, readable, writable) => { - let mut flags = uvll::STDIO_CREATE_PIPE as libc::c_int; - if readable { - flags |= uvll::STDIO_READABLE_PIPE as libc::c_int; - } - if writable { - flags |= uvll::STDIO_WRITABLE_PIPE as libc::c_int; - } - uvll::set_stdio_container_flags(dst, flags); - uvll::set_stdio_container_stream(dst, - pipe.as_stream().native_handle()); - } - } -} - -/// Converts the program and arguments to the argv array expected by libuv -fn with_argv(prog: &str, args: &[~str], f: &fn(**libc::c_char) -> T) -> T { - // First, allocation space to put all the C-strings (we need to have - // ownership of them somewhere - let mut c_strs = vec::with_capacity(args.len() + 1); - c_strs.push(prog.to_c_str()); - for arg in args.iter() { - c_strs.push(arg.to_c_str()); - } - - // Next, create the char** array - let mut c_args = vec::with_capacity(c_strs.len() + 1); - for s in c_strs.iter() { - c_args.push(s.with_ref(|p| p)); - } - c_args.push(ptr::null()); - c_args.as_imm_buf(|buf, _| f(buf)) -} - -/// Converts the environment to the env array expected by libuv -fn with_env(env: Option<&[(~str, ~str)]>, f: &fn(**libc::c_char) -> T) -> T { - let env = match env { - Some(s) => s, - None => { return f(ptr::null()); } - }; - // As with argv, create some temporary storage and then the actual array - let mut envp = vec::with_capacity(env.len()); - for &(ref key, ref value) in env.iter() { - envp.push(fmt!("%s=%s", *key, *value).to_c_str()); - } - let mut c_envp = vec::with_capacity(envp.len() + 1); - for s in envp.iter() { - c_envp.push(s.with_ref(|p| p)); - } - c_envp.push(ptr::null()); - c_envp.as_imm_buf(|buf, _| f(buf)) -} - -impl uv::NativeHandle<*uvll::uv_process_t> for Process { - fn from_native_handle(handle: *uvll::uv_process_t) -> Process { - Process(handle) - } - fn native_handle(&self) -> *uvll::uv_process_t { - match self { &Process(ptr) => ptr } - } -} diff --git a/src/libstd/rt/uv/timer.rs b/src/libstd/rt/uv/timer.rs index 7b09cf2eb0e..eaa5e77a6da 100644 --- a/src/libstd/rt/uv/timer.rs +++ b/src/libstd/rt/uv/timer.rs @@ -43,7 +43,7 @@ impl TimerWatcher { let mut watcher: TimerWatcher = NativeHandle::from_native_handle(handle); let data = watcher.get_watcher_data(); let cb = data.timer_cb.get_ref(); - let status = status_to_maybe_uv_error(status); + let status = status_to_maybe_uv_error(watcher, status); (*cb)(watcher, status); } } diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index c771f93cef5..e620ab274b1 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -13,7 +13,7 @@ use cast::transmute; use cast; use cell::Cell; use clone::Clone; -use libc::{c_int, c_uint, c_void, pid_t}; +use libc::{c_int, c_uint, c_void}; use ops::Drop; use option::*; use ptr; @@ -22,7 +22,6 @@ use result::*; use rt::io::IoError; use rt::io::net::ip::{SocketAddr, IpAddr}; use rt::io::{standard_error, OtherIoError, SeekStyle, SeekSet, SeekCur, SeekEnd}; -use rt::kill::BlockedTask; use rt::local::Local; use rt::rtio::*; use rt::sched::{Scheduler, SchedHandle}; @@ -149,7 +148,7 @@ fn socket_name>(sk: SocketNameKind, }; if r != 0 { - let status = status_to_maybe_uv_error(r); + let status = status_to_maybe_uv_error(handle, r); return Err(uv_error_to_io_error(status.unwrap())); } @@ -592,63 +591,6 @@ impl IoFactory for UvIoFactory { assert!(!result_cell.is_empty()); return result_cell.take(); } - - fn pipe_init(&mut self, ipc: bool) -> Result<~RtioPipeObject, IoError> { - let home = get_handle_to_current_scheduler!(); - Ok(~UvPipeStream { pipe: Pipe::new(self.uv_loop(), ipc), home: home }) - } - - fn spawn(&mut self, - config: &process::Config) -> Result<~RtioProcessObject, IoError> { - // Sadly, we must create the UvProcess before we actually call uv_spawn - // so that the exit_cb can close over it and notify it when the process - // has exited. - let mut ret = ~UvProcess { - process: Process::new(), - home: None, - exit_status: None, - term_signal: None, - exit_error: None, - descheduled: None, - }; - let ret_ptr = unsafe { - *cast::transmute::<&~UvProcess, &*mut UvProcess>(&ret) - }; - - // The purpose of this exit callback is to record the data about the - // exit and then wake up the task which may be waiting for the process - // to exit. This is all performed in the current io-loop, and the - // implementation of UvProcess ensures that reading these fields always - // occurs on the current io-loop. - let exit_cb: ExitCallback = |_, exit_status, term_signal, error| { - unsafe { - assert!((*ret_ptr).exit_status.is_none()); - (*ret_ptr).exit_status = Some(exit_status); - (*ret_ptr).term_signal = Some(term_signal); - (*ret_ptr).exit_error = error; - match (*ret_ptr).descheduled.take() { - Some(task) => { - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task); - } - None => {} - } - } - }; - - match ret.process.spawn(self.uv_loop(), config, exit_cb) { - Ok(()) => { - // Only now do we actually get a handle to this scheduler. - ret.home = Some(get_handle_to_current_scheduler!()); - Ok(ret) - } - Err(uverr) => { - // We still need to close the process handle we created, but - // that's taken care for us in the destructor of UvProcess - Err(uv_error_to_io_error(uverr)) - } - } - } } pub struct UvTcpListener { @@ -737,7 +679,7 @@ impl RtioTcpListener for UvTcpListener { uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 1 as c_int) }; - match status_to_maybe_uv_error(r) { + match status_to_maybe_uv_error(self_.watcher(), r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -750,7 +692,7 @@ impl RtioTcpListener for UvTcpListener { uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 0 as c_int) }; - match status_to_maybe_uv_error(r) { + match status_to_maybe_uv_error(self_.watcher(), r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -758,15 +700,40 @@ impl RtioTcpListener for UvTcpListener { } } -trait UvStream: HomingIO { - fn as_stream(&mut self) -> StreamWatcher; +pub struct UvTcpStream { + watcher: TcpWatcher, + home: SchedHandle, } -// FIXME(#3429) I would rather this be `impl RtioStream for T` but -// that has conflicts with other traits that also have methods -// called `read` and `write` -macro_rules! rtiostream(($t:ident) => { -impl RtioStream for $t { +impl HomingIO for UvTcpStream { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } +} + +impl Drop for UvTcpStream { + fn drop(&self) { + // XXX need mutable finalizer + let this = unsafe { transmute::<&UvTcpStream, &mut UvTcpStream>(self) }; + do this.home_for_io_with_sched |self_, scheduler| { + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + do self_.watcher.as_stream().close { + let scheduler: ~Scheduler = Local::take(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } + } + } +} + +impl RtioSocket for UvTcpStream { + fn socket_name(&mut self) -> Result { + do self.home_for_io |self_| { + socket_name(Tcp, self_.watcher) + } + } +} + +impl RtioTcpStream for UvTcpStream { fn read(&mut self, buf: &mut [u8]) -> Result { do self.home_for_io_with_sched |self_, scheduler| { let result_cell = Cell::new_empty(); @@ -780,7 +747,7 @@ impl RtioStream for $t { let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) }; - let mut watcher = self_.as_stream(); + let mut watcher = self_.watcher.as_stream(); do watcher.read_start(alloc) |mut watcher, nread, _buf, status| { // Stop reading so that no read callbacks are @@ -816,7 +783,7 @@ impl RtioStream for $t { do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; - let mut watcher = self_.as_stream(); + let mut watcher = self_.watcher.as_stream(); do watcher.write(buf) |_watcher, status| { let result = if status.is_none() { Ok(()) @@ -835,85 +802,7 @@ impl RtioStream for $t { result_cell.take() } } -} -}) -rtiostream!(UvPipeStream) -rtiostream!(UvTcpStream) - -pub struct UvPipeStream { - pipe: Pipe, - home: SchedHandle, -} - -impl UvStream for UvPipeStream { - fn as_stream(&mut self) -> StreamWatcher { self.pipe.as_stream() } -} - -impl HomingIO for UvPipeStream { - fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } -} - -impl Drop for UvPipeStream { - fn drop(&self) { - // FIXME(#4330): should not need a transmute - let this = unsafe { cast::transmute_mut(self) }; - do this.home_for_io |self_| { - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do self_.pipe.close { - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } - } - } - } -} - -impl UvPipeStream { - pub fn uv_pipe(&self) -> Pipe { self.pipe } -} - -pub struct UvTcpStream { - watcher: TcpWatcher, - home: SchedHandle, -} - -impl HomingIO for UvTcpStream { - fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } -} - -impl Drop for UvTcpStream { - fn drop(&self) { - // FIXME(#4330): should not need a transmute - let this = unsafe { cast::transmute_mut(self) }; - do this.home_for_io |self_| { - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do self_.watcher.as_stream().close { - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } - } - } - } -} - -impl UvStream for UvTcpStream { - fn as_stream(&mut self) -> StreamWatcher { self.watcher.as_stream() } -} - -impl RtioSocket for UvTcpStream { - fn socket_name(&mut self) -> Result { - do self.home_for_io |self_| { - socket_name(Tcp, self_.watcher) - } - } -} - -impl RtioTcpStream for UvTcpStream { fn peer_name(&mut self) -> Result { do self.home_for_io |self_| { socket_name(TcpPeer, self_.watcher) @@ -924,7 +813,7 @@ impl RtioTcpStream for UvTcpStream { do self.home_for_io |self_| { let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 0 as c_int) }; - match status_to_maybe_uv_error(r) { + match status_to_maybe_uv_error(self_.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -935,7 +824,7 @@ impl RtioTcpStream for UvTcpStream { do self.home_for_io |self_| { let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 1 as c_int) }; - match status_to_maybe_uv_error(r) { + match status_to_maybe_uv_error(self_.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -949,7 +838,7 @@ impl RtioTcpStream for UvTcpStream { delay_in_seconds as c_uint) }; - match status_to_maybe_uv_error(r) { + match status_to_maybe_uv_error(self_.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -962,7 +851,7 @@ impl RtioTcpStream for UvTcpStream { uvll::tcp_keepalive(self_.watcher.native_handle(), 0 as c_int, 0 as c_uint) }; - match status_to_maybe_uv_error(r) { + match status_to_maybe_uv_error(self_.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1074,7 +963,7 @@ impl RtioUdpSocket for UvUdpSocket { } }; - match status_to_maybe_uv_error(r) { + match status_to_maybe_uv_error(self_.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1090,7 +979,7 @@ impl RtioUdpSocket for UvUdpSocket { } }; - match status_to_maybe_uv_error(r) { + match status_to_maybe_uv_error(self_.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1104,7 +993,7 @@ impl RtioUdpSocket for UvUdpSocket { uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 1 as c_int) }; - match status_to_maybe_uv_error(r) { + match status_to_maybe_uv_error(self_.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1118,7 +1007,7 @@ impl RtioUdpSocket for UvUdpSocket { uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 0 as c_int) }; - match status_to_maybe_uv_error(r) { + match status_to_maybe_uv_error(self_.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1132,7 +1021,7 @@ impl RtioUdpSocket for UvUdpSocket { uvll::udp_set_multicast_ttl(self_.watcher.native_handle(), ttl as c_int) }; - match status_to_maybe_uv_error(r) { + match status_to_maybe_uv_error(self_.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1146,7 +1035,7 @@ impl RtioUdpSocket for UvUdpSocket { uvll::udp_set_ttl(self_.watcher.native_handle(), ttl as c_int) }; - match status_to_maybe_uv_error(r) { + match status_to_maybe_uv_error(self_.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1160,7 +1049,7 @@ impl RtioUdpSocket for UvUdpSocket { uvll::udp_set_broadcast(self_.watcher.native_handle(), 1 as c_int) }; - match status_to_maybe_uv_error(r) { + match status_to_maybe_uv_error(self_.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1174,7 +1063,7 @@ impl RtioUdpSocket for UvUdpSocket { uvll::udp_set_broadcast(self_.watcher.native_handle(), 0 as c_int) }; - match status_to_maybe_uv_error(r) { + match status_to_maybe_uv_error(self_.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1361,89 +1250,6 @@ impl RtioFileStream for UvFileStream { } } -pub struct UvProcess { - process: process::Process, - - // Sadly, this structure must be created before we return it, so in that - // brief interim the `home` is None. - home: Option, - - // All None until the process exits (exit_error may stay None) - priv exit_status: Option, - priv term_signal: Option, - priv exit_error: Option, - - // Used to store which task to wake up from the exit_cb - priv descheduled: Option, -} - -impl HomingIO for UvProcess { - fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.home.get_mut_ref() } -} - -impl Drop for UvProcess { - fn drop(&self) { - // FIXME(#4330): should not need a transmute - let this = unsafe { cast::transmute_mut(self) }; - - let close = |self_: &mut UvProcess| { - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task = Cell::new(task); - do self_.process.close { - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task.take()); - } - } - }; - - // If home is none, then this process never actually successfully - // spawned, so there's no need to switch event loops - if this.home.is_none() { - close(this) - } else { - this.home_for_io(close) - } - } -} - -impl RtioProcess for UvProcess { - fn id(&self) -> pid_t { - self.process.pid() - } - - fn kill(&mut self, signal: int) -> Result<(), IoError> { - do self.home_for_io |self_| { - match self_.process.kill(signal) { - Ok(()) => Ok(()), - Err(uverr) => Err(uv_error_to_io_error(uverr)) - } - } - } - - fn wait(&mut self) -> int { - // Make sure (on the home scheduler) that we have an exit status listed - do self.home_for_io |self_| { - match self_.exit_status { - Some(*) => {} - None => { - // If there's no exit code previously listed, then the - // process's exit callback has yet to be invoked. We just - // need to deschedule ourselves and wait to be reawoken. - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_, task| { - assert!(self_.descheduled.is_none()); - self_.descheduled = Some(task); - } - assert!(self_.exit_status.is_some()); - } - } - } - - self.exit_status.unwrap() - } -} - #[test] fn test_simple_io_no_connect() { do run_in_newsched_task { diff --git a/src/libstd/rt/uv/uvll.rs b/src/libstd/rt/uv/uvll.rs index 24e070ca239..1e189e90885 100644 --- a/src/libstd/rt/uv/uvll.rs +++ b/src/libstd/rt/uv/uvll.rs @@ -37,74 +37,28 @@ use libc::{malloc, free}; use libc; use prelude::*; use ptr; +use str; use vec; -pub use self::errors::*; - +pub static UNKNOWN: c_int = -1; pub static OK: c_int = 0; -pub static EOF: c_int = -4095; -pub static UNKNOWN: c_int = -4094; +pub static EOF: c_int = 1; +pub static EADDRINFO: c_int = 2; +pub static EACCES: c_int = 3; +pub static ECONNREFUSED: c_int = 12; +pub static ECONNRESET: c_int = 13; +pub static EPIPE: c_int = 36; -// uv-errno.h redefines error codes for windows, but not for unix... - -#[cfg(windows)] -pub mod errors { - use libc::c_int; - - pub static EACCES: c_int = -4093; - pub static ECONNREFUSED: c_int = -4079; - pub static ECONNRESET: c_int = -4078; - pub static EPIPE: c_int = -4048; +pub struct uv_err_t { + code: c_int, + sys_errno_: c_int } -#[cfg(not(windows))] -pub mod errors { - use libc; - use libc::c_int; - - pub static EACCES: c_int = -libc::EACCES; - pub static ECONNREFUSED: c_int = -libc::ECONNREFUSED; - pub static ECONNRESET: c_int = -libc::ECONNRESET; - pub static EPIPE: c_int = -libc::EPIPE; -} - -pub static PROCESS_SETUID: c_int = 1 << 0; -pub static PROCESS_SETGID: c_int = 1 << 1; -pub static PROCESS_WINDOWS_VERBATIM_ARGUMENTS: c_int = 1 << 2; -pub static PROCESS_DETACHED: c_int = 1 << 3; -pub static PROCESS_WINDOWS_HIDE: c_int = 1 << 4; - -pub static STDIO_IGNORE: c_int = 0x00; -pub static STDIO_CREATE_PIPE: c_int = 0x01; -pub static STDIO_INHERIT_FD: c_int = 0x02; -pub static STDIO_INHERIT_STREAM: c_int = 0x04; -pub static STDIO_READABLE_PIPE: c_int = 0x10; -pub static STDIO_WRITABLE_PIPE: c_int = 0x20; pub struct uv_buf_t { base: *u8, len: libc::size_t, } -pub struct uv_process_options_t { - exit_cb: uv_exit_cb, - file: *libc::c_char, - args: **libc::c_char, - env: **libc::c_char, - cwd: *libc::c_char, - flags: libc::c_uint, - stdio_count: libc::c_int, - stdio: *uv_stdio_container_t, - uid: uv_uid_t, - gid: uv_gid_t, -} - -// These fields are private because they must be interfaced with through the -// functions below. -pub struct uv_stdio_container_t { - priv flags: libc::c_int, - priv stream: *uv_stream_t, -} - pub type uv_handle_t = c_void; pub type uv_loop_t = c_void; pub type uv_idle_t = c_void; @@ -118,8 +72,6 @@ pub type uv_timer_t = c_void; pub type uv_stream_t = c_void; pub type uv_fs_t = c_void; pub type uv_udp_send_t = c_void; -pub type uv_process_t = c_void; -pub type uv_pipe_t = c_void; #[cfg(stage0)] pub type uv_idle_cb = *u8; @@ -145,8 +97,6 @@ pub type uv_connection_cb = *u8; pub type uv_timer_cb = *u8; #[cfg(stage0)] pub type uv_write_cb = *u8; -#[cfg(stage0)] -pub type uv_exit_cb = *u8; #[cfg(not(stage0))] pub type uv_idle_cb = extern "C" fn(handle: *uv_idle_t, @@ -187,21 +137,12 @@ pub type uv_timer_cb = extern "C" fn(handle: *uv_timer_t, #[cfg(not(stage0))] pub type uv_write_cb = extern "C" fn(handle: *uv_write_t, status: c_int); -#[cfg(not(stage0))] -pub type uv_exit_cb = extern "C" fn(handle: *uv_process_t, - exit_status: c_int, - term_signal: c_int); pub type sockaddr = c_void; pub type sockaddr_in = c_void; pub type sockaddr_in6 = c_void; pub type sockaddr_storage = c_void; -#[cfg(unix)] pub type uv_uid_t = libc::types::os::arch::posix88::uid_t; -#[cfg(unix)] pub type uv_gid_t = libc::types::os::arch::posix88::gid_t; -#[cfg(windows)] pub type uv_uid_t = libc::c_uchar; -#[cfg(windows)] pub type uv_gid_t = libc::c_uchar; - #[deriving(Eq)] pub enum uv_handle_type { UV_UNKNOWN_HANDLE, @@ -546,12 +487,20 @@ pub unsafe fn read_stop(stream: *uv_stream_t) -> c_int { return rust_uv_read_stop(stream as *c_void); } -pub unsafe fn strerror(err: c_int) -> *c_char { +pub unsafe fn last_error(loop_handle: *c_void) -> uv_err_t { #[fixed_stack_segment]; #[inline(never)]; + + return rust_uv_last_error(loop_handle); +} + +pub unsafe fn strerror(err: *uv_err_t) -> *c_char { + #[fixed_stack_segment]; #[inline(never)]; + return rust_uv_strerror(err); } -pub unsafe fn err_name(err: c_int) -> *c_char { +pub unsafe fn err_name(err: *uv_err_t) -> *c_char { #[fixed_stack_segment]; #[inline(never)]; + return rust_uv_err_name(err); } @@ -705,45 +654,6 @@ pub unsafe fn fs_req_cleanup(req: *uv_fs_t) { rust_uv_fs_req_cleanup(req); } -pub unsafe fn spawn(loop_ptr: *c_void, result: *uv_process_t, - options: uv_process_options_t) -> c_int { - #[fixed_stack_segment]; #[inline(never)]; - return rust_uv_spawn(loop_ptr, result, options); -} - -pub unsafe fn process_kill(p: *uv_process_t, signum: c_int) -> c_int { - #[fixed_stack_segment]; #[inline(never)]; - return rust_uv_process_kill(p, signum); -} - -pub unsafe fn process_pid(p: *uv_process_t) -> c_int { - #[fixed_stack_segment]; #[inline(never)]; - return rust_uv_process_pid(p); -} - -pub unsafe fn set_stdio_container_flags(c: *uv_stdio_container_t, - flags: libc::c_int) { - #[fixed_stack_segment]; #[inline(never)]; - rust_set_stdio_container_flags(c, flags); -} - -pub unsafe fn set_stdio_container_fd(c: *uv_stdio_container_t, - fd: libc::c_int) { - #[fixed_stack_segment]; #[inline(never)]; - rust_set_stdio_container_fd(c, fd); -} - -pub unsafe fn set_stdio_container_stream(c: *uv_stdio_container_t, - stream: *uv_stream_t) { - #[fixed_stack_segment]; #[inline(never)]; - rust_set_stdio_container_stream(c, stream); -} - -pub unsafe fn pipe_init(loop_ptr: *c_void, p: *uv_pipe_t, ipc: c_int) -> c_int { - #[fixed_stack_segment]; #[inline(never)]; - rust_uv_pipe_init(loop_ptr, p, ipc) -} - // data access helpers pub unsafe fn get_result_from_fs_req(req: *uv_fs_t) -> c_int { #[fixed_stack_segment]; #[inline(never)]; @@ -810,6 +720,22 @@ pub unsafe fn get_len_from_buf(buf: uv_buf_t) -> size_t { return rust_uv_get_len_from_buf(buf); } +pub unsafe fn get_last_err_info(uv_loop: *c_void) -> ~str { + let err = last_error(uv_loop); + let err_ptr = ptr::to_unsafe_ptr(&err); + let err_name = str::raw::from_c_str(err_name(err_ptr)); + let err_msg = str::raw::from_c_str(strerror(err_ptr)); + return fmt!("LIBUV ERROR: name: %s msg: %s", + err_name, err_msg); +} + +pub unsafe fn get_last_err_data(uv_loop: *c_void) -> uv_err_data { + let err = last_error(uv_loop); + let err_ptr = ptr::to_unsafe_ptr(&err); + let err_name = str::raw::from_c_str(err_name(err_ptr)); + let err_msg = str::raw::from_c_str(strerror(err_ptr)); + uv_err_data { err_name: err_name, err_msg: err_msg } +} pub struct uv_err_data { err_name: ~str, @@ -842,8 +768,9 @@ extern { cb: uv_async_cb) -> c_int; fn rust_uv_tcp_init(loop_handle: *c_void, handle_ptr: *uv_tcp_t) -> c_int; fn rust_uv_buf_init(out_buf: *uv_buf_t, base: *u8, len: size_t); - fn rust_uv_strerror(err: c_int) -> *c_char; - fn rust_uv_err_name(err: c_int) -> *c_char; + fn rust_uv_last_error(loop_handle: *c_void) -> uv_err_t; + fn rust_uv_strerror(err: *uv_err_t) -> *c_char; + fn rust_uv_err_name(err: *uv_err_t) -> *c_char; fn rust_uv_ip4_addrp(ip: *u8, port: c_int) -> *sockaddr_in; fn rust_uv_ip6_addrp(ip: *u8, port: c_int) -> *sockaddr_in6; fn rust_uv_free_ip4_addr(addr: *sockaddr_in); @@ -929,13 +856,4 @@ extern { fn rust_uv_set_data_for_req(req: *c_void, data: *c_void); fn rust_uv_get_base_from_buf(buf: uv_buf_t) -> *u8; fn rust_uv_get_len_from_buf(buf: uv_buf_t) -> size_t; - fn rust_uv_spawn(loop_ptr: *c_void, outptr: *uv_process_t, - options: uv_process_options_t) -> c_int; - fn rust_uv_process_kill(p: *uv_process_t, signum: c_int) -> c_int; - fn rust_uv_process_pid(p: *uv_process_t) -> c_int; - fn rust_set_stdio_container_flags(c: *uv_stdio_container_t, flags: c_int); - fn rust_set_stdio_container_fd(c: *uv_stdio_container_t, fd: c_int); - fn rust_set_stdio_container_stream(c: *uv_stdio_container_t, - stream: *uv_stream_t); - fn rust_uv_pipe_init(loop_ptr: *c_void, p: *uv_pipe_t, ipc: c_int) -> c_int; } diff --git a/src/libstd/run.rs b/src/libstd/run.rs index b91aac22244..7fc2deff97d 100644 --- a/src/libstd/run.rs +++ b/src/libstd/run.rs @@ -10,21 +10,22 @@ //! Process spawning. +#[allow(missing_doc)]; + +use c_str::ToCStr; use cast; -use cell::Cell; +use clone::Clone; use comm::{stream, SharedChan, GenericChan, GenericPort}; -#[cfg(not(windows))] +use io; +use libc::{pid_t, c_void, c_int}; use libc; -use libc::{pid_t, c_int}; +use option::{Some, None}; +use os; use prelude::*; +use ptr; use task; use vec::ImmutableVector; -use rt::io; -use rt::local::Local; -use rt::rtio::{IoFactoryObject, RtioProcessObject, RtioProcess, IoFactory}; -use rt::uv::process; - /** * A value representing a child process. * @@ -33,23 +34,28 @@ use rt::uv::process; * for the process to terminate. */ pub struct Process { + /// The unique id of the process (this should never be negative). priv pid: pid_t, - /// The internal handle to the underlying libuv process. - priv handle: ~RtioProcessObject, + /** + * A handle to the process - on unix this will always be NULL, but on + * windows it will be a HANDLE to the process, which will prevent the + * pid being re-used until the handle is closed. + */ + priv handle: *(), - /// Some(fd), or None when stdin is being redirected from a fd not created - /// by Process::new. - priv input: Option<~io::Writer>, + /// Some(fd), or None when stdin is being redirected from a fd not created by Process::new. + priv input: Option, - /// Some(file), or None when stdout is being redirected to a fd not created - /// by Process::new. - priv output: Option<~io::Reader>, + /// Some(file), or None when stdout is being redirected to a fd not created by Process::new. + priv output: Option<*libc::FILE>, - /// Some(file), or None when stderr is being redirected to a fd not created - /// by Process::new. - priv error: Option<~io::Reader>, + /// Some(file), or None when stderr is being redirected to a fd not created by Process::new. + priv error: Option<*libc::FILE>, + + /// None until finish() is called. + priv exit_code: Option, } /// Options that can be given when starting a Process. @@ -87,27 +93,26 @@ pub struct ProcessOptions<'self> { * If this is None then a new pipe will be created for the new program's * output and Process.output() will provide a Reader to read from this pipe. * - * If this is Some(file-descriptor) then the new process will write its - * output to the given file descriptor, Process.output_redirected() will - * return true, and Process.output() will fail. + * If this is Some(file-descriptor) then the new process will write its output + * to the given file descriptor, Process.output_redirected() will return + * true, and Process.output() will fail. */ out_fd: Option, /** - * If this is None then a new pipe will be created for the new progam's - * error stream and Process.error() will provide a Reader to read from this - * pipe. + * If this is None then a new pipe will be created for the new program's + * error stream and Process.error() will provide a Reader to read from this pipe. * - * If this is Some(file-descriptor) then the new process will write its - * error output to the given file descriptor, Process.error_redirected() - * will return true, and and Process.error() will fail. + * If this is Some(file-descriptor) then the new process will write its error output + * to the given file descriptor, Process.error_redirected() will return true, and + * and Process.error() will fail. */ err_fd: Option, } -impl<'self> ProcessOptions<'self> { +impl <'self> ProcessOptions<'self> { /// Return a ProcessOptions that has None in every field. - pub fn new() -> ProcessOptions { + pub fn new<'a>() -> ProcessOptions<'a> { ProcessOptions { env: None, dir: None, @@ -120,6 +125,7 @@ impl<'self> ProcessOptions<'self> { /// The output of a finished process. pub struct ProcessOutput { + /// The status (exit code) of the process. status: int, @@ -142,159 +148,223 @@ impl Process { * the working directory and the standard IO streams. */ pub fn new(prog: &str, args: &[~str], - options: ProcessOptions) -> Option { - // First, translate all the stdio options into their libuv equivalents - let (uv_stdin, stdin) = match options.in_fd { - Some(fd) => (process::InheritFd(fd), None), + options: ProcessOptions) + -> Process { + #[fixed_stack_segment]; #[inline(never)]; + + let (in_pipe, in_fd) = match options.in_fd { None => { - let p = io::pipe::PipeStream::new().expect("need stdin pipe"); - (process::CreatePipe(p.uv_pipe(), true, false), - Some(~p as ~io::Writer)) - } + let pipe = os::pipe(); + (Some(pipe), pipe.input) + }, + Some(fd) => (None, fd) }; - let (uv_stdout, stdout) = match options.out_fd { - Some(fd) => (process::InheritFd(fd), None), + let (out_pipe, out_fd) = match options.out_fd { None => { - let p = io::pipe::PipeStream::new().expect("need stdout pipe"); - (process::CreatePipe(p.uv_pipe(), false, true), - Some(~p as ~io::Reader)) - } + let pipe = os::pipe(); + (Some(pipe), pipe.out) + }, + Some(fd) => (None, fd) }; - let (uv_stderr, stderr) = match options.err_fd { - Some(fd) => (process::InheritFd(fd), None), + let (err_pipe, err_fd) = match options.err_fd { None => { - let p = io::pipe::PipeStream::new().expect("need stderr pipe"); - (process::CreatePipe(p.uv_pipe(), false, true), - Some(~p as ~io::Reader)) - } + let pipe = os::pipe(); + (Some(pipe), pipe.out) + }, + Some(fd) => (None, fd) }; - // Next, massage our options into the libuv options - let dir = options.dir.map(|d| d.to_str()); - let dir = dir.map(|d| d.as_slice()); - let config = process::Config { - program: prog, - args: args, - env: options.env.map(|e| e.as_slice()), - cwd: dir, - io: [uv_stdin, uv_stdout, uv_stderr], - }; + let res = spawn_process_os(prog, args, options.env.clone(), options.dir, + in_fd, out_fd, err_fd); - // Finally, actually spawn the process unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - match (*io).spawn(&config) { - Ok(handle) => { - Some(Process { - pid: handle.id(), - handle: handle, - input: stdin, - output: stdout, - error: stderr, - }) - } - Err(*) => { None } - } + for pipe in in_pipe.iter() { libc::close(pipe.input); } + for pipe in out_pipe.iter() { libc::close(pipe.out); } + for pipe in err_pipe.iter() { libc::close(pipe.out); } + } + + Process { + pid: res.pid, + handle: res.handle, + input: in_pipe.map(|pipe| pipe.out), + output: out_pipe.map(|pipe| os::fdopen(pipe.input)), + error: err_pipe.map(|pipe| os::fdopen(pipe.input)), + exit_code: None, } } /// Returns the unique id of the process pub fn get_id(&self) -> pid_t { self.pid } - /** - * Returns a rt::io::Writer that can be used to write to this Process's - * stdin. - * - * Fails if this Process's stdin was redirected to an existing file - * descriptor. - */ - pub fn input<'a>(&'a mut self) -> &'a mut io::Writer { - let ret: &mut io::Writer = *self.input.get_mut_ref(); - return ret; + fn input_fd(&mut self) -> c_int { + match self.input { + Some(fd) => fd, + None => fail!("This Process's stdin was redirected to an \ + existing file descriptor.") + } + } + + fn output_file(&mut self) -> *libc::FILE { + match self.output { + Some(file) => file, + None => fail!("This Process's stdout was redirected to an \ + existing file descriptor.") + } + } + + fn error_file(&mut self) -> *libc::FILE { + match self.error { + Some(file) => file, + None => fail!("This Process's stderr was redirected to an \ + existing file descriptor.") + } } /** - * Returns a rt::io::Reader that can be used to read from this Process's - * stdout. + * Returns whether this process is reading its stdin from an existing file + * descriptor rather than a pipe that was created specifically for this + * process. * - * Fails if this Process's stdout was redirected to an existing file - * descriptor. + * If this method returns true then self.input() will fail. */ - pub fn output<'a>(&'a mut self) -> &'a mut io::Reader { - let ret: &mut io::Reader = *self.output.get_mut_ref(); - return ret; + pub fn input_redirected(&self) -> bool { + self.input.is_none() } /** - * Returns a rt::io::Reader that can be used to read from this Process's - * stderr. + * Returns whether this process is writing its stdout to an existing file + * descriptor rather than a pipe that was created specifically for this + * process. * - * Fails if this Process's stderr was redirected to an existing file - * descriptor. + * If this method returns true then self.output() will fail. */ - pub fn error<'a>(&'a mut self) -> &'a mut io::Reader { - let ret: &mut io::Reader = *self.error.get_mut_ref(); - return ret; + pub fn output_redirected(&self) -> bool { + self.output.is_none() } /** - * Closes the handle to stdin, waits for the child process to terminate, and - * returns the exit code. + * Returns whether this process is writing its stderr to an existing file + * descriptor rather than a pipe that was created specifically for this + * process. * - * If the child has already been finished then the exit code is returned. + * If this method returns true then self.error() will fail. */ - pub fn finish(&mut self) -> int { - // We're not going to be giving any more input, so close the input by - // destroying it. Also, if the output is desired, then - // finish_with_output is called so we discard all the outputs here. Note - // that the process may not terminate if we don't destroy stdio because - // it'll be waiting in a write which we'll just never read. - self.input.take(); - self.output.take(); - self.error.take(); + pub fn error_redirected(&self) -> bool { + self.error.is_none() + } - self.handle.wait() + /** + * Returns an io::Writer that can be used to write to this Process's stdin. + * + * Fails if this Process's stdin was redirected to an existing file descriptor. + */ + pub fn input(&mut self) -> @io::Writer { + // FIXME: the Writer can still be used after self is destroyed: #2625 + io::fd_writer(self.input_fd(), false) + } + + /** + * Returns an io::Reader that can be used to read from this Process's stdout. + * + * Fails if this Process's stdout was redirected to an existing file descriptor. + */ + pub fn output(&mut self) -> @io::Reader { + // FIXME: the Reader can still be used after self is destroyed: #2625 + io::FILE_reader(self.output_file(), false) + } + + /** + * Returns an io::Reader that can be used to read from this Process's stderr. + * + * Fails if this Process's stderr was redirected to an existing file descriptor. + */ + pub fn error(&mut self) -> @io::Reader { + // FIXME: the Reader can still be used after self is destroyed: #2625 + io::FILE_reader(self.error_file(), false) + } + + /** + * Closes the handle to the child process's stdin. + * + * If this process is reading its stdin from an existing file descriptor, then this + * method does nothing. + */ + pub fn close_input(&mut self) { + #[fixed_stack_segment]; #[inline(never)]; + match self.input { + Some(-1) | None => (), + Some(fd) => { + unsafe { + libc::close(fd); + } + self.input = Some(-1); + } + } + } + + fn close_outputs(&mut self) { + #[fixed_stack_segment]; #[inline(never)]; + fclose_and_null(&mut self.output); + fclose_and_null(&mut self.error); + + fn fclose_and_null(f_opt: &mut Option<*libc::FILE>) { + #[allow(cstack)]; // fixed_stack_segment declared on enclosing fn + match *f_opt { + Some(f) if !f.is_null() => { + unsafe { + libc::fclose(f); + *f_opt = Some(0 as *libc::FILE); + } + }, + _ => () + } + } } /** * Closes the handle to stdin, waits for the child process to terminate, - * and reads and returns all remaining output of stdout and stderr, along - * with the exit code. + * and returns the exit code. * - * If the child has already been finished then the exit code and any - * remaining unread output of stdout and stderr will be returned. + * If the child has already been finished then the exit code is returned. + */ + pub fn finish(&mut self) -> int { + for &code in self.exit_code.iter() { + return code; + } + self.close_input(); + let code = waitpid(self.pid); + self.exit_code = Some(code); + return code; + } + + /** + * Closes the handle to stdin, waits for the child process to terminate, and reads + * and returns all remaining output of stdout and stderr, along with the exit code. * - * This method will fail if the child process's stdout or stderr streams - * were redirected to existing file descriptors, or if this method has - * already been called. + * If the child has already been finished then the exit code and any remaining + * unread output of stdout and stderr will be returned. + * + * This method will fail if the child process's stdout or stderr streams were + * redirected to existing file descriptors. */ pub fn finish_with_output(&mut self) -> ProcessOutput { - // This should probably be a helper method in rt::io - fn read_everything(input: &mut io::Reader) -> ~[u8] { - let mut result = ~[]; - let mut buf = [0u8, ..1024]; - loop { - match input.read(buf) { - Some(i) => { result = result + buf.slice_to(i) } - None => break - } - } - return result; - } + let output_file = self.output_file(); + let error_file = self.error_file(); + // Spawn two entire schedulers to read both stdout and sterr + // in parallel so we don't deadlock while blocking on one + // or the other. FIXME (#2625): Surely there's a much more + // clever way to do this. let (p, ch) = stream(); let ch = SharedChan::new(ch); let ch_clone = ch.clone(); - - let stderr = Cell::new(self.error.take().unwrap()); - do task::spawn { - let output = read_everything(stderr.take()); - ch.send((2, output)); + do task::spawn_sched(task::SingleThreaded) { + let errput = io::FILE_reader(error_file, false); + ch.send((2, errput.read_whole_stream())); } - let stdout = Cell::new(self.output.take().unwrap()); - do task::spawn { - let output = read_everything(stdout.take()); - ch_clone.send((1, output)); + do task::spawn_sched(task::SingleThreaded) { + let output = io::FILE_reader(output_file, false); + ch_clone.send((1, output.read_whole_stream())); } let status = self.finish(); @@ -312,6 +382,40 @@ impl Process { error: errs}; } + fn destroy_internal(&mut self, force: bool) { + // if the process has finished, and therefore had waitpid called, + // and we kill it, then on unix we might ending up killing a + // newer process that happens to have the same (re-used) id + if self.exit_code.is_none() { + killpid(self.pid, force); + self.finish(); + } + + #[cfg(windows)] + fn killpid(pid: pid_t, _force: bool) { + #[fixed_stack_segment]; #[inline(never)]; + unsafe { + libc::funcs::extra::kernel32::TerminateProcess( + cast::transmute(pid), 1); + } + } + + #[cfg(unix)] + fn killpid(pid: pid_t, force: bool) { + #[fixed_stack_segment]; #[inline(never)]; + + let signal = if force { + libc::consts::os::posix88::SIGKILL + } else { + libc::consts::os::posix88::SIGTERM + }; + + unsafe { + libc::funcs::posix88::signal::kill(pid, signal as c_int); + } + } + } + /** * Terminates the process, giving it a chance to clean itself up if * this is supported by the operating system. @@ -319,12 +423,7 @@ impl Process { * On Posix OSs SIGTERM will be sent to the process. On Win32 * TerminateProcess(..) will be called. */ - pub fn destroy(&mut self) { - #[cfg(windows)] fn sigterm() -> int { 15 } - #[cfg(not(windows))] fn sigterm() -> int { libc::SIGTERM as int } - self.handle.kill(sigterm()); - self.finish(); - } + pub fn destroy(&mut self) { self.destroy_internal(false); } /** * Terminates the process as soon as possible without giving it a @@ -333,22 +432,378 @@ impl Process { * On Posix OSs SIGKILL will be sent to the process. On Win32 * TerminateProcess(..) will be called. */ - pub fn force_destroy(&mut self) { - #[cfg(windows)] fn sigkill() -> int { 9 } - #[cfg(not(windows))] fn sigkill() -> int { libc::SIGKILL as int } - self.handle.kill(sigkill()); - self.finish(); - } + pub fn force_destroy(&mut self) { self.destroy_internal(true); } } impl Drop for Process { fn drop(&self) { // FIXME(#4330) Need self by value to get mutability. let mut_self: &mut Process = unsafe { cast::transmute(self) }; + mut_self.finish(); + mut_self.close_outputs(); + free_handle(self.handle); } } +struct SpawnProcessResult { + pid: pid_t, + handle: *(), +} + +#[cfg(windows)] +fn spawn_process_os(prog: &str, args: &[~str], + env: Option<~[(~str, ~str)]>, + dir: Option<&Path>, + in_fd: c_int, out_fd: c_int, err_fd: c_int) -> SpawnProcessResult { + #[fixed_stack_segment]; #[inline(never)]; + + use libc::types::os::arch::extra::{DWORD, HANDLE, STARTUPINFO}; + use libc::consts::os::extra::{ + TRUE, FALSE, + STARTF_USESTDHANDLES, + INVALID_HANDLE_VALUE, + DUPLICATE_SAME_ACCESS + }; + use libc::funcs::extra::kernel32::{ + GetCurrentProcess, + DuplicateHandle, + CloseHandle, + CreateProcessA + }; + use libc::funcs::extra::msvcrt::get_osfhandle; + + use sys; + + unsafe { + + let mut si = zeroed_startupinfo(); + si.cb = sys::size_of::() as DWORD; + si.dwFlags = STARTF_USESTDHANDLES; + + let cur_proc = GetCurrentProcess(); + + let orig_std_in = get_osfhandle(in_fd) as HANDLE; + if orig_std_in == INVALID_HANDLE_VALUE as HANDLE { + fail!("failure in get_osfhandle: %s", os::last_os_error()); + } + if DuplicateHandle(cur_proc, orig_std_in, cur_proc, &mut si.hStdInput, + 0, TRUE, DUPLICATE_SAME_ACCESS) == FALSE { + fail!("failure in DuplicateHandle: %s", os::last_os_error()); + } + + let orig_std_out = get_osfhandle(out_fd) as HANDLE; + if orig_std_out == INVALID_HANDLE_VALUE as HANDLE { + fail!("failure in get_osfhandle: %s", os::last_os_error()); + } + if DuplicateHandle(cur_proc, orig_std_out, cur_proc, &mut si.hStdOutput, + 0, TRUE, DUPLICATE_SAME_ACCESS) == FALSE { + fail!("failure in DuplicateHandle: %s", os::last_os_error()); + } + + let orig_std_err = get_osfhandle(err_fd) as HANDLE; + if orig_std_err == INVALID_HANDLE_VALUE as HANDLE { + fail!("failure in get_osfhandle: %s", os::last_os_error()); + } + if DuplicateHandle(cur_proc, orig_std_err, cur_proc, &mut si.hStdError, + 0, TRUE, DUPLICATE_SAME_ACCESS) == FALSE { + fail!("failure in DuplicateHandle: %s", os::last_os_error()); + } + + let cmd = make_command_line(prog, args); + let mut pi = zeroed_process_information(); + let mut create_err = None; + + do with_envp(env) |envp| { + do with_dirp(dir) |dirp| { + do cmd.with_c_str |cmdp| { + let created = CreateProcessA(ptr::null(), cast::transmute(cmdp), + ptr::mut_null(), ptr::mut_null(), TRUE, + 0, envp, dirp, &mut si, &mut pi); + if created == FALSE { + create_err = Some(os::last_os_error()); + } + } + } + } + + CloseHandle(si.hStdInput); + CloseHandle(si.hStdOutput); + CloseHandle(si.hStdError); + + for msg in create_err.iter() { + fail!("failure in CreateProcess: %s", *msg); + } + + // We close the thread handle because we don't care about keeping the thread id valid, + // and we aren't keeping the thread handle around to be able to close it later. We don't + // close the process handle however because we want the process id to stay valid at least + // until the calling code closes the process handle. + CloseHandle(pi.hThread); + + SpawnProcessResult { + pid: pi.dwProcessId as pid_t, + handle: pi.hProcess as *() + } + } +} + +#[cfg(windows)] +fn zeroed_startupinfo() -> libc::types::os::arch::extra::STARTUPINFO { + libc::types::os::arch::extra::STARTUPINFO { + cb: 0, + lpReserved: ptr::mut_null(), + lpDesktop: ptr::mut_null(), + lpTitle: ptr::mut_null(), + dwX: 0, + dwY: 0, + dwXSize: 0, + dwYSize: 0, + dwXCountChars: 0, + dwYCountCharts: 0, + dwFillAttribute: 0, + dwFlags: 0, + wShowWindow: 0, + cbReserved2: 0, + lpReserved2: ptr::mut_null(), + hStdInput: ptr::mut_null(), + hStdOutput: ptr::mut_null(), + hStdError: ptr::mut_null() + } +} + +#[cfg(windows)] +fn zeroed_process_information() -> libc::types::os::arch::extra::PROCESS_INFORMATION { + libc::types::os::arch::extra::PROCESS_INFORMATION { + hProcess: ptr::mut_null(), + hThread: ptr::mut_null(), + dwProcessId: 0, + dwThreadId: 0 + } +} + +// FIXME: this is only pub so it can be tested (see issue #4536) +#[cfg(windows)] +pub fn make_command_line(prog: &str, args: &[~str]) -> ~str { + let mut cmd = ~""; + append_arg(&mut cmd, prog); + for arg in args.iter() { + cmd.push_char(' '); + append_arg(&mut cmd, *arg); + } + return cmd; + + fn append_arg(cmd: &mut ~str, arg: &str) { + let quote = arg.iter().any(|c| c == ' ' || c == '\t'); + if quote { + cmd.push_char('"'); + } + for i in range(0u, arg.len()) { + append_char_at(cmd, arg, i); + } + if quote { + cmd.push_char('"'); + } + } + + fn append_char_at(cmd: &mut ~str, arg: &str, i: uint) { + match arg[i] as char { + '"' => { + // Escape quotes. + cmd.push_str("\\\""); + } + '\\' => { + if backslash_run_ends_in_quote(arg, i) { + // Double all backslashes that are in runs before quotes. + cmd.push_str("\\\\"); + } else { + // Pass other backslashes through unescaped. + cmd.push_char('\\'); + } + } + c => { + cmd.push_char(c); + } + } + } + + fn backslash_run_ends_in_quote(s: &str, mut i: uint) -> bool { + while i < s.len() && s[i] as char == '\\' { + i += 1; + } + return i < s.len() && s[i] as char == '"'; + } +} + +#[cfg(unix)] +fn spawn_process_os(prog: &str, args: &[~str], + env: Option<~[(~str, ~str)]>, + dir: Option<&Path>, + in_fd: c_int, out_fd: c_int, err_fd: c_int) -> SpawnProcessResult { + #[fixed_stack_segment]; #[inline(never)]; + + use libc::funcs::posix88::unistd::{fork, dup2, close, chdir, execvp}; + use libc::funcs::bsd44::getdtablesize; + + mod rustrt { + use libc::c_void; + + #[abi = "cdecl"] + extern { + pub fn rust_unset_sigprocmask(); + pub fn rust_set_environ(envp: *c_void); + } + } + + unsafe { + + let pid = fork(); + if pid < 0 { + fail!("failure in fork: %s", os::last_os_error()); + } else if pid > 0 { + return SpawnProcessResult {pid: pid, handle: ptr::null()}; + } + + rustrt::rust_unset_sigprocmask(); + + if dup2(in_fd, 0) == -1 { + fail!("failure in dup2(in_fd, 0): %s", os::last_os_error()); + } + if dup2(out_fd, 1) == -1 { + fail!("failure in dup2(out_fd, 1): %s", os::last_os_error()); + } + if dup2(err_fd, 2) == -1 { + fail!("failure in dup3(err_fd, 2): %s", os::last_os_error()); + } + // close all other fds + for fd in range(3, getdtablesize()).invert() { + close(fd as c_int); + } + + do with_dirp(dir) |dirp| { + if !dirp.is_null() && chdir(dirp) == -1 { + fail!("failure in chdir: %s", os::last_os_error()); + } + } + + do with_envp(env) |envp| { + if !envp.is_null() { + rustrt::rust_set_environ(envp); + } + do with_argv(prog, args) |argv| { + execvp(*argv, argv); + // execvp only returns if an error occurred + fail!("failure in execvp: %s", os::last_os_error()); + } + } + } +} + +#[cfg(unix)] +fn with_argv(prog: &str, args: &[~str], cb: &fn(**libc::c_char) -> T) -> T { + use vec; + + // We can't directly convert `str`s into `*char`s, as someone needs to hold + // a reference to the intermediary byte buffers. So first build an array to + // hold all the ~[u8] byte strings. + let mut tmps = vec::with_capacity(args.len() + 1); + + tmps.push(prog.to_c_str()); + + for arg in args.iter() { + tmps.push(arg.to_c_str()); + } + + // Next, convert each of the byte strings into a pointer. This is + // technically unsafe as the caller could leak these pointers out of our + // scope. + let mut ptrs = do tmps.map |tmp| { + tmp.with_ref(|buf| buf) + }; + + // Finally, make sure we add a null pointer. + ptrs.push(ptr::null()); + + ptrs.as_imm_buf(|buf, _| cb(buf)) +} + +#[cfg(unix)] +fn with_envp(env: Option<~[(~str, ~str)]>, cb: &fn(*c_void) -> T) -> T { + use vec; + + // On posixy systems we can pass a char** for envp, which is a + // null-terminated array of "k=v\n" strings. Like `with_argv`, we have to + // have a temporary buffer to hold the intermediary `~[u8]` byte strings. + match env { + Some(env) => { + let mut tmps = vec::with_capacity(env.len()); + + for pair in env.iter() { + // Use of match here is just to workaround limitations + // in the stage0 irrefutable pattern impl. + let kv = fmt!("%s=%s", pair.first(), pair.second()); + tmps.push(kv.to_c_str()); + } + + // Once again, this is unsafe. + let mut ptrs = do tmps.map |tmp| { + tmp.with_ref(|buf| buf) + }; + ptrs.push(ptr::null()); + + do ptrs.as_imm_buf |buf, _| { + unsafe { cb(cast::transmute(buf)) } + } + } + _ => cb(ptr::null()) + } +} + +#[cfg(windows)] +fn with_envp(env: Option<~[(~str, ~str)]>, cb: &fn(*mut c_void) -> T) -> T { + // On win32 we pass an "environment block" which is not a char**, but + // rather a concatenation of null-terminated k=v\0 sequences, with a final + // \0 to terminate. + match env { + Some(env) => { + let mut blk = ~[]; + + for pair in env.iter() { + let kv = fmt!("%s=%s", pair.first(), pair.second()); + blk.push_all(kv.as_bytes()); + blk.push(0); + } + + blk.push(0); + + do blk.as_imm_buf |p, _len| { + unsafe { cb(cast::transmute(p)) } + } + } + _ => cb(ptr::mut_null()) + } +} + +fn with_dirp(d: Option<&Path>, cb: &fn(*libc::c_char) -> T) -> T { + match d { + Some(dir) => dir.with_c_str(|buf| cb(buf)), + None => cb(ptr::null()) + } +} + +#[cfg(windows)] +fn free_handle(handle: *()) { + #[fixed_stack_segment]; #[inline(never)]; + unsafe { + libc::funcs::extra::kernel32::CloseHandle(cast::transmute(handle)); + } +} + +#[cfg(unix)] +fn free_handle(_handle: *()) { + // unix has no process handle object, just a pid +} + /** * Spawns a process and waits for it to terminate. The process will * inherit the current stdin/stdout/stderr file descriptors. @@ -369,7 +824,7 @@ pub fn process_status(prog: &str, args: &[~str]) -> int { in_fd: Some(0), out_fd: Some(1), err_fd: Some(2) - }).unwrap(); + }); prog.finish() } @@ -386,38 +841,162 @@ pub fn process_status(prog: &str, args: &[~str]) -> int { * The process's stdout/stderr output and exit code. */ pub fn process_output(prog: &str, args: &[~str]) -> ProcessOutput { - let mut prog = Process::new(prog, args, ProcessOptions::new()).unwrap(); + let mut prog = Process::new(prog, args, ProcessOptions::new()); prog.finish_with_output() } +/** + * Waits for a process to exit and returns the exit code, failing + * if there is no process with the specified id. + * + * Note that this is private to avoid race conditions on unix where if + * a user calls waitpid(some_process.get_id()) then some_process.finish() + * and some_process.destroy() and some_process.finalize() will then either + * operate on a none-existent process or, even worse, on a newer process + * with the same id. + */ +fn waitpid(pid: pid_t) -> int { + return waitpid_os(pid); + + #[cfg(windows)] + fn waitpid_os(pid: pid_t) -> int { + #[fixed_stack_segment]; #[inline(never)]; + + use libc::types::os::arch::extra::DWORD; + use libc::consts::os::extra::{ + SYNCHRONIZE, + PROCESS_QUERY_INFORMATION, + FALSE, + STILL_ACTIVE, + INFINITE, + WAIT_FAILED + }; + use libc::funcs::extra::kernel32::{ + OpenProcess, + GetExitCodeProcess, + CloseHandle, + WaitForSingleObject + }; + + unsafe { + + let proc = OpenProcess(SYNCHRONIZE | PROCESS_QUERY_INFORMATION, FALSE, pid as DWORD); + if proc.is_null() { + fail!("failure in OpenProcess: %s", os::last_os_error()); + } + + loop { + let mut status = 0; + if GetExitCodeProcess(proc, &mut status) == FALSE { + CloseHandle(proc); + fail!("failure in GetExitCodeProcess: %s", os::last_os_error()); + } + if status != STILL_ACTIVE { + CloseHandle(proc); + return status as int; + } + if WaitForSingleObject(proc, INFINITE) == WAIT_FAILED { + CloseHandle(proc); + fail!("failure in WaitForSingleObject: %s", os::last_os_error()); + } + } + } + } + + #[cfg(unix)] + fn waitpid_os(pid: pid_t) -> int { + #[fixed_stack_segment]; #[inline(never)]; + + use libc::funcs::posix01::wait::*; + + #[cfg(target_os = "linux")] + #[cfg(target_os = "android")] + fn WIFEXITED(status: i32) -> bool { + (status & 0xffi32) == 0i32 + } + + #[cfg(target_os = "macos")] + #[cfg(target_os = "freebsd")] + fn WIFEXITED(status: i32) -> bool { + (status & 0x7fi32) == 0i32 + } + + #[cfg(target_os = "linux")] + #[cfg(target_os = "android")] + fn WEXITSTATUS(status: i32) -> i32 { + (status >> 8i32) & 0xffi32 + } + + #[cfg(target_os = "macos")] + #[cfg(target_os = "freebsd")] + fn WEXITSTATUS(status: i32) -> i32 { + status >> 8i32 + } + + let mut status = 0 as c_int; + if unsafe { waitpid(pid, &mut status, 0) } == -1 { + fail!("failure in waitpid: %s", os::last_os_error()); + } + + return if WIFEXITED(status) { + WEXITSTATUS(status) as int + } else { + 1 + }; + } +} + #[cfg(test)] mod tests { + use io; + use libc::c_int; + use option::{Option, None, Some}; use os; use path::Path; - use prelude::*; + use run; use str; - use super::*; use unstable::running_on_valgrind; + #[test] + #[cfg(windows)] + fn test_make_command_line() { + assert_eq!( + run::make_command_line("prog", [~"aaa", ~"bbb", ~"ccc"]), + ~"prog aaa bbb ccc" + ); + assert_eq!( + run::make_command_line("C:\\Program Files\\blah\\blah.exe", [~"aaa"]), + ~"\"C:\\Program Files\\blah\\blah.exe\" aaa" + ); + assert_eq!( + run::make_command_line("C:\\Program Files\\test", [~"aa\"bb"]), + ~"\"C:\\Program Files\\test\" aa\\\"bb" + ); + assert_eq!( + run::make_command_line("echo", [~"a b c"]), + ~"echo \"a b c\"" + ); + } + #[test] #[cfg(not(target_os="android"))] fn test_process_status() { - assert_eq!(process_status("false", []), 1); - assert_eq!(process_status("true", []), 0); + assert_eq!(run::process_status("false", []), 1); + assert_eq!(run::process_status("true", []), 0); } #[test] #[cfg(target_os="android")] fn test_process_status() { - assert_eq!(process_status("/system/bin/sh", [~"-c",~"false"]), 1); - assert_eq!(process_status("/system/bin/sh", [~"-c",~"true"]), 0); + assert_eq!(run::process_status("/system/bin/sh", [~"-c",~"false"]), 1); + assert_eq!(run::process_status("/system/bin/sh", [~"-c",~"true"]), 0); } #[test] #[cfg(not(target_os="android"))] fn test_process_output_output() { - let ProcessOutput {status, output, error} - = process_output("echo", [~"hello"]); + let run::ProcessOutput {status, output, error} + = run::process_output("echo", [~"hello"]); let output_str = str::from_bytes(output); assert_eq!(status, 0); @@ -431,8 +1010,8 @@ mod tests { #[cfg(target_os="android")] fn test_process_output_output() { - let ProcessOutput {status, output, error} - = process_output("/system/bin/sh", [~"-c",~"echo hello"]); + let run::ProcessOutput {status, output, error} + = run::process_output("/system/bin/sh", [~"-c",~"echo hello"]); let output_str = str::from_bytes(output); assert_eq!(status, 0); @@ -447,8 +1026,8 @@ mod tests { #[cfg(not(target_os="android"))] fn test_process_output_error() { - let ProcessOutput {status, output, error} - = process_output("mkdir", [~"."]); + let run::ProcessOutput {status, output, error} + = run::process_output("mkdir", [~"."]); assert_eq!(status, 1); assert_eq!(output, ~[]); @@ -458,40 +1037,90 @@ mod tests { #[cfg(target_os="android")] fn test_process_output_error() { - let ProcessOutput {status, output, error} - = process_output("/system/bin/mkdir", [~"."]); + let run::ProcessOutput {status, output, error} + = run::process_output("/system/bin/mkdir", [~"."]); assert_eq!(status, 255); assert_eq!(output, ~[]); assert!(!error.is_empty()); } + #[test] + fn test_pipes() { + + let pipe_in = os::pipe(); + let pipe_out = os::pipe(); + let pipe_err = os::pipe(); + + let mut proc = run::Process::new("cat", [], run::ProcessOptions { + dir: None, + env: None, + in_fd: Some(pipe_in.input), + out_fd: Some(pipe_out.out), + err_fd: Some(pipe_err.out) + }); + + assert!(proc.input_redirected()); + assert!(proc.output_redirected()); + assert!(proc.error_redirected()); + + os::close(pipe_in.input); + os::close(pipe_out.out); + os::close(pipe_err.out); + + let expected = ~"test"; + writeclose(pipe_in.out, expected); + let actual = readclose(pipe_out.input); + readclose(pipe_err.input); + proc.finish(); + + assert_eq!(expected, actual); + } + + fn writeclose(fd: c_int, s: &str) { + let writer = io::fd_writer(fd, false); + writer.write_str(s); + os::close(fd); + } + + fn readclose(fd: c_int) -> ~str { + #[fixed_stack_segment]; #[inline(never)]; + + unsafe { + let file = os::fdopen(fd); + let reader = io::FILE_reader(file, false); + let buf = reader.read_whole_stream(); + os::fclose(file); + str::from_bytes(buf) + } + } + #[test] #[cfg(not(target_os="android"))] fn test_finish_once() { - let mut prog = Process::new("false", [], ProcessOptions::new()).unwrap(); + let mut prog = run::Process::new("false", [], run::ProcessOptions::new()); assert_eq!(prog.finish(), 1); } #[test] #[cfg(target_os="android")] fn test_finish_once() { - let mut prog = Process::new("/system/bin/sh", [~"-c",~"false"], - ProcessOptions::new()).unwrap(); + let mut prog = run::Process::new("/system/bin/sh", [~"-c",~"false"], + run::ProcessOptions::new()); assert_eq!(prog.finish(), 1); } #[test] #[cfg(not(target_os="android"))] fn test_finish_twice() { - let mut prog = Process::new("false", [], ProcessOptions::new()).unwrap(); + let mut prog = run::Process::new("false", [], run::ProcessOptions::new()); assert_eq!(prog.finish(), 1); assert_eq!(prog.finish(), 1); } #[test] #[cfg(target_os="android")] fn test_finish_twice() { - let mut prog = Process::new("/system/bin/sh", [~"-c",~"false"], - ProcessOptions::new()).unwrap(); + let mut prog = run::Process::new("/system/bin/sh", [~"-c",~"false"], + run::ProcessOptions::new()); assert_eq!(prog.finish(), 1); assert_eq!(prog.finish(), 1); } @@ -500,9 +1129,8 @@ mod tests { #[cfg(not(target_os="android"))] fn test_finish_with_output_once() { - let prog = Process::new("echo", [~"hello"], ProcessOptions::new()); - let mut prog = prog.unwrap(); - let ProcessOutput {status, output, error} + let mut prog = run::Process::new("echo", [~"hello"], run::ProcessOptions::new()); + let run::ProcessOutput {status, output, error} = prog.finish_with_output(); let output_str = str::from_bytes(output); @@ -517,9 +1145,9 @@ mod tests { #[cfg(target_os="android")] fn test_finish_with_output_once() { - let mut prog = Process::new("/system/bin/sh", [~"-c",~"echo hello"], - ProcessOptions::new()).unwrap(); - let ProcessOutput {status, output, error} + let mut prog = run::Process::new("/system/bin/sh", [~"-c",~"echo hello"], + run::ProcessOptions::new()); + let run::ProcessOutput {status, output, error} = prog.finish_with_output(); let output_str = str::from_bytes(output); @@ -531,59 +1159,113 @@ mod tests { } } + #[test] + #[cfg(not(target_os="android"))] + fn test_finish_with_output_twice() { + + let mut prog = run::Process::new("echo", [~"hello"], run::ProcessOptions::new()); + let run::ProcessOutput {status, output, error} + = prog.finish_with_output(); + + let output_str = str::from_bytes(output); + + assert_eq!(status, 0); + assert_eq!(output_str.trim().to_owned(), ~"hello"); + // FIXME #7224 + if !running_on_valgrind() { + assert_eq!(error, ~[]); + } + + let run::ProcessOutput {status, output, error} + = prog.finish_with_output(); + + assert_eq!(status, 0); + assert_eq!(output, ~[]); + // FIXME #7224 + if !running_on_valgrind() { + assert_eq!(error, ~[]); + } + } + #[test] + #[cfg(target_os="android")] + fn test_finish_with_output_twice() { + + let mut prog = run::Process::new("/system/bin/sh", [~"-c",~"echo hello"], + run::ProcessOptions::new()); + let run::ProcessOutput {status, output, error} + = prog.finish_with_output(); + + let output_str = str::from_bytes(output); + + assert_eq!(status, 0); + assert_eq!(output_str.trim().to_owned(), ~"hello"); + // FIXME #7224 + if !running_on_valgrind() { + assert_eq!(error, ~[]); + } + + let run::ProcessOutput {status, output, error} + = prog.finish_with_output(); + + assert_eq!(status, 0); + assert_eq!(output, ~[]); + // FIXME #7224 + if !running_on_valgrind() { + assert_eq!(error, ~[]); + } + } + #[test] #[should_fail] #[cfg(not(windows),not(target_os="android"))] fn test_finish_with_output_redirected() { - let mut prog = Process::new("echo", [~"hello"], ProcessOptions { + let mut prog = run::Process::new("echo", [~"hello"], run::ProcessOptions { env: None, dir: None, in_fd: Some(0), out_fd: Some(1), err_fd: Some(2) - }).unwrap(); - // this should fail because it is not valid to read the output when it - // was redirected + }); + // this should fail because it is not valid to read the output when it was redirected prog.finish_with_output(); } #[test] #[should_fail] #[cfg(not(windows),target_os="android")] fn test_finish_with_output_redirected() { - let mut prog = Process::new("/system/bin/sh", [~"-c",~"echo hello"], - ProcessOptions { + let mut prog = run::Process::new("/system/bin/sh", [~"-c",~"echo hello"], + run::ProcessOptions { env: None, dir: None, in_fd: Some(0), out_fd: Some(1), err_fd: Some(2) - }).unwrap(); - // this should fail because it is not valid to read the output when it - // was redirected + }); + // this should fail because it is not valid to read the output when it was redirected prog.finish_with_output(); } #[cfg(unix,not(target_os="android"))] - fn run_pwd(dir: Option<&Path>) -> Process { - Process::new("pwd", [], ProcessOptions { + fn run_pwd(dir: Option<&Path>) -> run::Process { + run::Process::new("pwd", [], run::ProcessOptions { dir: dir, - .. ProcessOptions::new() - }).unwrap() + .. run::ProcessOptions::new() + }) } #[cfg(unix,target_os="android")] - fn run_pwd(dir: Option<&Path>) -> Process { - Process::new("/system/bin/sh", [~"-c",~"pwd"], ProcessOptions { + fn run_pwd(dir: Option<&Path>) -> run::Process { + run::Process::new("/system/bin/sh", [~"-c",~"pwd"], run::ProcessOptions { dir: dir, - .. ProcessOptions::new() - }).unwrap() + .. run::ProcessOptions::new() + }) } #[cfg(windows)] - fn run_pwd(dir: Option<&Path>) -> Process { - Process::new("cmd", [~"/c", ~"cd"], ProcessOptions { + fn run_pwd(dir: Option<&Path>) -> run::Process { + run::Process::new("cmd", [~"/c", ~"cd"], run::ProcessOptions { dir: dir, - .. ProcessOptions::new() - }).unwrap() + .. run::ProcessOptions::new() + }) } #[test] @@ -619,26 +1301,26 @@ mod tests { } #[cfg(unix,not(target_os="android"))] - fn run_env(env: Option<~[(~str, ~str)]>) -> Process { - Process::new("env", [], ProcessOptions { + fn run_env(env: Option<~[(~str, ~str)]>) -> run::Process { + run::Process::new("env", [], run::ProcessOptions { env: env, - .. ProcessOptions::new() - }).unwrap() + .. run::ProcessOptions::new() + }) } #[cfg(unix,target_os="android")] - fn run_env(env: Option<~[(~str, ~str)]>) -> Process { - Process::new("/system/bin/sh", [~"-c",~"set"], ProcessOptions { + fn run_env(env: Option<~[(~str, ~str)]>) -> run::Process { + run::Process::new("/system/bin/sh", [~"-c",~"set"], run::ProcessOptions { env: env, - .. ProcessOptions::new() - }).unwrap() + .. run::ProcessOptions::new() + }) } #[cfg(windows)] - fn run_env(env: Option<~[(~str, ~str)]>) -> Process { - Process::new("cmd", [~"/c", ~"set"], ProcessOptions { + fn run_env(env: Option<~[(~str, ~str)]>) -> run::Process { + run::Process::new("cmd", [~"/c", ~"set"], run::ProcessOptions { env: env, - .. ProcessOptions::new() - }).unwrap() + .. run::ProcessOptions::new() + }) } #[test] @@ -675,6 +1357,7 @@ mod tests { #[test] fn test_add_to_env() { + let mut new_env = os::env(); new_env.push((~"RUN_TEST_NEW_ENV", ~"123")); diff --git a/src/libuv b/src/libuv index d88cf5652a1..dfae9c3e958 160000 --- a/src/libuv +++ b/src/libuv @@ -1 +1 @@ -Subproject commit d88cf5652a1afb23939da0bae86c70ec521b9921 +Subproject commit dfae9c3e958dc086d9c0ab068cd76d196c95a433 diff --git a/src/rt/rust_uv.cpp b/src/rt/rust_uv.cpp index a181e76df5c..8ef4572f810 100644 --- a/src/rt/rust_uv.cpp +++ b/src/rt/rust_uv.cpp @@ -329,13 +329,20 @@ rust_uv_get_len_from_buf(uv_buf_t buf) { return buf.len; } +extern "C" uv_err_t +rust_uv_last_error(uv_loop_t* loop) { + return uv_last_error(loop); +} + extern "C" const char* -rust_uv_strerror(int err) { +rust_uv_strerror(uv_err_t* err_ptr) { + uv_err_t err = *err_ptr; return uv_strerror(err); } extern "C" const char* -rust_uv_err_name(int err) { +rust_uv_err_name(uv_err_t* err_ptr) { + uv_err_t err = *err_ptr; return uv_err_name(err); } @@ -546,37 +553,3 @@ extern "C" uv_loop_t* rust_uv_get_loop_from_fs_req(uv_fs_t* req) { return req->loop; } -extern "C" int -rust_uv_spawn(uv_loop_t *loop, uv_process_t *p, uv_process_options_t options) { - return uv_spawn(loop, p, options); -} - -extern "C" int -rust_uv_process_kill(uv_process_t *p, int signum) { - return uv_process_kill(p, signum); -} - -extern "C" void -rust_set_stdio_container_flags(uv_stdio_container_t *c, int flags) { - c->flags = (uv_stdio_flags) flags; -} - -extern "C" void -rust_set_stdio_container_fd(uv_stdio_container_t *c, int fd) { - c->data.fd = fd; -} - -extern "C" void -rust_set_stdio_container_stream(uv_stdio_container_t *c, uv_stream_t *stream) { - c->data.stream = stream; -} - -extern "C" int -rust_uv_process_pid(uv_process_t* p) { - return p->pid; -} - -extern "C" int -rust_uv_pipe_init(uv_loop_t *loop, uv_pipe_t* p, int ipc) { - return uv_pipe_init(loop, p, ipc); -} diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index 2fc1a91a132..b668d394406 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -47,6 +47,7 @@ rust_uv_timer_start rust_uv_timer_stop rust_uv_tcp_init rust_uv_buf_init +rust_uv_last_error rust_uv_strerror rust_uv_err_name rust_uv_ip4_addr @@ -190,11 +191,4 @@ rust_drop_global_args_lock rust_take_change_dir_lock rust_drop_change_dir_lock rust_get_test_int -rust_get_task -rust_uv_spawn -rust_uv_process_kill -rust_set_stdio_container_flags -rust_set_stdio_container_fd -rust_set_stdio_container_stream -rust_uv_process_pid -rust_uv_pipe_init +rust_get_task \ No newline at end of file diff --git a/src/test/run-pass/core-run-destroy.rs b/src/test/run-pass/core-run-destroy.rs index 90e63fc977d..2551d1a5cfc 100644 --- a/src/test/run-pass/core-run-destroy.rs +++ b/src/test/run-pass/core-run-destroy.rs @@ -22,15 +22,13 @@ use std::str; #[test] fn test_destroy_once() { - let p = run::Process::new("echo", [], run::ProcessOptions::new()); - let mut p = p.unwrap(); + let mut p = run::Process::new("echo", [], run::ProcessOptions::new()); p.destroy(); // this shouldn't crash (and nor should the destructor) } #[test] fn test_destroy_twice() { - let p = run::Process::new("echo", [], run::ProcessOptions::new()); - let mut p = p.unwrap(); + let mut p = run::Process::new("echo", [], run::ProcessOptions::new()); p.destroy(); // this shouldnt crash... p.destroy(); // ...and nor should this (and nor should the destructor) } @@ -76,8 +74,7 @@ fn test_destroy_actually_kills(force: bool) { } // this process will stay alive indefinitely trying to read from stdin - let p = run::Process::new(BLOCK_COMMAND, [], run::ProcessOptions::new()); - let mut p = p.unwrap(); + let mut p = run::Process::new(BLOCK_COMMAND, [], run::ProcessOptions::new()); assert!(process_exists(p.get_id()));