std,green: Mark some queue types as NoShare

Alex Crichton 2014-05-20 18:54:31 -07:00
parent 54f6eacf34
commit fdf935a524
5 changed files with 26 additions and 16 deletions
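
For context: `marker::NoShare` (from `std::kinds::marker` in the pre-1.0 Rust of this commit) is a zero-sized marker type. Embedding it as a field opts the containing type out of the `Share` kind (later renamed `Sync`) while leaving `Send` intact, so a queue endpoint can still be moved to another task but cannot be aliased across tasks. A minimal sketch of the same pattern in current Rust, using an illustrative `Consumer` type (not the one in this diff) and `PhantomData` of a `Send + !Sync` type in place of `marker::NoShare`:

    use std::cell::Cell;
    use std::marker::PhantomData;
    use std::sync::Arc;

    // Illustrative stand-in for the queue endpoints in this commit: Cell<()>
    // is Send but not Sync, so the PhantomData field strips Sync (the old
    // `Share`) from Consumer without affecting Send.
    pub struct Consumer<T> {
        inner: Arc<Vec<T>>,              // placeholder for the real queue type
        _noshare: PhantomData<Cell<()>>,
    }

    fn assert_send<T: Send>() {}
    // fn assert_sync<T: Sync>() {}      // assert_sync::<Consumer<i32>>() would fail to compile

    fn main() {
        assert_send::<Consumer<i32>>();  // endpoints stay movable between threads
    }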

@@ -10,6 +10,7 @@
 use alloc::arc::Arc;
 use mpsc = std::sync::mpsc_queue;
+use std::kinds::marker;

 pub enum PopResult<T> {
     Inconsistent,
@@ -19,15 +20,18 @@ pub enum PopResult<T> {
 pub fn queue<T: Send>() -> (Consumer<T>, Producer<T>) {
     let a = Arc::new(mpsc::Queue::new());
-    (Consumer { inner: a.clone() }, Producer { inner: a })
+    (Consumer { inner: a.clone(), noshare: marker::NoShare },
+     Producer { inner: a, noshare: marker::NoShare })
 }

 pub struct Producer<T> {
     inner: Arc<mpsc::Queue<T>>,
+    noshare: marker::NoShare,
 }

 pub struct Consumer<T> {
     inner: Arc<mpsc::Queue<T>>,
+    noshare: marker::NoShare,
 }

 impl<T: Send> Consumer<T> {
@@ -56,6 +60,6 @@ impl<T: Send> Producer<T> {
 impl<T: Send> Clone for Producer<T> {
     fn clone(&self) -> Producer<T> {
-        Producer { inner: self.inner.clone() }
+        Producer { inner: self.inner.clone(), noshare: marker::NoShare }
     }
 }

@@ -326,7 +326,7 @@ impl TcpStream {
     fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
         let ret = Guard {
             fd: self.fd(),
-            guard: unsafe { (*self.inner.get()).lock.lock() },
+            guard: unsafe { self.inner.lock.lock() },
         };
         assert!(util::set_nonblocking(self.fd(), true).is_ok());
         ret
@@ -597,7 +597,7 @@ impl UdpSocket {
     fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
         let ret = Guard {
             fd: self.fd(),
-            guard: unsafe { (*self.inner.get()).lock.lock() },
+            guard: unsafe { self.inner.lock.lock() },
         };
         assert!(util::set_nonblocking(self.fd(), true).is_ok());
         ret

@@ -138,7 +138,7 @@ impl UnixStream {
     fn lock_nonblocking<'a>(&'a self) -> net::Guard<'a> {
         let ret = net::Guard {
             fd: self.fd(),
-            guard: self.inner.lock.lock(),
+            guard: unsafe { self.inner.lock.lock() },
         };
         assert!(util::set_nonblocking(self.fd(), true).is_ok());
         ret

@@ -320,11 +320,11 @@ impl UnixStream {
     fn handle(&self) -> libc::HANDLE { self.inner.handle }

     fn read_closed(&self) -> bool {
-        unsafe { (*self.inner.get()).read_closed.load(atomics::SeqCst) }
+        self.inner.read_closed.load(atomics::SeqCst)
     }

     fn write_closed(&self) -> bool {
-        unsafe { (*self.inner.get()).write_closed.load(atomics::SeqCst) }
+        self.inner.write_closed.load(atomics::SeqCst)
     }

     fn cancel_io(&self) -> IoResult<()> {
@@ -353,7 +353,7 @@ impl rtio::RtioPipe for UnixStream {
         // acquire the lock.
         //
         // See comments in close_read() about why this lock is necessary.
-        let guard = unsafe { (*self.inner.get()).lock.lock() };
+        let guard = unsafe { self.inner.lock.lock() };
         if self.read_closed() {
             return Err(io::standard_error(io::EndOfFile))
         }
@@ -429,7 +429,7 @@ impl rtio::RtioPipe for UnixStream {
         // going after we woke up.
         //
         // See comments in close_read() about why this lock is necessary.
-        let guard = unsafe { (*self.inner.get()).lock.lock() };
+        let guard = unsafe { self.inner.lock.lock() };
         if self.write_closed() {
             return Err(io::standard_error(io::BrokenPipe))
         }
@@ -514,15 +514,15 @@ impl rtio::RtioPipe for UnixStream {
         // close_read() between steps 1 and 2. By atomically executing steps 1
         // and 2 with a lock with respect to close_read(), we're guaranteed that
         // no thread will erroneously sit in a read forever.
-        let _guard = unsafe { (*self.inner.get()).lock.lock() };
-        unsafe { (*self.inner.get()).read_closed.store(true, atomics::SeqCst) }
+        let _guard = unsafe { self.inner.lock.lock() };
+        self.inner.read_closed.store(true, atomics::SeqCst);
         self.cancel_io()
     }

     fn close_write(&mut self) -> IoResult<()> {
         // see comments in close_read() for why this lock is necessary
-        let _guard = unsafe { (*self.inner.get()).lock.lock() };
-        unsafe { (*self.inner.get()).write_closed.store(true, atomics::SeqCst) }
+        let _guard = unsafe { self.inner.lock.lock() };
+        self.inner.write_closed.store(true, atomics::SeqCst);
         self.cancel_io()
     }

@@ -53,16 +53,17 @@ use alloc::arc::Arc;
 use clone::Clone;
 use iter::{range, Iterator};
 use kinds::Send;
+use kinds::marker;
 use mem::{forget, min_align_of, size_of, transmute};
 use ops::Drop;
 use option::{Option, Some, None};
 use owned::Box;
 use ptr::RawPtr;
 use ptr;
+use rt::heap::{allocate, deallocate};
 use slice::ImmutableVector;
 use sync::atomics::{AtomicInt, AtomicPtr, SeqCst};
 use unstable::sync::Exclusive;
-use rt::heap::{allocate, deallocate};
 use vec::Vec;

 // Once the queue is less than 1/K full, then it will be downsized. Note that
@@ -89,6 +90,7 @@ struct Deque<T> {
 /// There may only be one worker per deque.
 pub struct Worker<T> {
     deque: Arc<Deque<T>>,
+    noshare: marker::NoShare,
 }

 /// The stealing half of the work-stealing deque. Stealers have access to the
@@ -96,6 +98,7 @@ pub struct Worker<T> {
 /// `steal` method.
 pub struct Stealer<T> {
     deque: Arc<Deque<T>>,
+    noshare: marker::NoShare,
 }

 /// When stealing some data, this is an enumeration of the possible outcomes.
@@ -153,7 +156,8 @@ impl<T: Send> BufferPool<T> {
     pub fn deque(&self) -> (Worker<T>, Stealer<T>) {
         let a = Arc::new(Deque::new(self.clone()));
         let b = a.clone();
-        (Worker { deque: a }, Stealer { deque: b })
+        (Worker { deque: a, noshare: marker::NoShare },
+         Stealer { deque: b, noshare: marker::NoShare })
     }

     fn alloc(&self, bits: int) -> Box<Buffer<T>> {
@@ -219,7 +223,9 @@ impl<T: Send> Stealer<T> {
 }

 impl<T: Send> Clone for Stealer<T> {
-    fn clone(&self) -> Stealer<T> { Stealer { deque: self.deque.clone() } }
+    fn clone(&self) -> Stealer<T> {
+        Stealer { deque: self.deque.clone(), noshare: marker::NoShare }
+    }
 }

 // Almost all of this code can be found directly in the paper so I'm not