diff --git a/.gitmodules b/.gitmodules index 88ead6e608d..fa979b6d868 100644 --- a/.gitmodules +++ b/.gitmodules @@ -4,5 +4,5 @@ branch = master [submodule "src/libuv"] path = src/libuv - url = https://github.com/brson/libuv.git + url = https://github.com/alexcrichton/libuv.git branch = master diff --git a/mk/rt.mk b/mk/rt.mk index 6a9620c7364..3178c579129 100644 --- a/mk/rt.mk +++ b/mk/rt.mk @@ -163,34 +163,49 @@ LIBUV_DEPS := $$(wildcard \ $$(S)src/libuv/*/*/*/*) endif +LIBUV_GYP := $$(S)src/libuv/build/gyp +LIBUV_MAKEFILE_$(1)_$(2) := $$(CFG_BUILD_DIR)rt/$(1)/stage$(2)/libuv/Makefile +LIBUV_NO_LOAD = run-benchmarks.target.mk run-tests.target.mk \ + uv_dtrace_header.target.mk uv_dtrace_provider.target.mk + +$$(LIBUV_MAKEFILE_$(1)_$(2)): $$(LIBUV_GYP) + (cd $(S)src/libuv/ && \ + ./gyp_uv -f make -Dtarget_arch=$$(HOST_$(1)) -D ninja \ + -Goutput_dir=$$(@D) --generator-output $$(@D)) + # XXX: Shouldn't need platform-specific conditions here ifdef CFG_WINDOWSY_$(1) $$(LIBUV_LIB_$(1)_$(2)): $$(LIBUV_DEPS) - $$(Q)$$(MAKE) -C $$(S)src/libuv/ \ - builddir_name="$$(CFG_BUILD_DIR)/rt/$(1)/stage$(2)/libuv" \ - OS=mingw \ + $$(Q)rm -f $$(S)src/libuv/libuv.a + $$(Q)$$(MAKE) -C $$(S)src/libuv -f Makefile.mingw \ + CFLAGS="$$(CFG_GCCISH_CFLAGS) $$(LIBUV_FLAGS_$$(HOST_$(1))) $$(SNAP_DEFINES)" \ + AR="$$(AR_$(1))" \ V=$$(VERBOSE) + $$(Q)cp $$(S)src/libuv/libuv.a $$@ else ifeq ($(OSTYPE_$(1)), linux-androideabi) -$$(LIBUV_LIB_$(1)_$(2)): $$(LIBUV_DEPS) - $$(Q)$$(MAKE) -C $$(S)src/libuv/ \ +$$(LIBUV_LIB_$(1)_$(2)): $$(LIBUV_DEPS) $$(LIBUV_MAKEFILE_$(1)_$(2)) + $$(Q)$$(MAKE) -C $$(@D) \ CFLAGS="$$(CFG_GCCISH_CFLAGS) $$(LIBUV_FLAGS_$$(HOST_$(1))) $$(SNAP_DEFINES)" \ LDFLAGS="$$(CFG_GCCISH_LINK_FLAGS) $$(LIBUV_FLAGS_$$(HOST_$(1)))" \ CC="$$(CC_$(1))" \ CXX="$$(CXX_$(1))" \ AR="$$(AR_$(1))" \ - BUILDTYPE=Release \ - builddir_name="$$(CFG_BUILD_DIR)/rt/$(1)/stage$(2)/libuv" \ host=android OS=linux \ + builddir="." \ + BUILDTYPE=Release \ + NO_LOAD="$$(LIBUV_NO_LOAD)" \ V=$$(VERBOSE) else -$$(LIBUV_LIB_$(1)_$(2)): $$(LIBUV_DEPS) - $$(Q)$$(MAKE) -C $$(S)src/libuv/ \ +$$(LIBUV_LIB_$(1)_$(2)): $$(LIBUV_DEPS) $$(LIBUV_MAKEFILE_$(1)_$(2)) + $$(Q)$$(MAKE) -C $$(@D) \ CFLAGS="$$(CFG_GCCISH_CFLAGS) $$(LIBUV_FLAGS_$$(HOST_$(1))) $$(SNAP_DEFINES)" \ LDFLAGS="$$(CFG_GCCISH_LINK_FLAGS) $$(LIBUV_FLAGS_$$(HOST_$(1)))" \ CC="$$(CC_$(1))" \ CXX="$$(CXX_$(1))" \ AR="$$(AR_$(1))" \ - builddir_name="$$(CFG_BUILD_DIR)/rt/$(1)/stage$(2)/libuv" \ + builddir="." \ + BUILDTYPE=Release \ + NO_LOAD="$$(LIBUV_NO_LOAD)" \ V=$$(VERBOSE) endif @@ -254,3 +269,7 @@ endef $(foreach stage,$(STAGES), \ $(foreach target,$(CFG_TARGET_TRIPLES), \ $(eval $(call DEF_RUNTIME_TARGETS,$(target),$(stage))))) + +$(LIBUV_GYP): + mkdir -p $(S)src/libuv/build + git clone https://git.chromium.org/external/gyp.git $(S)src/libuv/build/gyp diff --git a/src/compiletest/procsrv.rs b/src/compiletest/procsrv.rs index 45e4f756d7a..74627829c60 100644 --- a/src/compiletest/procsrv.rs +++ b/src/compiletest/procsrv.rs @@ -54,10 +54,10 @@ pub fn run(lib_path: &str, in_fd: None, out_fd: None, err_fd: None - }); + }).unwrap(); for input in input.iter() { - proc.input().write_str(*input); + proc.input().write(input.as_bytes()); } let output = proc.finish_with_output(); diff --git a/src/compiletest/runtest.rs b/src/compiletest/runtest.rs index 05fd621e597..16de4f8e822 100644 --- a/src/compiletest/runtest.rs +++ b/src/compiletest/runtest.rs @@ -20,41 +20,16 @@ use procsrv; use util; use util::logv; -use std::cell::Cell; use std::io; use std::os; use std::str; -use std::task::{spawn_sched, SingleThreaded}; use std::vec; -use std::unstable::running_on_valgrind; use extra::test::MetricMap; pub fn run(config: config, testfile: ~str) { - let config = Cell::new(config); - let testfile = Cell::new(testfile); - // FIXME #6436: Creating another thread to run the test because this - // is going to call waitpid. The new scheduler has some strange - // interaction between the blocking tasks and 'friend' schedulers - // that destroys parallelism if we let normal schedulers block. - // It should be possible to remove this spawn once std::run is - // rewritten to be non-blocking. - // - // We do _not_ create another thread if we're running on V because - // it serializes all threads anyways. - if running_on_valgrind() { - let config = config.take(); - let testfile = testfile.take(); - let mut _mm = MetricMap::new(); - run_metrics(config, testfile, &mut _mm); - } else { - do spawn_sched(SingleThreaded) { - let config = config.take(); - let testfile = testfile.take(); - let mut _mm = MetricMap::new(); - run_metrics(config, testfile, &mut _mm); - } - } + let mut _mm = MetricMap::new(); + run_metrics(config, testfile, &mut _mm); } pub fn run_metrics(config: config, testfile: ~str, mm: &mut MetricMap) { diff --git a/src/librustdoc/markdown_writer.rs b/src/librustdoc/markdown_writer.rs index c13e85ea716..635d02196fe 100644 --- a/src/librustdoc/markdown_writer.rs +++ b/src/librustdoc/markdown_writer.rs @@ -104,14 +104,14 @@ fn pandoc_writer( ]; do generic_writer |markdown| { - use std::io::WriterUtil; - debug!("pandoc cmd: %s", pandoc_cmd); debug!("pandoc args: %s", pandoc_args.connect(" ")); - let mut proc = run::Process::new(pandoc_cmd, pandoc_args, run::ProcessOptions::new()); + let proc = run::Process::new(pandoc_cmd, pandoc_args, + run::ProcessOptions::new()); + let mut proc = proc.unwrap(); - proc.input().write_str(markdown); + proc.input().write(markdown.as_bytes()); let output = proc.finish_with_output(); debug!("pandoc result: %i", output.status); diff --git a/src/librustpkg/source_control.rs b/src/librustpkg/source_control.rs index caa004a53b2..c67a8158139 100644 --- a/src/librustpkg/source_control.rs +++ b/src/librustpkg/source_control.rs @@ -89,7 +89,7 @@ pub fn git_clone_general(source: &str, target: &Path, v: &Version) -> bool { fn process_output_in_cwd(prog: &str, args: &[~str], cwd: &Path) -> ProcessOutput { let mut prog = Process::new(prog, args, ProcessOptions{ dir: Some(cwd) - ,..ProcessOptions::new()}); + ,..ProcessOptions::new()}).unwrap(); prog.finish_with_output() } diff --git a/src/librustpkg/tests.rs b/src/librustpkg/tests.rs index 98999da41c8..b0d996ea0af 100644 --- a/src/librustpkg/tests.rs +++ b/src/librustpkg/tests.rs @@ -112,13 +112,14 @@ fn mk_temp_workspace(short_name: &Path, version: &Version) -> Path { fn run_git(args: &[~str], env: Option<~[(~str, ~str)]>, cwd: &Path, err_msg: &str) { let cwd = (*cwd).clone(); - let mut prog = run::Process::new("git", args, run::ProcessOptions { + let prog = run::Process::new("git", args, run::ProcessOptions { env: env, dir: Some(&cwd), in_fd: None, out_fd: None, err_fd: None }); + let mut prog = prog.unwrap(); let rslt = prog.finish_with_output(); if rslt.status != 0 { fail!("%s [git returned %?, output = %s, error = %s]", err_msg, @@ -226,7 +227,7 @@ fn command_line_test_with_env(args: &[~str], cwd: &Path, env: Option<~[(~str, ~s in_fd: None, out_fd: None, err_fd: None - }); + }).unwrap(); let output = prog.finish_with_output(); debug!("Output from command %s with args %? was %s {%s}[%?]", cmd, args, str::from_bytes(output.output), @@ -1027,16 +1028,17 @@ fn test_extern_mod() { test_sysroot().to_str(), exec_file.to_str()); - let mut prog = run::Process::new(rustc.to_str(), [main_file.to_str(), - ~"--sysroot", test_sysroot().to_str(), - ~"-o", exec_file.to_str()], - run::ProcessOptions { + let prog = run::Process::new(rustc.to_str(), [main_file.to_str(), + ~"--sysroot", test_sysroot().to_str(), + ~"-o", exec_file.to_str()], + run::ProcessOptions { env: env, dir: Some(&dir), in_fd: None, out_fd: None, err_fd: None }); + let mut prog = prog.unwrap(); let outp = prog.finish_with_output(); if outp.status != 0 { fail!("output was %s, error was %s", diff --git a/src/libstd/rt/io/file.rs b/src/libstd/rt/io/file.rs index f4e9c4d7c11..534e308a1a6 100644 --- a/src/libstd/rt/io/file.rs +++ b/src/libstd/rt/io/file.rs @@ -71,9 +71,6 @@ pub struct FileStream { last_nread: int, } -impl FileStream { -} - impl Reader for FileStream { fn read(&mut self, buf: &mut [u8]) -> Option { match self.fd.read(buf) { diff --git a/src/libstd/rt/io/mod.rs b/src/libstd/rt/io/mod.rs index 116d240308a..038fca9a1ad 100644 --- a/src/libstd/rt/io/mod.rs +++ b/src/libstd/rt/io/mod.rs @@ -268,6 +268,9 @@ pub use self::extensions::WriterByteConversions; /// Synchronous, non-blocking file I/O. pub mod file; +/// Synchronous, in-memory I/O. +pub mod pipe; + /// Synchronous, non-blocking network I/O. pub mod net { pub mod tcp; diff --git a/src/libstd/rt/io/net/tcp.rs b/src/libstd/rt/io/net/tcp.rs index 9be5540de48..dc7135f4a61 100644 --- a/src/libstd/rt/io/net/tcp.rs +++ b/src/libstd/rt/io/net/tcp.rs @@ -16,7 +16,7 @@ use rt::io::{io_error, read_error, EndOfFile}; use rt::rtio::{IoFactory, IoFactoryObject, RtioSocket, RtioTcpListener, RtioTcpListenerObject, RtioTcpStream, - RtioTcpStreamObject}; + RtioTcpStreamObject, RtioStream}; use rt::local::Local; pub struct TcpStream(~RtioTcpStreamObject); @@ -69,7 +69,7 @@ impl TcpStream { impl Reader for TcpStream { fn read(&mut self, buf: &mut [u8]) -> Option { - match (**self).read(buf) { + match (***self).read(buf) { Ok(read) => Some(read), Err(ioerr) => { // EOF is indicated by returning None @@ -86,7 +86,7 @@ impl Reader for TcpStream { impl Writer for TcpStream { fn write(&mut self, buf: &[u8]) { - match (**self).write(buf) { + match (***self).write(buf) { Ok(_) => (), Err(ioerr) => io_error::cond.raise(ioerr), } @@ -166,7 +166,7 @@ mod test { do run_in_newsched_task { let mut called = false; do io_error::cond.trap(|e| { - assert!(e.kind == ConnectionRefused); + assert_eq!(e.kind, ConnectionRefused); called = true; }).inside { let addr = SocketAddr { ip: Ipv4Addr(0, 0, 0, 0), port: 1 }; diff --git a/src/libstd/rt/io/pipe.rs b/src/libstd/rt/io/pipe.rs new file mode 100644 index 00000000000..02b3d0fe57d --- /dev/null +++ b/src/libstd/rt/io/pipe.rs @@ -0,0 +1,77 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Synchronous, in-memory pipes. +//! +//! Currently these aren't particularly useful, there only exists bindings +//! enough so that pipes can be created to child processes. + +use prelude::*; +use super::{Reader, Writer}; +use rt::io::{io_error, read_error, EndOfFile}; +use rt::local::Local; +use rt::rtio::{RtioPipeObject, RtioStream, IoFactoryObject, IoFactory}; +use rt::uv::pipe; + +pub struct PipeStream(~RtioPipeObject); + +impl PipeStream { + /// Creates a new pipe initialized, but not bound to any particular + /// source/destination + pub fn new() -> Option { + let pipe = unsafe { + let io: *mut IoFactoryObject = Local::unsafe_borrow(); + (*io).pipe_init(false) + }; + match pipe { + Ok(p) => Some(PipeStream(p)), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } + } + } + + /// Extracts the underlying libuv pipe to be bound to another source. + pub fn uv_pipe(&self) -> pipe::Pipe { + // Did someone say multiple layers of indirection? + (**self).uv_pipe() + } +} + +impl Reader for PipeStream { + fn read(&mut self, buf: &mut [u8]) -> Option { + match (***self).read(buf) { + Ok(read) => Some(read), + Err(ioerr) => { + // EOF is indicated by returning None + if ioerr.kind != EndOfFile { + read_error::cond.raise(ioerr); + } + return None; + } + } + } + + fn eof(&mut self) -> bool { fail!() } +} + +impl Writer for PipeStream { + fn write(&mut self, buf: &[u8]) { + match (***self).write(buf) { + Ok(_) => (), + Err(ioerr) => { + io_error::cond.raise(ioerr); + } + } + } + + fn flush(&mut self) { fail!() } +} diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 1788b7a04e3..1a7ef6ea309 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -8,12 +8,14 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use libc; use option::*; use result::*; use libc::c_int; use rt::io::IoError; use super::io::net::ip::{IpAddr, SocketAddr}; +use rt::uv; use rt::uv::uvio; use path::Path; use super::io::support::PathLike; @@ -30,6 +32,9 @@ pub type RtioTcpListenerObject = uvio::UvTcpListener; pub type RtioUdpSocketObject = uvio::UvUdpSocket; pub type RtioTimerObject = uvio::UvTimer; pub type PausibleIdleCallback = uvio::UvPausibleIdleCallback; +pub type RtioPipeObject = uvio::UvPipeStream; +pub type RtioProcessObject = uvio::UvProcess; +pub type RtioProcessConfig<'self> = uv::process::Config<'self>; pub trait EventLoop { fn run(&mut self); @@ -72,6 +77,13 @@ pub trait IoFactory { fn fs_open(&mut self, path: &P, fm: FileMode, fa: FileAccess) -> Result<~RtioFileStream, IoError>; fn fs_unlink(&mut self, path: &P) -> Result<(), IoError>; + fn pipe_init(&mut self, ipc: bool) -> Result<~RtioPipeObject, IoError>; + fn spawn(&mut self, config: &RtioProcessConfig) -> Result<~RtioProcessObject, IoError>; +} + +pub trait RtioStream { + fn read(&mut self, buf: &mut [u8]) -> Result; + fn write(&mut self, buf: &[u8]) -> Result<(), IoError>; } pub trait RtioTcpListener : RtioSocket { @@ -80,9 +92,7 @@ pub trait RtioTcpListener : RtioSocket { fn dont_accept_simultaneously(&mut self) -> Result<(), IoError>; } -pub trait RtioTcpStream : RtioSocket { - fn read(&mut self, buf: &mut [u8]) -> Result; - fn write(&mut self, buf: &[u8]) -> Result<(), IoError>; +pub trait RtioTcpStream : RtioSocket + RtioStream { fn peer_name(&mut self) -> Result; fn control_congestion(&mut self) -> Result<(), IoError>; fn nodelay(&mut self) -> Result<(), IoError>; @@ -124,3 +134,9 @@ pub trait RtioFileStream { fn tell(&self) -> Result; fn flush(&mut self) -> Result<(), IoError>; } + +pub trait RtioProcess { + fn id(&self) -> libc::pid_t; + fn kill(&mut self, signal: int) -> Result<(), IoError>; + fn wait(&mut self) -> int; +} diff --git a/src/libstd/rt/uv/async.rs b/src/libstd/rt/uv/async.rs index d0ca38317cb..ff7bb9dd03a 100644 --- a/src/libstd/rt/uv/async.rs +++ b/src/libstd/rt/uv/async.rs @@ -34,7 +34,7 @@ impl AsyncWatcher { extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) { let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle); - let status = status_to_maybe_uv_error(watcher, status); + let status = status_to_maybe_uv_error(status); let data = watcher.get_watcher_data(); let cb = data.async_cb.get_ref(); (*cb)(watcher, status); diff --git a/src/libstd/rt/uv/file.rs b/src/libstd/rt/uv/file.rs index 405dfe0a7f0..5c77181d7eb 100644 --- a/src/libstd/rt/uv/file.rs +++ b/src/libstd/rt/uv/file.rs @@ -11,8 +11,8 @@ use prelude::*; use ptr::null; use libc::c_void; -use rt::uv::{Request, NativeHandle, Loop, FsCallback, Buf, - status_to_maybe_uv_error_with_loop, UvError}; +use rt::uv::{Request, NativeHandle, Loop, FsCallback, Buf, UvError}; +use rt::uv::status_to_maybe_uv_error; use rt::uv::uvll; use rt::uv::uvll::*; use super::super::io::support::PathLike; @@ -62,7 +62,7 @@ impl FsRequest { pub fn open_sync(loop_: &Loop, path: &P, flags: int, mode: int) -> Result { let result = FsRequest::open_common(loop_, path, flags, mode, None); - sync_cleanup(loop_, result) + sync_cleanup(result) } fn unlink_common(loop_: &Loop, path: &P, cb: Option) -> int { @@ -83,11 +83,11 @@ impl FsRequest { } pub fn unlink(loop_: &Loop, path: &P, cb: FsCallback) { let result = FsRequest::unlink_common(loop_, path, Some(cb)); - sync_cleanup(loop_, result); + sync_cleanup(result); } pub fn unlink_sync(loop_: &Loop, path: &P) -> Result { let result = FsRequest::unlink_common(loop_, path, None); - sync_cleanup(loop_, result) + sync_cleanup(result) } pub fn install_req_data(&self, cb: Option) { @@ -139,9 +139,8 @@ impl NativeHandle<*uvll::uv_fs_t> for FsRequest { match self { &FsRequest(ptr) => ptr } } } - fn sync_cleanup(loop_: &Loop, result: int) - -> Result { - match status_to_maybe_uv_error_with_loop(loop_.native_handle(), result as i32) { + fn sync_cleanup(result: int) -> Result { + match status_to_maybe_uv_error(result as i32) { Some(err) => Err(err), None => Ok(result) } @@ -184,7 +183,7 @@ impl FileDescriptor { pub fn write_sync(&mut self, loop_: &Loop, buf: Buf, offset: i64) -> Result { let result = self.write_common(loop_, buf, offset, None); - sync_cleanup(loop_, result) + sync_cleanup(result) } fn read_common(&mut self, loop_: &Loop, buf: Buf, @@ -212,7 +211,7 @@ impl FileDescriptor { pub fn read_sync(&mut self, loop_: &Loop, buf: Buf, offset: i64) -> Result { let result = self.read_common(loop_, buf, offset, None); - sync_cleanup(loop_, result) + sync_cleanup(result) } fn close_common(self, loop_: &Loop, cb: Option) -> int { @@ -234,12 +233,11 @@ impl FileDescriptor { } pub fn close_sync(self, loop_: &Loop) -> Result { let result = self.close_common(loop_, None); - sync_cleanup(loop_, result) + sync_cleanup(result) } } extern fn compl_cb(req: *uv_fs_t) { let mut req: FsRequest = NativeHandle::from_native_handle(req); - let loop_ = req.get_loop(); // pull the user cb out of the req data let cb = { let data = req.get_req_data(); @@ -250,8 +248,7 @@ extern fn compl_cb(req: *uv_fs_t) { // in uv_fs_open calls, the result will be the fd in the // case of success, otherwise it's -1 indicating an error let result = req.get_result(); - let status = status_to_maybe_uv_error_with_loop( - loop_.native_handle(), result); + let status = status_to_maybe_uv_error(result); // we have a req and status, call the user cb.. // only giving the user a ref to the FsRequest, as we // have to clean it up, afterwards (and they aren't really diff --git a/src/libstd/rt/uv/idle.rs b/src/libstd/rt/uv/idle.rs index a21146620ca..8cbcd7b77c0 100644 --- a/src/libstd/rt/uv/idle.rs +++ b/src/libstd/rt/uv/idle.rs @@ -43,7 +43,7 @@ impl IdleWatcher { let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle); let data = idle_watcher.get_watcher_data(); let cb: &IdleCallback = data.idle_cb.get_ref(); - let status = status_to_maybe_uv_error(idle_watcher, status); + let status = status_to_maybe_uv_error(status); (*cb)(idle_watcher, status); } } @@ -57,7 +57,7 @@ impl IdleWatcher { let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle); let data = idle_watcher.get_watcher_data(); let cb: &IdleCallback = data.idle_cb.get_ref(); - let status = status_to_maybe_uv_error(idle_watcher, status); + let status = status_to_maybe_uv_error(status); (*cb)(idle_watcher, status); } } diff --git a/src/libstd/rt/uv/mod.rs b/src/libstd/rt/uv/mod.rs index 75b9a5ac553..700b80c7398 100644 --- a/src/libstd/rt/uv/mod.rs +++ b/src/libstd/rt/uv/mod.rs @@ -58,6 +58,8 @@ pub use self::net::{StreamWatcher, TcpWatcher, UdpWatcher}; pub use self::idle::IdleWatcher; pub use self::timer::TimerWatcher; pub use self::async::AsyncWatcher; +pub use self::process::Process; +pub use self::pipe::Pipe; /// The implementation of `rtio` for libuv pub mod uvio; @@ -70,6 +72,8 @@ pub mod net; pub mod idle; pub mod timer; pub mod async; +pub mod process; +pub mod pipe; /// XXX: Loop(*handle) is buggy with destructors. Normal structs /// with dtors may not be destructured, but tuple structs can, @@ -126,6 +130,8 @@ pub type NullCallback = ~fn(); pub type IdleCallback = ~fn(IdleWatcher, Option); pub type ConnectionCallback = ~fn(StreamWatcher, Option); pub type FsCallback = ~fn(&mut FsRequest, Option); +// first int is exit_status, second is term_signal +pub type ExitCallback = ~fn(Process, int, int, Option); pub type TimerCallback = ~fn(TimerWatcher, Option); pub type AsyncCallback = ~fn(AsyncWatcher, Option); pub type UdpReceiveCallback = ~fn(UdpWatcher, int, Buf, SocketAddr, uint, Option); @@ -143,7 +149,8 @@ struct WatcherData { timer_cb: Option, async_cb: Option, udp_recv_cb: Option, - udp_send_cb: Option + udp_send_cb: Option, + exit_cb: Option, } pub trait WatcherInterop { @@ -175,7 +182,8 @@ impl> WatcherInterop for W { timer_cb: None, async_cb: None, udp_recv_cb: None, - udp_send_cb: None + udp_send_cb: None, + exit_cb: None, }; let data = transmute::<~WatcherData, *c_void>(data); uvll::set_data_for_uv_handle(self.native_handle(), data); @@ -202,12 +210,12 @@ impl> WatcherInterop for W { // XXX: Need to define the error constants like EOF so they can be // compared to the UvError type -pub struct UvError(uvll::uv_err_t); +pub struct UvError(c_int); impl UvError { pub fn name(&self) -> ~str { unsafe { - let inner = match self { &UvError(ref a) => a }; + let inner = match self { &UvError(a) => a }; let name_str = uvll::err_name(inner); assert!(name_str.is_not_null()); from_c_str(name_str) @@ -216,7 +224,7 @@ impl UvError { pub fn desc(&self) -> ~str { unsafe { - let inner = match self { &UvError(ref a) => a }; + let inner = match self { &UvError(a) => a }; let desc_str = uvll::strerror(inner); assert!(desc_str.is_not_null()); from_c_str(desc_str) @@ -224,7 +232,7 @@ impl UvError { } pub fn is_eof(&self) -> bool { - self.code == uvll::EOF + **self == uvll::EOF } } @@ -236,18 +244,10 @@ impl ToStr for UvError { #[test] fn error_smoke_test() { - let err = uvll::uv_err_t { code: 1, sys_errno_: 1 }; - let err: UvError = UvError(err); + let err: UvError = UvError(uvll::EOF); assert_eq!(err.to_str(), ~"EOF: end of file"); } -pub fn last_uv_error>(watcher: &W) -> UvError { - unsafe { - let loop_ = watcher.event_loop(); - UvError(uvll::last_error(loop_.native_handle())) - } -} - pub fn uv_error_to_io_error(uverr: UvError) -> IoError { unsafe { // Importing error constants @@ -255,10 +255,10 @@ pub fn uv_error_to_io_error(uverr: UvError) -> IoError { use rt::io::*; // uv error descriptions are static - let c_desc = uvll::strerror(&*uverr); + let c_desc = uvll::strerror(*uverr); let desc = str::raw::c_str_to_static_slice(c_desc); - let kind = match uverr.code { + let kind = match *uverr { UNKNOWN => OtherIoError, OK => OtherIoError, EOF => EndOfFile, @@ -266,8 +266,8 @@ pub fn uv_error_to_io_error(uverr: UvError) -> IoError { ECONNREFUSED => ConnectionRefused, ECONNRESET => ConnectionReset, EPIPE => BrokenPipe, - _ => { - rtdebug!("uverr.code %u", uverr.code as uint); + err => { + rtdebug!("uverr.code %d", err as int); // XXX: Need to map remaining uv error types OtherIoError } @@ -281,31 +281,12 @@ pub fn uv_error_to_io_error(uverr: UvError) -> IoError { } } -/// Given a uv handle, convert a callback status to a UvError -pub fn status_to_maybe_uv_error_with_loop( - loop_: *uvll::uv_loop_t, - status: c_int) -> Option { - if status != -1 { +/// Convert a callback status to a UvError +pub fn status_to_maybe_uv_error(status: c_int) -> Option { + if status >= 0 { None } else { - unsafe { - rtdebug!("loop: %x", loop_ as uint); - let err = uvll::last_error(loop_); - Some(UvError(err)) - } - } -} -/// Given a uv handle, convert a callback status to a UvError -pub fn status_to_maybe_uv_error>(handle: U, - status: c_int) -> Option { - if status != -1 { - None - } else { - unsafe { - rtdebug!("handle: %x", handle.native_handle() as uint); - let loop_ = uvll::get_loop_for_uv_handle(handle.native_handle()); - status_to_maybe_uv_error_with_loop(loop_, status) - } + Some(UvError(status)) } } diff --git a/src/libstd/rt/uv/net.rs b/src/libstd/rt/uv/net.rs index e8d0296e543..1581b017087 100644 --- a/src/libstd/rt/uv/net.rs +++ b/src/libstd/rt/uv/net.rs @@ -16,7 +16,6 @@ use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle, NullCallback, status_to_maybe_uv_error}; use rt::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr}; -use rt::uv::last_uv_error; use vec; use str; use from_str::{FromStr}; @@ -137,7 +136,7 @@ impl StreamWatcher { rtdebug!("buf len: %d", buf.len as int); let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream); let cb = stream_watcher.get_watcher_data().read_cb.get_ref(); - let status = status_to_maybe_uv_error(stream_watcher, nread as c_int); + let status = status_to_maybe_uv_error(nread as c_int); (*cb)(stream_watcher, nread as int, buf, status); } } @@ -167,7 +166,7 @@ impl StreamWatcher { let mut stream_watcher = write_request.stream(); write_request.delete(); let cb = stream_watcher.get_watcher_data().write_cb.take_unwrap(); - let status = status_to_maybe_uv_error(stream_watcher, status); + let status = status_to_maybe_uv_error(status); cb(stream_watcher, status); } } @@ -232,7 +231,7 @@ impl TcpWatcher { }; match result { 0 => Ok(()), - _ => Err(last_uv_error(self)), + _ => Err(UvError(result)), } } } @@ -260,7 +259,7 @@ impl TcpWatcher { let mut stream_watcher = connect_request.stream(); connect_request.delete(); let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap(); - let status = status_to_maybe_uv_error(stream_watcher, status); + let status = status_to_maybe_uv_error(status); cb(stream_watcher, status); } } @@ -283,7 +282,7 @@ impl TcpWatcher { rtdebug!("connection_cb"); let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle); let cb = stream_watcher.get_watcher_data().connect_cb.get_ref(); - let status = status_to_maybe_uv_error(stream_watcher, status); + let status = status_to_maybe_uv_error(status); (*cb)(stream_watcher, status); } } @@ -327,7 +326,7 @@ impl UdpWatcher { }; match result { 0 => Ok(()), - _ => Err(last_uv_error(self)), + _ => Err(UvError(result)), } } } @@ -360,7 +359,7 @@ impl UdpWatcher { rtdebug!("buf len: %d", buf.len as int); let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle); let cb = udp_watcher.get_watcher_data().udp_recv_cb.get_ref(); - let status = status_to_maybe_uv_error(udp_watcher, nread as c_int); + let status = status_to_maybe_uv_error(nread as c_int); let addr = uv_socket_addr_to_socket_addr(sockaddr_to_UvSocketAddr(addr)); (*cb)(udp_watcher, nread as int, buf, addr, flags as uint, status); } @@ -395,7 +394,7 @@ impl UdpWatcher { let mut udp_watcher = send_request.handle(); send_request.delete(); let cb = udp_watcher.get_watcher_data().udp_send_cb.take_unwrap(); - let status = status_to_maybe_uv_error(udp_watcher, status); + let status = status_to_maybe_uv_error(status); cb(udp_watcher, status); } } diff --git a/src/libstd/rt/uv/pipe.rs b/src/libstd/rt/uv/pipe.rs new file mode 100644 index 00000000000..1147c731a60 --- /dev/null +++ b/src/libstd/rt/uv/pipe.rs @@ -0,0 +1,66 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use prelude::*; +use libc; + +use rt::uv; +use rt::uv::net; +use rt::uv::uvll; + +pub struct Pipe(*uvll::uv_pipe_t); + +impl uv::Watcher for Pipe {} + +impl Pipe { + pub fn new(loop_: &uv::Loop, ipc: bool) -> Pipe { + unsafe { + let handle = uvll::malloc_handle(uvll::UV_NAMED_PIPE); + assert!(handle.is_not_null()); + let ipc = ipc as libc::c_int; + assert_eq!(uvll::pipe_init(loop_.native_handle(), handle, ipc), 0); + let mut ret: Pipe = + uv::NativeHandle::from_native_handle(handle); + ret.install_watcher_data(); + ret + } + } + + pub fn as_stream(&self) -> net::StreamWatcher { + net::StreamWatcher(**self as *uvll::uv_stream_t) + } + + pub fn close(self, cb: uv::NullCallback) { + { + let mut this = self; + let data = this.get_watcher_data(); + assert!(data.close_cb.is_none()); + data.close_cb = Some(cb); + } + + unsafe { uvll::close(self.native_handle(), close_cb); } + + extern fn close_cb(handle: *uvll::uv_pipe_t) { + let mut process: Pipe = uv::NativeHandle::from_native_handle(handle); + process.get_watcher_data().close_cb.take_unwrap()(); + process.drop_watcher_data(); + unsafe { uvll::free_handle(handle as *libc::c_void) } + } + } +} + +impl uv::NativeHandle<*uvll::uv_pipe_t> for Pipe { + fn from_native_handle(handle: *uvll::uv_pipe_t) -> Pipe { + Pipe(handle) + } + fn native_handle(&self) -> *uvll::uv_pipe_t { + match self { &Pipe(ptr) => ptr } + } +} diff --git a/src/libstd/rt/uv/process.rs b/src/libstd/rt/uv/process.rs new file mode 100644 index 00000000000..a02cf67ec26 --- /dev/null +++ b/src/libstd/rt/uv/process.rs @@ -0,0 +1,264 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use prelude::*; +use libc; +use ptr; +use vec; +use cell::Cell; + +use rt::uv; +use rt::uv::net; +use rt::uv::pipe; +use rt::uv::uvll; + +/// A process wraps the handle of the underlying uv_process_t. +pub struct Process(*uvll::uv_process_t); + +/// This configuration describes how a new process should be spawned. This is +/// translated to libuv's own configuration +pub struct Config<'self> { + /// Path to the program to run + program: &'self str, + + /// Arguments to pass to the program (doesn't include the program itself) + args: &'self [~str], + + /// Optional environment to specify for the program. If this is None, then + /// it will inherit the current process's environment. + env: Option<&'self [(~str, ~str)]>, + + /// Optional working directory for the new process. If this is None, then + /// the current directory of the running process is inherited. + cwd: Option<&'self str>, + + /// Any number of streams/file descriptors/pipes may be attached to this + /// process. This list enumerates the file descriptors and such for the + /// process to be spawned, and the file descriptors inherited will start at + /// 0 and go to the length of this array. + /// + /// Standard file descriptors are: + /// + /// 0 - stdin + /// 1 - stdout + /// 2 - stderr + io: &'self [StdioContainer] +} + +/// Describes what to do with a standard io stream for a child process. +pub enum StdioContainer { + /// This stream will be ignored. This is the equivalent of attaching the + /// stream to `/dev/null` + Ignored, + + /// The specified file descriptor is inherited for the stream which it is + /// specified for. + InheritFd(libc::c_int), + + /// The specified libuv stream is inherited for the corresponding file + /// descriptor it is assigned to. + InheritStream(net::StreamWatcher), + + /// Creates a pipe for the specified file descriptor which will be directed + /// into the previously-initialized pipe passed in. + /// + /// The first boolean argument is whether the pipe is readable, and the + /// second is whether it is writable. These properties are from the view of + /// the *child* process, not the parent process. + CreatePipe(pipe::Pipe, bool /* readable */, bool /* writable */), +} + +impl uv::Watcher for Process {} + +impl Process { + /// Creates a new process, ready to spawn inside an event loop + pub fn new() -> Process { + let handle = unsafe { uvll::malloc_handle(uvll::UV_PROCESS) }; + assert!(handle.is_not_null()); + let mut ret: Process = uv::NativeHandle::from_native_handle(handle); + ret.install_watcher_data(); + return ret; + } + + /// Spawn a new process inside the specified event loop. + /// + /// The `config` variable will be passed down to libuv, and the `exit_cb` + /// will be run only once, when the process exits. + /// + /// Returns either the corresponding process object or an error which + /// occurred. + pub fn spawn(&mut self, loop_: &uv::Loop, config: &Config, + exit_cb: uv::ExitCallback) -> Result<(), uv::UvError> { + let cwd = config.cwd.map_move(|s| s.to_c_str()); + + extern fn on_exit(p: *uvll::uv_process_t, + exit_status: libc::c_int, + term_signal: libc::c_int) { + let mut p: Process = uv::NativeHandle::from_native_handle(p); + let err = match exit_status { + 0 => None, + _ => uv::status_to_maybe_uv_error(-1) + }; + p.get_watcher_data().exit_cb.take_unwrap()(p, + exit_status as int, + term_signal as int, + err); + } + + let mut stdio = vec::with_capacity::( + config.io.len()); + unsafe { + vec::raw::set_len(&mut stdio, config.io.len()); + for (slot, &other) in stdio.iter().zip(config.io.iter()) { + set_stdio(slot as *uvll::uv_stdio_container_t, other); + } + } + + let exit_cb = Cell::new(exit_cb); + do with_argv(config.program, config.args) |argv| { + do with_env(config.env) |envp| { + let options = uvll::uv_process_options_t { + exit_cb: on_exit, + file: unsafe { *argv }, + args: argv, + env: envp, + cwd: match cwd { + Some(ref cwd) => cwd.with_ref(|p| p), + None => ptr::null(), + }, + flags: 0, + stdio_count: stdio.len() as libc::c_int, + stdio: stdio.as_imm_buf(|p, _| p), + uid: 0, + gid: 0, + }; + + match unsafe { + uvll::spawn(loop_.native_handle(), **self, options) + } { + 0 => { + (*self).get_watcher_data().exit_cb = Some(exit_cb.take()); + Ok(()) + } + err => Err(uv::UvError(err)) + } + } + } + } + + /// Sends a signal to this process. + /// + /// This is a wrapper around `uv_process_kill` + pub fn kill(&self, signum: int) -> Result<(), uv::UvError> { + match unsafe { + uvll::process_kill(self.native_handle(), signum as libc::c_int) + } { + 0 => Ok(()), + err => Err(uv::UvError(err)) + } + } + + /// Returns the process id of a spawned process + pub fn pid(&self) -> libc::pid_t { + unsafe { uvll::process_pid(**self) as libc::pid_t } + } + + /// Closes this handle, invoking the specified callback once closed + pub fn close(self, cb: uv::NullCallback) { + { + let mut this = self; + let data = this.get_watcher_data(); + assert!(data.close_cb.is_none()); + data.close_cb = Some(cb); + } + + unsafe { uvll::close(self.native_handle(), close_cb); } + + extern fn close_cb(handle: *uvll::uv_process_t) { + let mut process: Process = uv::NativeHandle::from_native_handle(handle); + process.get_watcher_data().close_cb.take_unwrap()(); + process.drop_watcher_data(); + unsafe { uvll::free_handle(handle as *libc::c_void) } + } + } +} + +unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t, io: StdioContainer) { + match io { + Ignored => { uvll::set_stdio_container_flags(dst, uvll::STDIO_IGNORE); } + InheritFd(fd) => { + uvll::set_stdio_container_flags(dst, uvll::STDIO_INHERIT_FD); + uvll::set_stdio_container_fd(dst, fd); + } + InheritStream(stream) => { + uvll::set_stdio_container_flags(dst, uvll::STDIO_INHERIT_STREAM); + uvll::set_stdio_container_stream(dst, stream.native_handle()); + } + CreatePipe(pipe, readable, writable) => { + let mut flags = uvll::STDIO_CREATE_PIPE as libc::c_int; + if readable { + flags |= uvll::STDIO_READABLE_PIPE as libc::c_int; + } + if writable { + flags |= uvll::STDIO_WRITABLE_PIPE as libc::c_int; + } + uvll::set_stdio_container_flags(dst, flags); + uvll::set_stdio_container_stream(dst, + pipe.as_stream().native_handle()); + } + } +} + +/// Converts the program and arguments to the argv array expected by libuv +fn with_argv(prog: &str, args: &[~str], f: &fn(**libc::c_char) -> T) -> T { + // First, allocation space to put all the C-strings (we need to have + // ownership of them somewhere + let mut c_strs = vec::with_capacity(args.len() + 1); + c_strs.push(prog.to_c_str()); + for arg in args.iter() { + c_strs.push(arg.to_c_str()); + } + + // Next, create the char** array + let mut c_args = vec::with_capacity(c_strs.len() + 1); + for s in c_strs.iter() { + c_args.push(s.with_ref(|p| p)); + } + c_args.push(ptr::null()); + c_args.as_imm_buf(|buf, _| f(buf)) +} + +/// Converts the environment to the env array expected by libuv +fn with_env(env: Option<&[(~str, ~str)]>, f: &fn(**libc::c_char) -> T) -> T { + let env = match env { + Some(s) => s, + None => { return f(ptr::null()); } + }; + // As with argv, create some temporary storage and then the actual array + let mut envp = vec::with_capacity(env.len()); + for &(ref key, ref value) in env.iter() { + envp.push(fmt!("%s=%s", *key, *value).to_c_str()); + } + let mut c_envp = vec::with_capacity(envp.len() + 1); + for s in envp.iter() { + c_envp.push(s.with_ref(|p| p)); + } + c_envp.push(ptr::null()); + c_envp.as_imm_buf(|buf, _| f(buf)) +} + +impl uv::NativeHandle<*uvll::uv_process_t> for Process { + fn from_native_handle(handle: *uvll::uv_process_t) -> Process { + Process(handle) + } + fn native_handle(&self) -> *uvll::uv_process_t { + match self { &Process(ptr) => ptr } + } +} diff --git a/src/libstd/rt/uv/timer.rs b/src/libstd/rt/uv/timer.rs index eaa5e77a6da..7b09cf2eb0e 100644 --- a/src/libstd/rt/uv/timer.rs +++ b/src/libstd/rt/uv/timer.rs @@ -43,7 +43,7 @@ impl TimerWatcher { let mut watcher: TimerWatcher = NativeHandle::from_native_handle(handle); let data = watcher.get_watcher_data(); let cb = data.timer_cb.get_ref(); - let status = status_to_maybe_uv_error(watcher, status); + let status = status_to_maybe_uv_error(status); (*cb)(watcher, status); } } diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index e620ab274b1..c771f93cef5 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -13,7 +13,7 @@ use cast::transmute; use cast; use cell::Cell; use clone::Clone; -use libc::{c_int, c_uint, c_void}; +use libc::{c_int, c_uint, c_void, pid_t}; use ops::Drop; use option::*; use ptr; @@ -22,6 +22,7 @@ use result::*; use rt::io::IoError; use rt::io::net::ip::{SocketAddr, IpAddr}; use rt::io::{standard_error, OtherIoError, SeekStyle, SeekSet, SeekCur, SeekEnd}; +use rt::kill::BlockedTask; use rt::local::Local; use rt::rtio::*; use rt::sched::{Scheduler, SchedHandle}; @@ -148,7 +149,7 @@ fn socket_name>(sk: SocketNameKind, }; if r != 0 { - let status = status_to_maybe_uv_error(handle, r); + let status = status_to_maybe_uv_error(r); return Err(uv_error_to_io_error(status.unwrap())); } @@ -591,6 +592,63 @@ impl IoFactory for UvIoFactory { assert!(!result_cell.is_empty()); return result_cell.take(); } + + fn pipe_init(&mut self, ipc: bool) -> Result<~RtioPipeObject, IoError> { + let home = get_handle_to_current_scheduler!(); + Ok(~UvPipeStream { pipe: Pipe::new(self.uv_loop(), ipc), home: home }) + } + + fn spawn(&mut self, + config: &process::Config) -> Result<~RtioProcessObject, IoError> { + // Sadly, we must create the UvProcess before we actually call uv_spawn + // so that the exit_cb can close over it and notify it when the process + // has exited. + let mut ret = ~UvProcess { + process: Process::new(), + home: None, + exit_status: None, + term_signal: None, + exit_error: None, + descheduled: None, + }; + let ret_ptr = unsafe { + *cast::transmute::<&~UvProcess, &*mut UvProcess>(&ret) + }; + + // The purpose of this exit callback is to record the data about the + // exit and then wake up the task which may be waiting for the process + // to exit. This is all performed in the current io-loop, and the + // implementation of UvProcess ensures that reading these fields always + // occurs on the current io-loop. + let exit_cb: ExitCallback = |_, exit_status, term_signal, error| { + unsafe { + assert!((*ret_ptr).exit_status.is_none()); + (*ret_ptr).exit_status = Some(exit_status); + (*ret_ptr).term_signal = Some(term_signal); + (*ret_ptr).exit_error = error; + match (*ret_ptr).descheduled.take() { + Some(task) => { + let scheduler: ~Scheduler = Local::take(); + scheduler.resume_blocked_task_immediately(task); + } + None => {} + } + } + }; + + match ret.process.spawn(self.uv_loop(), config, exit_cb) { + Ok(()) => { + // Only now do we actually get a handle to this scheduler. + ret.home = Some(get_handle_to_current_scheduler!()); + Ok(ret) + } + Err(uverr) => { + // We still need to close the process handle we created, but + // that's taken care for us in the destructor of UvProcess + Err(uv_error_to_io_error(uverr)) + } + } + } } pub struct UvTcpListener { @@ -679,7 +737,7 @@ impl RtioTcpListener for UvTcpListener { uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 1 as c_int) }; - match status_to_maybe_uv_error(self_.watcher(), r) { + match status_to_maybe_uv_error(r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -692,7 +750,7 @@ impl RtioTcpListener for UvTcpListener { uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 0 as c_int) }; - match status_to_maybe_uv_error(self_.watcher(), r) { + match status_to_maybe_uv_error(r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -700,40 +758,15 @@ impl RtioTcpListener for UvTcpListener { } } -pub struct UvTcpStream { - watcher: TcpWatcher, - home: SchedHandle, +trait UvStream: HomingIO { + fn as_stream(&mut self) -> StreamWatcher; } -impl HomingIO for UvTcpStream { - fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } -} - -impl Drop for UvTcpStream { - fn drop(&self) { - // XXX need mutable finalizer - let this = unsafe { transmute::<&UvTcpStream, &mut UvTcpStream>(self) }; - do this.home_for_io_with_sched |self_, scheduler| { - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do self_.watcher.as_stream().close { - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } - } - } - } -} - -impl RtioSocket for UvTcpStream { - fn socket_name(&mut self) -> Result { - do self.home_for_io |self_| { - socket_name(Tcp, self_.watcher) - } - } -} - -impl RtioTcpStream for UvTcpStream { +// FIXME(#3429) I would rather this be `impl RtioStream for T` but +// that has conflicts with other traits that also have methods +// called `read` and `write` +macro_rules! rtiostream(($t:ident) => { +impl RtioStream for $t { fn read(&mut self, buf: &mut [u8]) -> Result { do self.home_for_io_with_sched |self_, scheduler| { let result_cell = Cell::new_empty(); @@ -747,7 +780,7 @@ impl RtioTcpStream for UvTcpStream { let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) }; - let mut watcher = self_.watcher.as_stream(); + let mut watcher = self_.as_stream(); do watcher.read_start(alloc) |mut watcher, nread, _buf, status| { // Stop reading so that no read callbacks are @@ -783,7 +816,7 @@ impl RtioTcpStream for UvTcpStream { do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; - let mut watcher = self_.watcher.as_stream(); + let mut watcher = self_.as_stream(); do watcher.write(buf) |_watcher, status| { let result = if status.is_none() { Ok(()) @@ -802,7 +835,85 @@ impl RtioTcpStream for UvTcpStream { result_cell.take() } } +} +}) +rtiostream!(UvPipeStream) +rtiostream!(UvTcpStream) + +pub struct UvPipeStream { + pipe: Pipe, + home: SchedHandle, +} + +impl UvStream for UvPipeStream { + fn as_stream(&mut self) -> StreamWatcher { self.pipe.as_stream() } +} + +impl HomingIO for UvPipeStream { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } +} + +impl Drop for UvPipeStream { + fn drop(&self) { + // FIXME(#4330): should not need a transmute + let this = unsafe { cast::transmute_mut(self) }; + do this.home_for_io |self_| { + let scheduler: ~Scheduler = Local::take(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + do self_.pipe.close { + let scheduler: ~Scheduler = Local::take(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } + } + } +} + +impl UvPipeStream { + pub fn uv_pipe(&self) -> Pipe { self.pipe } +} + +pub struct UvTcpStream { + watcher: TcpWatcher, + home: SchedHandle, +} + +impl HomingIO for UvTcpStream { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } +} + +impl Drop for UvTcpStream { + fn drop(&self) { + // FIXME(#4330): should not need a transmute + let this = unsafe { cast::transmute_mut(self) }; + do this.home_for_io |self_| { + let scheduler: ~Scheduler = Local::take(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + do self_.watcher.as_stream().close { + let scheduler: ~Scheduler = Local::take(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } + } + } +} + +impl UvStream for UvTcpStream { + fn as_stream(&mut self) -> StreamWatcher { self.watcher.as_stream() } +} + +impl RtioSocket for UvTcpStream { + fn socket_name(&mut self) -> Result { + do self.home_for_io |self_| { + socket_name(Tcp, self_.watcher) + } + } +} + +impl RtioTcpStream for UvTcpStream { fn peer_name(&mut self) -> Result { do self.home_for_io |self_| { socket_name(TcpPeer, self_.watcher) @@ -813,7 +924,7 @@ impl RtioTcpStream for UvTcpStream { do self.home_for_io |self_| { let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 0 as c_int) }; - match status_to_maybe_uv_error(self_.watcher, r) { + match status_to_maybe_uv_error(r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -824,7 +935,7 @@ impl RtioTcpStream for UvTcpStream { do self.home_for_io |self_| { let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 1 as c_int) }; - match status_to_maybe_uv_error(self_.watcher, r) { + match status_to_maybe_uv_error(r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -838,7 +949,7 @@ impl RtioTcpStream for UvTcpStream { delay_in_seconds as c_uint) }; - match status_to_maybe_uv_error(self_.watcher, r) { + match status_to_maybe_uv_error(r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -851,7 +962,7 @@ impl RtioTcpStream for UvTcpStream { uvll::tcp_keepalive(self_.watcher.native_handle(), 0 as c_int, 0 as c_uint) }; - match status_to_maybe_uv_error(self_.watcher, r) { + match status_to_maybe_uv_error(r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -963,7 +1074,7 @@ impl RtioUdpSocket for UvUdpSocket { } }; - match status_to_maybe_uv_error(self_.watcher, r) { + match status_to_maybe_uv_error(r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -979,7 +1090,7 @@ impl RtioUdpSocket for UvUdpSocket { } }; - match status_to_maybe_uv_error(self_.watcher, r) { + match status_to_maybe_uv_error(r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -993,7 +1104,7 @@ impl RtioUdpSocket for UvUdpSocket { uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 1 as c_int) }; - match status_to_maybe_uv_error(self_.watcher, r) { + match status_to_maybe_uv_error(r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1007,7 +1118,7 @@ impl RtioUdpSocket for UvUdpSocket { uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 0 as c_int) }; - match status_to_maybe_uv_error(self_.watcher, r) { + match status_to_maybe_uv_error(r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1021,7 +1132,7 @@ impl RtioUdpSocket for UvUdpSocket { uvll::udp_set_multicast_ttl(self_.watcher.native_handle(), ttl as c_int) }; - match status_to_maybe_uv_error(self_.watcher, r) { + match status_to_maybe_uv_error(r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1035,7 +1146,7 @@ impl RtioUdpSocket for UvUdpSocket { uvll::udp_set_ttl(self_.watcher.native_handle(), ttl as c_int) }; - match status_to_maybe_uv_error(self_.watcher, r) { + match status_to_maybe_uv_error(r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1049,7 +1160,7 @@ impl RtioUdpSocket for UvUdpSocket { uvll::udp_set_broadcast(self_.watcher.native_handle(), 1 as c_int) }; - match status_to_maybe_uv_error(self_.watcher, r) { + match status_to_maybe_uv_error(r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1063,7 +1174,7 @@ impl RtioUdpSocket for UvUdpSocket { uvll::udp_set_broadcast(self_.watcher.native_handle(), 0 as c_int) }; - match status_to_maybe_uv_error(self_.watcher, r) { + match status_to_maybe_uv_error(r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1250,6 +1361,89 @@ impl RtioFileStream for UvFileStream { } } +pub struct UvProcess { + process: process::Process, + + // Sadly, this structure must be created before we return it, so in that + // brief interim the `home` is None. + home: Option, + + // All None until the process exits (exit_error may stay None) + priv exit_status: Option, + priv term_signal: Option, + priv exit_error: Option, + + // Used to store which task to wake up from the exit_cb + priv descheduled: Option, +} + +impl HomingIO for UvProcess { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.home.get_mut_ref() } +} + +impl Drop for UvProcess { + fn drop(&self) { + // FIXME(#4330): should not need a transmute + let this = unsafe { cast::transmute_mut(self) }; + + let close = |self_: &mut UvProcess| { + let scheduler: ~Scheduler = Local::take(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task = Cell::new(task); + do self_.process.close { + let scheduler: ~Scheduler = Local::take(); + scheduler.resume_blocked_task_immediately(task.take()); + } + } + }; + + // If home is none, then this process never actually successfully + // spawned, so there's no need to switch event loops + if this.home.is_none() { + close(this) + } else { + this.home_for_io(close) + } + } +} + +impl RtioProcess for UvProcess { + fn id(&self) -> pid_t { + self.process.pid() + } + + fn kill(&mut self, signal: int) -> Result<(), IoError> { + do self.home_for_io |self_| { + match self_.process.kill(signal) { + Ok(()) => Ok(()), + Err(uverr) => Err(uv_error_to_io_error(uverr)) + } + } + } + + fn wait(&mut self) -> int { + // Make sure (on the home scheduler) that we have an exit status listed + do self.home_for_io |self_| { + match self_.exit_status { + Some(*) => {} + None => { + // If there's no exit code previously listed, then the + // process's exit callback has yet to be invoked. We just + // need to deschedule ourselves and wait to be reawoken. + let scheduler: ~Scheduler = Local::take(); + do scheduler.deschedule_running_task_and_then |_, task| { + assert!(self_.descheduled.is_none()); + self_.descheduled = Some(task); + } + assert!(self_.exit_status.is_some()); + } + } + } + + self.exit_status.unwrap() + } +} + #[test] fn test_simple_io_no_connect() { do run_in_newsched_task { diff --git a/src/libstd/rt/uv/uvll.rs b/src/libstd/rt/uv/uvll.rs index 1e189e90885..24e070ca239 100644 --- a/src/libstd/rt/uv/uvll.rs +++ b/src/libstd/rt/uv/uvll.rs @@ -37,28 +37,74 @@ use libc::{malloc, free}; use libc; use prelude::*; use ptr; -use str; use vec; -pub static UNKNOWN: c_int = -1; -pub static OK: c_int = 0; -pub static EOF: c_int = 1; -pub static EADDRINFO: c_int = 2; -pub static EACCES: c_int = 3; -pub static ECONNREFUSED: c_int = 12; -pub static ECONNRESET: c_int = 13; -pub static EPIPE: c_int = 36; +pub use self::errors::*; -pub struct uv_err_t { - code: c_int, - sys_errno_: c_int +pub static OK: c_int = 0; +pub static EOF: c_int = -4095; +pub static UNKNOWN: c_int = -4094; + +// uv-errno.h redefines error codes for windows, but not for unix... + +#[cfg(windows)] +pub mod errors { + use libc::c_int; + + pub static EACCES: c_int = -4093; + pub static ECONNREFUSED: c_int = -4079; + pub static ECONNRESET: c_int = -4078; + pub static EPIPE: c_int = -4048; } +#[cfg(not(windows))] +pub mod errors { + use libc; + use libc::c_int; + + pub static EACCES: c_int = -libc::EACCES; + pub static ECONNREFUSED: c_int = -libc::ECONNREFUSED; + pub static ECONNRESET: c_int = -libc::ECONNRESET; + pub static EPIPE: c_int = -libc::EPIPE; +} + +pub static PROCESS_SETUID: c_int = 1 << 0; +pub static PROCESS_SETGID: c_int = 1 << 1; +pub static PROCESS_WINDOWS_VERBATIM_ARGUMENTS: c_int = 1 << 2; +pub static PROCESS_DETACHED: c_int = 1 << 3; +pub static PROCESS_WINDOWS_HIDE: c_int = 1 << 4; + +pub static STDIO_IGNORE: c_int = 0x00; +pub static STDIO_CREATE_PIPE: c_int = 0x01; +pub static STDIO_INHERIT_FD: c_int = 0x02; +pub static STDIO_INHERIT_STREAM: c_int = 0x04; +pub static STDIO_READABLE_PIPE: c_int = 0x10; +pub static STDIO_WRITABLE_PIPE: c_int = 0x20; pub struct uv_buf_t { base: *u8, len: libc::size_t, } +pub struct uv_process_options_t { + exit_cb: uv_exit_cb, + file: *libc::c_char, + args: **libc::c_char, + env: **libc::c_char, + cwd: *libc::c_char, + flags: libc::c_uint, + stdio_count: libc::c_int, + stdio: *uv_stdio_container_t, + uid: uv_uid_t, + gid: uv_gid_t, +} + +// These fields are private because they must be interfaced with through the +// functions below. +pub struct uv_stdio_container_t { + priv flags: libc::c_int, + priv stream: *uv_stream_t, +} + pub type uv_handle_t = c_void; pub type uv_loop_t = c_void; pub type uv_idle_t = c_void; @@ -72,6 +118,8 @@ pub type uv_timer_t = c_void; pub type uv_stream_t = c_void; pub type uv_fs_t = c_void; pub type uv_udp_send_t = c_void; +pub type uv_process_t = c_void; +pub type uv_pipe_t = c_void; #[cfg(stage0)] pub type uv_idle_cb = *u8; @@ -97,6 +145,8 @@ pub type uv_connection_cb = *u8; pub type uv_timer_cb = *u8; #[cfg(stage0)] pub type uv_write_cb = *u8; +#[cfg(stage0)] +pub type uv_exit_cb = *u8; #[cfg(not(stage0))] pub type uv_idle_cb = extern "C" fn(handle: *uv_idle_t, @@ -137,12 +187,21 @@ pub type uv_timer_cb = extern "C" fn(handle: *uv_timer_t, #[cfg(not(stage0))] pub type uv_write_cb = extern "C" fn(handle: *uv_write_t, status: c_int); +#[cfg(not(stage0))] +pub type uv_exit_cb = extern "C" fn(handle: *uv_process_t, + exit_status: c_int, + term_signal: c_int); pub type sockaddr = c_void; pub type sockaddr_in = c_void; pub type sockaddr_in6 = c_void; pub type sockaddr_storage = c_void; +#[cfg(unix)] pub type uv_uid_t = libc::types::os::arch::posix88::uid_t; +#[cfg(unix)] pub type uv_gid_t = libc::types::os::arch::posix88::gid_t; +#[cfg(windows)] pub type uv_uid_t = libc::c_uchar; +#[cfg(windows)] pub type uv_gid_t = libc::c_uchar; + #[deriving(Eq)] pub enum uv_handle_type { UV_UNKNOWN_HANDLE, @@ -487,20 +546,12 @@ pub unsafe fn read_stop(stream: *uv_stream_t) -> c_int { return rust_uv_read_stop(stream as *c_void); } -pub unsafe fn last_error(loop_handle: *c_void) -> uv_err_t { +pub unsafe fn strerror(err: c_int) -> *c_char { #[fixed_stack_segment]; #[inline(never)]; - - return rust_uv_last_error(loop_handle); -} - -pub unsafe fn strerror(err: *uv_err_t) -> *c_char { - #[fixed_stack_segment]; #[inline(never)]; - return rust_uv_strerror(err); } -pub unsafe fn err_name(err: *uv_err_t) -> *c_char { +pub unsafe fn err_name(err: c_int) -> *c_char { #[fixed_stack_segment]; #[inline(never)]; - return rust_uv_err_name(err); } @@ -654,6 +705,45 @@ pub unsafe fn fs_req_cleanup(req: *uv_fs_t) { rust_uv_fs_req_cleanup(req); } +pub unsafe fn spawn(loop_ptr: *c_void, result: *uv_process_t, + options: uv_process_options_t) -> c_int { + #[fixed_stack_segment]; #[inline(never)]; + return rust_uv_spawn(loop_ptr, result, options); +} + +pub unsafe fn process_kill(p: *uv_process_t, signum: c_int) -> c_int { + #[fixed_stack_segment]; #[inline(never)]; + return rust_uv_process_kill(p, signum); +} + +pub unsafe fn process_pid(p: *uv_process_t) -> c_int { + #[fixed_stack_segment]; #[inline(never)]; + return rust_uv_process_pid(p); +} + +pub unsafe fn set_stdio_container_flags(c: *uv_stdio_container_t, + flags: libc::c_int) { + #[fixed_stack_segment]; #[inline(never)]; + rust_set_stdio_container_flags(c, flags); +} + +pub unsafe fn set_stdio_container_fd(c: *uv_stdio_container_t, + fd: libc::c_int) { + #[fixed_stack_segment]; #[inline(never)]; + rust_set_stdio_container_fd(c, fd); +} + +pub unsafe fn set_stdio_container_stream(c: *uv_stdio_container_t, + stream: *uv_stream_t) { + #[fixed_stack_segment]; #[inline(never)]; + rust_set_stdio_container_stream(c, stream); +} + +pub unsafe fn pipe_init(loop_ptr: *c_void, p: *uv_pipe_t, ipc: c_int) -> c_int { + #[fixed_stack_segment]; #[inline(never)]; + rust_uv_pipe_init(loop_ptr, p, ipc) +} + // data access helpers pub unsafe fn get_result_from_fs_req(req: *uv_fs_t) -> c_int { #[fixed_stack_segment]; #[inline(never)]; @@ -720,22 +810,6 @@ pub unsafe fn get_len_from_buf(buf: uv_buf_t) -> size_t { return rust_uv_get_len_from_buf(buf); } -pub unsafe fn get_last_err_info(uv_loop: *c_void) -> ~str { - let err = last_error(uv_loop); - let err_ptr = ptr::to_unsafe_ptr(&err); - let err_name = str::raw::from_c_str(err_name(err_ptr)); - let err_msg = str::raw::from_c_str(strerror(err_ptr)); - return fmt!("LIBUV ERROR: name: %s msg: %s", - err_name, err_msg); -} - -pub unsafe fn get_last_err_data(uv_loop: *c_void) -> uv_err_data { - let err = last_error(uv_loop); - let err_ptr = ptr::to_unsafe_ptr(&err); - let err_name = str::raw::from_c_str(err_name(err_ptr)); - let err_msg = str::raw::from_c_str(strerror(err_ptr)); - uv_err_data { err_name: err_name, err_msg: err_msg } -} pub struct uv_err_data { err_name: ~str, @@ -768,9 +842,8 @@ extern { cb: uv_async_cb) -> c_int; fn rust_uv_tcp_init(loop_handle: *c_void, handle_ptr: *uv_tcp_t) -> c_int; fn rust_uv_buf_init(out_buf: *uv_buf_t, base: *u8, len: size_t); - fn rust_uv_last_error(loop_handle: *c_void) -> uv_err_t; - fn rust_uv_strerror(err: *uv_err_t) -> *c_char; - fn rust_uv_err_name(err: *uv_err_t) -> *c_char; + fn rust_uv_strerror(err: c_int) -> *c_char; + fn rust_uv_err_name(err: c_int) -> *c_char; fn rust_uv_ip4_addrp(ip: *u8, port: c_int) -> *sockaddr_in; fn rust_uv_ip6_addrp(ip: *u8, port: c_int) -> *sockaddr_in6; fn rust_uv_free_ip4_addr(addr: *sockaddr_in); @@ -856,4 +929,13 @@ extern { fn rust_uv_set_data_for_req(req: *c_void, data: *c_void); fn rust_uv_get_base_from_buf(buf: uv_buf_t) -> *u8; fn rust_uv_get_len_from_buf(buf: uv_buf_t) -> size_t; + fn rust_uv_spawn(loop_ptr: *c_void, outptr: *uv_process_t, + options: uv_process_options_t) -> c_int; + fn rust_uv_process_kill(p: *uv_process_t, signum: c_int) -> c_int; + fn rust_uv_process_pid(p: *uv_process_t) -> c_int; + fn rust_set_stdio_container_flags(c: *uv_stdio_container_t, flags: c_int); + fn rust_set_stdio_container_fd(c: *uv_stdio_container_t, fd: c_int); + fn rust_set_stdio_container_stream(c: *uv_stdio_container_t, + stream: *uv_stream_t); + fn rust_uv_pipe_init(loop_ptr: *c_void, p: *uv_pipe_t, ipc: c_int) -> c_int; } diff --git a/src/libstd/run.rs b/src/libstd/run.rs index 7fc2deff97d..b91aac22244 100644 --- a/src/libstd/run.rs +++ b/src/libstd/run.rs @@ -10,22 +10,21 @@ //! Process spawning. -#[allow(missing_doc)]; - -use c_str::ToCStr; use cast; -use clone::Clone; +use cell::Cell; use comm::{stream, SharedChan, GenericChan, GenericPort}; -use io; -use libc::{pid_t, c_void, c_int}; +#[cfg(not(windows))] use libc; -use option::{Some, None}; -use os; +use libc::{pid_t, c_int}; use prelude::*; -use ptr; use task; use vec::ImmutableVector; +use rt::io; +use rt::local::Local; +use rt::rtio::{IoFactoryObject, RtioProcessObject, RtioProcess, IoFactory}; +use rt::uv::process; + /** * A value representing a child process. * @@ -34,28 +33,23 @@ use vec::ImmutableVector; * for the process to terminate. */ pub struct Process { - /// The unique id of the process (this should never be negative). priv pid: pid_t, - /** - * A handle to the process - on unix this will always be NULL, but on - * windows it will be a HANDLE to the process, which will prevent the - * pid being re-used until the handle is closed. - */ - priv handle: *(), + /// The internal handle to the underlying libuv process. + priv handle: ~RtioProcessObject, - /// Some(fd), or None when stdin is being redirected from a fd not created by Process::new. - priv input: Option, + /// Some(fd), or None when stdin is being redirected from a fd not created + /// by Process::new. + priv input: Option<~io::Writer>, - /// Some(file), or None when stdout is being redirected to a fd not created by Process::new. - priv output: Option<*libc::FILE>, + /// Some(file), or None when stdout is being redirected to a fd not created + /// by Process::new. + priv output: Option<~io::Reader>, - /// Some(file), or None when stderr is being redirected to a fd not created by Process::new. - priv error: Option<*libc::FILE>, - - /// None until finish() is called. - priv exit_code: Option, + /// Some(file), or None when stderr is being redirected to a fd not created + /// by Process::new. + priv error: Option<~io::Reader>, } /// Options that can be given when starting a Process. @@ -93,26 +87,27 @@ pub struct ProcessOptions<'self> { * If this is None then a new pipe will be created for the new program's * output and Process.output() will provide a Reader to read from this pipe. * - * If this is Some(file-descriptor) then the new process will write its output - * to the given file descriptor, Process.output_redirected() will return - * true, and Process.output() will fail. + * If this is Some(file-descriptor) then the new process will write its + * output to the given file descriptor, Process.output_redirected() will + * return true, and Process.output() will fail. */ out_fd: Option, /** - * If this is None then a new pipe will be created for the new program's - * error stream and Process.error() will provide a Reader to read from this pipe. + * If this is None then a new pipe will be created for the new progam's + * error stream and Process.error() will provide a Reader to read from this + * pipe. * - * If this is Some(file-descriptor) then the new process will write its error output - * to the given file descriptor, Process.error_redirected() will return true, and - * and Process.error() will fail. + * If this is Some(file-descriptor) then the new process will write its + * error output to the given file descriptor, Process.error_redirected() + * will return true, and and Process.error() will fail. */ err_fd: Option, } -impl <'self> ProcessOptions<'self> { +impl<'self> ProcessOptions<'self> { /// Return a ProcessOptions that has None in every field. - pub fn new<'a>() -> ProcessOptions<'a> { + pub fn new() -> ProcessOptions { ProcessOptions { env: None, dir: None, @@ -125,7 +120,6 @@ impl <'self> ProcessOptions<'self> { /// The output of a finished process. pub struct ProcessOutput { - /// The status (exit code) of the process. status: int, @@ -148,223 +142,159 @@ impl Process { * the working directory and the standard IO streams. */ pub fn new(prog: &str, args: &[~str], - options: ProcessOptions) - -> Process { - #[fixed_stack_segment]; #[inline(never)]; - - let (in_pipe, in_fd) = match options.in_fd { + options: ProcessOptions) -> Option { + // First, translate all the stdio options into their libuv equivalents + let (uv_stdin, stdin) = match options.in_fd { + Some(fd) => (process::InheritFd(fd), None), None => { - let pipe = os::pipe(); - (Some(pipe), pipe.input) - }, - Some(fd) => (None, fd) + let p = io::pipe::PipeStream::new().expect("need stdin pipe"); + (process::CreatePipe(p.uv_pipe(), true, false), + Some(~p as ~io::Writer)) + } }; - let (out_pipe, out_fd) = match options.out_fd { + let (uv_stdout, stdout) = match options.out_fd { + Some(fd) => (process::InheritFd(fd), None), None => { - let pipe = os::pipe(); - (Some(pipe), pipe.out) - }, - Some(fd) => (None, fd) + let p = io::pipe::PipeStream::new().expect("need stdout pipe"); + (process::CreatePipe(p.uv_pipe(), false, true), + Some(~p as ~io::Reader)) + } }; - let (err_pipe, err_fd) = match options.err_fd { + let (uv_stderr, stderr) = match options.err_fd { + Some(fd) => (process::InheritFd(fd), None), None => { - let pipe = os::pipe(); - (Some(pipe), pipe.out) - }, - Some(fd) => (None, fd) + let p = io::pipe::PipeStream::new().expect("need stderr pipe"); + (process::CreatePipe(p.uv_pipe(), false, true), + Some(~p as ~io::Reader)) + } }; - let res = spawn_process_os(prog, args, options.env.clone(), options.dir, - in_fd, out_fd, err_fd); + // Next, massage our options into the libuv options + let dir = options.dir.map(|d| d.to_str()); + let dir = dir.map(|d| d.as_slice()); + let config = process::Config { + program: prog, + args: args, + env: options.env.map(|e| e.as_slice()), + cwd: dir, + io: [uv_stdin, uv_stdout, uv_stderr], + }; + // Finally, actually spawn the process unsafe { - for pipe in in_pipe.iter() { libc::close(pipe.input); } - for pipe in out_pipe.iter() { libc::close(pipe.out); } - for pipe in err_pipe.iter() { libc::close(pipe.out); } - } - - Process { - pid: res.pid, - handle: res.handle, - input: in_pipe.map(|pipe| pipe.out), - output: out_pipe.map(|pipe| os::fdopen(pipe.input)), - error: err_pipe.map(|pipe| os::fdopen(pipe.input)), - exit_code: None, + let io: *mut IoFactoryObject = Local::unsafe_borrow(); + match (*io).spawn(&config) { + Ok(handle) => { + Some(Process { + pid: handle.id(), + handle: handle, + input: stdin, + output: stdout, + error: stderr, + }) + } + Err(*) => { None } + } } } /// Returns the unique id of the process pub fn get_id(&self) -> pid_t { self.pid } - fn input_fd(&mut self) -> c_int { - match self.input { - Some(fd) => fd, - None => fail!("This Process's stdin was redirected to an \ - existing file descriptor.") - } - } - - fn output_file(&mut self) -> *libc::FILE { - match self.output { - Some(file) => file, - None => fail!("This Process's stdout was redirected to an \ - existing file descriptor.") - } - } - - fn error_file(&mut self) -> *libc::FILE { - match self.error { - Some(file) => file, - None => fail!("This Process's stderr was redirected to an \ - existing file descriptor.") - } - } - /** - * Returns whether this process is reading its stdin from an existing file - * descriptor rather than a pipe that was created specifically for this - * process. + * Returns a rt::io::Writer that can be used to write to this Process's + * stdin. * - * If this method returns true then self.input() will fail. + * Fails if this Process's stdin was redirected to an existing file + * descriptor. */ - pub fn input_redirected(&self) -> bool { - self.input.is_none() + pub fn input<'a>(&'a mut self) -> &'a mut io::Writer { + let ret: &mut io::Writer = *self.input.get_mut_ref(); + return ret; } /** - * Returns whether this process is writing its stdout to an existing file - * descriptor rather than a pipe that was created specifically for this - * process. + * Returns a rt::io::Reader that can be used to read from this Process's + * stdout. * - * If this method returns true then self.output() will fail. + * Fails if this Process's stdout was redirected to an existing file + * descriptor. */ - pub fn output_redirected(&self) -> bool { - self.output.is_none() + pub fn output<'a>(&'a mut self) -> &'a mut io::Reader { + let ret: &mut io::Reader = *self.output.get_mut_ref(); + return ret; } /** - * Returns whether this process is writing its stderr to an existing file - * descriptor rather than a pipe that was created specifically for this - * process. + * Returns a rt::io::Reader that can be used to read from this Process's + * stderr. * - * If this method returns true then self.error() will fail. + * Fails if this Process's stderr was redirected to an existing file + * descriptor. */ - pub fn error_redirected(&self) -> bool { - self.error.is_none() + pub fn error<'a>(&'a mut self) -> &'a mut io::Reader { + let ret: &mut io::Reader = *self.error.get_mut_ref(); + return ret; } /** - * Returns an io::Writer that can be used to write to this Process's stdin. - * - * Fails if this Process's stdin was redirected to an existing file descriptor. - */ - pub fn input(&mut self) -> @io::Writer { - // FIXME: the Writer can still be used after self is destroyed: #2625 - io::fd_writer(self.input_fd(), false) - } - - /** - * Returns an io::Reader that can be used to read from this Process's stdout. - * - * Fails if this Process's stdout was redirected to an existing file descriptor. - */ - pub fn output(&mut self) -> @io::Reader { - // FIXME: the Reader can still be used after self is destroyed: #2625 - io::FILE_reader(self.output_file(), false) - } - - /** - * Returns an io::Reader that can be used to read from this Process's stderr. - * - * Fails if this Process's stderr was redirected to an existing file descriptor. - */ - pub fn error(&mut self) -> @io::Reader { - // FIXME: the Reader can still be used after self is destroyed: #2625 - io::FILE_reader(self.error_file(), false) - } - - /** - * Closes the handle to the child process's stdin. - * - * If this process is reading its stdin from an existing file descriptor, then this - * method does nothing. - */ - pub fn close_input(&mut self) { - #[fixed_stack_segment]; #[inline(never)]; - match self.input { - Some(-1) | None => (), - Some(fd) => { - unsafe { - libc::close(fd); - } - self.input = Some(-1); - } - } - } - - fn close_outputs(&mut self) { - #[fixed_stack_segment]; #[inline(never)]; - fclose_and_null(&mut self.output); - fclose_and_null(&mut self.error); - - fn fclose_and_null(f_opt: &mut Option<*libc::FILE>) { - #[allow(cstack)]; // fixed_stack_segment declared on enclosing fn - match *f_opt { - Some(f) if !f.is_null() => { - unsafe { - libc::fclose(f); - *f_opt = Some(0 as *libc::FILE); - } - }, - _ => () - } - } - } - - /** - * Closes the handle to stdin, waits for the child process to terminate, - * and returns the exit code. + * Closes the handle to stdin, waits for the child process to terminate, and + * returns the exit code. * * If the child has already been finished then the exit code is returned. */ pub fn finish(&mut self) -> int { - for &code in self.exit_code.iter() { - return code; - } - self.close_input(); - let code = waitpid(self.pid); - self.exit_code = Some(code); - return code; + // We're not going to be giving any more input, so close the input by + // destroying it. Also, if the output is desired, then + // finish_with_output is called so we discard all the outputs here. Note + // that the process may not terminate if we don't destroy stdio because + // it'll be waiting in a write which we'll just never read. + self.input.take(); + self.output.take(); + self.error.take(); + + self.handle.wait() } /** - * Closes the handle to stdin, waits for the child process to terminate, and reads - * and returns all remaining output of stdout and stderr, along with the exit code. + * Closes the handle to stdin, waits for the child process to terminate, + * and reads and returns all remaining output of stdout and stderr, along + * with the exit code. * - * If the child has already been finished then the exit code and any remaining - * unread output of stdout and stderr will be returned. + * If the child has already been finished then the exit code and any + * remaining unread output of stdout and stderr will be returned. * - * This method will fail if the child process's stdout or stderr streams were - * redirected to existing file descriptors. + * This method will fail if the child process's stdout or stderr streams + * were redirected to existing file descriptors, or if this method has + * already been called. */ pub fn finish_with_output(&mut self) -> ProcessOutput { - let output_file = self.output_file(); - let error_file = self.error_file(); + // This should probably be a helper method in rt::io + fn read_everything(input: &mut io::Reader) -> ~[u8] { + let mut result = ~[]; + let mut buf = [0u8, ..1024]; + loop { + match input.read(buf) { + Some(i) => { result = result + buf.slice_to(i) } + None => break + } + } + return result; + } - // Spawn two entire schedulers to read both stdout and sterr - // in parallel so we don't deadlock while blocking on one - // or the other. FIXME (#2625): Surely there's a much more - // clever way to do this. let (p, ch) = stream(); let ch = SharedChan::new(ch); let ch_clone = ch.clone(); - do task::spawn_sched(task::SingleThreaded) { - let errput = io::FILE_reader(error_file, false); - ch.send((2, errput.read_whole_stream())); + + let stderr = Cell::new(self.error.take().unwrap()); + do task::spawn { + let output = read_everything(stderr.take()); + ch.send((2, output)); } - do task::spawn_sched(task::SingleThreaded) { - let output = io::FILE_reader(output_file, false); - ch_clone.send((1, output.read_whole_stream())); + let stdout = Cell::new(self.output.take().unwrap()); + do task::spawn { + let output = read_everything(stdout.take()); + ch_clone.send((1, output)); } let status = self.finish(); @@ -382,40 +312,6 @@ impl Process { error: errs}; } - fn destroy_internal(&mut self, force: bool) { - // if the process has finished, and therefore had waitpid called, - // and we kill it, then on unix we might ending up killing a - // newer process that happens to have the same (re-used) id - if self.exit_code.is_none() { - killpid(self.pid, force); - self.finish(); - } - - #[cfg(windows)] - fn killpid(pid: pid_t, _force: bool) { - #[fixed_stack_segment]; #[inline(never)]; - unsafe { - libc::funcs::extra::kernel32::TerminateProcess( - cast::transmute(pid), 1); - } - } - - #[cfg(unix)] - fn killpid(pid: pid_t, force: bool) { - #[fixed_stack_segment]; #[inline(never)]; - - let signal = if force { - libc::consts::os::posix88::SIGKILL - } else { - libc::consts::os::posix88::SIGTERM - }; - - unsafe { - libc::funcs::posix88::signal::kill(pid, signal as c_int); - } - } - } - /** * Terminates the process, giving it a chance to clean itself up if * this is supported by the operating system. @@ -423,7 +319,12 @@ impl Process { * On Posix OSs SIGTERM will be sent to the process. On Win32 * TerminateProcess(..) will be called. */ - pub fn destroy(&mut self) { self.destroy_internal(false); } + pub fn destroy(&mut self) { + #[cfg(windows)] fn sigterm() -> int { 15 } + #[cfg(not(windows))] fn sigterm() -> int { libc::SIGTERM as int } + self.handle.kill(sigterm()); + self.finish(); + } /** * Terminates the process as soon as possible without giving it a @@ -432,378 +333,22 @@ impl Process { * On Posix OSs SIGKILL will be sent to the process. On Win32 * TerminateProcess(..) will be called. */ - pub fn force_destroy(&mut self) { self.destroy_internal(true); } + pub fn force_destroy(&mut self) { + #[cfg(windows)] fn sigkill() -> int { 9 } + #[cfg(not(windows))] fn sigkill() -> int { libc::SIGKILL as int } + self.handle.kill(sigkill()); + self.finish(); + } } impl Drop for Process { fn drop(&self) { // FIXME(#4330) Need self by value to get mutability. let mut_self: &mut Process = unsafe { cast::transmute(self) }; - mut_self.finish(); - mut_self.close_outputs(); - free_handle(self.handle); } } -struct SpawnProcessResult { - pid: pid_t, - handle: *(), -} - -#[cfg(windows)] -fn spawn_process_os(prog: &str, args: &[~str], - env: Option<~[(~str, ~str)]>, - dir: Option<&Path>, - in_fd: c_int, out_fd: c_int, err_fd: c_int) -> SpawnProcessResult { - #[fixed_stack_segment]; #[inline(never)]; - - use libc::types::os::arch::extra::{DWORD, HANDLE, STARTUPINFO}; - use libc::consts::os::extra::{ - TRUE, FALSE, - STARTF_USESTDHANDLES, - INVALID_HANDLE_VALUE, - DUPLICATE_SAME_ACCESS - }; - use libc::funcs::extra::kernel32::{ - GetCurrentProcess, - DuplicateHandle, - CloseHandle, - CreateProcessA - }; - use libc::funcs::extra::msvcrt::get_osfhandle; - - use sys; - - unsafe { - - let mut si = zeroed_startupinfo(); - si.cb = sys::size_of::() as DWORD; - si.dwFlags = STARTF_USESTDHANDLES; - - let cur_proc = GetCurrentProcess(); - - let orig_std_in = get_osfhandle(in_fd) as HANDLE; - if orig_std_in == INVALID_HANDLE_VALUE as HANDLE { - fail!("failure in get_osfhandle: %s", os::last_os_error()); - } - if DuplicateHandle(cur_proc, orig_std_in, cur_proc, &mut si.hStdInput, - 0, TRUE, DUPLICATE_SAME_ACCESS) == FALSE { - fail!("failure in DuplicateHandle: %s", os::last_os_error()); - } - - let orig_std_out = get_osfhandle(out_fd) as HANDLE; - if orig_std_out == INVALID_HANDLE_VALUE as HANDLE { - fail!("failure in get_osfhandle: %s", os::last_os_error()); - } - if DuplicateHandle(cur_proc, orig_std_out, cur_proc, &mut si.hStdOutput, - 0, TRUE, DUPLICATE_SAME_ACCESS) == FALSE { - fail!("failure in DuplicateHandle: %s", os::last_os_error()); - } - - let orig_std_err = get_osfhandle(err_fd) as HANDLE; - if orig_std_err == INVALID_HANDLE_VALUE as HANDLE { - fail!("failure in get_osfhandle: %s", os::last_os_error()); - } - if DuplicateHandle(cur_proc, orig_std_err, cur_proc, &mut si.hStdError, - 0, TRUE, DUPLICATE_SAME_ACCESS) == FALSE { - fail!("failure in DuplicateHandle: %s", os::last_os_error()); - } - - let cmd = make_command_line(prog, args); - let mut pi = zeroed_process_information(); - let mut create_err = None; - - do with_envp(env) |envp| { - do with_dirp(dir) |dirp| { - do cmd.with_c_str |cmdp| { - let created = CreateProcessA(ptr::null(), cast::transmute(cmdp), - ptr::mut_null(), ptr::mut_null(), TRUE, - 0, envp, dirp, &mut si, &mut pi); - if created == FALSE { - create_err = Some(os::last_os_error()); - } - } - } - } - - CloseHandle(si.hStdInput); - CloseHandle(si.hStdOutput); - CloseHandle(si.hStdError); - - for msg in create_err.iter() { - fail!("failure in CreateProcess: %s", *msg); - } - - // We close the thread handle because we don't care about keeping the thread id valid, - // and we aren't keeping the thread handle around to be able to close it later. We don't - // close the process handle however because we want the process id to stay valid at least - // until the calling code closes the process handle. - CloseHandle(pi.hThread); - - SpawnProcessResult { - pid: pi.dwProcessId as pid_t, - handle: pi.hProcess as *() - } - } -} - -#[cfg(windows)] -fn zeroed_startupinfo() -> libc::types::os::arch::extra::STARTUPINFO { - libc::types::os::arch::extra::STARTUPINFO { - cb: 0, - lpReserved: ptr::mut_null(), - lpDesktop: ptr::mut_null(), - lpTitle: ptr::mut_null(), - dwX: 0, - dwY: 0, - dwXSize: 0, - dwYSize: 0, - dwXCountChars: 0, - dwYCountCharts: 0, - dwFillAttribute: 0, - dwFlags: 0, - wShowWindow: 0, - cbReserved2: 0, - lpReserved2: ptr::mut_null(), - hStdInput: ptr::mut_null(), - hStdOutput: ptr::mut_null(), - hStdError: ptr::mut_null() - } -} - -#[cfg(windows)] -fn zeroed_process_information() -> libc::types::os::arch::extra::PROCESS_INFORMATION { - libc::types::os::arch::extra::PROCESS_INFORMATION { - hProcess: ptr::mut_null(), - hThread: ptr::mut_null(), - dwProcessId: 0, - dwThreadId: 0 - } -} - -// FIXME: this is only pub so it can be tested (see issue #4536) -#[cfg(windows)] -pub fn make_command_line(prog: &str, args: &[~str]) -> ~str { - let mut cmd = ~""; - append_arg(&mut cmd, prog); - for arg in args.iter() { - cmd.push_char(' '); - append_arg(&mut cmd, *arg); - } - return cmd; - - fn append_arg(cmd: &mut ~str, arg: &str) { - let quote = arg.iter().any(|c| c == ' ' || c == '\t'); - if quote { - cmd.push_char('"'); - } - for i in range(0u, arg.len()) { - append_char_at(cmd, arg, i); - } - if quote { - cmd.push_char('"'); - } - } - - fn append_char_at(cmd: &mut ~str, arg: &str, i: uint) { - match arg[i] as char { - '"' => { - // Escape quotes. - cmd.push_str("\\\""); - } - '\\' => { - if backslash_run_ends_in_quote(arg, i) { - // Double all backslashes that are in runs before quotes. - cmd.push_str("\\\\"); - } else { - // Pass other backslashes through unescaped. - cmd.push_char('\\'); - } - } - c => { - cmd.push_char(c); - } - } - } - - fn backslash_run_ends_in_quote(s: &str, mut i: uint) -> bool { - while i < s.len() && s[i] as char == '\\' { - i += 1; - } - return i < s.len() && s[i] as char == '"'; - } -} - -#[cfg(unix)] -fn spawn_process_os(prog: &str, args: &[~str], - env: Option<~[(~str, ~str)]>, - dir: Option<&Path>, - in_fd: c_int, out_fd: c_int, err_fd: c_int) -> SpawnProcessResult { - #[fixed_stack_segment]; #[inline(never)]; - - use libc::funcs::posix88::unistd::{fork, dup2, close, chdir, execvp}; - use libc::funcs::bsd44::getdtablesize; - - mod rustrt { - use libc::c_void; - - #[abi = "cdecl"] - extern { - pub fn rust_unset_sigprocmask(); - pub fn rust_set_environ(envp: *c_void); - } - } - - unsafe { - - let pid = fork(); - if pid < 0 { - fail!("failure in fork: %s", os::last_os_error()); - } else if pid > 0 { - return SpawnProcessResult {pid: pid, handle: ptr::null()}; - } - - rustrt::rust_unset_sigprocmask(); - - if dup2(in_fd, 0) == -1 { - fail!("failure in dup2(in_fd, 0): %s", os::last_os_error()); - } - if dup2(out_fd, 1) == -1 { - fail!("failure in dup2(out_fd, 1): %s", os::last_os_error()); - } - if dup2(err_fd, 2) == -1 { - fail!("failure in dup3(err_fd, 2): %s", os::last_os_error()); - } - // close all other fds - for fd in range(3, getdtablesize()).invert() { - close(fd as c_int); - } - - do with_dirp(dir) |dirp| { - if !dirp.is_null() && chdir(dirp) == -1 { - fail!("failure in chdir: %s", os::last_os_error()); - } - } - - do with_envp(env) |envp| { - if !envp.is_null() { - rustrt::rust_set_environ(envp); - } - do with_argv(prog, args) |argv| { - execvp(*argv, argv); - // execvp only returns if an error occurred - fail!("failure in execvp: %s", os::last_os_error()); - } - } - } -} - -#[cfg(unix)] -fn with_argv(prog: &str, args: &[~str], cb: &fn(**libc::c_char) -> T) -> T { - use vec; - - // We can't directly convert `str`s into `*char`s, as someone needs to hold - // a reference to the intermediary byte buffers. So first build an array to - // hold all the ~[u8] byte strings. - let mut tmps = vec::with_capacity(args.len() + 1); - - tmps.push(prog.to_c_str()); - - for arg in args.iter() { - tmps.push(arg.to_c_str()); - } - - // Next, convert each of the byte strings into a pointer. This is - // technically unsafe as the caller could leak these pointers out of our - // scope. - let mut ptrs = do tmps.map |tmp| { - tmp.with_ref(|buf| buf) - }; - - // Finally, make sure we add a null pointer. - ptrs.push(ptr::null()); - - ptrs.as_imm_buf(|buf, _| cb(buf)) -} - -#[cfg(unix)] -fn with_envp(env: Option<~[(~str, ~str)]>, cb: &fn(*c_void) -> T) -> T { - use vec; - - // On posixy systems we can pass a char** for envp, which is a - // null-terminated array of "k=v\n" strings. Like `with_argv`, we have to - // have a temporary buffer to hold the intermediary `~[u8]` byte strings. - match env { - Some(env) => { - let mut tmps = vec::with_capacity(env.len()); - - for pair in env.iter() { - // Use of match here is just to workaround limitations - // in the stage0 irrefutable pattern impl. - let kv = fmt!("%s=%s", pair.first(), pair.second()); - tmps.push(kv.to_c_str()); - } - - // Once again, this is unsafe. - let mut ptrs = do tmps.map |tmp| { - tmp.with_ref(|buf| buf) - }; - ptrs.push(ptr::null()); - - do ptrs.as_imm_buf |buf, _| { - unsafe { cb(cast::transmute(buf)) } - } - } - _ => cb(ptr::null()) - } -} - -#[cfg(windows)] -fn with_envp(env: Option<~[(~str, ~str)]>, cb: &fn(*mut c_void) -> T) -> T { - // On win32 we pass an "environment block" which is not a char**, but - // rather a concatenation of null-terminated k=v\0 sequences, with a final - // \0 to terminate. - match env { - Some(env) => { - let mut blk = ~[]; - - for pair in env.iter() { - let kv = fmt!("%s=%s", pair.first(), pair.second()); - blk.push_all(kv.as_bytes()); - blk.push(0); - } - - blk.push(0); - - do blk.as_imm_buf |p, _len| { - unsafe { cb(cast::transmute(p)) } - } - } - _ => cb(ptr::mut_null()) - } -} - -fn with_dirp(d: Option<&Path>, cb: &fn(*libc::c_char) -> T) -> T { - match d { - Some(dir) => dir.with_c_str(|buf| cb(buf)), - None => cb(ptr::null()) - } -} - -#[cfg(windows)] -fn free_handle(handle: *()) { - #[fixed_stack_segment]; #[inline(never)]; - unsafe { - libc::funcs::extra::kernel32::CloseHandle(cast::transmute(handle)); - } -} - -#[cfg(unix)] -fn free_handle(_handle: *()) { - // unix has no process handle object, just a pid -} - /** * Spawns a process and waits for it to terminate. The process will * inherit the current stdin/stdout/stderr file descriptors. @@ -824,7 +369,7 @@ pub fn process_status(prog: &str, args: &[~str]) -> int { in_fd: Some(0), out_fd: Some(1), err_fd: Some(2) - }); + }).unwrap(); prog.finish() } @@ -841,162 +386,38 @@ pub fn process_status(prog: &str, args: &[~str]) -> int { * The process's stdout/stderr output and exit code. */ pub fn process_output(prog: &str, args: &[~str]) -> ProcessOutput { - let mut prog = Process::new(prog, args, ProcessOptions::new()); + let mut prog = Process::new(prog, args, ProcessOptions::new()).unwrap(); prog.finish_with_output() } -/** - * Waits for a process to exit and returns the exit code, failing - * if there is no process with the specified id. - * - * Note that this is private to avoid race conditions on unix where if - * a user calls waitpid(some_process.get_id()) then some_process.finish() - * and some_process.destroy() and some_process.finalize() will then either - * operate on a none-existent process or, even worse, on a newer process - * with the same id. - */ -fn waitpid(pid: pid_t) -> int { - return waitpid_os(pid); - - #[cfg(windows)] - fn waitpid_os(pid: pid_t) -> int { - #[fixed_stack_segment]; #[inline(never)]; - - use libc::types::os::arch::extra::DWORD; - use libc::consts::os::extra::{ - SYNCHRONIZE, - PROCESS_QUERY_INFORMATION, - FALSE, - STILL_ACTIVE, - INFINITE, - WAIT_FAILED - }; - use libc::funcs::extra::kernel32::{ - OpenProcess, - GetExitCodeProcess, - CloseHandle, - WaitForSingleObject - }; - - unsafe { - - let proc = OpenProcess(SYNCHRONIZE | PROCESS_QUERY_INFORMATION, FALSE, pid as DWORD); - if proc.is_null() { - fail!("failure in OpenProcess: %s", os::last_os_error()); - } - - loop { - let mut status = 0; - if GetExitCodeProcess(proc, &mut status) == FALSE { - CloseHandle(proc); - fail!("failure in GetExitCodeProcess: %s", os::last_os_error()); - } - if status != STILL_ACTIVE { - CloseHandle(proc); - return status as int; - } - if WaitForSingleObject(proc, INFINITE) == WAIT_FAILED { - CloseHandle(proc); - fail!("failure in WaitForSingleObject: %s", os::last_os_error()); - } - } - } - } - - #[cfg(unix)] - fn waitpid_os(pid: pid_t) -> int { - #[fixed_stack_segment]; #[inline(never)]; - - use libc::funcs::posix01::wait::*; - - #[cfg(target_os = "linux")] - #[cfg(target_os = "android")] - fn WIFEXITED(status: i32) -> bool { - (status & 0xffi32) == 0i32 - } - - #[cfg(target_os = "macos")] - #[cfg(target_os = "freebsd")] - fn WIFEXITED(status: i32) -> bool { - (status & 0x7fi32) == 0i32 - } - - #[cfg(target_os = "linux")] - #[cfg(target_os = "android")] - fn WEXITSTATUS(status: i32) -> i32 { - (status >> 8i32) & 0xffi32 - } - - #[cfg(target_os = "macos")] - #[cfg(target_os = "freebsd")] - fn WEXITSTATUS(status: i32) -> i32 { - status >> 8i32 - } - - let mut status = 0 as c_int; - if unsafe { waitpid(pid, &mut status, 0) } == -1 { - fail!("failure in waitpid: %s", os::last_os_error()); - } - - return if WIFEXITED(status) { - WEXITSTATUS(status) as int - } else { - 1 - }; - } -} - #[cfg(test)] mod tests { - use io; - use libc::c_int; - use option::{Option, None, Some}; use os; use path::Path; - use run; + use prelude::*; use str; + use super::*; use unstable::running_on_valgrind; - #[test] - #[cfg(windows)] - fn test_make_command_line() { - assert_eq!( - run::make_command_line("prog", [~"aaa", ~"bbb", ~"ccc"]), - ~"prog aaa bbb ccc" - ); - assert_eq!( - run::make_command_line("C:\\Program Files\\blah\\blah.exe", [~"aaa"]), - ~"\"C:\\Program Files\\blah\\blah.exe\" aaa" - ); - assert_eq!( - run::make_command_line("C:\\Program Files\\test", [~"aa\"bb"]), - ~"\"C:\\Program Files\\test\" aa\\\"bb" - ); - assert_eq!( - run::make_command_line("echo", [~"a b c"]), - ~"echo \"a b c\"" - ); - } - #[test] #[cfg(not(target_os="android"))] fn test_process_status() { - assert_eq!(run::process_status("false", []), 1); - assert_eq!(run::process_status("true", []), 0); + assert_eq!(process_status("false", []), 1); + assert_eq!(process_status("true", []), 0); } #[test] #[cfg(target_os="android")] fn test_process_status() { - assert_eq!(run::process_status("/system/bin/sh", [~"-c",~"false"]), 1); - assert_eq!(run::process_status("/system/bin/sh", [~"-c",~"true"]), 0); + assert_eq!(process_status("/system/bin/sh", [~"-c",~"false"]), 1); + assert_eq!(process_status("/system/bin/sh", [~"-c",~"true"]), 0); } #[test] #[cfg(not(target_os="android"))] fn test_process_output_output() { - let run::ProcessOutput {status, output, error} - = run::process_output("echo", [~"hello"]); + let ProcessOutput {status, output, error} + = process_output("echo", [~"hello"]); let output_str = str::from_bytes(output); assert_eq!(status, 0); @@ -1010,8 +431,8 @@ mod tests { #[cfg(target_os="android")] fn test_process_output_output() { - let run::ProcessOutput {status, output, error} - = run::process_output("/system/bin/sh", [~"-c",~"echo hello"]); + let ProcessOutput {status, output, error} + = process_output("/system/bin/sh", [~"-c",~"echo hello"]); let output_str = str::from_bytes(output); assert_eq!(status, 0); @@ -1026,8 +447,8 @@ mod tests { #[cfg(not(target_os="android"))] fn test_process_output_error() { - let run::ProcessOutput {status, output, error} - = run::process_output("mkdir", [~"."]); + let ProcessOutput {status, output, error} + = process_output("mkdir", [~"."]); assert_eq!(status, 1); assert_eq!(output, ~[]); @@ -1037,90 +458,40 @@ mod tests { #[cfg(target_os="android")] fn test_process_output_error() { - let run::ProcessOutput {status, output, error} - = run::process_output("/system/bin/mkdir", [~"."]); + let ProcessOutput {status, output, error} + = process_output("/system/bin/mkdir", [~"."]); assert_eq!(status, 255); assert_eq!(output, ~[]); assert!(!error.is_empty()); } - #[test] - fn test_pipes() { - - let pipe_in = os::pipe(); - let pipe_out = os::pipe(); - let pipe_err = os::pipe(); - - let mut proc = run::Process::new("cat", [], run::ProcessOptions { - dir: None, - env: None, - in_fd: Some(pipe_in.input), - out_fd: Some(pipe_out.out), - err_fd: Some(pipe_err.out) - }); - - assert!(proc.input_redirected()); - assert!(proc.output_redirected()); - assert!(proc.error_redirected()); - - os::close(pipe_in.input); - os::close(pipe_out.out); - os::close(pipe_err.out); - - let expected = ~"test"; - writeclose(pipe_in.out, expected); - let actual = readclose(pipe_out.input); - readclose(pipe_err.input); - proc.finish(); - - assert_eq!(expected, actual); - } - - fn writeclose(fd: c_int, s: &str) { - let writer = io::fd_writer(fd, false); - writer.write_str(s); - os::close(fd); - } - - fn readclose(fd: c_int) -> ~str { - #[fixed_stack_segment]; #[inline(never)]; - - unsafe { - let file = os::fdopen(fd); - let reader = io::FILE_reader(file, false); - let buf = reader.read_whole_stream(); - os::fclose(file); - str::from_bytes(buf) - } - } - #[test] #[cfg(not(target_os="android"))] fn test_finish_once() { - let mut prog = run::Process::new("false", [], run::ProcessOptions::new()); + let mut prog = Process::new("false", [], ProcessOptions::new()).unwrap(); assert_eq!(prog.finish(), 1); } #[test] #[cfg(target_os="android")] fn test_finish_once() { - let mut prog = run::Process::new("/system/bin/sh", [~"-c",~"false"], - run::ProcessOptions::new()); + let mut prog = Process::new("/system/bin/sh", [~"-c",~"false"], + ProcessOptions::new()).unwrap(); assert_eq!(prog.finish(), 1); } #[test] #[cfg(not(target_os="android"))] fn test_finish_twice() { - let mut prog = run::Process::new("false", [], run::ProcessOptions::new()); + let mut prog = Process::new("false", [], ProcessOptions::new()).unwrap(); assert_eq!(prog.finish(), 1); assert_eq!(prog.finish(), 1); } #[test] #[cfg(target_os="android")] fn test_finish_twice() { - let mut prog = run::Process::new("/system/bin/sh", [~"-c",~"false"], - run::ProcessOptions::new()); + let mut prog = Process::new("/system/bin/sh", [~"-c",~"false"], + ProcessOptions::new()).unwrap(); assert_eq!(prog.finish(), 1); assert_eq!(prog.finish(), 1); } @@ -1129,8 +500,9 @@ mod tests { #[cfg(not(target_os="android"))] fn test_finish_with_output_once() { - let mut prog = run::Process::new("echo", [~"hello"], run::ProcessOptions::new()); - let run::ProcessOutput {status, output, error} + let prog = Process::new("echo", [~"hello"], ProcessOptions::new()); + let mut prog = prog.unwrap(); + let ProcessOutput {status, output, error} = prog.finish_with_output(); let output_str = str::from_bytes(output); @@ -1145,9 +517,9 @@ mod tests { #[cfg(target_os="android")] fn test_finish_with_output_once() { - let mut prog = run::Process::new("/system/bin/sh", [~"-c",~"echo hello"], - run::ProcessOptions::new()); - let run::ProcessOutput {status, output, error} + let mut prog = Process::new("/system/bin/sh", [~"-c",~"echo hello"], + ProcessOptions::new()).unwrap(); + let ProcessOutput {status, output, error} = prog.finish_with_output(); let output_str = str::from_bytes(output); @@ -1159,113 +531,59 @@ mod tests { } } - #[test] - #[cfg(not(target_os="android"))] - fn test_finish_with_output_twice() { - - let mut prog = run::Process::new("echo", [~"hello"], run::ProcessOptions::new()); - let run::ProcessOutput {status, output, error} - = prog.finish_with_output(); - - let output_str = str::from_bytes(output); - - assert_eq!(status, 0); - assert_eq!(output_str.trim().to_owned(), ~"hello"); - // FIXME #7224 - if !running_on_valgrind() { - assert_eq!(error, ~[]); - } - - let run::ProcessOutput {status, output, error} - = prog.finish_with_output(); - - assert_eq!(status, 0); - assert_eq!(output, ~[]); - // FIXME #7224 - if !running_on_valgrind() { - assert_eq!(error, ~[]); - } - } - #[test] - #[cfg(target_os="android")] - fn test_finish_with_output_twice() { - - let mut prog = run::Process::new("/system/bin/sh", [~"-c",~"echo hello"], - run::ProcessOptions::new()); - let run::ProcessOutput {status, output, error} - = prog.finish_with_output(); - - let output_str = str::from_bytes(output); - - assert_eq!(status, 0); - assert_eq!(output_str.trim().to_owned(), ~"hello"); - // FIXME #7224 - if !running_on_valgrind() { - assert_eq!(error, ~[]); - } - - let run::ProcessOutput {status, output, error} - = prog.finish_with_output(); - - assert_eq!(status, 0); - assert_eq!(output, ~[]); - // FIXME #7224 - if !running_on_valgrind() { - assert_eq!(error, ~[]); - } - } - #[test] #[should_fail] #[cfg(not(windows),not(target_os="android"))] fn test_finish_with_output_redirected() { - let mut prog = run::Process::new("echo", [~"hello"], run::ProcessOptions { + let mut prog = Process::new("echo", [~"hello"], ProcessOptions { env: None, dir: None, in_fd: Some(0), out_fd: Some(1), err_fd: Some(2) - }); - // this should fail because it is not valid to read the output when it was redirected + }).unwrap(); + // this should fail because it is not valid to read the output when it + // was redirected prog.finish_with_output(); } #[test] #[should_fail] #[cfg(not(windows),target_os="android")] fn test_finish_with_output_redirected() { - let mut prog = run::Process::new("/system/bin/sh", [~"-c",~"echo hello"], - run::ProcessOptions { + let mut prog = Process::new("/system/bin/sh", [~"-c",~"echo hello"], + ProcessOptions { env: None, dir: None, in_fd: Some(0), out_fd: Some(1), err_fd: Some(2) - }); - // this should fail because it is not valid to read the output when it was redirected + }).unwrap(); + // this should fail because it is not valid to read the output when it + // was redirected prog.finish_with_output(); } #[cfg(unix,not(target_os="android"))] - fn run_pwd(dir: Option<&Path>) -> run::Process { - run::Process::new("pwd", [], run::ProcessOptions { + fn run_pwd(dir: Option<&Path>) -> Process { + Process::new("pwd", [], ProcessOptions { dir: dir, - .. run::ProcessOptions::new() - }) + .. ProcessOptions::new() + }).unwrap() } #[cfg(unix,target_os="android")] - fn run_pwd(dir: Option<&Path>) -> run::Process { - run::Process::new("/system/bin/sh", [~"-c",~"pwd"], run::ProcessOptions { + fn run_pwd(dir: Option<&Path>) -> Process { + Process::new("/system/bin/sh", [~"-c",~"pwd"], ProcessOptions { dir: dir, - .. run::ProcessOptions::new() - }) + .. ProcessOptions::new() + }).unwrap() } #[cfg(windows)] - fn run_pwd(dir: Option<&Path>) -> run::Process { - run::Process::new("cmd", [~"/c", ~"cd"], run::ProcessOptions { + fn run_pwd(dir: Option<&Path>) -> Process { + Process::new("cmd", [~"/c", ~"cd"], ProcessOptions { dir: dir, - .. run::ProcessOptions::new() - }) + .. ProcessOptions::new() + }).unwrap() } #[test] @@ -1301,26 +619,26 @@ mod tests { } #[cfg(unix,not(target_os="android"))] - fn run_env(env: Option<~[(~str, ~str)]>) -> run::Process { - run::Process::new("env", [], run::ProcessOptions { + fn run_env(env: Option<~[(~str, ~str)]>) -> Process { + Process::new("env", [], ProcessOptions { env: env, - .. run::ProcessOptions::new() - }) + .. ProcessOptions::new() + }).unwrap() } #[cfg(unix,target_os="android")] - fn run_env(env: Option<~[(~str, ~str)]>) -> run::Process { - run::Process::new("/system/bin/sh", [~"-c",~"set"], run::ProcessOptions { + fn run_env(env: Option<~[(~str, ~str)]>) -> Process { + Process::new("/system/bin/sh", [~"-c",~"set"], ProcessOptions { env: env, - .. run::ProcessOptions::new() - }) + .. ProcessOptions::new() + }).unwrap() } #[cfg(windows)] - fn run_env(env: Option<~[(~str, ~str)]>) -> run::Process { - run::Process::new("cmd", [~"/c", ~"set"], run::ProcessOptions { + fn run_env(env: Option<~[(~str, ~str)]>) -> Process { + Process::new("cmd", [~"/c", ~"set"], ProcessOptions { env: env, - .. run::ProcessOptions::new() - }) + .. ProcessOptions::new() + }).unwrap() } #[test] @@ -1357,7 +675,6 @@ mod tests { #[test] fn test_add_to_env() { - let mut new_env = os::env(); new_env.push((~"RUN_TEST_NEW_ENV", ~"123")); diff --git a/src/libuv b/src/libuv index dfae9c3e958..ef2bcd13416 160000 --- a/src/libuv +++ b/src/libuv @@ -1 +1 @@ -Subproject commit dfae9c3e958dc086d9c0ab068cd76d196c95a433 +Subproject commit ef2bcd134164adcaa072dcb56e62b737fdcb075e diff --git a/src/rt/rust_uv.cpp b/src/rt/rust_uv.cpp index 8ef4572f810..a181e76df5c 100644 --- a/src/rt/rust_uv.cpp +++ b/src/rt/rust_uv.cpp @@ -329,20 +329,13 @@ rust_uv_get_len_from_buf(uv_buf_t buf) { return buf.len; } -extern "C" uv_err_t -rust_uv_last_error(uv_loop_t* loop) { - return uv_last_error(loop); -} - extern "C" const char* -rust_uv_strerror(uv_err_t* err_ptr) { - uv_err_t err = *err_ptr; +rust_uv_strerror(int err) { return uv_strerror(err); } extern "C" const char* -rust_uv_err_name(uv_err_t* err_ptr) { - uv_err_t err = *err_ptr; +rust_uv_err_name(int err) { return uv_err_name(err); } @@ -553,3 +546,37 @@ extern "C" uv_loop_t* rust_uv_get_loop_from_fs_req(uv_fs_t* req) { return req->loop; } +extern "C" int +rust_uv_spawn(uv_loop_t *loop, uv_process_t *p, uv_process_options_t options) { + return uv_spawn(loop, p, options); +} + +extern "C" int +rust_uv_process_kill(uv_process_t *p, int signum) { + return uv_process_kill(p, signum); +} + +extern "C" void +rust_set_stdio_container_flags(uv_stdio_container_t *c, int flags) { + c->flags = (uv_stdio_flags) flags; +} + +extern "C" void +rust_set_stdio_container_fd(uv_stdio_container_t *c, int fd) { + c->data.fd = fd; +} + +extern "C" void +rust_set_stdio_container_stream(uv_stdio_container_t *c, uv_stream_t *stream) { + c->data.stream = stream; +} + +extern "C" int +rust_uv_process_pid(uv_process_t* p) { + return p->pid; +} + +extern "C" int +rust_uv_pipe_init(uv_loop_t *loop, uv_pipe_t* p, int ipc) { + return uv_pipe_init(loop, p, ipc); +} diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index b668d394406..2fc1a91a132 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -47,7 +47,6 @@ rust_uv_timer_start rust_uv_timer_stop rust_uv_tcp_init rust_uv_buf_init -rust_uv_last_error rust_uv_strerror rust_uv_err_name rust_uv_ip4_addr @@ -191,4 +190,11 @@ rust_drop_global_args_lock rust_take_change_dir_lock rust_drop_change_dir_lock rust_get_test_int -rust_get_task \ No newline at end of file +rust_get_task +rust_uv_spawn +rust_uv_process_kill +rust_set_stdio_container_flags +rust_set_stdio_container_fd +rust_set_stdio_container_stream +rust_uv_process_pid +rust_uv_pipe_init diff --git a/src/test/run-pass/core-run-destroy.rs b/src/test/run-pass/core-run-destroy.rs index 2551d1a5cfc..90e63fc977d 100644 --- a/src/test/run-pass/core-run-destroy.rs +++ b/src/test/run-pass/core-run-destroy.rs @@ -22,13 +22,15 @@ use std::str; #[test] fn test_destroy_once() { - let mut p = run::Process::new("echo", [], run::ProcessOptions::new()); + let p = run::Process::new("echo", [], run::ProcessOptions::new()); + let mut p = p.unwrap(); p.destroy(); // this shouldn't crash (and nor should the destructor) } #[test] fn test_destroy_twice() { - let mut p = run::Process::new("echo", [], run::ProcessOptions::new()); + let p = run::Process::new("echo", [], run::ProcessOptions::new()); + let mut p = p.unwrap(); p.destroy(); // this shouldnt crash... p.destroy(); // ...and nor should this (and nor should the destructor) } @@ -74,7 +76,8 @@ fn test_destroy_actually_kills(force: bool) { } // this process will stay alive indefinitely trying to read from stdin - let mut p = run::Process::new(BLOCK_COMMAND, [], run::ProcessOptions::new()); + let p = run::Process::new(BLOCK_COMMAND, [], run::ProcessOptions::new()); + let mut p = p.unwrap(); assert!(process_exists(p.get_id()));