std: Rebuild sync::deque on Arc

This also removes the `&mut self` requirement, using the correct `&self`
requirement for concurrent types.
This commit is contained in:
Alex Crichton 2014-05-19 15:51:31 -07:00
parent 44fcf46b00
commit efbd3724c0

View File

@ -48,6 +48,8 @@
// FIXME: all atomic operations in this module use a SeqCst ordering. That is
// probably overkill
use alloc::arc::Arc;
use clone::Clone;
use iter::{range, Iterator};
use kinds::Send;
@ -58,7 +60,6 @@ use owned::Box;
use ptr::RawPtr;
use ptr;
use slice::ImmutableVector;
use sync::arc::UnsafeArc;
use sync::atomics::{AtomicInt, AtomicPtr, SeqCst};
use unstable::sync::Exclusive;
use rt::heap::{allocate, deallocate};
@ -87,14 +88,14 @@ struct Deque<T> {
///
/// There may only be one worker per deque.
pub struct Worker<T> {
deque: UnsafeArc<Deque<T>>,
deque: Arc<Deque<T>>,
}
/// The stealing half of the work-stealing deque. Stealers have access to the
/// opposite end of the deque from the worker, and they only have access to the
/// `steal` method.
pub struct Stealer<T> {
deque: UnsafeArc<Deque<T>>,
deque: Arc<Deque<T>>,
}
/// When stealing some data, this is an enumeration of the possible outcomes.
@ -149,12 +150,13 @@ impl<T: Send> BufferPool<T> {
/// Allocates a new work-stealing deque which will send/receiving memory to
/// and from this buffer pool.
pub fn deque(&mut self) -> (Worker<T>, Stealer<T>) {
let (a, b) = UnsafeArc::new2(Deque::new(self.clone()));
pub fn deque(&self) -> (Worker<T>, Stealer<T>) {
let a = Arc::new(Deque::new(self.clone()));
let b = a.clone();
(Worker { deque: a }, Stealer { deque: b })
}
fn alloc(&mut self, bits: int) -> Box<Buffer<T>> {
fn alloc(&self, bits: int) -> Box<Buffer<T>> {
unsafe {
self.pool.with(|pool| {
match pool.iter().position(|x| x.size() >= (1 << bits)) {
@ -165,7 +167,7 @@ impl<T: Send> BufferPool<T> {
}
}
fn free(&mut self, buf: Box<Buffer<T>>) {
fn free(&self, buf: Box<Buffer<T>>) {
unsafe {
let mut buf = Some(buf);
self.pool.with(|pool| {
@ -185,34 +187,34 @@ impl<T: Send> Clone for BufferPool<T> {
impl<T: Send> Worker<T> {
/// Pushes data onto the front of this work queue.
pub fn push(&mut self, t: T) {
unsafe { (*self.deque.get()).push(t) }
pub fn push(&self, t: T) {
unsafe { self.deque.push(t) }
}
/// Pops data off the front of the work queue, returning `None` on an empty
/// queue.
pub fn pop(&mut self) -> Option<T> {
unsafe { (*self.deque.get()).pop() }
pub fn pop(&self) -> Option<T> {
unsafe { self.deque.pop() }
}
/// Gets access to the buffer pool that this worker is attached to. This can
/// be used to create more deques which share the same buffer pool as this
/// deque.
pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool<T> {
unsafe { &mut (*self.deque.get()).pool }
pub fn pool<'a>(&'a self) -> &'a BufferPool<T> {
&self.deque.pool
}
}
impl<T: Send> Stealer<T> {
/// Steals work off the end of the queue (opposite of the worker's end)
pub fn steal(&mut self) -> Stolen<T> {
unsafe { (*self.deque.get()).steal() }
pub fn steal(&self) -> Stolen<T> {
unsafe { self.deque.steal() }
}
/// Gets access to the buffer pool that this stealer is attached to. This
/// can be used to create more deques which share the same buffer pool as
/// this deque.
pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool<T> {
unsafe { &mut (*self.deque.get()).pool }
pub fn pool<'a>(&'a self) -> &'a BufferPool<T> {
&self.deque.pool
}
}
@ -224,7 +226,7 @@ impl<T: Send> Clone for Stealer<T> {
// personally going to heavily comment what's going on here.
impl<T: Send> Deque<T> {
fn new(mut pool: BufferPool<T>) -> Deque<T> {
fn new(pool: BufferPool<T>) -> Deque<T> {
let buf = pool.alloc(MIN_BITS);
Deque {
bottom: AtomicInt::new(0),
@ -234,7 +236,7 @@ impl<T: Send> Deque<T> {
}
}
unsafe fn push(&mut self, data: T) {
unsafe fn push(&self, data: T) {
let mut b = self.bottom.load(SeqCst);
let t = self.top.load(SeqCst);
let mut a = self.array.load(SeqCst);
@ -250,7 +252,7 @@ impl<T: Send> Deque<T> {
self.bottom.store(b + 1, SeqCst);
}
unsafe fn pop(&mut self) -> Option<T> {
unsafe fn pop(&self) -> Option<T> {
let b = self.bottom.load(SeqCst);
let a = self.array.load(SeqCst);
let b = b - 1;
@ -276,7 +278,7 @@ impl<T: Send> Deque<T> {
}
}
unsafe fn steal(&mut self) -> Stolen<T> {
unsafe fn steal(&self) -> Stolen<T> {
let t = self.top.load(SeqCst);
let old = self.array.load(SeqCst);
let b = self.bottom.load(SeqCst);
@ -298,7 +300,7 @@ impl<T: Send> Deque<T> {
}
}
unsafe fn maybe_shrink(&mut self, b: int, t: int) {
unsafe fn maybe_shrink(&self, b: int, t: int) {
let a = self.array.load(SeqCst);
if b - t < (*a).size() / K && b - t > (1 << MIN_BITS) {
self.swap_buffer(b, a, (*a).resize(b, t, -1));
@ -312,7 +314,7 @@ impl<T: Send> Deque<T> {
// after this method has called 'free' on it. The continued usage is simply
// a read followed by a forget, but we must make sure that the memory can
// continue to be read after we flag this buffer for reclamation.
unsafe fn swap_buffer(&mut self, b: int, old: *mut Buffer<T>,
unsafe fn swap_buffer(&self, b: int, old: *mut Buffer<T>,
buf: Buffer<T>) -> *mut Buffer<T> {
let newbuf: *mut Buffer<T> = transmute(box buf);
self.array.store(newbuf, SeqCst);
@ -373,7 +375,7 @@ impl<T: Send> Buffer<T> {
// Unsafe because this unsafely overwrites possibly uninitialized or
// initialized data.
unsafe fn put(&mut self, i: int, t: T) {
unsafe fn put(&self, i: int, t: T) {
let ptr = self.storage.offset(i & self.mask());
ptr::copy_nonoverlapping_memory(ptr as *mut T, &t as *T, 1);
forget(t);
@ -382,7 +384,7 @@ impl<T: Send> Buffer<T> {
// Again, unsafe because this has incredibly dubious ownership violations.
// It is assumed that this buffer is immediately dropped.
unsafe fn resize(&self, b: int, t: int, delta: int) -> Buffer<T> {
let mut buf = Buffer::new(self.log_size + delta);
let buf = Buffer::new(self.log_size + delta);
for i in range(t, b) {
buf.put(i, self.get(i));
}