Implement RaceBox for StdinReader

This commit is contained in:
Flavio Percoco 2014-12-24 17:40:40 +01:00
parent 52072dec0f
commit bb315f25f8
4 changed files with 43 additions and 16 deletions

View File

@ -22,6 +22,7 @@ use result::Result::{Ok, Err};
use slice::{SliceExt}; use slice::{SliceExt};
use slice; use slice;
use vec::Vec; use vec::Vec;
use kinds::{Send,Sync};
/// Wraps a Reader and buffers input from it /// Wraps a Reader and buffers input from it
/// ///
@ -51,6 +52,11 @@ pub struct BufferedReader<R> {
cap: uint, cap: uint,
} }
unsafe impl<R: Send> Send for BufferedReader<R> {}
unsafe impl<R: Send+Sync> Sync for BufferedReader<R> {}
impl<R: Reader> BufferedReader<R> { impl<R: Reader> BufferedReader<R> {
/// Creates a new `BufferedReader` with the specified buffer capacity /// Creates a new `BufferedReader` with the specified buffer capacity
pub fn with_capacity(cap: uint, inner: R) -> BufferedReader<R> { pub fn with_capacity(cap: uint, inner: R) -> BufferedReader<R> {

View File

@ -34,7 +34,7 @@ use failure::LOCAL_STDERR;
use fmt; use fmt;
use io::{Reader, Writer, IoResult, IoError, OtherIoError, Buffer, use io::{Reader, Writer, IoResult, IoError, OtherIoError, Buffer,
standard_error, EndOfFile, LineBufferedWriter, BufferedReader}; standard_error, EndOfFile, LineBufferedWriter, BufferedReader};
use kinds::Send; use kinds::{Sync, Send};
use libc; use libc;
use mem; use mem;
use option::Option; use option::Option;
@ -98,26 +98,34 @@ thread_local! {
} }
} }
struct RaceBox(BufferedReader<StdReader>);
unsafe impl Send for RaceBox {}
unsafe impl Sync for RaceBox {}
/// A synchronized wrapper around a buffered reader from stdin /// A synchronized wrapper around a buffered reader from stdin
#[deriving(Clone)] #[deriving(Clone)]
pub struct StdinReader { pub struct StdinReader {
inner: Arc<Mutex<BufferedReader<StdReader>>>, inner: Arc<Mutex<RaceBox>>,
} }
unsafe impl Send for StdinReader {}
unsafe impl Sync for StdinReader {}
/// A guard for exclusive access to `StdinReader`'s internal `BufferedReader`. /// A guard for exclusive access to `StdinReader`'s internal `BufferedReader`.
pub struct StdinReaderGuard<'a> { pub struct StdinReaderGuard<'a> {
inner: MutexGuard<'a, BufferedReader<StdReader>>, inner: MutexGuard<'a, RaceBox>,
} }
impl<'a> Deref<BufferedReader<StdReader>> for StdinReaderGuard<'a> { impl<'a> Deref<BufferedReader<StdReader>> for StdinReaderGuard<'a> {
fn deref(&self) -> &BufferedReader<StdReader> { fn deref(&self) -> &BufferedReader<StdReader> {
&*self.inner &self.inner.0
} }
} }
impl<'a> DerefMut<BufferedReader<StdReader>> for StdinReaderGuard<'a> { impl<'a> DerefMut<BufferedReader<StdReader>> for StdinReaderGuard<'a> {
fn deref_mut(&mut self) -> &mut BufferedReader<StdReader> { fn deref_mut(&mut self) -> &mut BufferedReader<StdReader> {
&mut *self.inner &mut self.inner.0
} }
} }
@ -147,7 +155,7 @@ impl StdinReader {
/// The read is performed atomically - concurrent read calls in other /// The read is performed atomically - concurrent read calls in other
/// threads will not interleave with this one. /// threads will not interleave with this one.
pub fn read_line(&mut self) -> IoResult<String> { pub fn read_line(&mut self) -> IoResult<String> {
self.inner.lock().read_line() self.inner.lock().0.read_line()
} }
/// Like `Buffer::read_until`. /// Like `Buffer::read_until`.
@ -155,7 +163,7 @@ impl StdinReader {
/// The read is performed atomically - concurrent read calls in other /// The read is performed atomically - concurrent read calls in other
/// threads will not interleave with this one. /// threads will not interleave with this one.
pub fn read_until(&mut self, byte: u8) -> IoResult<Vec<u8>> { pub fn read_until(&mut self, byte: u8) -> IoResult<Vec<u8>> {
self.inner.lock().read_until(byte) self.inner.lock().0.read_until(byte)
} }
/// Like `Buffer::read_char`. /// Like `Buffer::read_char`.
@ -163,13 +171,13 @@ impl StdinReader {
/// The read is performed atomically - concurrent read calls in other /// The read is performed atomically - concurrent read calls in other
/// threads will not interleave with this one. /// threads will not interleave with this one.
pub fn read_char(&mut self) -> IoResult<char> { pub fn read_char(&mut self) -> IoResult<char> {
self.inner.lock().read_char() self.inner.lock().0.read_char()
} }
} }
impl Reader for StdinReader { impl Reader for StdinReader {
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
self.inner.lock().read(buf) self.inner.lock().0.read(buf)
} }
// We have to manually delegate all of these because the default impls call // We have to manually delegate all of these because the default impls call
@ -177,23 +185,23 @@ impl Reader for StdinReader {
// incur the costs of repeated locking). // incur the costs of repeated locking).
fn read_at_least(&mut self, min: uint, buf: &mut [u8]) -> IoResult<uint> { fn read_at_least(&mut self, min: uint, buf: &mut [u8]) -> IoResult<uint> {
self.inner.lock().read_at_least(min, buf) self.inner.lock().0.read_at_least(min, buf)
} }
fn push_at_least(&mut self, min: uint, len: uint, buf: &mut Vec<u8>) -> IoResult<uint> { fn push_at_least(&mut self, min: uint, len: uint, buf: &mut Vec<u8>) -> IoResult<uint> {
self.inner.lock().push_at_least(min, len, buf) self.inner.lock().0.push_at_least(min, len, buf)
} }
fn read_to_end(&mut self) -> IoResult<Vec<u8>> { fn read_to_end(&mut self) -> IoResult<Vec<u8>> {
self.inner.lock().read_to_end() self.inner.lock().0.read_to_end()
} }
fn read_le_uint_n(&mut self, nbytes: uint) -> IoResult<u64> { fn read_le_uint_n(&mut self, nbytes: uint) -> IoResult<u64> {
self.inner.lock().read_le_uint_n(nbytes) self.inner.lock().0.read_le_uint_n(nbytes)
} }
fn read_be_uint_n(&mut self, nbytes: uint) -> IoResult<u64> { fn read_be_uint_n(&mut self, nbytes: uint) -> IoResult<u64> {
self.inner.lock().read_be_uint_n(nbytes) self.inner.lock().0.read_be_uint_n(nbytes)
} }
} }
@ -221,7 +229,7 @@ pub fn stdin() -> StdinReader {
BufferedReader::new(stdin_raw()) BufferedReader::new(stdin_raw())
}; };
let stdin = StdinReader { let stdin = StdinReader {
inner: Arc::new(Mutex::new(stdin)) inner: Arc::new(Mutex::new(RaceBox(stdin)))
}; };
STDIN = mem::transmute(box stdin); STDIN = mem::transmute(box stdin);
@ -426,6 +434,9 @@ pub struct StdWriter {
inner: StdSource inner: StdSource
} }
unsafe impl Send for StdWriter {}
unsafe impl Sync for StdWriter {}
impl StdWriter { impl StdWriter {
/// Gets the size of this output window, if possible. This is typically used /// Gets the size of this output window, if possible. This is typically used
/// when the writer is attached to something like a terminal, this is used /// when the writer is attached to something like a terminal, this is used

View File

@ -63,6 +63,11 @@ unsafe impl<M:Send> Send for Helper<M> { }
unsafe impl<M:Send> Sync for Helper<M> { } unsafe impl<M:Send> Sync for Helper<M> { }
struct RaceBox(helper_signal::signal);
unsafe impl Send for RaceBox {}
unsafe impl Sync for RaceBox {}
impl<M: Send> Helper<M> { impl<M: Send> Helper<M> {
/// Lazily boots a helper thread, becoming a no-op if the helper has already /// Lazily boots a helper thread, becoming a no-op if the helper has already
/// been spawned. /// been spawned.
@ -85,9 +90,11 @@ impl<M: Send> Helper<M> {
let (receive, send) = helper_signal::new(); let (receive, send) = helper_signal::new();
*self.signal.get() = send as uint; *self.signal.get() = send as uint;
let receive = RaceBox(receive);
let t = f(); let t = f();
Thread::spawn(move |:| { Thread::spawn(move |:| {
helper(receive, rx, t); helper(receive.0, rx, t);
let _g = self.lock.lock(); let _g = self.lock.lock();
*self.shutdown.get() = true; *self.shutdown.get() = true;
self.cond.notify_one() self.cond.notify_one()

View File

@ -48,6 +48,9 @@ pub enum Req {
RemoveTimer(libc::HANDLE, Sender<()>), RemoveTimer(libc::HANDLE, Sender<()>),
} }
unsafe impl Send for Req {}
fn helper(input: libc::HANDLE, messages: Receiver<Req>, _: ()) { fn helper(input: libc::HANDLE, messages: Receiver<Req>, _: ()) {
let mut objs = vec![input]; let mut objs = vec![input];
let mut chans = vec![]; let mut chans = vec![];