std: Rebuild spsc with Unsafe/&self

This removes the incorrect usage of `&mut self` in a concurrent setting.
This commit is contained in:
Alex Crichton 2014-05-19 17:32:04 -07:00
parent 2966e970ca
commit fe93c3d47e

View File

@ -40,6 +40,7 @@ use option::{Some, None, Option};
use owned::Box;
use ptr::RawPtr;
use sync::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release};
use ty::Unsafe;
// Node within the linked list queue of messages to send
struct Node<T> {
@ -56,13 +57,13 @@ struct Node<T> {
/// time.
pub struct Queue<T> {
// consumer fields
tail: *mut Node<T>, // where to pop from
tail: Unsafe<*mut Node<T>>, // where to pop from
tail_prev: AtomicPtr<Node<T>>, // where to pop from
// producer fields
head: *mut Node<T>, // where to push to
first: *mut Node<T>, // where to get new nodes from
tail_copy: *mut Node<T>, // between first/tail
head: Unsafe<*mut Node<T>>, // where to push to
first: Unsafe<*mut Node<T>>, // where to get new nodes from
tail_copy: Unsafe<*mut Node<T>>, // between first/tail
// Cache maintenance fields. Additions and subtractions are stored
// separately in order to allow them to use nonatomic addition/subtraction.
@ -101,11 +102,11 @@ impl<T: Send> Queue<T> {
let n2 = Node::new();
unsafe { (*n1).next.store(n2, Relaxed) }
Queue {
tail: n2,
tail: Unsafe::new(n2),
tail_prev: AtomicPtr::new(n1),
head: n2,
first: n1,
tail_copy: n1,
head: Unsafe::new(n2),
first: Unsafe::new(n1),
tail_copy: Unsafe::new(n1),
cache_bound: bound,
cache_additions: AtomicUint::new(0),
cache_subtractions: AtomicUint::new(0),
@ -114,7 +115,7 @@ impl<T: Send> Queue<T> {
/// Pushes a new value onto this queue. Note that to use this function
/// safely, it must be externally guaranteed that there is only one pusher.
pub fn push(&mut self, t: T) {
pub fn push(&self, t: T) {
unsafe {
// Acquire a node (which either uses a cached one or allocates a new
// one), and then append this to the 'head' node.
@ -122,35 +123,35 @@ impl<T: Send> Queue<T> {
assert!((*n).value.is_none());
(*n).value = Some(t);
(*n).next.store(0 as *mut Node<T>, Relaxed);
(*self.head).next.store(n, Release);
self.head = n;
(**self.head.get()).next.store(n, Release);
*self.head.get() = n;
}
}
unsafe fn alloc(&mut self) -> *mut Node<T> {
unsafe fn alloc(&self) -> *mut Node<T> {
// First try to see if we can consume the 'first' node for our uses.
// We try to avoid as many atomic instructions as possible here, so
// the addition to cache_subtractions is not atomic (plus we're the
// only one subtracting from the cache).
if self.first != self.tail_copy {
if *self.first.get() != *self.tail_copy.get() {
if self.cache_bound > 0 {
let b = self.cache_subtractions.load(Relaxed);
self.cache_subtractions.store(b + 1, Relaxed);
}
let ret = self.first;
self.first = (*ret).next.load(Relaxed);
let ret = *self.first.get();
*self.first.get() = (*ret).next.load(Relaxed);
return ret;
}
// If the above fails, then update our copy of the tail and try
// again.
self.tail_copy = self.tail_prev.load(Acquire);
if self.first != self.tail_copy {
*self.tail_copy.get() = self.tail_prev.load(Acquire);
if *self.first.get() != *self.tail_copy.get() {
if self.cache_bound > 0 {
let b = self.cache_subtractions.load(Relaxed);
self.cache_subtractions.store(b + 1, Relaxed);
}
let ret = self.first;
self.first = (*ret).next.load(Relaxed);
let ret = *self.first.get();
*self.first.get() = (*ret).next.load(Relaxed);
return ret;
}
// If all of that fails, then we have to allocate a new node
@ -160,19 +161,19 @@ impl<T: Send> Queue<T> {
/// Attempts to pop a value from this queue. Remember that to use this type
/// safely you must ensure that there is only one popper at a time.
pub fn pop(&mut self) -> Option<T> {
pub fn pop(&self) -> Option<T> {
unsafe {
// The `tail` node is not actually a used node, but rather a
// sentinel from where we should start popping from. Hence, look at
// tail's next field and see if we can use it. If we do a pop, then
// the current tail node is a candidate for going into the cache.
let tail = self.tail;
let tail = *self.tail.get();
let next = (*tail).next.load(Acquire);
if next.is_null() { return None }
assert!((*next).value.is_some());
let ret = (*next).value.take();
self.tail = next;
*self.tail.get() = next;
if self.cache_bound == 0 {
self.tail_prev.store(tail, Release);
} else {
@ -197,11 +198,11 @@ impl<T: Send> Queue<T> {
/// Attempts to peek at the head of the queue, returning `None` if the queue
/// has no data currently
pub fn peek<'a>(&'a mut self) -> Option<&'a mut T> {
pub fn peek<'a>(&'a self) -> Option<&'a mut T> {
// This is essentially the same as above with all the popping bits
// stripped out.
unsafe {
let tail = self.tail;
let tail = *self.tail.get();
let next = (*tail).next.load(Acquire);
if next.is_null() { return None }
return (*next).value.as_mut();
@ -213,7 +214,7 @@ impl<T: Send> Queue<T> {
impl<T: Send> Drop for Queue<T> {
fn drop(&mut self) {
unsafe {
let mut cur = self.first;
let mut cur = *self.first.get();
while !cur.is_null() {
let next = (*cur).next.load(Relaxed);
let _n: Box<Node<T>> = mem::transmute(cur);