Move stdin to using libuv's pipes instead of a tty

I was seeing a lot of weird behavior with stdin behaving as a tty, and it
doesn't really quite make sense, so instead this moves to using libuv's pipes
instead (which make more sense for stdin specifically).

This prevents piping input to rustc hanging forever.
This commit is contained in:
Alex Crichton 2013-10-18 14:01:22 -07:00
parent 6b70ddfba1
commit 279c351820
9 changed files with 48 additions and 84 deletions

View File

@ -19,7 +19,7 @@ on a `ToCStr` object. This trait is already defined for common
objects such as strings and `Path` instances.
All operations in this module, including those as part of `FileStream` et al
block the task during execution. Most will raise `std::rt::io::{io_error,io_error}`
block the task during execution. Most will raise `std::rt::io::io_error`
conditions in the event of failure.
Also included in this module are the `FileInfo` and `DirectoryInfo` traits. When

View File

@ -37,7 +37,7 @@ pub struct UnixStream {
impl UnixStream {
fn new(obj: ~RtioPipe) -> UnixStream {
UnixStream { obj: PipeStream::new_bound(obj) }
UnixStream { obj: PipeStream::new(obj) }
}
/// Connect to a pipe named by `path`. This will attempt to open a

View File

@ -23,7 +23,7 @@ pub struct PipeStream {
}
impl PipeStream {
pub fn new_bound(inner: ~RtioPipe) -> PipeStream {
pub fn new(inner: ~RtioPipe) -> PipeStream {
PipeStream { obj: inner }
}
}
@ -42,7 +42,7 @@ impl Reader for PipeStream {
}
}
fn eof(&mut self) -> bool { fail!() }
fn eof(&mut self) -> bool { false }
}
impl Writer for PipeStream {
@ -55,5 +55,5 @@ impl Writer for PipeStream {
}
}
fn flush(&mut self) { fail!() }
fn flush(&mut self) {}
}

View File

@ -89,7 +89,7 @@ impl Process {
Ok((p, io)) => Some(Process{
handle: p,
io: io.move_iter().map(|p|
p.map(|p| io::PipeStream::new_bound(p))
p.map(|p| io::PipeStream::new(p))
).collect()
}),
Err(ioerr) => {

View File

@ -30,7 +30,7 @@ use fmt;
use libc;
use option::{Option, Some, None};
use result::{Ok, Err};
use rt::rtio::{IoFactory, RtioTTY, with_local_io};
use rt::rtio::{IoFactory, RtioTTY, with_local_io, RtioPipe};
use super::{Reader, Writer, io_error};
#[fixed_stack_segment] #[inline(never)]
@ -52,8 +52,17 @@ fn tty<T>(fd: libc::c_int, f: &fn(~RtioTTY) -> T) -> T {
/// Creates a new non-blocking handle to the stdin of the current process.
///
/// See `stdout()` for notes about this function.
#[fixed_stack_segment] #[inline(never)]
pub fn stdin() -> StdReader {
do tty(libc::STDIN_FILENO) |tty| { StdReader { inner: tty } }
do with_local_io |io| {
match io.pipe_open(unsafe { libc::dup(libc::STDIN_FILENO) }) {
Ok(stream) => Some(StdReader { inner: stream }),
Err(e) => {
io_error::cond.raise(e);
None
}
}
}.unwrap()
}
/// Creates a new non-blocking handle to the stdout of the current process.
@ -108,28 +117,7 @@ pub fn println_args(fmt: &fmt::Arguments) {
/// Representation of a reader of a standard input stream
pub struct StdReader {
priv inner: ~RtioTTY
}
impl StdReader {
/// Controls whether this output stream is a "raw stream" or simply a normal
/// stream.
///
/// # Failure
///
/// This function will raise on the `io_error` condition if an error
/// happens.
pub fn set_raw(&mut self, raw: bool) {
match self.inner.set_raw(raw) {
Ok(()) => {},
Err(e) => io_error::cond.raise(e),
}
}
/// Returns whether this tream is attached to a TTY instance or not.
///
/// This is similar to libc's isatty() function
pub fn isatty(&self) -> bool { self.inner.isatty() }
priv inner: ~RtioPipe
}
impl Reader for StdReader {

View File

@ -94,6 +94,7 @@ pub trait IoFactory {
fn spawn(&mut self, config: ProcessConfig)
-> Result<(~RtioProcess, ~[Option<~RtioPipe>]), IoError>;
fn pipe_open(&mut self, fd: c_int) -> Result<~RtioPipe, IoError>;
fn unix_bind(&mut self, path: &CString) ->
Result<~RtioUnixListener, IoError>;
fn unix_connect(&mut self, path: &CString) -> Result<~RtioPipe, IoError>;

View File

@ -159,7 +159,7 @@ impl StreamWatcher {
// but read_stop may be called from inside one of them and we
// would end up freeing the in-use environment
let handle = self.native_handle();
unsafe { uvll::read_stop(handle); }
unsafe { assert_eq!(uvll::read_stop(handle), 0); }
}
pub fn write(&mut self, buf: Buf, cb: ConnectionCallback) {

View File

@ -146,7 +146,7 @@ unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t,
if writable {
flags |= uvll::STDIO_WRITABLE_PIPE as libc::c_int;
}
let pipe = UvUnboundPipe::new_fresh(loop_);
let pipe = UvUnboundPipe::new(loop_);
let handle = pipe.pipe.as_stream().native_handle();
uvll::set_stdio_container_flags(dst, flags);
uvll::set_stdio_container_stream(dst, handle);

View File

@ -805,47 +805,32 @@ impl IoFactory for UvIoFactory {
fn unix_bind(&mut self, path: &CString) ->
Result<~RtioUnixListener, IoError> {
let mut pipe = Pipe::new(self.uv_loop(), false);
match pipe.bind(path) {
Ok(()) => {
let handle = get_handle_to_current_scheduler!();
let pipe = UvUnboundPipe::new(pipe, handle);
Ok(~UvUnixListener::new(pipe) as ~RtioUnixListener)
}
Err(e) => {
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do pipe.close {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(
task_cell.take());
}
}
Err(uv_error_to_io_error(e))
}
let mut pipe = UvUnboundPipe::new(self.uv_loop());
match pipe.pipe.bind(path) {
Ok(()) => Ok(~UvUnixListener::new(pipe) as ~RtioUnixListener),
Err(e) => Err(uv_error_to_io_error(e)),
}
}
fn unix_connect(&mut self, path: &CString) -> Result<~RtioPipe, IoError> {
let scheduler: ~Scheduler = Local::take();
let mut pipe = Pipe::new(self.uv_loop(), false);
let pipe = UvUnboundPipe::new(self.uv_loop());
let mut rawpipe = pipe.pipe;
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<~RtioPipe, IoError>> = &result_cell;
let pipe_cell = Cell::new(pipe);
let pipe_cell_ptr: *Cell<UvUnboundPipe> = &pipe_cell;
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do pipe.connect(path) |stream, err| {
do rawpipe.connect(path) |_stream, err| {
let res = match err {
None => {
let handle = stream.native_handle();
let pipe = NativeHandle::from_native_handle(
handle as *uvll::uv_pipe_t);
let home = get_handle_to_current_scheduler!();
let pipe = UvUnboundPipe::new(pipe, home);
let pipe = unsafe { (*pipe_cell_ptr).take() };
Ok(~UvPipeStream::new(pipe) as ~RtioPipe)
}
Some(e) => { Err(uv_error_to_io_error(e)) }
Some(e) => Err(uv_error_to_io_error(e)),
};
unsafe { (*result_cell_ptr).put_back(res); }
let scheduler: ~Scheduler = Local::take();
@ -854,18 +839,7 @@ impl IoFactory for UvIoFactory {
}
assert!(!result_cell.is_empty());
let ret = result_cell.take();
if ret.is_err() {
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do pipe.close {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
}
return ret;
return result_cell.take();
}
fn tty_open(&mut self, fd: c_int, readable: bool)
@ -879,6 +853,14 @@ impl IoFactory for UvIoFactory {
Err(e) => Err(uv_error_to_io_error(e))
}
}
fn pipe_open(&mut self, fd: c_int) -> Result<~RtioPipe, IoError> {
let mut pipe = UvUnboundPipe::new(self.uv_loop());
match pipe.pipe.open(fd) {
Ok(()) => Ok(~UvPipeStream::new(pipe) as ~RtioPipe),
Err(e) => Err(uv_error_to_io_error(e))
}
}
}
pub struct UvTcpListener {
@ -1075,14 +1057,9 @@ pub struct UvUnboundPipe {
}
impl UvUnboundPipe {
/// Takes ownership of an unbound pipe along with the scheduler that it is
/// homed on.
fn new(pipe: Pipe, home: SchedHandle) -> UvUnboundPipe {
UvUnboundPipe { pipe: pipe, home: home }
}
/// Creates a fresh new unbound pipe on the specified I/O loop
pub fn new_fresh(loop_: &Loop) -> UvUnboundPipe {
/// Creates a new unbound pipe homed to the current scheduler, placed on the
/// specified event loop
pub fn new(loop_: &Loop) -> UvUnboundPipe {
UvUnboundPipe {
pipe: Pipe::new(loop_, false),
home: get_handle_to_current_scheduler!(),
@ -1727,10 +1704,8 @@ impl RtioUnixListener for UvUnixListener {
let inc = match status {
Some(e) => Err(uv_error_to_io_error(e)),
None => {
let inc = Pipe::new(&server.event_loop(), false);
server.accept(inc.as_stream());
let home = get_handle_to_current_scheduler!();
let pipe = UvUnboundPipe::new(inc, home);
let pipe = UvUnboundPipe::new(&server.event_loop());
server.accept(pipe.pipe.as_stream());
Ok(~UvPipeStream::new(pipe) as ~RtioPipe)
}
};