std: Rewrite the sync module

This commit is a reimplementation of `std::sync` to be based on the
system-provided primitives wherever possible. The previous implementation was
fundamentally built on top of channels, and as part of the runtime reform it has
become clear that this is not the level of abstraction that the standard library
should be providing. This rewrite aims to provide as thin a shim as possible on
top of the system primitives in order to make them safe.

The overall interface of the `std::sync` module has in general not changed, but
there are a few important distinctions, highlighted below:

* The condition variable type, `Condvar`, has been separated out from `Mutex`.
  A condition variable is now an entirely separate type. This separation
  benefits users who only need a mutex, and it draws a clearer line around who
  is responsible for managing condition variables (the application).
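
  A minimal sketch of the resulting pattern, mirroring the example in the new
  `condvar` documentation added in this commit:

  ```rust
  use std::sync::{Arc, Mutex, Condvar};

  let pair = Arc::new((Mutex::new(false), Condvar::new()));
  let pair2 = pair.clone();

  // Flip the flag inside the lock, then wake the waiting thread.
  spawn(proc() {
      let &(ref lock, ref cvar) = &*pair2;
      let mut started = lock.lock();
      *started = true;
      cvar.notify_one();
  });

  // Wait for the spawned thread to start up, re-checking the predicate to
  // guard against spurious wakeups.
  let &(ref lock, ref cvar) = &*pair;
  let started = lock.lock();
  while !*started {
      cvar.wait(&started);
  }
  ```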

* All of `Condvar`, `Mutex`, and `RWLock` are now directly built on top of
  system primitives rather than using a custom implementation. The `Once`,
  `Barrier`, and `Semaphore` types are still built on top of these abstractions
  rather than directly on the system primitives.

* The `Condvar`, `Mutex`, and `RWLock` types each have a companion static type
  (`StaticCondvar`, `StaticMutex`, `StaticRWLock`) and constant initializer.
  These are provided primarily for C FFI interoperation, but they are also
  useful for simply having a global lock. The types, however, will leak memory
  unless `destroy()` is called on them, which is clearly documented.
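
  For example, a global lock built from the static variants might look roughly
  like this (using `StaticMutex` and `MUTEX_INIT` as re-exported from
  `std::sync`; the function name here is purely illustrative):

  ```rust
  use std::sync::{StaticMutex, MUTEX_INIT};

  static LOCK: StaticMutex = MUTEX_INIT;

  fn with_global_lock() {
      let _g = LOCK.lock();
      // ... critical section; the lock is released when `_g` is dropped ...
  }

  // Once the lock will never be used again, its resources must be reclaimed
  // explicitly or they are leaked:
  //
  //     unsafe { LOCK.destroy(); }
  ```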

* The `Condvar` implementation for an `RWLock` write lock has been removed. This
  may be added back in the future with a userspace implementation, but this
  commit is focused on exposing the system primitives first.

* The fundamental architecture of this design is to provide two separate layers.
  The first layer is exposed by `sys_common`, a cross-platform, bare-metal
  abstraction over the system synchronization primitives. No attempt is made at
  making this layer safe, and it is quite unsafe to use! It is currently not
  exported as part of the API of the standard library, but the stabilization of
  the `sys` module will ensure that these primitives are exposed in time. The
  purpose of this layer is to provide the core cross-platform abstractions to
  implementors that need them.

  The second layer, provided by `std::sync`, is intended to be the thinnest
  possible layer on top of `sys_common` that is entirely safe to use. A few
  concerns need to be addressed when making these system primitives safe:

    * Once used, the OS primitives can never be **moved**. This means that they
      essentially need to have a stable address. The static primitives use
      `&'static self` to enforce this, and the non-static primitives all use a
      `Box` to provide this guarantee.

    * Poisoning is leveraged to ensure that invalid data is not accessible from
      other tasks after one has panicked.
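
      A sketch of the intended behavior (the exact panic message is an
      implementation detail):

      ```rust
      use std::sync::{Arc, Mutex};
      use std::task;

      let lock = Arc::new(Mutex::new(1i));
      let lock2 = lock.clone();

      // This task panics while holding the lock, poisoning it.
      let _ = task::try::<()>(proc() {
          let _guard = lock2.lock();
          panic!();
      });

      // Any subsequent attempt to acquire the poisoned lock panics as well:
      // let guard = lock.lock();
      ```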

  In addition to these blanket safety requirements, each primitive has a few
  restrictions of its own:

    * Mutexes and rwlocks can only be unlocked from the same thread that they
      were locked by. This is achieved through RAII lock guards which cannot be
      sent across threads.

    * Mutexes and rwlocks can only be unlocked if they were previously locked.
      This is achieved by not exposing an unlocking method.

    * A condition variable can only be waited on with a locked mutex. This is
      achieved by requiring a `MutexGuard` in the `wait()` method.

    * A condition variable cannot be used concurrently with more than one mutex.
      This is guaranteed by dynamically binding a condition variable to
      precisely one mutex for its entire lifecycle. This restriction may be
      relaxed in the future (for example, by unbinding the mutex whenever no
      threads are waiting on the condvar), but for now it is sufficient to
      guarantee safety.
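
      The dynamic binding can be seen in the `two_mutexes` test added in this
      commit; roughly:

      ```rust
      use std::sync::{StaticMutex, StaticCondvar, MUTEX_INIT, CONDVAR_INIT};

      static M1: StaticMutex = MUTEX_INIT;
      static M2: StaticMutex = MUTEX_INIT;
      static C: StaticCondvar = CONDVAR_INIT;

      let g = M1.lock();
      spawn(proc() {
          // Wake the waiter below once it has released `M1` inside `wait`.
          let _g = M1.lock();
          C.notify_one();
      });
      C.wait(&g);          // binds `C` to `M1`
      drop(g);

      C.wait(&M2.lock());  // panics: the condvar is already bound to `M1`
      ```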

* Condvars now support timeouts for their blocking operations. The
  implementation for these operations is provided by the system.
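
  A sketch of the new API: `wait_timeout` returns `false` if the wait timed out
  and `true` if a notification was received, and the lock is re-acquired either
  way.

  ```rust
  use std::sync::{Mutex, Condvar};
  use std::time::Duration;

  let m = Mutex::new(());
  let c = Condvar::new();

  let g = m.lock();
  // Nothing will notify us, so this wait simply times out.
  let notified = c.wait_timeout(&g, Duration::milliseconds(10));
  assert!(!notified);
  ```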

Due to the modification of the `Condvar` API, the removal of the
`std::sync::mutex` API, and the general reimplementation, this is a breaking
change. Most code should be fairly easy to port using the examples in the
documentation of these primitives.

[breaking-change]

Closes #17094
Closes #18003
commit 71d4e77db8 (parent 361baabb07)
Author: Alex Crichton
Date:   2014-11-24 11:16:40 -08:00

29 changed files with 2483 additions and 3051 deletions


@@ -132,15 +132,6 @@ impl<T: Send> Queue<T> {
if self.head.load(Acquire) == tail {Empty} else {Inconsistent}
}
}
/// Attempts to pop data from this queue, but doesn't attempt too hard. This
/// will canonicalize inconsistent states to a `None` value.
pub fn casual_pop(&self) -> Option<T> {
match self.pop() {
Data(t) => Some(t),
Empty | Inconsistent => None,
}
}
}
#[unsafe_destructor]


@@ -40,7 +40,6 @@ use core::prelude::*;
use alloc::boxed::Box;
use core::mem;
use core::cell::UnsafeCell;
use alloc::arc::Arc;
use sync::atomic::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release};
@@ -74,39 +73,6 @@ pub struct Queue<T> {
cache_subtractions: AtomicUint,
}
/// A safe abstraction for the consumer in a single-producer single-consumer
/// queue.
pub struct Consumer<T> {
inner: Arc<Queue<T>>
}
impl<T: Send> Consumer<T> {
/// Attempts to pop the value from the head of the queue, returning `None`
/// if the queue is empty.
pub fn pop(&mut self) -> Option<T> {
self.inner.pop()
}
/// Attempts to peek at the head of the queue, returning `None` if the queue
/// is empty.
pub fn peek<'a>(&'a mut self) -> Option<&'a mut T> {
self.inner.peek()
}
}
/// A safe abstraction for the producer in a single-producer single-consumer
/// queue.
pub struct Producer<T> {
inner: Arc<Queue<T>>
}
impl<T: Send> Producer<T> {
/// Pushes a new value onto the queue.
pub fn push(&mut self, t: T) {
self.inner.push(t)
}
}
impl<T: Send> Node<T> {
fn new() -> *mut Node<T> {
unsafe {
@@ -118,30 +84,6 @@ impl<T: Send> Node<T> {
}
}
/// Creates a new queue with a consumer-producer pair.
///
/// The producer returned is connected to the consumer to push all data to
/// the consumer.
///
/// # Arguments
///
/// * `bound` - This queue implementation is implemented with a linked
/// list, and this means that a push is always a malloc. In
/// order to amortize this cost, an internal cache of nodes is
/// maintained to prevent a malloc from always being
/// necessary. This bound is the limit on the size of the
/// cache (if desired). If the value is 0, then the cache has
/// no bound. Otherwise, the cache will never grow larger than
/// `bound` (although the queue itself could be much larger.
pub fn queue<T: Send>(bound: uint) -> (Consumer<T>, Producer<T>) {
let q = unsafe { Queue::new(bound) };
let arc = Arc::new(q);
let consumer = Consumer { inner: arc.clone() };
let producer = Producer { inner: arc };
(consumer, producer)
}
impl<T: Send> Queue<T> {
/// Creates a new queue.
///
@@ -296,78 +238,88 @@ impl<T: Send> Drop for Queue<T> {
mod test {
use prelude::*;
use super::{queue};
use sync::Arc;
use super::Queue;
#[test]
fn smoke() {
let (mut consumer, mut producer) = queue(0);
producer.push(1i);
producer.push(2);
assert_eq!(consumer.pop(), Some(1i));
assert_eq!(consumer.pop(), Some(2));
assert_eq!(consumer.pop(), None);
producer.push(3);
producer.push(4);
assert_eq!(consumer.pop(), Some(3));
assert_eq!(consumer.pop(), Some(4));
assert_eq!(consumer.pop(), None);
unsafe {
let queue = Queue::new(0);
queue.push(1i);
queue.push(2);
assert_eq!(queue.pop(), Some(1i));
assert_eq!(queue.pop(), Some(2));
assert_eq!(queue.pop(), None);
queue.push(3);
queue.push(4);
assert_eq!(queue.pop(), Some(3));
assert_eq!(queue.pop(), Some(4));
assert_eq!(queue.pop(), None);
}
}
#[test]
fn peek() {
let (mut consumer, mut producer) = queue(0);
producer.push(vec![1i]);
unsafe {
let queue = Queue::new(0);
queue.push(vec![1i]);
// Ensure the borrowchecker works
match consumer.peek() {
Some(vec) => match vec.as_slice() {
// Note that `pop` is not allowed here due to borrow
[1] => {}
_ => return
},
None => unreachable!()
// Ensure the borrowchecker works
match queue.peek() {
Some(vec) => match vec.as_slice() {
// Note that `pop` is not allowed here due to borrow
[1] => {}
_ => return
},
None => unreachable!()
}
queue.pop();
}
consumer.pop();
}
#[test]
fn drop_full() {
let (_, mut producer) = queue(0);
producer.push(box 1i);
producer.push(box 2i);
unsafe {
let q = Queue::new(0);
q.push(box 1i);
q.push(box 2i);
}
}
#[test]
fn smoke_bound() {
let (mut consumer, mut producer) = queue(1);
producer.push(1i);
producer.push(2);
assert_eq!(consumer.pop(), Some(1));
assert_eq!(consumer.pop(), Some(2));
assert_eq!(consumer.pop(), None);
producer.push(3);
producer.push(4);
assert_eq!(consumer.pop(), Some(3));
assert_eq!(consumer.pop(), Some(4));
assert_eq!(consumer.pop(), None);
unsafe {
let q = Queue::new(0);
q.push(1i);
q.push(2);
assert_eq!(q.pop(), Some(1));
assert_eq!(q.pop(), Some(2));
assert_eq!(q.pop(), None);
q.push(3);
q.push(4);
assert_eq!(q.pop(), Some(3));
assert_eq!(q.pop(), Some(4));
assert_eq!(q.pop(), None);
}
}
#[test]
fn stress() {
stress_bound(0);
stress_bound(1);
unsafe {
stress_bound(0);
stress_bound(1);
}
fn stress_bound(bound: uint) {
let (consumer, mut producer) = queue(bound);
unsafe fn stress_bound(bound: uint) {
let q = Arc::new(Queue::new(bound));
let (tx, rx) = channel();
let q2 = q.clone();
spawn(proc() {
// Move the consumer to a local mutable slot
let mut consumer = consumer;
for _ in range(0u, 100000) {
loop {
match consumer.pop() {
match q2.pop() {
Some(1i) => break,
Some(_) => panic!(),
None => {}
@@ -377,7 +329,7 @@ mod test {
tx.send(());
});
for _ in range(0i, 100000) {
producer.push(1);
q.push(1);
}
rx.recv();
}

src/libstd/sync/barrier.rs (new file, 116 lines)

@@ -0,0 +1,116 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use sync::{Mutex, Condvar};
/// A barrier enables multiple tasks to synchronize the beginning
/// of some computation.
///
/// ```rust
/// use std::sync::{Arc, Barrier};
///
/// let barrier = Arc::new(Barrier::new(10));
/// for _ in range(0u, 10) {
/// let c = barrier.clone();
/// // The same messages will be printed together.
/// // You will NOT see any interleaving.
/// spawn(proc() {
/// println!("before wait");
/// c.wait();
/// println!("after wait");
/// });
/// }
/// ```
pub struct Barrier {
lock: Mutex<BarrierState>,
cvar: Condvar,
num_threads: uint,
}
// The inner state of a double barrier
struct BarrierState {
count: uint,
generation_id: uint,
}
impl Barrier {
/// Create a new barrier that can block a given number of threads.
///
/// A barrier will block `n`-1 threads which call `wait` and then wake up
/// all threads at once when the `n`th thread calls `wait`.
pub fn new(n: uint) -> Barrier {
Barrier {
lock: Mutex::new(BarrierState {
count: 0,
generation_id: 0,
}),
cvar: Condvar::new(),
num_threads: n,
}
}
/// Block the current thread until all threads have rendezvoused here.
///
/// Barriers are re-usable after all threads have rendezvoused once, and can
/// be used continuously.
pub fn wait(&self) {
let mut lock = self.lock.lock();
let local_gen = lock.generation_id;
lock.count += 1;
if lock.count < self.num_threads {
// We need a while loop to guard against spurious wakeups.
// http://en.wikipedia.org/wiki/Spurious_wakeup
while local_gen == lock.generation_id &&
lock.count < self.num_threads {
self.cvar.wait(&lock);
}
} else {
lock.count = 0;
lock.generation_id += 1;
self.cvar.notify_all();
}
}
}
#[cfg(test)]
mod tests {
use prelude::*;
use sync::{Arc, Barrier};
use comm::Empty;
#[test]
fn test_barrier() {
let barrier = Arc::new(Barrier::new(10));
let (tx, rx) = channel();
for _ in range(0u, 9) {
let c = barrier.clone();
let tx = tx.clone();
spawn(proc() {
c.wait();
tx.send(true);
});
}
// At this point, all spawned tasks should be blocked,
// so we shouldn't get anything from the port
assert!(match rx.try_recv() {
Err(Empty) => true,
_ => false,
});
barrier.wait();
// Now, the barrier is cleared and we should get data.
for _ in range(0u, 9) {
rx.recv();
}
}
}

src/libstd/sync/condvar.rs (new file, 358 lines)

@@ -0,0 +1,358 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use prelude::*;
use sync::atomic::{mod, AtomicUint};
use sync::{mutex, StaticMutexGuard};
use sys_common::condvar as sys;
use sys_common::mutex as sys_mutex;
use time::Duration;
/// A Condition Variable
///
/// Condition variables represent the ability to block a thread such that it
/// consumes no CPU time while waiting for an event to occur. Condition
/// variables are typically associated with a boolean predicate (a condition)
/// and a mutex. The predicate is always verified inside of the mutex before
/// determining that the thread must block.
///
/// Functions in this module will block the current **thread** of execution and
/// are bindings to system-provided condition variables where possible. Note
/// that this module places one additional restriction over the system condition
/// variables: each condvar can be used with precisely one mutex at runtime. Any
/// attempt to use multiple mutexes on the same condition variable will result
/// in a runtime panic. If this is not desired, then the unsafe primitives in
/// `sys` do not have this restriction but may result in undefined behavior.
///
/// # Example
///
/// ```
/// use std::sync::{Arc, Mutex, Condvar};
///
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
/// let pair2 = pair.clone();
///
/// // Inside of our lock, spawn a new thread, and then wait for it to start
/// spawn(proc() {
/// let &(ref lock, ref cvar) = &*pair2;
/// let mut started = lock.lock();
/// *started = true;
/// cvar.notify_one();
/// });
///
/// // wait for the thread to start up
/// let &(ref lock, ref cvar) = &*pair;
/// let started = lock.lock();
/// while !*started {
/// cvar.wait(&started);
/// }
/// ```
pub struct Condvar { inner: Box<StaticCondvar> }
/// Statically allocated condition variables.
///
/// This structure is identical to `Condvar` except that it is suitable for use
/// in static initializers for other structures.
///
/// # Example
///
/// ```
/// use std::sync::{StaticCondvar, CONDVAR_INIT};
///
/// static CVAR: StaticCondvar = CONDVAR_INIT;
/// ```
pub struct StaticCondvar {
inner: sys::Condvar,
mutex: AtomicUint,
}
/// Constant initializer for a statically allocated condition variable.
pub const CONDVAR_INIT: StaticCondvar = StaticCondvar {
inner: sys::CONDVAR_INIT,
mutex: atomic::INIT_ATOMIC_UINT,
};
/// A trait for values which can be passed to the waiting methods of condition
/// variables. This is implemented by the mutex guards in this module.
///
/// Note that this trait should likely not be implemented manually unless you
/// really know what you're doing.
pub trait AsMutexGuard {
#[allow(missing_docs)]
unsafe fn as_mutex_guard(&self) -> &StaticMutexGuard;
}
impl Condvar {
/// Creates a new condition variable which is ready to be waited on and
/// notified.
pub fn new() -> Condvar {
Condvar {
inner: box StaticCondvar {
inner: unsafe { sys::Condvar::new() },
mutex: AtomicUint::new(0),
}
}
}
/// Block the current thread until this condition variable receives a
/// notification.
///
/// This function will atomically unlock the mutex specified (represented by
/// `guard`) and block the current thread. This means that any calls to
/// `notify_*()` which happen logically after the mutex is unlocked are
/// candidates to wake this thread up. When this function call returns, the
/// lock specified will have been re-acquired.
///
/// Note that this function is susceptible to spurious wakeups. Condition
/// variables normally have a boolean predicate associated with them, and
/// the predicate must always be checked each time this function returns to
/// protect against spurious wakeups.
///
/// # Panics
///
/// This function will `panic!()` if it is used with more than one mutex
/// over time. Each condition variable is dynamically bound to exactly one
/// mutex to ensure defined behavior across platforms. If this functionality
/// is not desired, then unsafe primitives in `sys` are provided.
pub fn wait<T: AsMutexGuard>(&self, mutex_guard: &T) {
unsafe {
let me: &'static Condvar = &*(self as *const _);
me.inner.wait(mutex_guard)
}
}
/// Wait on this condition variable for a notification, timing out after a
/// specified duration.
///
/// The semantics of this function are equivalent to `wait()` except that
/// the thread will be blocked for roughly no longer than `dur`. This method
/// should not be used for precise timing due to anomalies such as
/// preemption or platform differences that may not cause the maximum amount
/// of time waited to be precisely `dur`.
///
/// If the wait timed out, then `false` will be returned. Otherwise if a
/// notification was received then `true` will be returned.
///
/// Like `wait`, the lock specified will be re-acquired when this function
/// returns, regardless of whether the timeout elapsed or not.
pub fn wait_timeout<T: AsMutexGuard>(&self, mutex_guard: &T,
dur: Duration) -> bool {
unsafe {
let me: &'static Condvar = &*(self as *const _);
me.inner.wait_timeout(mutex_guard, dur)
}
}
/// Wake up one blocked thread on this condvar.
///
/// If there is a blocked thread on this condition variable, then it will
/// be woken up from its call to `wait` or `wait_timeout`. Calls to
/// `notify_one` are not buffered in any way.
///
/// To wake up all threads, see `notify_all()`.
pub fn notify_one(&self) { unsafe { self.inner.inner.notify_one() } }
/// Wake up all blocked threads on this condvar.
///
/// This method will ensure that any current waiters on the condition
/// variable are awoken. Calls to `notify_all()` are not buffered in any
/// way.
///
/// To wake up only one thread, see `notify_one()`.
pub fn notify_all(&self) { unsafe { self.inner.inner.notify_all() } }
}
impl Drop for Condvar {
fn drop(&mut self) {
unsafe { self.inner.inner.destroy() }
}
}
impl StaticCondvar {
/// Block the current thread until this condition variable receives a
/// notification.
///
/// See `Condvar::wait`.
pub fn wait<T: AsMutexGuard>(&'static self, mutex_guard: &T) {
unsafe {
let lock = mutex_guard.as_mutex_guard();
let sys = mutex::guard_lock(lock);
self.verify(sys);
self.inner.wait(sys);
(*mutex::guard_poison(lock)).check("mutex");
}
}
/// Wait on this condition variable for a notification, timing out after a
/// specified duration.
///
/// See `Condvar::wait_timeout`.
pub fn wait_timeout<T: AsMutexGuard>(&'static self, mutex_guard: &T,
dur: Duration) -> bool {
unsafe {
let lock = mutex_guard.as_mutex_guard();
let sys = mutex::guard_lock(lock);
self.verify(sys);
let ret = self.inner.wait_timeout(sys, dur);
(*mutex::guard_poison(lock)).check("mutex");
return ret;
}
}
/// Wake up one blocked thread on this condvar.
///
/// See `Condvar::notify_one`.
pub fn notify_one(&'static self) { unsafe { self.inner.notify_one() } }
/// Wake up all blocked threads on this condvar.
///
/// See `Condvar::notify_all`.
pub fn notify_all(&'static self) { unsafe { self.inner.notify_all() } }
/// Deallocate all resources associated with this static condvar.
///
/// This method is unsafe to call as there is no guarantee that there are no
/// active users of the condvar, and this also doesn't prevent any future
/// users of the condvar. This method is required to be called to not leak
/// memory on all platforms.
pub unsafe fn destroy(&'static self) {
self.inner.destroy()
}
fn verify(&self, mutex: &sys_mutex::Mutex) {
let addr = mutex as *const _ as uint;
match self.mutex.compare_and_swap(0, addr, atomic::SeqCst) {
// If we got out 0, then we have successfully bound the mutex to
// this cvar.
0 => {}
// If we get out a value that's the same as `addr`, then someone
// already beat us to the punch.
n if n == addr => {}
// Anything else and we're using more than one mutex on this cvar,
// which is currently disallowed.
_ => panic!("attempted to use a condition variable with two \
mutexes"),
}
}
}
#[cfg(test)]
mod tests {
use prelude::*;
use time::Duration;
use super::{StaticCondvar, CONDVAR_INIT};
use sync::{StaticMutex, MUTEX_INIT, Condvar, Mutex, Arc};
#[test]
fn smoke() {
let c = Condvar::new();
c.notify_one();
c.notify_all();
}
#[test]
fn static_smoke() {
static C: StaticCondvar = CONDVAR_INIT;
C.notify_one();
C.notify_all();
unsafe { C.destroy(); }
}
#[test]
fn notify_one() {
static C: StaticCondvar = CONDVAR_INIT;
static M: StaticMutex = MUTEX_INIT;
let g = M.lock();
spawn(proc() {
let _g = M.lock();
C.notify_one();
});
C.wait(&g);
drop(g);
unsafe { C.destroy(); M.destroy(); }
}
#[test]
fn notify_all() {
const N: uint = 10;
let data = Arc::new((Mutex::new(0), Condvar::new()));
let (tx, rx) = channel();
for _ in range(0, N) {
let data = data.clone();
let tx = tx.clone();
spawn(proc() {
let &(ref lock, ref cond) = &*data;
let mut cnt = lock.lock();
*cnt += 1;
if *cnt == N {
tx.send(());
}
while *cnt != 0 {
cond.wait(&cnt);
}
tx.send(());
});
}
drop(tx);
let &(ref lock, ref cond) = &*data;
rx.recv();
let mut cnt = lock.lock();
*cnt = 0;
cond.notify_all();
drop(cnt);
for _ in range(0, N) {
rx.recv();
}
}
#[test]
fn wait_timeout() {
static C: StaticCondvar = CONDVAR_INIT;
static M: StaticMutex = MUTEX_INIT;
let g = M.lock();
assert!(!C.wait_timeout(&g, Duration::nanoseconds(1000)));
spawn(proc() {
let _g = M.lock();
C.notify_one();
});
assert!(C.wait_timeout(&g, Duration::days(1)));
drop(g);
unsafe { C.destroy(); M.destroy(); }
}
#[test]
#[should_fail]
fn two_mutexes() {
static M1: StaticMutex = MUTEX_INIT;
static M2: StaticMutex = MUTEX_INIT;
static C: StaticCondvar = CONDVAR_INIT;
let g = M1.lock();
spawn(proc() {
let _g = M1.lock();
C.notify_one();
});
C.wait(&g);
drop(g);
C.wait(&M2.lock());
}
}


@@ -1,663 +0,0 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! A (mostly) lock-free concurrent work-stealing deque
//!
//! This module contains an implementation of the Chase-Lev work stealing deque
//! described in "Dynamic Circular Work-Stealing Deque". The implementation is
//! heavily based on the pseudocode found in the paper.
//!
//! This implementation does not want to have the restriction of a garbage
//! collector for reclamation of buffers, and instead it uses a shared pool of
//! buffers. This shared pool is required for correctness in this
//! implementation.
//!
//! The only lock-synchronized portions of this deque are the buffer allocation
//! and deallocation portions. Otherwise all operations are lock-free.
//!
//! # Example
//!
//! use std::sync::deque::BufferPool;
//!
//! let mut pool = BufferPool::new();
//! let (mut worker, mut stealer) = pool.deque();
//!
//! // Only the worker may push/pop
//! worker.push(1i);
//! worker.pop();
//!
//! // Stealers take data from the other end of the deque
//! worker.push(1i);
//! stealer.steal();
//!
//! // Stealers can be cloned to have many stealers stealing in parallel
//! worker.push(1i);
//! let mut stealer2 = stealer.clone();
//! stealer2.steal();
#![experimental]
// NB: the "buffer pool" strategy is not done for speed, but rather for
// correctness. For more info, see the comment on `swap_buffer`
// FIXME: all atomic operations in this module use a SeqCst ordering. That is
// probably overkill
pub use self::Stolen::*;
use core::prelude::*;
use alloc::arc::Arc;
use alloc::heap::{allocate, deallocate};
use alloc::boxed::Box;
use vec::Vec;
use core::kinds::marker;
use core::mem::{forget, min_align_of, size_of, transmute};
use core::ptr;
use rustrt::exclusive::Exclusive;
use sync::atomic::{AtomicInt, AtomicPtr, SeqCst};
// Once the queue is less than 1/K full, then it will be downsized. Note that
// the deque requires that this number be less than 2.
static K: int = 4;
// Minimum number of bits that a buffer size should be. No buffer will resize to
// under this value, and all deques will initially contain a buffer of this
// size.
//
// The size in question is 1 << MIN_BITS
static MIN_BITS: uint = 7;
struct Deque<T> {
bottom: AtomicInt,
top: AtomicInt,
array: AtomicPtr<Buffer<T>>,
pool: BufferPool<T>,
}
/// Worker half of the work-stealing deque. This worker has exclusive access to
/// one side of the deque, and uses `push` and `pop` method to manipulate it.
///
/// There may only be one worker per deque.
pub struct Worker<T> {
deque: Arc<Deque<T>>,
_noshare: marker::NoSync,
}
/// The stealing half of the work-stealing deque. Stealers have access to the
/// opposite end of the deque from the worker, and they only have access to the
/// `steal` method.
pub struct Stealer<T> {
deque: Arc<Deque<T>>,
_noshare: marker::NoSync,
}
/// When stealing some data, this is an enumeration of the possible outcomes.
#[deriving(PartialEq, Show)]
pub enum Stolen<T> {
/// The deque was empty at the time of stealing
Empty,
/// The stealer lost the race for stealing data, and a retry may return more
/// data.
Abort,
/// The stealer has successfully stolen some data.
Data(T),
}
/// The allocation pool for buffers used by work-stealing deques. Right now this
/// structure is used for reclamation of memory after it is no longer in use by
/// deques.
///
/// This data structure is protected by a mutex, but it is rarely used. Deques
/// will only use this structure when allocating a new buffer or deallocating a
/// previous one.
pub struct BufferPool<T> {
pool: Arc<Exclusive<Vec<Box<Buffer<T>>>>>,
}
/// An internal buffer used by the chase-lev deque. This structure is actually
/// implemented as a circular buffer, and is used as the intermediate storage of
/// the data in the deque.
///
/// This type is implemented with *T instead of Vec<T> for two reasons:
///
/// 1. There is nothing safe about using this buffer. This easily allows the
/// same value to be read twice in to rust, and there is nothing to
/// prevent this. The usage by the deque must ensure that one of the
/// values is forgotten. Furthermore, we only ever want to manually run
/// destructors for values in this buffer (on drop) because the bounds
/// are defined by the deque it's owned by.
///
/// 2. We can certainly avoid bounds checks using *T instead of Vec<T>, although
/// LLVM is probably pretty good at doing this already.
struct Buffer<T> {
storage: *const T,
log_size: uint,
}
impl<T: Send> BufferPool<T> {
/// Allocates a new buffer pool which in turn can be used to allocate new
/// deques.
pub fn new() -> BufferPool<T> {
BufferPool { pool: Arc::new(Exclusive::new(Vec::new())) }
}
/// Allocates a new work-stealing deque which will send/receiving memory to
/// and from this buffer pool.
pub fn deque(&self) -> (Worker<T>, Stealer<T>) {
let a = Arc::new(Deque::new(self.clone()));
let b = a.clone();
(Worker { deque: a, _noshare: marker::NoSync },
Stealer { deque: b, _noshare: marker::NoSync })
}
fn alloc(&mut self, bits: uint) -> Box<Buffer<T>> {
unsafe {
let mut pool = self.pool.lock();
match pool.iter().position(|x| x.size() >= (1 << bits)) {
Some(i) => pool.remove(i).unwrap(),
None => box Buffer::new(bits)
}
}
}
fn free(&self, buf: Box<Buffer<T>>) {
unsafe {
let mut pool = self.pool.lock();
match pool.iter().position(|v| v.size() > buf.size()) {
Some(i) => pool.insert(i, buf),
None => pool.push(buf),
}
}
}
}
impl<T: Send> Clone for BufferPool<T> {
fn clone(&self) -> BufferPool<T> { BufferPool { pool: self.pool.clone() } }
}
impl<T: Send> Worker<T> {
/// Pushes data onto the front of this work queue.
pub fn push(&self, t: T) {
unsafe { self.deque.push(t) }
}
/// Pops data off the front of the work queue, returning `None` on an empty
/// queue.
pub fn pop(&self) -> Option<T> {
unsafe { self.deque.pop() }
}
/// Gets access to the buffer pool that this worker is attached to. This can
/// be used to create more deques which share the same buffer pool as this
/// deque.
pub fn pool<'a>(&'a self) -> &'a BufferPool<T> {
&self.deque.pool
}
}
impl<T: Send> Stealer<T> {
/// Steals work off the end of the queue (opposite of the worker's end)
pub fn steal(&self) -> Stolen<T> {
unsafe { self.deque.steal() }
}
/// Gets access to the buffer pool that this stealer is attached to. This
/// can be used to create more deques which share the same buffer pool as
/// this deque.
pub fn pool<'a>(&'a self) -> &'a BufferPool<T> {
&self.deque.pool
}
}
impl<T: Send> Clone for Stealer<T> {
fn clone(&self) -> Stealer<T> {
Stealer { deque: self.deque.clone(), _noshare: marker::NoSync }
}
}
// Almost all of this code can be found directly in the paper so I'm not
// personally going to heavily comment what's going on here.
impl<T: Send> Deque<T> {
fn new(mut pool: BufferPool<T>) -> Deque<T> {
let buf = pool.alloc(MIN_BITS);
Deque {
bottom: AtomicInt::new(0),
top: AtomicInt::new(0),
array: AtomicPtr::new(unsafe { transmute(buf) }),
pool: pool,
}
}
unsafe fn push(&self, data: T) {
let mut b = self.bottom.load(SeqCst);
let t = self.top.load(SeqCst);
let mut a = self.array.load(SeqCst);
let size = b - t;
if size >= (*a).size() - 1 {
// You won't find this code in the chase-lev deque paper. This is
// alluded to in a small footnote, however. We always free a buffer
// when growing in order to prevent leaks.
a = self.swap_buffer(b, a, (*a).resize(b, t, 1));
b = self.bottom.load(SeqCst);
}
(*a).put(b, data);
self.bottom.store(b + 1, SeqCst);
}
unsafe fn pop(&self) -> Option<T> {
let b = self.bottom.load(SeqCst);
let a = self.array.load(SeqCst);
let b = b - 1;
self.bottom.store(b, SeqCst);
let t = self.top.load(SeqCst);
let size = b - t;
if size < 0 {
self.bottom.store(t, SeqCst);
return None;
}
let data = (*a).get(b);
if size > 0 {
self.maybe_shrink(b, t);
return Some(data);
}
if self.top.compare_and_swap(t, t + 1, SeqCst) == t {
self.bottom.store(t + 1, SeqCst);
return Some(data);
} else {
self.bottom.store(t + 1, SeqCst);
forget(data); // someone else stole this value
return None;
}
}
unsafe fn steal(&self) -> Stolen<T> {
let t = self.top.load(SeqCst);
let old = self.array.load(SeqCst);
let b = self.bottom.load(SeqCst);
let a = self.array.load(SeqCst);
let size = b - t;
if size <= 0 { return Empty }
if size % (*a).size() == 0 {
if a == old && t == self.top.load(SeqCst) {
return Empty
}
return Abort
}
let data = (*a).get(t);
if self.top.compare_and_swap(t, t + 1, SeqCst) == t {
Data(data)
} else {
forget(data); // someone else stole this value
Abort
}
}
unsafe fn maybe_shrink(&self, b: int, t: int) {
let a = self.array.load(SeqCst);
if b - t < (*a).size() / K && b - t > (1 << MIN_BITS) {
self.swap_buffer(b, a, (*a).resize(b, t, -1));
}
}
// Helper routine not mentioned in the paper which is used in growing and
// shrinking buffers to swap in a new buffer into place. As a bit of a
// recap, the whole point that we need a buffer pool rather than just
// calling malloc/free directly is that stealers can continue using buffers
// after this method has called 'free' on it. The continued usage is simply
// a read followed by a forget, but we must make sure that the memory can
// continue to be read after we flag this buffer for reclamation.
unsafe fn swap_buffer(&self, b: int, old: *mut Buffer<T>,
buf: Buffer<T>) -> *mut Buffer<T> {
let newbuf: *mut Buffer<T> = transmute(box buf);
self.array.store(newbuf, SeqCst);
let ss = (*newbuf).size();
self.bottom.store(b + ss, SeqCst);
let t = self.top.load(SeqCst);
if self.top.compare_and_swap(t, t + ss, SeqCst) != t {
self.bottom.store(b, SeqCst);
}
self.pool.free(transmute(old));
return newbuf;
}
}
#[unsafe_destructor]
impl<T: Send> Drop for Deque<T> {
fn drop(&mut self) {
let t = self.top.load(SeqCst);
let b = self.bottom.load(SeqCst);
let a = self.array.load(SeqCst);
// Free whatever is leftover in the dequeue, and then move the buffer
// back into the pool.
for i in range(t, b) {
let _: T = unsafe { (*a).get(i) };
}
self.pool.free(unsafe { transmute(a) });
}
}
#[inline]
fn buffer_alloc_size<T>(log_size: uint) -> uint {
(1 << log_size) * size_of::<T>()
}
impl<T: Send> Buffer<T> {
unsafe fn new(log_size: uint) -> Buffer<T> {
let size = buffer_alloc_size::<T>(log_size);
let buffer = allocate(size, min_align_of::<T>());
if buffer.is_null() { ::alloc::oom() }
Buffer {
storage: buffer as *const T,
log_size: log_size,
}
}
fn size(&self) -> int { 1 << self.log_size }
// Apparently LLVM cannot optimize (foo % (1 << bar)) into this implicitly
fn mask(&self) -> int { (1 << self.log_size) - 1 }
unsafe fn elem(&self, i: int) -> *const T {
self.storage.offset(i & self.mask())
}
// This does not protect against loading duplicate values of the same cell,
// nor does this clear out the contents contained within. Hence, this is a
// very unsafe method which the caller needs to treat specially in case a
// race is lost.
unsafe fn get(&self, i: int) -> T {
ptr::read(self.elem(i))
}
// Unsafe because this unsafely overwrites possibly uninitialized or
// initialized data.
unsafe fn put(&self, i: int, t: T) {
ptr::write(self.elem(i) as *mut T, t);
}
// Again, unsafe because this has incredibly dubious ownership violations.
// It is assumed that this buffer is immediately dropped.
unsafe fn resize(&self, b: int, t: int, delta: int) -> Buffer<T> {
// NB: not entirely obvious, but thanks to 2's complement,
// casting delta to uint and then adding gives the desired
// effect.
let buf = Buffer::new(self.log_size + delta as uint);
for i in range(t, b) {
buf.put(i, self.get(i));
}
return buf;
}
}
#[unsafe_destructor]
impl<T: Send> Drop for Buffer<T> {
fn drop(&mut self) {
// It is assumed that all buffers are empty on drop.
let size = buffer_alloc_size::<T>(self.log_size);
unsafe { deallocate(self.storage as *mut u8, size, min_align_of::<T>()) }
}
}
#[cfg(test)]
mod tests {
use prelude::*;
use super::{Data, BufferPool, Abort, Empty, Worker, Stealer};
use mem;
use rustrt::thread::Thread;
use rand;
use rand::Rng;
use sync::atomic::{AtomicBool, INIT_ATOMIC_BOOL, SeqCst,
AtomicUint, INIT_ATOMIC_UINT};
use vec;
#[test]
fn smoke() {
let pool = BufferPool::new();
let (w, s) = pool.deque();
assert_eq!(w.pop(), None);
assert_eq!(s.steal(), Empty);
w.push(1i);
assert_eq!(w.pop(), Some(1));
w.push(1);
assert_eq!(s.steal(), Data(1));
w.push(1);
assert_eq!(s.clone().steal(), Data(1));
}
#[test]
fn stealpush() {
static AMT: int = 100000;
let pool = BufferPool::<int>::new();
let (w, s) = pool.deque();
let t = Thread::start(proc() {
let mut left = AMT;
while left > 0 {
match s.steal() {
Data(i) => {
assert_eq!(i, 1);
left -= 1;
}
Abort | Empty => {}
}
}
});
for _ in range(0, AMT) {
w.push(1);
}
t.join();
}
#[test]
fn stealpush_large() {
static AMT: int = 100000;
let pool = BufferPool::<(int, int)>::new();
let (w, s) = pool.deque();
let t = Thread::start(proc() {
let mut left = AMT;
while left > 0 {
match s.steal() {
Data((1, 10)) => { left -= 1; }
Data(..) => panic!(),
Abort | Empty => {}
}
}
});
for _ in range(0, AMT) {
w.push((1, 10));
}
t.join();
}
fn stampede(w: Worker<Box<int>>, s: Stealer<Box<int>>,
nthreads: int, amt: uint) {
for _ in range(0, amt) {
w.push(box 20);
}
let mut remaining = AtomicUint::new(amt);
let unsafe_remaining: *mut AtomicUint = &mut remaining;
let threads = range(0, nthreads).map(|_| {
let s = s.clone();
Thread::start(proc() {
unsafe {
while (*unsafe_remaining).load(SeqCst) > 0 {
match s.steal() {
Data(box 20) => {
(*unsafe_remaining).fetch_sub(1, SeqCst);
}
Data(..) => panic!(),
Abort | Empty => {}
}
}
}
})
}).collect::<Vec<Thread<()>>>();
while remaining.load(SeqCst) > 0 {
match w.pop() {
Some(box 20) => { remaining.fetch_sub(1, SeqCst); }
Some(..) => panic!(),
None => {}
}
}
for thread in threads.into_iter() {
thread.join();
}
}
#[test]
fn run_stampede() {
let pool = BufferPool::<Box<int>>::new();
let (w, s) = pool.deque();
stampede(w, s, 8, 10000);
}
#[test]
fn many_stampede() {
static AMT: uint = 4;
let pool = BufferPool::<Box<int>>::new();
let threads = range(0, AMT).map(|_| {
let (w, s) = pool.deque();
Thread::start(proc() {
stampede(w, s, 4, 10000);
})
}).collect::<Vec<Thread<()>>>();
for thread in threads.into_iter() {
thread.join();
}
}
#[test]
fn stress() {
static AMT: int = 100000;
static NTHREADS: int = 8;
static DONE: AtomicBool = INIT_ATOMIC_BOOL;
static HITS: AtomicUint = INIT_ATOMIC_UINT;
let pool = BufferPool::<int>::new();
let (w, s) = pool.deque();
let threads = range(0, NTHREADS).map(|_| {
let s = s.clone();
Thread::start(proc() {
loop {
match s.steal() {
Data(2) => { HITS.fetch_add(1, SeqCst); }
Data(..) => panic!(),
_ if DONE.load(SeqCst) => break,
_ => {}
}
}
})
}).collect::<Vec<Thread<()>>>();
let mut rng = rand::task_rng();
let mut expected = 0;
while expected < AMT {
if rng.gen_range(0i, 3) == 2 {
match w.pop() {
None => {}
Some(2) => { HITS.fetch_add(1, SeqCst); },
Some(_) => panic!(),
}
} else {
expected += 1;
w.push(2);
}
}
while HITS.load(SeqCst) < AMT as uint {
match w.pop() {
None => {}
Some(2) => { HITS.fetch_add(1, SeqCst); },
Some(_) => panic!(),
}
}
DONE.store(true, SeqCst);
for thread in threads.into_iter() {
thread.join();
}
assert_eq!(HITS.load(SeqCst), expected as uint);
}
#[test]
#[cfg_attr(windows, ignore)] // apparently windows scheduling is weird?
fn no_starvation() {
static AMT: int = 10000;
static NTHREADS: int = 4;
static DONE: AtomicBool = INIT_ATOMIC_BOOL;
let pool = BufferPool::<(int, uint)>::new();
let (w, s) = pool.deque();
let (threads, hits) = vec::unzip(range(0, NTHREADS).map(|_| {
let s = s.clone();
let unique_box = box AtomicUint::new(0);
let thread_box = unsafe {
*mem::transmute::<&Box<AtomicUint>,
*const *mut AtomicUint>(&unique_box)
};
(Thread::start(proc() {
unsafe {
loop {
match s.steal() {
Data((1, 2)) => {
(*thread_box).fetch_add(1, SeqCst);
}
Data(..) => panic!(),
_ if DONE.load(SeqCst) => break,
_ => {}
}
}
}
}), unique_box)
}));
let mut rng = rand::task_rng();
let mut myhit = false;
'outer: loop {
for _ in range(0, rng.gen_range(0, AMT)) {
if !myhit && rng.gen_range(0i, 3) == 2 {
match w.pop() {
None => {}
Some((1, 2)) => myhit = true,
Some(_) => panic!(),
}
} else {
w.push((1, 2));
}
}
for slot in hits.iter() {
let amt = slot.load(SeqCst);
if amt == 0 { continue 'outer; }
}
if myhit {
break
}
}
DONE.store(true, SeqCst);
for thread in threads.into_iter() {
thread.join();
}
}
}


@@ -148,7 +148,7 @@ mod test {
use prelude::*;
use sync::Future;
use task;
use comm::{channel, Sender};
use comm::channel;
#[test]
fn test_from_value() {


@@ -1,805 +0,0 @@
// Copyright 2012-2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Wrappers for safe, shared, mutable memory between tasks
//!
//! The wrappers in this module build on the primitives from `sync::raw` to
//! provide safe interfaces around using the primitive locks. These primitives
//! implement a technique called "poisoning" where when a task panicked with a
//! held lock, all future attempts to use the lock will panic.
//!
//! For example, if two tasks are contending on a mutex and one of them panics
//! after grabbing the lock, the second task will immediately panic because the
//! lock is now poisoned.
use core::prelude::*;
use self::Inner::*;
use core::cell::UnsafeCell;
use rustrt::local::Local;
use rustrt::task::Task;
use super::raw;
// Poisoning helpers
struct PoisonOnFail<'a> {
flag: &'a mut bool,
failed: bool,
}
fn failing() -> bool {
Local::borrow(None::<Task>).unwinder.unwinding()
}
impl<'a> PoisonOnFail<'a> {
fn check(flag: bool, name: &str) {
if flag {
panic!("Poisoned {} - another task failed inside!", name);
}
}
fn new<'a>(flag: &'a mut bool, name: &str) -> PoisonOnFail<'a> {
PoisonOnFail::check(*flag, name);
PoisonOnFail {
flag: flag,
failed: failing()
}
}
}
#[unsafe_destructor]
impl<'a> Drop for PoisonOnFail<'a> {
fn drop(&mut self) {
if !self.failed && failing() {
*self.flag = true;
}
}
}
// Condvar
enum Inner<'a> {
InnerMutex(raw::MutexGuard<'a>),
InnerRWLock(raw::RWLockWriteGuard<'a>),
}
impl<'b> Inner<'b> {
fn cond<'a>(&'a self) -> &'a raw::Condvar<'b> {
match *self {
InnerMutex(ref m) => &m.cond,
InnerRWLock(ref m) => &m.cond,
}
}
}
/// A condition variable, a mechanism for unlock-and-descheduling and
/// signaling, for use with the lock types.
pub struct Condvar<'a> {
name: &'static str,
// n.b. Inner must be after PoisonOnFail because we must set the poison flag
// *inside* the mutex, and struct fields are destroyed top-to-bottom
// (destroy the lock guard last).
poison: PoisonOnFail<'a>,
inner: Inner<'a>,
}
impl<'a> Condvar<'a> {
/// Atomically exit the associated lock and block until a signal is sent.
///
/// wait() is equivalent to wait_on(0).
///
/// # Panics
///
/// A task which is killed while waiting on a condition variable will wake
/// up, panic, and unlock the associated lock as it unwinds.
#[inline]
pub fn wait(&self) { self.wait_on(0) }
/// Atomically exit the associated lock and block on a specified condvar
/// until a signal is sent on that same condvar.
///
/// The associated lock must have been initialised with an appropriate
/// number of condvars. The condvar_id must be between 0 and num_condvars-1
/// or else this call will fail.
#[inline]
pub fn wait_on(&self, condvar_id: uint) {
assert!(!*self.poison.flag);
self.inner.cond().wait_on(condvar_id);
// This is why we need to wrap sync::condvar.
PoisonOnFail::check(*self.poison.flag, self.name);
}
/// Wake up a blocked task. Returns false if there was no blocked task.
#[inline]
pub fn signal(&self) -> bool { self.signal_on(0) }
/// Wake up a blocked task on a specified condvar (as
/// sync::cond.signal_on). Returns false if there was no blocked task.
#[inline]
pub fn signal_on(&self, condvar_id: uint) -> bool {
assert!(!*self.poison.flag);
self.inner.cond().signal_on(condvar_id)
}
/// Wake up all blocked tasks. Returns the number of tasks woken.
#[inline]
pub fn broadcast(&self) -> uint { self.broadcast_on(0) }
/// Wake up all blocked tasks on a specified condvar (as
/// sync::cond.broadcast_on). Returns the number of tasks woken.
#[inline]
pub fn broadcast_on(&self, condvar_id: uint) -> uint {
assert!(!*self.poison.flag);
self.inner.cond().broadcast_on(condvar_id)
}
}
/// A wrapper type which provides synchronized access to the underlying data, of
/// type `T`. A mutex always provides exclusive access, and concurrent requests
/// will block while the mutex is already locked.
///
/// # Example
///
/// ```
/// use std::sync::{Mutex, Arc};
///
/// let mutex = Arc::new(Mutex::new(1i));
/// let mutex2 = mutex.clone();
///
/// spawn(proc() {
/// let mut val = mutex2.lock();
/// *val += 1;
/// val.cond.signal();
/// });
///
/// let value = mutex.lock();
/// while *value != 2 {
/// value.cond.wait();
/// }
/// ```
pub struct Mutex<T> {
lock: raw::Mutex,
failed: UnsafeCell<bool>,
data: UnsafeCell<T>,
}
/// An guard which is created by locking a mutex. Through this guard the
/// underlying data can be accessed.
pub struct MutexGuard<'a, T:'a> {
// FIXME #12808: strange name to try to avoid interfering with
// field accesses of the contained type via Deref
_data: &'a mut T,
/// Inner condition variable connected to the locked mutex that this guard
/// was created from. This can be used for atomic-unlock-and-deschedule.
pub cond: Condvar<'a>,
}
impl<T: Send> Mutex<T> {
/// Creates a new mutex to protect the user-supplied data.
pub fn new(user_data: T) -> Mutex<T> {
Mutex::new_with_condvars(user_data, 1)
}
/// Create a new mutex, with a specified number of associated condvars.
///
/// This will allow calling wait_on/signal_on/broadcast_on with condvar IDs
/// between 0 and num_condvars-1. (If num_condvars is 0, lock_cond will be
/// allowed but any operations on the condvar will fail.)
pub fn new_with_condvars(user_data: T, num_condvars: uint) -> Mutex<T> {
Mutex {
lock: raw::Mutex::new_with_condvars(num_condvars),
failed: UnsafeCell::new(false),
data: UnsafeCell::new(user_data),
}
}
/// Access the underlying mutable data with mutual exclusion from other
/// tasks. The returned value is an RAII guard which will unlock the mutex
/// when dropped. All concurrent tasks attempting to lock the mutex will
/// block while the returned value is still alive.
///
/// # Panics
///
/// Panicking while inside the Mutex will unlock the Mutex while unwinding, so
/// that other tasks won't block forever. It will also poison the Mutex:
/// any tasks that subsequently try to access it (including those already
/// blocked on the mutex) will also panic immediately.
#[inline]
pub fn lock<'a>(&'a self) -> MutexGuard<'a, T> {
let guard = self.lock.lock();
// These two accesses are safe because we're guaranteed at this point
// that we have exclusive access to this mutex. We are indeed able to
// promote ourselves from &Mutex to `&mut T`
let poison = unsafe { &mut *self.failed.get() };
let data = unsafe { &mut *self.data.get() };
MutexGuard {
_data: data,
cond: Condvar {
name: "Mutex",
poison: PoisonOnFail::new(poison, "Mutex"),
inner: InnerMutex(guard),
},
}
}
}
impl<'a, T: Send> Deref<T> for MutexGuard<'a, T> {
fn deref<'a>(&'a self) -> &'a T { &*self._data }
}
impl<'a, T: Send> DerefMut<T> for MutexGuard<'a, T> {
fn deref_mut<'a>(&'a mut self) -> &'a mut T { &mut *self._data }
}
/// A dual-mode reader-writer lock. The data can be accessed mutably or
/// immutably, and immutably-accessing tasks may run concurrently.
///
/// # Example
///
/// ```
/// use std::sync::{RWLock, Arc};
///
/// let lock1 = Arc::new(RWLock::new(1i));
/// let lock2 = lock1.clone();
///
/// spawn(proc() {
/// let mut val = lock2.write();
/// *val = 3;
/// let val = val.downgrade();
/// println!("{}", *val);
/// });
///
/// let val = lock1.read();
/// println!("{}", *val);
/// ```
pub struct RWLock<T> {
lock: raw::RWLock,
failed: UnsafeCell<bool>,
data: UnsafeCell<T>,
}
/// A guard which is created by locking an rwlock in write mode. Through this
/// guard the underlying data can be accessed.
pub struct RWLockWriteGuard<'a, T:'a> {
// FIXME #12808: strange name to try to avoid interfering with
// field accesses of the contained type via Deref
_data: &'a mut T,
/// Inner condition variable that can be used to sleep on the write mode of
/// this rwlock.
pub cond: Condvar<'a>,
}
/// A guard which is created by locking an rwlock in read mode. Through this
/// guard the underlying data can be accessed.
pub struct RWLockReadGuard<'a, T:'a> {
// FIXME #12808: strange names to try to avoid interfering with
// field accesses of the contained type via Deref
_data: &'a T,
_guard: raw::RWLockReadGuard<'a>,
}
impl<T: Send + Sync> RWLock<T> {
/// Create a reader/writer lock with the supplied data.
pub fn new(user_data: T) -> RWLock<T> {
RWLock::new_with_condvars(user_data, 1)
}
/// Create a reader/writer lock with the supplied data and a specified number
/// of condvars (as sync::RWLock::new_with_condvars).
pub fn new_with_condvars(user_data: T, num_condvars: uint) -> RWLock<T> {
RWLock {
lock: raw::RWLock::new_with_condvars(num_condvars),
failed: UnsafeCell::new(false),
data: UnsafeCell::new(user_data),
}
}
/// Access the underlying data mutably. Locks the rwlock in write mode;
/// other readers and writers will block.
///
/// # Panics
///
/// Panicking while inside the lock will unlock the lock while unwinding, so
/// that other tasks won't block forever. As Mutex.lock, it will also poison
/// the lock, so subsequent readers and writers will both also panic.
#[inline]
pub fn write<'a>(&'a self) -> RWLockWriteGuard<'a, T> {
let guard = self.lock.write();
// These two accesses are safe because we're guaranteed at this point
// that we have exclusive access to this rwlock. We are indeed able to
// promote ourselves from &RWLock to `&mut T`
let poison = unsafe { &mut *self.failed.get() };
let data = unsafe { &mut *self.data.get() };
RWLockWriteGuard {
_data: data,
cond: Condvar {
name: "RWLock",
poison: PoisonOnFail::new(poison, "RWLock"),
inner: InnerRWLock(guard),
},
}
}
/// Access the underlying data immutably. May run concurrently with other
/// reading tasks.
///
/// # Panics
///
/// Panicking will unlock the lock while unwinding. However, unlike all other
/// access modes, this will not poison the lock.
pub fn read<'a>(&'a self) -> RWLockReadGuard<'a, T> {
let guard = self.lock.read();
PoisonOnFail::check(unsafe { *self.failed.get() }, "RWLock");
RWLockReadGuard {
_guard: guard,
_data: unsafe { &*self.data.get() },
}
}
}
impl<'a, T: Send + Sync> RWLockWriteGuard<'a, T> {
/// Consumes this write lock token, returning a new read lock token.
///
/// This will allow pending readers to come into the lock.
pub fn downgrade(self) -> RWLockReadGuard<'a, T> {
let RWLockWriteGuard { _data, cond } = self;
// convert the data to read-only explicitly
let data = &*_data;
let guard = match cond.inner {
InnerMutex(..) => unreachable!(),
InnerRWLock(guard) => guard.downgrade()
};
RWLockReadGuard { _guard: guard, _data: data }
}
}
impl<'a, T: Send + Sync> Deref<T> for RWLockReadGuard<'a, T> {
fn deref<'a>(&'a self) -> &'a T { self._data }
}
impl<'a, T: Send + Sync> Deref<T> for RWLockWriteGuard<'a, T> {
fn deref<'a>(&'a self) -> &'a T { &*self._data }
}
impl<'a, T: Send + Sync> DerefMut<T> for RWLockWriteGuard<'a, T> {
fn deref_mut<'a>(&'a mut self) -> &'a mut T { &mut *self._data }
}
/// A barrier enables multiple tasks to synchronize the beginning
/// of some computation.
///
/// ```rust
/// use std::sync::{Arc, Barrier};
///
/// let barrier = Arc::new(Barrier::new(10));
/// for _ in range(0u, 10) {
/// let c = barrier.clone();
/// // The same messages will be printed together.
/// // You will NOT see any interleaving.
/// spawn(proc() {
/// println!("before wait");
/// c.wait();
/// println!("after wait");
/// });
/// }
/// ```
pub struct Barrier {
lock: Mutex<BarrierState>,
num_tasks: uint,
}
// The inner state of a double barrier
struct BarrierState {
count: uint,
generation_id: uint,
}
impl Barrier {
/// Create a new barrier that can block a given number of tasks.
pub fn new(num_tasks: uint) -> Barrier {
Barrier {
lock: Mutex::new(BarrierState {
count: 0,
generation_id: 0,
}),
num_tasks: num_tasks,
}
}
/// Block the current task until a certain number of tasks is waiting.
pub fn wait(&self) {
let mut lock = self.lock.lock();
let local_gen = lock.generation_id;
lock.count += 1;
if lock.count < self.num_tasks {
// We need a while loop to guard against spurious wakeups.
// http://en.wikipedia.org/wiki/Spurious_wakeup
while local_gen == lock.generation_id &&
lock.count < self.num_tasks {
lock.cond.wait();
}
} else {
lock.count = 0;
lock.generation_id += 1;
lock.cond.broadcast();
}
}
}
#[cfg(test)]
mod tests {
use prelude::*;
use comm::Empty;
use task;
use task::try_future;
use sync::Arc;
use super::{Mutex, Barrier, RWLock};
#[test]
fn test_mutex_arc_condvar() {
let arc = Arc::new(Mutex::new(false));
let arc2 = arc.clone();
let (tx, rx) = channel();
task::spawn(proc() {
// wait until parent gets in
rx.recv();
let mut lock = arc2.lock();
*lock = true;
lock.cond.signal();
});
let lock = arc.lock();
tx.send(());
assert!(!*lock);
while !*lock {
lock.cond.wait();
}
}
#[test] #[should_fail]
fn test_arc_condvar_poison() {
let arc = Arc::new(Mutex::new(1i));
let arc2 = arc.clone();
let (tx, rx) = channel();
spawn(proc() {
rx.recv();
let lock = arc2.lock();
lock.cond.signal();
// Parent should fail when it wakes up.
panic!();
});
let lock = arc.lock();
tx.send(());
while *lock == 1 {
lock.cond.wait();
}
}
#[test] #[should_fail]
fn test_mutex_arc_poison() {
let arc = Arc::new(Mutex::new(1i));
let arc2 = arc.clone();
let _ = task::try(proc() {
let lock = arc2.lock();
assert_eq!(*lock, 2);
});
let lock = arc.lock();
assert_eq!(*lock, 1);
}
#[test]
fn test_mutex_arc_nested() {
// Tests nested mutexes and access
// to underlying data.
let arc = Arc::new(Mutex::new(1i));
let arc2 = Arc::new(Mutex::new(arc));
task::spawn(proc() {
let lock = arc2.lock();
let lock2 = lock.deref().lock();
assert_eq!(*lock2, 1);
});
}
#[test]
fn test_mutex_arc_access_in_unwind() {
let arc = Arc::new(Mutex::new(1i));
let arc2 = arc.clone();
let _ = task::try::<()>(proc() {
struct Unwinder {
i: Arc<Mutex<int>>,
}
impl Drop for Unwinder {
fn drop(&mut self) {
let mut lock = self.i.lock();
*lock += 1;
}
}
let _u = Unwinder { i: arc2 };
panic!();
});
let lock = arc.lock();
assert_eq!(*lock, 2);
}
#[test] #[should_fail]
fn test_rw_arc_poison_wr() {
let arc = Arc::new(RWLock::new(1i));
let arc2 = arc.clone();
let _ = task::try(proc() {
let lock = arc2.write();
assert_eq!(*lock, 2);
});
let lock = arc.read();
assert_eq!(*lock, 1);
}
#[test] #[should_fail]
fn test_rw_arc_poison_ww() {
let arc = Arc::new(RWLock::new(1i));
let arc2 = arc.clone();
let _ = task::try(proc() {
let lock = arc2.write();
assert_eq!(*lock, 2);
});
let lock = arc.write();
assert_eq!(*lock, 1);
}
#[test]
fn test_rw_arc_no_poison_rr() {
let arc = Arc::new(RWLock::new(1i));
let arc2 = arc.clone();
let _ = task::try(proc() {
let lock = arc2.read();
assert_eq!(*lock, 2);
});
let lock = arc.read();
assert_eq!(*lock, 1);
}
#[test]
fn test_rw_arc_no_poison_rw() {
let arc = Arc::new(RWLock::new(1i));
let arc2 = arc.clone();
let _ = task::try(proc() {
let lock = arc2.read();
assert_eq!(*lock, 2);
});
let lock = arc.write();
assert_eq!(*lock, 1);
}
#[test]
fn test_rw_arc_no_poison_dr() {
let arc = Arc::new(RWLock::new(1i));
let arc2 = arc.clone();
let _ = task::try(proc() {
let lock = arc2.write().downgrade();
assert_eq!(*lock, 2);
});
let lock = arc.write();
assert_eq!(*lock, 1);
}
#[test]
fn test_rw_arc() {
let arc = Arc::new(RWLock::new(0i));
let arc2 = arc.clone();
let (tx, rx) = channel();
task::spawn(proc() {
let mut lock = arc2.write();
for _ in range(0u, 10) {
let tmp = *lock;
*lock = -1;
task::deschedule();
*lock = tmp + 1;
}
tx.send(());
});
// Readers try to catch the writer in the act
let mut children = Vec::new();
for _ in range(0u, 5) {
let arc3 = arc.clone();
children.push(try_future(proc() {
let lock = arc3.read();
assert!(*lock >= 0);
}));
}
// Wait for children to pass their asserts
for r in children.iter_mut() {
assert!(r.get_ref().is_ok());
}
// Wait for writer to finish
rx.recv();
let lock = arc.read();
assert_eq!(*lock, 10);
}
#[test]
fn test_rw_arc_access_in_unwind() {
let arc = Arc::new(RWLock::new(1i));
let arc2 = arc.clone();
let _ = task::try::<()>(proc() {
struct Unwinder {
i: Arc<RWLock<int>>,
}
impl Drop for Unwinder {
fn drop(&mut self) {
let mut lock = self.i.write();
*lock += 1;
}
}
let _u = Unwinder { i: arc2 };
panic!();
});
let lock = arc.read();
assert_eq!(*lock, 2);
}
#[test]
fn test_rw_downgrade() {
// (1) A downgrader gets in write mode and does cond.wait.
// (2) A writer gets in write mode, sets state to 42, and does signal.
// (3) Downgrader wakes, sets state to 31337.
// (4) tells writer and all other readers to contend as it downgrades.
// (5) Writer attempts to set state back to 42, while downgraded task
// and all reader tasks assert that it's 31337.
let arc = Arc::new(RWLock::new(0i));
// Reader tasks
let mut reader_convos = Vec::new();
for _ in range(0u, 10) {
let ((tx1, rx1), (tx2, rx2)) = (channel(), channel());
reader_convos.push((tx1, rx2));
let arcn = arc.clone();
task::spawn(proc() {
rx1.recv(); // wait for downgrader to give go-ahead
let lock = arcn.read();
assert_eq!(*lock, 31337);
tx2.send(());
});
}
// Writer task
let arc2 = arc.clone();
let ((tx1, rx1), (tx2, rx2)) = (channel(), channel());
task::spawn(proc() {
rx1.recv();
{
let mut lock = arc2.write();
assert_eq!(*lock, 0);
*lock = 42;
lock.cond.signal();
}
rx1.recv();
{
let mut lock = arc2.write();
// This shouldn't happen until after the downgrade read
// section, and all other readers, finish.
assert_eq!(*lock, 31337);
*lock = 42;
}
tx2.send(());
});
// Downgrader (us)
let mut lock = arc.write();
tx1.send(()); // send to another writer who will wake us up
while *lock == 0 {
lock.cond.wait();
}
assert_eq!(*lock, 42);
*lock = 31337;
// send to other readers
for &(ref mut rc, _) in reader_convos.iter_mut() {
rc.send(())
}
let lock = lock.downgrade();
// complete handshake with other readers
for &(_, ref mut rp) in reader_convos.iter_mut() {
rp.recv()
}
tx1.send(()); // tell writer to try again
assert_eq!(*lock, 31337);
drop(lock);
rx2.recv(); // complete handshake with writer
}
#[cfg(test)]
fn test_rw_write_cond_downgrade_read_race_helper() {
// Tests that when a downgrader hands off the "reader cloud" lock
// because of a contending reader, a writer can't race to get it
// instead, which would result in readers_and_writers. This tests
// the raw module rather than this one, but it's here because an
// rwarc gives us extra shared state to help check for the race.
let x = Arc::new(RWLock::new(true));
let (tx, rx) = channel();
// writer task
let xw = x.clone();
task::spawn(proc() {
let mut lock = xw.write();
tx.send(()); // tell downgrader it's ok to go
lock.cond.wait();
// The core of the test is here: the condvar reacquire path
// must involve order_lock, so that it cannot race with a reader
// trying to receive the "reader cloud lock hand-off".
*lock = false;
});
rx.recv(); // wait for writer to get in
let lock = x.write();
assert!(*lock);
// make writer contend in the cond-reacquire path
lock.cond.signal();
// make a reader task to trigger the "reader cloud lock" handoff
let xr = x.clone();
let (tx, rx) = channel();
task::spawn(proc() {
tx.send(());
drop(xr.read());
});
rx.recv(); // wait for reader task to exist
let lock = lock.downgrade();
// if writer mistakenly got in, make sure it mutates state
// before we assert on it
for _ in range(0u, 5) { task::deschedule(); }
// make sure writer didn't get in.
assert!(*lock);
}
#[test]
fn test_rw_write_cond_downgrade_read_race() {
// Ideally the above test case would have deschedule statements in it
// that helped to expose the race nearly 100% of the time... but adding
// deschedules in the intuitively-right locations made it even less
// likely, and I wasn't sure why :( . This is a mediocre "next best"
// option.
for _ in range(0u, 8) {
test_rw_write_cond_downgrade_read_race_helper();
}
}
#[test]
fn test_barrier() {
let barrier = Arc::new(Barrier::new(10));
let (tx, rx) = channel();
for _ in range(0u, 9) {
let c = barrier.clone();
let tx = tx.clone();
spawn(proc() {
c.wait();
tx.send(true);
});
}
// At this point, all spawned tasks should be blocked,
// so we shouldn't get anything from the port
assert!(match rx.try_recv() {
Err(Empty) => true,
_ => false,
});
barrier.wait();
// Now, the barrier is cleared and we should get data.
for _ in range(0u, 9) {
rx.recv();
}
}
}


@ -17,41 +17,27 @@
#![experimental]
pub use self::one::{Once, ONCE_INIT};
pub use alloc::arc::{Arc, Weak};
pub use self::lock::{Mutex, MutexGuard, Condvar, Barrier,
RWLock, RWLockReadGuard, RWLockWriteGuard};
// The mutex/rwlock in this module are not meant for reexport
pub use self::raw::{Semaphore, SemaphoreGuard};
pub use self::mutex::{Mutex, MutexGuard, StaticMutex, StaticMutexGuard, MUTEX_INIT};
pub use self::rwlock::{RWLock, StaticRWLock, RWLOCK_INIT};
pub use self::rwlock::{RWLockReadGuard, RWLockWriteGuard};
pub use self::rwlock::{StaticRWLockReadGuard, StaticRWLockWriteGuard};
pub use self::condvar::{Condvar, StaticCondvar, CONDVAR_INIT, AsMutexGuard};
pub use self::once::{Once, ONCE_INIT};
pub use self::semaphore::{Semaphore, SemaphoreGuard};
pub use self::barrier::Barrier;
pub use self::future::Future;
pub use self::task_pool::TaskPool;
// Core building blocks for all primitives in this crate
#[stable]
pub mod atomic;
// Concurrent data structures
pub mod spsc_queue;
pub mod mpsc_queue;
pub mod mpmc_bounded_queue;
pub mod deque;
// Low-level concurrency primitives
mod raw;
mod mutex;
mod one;
// Higher level primitives based on those above
mod lock;
// Task management
mod barrier;
mod condvar;
mod future;
mod mutex;
mod once;
mod poison;
mod rwlock;
mod semaphore;
mod task_pool;
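
As a quick orientation for the reorganized exports above, the sketch below is not part of the patch; it closely mirrors the tests later in this diff and shows the new split between `Mutex` and `Condvar`, where waiting requires the `MutexGuard` returned by `lock()`:

use std::sync::{Arc, Mutex, Condvar};

// The condvar is now a separate type, paired with a mutex by the caller.
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = pair.clone();

spawn(proc() {
    let &(ref lock, ref cvar) = &*pair2;
    let mut started = lock.lock();
    *started = true;
    cvar.notify_one();          // wake the waiter below
});

let &(ref lock, ref cvar) = &*pair;
let started = lock.lock();
while !*started {
    cvar.wait(&started);        // waiting requires the held MutexGuard
}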


@ -1,219 +0,0 @@
/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
* SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
* OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* The views and conclusions contained in the software and documentation are
* those of the authors and should not be interpreted as representing official
* policies, either expressed or implied, of Dmitry Vyukov.
*/
#![experimental]
#![allow(missing_docs, dead_code)]
// http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
use core::prelude::*;
use alloc::arc::Arc;
use vec::Vec;
use core::num::UnsignedInt;
use core::cell::UnsafeCell;
use sync::atomic::{AtomicUint,Relaxed,Release,Acquire};
struct Node<T> {
sequence: AtomicUint,
value: Option<T>,
}
struct State<T> {
pad0: [u8, ..64],
buffer: Vec<UnsafeCell<Node<T>>>,
mask: uint,
pad1: [u8, ..64],
enqueue_pos: AtomicUint,
pad2: [u8, ..64],
dequeue_pos: AtomicUint,
pad3: [u8, ..64],
}
pub struct Queue<T> {
state: Arc<State<T>>,
}
impl<T: Send> State<T> {
fn with_capacity(capacity: uint) -> State<T> {
let capacity = if capacity < 2 || (capacity & (capacity - 1)) != 0 {
if capacity < 2 {
2u
} else {
// use next power of 2 as capacity
capacity.next_power_of_two()
}
} else {
capacity
};
let buffer = Vec::from_fn(capacity, |i| {
UnsafeCell::new(Node { sequence:AtomicUint::new(i), value: None })
});
State{
pad0: [0, ..64],
buffer: buffer,
mask: capacity-1,
pad1: [0, ..64],
enqueue_pos: AtomicUint::new(0),
pad2: [0, ..64],
dequeue_pos: AtomicUint::new(0),
pad3: [0, ..64],
}
}
fn push(&self, value: T) -> bool {
let mask = self.mask;
let mut pos = self.enqueue_pos.load(Relaxed);
loop {
let node = &self.buffer[pos & mask];
let seq = unsafe { (*node.get()).sequence.load(Acquire) };
let diff: int = seq as int - pos as int;
if diff == 0 {
let enqueue_pos = self.enqueue_pos.compare_and_swap(pos, pos+1, Relaxed);
if enqueue_pos == pos {
unsafe {
(*node.get()).value = Some(value);
(*node.get()).sequence.store(pos+1, Release);
}
break
} else {
pos = enqueue_pos;
}
} else if diff < 0 {
return false
} else {
pos = self.enqueue_pos.load(Relaxed);
}
}
true
}
fn pop(&self) -> Option<T> {
let mask = self.mask;
let mut pos = self.dequeue_pos.load(Relaxed);
loop {
let node = &self.buffer[pos & mask];
let seq = unsafe { (*node.get()).sequence.load(Acquire) };
let diff: int = seq as int - (pos + 1) as int;
if diff == 0 {
let dequeue_pos = self.dequeue_pos.compare_and_swap(pos, pos+1, Relaxed);
if dequeue_pos == pos {
unsafe {
let value = (*node.get()).value.take();
(*node.get()).sequence.store(pos + mask + 1, Release);
return value
}
} else {
pos = dequeue_pos;
}
} else if diff < 0 {
return None
} else {
pos = self.dequeue_pos.load(Relaxed);
}
}
}
}
impl<T: Send> Queue<T> {
pub fn with_capacity(capacity: uint) -> Queue<T> {
Queue{
state: Arc::new(State::with_capacity(capacity))
}
}
pub fn push(&self, value: T) -> bool {
self.state.push(value)
}
pub fn pop(&self) -> Option<T> {
self.state.pop()
}
}
impl<T: Send> Clone for Queue<T> {
fn clone(&self) -> Queue<T> {
Queue { state: self.state.clone() }
}
}
#[cfg(test)]
mod tests {
use prelude::*;
use super::Queue;
#[test]
fn test() {
let nthreads = 8u;
let nmsgs = 1000u;
let q = Queue::with_capacity(nthreads*nmsgs);
assert_eq!(None, q.pop());
let (tx, rx) = channel();
for _ in range(0, nthreads) {
let q = q.clone();
let tx = tx.clone();
spawn(proc() {
let q = q;
for i in range(0, nmsgs) {
assert!(q.push(i));
}
tx.send(());
});
}
let mut completion_rxs = vec![];
for _ in range(0, nthreads) {
let (tx, rx) = channel();
completion_rxs.push(rx);
let q = q.clone();
spawn(proc() {
let q = q;
let mut i = 0u;
loop {
match q.pop() {
None => {},
Some(_) => {
i += 1;
if i == nmsgs { break }
}
}
}
tx.send(i);
});
}
for rx in completion_rxs.iter_mut() {
assert_eq!(nmsgs, rx.recv());
}
for _ in range(0, nthreads) {
rx.recv();
}
}
}


@ -8,43 +8,68 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! A simple native mutex implementation. Warning: this API is likely
//! to change soon.
use prelude::*;
#![allow(dead_code)]
use core::prelude::*;
use alloc::boxed::Box;
use rustrt::mutex;
pub const LOCKED: uint = 1 << 0;
pub const BLOCKED: uint = 1 << 1;
use cell::UnsafeCell;
use kinds::marker;
use sync::{poison, AsMutexGuard};
use sys_common::mutex as sys;
/// A mutual exclusion primitive useful for protecting shared data
///
/// This mutex will properly block tasks waiting for the lock to become
/// available. The mutex can also be statically initialized or created via a
/// `new` constructor.
/// This mutex will block threads waiting for the lock to become available. The
/// mutex can also be statically initialized or created via a `new`
/// constructor. Each mutex has a type parameter which represents the data that
/// it is protecting. The data can only be accessed through the RAII guards
/// returned from `lock` and `try_lock`, which guarantees that the data is only
/// ever accessed when the mutex is locked.
///
/// # Poisoning
///
/// In order to prevent access to otherwise invalid data, each mutex will
/// propagate any panics which occur while the lock is held. Once a thread has
/// panicked while holding the lock, then all other threads will immediately
/// panic as well once they hold the lock.
///
/// # Example
///
/// ```rust,ignore
/// use std::sync::mutex::Mutex;
/// ```rust
/// use std::sync::{Arc, Mutex};
/// const N: uint = 10;
///
/// let m = Mutex::new();
/// let guard = m.lock();
/// // do some work
/// drop(guard); // unlock the lock
/// // Spawn a few threads to increment a shared variable (non-atomically), and
/// // let the main thread know once all increments are done.
/// //
/// // Here we're using an Arc to share memory among tasks, and the data inside
/// // the Arc is protected with a mutex.
/// let data = Arc::new(Mutex::new(0));
///
/// let (tx, rx) = channel();
/// for _ in range(0, 10) {
/// let (data, tx) = (data.clone(), tx.clone());
/// spawn(proc() {
/// // The shared static can only be accessed once the lock is held.
/// // Our non-atomic increment is safe because we're the only thread
/// // which can access the shared state when the lock is held.
/// let mut data = data.lock();
/// *data += 1;
/// if *data == N {
/// tx.send(());
/// }
/// // the lock is unlocked here when `data` goes out of scope.
/// });
/// }
///
/// rx.recv();
/// ```
pub struct Mutex {
pub struct Mutex<T> {
// Note that this static mutex is in a *box*, not inlined into the struct
// itself. This is done for memory safety reasons with the usage of a
// StaticNativeMutex inside the static mutex above. Once a native mutex has
// been used once, its address can never change (it can't be moved). This
// mutex type can be safely moved at any time, so to ensure that the native
// mutex is used correctly we box the inner lock to give it a constant
// address.
lock: Box<StaticMutex>,
// itself. Once a native mutex has been used once, its address can never
// change (it can't be moved). This mutex type can be safely moved at any
// time, so to ensure that the native mutex is used correctly we box the
// inner lock to give it a constant address.
inner: Box<StaticMutex>,
data: UnsafeCell<T>,
}
/// The static mutex type is provided to allow for static allocation of mutexes.
@ -57,8 +82,8 @@ pub struct Mutex {
///
/// # Example
///
/// ```rust,ignore
/// use std::sync::mutex::{StaticMutex, MUTEX_INIT};
/// ```rust
/// use std::sync::{StaticMutex, MUTEX_INIT};
///
/// static LOCK: StaticMutex = MUTEX_INIT;
///
@ -69,35 +94,113 @@ pub struct Mutex {
/// // lock is unlocked here.
/// ```
pub struct StaticMutex {
lock: mutex::StaticNativeMutex,
lock: sys::Mutex,
poison: UnsafeCell<poison::Flag>,
}
/// An RAII implementation of a "scoped lock" of a mutex. When this structure is
/// dropped (falls out of scope), the lock will be unlocked.
///
/// The data protected by the mutex can be accessed through this guard via its
/// `Deref` and `DerefMut` implementations.
#[must_use]
pub struct Guard<'a> {
guard: mutex::LockGuard<'a>,
pub struct MutexGuard<'a, T: 'a> {
// funny underscores due to how Deref/DerefMut currently work (they
// disregard field privacy).
__lock: &'a Mutex<T>,
__guard: StaticMutexGuard,
}
fn lift_guard(guard: mutex::LockGuard) -> Guard {
Guard { guard: guard }
/// An RAII implementation of a "scoped lock" of a static mutex. When this
/// structure is dropped (falls out of scope), the lock will be unlocked.
#[must_use]
pub struct StaticMutexGuard {
lock: &'static sys::Mutex,
marker: marker::NoSend,
poison: poison::Guard<'static>,
}
/// Static initialization of a mutex. This constant can be used to initialize
/// other mutex constants.
pub const MUTEX_INIT: StaticMutex = StaticMutex {
lock: mutex::NATIVE_MUTEX_INIT
lock: sys::MUTEX_INIT,
poison: UnsafeCell { value: poison::Flag { failed: false } },
};
impl StaticMutex {
/// Attempts to grab this lock, see `Mutex::try_lock`
pub fn try_lock<'a>(&'a self) -> Option<Guard<'a>> {
unsafe { self.lock.trylock().map(lift_guard) }
impl<T: Send> Mutex<T> {
/// Creates a new mutex in an unlocked state ready for use.
pub fn new(t: T) -> Mutex<T> {
Mutex {
inner: box MUTEX_INIT,
data: UnsafeCell::new(t),
}
}
/// Acquires a mutex, blocking the current task until it is able to do so.
///
/// This function will block the local task until the mutex is available to
/// acquire. Upon returning, the task is the only task with the mutex
/// held. An RAII guard is returned to allow scoped unlock of the lock. When
/// the guard goes out of scope, the mutex will be unlocked.
///
/// # Panics
///
/// If another user of this mutex panicked while holding the mutex, then
/// this call will immediately panic once the mutex is acquired.
pub fn lock(&self) -> MutexGuard<T> {
unsafe {
let lock: &'static StaticMutex = &*(&*self.inner as *const _);
MutexGuard::new(self, lock.lock())
}
}
/// Attempts to acquire this lock.
///
/// If the lock could not be acquired at this time, then `None` is returned.
/// Otherwise, an RAII guard is returned. The lock will be unlocked when the
/// guard is dropped.
///
/// This function does not block.
///
/// # Panics
///
/// If another user of this mutex panicked while holding the mutex, then
/// this call will immediately panic if the mutex would otherwise be
/// acquired.
pub fn try_lock(&self) -> Option<MutexGuard<T>> {
unsafe {
let lock: &'static StaticMutex = &*(&*self.inner as *const _);
lock.try_lock().map(|guard| {
MutexGuard::new(self, guard)
})
}
}
}
#[unsafe_destructor]
impl<T: Send> Drop for Mutex<T> {
fn drop(&mut self) {
// This is actually safe b/c we know that there is no further usage of
// this mutex (it's up to the user to arrange for a mutex to get
// dropped, that's not our job)
unsafe { self.inner.lock.destroy() }
}
}
impl StaticMutex {
/// Acquires this lock, see `Mutex::lock`
pub fn lock<'a>(&'a self) -> Guard<'a> {
lift_guard(unsafe { self.lock.lock() })
pub fn lock(&'static self) -> StaticMutexGuard {
unsafe { self.lock.lock() }
StaticMutexGuard::new(self)
}
/// Attempts to grab this lock, see `Mutex::try_lock`
pub fn try_lock(&'static self) -> Option<StaticMutexGuard> {
if unsafe { self.lock.try_lock() } {
Some(StaticMutexGuard::new(self))
} else {
None
}
}
/// Deallocates resources associated with this static mutex.
@ -110,58 +213,73 @@ impl StaticMutex {
/// *all* platforms. It may be the case that some platforms do not leak
/// memory if this method is not called, but this is not guaranteed to be
/// true on all platforms.
pub unsafe fn destroy(&self) {
pub unsafe fn destroy(&'static self) {
self.lock.destroy()
}
}
impl Mutex {
/// Creates a new mutex in an unlocked state ready for use.
pub fn new() -> Mutex {
Mutex {
lock: box StaticMutex {
lock: unsafe { mutex::StaticNativeMutex::new() },
}
}
impl<'mutex, T> MutexGuard<'mutex, T> {
fn new(lock: &Mutex<T>, guard: StaticMutexGuard) -> MutexGuard<T> {
MutexGuard { __lock: lock, __guard: guard }
}
/// Attempts to acquire this lock.
///
/// If the lock could not be acquired at this time, then `None` is returned.
/// Otherwise, an RAII guard is returned. The lock will be unlocked when the
/// guard is dropped.
///
/// This function does not block.
pub fn try_lock<'a>(&'a self) -> Option<Guard<'a>> {
self.lock.try_lock()
}
/// Acquires a mutex, blocking the current task until it is able to do so.
///
/// This function will block the local task until it is available to acquire
/// the mutex. Upon returning, the task is the only task with the mutex
/// held. An RAII guard is returned to allow scoped unlock of the lock. When
/// the guard goes out of scope, the mutex will be unlocked.
pub fn lock<'a>(&'a self) -> Guard<'a> { self.lock.lock() }
}
impl Drop for Mutex {
impl<'mutex, T> AsMutexGuard for MutexGuard<'mutex, T> {
unsafe fn as_mutex_guard(&self) -> &StaticMutexGuard { &self.__guard }
}
impl<'mutex, T> Deref<T> for MutexGuard<'mutex, T> {
fn deref<'a>(&'a self) -> &'a T { unsafe { &*self.__lock.data.get() } }
}
impl<'mutex, T> DerefMut<T> for MutexGuard<'mutex, T> {
fn deref_mut<'a>(&'a mut self) -> &'a mut T {
unsafe { &mut *self.__lock.data.get() }
}
}
impl StaticMutexGuard {
fn new(lock: &'static StaticMutex) -> StaticMutexGuard {
unsafe {
let guard = StaticMutexGuard {
lock: &lock.lock,
marker: marker::NoSend,
poison: (*lock.poison.get()).borrow(),
};
guard.poison.check("mutex");
return guard;
}
}
}
pub fn guard_lock(guard: &StaticMutexGuard) -> &sys::Mutex { guard.lock }
pub fn guard_poison(guard: &StaticMutexGuard) -> &poison::Guard {
&guard.poison
}
impl AsMutexGuard for StaticMutexGuard {
unsafe fn as_mutex_guard(&self) -> &StaticMutexGuard { self }
}
#[unsafe_destructor]
impl Drop for StaticMutexGuard {
fn drop(&mut self) {
// This is actually safe b/c we know that there is no further usage of
// this mutex (it's up to the user to arrange for a mutex to get
// dropped, that's not our job)
unsafe { self.lock.destroy() }
unsafe {
self.poison.done();
self.lock.unlock();
}
}
}
#[cfg(test)]
mod test {
use prelude::*;
use super::{Mutex, StaticMutex, MUTEX_INIT};
use task;
use sync::{Arc, Mutex, StaticMutex, MUTEX_INIT, Condvar};
#[test]
fn smoke() {
let m = Mutex::new();
let m = Mutex::new(());
drop(m.lock());
drop(m.lock());
}
@ -211,8 +329,104 @@ mod test {
}
#[test]
fn trylock() {
let m = Mutex::new();
fn try_lock() {
let m = Mutex::new(());
assert!(m.try_lock().is_some());
}
#[test]
fn test_mutex_arc_condvar() {
let arc = Arc::new((Mutex::new(false), Condvar::new()));
let arc2 = arc.clone();
let (tx, rx) = channel();
spawn(proc() {
// wait until parent gets in
rx.recv();
let &(ref lock, ref cvar) = &*arc2;
let mut lock = lock.lock();
*lock = true;
cvar.notify_one();
});
let &(ref lock, ref cvar) = &*arc;
let lock = lock.lock();
tx.send(());
assert!(!*lock);
while !*lock {
cvar.wait(&lock);
}
}
#[test]
#[should_fail]
fn test_arc_condvar_poison() {
let arc = Arc::new((Mutex::new(1i), Condvar::new()));
let arc2 = arc.clone();
let (tx, rx) = channel();
spawn(proc() {
rx.recv();
let &(ref lock, ref cvar) = &*arc2;
let _g = lock.lock();
cvar.notify_one();
// Parent should fail when it wakes up.
panic!();
});
let &(ref lock, ref cvar) = &*arc;
let lock = lock.lock();
tx.send(());
while *lock == 1 {
cvar.wait(&lock);
}
}
#[test]
#[should_fail]
fn test_mutex_arc_poison() {
let arc = Arc::new(Mutex::new(1i));
let arc2 = arc.clone();
let _ = task::try(proc() {
let lock = arc2.lock();
assert_eq!(*lock, 2);
});
let lock = arc.lock();
assert_eq!(*lock, 1);
}
#[test]
fn test_mutex_arc_nested() {
// Tests nested mutexes and access
// to underlying data.
let arc = Arc::new(Mutex::new(1i));
let arc2 = Arc::new(Mutex::new(arc));
let (tx, rx) = channel();
spawn(proc() {
let lock = arc2.lock();
let lock2 = lock.deref().lock();
assert_eq!(*lock2, 1);
tx.send(());
});
rx.recv();
}
#[test]
fn test_mutex_arc_access_in_unwind() {
let arc = Arc::new(Mutex::new(1i));
let arc2 = arc.clone();
let _ = task::try::<()>(proc() {
struct Unwinder {
i: Arc<Mutex<int>>,
}
impl Drop for Unwinder {
fn drop(&mut self) {
*self.i.lock() += 1;
}
}
let _u = Unwinder { i: arc2 };
panic!();
});
let lock = arc.lock();
assert_eq!(*lock, 2);
}
}
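
The `guard_lock` and `guard_poison` helpers defined above exist so that the condition variable can reach the raw `sys::Mutex` and the poison flag behind a held guard. The condvar module itself appears elsewhere in this patch, so the following is only a rough sketch of the expected shape; the `inner` field, the `use sync::mutex;` import, and the exact sequencing are assumptions, not code from the commit:

impl StaticCondvar {
    // Sketch only: waiting requires a held guard, so the raw OS mutex and the
    // poison flag are both reachable through it.
    pub fn wait<T: AsMutexGuard>(&'static self, mutex_guard: &T) {
        unsafe {
            let guard = mutex_guard.as_mutex_guard();
            self.inner.wait(mutex::guard_lock(guard)); // `inner` is assumed
            // Re-check for poisoning once the mutex has been reacquired.
            mutex::guard_poison(guard).check("mutex");
        }
    }
}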


@ -13,12 +13,10 @@
//! This primitive is meant to be used to run one-time initialization. An
//! example use case would be for initializing an FFI library.
use core::prelude::*;
use core::int;
use core::atomic;
use super::mutex::{StaticMutex, MUTEX_INIT};
use int;
use mem::drop;
use sync::atomic;
use sync::{StaticMutex, MUTEX_INIT};
/// A synchronization primitive which can be used to run a one-time global
/// initialization. Useful for one-time initialization for FFI or related
@ -27,8 +25,8 @@ use super::mutex::{StaticMutex, MUTEX_INIT};
///
/// # Example
///
/// ```rust,ignore
/// use std::sync::one::{Once, ONCE_INIT};
/// ```rust
/// use std::sync::{Once, ONCE_INIT};
///
/// static START: Once = ONCE_INIT;
///
@ -59,7 +57,7 @@ impl Once {
///
/// When this function returns, it is guaranteed that some initialization
/// has run and completed (it may not be the closure specified).
pub fn doit(&self, f: ||) {
pub fn doit(&'static self, f: ||) {
// Optimize common path: load is much cheaper than fetch_add.
if self.cnt.load(atomic::SeqCst) < 0 {
return
@ -121,6 +119,7 @@ impl Once {
#[cfg(test)]
mod test {
use prelude::*;
use task;
use super::{ONCE_INIT, Once};

src/libstd/sync/poison.rs (new file, 48 lines)

@ -0,0 +1,48 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use option::None;
use rustrt::task::Task;
use rustrt::local::Local;
pub struct Flag { pub failed: bool }
impl Flag {
pub fn borrow(&mut self) -> Guard {
Guard { flag: &mut self.failed, failing: failing() }
}
}
pub struct Guard<'a> {
flag: &'a mut bool,
failing: bool,
}
impl<'a> Guard<'a> {
pub fn check(&self, name: &str) {
if *self.flag {
panic!("poisoned {} - another task failed inside", name);
}
}
pub fn done(&mut self) {
if !self.failing && failing() {
*self.flag = true;
}
}
}
fn failing() -> bool {
if Local::exists(None::<Task>) {
Local::borrow(None::<Task>).unwinder.unwinding()
} else {
false
}
}
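
`poison.rs` itself carries no doc comments. The protocol it supports, as used by the guard types elsewhere in this patch, is: call `check` when a lock is acquired and `done` when the guard is dropped. A minimal sketch of that protocol follows (illustrative only; `RawLock` is a hypothetical stand-in for one of the `sys` lock types):

struct ToyGuard<'a> {
    lock: &'a RawLock,              // hypothetical raw lock
    poison: Guard<'a>,
}

impl<'a> ToyGuard<'a> {
    fn new(lock: &'a RawLock, flag: &'a mut Flag) -> ToyGuard<'a> {
        unsafe { lock.lock(); }
        let guard = ToyGuard { lock: lock, poison: flag.borrow() };
        // Panic now if a previous holder panicked; `guard` is already live,
        // so unwinding still runs its destructor and releases the raw lock.
        guard.poison.check("toy lock");
        guard
    }
}

impl<'a> Drop for ToyGuard<'a> {
    fn drop(&mut self) {
        self.poison.done();          // records whether this drop is an unwind
        unsafe { self.lock.unlock(); }
    }
}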

(File diff suppressed because it is too large.)

src/libstd/sync/rwlock.rs (new file, 514 lines)

@ -0,0 +1,514 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use prelude::*;
use kinds::marker;
use cell::UnsafeCell;
use sys_common::rwlock as sys;
use sync::poison;
/// A reader-writer lock
///
/// This type of lock allows a number of readers or at most one writer at any
/// point in time. The write portion of this lock typically allows modification
/// of the underlying data (exclusive access) and the read portion of this lock
/// typically allows for read-only access (shared access).
///
/// The type parameter `T` represents the data that this lock protects. It is
/// required that `T` satisfies `Send` to be shared across tasks and `Sync` to
/// allow concurrent access through readers. The RAII guards returned from the
/// locking methods implement `Deref` (and `DerefMut` for the `write` methods)
/// to allow access to the contents of the lock.
///
/// RWLocks, like Mutexes, will become poisoned on panics. Note, however, that
/// an RWLock may only be poisoned if a panic occurs while it is locked
/// exclusively (write mode). If a panic occurs in any reader, then the lock
/// will not be poisoned.
///
/// # Example
///
/// ```
/// use std::sync::RWLock;
///
/// let lock = RWLock::new(5i);
///
/// // many reader locks can be held at once
/// {
/// let r1 = lock.read();
/// let r2 = lock.read();
/// assert_eq!(*r1, 5);
/// assert_eq!(*r2, 5);
/// } // read locks are dropped at this point
///
/// // only one write lock may be held, however
/// {
/// let mut w = lock.write();
/// *w += 1;
/// assert_eq!(*w, 6);
/// } // write lock is dropped here
/// ```
pub struct RWLock<T> {
inner: Box<StaticRWLock>,
data: UnsafeCell<T>,
}
/// Structure representing a statically allocated RWLock.
///
/// This structure is intended to be used inside of a `static` and will provide
/// automatic global access as well as lazy initialization. The internal
/// resources of this RWLock, however, must be manually deallocated.
///
/// # Example
///
/// ```
/// use std::sync::{StaticRWLock, RWLOCK_INIT};
///
/// static LOCK: StaticRWLock = RWLOCK_INIT;
///
/// {
/// let _g = LOCK.read();
/// // ... shared read access
/// }
/// {
/// let _g = LOCK.write();
/// // ... exclusive write access
/// }
/// unsafe { LOCK.destroy() } // free all resources
/// ```
pub struct StaticRWLock {
inner: sys::RWLock,
poison: UnsafeCell<poison::Flag>,
}
/// Constant initialization for a statically-initialized rwlock.
pub const RWLOCK_INIT: StaticRWLock = StaticRWLock {
inner: sys::RWLOCK_INIT,
poison: UnsafeCell { value: poison::Flag { failed: false } },
};
/// RAII structure used to release the shared read access of a lock when
/// dropped.
#[must_use]
pub struct RWLockReadGuard<'a, T: 'a> {
__lock: &'a RWLock<T>,
__guard: StaticRWLockReadGuard,
}
/// RAII structure used to release the exclusive write access of a lock when
/// dropped.
#[must_use]
pub struct RWLockWriteGuard<'a, T: 'a> {
__lock: &'a RWLock<T>,
__guard: StaticRWLockWriteGuard,
}
/// RAII structure used to release the shared read access of a lock when
/// dropped.
#[must_use]
pub struct StaticRWLockReadGuard {
lock: &'static sys::RWLock,
marker: marker::NoSend,
}
/// RAII structure used to release the exclusive write access of a lock when
/// dropped.
#[must_use]
pub struct StaticRWLockWriteGuard {
lock: &'static sys::RWLock,
marker: marker::NoSend,
poison: poison::Guard<'static>,
}
impl<T: Send + Sync> RWLock<T> {
/// Creates a new instance of an RWLock which is unlocked and ready to go.
pub fn new(t: T) -> RWLock<T> {
RWLock { inner: box RWLOCK_INIT, data: UnsafeCell::new(t) }
}
/// Locks this rwlock with shared read access, blocking the current thread
/// until it can be acquired.
///
/// The calling thread will be blocked until there are no more writers which
/// hold the lock. There may be other readers currently inside the lock when
/// this method returns. This method does not provide any guarantees with
/// respect to the ordering of whether contentious readers or writers will
/// acquire the lock first.
///
/// Returns an RAII guard which will release this thread's shared access
/// once it is dropped.
///
/// # Panics
///
/// This function will panic if the RWLock is poisoned. An RWLock is
/// poisoned whenever a writer panics while holding an exclusive lock. The
/// panic will occur immediately after the lock has been acquired.
#[inline]
pub fn read(&self) -> RWLockReadGuard<T> {
unsafe {
let lock: &'static StaticRWLock = &*(&*self.inner as *const _);
RWLockReadGuard::new(self, lock.read())
}
}
/// Attempt to acquire this lock with shared read access.
///
/// This function will never block and will return immediately regardless of
/// whether the lock is available. Returns `Some` of an RAII guard which will
/// release the shared access of this thread when dropped, or `None` if the
/// access could not be granted. This method does not provide any
/// guarantees with respect to the ordering of whether contentious readers
/// or writers will acquire the lock first.
///
/// # Panics
///
/// This function will panic if the RWLock is poisoned. An RWLock is
/// poisoned whenever a writer panics while holding an exclusive lock. A
/// panic will only occur if the lock is acquired.
#[inline]
pub fn try_read(&self) -> Option<RWLockReadGuard<T>> {
unsafe {
let lock: &'static StaticRWLock = &*(&*self.inner as *const _);
lock.try_read().map(|guard| {
RWLockReadGuard::new(self, guard)
})
}
}
/// Lock this rwlock with exclusive write access, blocking the current
/// thread until it can be acquired.
///
/// This function will not return while other writers or other readers
/// currently have access to the lock.
///
/// Returns an RAII guard which will drop the write access of this rwlock
/// when dropped.
///
/// # Panics
///
/// This function will panic if the RWLock is poisoned. An RWLock is
/// poisoned whenever a writer panics while holding an exclusive lock. The
/// panic will occur when the lock is acquired.
#[inline]
pub fn write(&self) -> RWLockWriteGuard<T> {
unsafe {
let lock: &'static StaticRWLock = &*(&*self.inner as *const _);
RWLockWriteGuard::new(self, lock.write())
}
}
/// Attempt to lock this rwlock with exclusive write access.
///
/// This function does not ever block, and it will return `None` if a call
/// to `write` would otherwise block. If successful, an RAII guard is
/// returned.
///
/// # Panics
///
/// This function will panic if the RWLock is poisoned. An RWLock is
/// poisoned whenever a writer panics while holding an exclusive lock. A
/// panic will only occur if the lock is acquired.
#[inline]
pub fn try_write(&self) -> Option<RWLockWriteGuard<T>> {
unsafe {
let lock: &'static StaticRWLock = &*(&*self.inner as *const _);
lock.try_write().map(|guard| {
RWLockWriteGuard::new(self, guard)
})
}
}
}
#[unsafe_destructor]
impl<T> Drop for RWLock<T> {
fn drop(&mut self) {
unsafe { self.inner.inner.destroy() }
}
}
impl StaticRWLock {
/// Locks this rwlock with shared read access, blocking the current thread
/// until it can be acquired.
///
/// See `RWLock::read`.
#[inline]
pub fn read(&'static self) -> StaticRWLockReadGuard {
unsafe { self.inner.read() }
StaticRWLockReadGuard::new(self)
}
/// Attempt to acquire this lock with shared read access.
///
/// See `RWLock::try_read`.
#[inline]
pub fn try_read(&'static self) -> Option<StaticRWLockReadGuard> {
if unsafe { self.inner.try_read() } {
Some(StaticRWLockReadGuard::new(self))
} else {
None
}
}
/// Lock this rwlock with exclusive write access, blocking the current
/// thread until it can be acquired.
///
/// See `RWLock::write`.
#[inline]
pub fn write(&'static self) -> StaticRWLockWriteGuard {
unsafe { self.inner.write() }
StaticRWLockWriteGuard::new(self)
}
/// Attempt to lock this rwlock with exclusive write access.
///
/// See `RWLock::try_write`.
#[inline]
pub fn try_write(&'static self) -> Option<StaticRWLockWriteGuard> {
if unsafe { self.inner.try_write() } {
Some(StaticRWLockWriteGuard::new(self))
} else {
None
}
}
/// Deallocate all resources associated with this static lock.
///
/// This method is unsafe to call as there is no guarantee that there are no
/// active users of the lock, and this also doesn't prevent any future users
/// of this lock. This method must be called to avoid leaking memory on all
/// platforms.
pub unsafe fn destroy(&'static self) {
self.inner.destroy()
}
}
impl<'rwlock, T> RWLockReadGuard<'rwlock, T> {
fn new(lock: &RWLock<T>, guard: StaticRWLockReadGuard)
-> RWLockReadGuard<T> {
RWLockReadGuard { __lock: lock, __guard: guard }
}
}
impl<'rwlock, T> RWLockWriteGuard<'rwlock, T> {
fn new(lock: &RWLock<T>, guard: StaticRWLockWriteGuard)
-> RWLockWriteGuard<T> {
RWLockWriteGuard { __lock: lock, __guard: guard }
}
}
impl<'rwlock, T> Deref<T> for RWLockReadGuard<'rwlock, T> {
fn deref(&self) -> &T { unsafe { &*self.__lock.data.get() } }
}
impl<'rwlock, T> Deref<T> for RWLockWriteGuard<'rwlock, T> {
fn deref(&self) -> &T { unsafe { &*self.__lock.data.get() } }
}
impl<'rwlock, T> DerefMut<T> for RWLockWriteGuard<'rwlock, T> {
fn deref_mut(&mut self) -> &mut T { unsafe { &mut *self.__lock.data.get() } }
}
impl StaticRWLockReadGuard {
fn new(lock: &'static StaticRWLock) -> StaticRWLockReadGuard {
let guard = StaticRWLockReadGuard {
lock: &lock.inner,
marker: marker::NoSend,
};
unsafe { (*lock.poison.get()).borrow().check("rwlock"); }
return guard;
}
}
impl StaticRWLockWriteGuard {
fn new(lock: &'static StaticRWLock) -> StaticRWLockWriteGuard {
unsafe {
let guard = StaticRWLockWriteGuard {
lock: &lock.inner,
marker: marker::NoSend,
poison: (*lock.poison.get()).borrow(),
};
guard.poison.check("rwlock");
return guard;
}
}
}
#[unsafe_destructor]
impl Drop for StaticRWLockReadGuard {
fn drop(&mut self) {
unsafe { self.lock.read_unlock(); }
}
}
#[unsafe_destructor]
impl Drop for StaticRWLockWriteGuard {
fn drop(&mut self) {
self.poison.done();
unsafe { self.lock.write_unlock(); }
}
}
#[cfg(test)]
mod tests {
use prelude::*;
use rand::{mod, Rng};
use task;
use sync::{Arc, RWLock, StaticRWLock, RWLOCK_INIT};
#[test]
fn smoke() {
let l = RWLock::new(());
drop(l.read());
drop(l.write());
drop((l.read(), l.read()));
drop(l.write());
}
#[test]
fn static_smoke() {
static R: StaticRWLock = RWLOCK_INIT;
drop(R.read());
drop(R.write());
drop((R.read(), R.read()));
drop(R.write());
unsafe { R.destroy(); }
}
#[test]
fn frob() {
static R: StaticRWLock = RWLOCK_INIT;
static N: uint = 10;
static M: uint = 1000;
let (tx, rx) = channel::<()>();
for _ in range(0, N) {
let tx = tx.clone();
spawn(proc() {
let mut rng = rand::task_rng();
for _ in range(0, M) {
if rng.gen_weighted_bool(N) {
drop(R.write());
} else {
drop(R.read());
}
}
drop(tx);
});
}
drop(tx);
let _ = rx.recv_opt();
unsafe { R.destroy(); }
}
#[test]
#[should_fail]
fn test_rw_arc_poison_wr() {
let arc = Arc::new(RWLock::new(1i));
let arc2 = arc.clone();
let _ = task::try(proc() {
let lock = arc2.write();
assert_eq!(*lock, 2);
});
let lock = arc.read();
assert_eq!(*lock, 1);
}
#[test]
#[should_fail]
fn test_rw_arc_poison_ww() {
let arc = Arc::new(RWLock::new(1i));
let arc2 = arc.clone();
let _ = task::try(proc() {
let lock = arc2.write();
assert_eq!(*lock, 2);
});
let lock = arc.write();
assert_eq!(*lock, 1);
}
#[test]
fn test_rw_arc_no_poison_rr() {
let arc = Arc::new(RWLock::new(1i));
let arc2 = arc.clone();
let _ = task::try(proc() {
let lock = arc2.read();
assert_eq!(*lock, 2);
});
let lock = arc.read();
assert_eq!(*lock, 1);
}
#[test]
fn test_rw_arc_no_poison_rw() {
let arc = Arc::new(RWLock::new(1i));
let arc2 = arc.clone();
let _ = task::try(proc() {
let lock = arc2.read();
assert_eq!(*lock, 2);
});
let lock = arc.write();
assert_eq!(*lock, 1);
}
#[test]
fn test_rw_arc() {
let arc = Arc::new(RWLock::new(0i));
let arc2 = arc.clone();
let (tx, rx) = channel();
task::spawn(proc() {
let mut lock = arc2.write();
for _ in range(0u, 10) {
let tmp = *lock;
*lock = -1;
task::deschedule();
*lock = tmp + 1;
}
tx.send(());
});
// Readers try to catch the writer in the act
let mut children = Vec::new();
for _ in range(0u, 5) {
let arc3 = arc.clone();
children.push(task::try_future(proc() {
let lock = arc3.read();
assert!(*lock >= 0);
}));
}
// Wait for children to pass their asserts
for r in children.iter_mut() {
assert!(r.get_ref().is_ok());
}
// Wait for writer to finish
rx.recv();
let lock = arc.read();
assert_eq!(*lock, 10);
}
#[test]
fn test_rw_arc_access_in_unwind() {
let arc = Arc::new(RWLock::new(1i));
let arc2 = arc.clone();
let _ = task::try::<()>(proc() {
struct Unwinder {
i: Arc<RWLock<int>>,
}
impl Drop for Unwinder {
fn drop(&mut self) {
let mut lock = self.i.write();
*lock += 1;
}
}
let _u = Unwinder { i: arc2 };
panic!();
});
let lock = arc.read();
assert_eq!(*lock, 2);
}
}


@ -0,0 +1,195 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use ops::Drop;
use sync::{Mutex, Condvar};
/// A counting, blocking, semaphore.
///
/// Semaphores are a form of atomic counter where access is only granted if the
/// counter is a positive value. Each acquisition will block the calling thread
/// until the counter is positive, and each release will increment the counter
/// and unblock any threads if necessary.
///
/// # Example
///
/// ```
/// use std::sync::Semaphore;
///
/// // Create a semaphore that represents 5 resources
/// let sem = Semaphore::new(5);
///
/// // Acquire one of the resources
/// sem.acquire();
///
/// // Acquire one of the resources for a limited period of time
/// {
/// let _guard = sem.access();
/// // ...
/// } // resources is released here
///
/// // Release our initially acquired resource
/// sem.release();
/// ```
pub struct Semaphore {
lock: Mutex<int>,
cvar: Condvar,
}
/// An RAII guard which will release a resource acquired from a semaphore when
/// dropped.
pub struct SemaphoreGuard<'a> {
sem: &'a Semaphore,
}
impl Semaphore {
/// Creates a new semaphore with the initial count specified.
///
/// The count specified can be thought of as a number of resources, and a
/// call to `acquire` or `access` will block until at least one resource is
/// available. It is valid to initialize a semaphore with a negative count.
pub fn new(count: int) -> Semaphore {
Semaphore {
lock: Mutex::new(count),
cvar: Condvar::new(),
}
}
/// Acquires a resource of this semaphore, blocking the current thread until
/// it can do so.
///
/// This method will block until the internal count of the semaphore is at
/// least 1.
pub fn acquire(&self) {
let mut count = self.lock.lock();
while *count <= 0 {
self.cvar.wait(&count);
}
*count -= 1;
}
/// Release a resource from this semaphore.
///
/// This will increment the number of resources in this semaphore by 1 and
/// will notify any pending waiters in `acquire` or `access` if necessary.
pub fn release(&self) {
*self.lock.lock() += 1;
self.cvar.notify_one();
}
/// Acquires a resource of this semaphore, returning an RAII guard to
/// release the semaphore when dropped.
///
/// This function is semantically equivalent to an `acquire` followed by a
/// `release` when the guard returned is dropped.
pub fn access(&self) -> SemaphoreGuard {
self.acquire();
SemaphoreGuard { sem: self }
}
}
#[unsafe_destructor]
impl<'a> Drop for SemaphoreGuard<'a> {
fn drop(&mut self) {
self.sem.release();
}
}
#[cfg(test)]
mod tests {
use prelude::*;
use sync::Arc;
use super::Semaphore;
#[test]
fn test_sem_acquire_release() {
let s = Semaphore::new(1);
s.acquire();
s.release();
s.acquire();
}
#[test]
fn test_sem_basic() {
let s = Semaphore::new(1);
let _g = s.access();
}
#[test]
fn test_sem_as_mutex() {
let s = Arc::new(Semaphore::new(1));
let s2 = s.clone();
spawn(proc() {
let _g = s2.access();
});
let _g = s.access();
}
#[test]
fn test_sem_as_cvar() {
/* Child waits and parent signals */
let (tx, rx) = channel();
let s = Arc::new(Semaphore::new(0));
let s2 = s.clone();
spawn(proc() {
s2.acquire();
tx.send(());
});
s.release();
let _ = rx.recv();
/* Parent waits and child signals */
let (tx, rx) = channel();
let s = Arc::new(Semaphore::new(0));
let s2 = s.clone();
spawn(proc() {
s2.release();
let _ = rx.recv();
});
s.acquire();
tx.send(());
}
#[test]
fn test_sem_multi_resource() {
// Parent and child both get in the critical section at the same
// time, and shake hands.
let s = Arc::new(Semaphore::new(2));
let s2 = s.clone();
let (tx1, rx1) = channel();
let (tx2, rx2) = channel();
spawn(proc() {
let _g = s2.access();
let _ = rx2.recv();
tx1.send(());
});
let _g = s.access();
tx2.send(());
let _ = rx1.recv();
}
#[test]
fn test_sem_runtime_friendly_blocking() {
let s = Arc::new(Semaphore::new(1));
let s2 = s.clone();
let (tx, rx) = channel();
{
let _g = s.access();
spawn(proc() {
tx.send(());
drop(s2.access());
tx.send(());
});
rx.recv(); // wait for child to come alive
}
rx.recv(); // wait for child to be done
}
}


@ -0,0 +1,67 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use time::Duration;
use sys_common::mutex::{mod, Mutex};
use sys::condvar as imp;
/// An OS-based condition variable.
///
/// This structure is the lowest layer possible on top of the OS-provided
/// condition variables. It is consequently entirely unsafe to use. It is
/// recommended to use the safer types at the top level of this crate instead of
/// this type.
pub struct Condvar(imp::Condvar);
/// Static initializer for condition variables.
pub const CONDVAR_INIT: Condvar = Condvar(imp::CONDVAR_INIT);
impl Condvar {
/// Creates a new condition variable for use.
///
/// Behavior is undefined if the condition variable is moved after it is
/// first used with any of the functions below.
#[inline]
pub unsafe fn new() -> Condvar { Condvar(imp::Condvar::new()) }
/// Signal one waiter on this condition variable to wake up.
#[inline]
pub unsafe fn notify_one(&self) { self.0.notify_one() }
/// Awaken all current waiters on this condition variable.
#[inline]
pub unsafe fn notify_all(&self) { self.0.notify_all() }
/// Wait for a signal on the specified mutex.
///
/// Behavior is undefined if the mutex is not locked by the current thread.
/// Behavior is also undefined if more than one mutex is used concurrently
/// on this condition variable.
#[inline]
pub unsafe fn wait(&self, mutex: &Mutex) { self.0.wait(mutex::raw(mutex)) }
/// Wait for a signal on the specified mutex with a timeout duration
/// specified by `dur` (a relative time into the future).
///
/// Behavior is undefined if the mutex is not locked by the current thread.
/// Behavior is also undefined if more than one mutex is used concurrently
/// on this condition variable.
#[inline]
pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool {
self.0.wait_timeout(mutex::raw(mutex), dur)
}
/// Deallocate all resources associated with this condition variable.
///
/// Behavior is undefined if there are current or will be future users of
/// this condition variable.
#[inline]
pub unsafe fn destroy(&self) { self.0.destroy() }
}
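
A hedged sketch of how this unsafe layer is meant to be driven; the safe `std::sync::Condvar` wraps the same pattern. The statics and the `READY` flag below are illustrative only, not code from the patch:

use sys_common::mutex::{Mutex, MUTEX_INIT};
use sys_common::condvar::{Condvar, CONDVAR_INIT};

// Both statics keep a stable address, as the documentation above requires.
static LOCK: Mutex = MUTEX_INIT;
static CVAR: Condvar = CONDVAR_INIT;
static mut READY: bool = false;

unsafe fn wait_for_event() {
    LOCK.lock();
    while !READY {
        // Atomically releases LOCK and blocks; LOCK is held again on return.
        CVAR.wait(&LOCK);
    }
    LOCK.unlock();
}

unsafe fn signal_event() {
    LOCK.lock();
    READY = true;
    CVAR.notify_one();
    LOCK.unlock();
}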


@ -19,8 +19,11 @@ use num::Int;
use path::BytesContainer;
use collections;
pub mod net;
pub mod condvar;
pub mod helper_thread;
pub mod mutex;
pub mod net;
pub mod rwlock;
pub mod thread_local;
// common error constructors


@ -0,0 +1,64 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
pub use sys::mutex::raw;
use sys::mutex as imp;
/// An OS-based mutual exclusion lock.
///
/// This is the thinnest cross-platform wrapper around OS mutexes. All usage of
/// this mutex is unsafe, and it is recommended to use the safe wrapper at the
/// top level of the crate instead of this type.
pub struct Mutex(imp::Mutex);
/// Constant initializer for statically allocated mutexes.
pub const MUTEX_INIT: Mutex = Mutex(imp::MUTEX_INIT);
impl Mutex {
/// Creates a newly initialized mutex.
///
/// Behavior is undefined if the mutex is moved after the first method is
/// called on the mutex.
#[inline]
pub unsafe fn new() -> Mutex { Mutex(imp::Mutex::new()) }
/// Lock the mutex blocking the current thread until it is available.
///
/// Behavior is undefined if the mutex has been moved between this and any
/// previous function call.
#[inline]
pub unsafe fn lock(&self) { self.0.lock() }
/// Attempt to lock the mutex without blocking, returning whether it was
/// successfully acquired or not.
///
/// Behavior is undefined if the mutex has been moved between this and any
/// previous function call.
#[inline]
pub unsafe fn try_lock(&self) -> bool { self.0.try_lock() }
/// Unlock the mutex.
///
/// Behavior is undefined if the current thread does not actually hold the
/// mutex.
#[inline]
pub unsafe fn unlock(&self) { self.0.unlock() }
/// Deallocate all resources associated with this mutex.
///
/// Behavior is undefined if there are current or will be future users of
/// this mutex.
#[inline]
pub unsafe fn destroy(&self) { self.0.destroy() }
}
// not meant to be exported to the outside world, just the containing module
pub fn raw(mutex: &Mutex) -> &imp::Mutex { &mutex.0 }
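
Since every method here is unsafe, a short sketch of the intended call protocol may help; it is illustrative only and not part of the patch, and `with_lock` is a hypothetical helper:

use sys_common::mutex::{Mutex, MUTEX_INIT};

// A static keeps the address stable, which is required once the mutex is used.
static LOCK: Mutex = MUTEX_INIT;

unsafe fn with_lock(f: ||) {
    LOCK.lock();            // blocks until acquired
    f();                    // caller must ensure no panic leaks the lock here
    LOCK.unlock();          // must only be called by the locking thread
    // LOCK.destroy() must eventually be called to avoid leaking OS resources.
}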


@ -0,0 +1,86 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use sys::rwlock as imp;
/// An OS-based reader-writer lock.
///
/// This structure is entirely unsafe and serves as the lowest layer of a
/// cross-platform binding of system rwlocks. It is recommended to use the
/// safer types at the top level of this crate instead of this type.
pub struct RWLock(imp::RWLock);
/// Constant initializer for static RWLocks.
pub const RWLOCK_INIT: RWLock = RWLock(imp::RWLOCK_INIT);
impl RWLock {
/// Creates a new instance of an RWLock.
///
/// Usage of an RWLock is undefined if it is moved after its first use (any
/// function calls below).
#[inline]
pub unsafe fn new() -> RWLock { RWLock(imp::RWLock::new()) }
/// Acquire shared access to the underlying lock, blocking the current
/// thread to do so.
///
/// Behavior is undefined if the rwlock has been moved between this and any
/// previous method call.
#[inline]
pub unsafe fn read(&self) { self.0.read() }
/// Attempt to acquire shared access to this lock, returning whether it
/// succeeded or not.
///
/// This function does not block the current thread.
///
/// Behavior is undefined if the rwlock has been moved between this and any
/// previous method call.
#[inline]
pub unsafe fn try_read(&self) -> bool { self.0.try_read() }
/// Acquire write access to the underlying lock, blocking the current thread
/// to do so.
///
/// Behavior is undefined if the rwlock has been moved between this and any
/// previous method call.
#[inline]
pub unsafe fn write(&self) { self.0.write() }
/// Attempt to acquire exclusive access to this lock, returning whether it
/// succeeded or not.
///
/// This function does not block the current thread.
///
/// Behavior is undefined if the rwlock has been moved between this and any
/// previous method call.
#[inline]
pub unsafe fn try_write(&self) -> bool { self.0.try_write() }
/// Unlock previously acquired shared access to this lock.
///
/// Behavior is undefined if the current thread does not have shared access.
#[inline]
pub unsafe fn read_unlock(&self) { self.0.read_unlock() }
/// Unlock previously acquired exclusive access to this lock.
///
/// Behavior is undefined if the current thread does not currently have
/// exclusive access.
#[inline]
pub unsafe fn write_unlock(&self) { self.0.write_unlock() }
/// Destroy OS-related resources with this RWLock.
///
/// Behavior is undefined if there are any currently active users of this
/// lock.
#[inline]
pub unsafe fn destroy(&self) { self.0.destroy() }
}
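
The same call protocol applies here, split into shared and exclusive halves. Again a sketch only, not part of the patch:

use sys_common::rwlock::{RWLock, RWLOCK_INIT};

static LOCK: RWLock = RWLOCK_INIT;

unsafe fn example() {
    LOCK.read();            // any number of readers may hold this at once
    LOCK.read_unlock();

    LOCK.write();           // exclusive: excludes readers and other writers
    LOCK.write_unlock();

    LOCK.destroy();         // only once no users remain
}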


@ -0,0 +1,83 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use cell::UnsafeCell;
use libc;
use sys::mutex::{mod, Mutex};
use sys::sync as ffi;
use time::Duration;
pub struct Condvar { inner: UnsafeCell<ffi::pthread_cond_t> }
pub const CONDVAR_INIT: Condvar = Condvar {
inner: UnsafeCell { value: ffi::PTHREAD_COND_INITIALIZER },
};
impl Condvar {
#[inline]
pub unsafe fn new() -> Condvar {
// The condvar might still be moved after construction, so it is better to
// avoid initializing the potentially opaque OS data before it lands at its
// final address.
Condvar { inner: UnsafeCell::new(ffi::PTHREAD_COND_INITIALIZER) }
}
#[inline]
pub unsafe fn notify_one(&self) {
let r = ffi::pthread_cond_signal(self.inner.get());
debug_assert_eq!(r, 0);
}
#[inline]
pub unsafe fn notify_all(&self) {
let r = ffi::pthread_cond_broadcast(self.inner.get());
debug_assert_eq!(r, 0);
}
#[inline]
pub unsafe fn wait(&self, mutex: &Mutex) {
let r = ffi::pthread_cond_wait(self.inner.get(), mutex::raw(mutex));
debug_assert_eq!(r, 0);
}
pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool {
assert!(dur >= Duration::nanoseconds(0));
// First, figure out what time it currently is
let mut tv = libc::timeval { tv_sec: 0, tv_usec: 0 };
let r = ffi::gettimeofday(&mut tv, 0 as *mut _);
debug_assert_eq!(r, 0);
// Offset that time with the specified duration
let abs = Duration::seconds(tv.tv_sec as i64) +
Duration::microseconds(tv.tv_usec as i64) +
dur;
let ns = abs.num_nanoseconds().unwrap() as u64;
let timeout = libc::timespec {
tv_sec: (ns / 1000000000) as libc::time_t,
tv_nsec: (ns % 1000000000) as libc::c_long,
};
// And wait!
let r = ffi::pthread_cond_timedwait(self.inner.get(), mutex::raw(mutex),
&timeout);
if r != 0 {
debug_assert_eq!(r as int, libc::ETIMEDOUT as int);
false
} else {
true
}
}
#[inline]
pub unsafe fn destroy(&self) {
let r = ffi::pthread_cond_destroy(self.inner.get());
debug_assert_eq!(r, 0);
}
}


@ -34,14 +34,18 @@ macro_rules! helper_init( (static $name:ident: Helper<$m:ty>) => (
pub mod c;
pub mod ext;
pub mod condvar;
pub mod fs;
pub mod helper_signal;
pub mod mutex;
pub mod os;
pub mod pipe;
pub mod process;
pub mod rwlock;
pub mod sync;
pub mod tcp;
pub mod timer;
pub mod thread_local;
pub mod timer;
pub mod tty;
pub mod udp;


@ -0,0 +1,52 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use cell::UnsafeCell;
use sys::sync as ffi;
use sys_common::mutex;
pub struct Mutex { inner: UnsafeCell<ffi::pthread_mutex_t> }
#[inline]
pub unsafe fn raw(m: &Mutex) -> *mut ffi::pthread_mutex_t {
m.inner.get()
}
pub const MUTEX_INIT: Mutex = Mutex {
inner: UnsafeCell { value: ffi::PTHREAD_MUTEX_INITIALIZER },
};
impl Mutex {
#[inline]
pub unsafe fn new() -> Mutex {
// The mutex might still be moved after construction, so it is better to
// avoid initializing the potentially opaque OS data before it lands at its
// final address.
MUTEX_INIT
}
#[inline]
pub unsafe fn lock(&self) {
let r = ffi::pthread_mutex_lock(self.inner.get());
debug_assert_eq!(r, 0);
}
#[inline]
pub unsafe fn unlock(&self) {
let r = ffi::pthread_mutex_unlock(self.inner.get());
debug_assert_eq!(r, 0);
}
#[inline]
pub unsafe fn try_lock(&self) -> bool {
ffi::pthread_mutex_trylock(self.inner.get()) == 0
}
#[inline]
pub unsafe fn destroy(&self) {
let r = ffi::pthread_mutex_destroy(self.inner.get());
debug_assert_eq!(r, 0);
}
}


@ -0,0 +1,57 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use cell::UnsafeCell;
use sys::sync as ffi;
pub struct RWLock { inner: UnsafeCell<ffi::pthread_rwlock_t> }
pub const RWLOCK_INIT: RWLock = RWLock {
inner: UnsafeCell { value: ffi::PTHREAD_RWLOCK_INITIALIZER },
};
impl RWLock {
#[inline]
pub unsafe fn new() -> RWLock {
// The rwlock might still be moved after construction, so it is better to
// avoid initializing the potentially opaque OS data before it lands at its
// final address.
RWLOCK_INIT
}
#[inline]
pub unsafe fn read(&self) {
let r = ffi::pthread_rwlock_rdlock(self.inner.get());
debug_assert_eq!(r, 0);
}
#[inline]
pub unsafe fn try_read(&self) -> bool {
ffi::pthread_rwlock_tryrdlock(self.inner.get()) == 0
}
#[inline]
pub unsafe fn write(&self) {
let r = ffi::pthread_rwlock_wrlock(self.inner.get());
debug_assert_eq!(r, 0);
}
#[inline]
pub unsafe fn try_write(&self) -> bool {
ffi::pthread_rwlock_trywrlock(self.inner.get()) == 0
}
#[inline]
pub unsafe fn read_unlock(&self) {
let r = ffi::pthread_rwlock_unlock(self.inner.get());
debug_assert_eq!(r, 0);
}
#[inline]
pub unsafe fn write_unlock(&self) { self.read_unlock() }
#[inline]
pub unsafe fn destroy(&self) {
let r = ffi::pthread_rwlock_destroy(self.inner.get());
debug_assert_eq!(r, 0);
}
}

src/libstd/sys/unix/sync.rs (new file, 208 lines)

@ -0,0 +1,208 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
#![allow(bad_style)]

use libc;

pub use self::os::{PTHREAD_MUTEX_INITIALIZER, pthread_mutex_t};
pub use self::os::{PTHREAD_COND_INITIALIZER, pthread_cond_t};
pub use self::os::{PTHREAD_RWLOCK_INITIALIZER, pthread_rwlock_t};

extern {
    // mutexes
    pub fn pthread_mutex_destroy(lock: *mut pthread_mutex_t) -> libc::c_int;
    pub fn pthread_mutex_lock(lock: *mut pthread_mutex_t) -> libc::c_int;
    pub fn pthread_mutex_trylock(lock: *mut pthread_mutex_t) -> libc::c_int;
    pub fn pthread_mutex_unlock(lock: *mut pthread_mutex_t) -> libc::c_int;

    // cvars
    pub fn pthread_cond_wait(cond: *mut pthread_cond_t,
                             lock: *mut pthread_mutex_t) -> libc::c_int;
    pub fn pthread_cond_timedwait(cond: *mut pthread_cond_t,
                                  lock: *mut pthread_mutex_t,
                                  abstime: *const libc::timespec) -> libc::c_int;
    pub fn pthread_cond_signal(cond: *mut pthread_cond_t) -> libc::c_int;
    pub fn pthread_cond_broadcast(cond: *mut pthread_cond_t) -> libc::c_int;
    pub fn pthread_cond_destroy(cond: *mut pthread_cond_t) -> libc::c_int;
    pub fn gettimeofday(tp: *mut libc::timeval,
                        tz: *mut libc::c_void) -> libc::c_int;

    // rwlocks
    pub fn pthread_rwlock_destroy(lock: *mut pthread_rwlock_t) -> libc::c_int;
    pub fn pthread_rwlock_rdlock(lock: *mut pthread_rwlock_t) -> libc::c_int;
    pub fn pthread_rwlock_tryrdlock(lock: *mut pthread_rwlock_t) -> libc::c_int;
    pub fn pthread_rwlock_wrlock(lock: *mut pthread_rwlock_t) -> libc::c_int;
    pub fn pthread_rwlock_trywrlock(lock: *mut pthread_rwlock_t) -> libc::c_int;
    pub fn pthread_rwlock_unlock(lock: *mut pthread_rwlock_t) -> libc::c_int;
}
#[cfg(any(target_os = "freebsd", target_os = "dragonfly"))]
mod os {
use libc;
pub type pthread_mutex_t = *mut libc::c_void;
pub type pthread_cond_t = *mut libc::c_void;
pub type pthread_rwlock_t = *mut libc::c_void;
pub const PTHREAD_MUTEX_INITIALIZER: pthread_mutex_t = 0 as *mut _;
pub const PTHREAD_COND_INITIALIZER: pthread_cond_t = 0 as *mut _;
pub const PTHREAD_RWLOCK_INITIALIZER: pthread_rwlock_t = 0 as *mut _;
}
#[cfg(any(target_os = "macos", target_os = "ios"))]
mod os {
use libc;
#[cfg(target_arch = "x86_64")]
const __PTHREAD_MUTEX_SIZE__: uint = 56;
#[cfg(any(target_arch = "x86",
target_arch = "arm"))]
const __PTHREAD_MUTEX_SIZE__: uint = 40;
#[cfg(target_arch = "x86_64")]
const __PTHREAD_COND_SIZE__: uint = 40;
#[cfg(any(target_arch = "x86",
target_arch = "arm"))]
const __PTHREAD_COND_SIZE__: uint = 24;
#[cfg(target_arch = "x86_64")]
const __PTHREAD_RWLOCK_SIZE__: uint = 192;
#[cfg(any(target_arch = "x86",
target_arch = "arm"))]
const __PTHREAD_RWLOCK_SIZE__: uint = 124;
const _PTHREAD_MUTEX_SIG_INIT: libc::c_long = 0x32AAABA7;
const _PTHREAD_COND_SIG_INIT: libc::c_long = 0x3CB0B1BB;
const _PTHREAD_RWLOCK_SIG_INIT: libc::c_long = 0x2DA8B3B4;
#[repr(C)]
pub struct pthread_mutex_t {
__sig: libc::c_long,
__opaque: [u8, ..__PTHREAD_MUTEX_SIZE__],
}
#[repr(C)]
pub struct pthread_cond_t {
__sig: libc::c_long,
__opaque: [u8, ..__PTHREAD_COND_SIZE__],
}
#[repr(C)]
pub struct pthread_rwlock_t {
__sig: libc::c_long,
__opaque: [u8, ..__PTHREAD_RWLOCK_SIZE__],
}
pub const PTHREAD_MUTEX_INITIALIZER: pthread_mutex_t = pthread_mutex_t {
__sig: _PTHREAD_MUTEX_SIG_INIT,
__opaque: [0, ..__PTHREAD_MUTEX_SIZE__],
};
pub const PTHREAD_COND_INITIALIZER: pthread_cond_t = pthread_cond_t {
__sig: _PTHREAD_COND_SIG_INIT,
__opaque: [0, ..__PTHREAD_COND_SIZE__],
};
pub const PTHREAD_RWLOCK_INITIALIZER: pthread_rwlock_t = pthread_rwlock_t {
__sig: _PTHREAD_RWLOCK_SIG_INIT,
__opaque: [0, ..__PTHREAD_RWLOCK_SIZE__],
};
}
#[cfg(target_os = "linux")]
mod os {
use libc;
// minus 8 because we have an 'align' field
#[cfg(target_arch = "x86_64")]
const __SIZEOF_PTHREAD_MUTEX_T: uint = 40 - 8;
#[cfg(any(target_arch = "x86",
target_arch = "arm",
target_arch = "mips",
target_arch = "mipsel"))]
const __SIZEOF_PTHREAD_MUTEX_T: uint = 24 - 8;
#[cfg(any(target_arch = "x86_64",
target_arch = "x86",
target_arch = "arm",
target_arch = "mips",
target_arch = "mipsel"))]
const __SIZEOF_PTHREAD_COND_T: uint = 48 - 8;
#[cfg(target_arch = "x86_64")]
const __SIZEOF_PTHREAD_RWLOCK_T: uint = 56 - 8;
#[cfg(any(target_arch = "x86",
target_arch = "arm",
target_arch = "mips",
target_arch = "mipsel"))]
const __SIZEOF_PTHREAD_RWLOCK_T: uint = 32 - 8;
#[repr(C)]
pub struct pthread_mutex_t {
__align: libc::c_longlong,
size: [u8, ..__SIZEOF_PTHREAD_MUTEX_T],
}
#[repr(C)]
pub struct pthread_cond_t {
__align: libc::c_longlong,
size: [u8, ..__SIZEOF_PTHREAD_COND_T],
}
#[repr(C)]
pub struct pthread_rwlock_t {
__align: libc::c_longlong,
size: [u8, ..__SIZEOF_PTHREAD_RWLOCK_T],
}
pub const PTHREAD_MUTEX_INITIALIZER: pthread_mutex_t = pthread_mutex_t {
__align: 0,
size: [0, ..__SIZEOF_PTHREAD_MUTEX_T],
};
pub const PTHREAD_COND_INITIALIZER: pthread_cond_t = pthread_cond_t {
__align: 0,
size: [0, ..__SIZEOF_PTHREAD_COND_T],
};
pub const PTHREAD_RWLOCK_INITIALIZER: pthread_rwlock_t = pthread_rwlock_t {
__align: 0,
size: [0, ..__SIZEOF_PTHREAD_RWLOCK_T],
};
}
#[cfg(target_os = "android")]
mod os {
use libc;
#[repr(C)]
pub struct pthread_mutex_t { value: libc::c_int }
#[repr(C)]
pub struct pthread_cond_t { value: libc::c_int }
#[repr(C)]
pub struct pthread_rwlock_t {
lock: pthread_mutex_t,
cond: pthread_cond_t,
numLocks: libc::c_int,
writerThreadId: libc::c_int,
pendingReaders: libc::c_int,
pendingWriters: libc::c_int,
reserved: [*mut libc::c_void, ..4],
}
pub const PTHREAD_MUTEX_INITIALIZER: pthread_mutex_t = pthread_mutex_t {
value: 0,
};
pub const PTHREAD_COND_INITIALIZER: pthread_cond_t = pthread_cond_t {
value: 0,
};
pub const PTHREAD_RWLOCK_INITIALIZER: pthread_rwlock_t = pthread_rwlock_t {
lock: PTHREAD_MUTEX_INITIALIZER,
cond: PTHREAD_COND_INITIALIZER,
numLocks: 0,
writerThreadId: 0,
pendingReaders: 0,
pendingWriters: 0,
reserved: [0 as *mut _, ..4],
};
}
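
The `os` modules above duplicate the C struct layouts by hand, so any drift in size or alignment relative to the system headers would silently corrupt memory through the initializer constants. A hedged sketch of a check one could add (assuming a crate such as the modern `libc` that re-exports the system definitions; not part of this patch):

    #[cfg(test)]
    mod layout_tests {
        use std::mem::{align_of, size_of};

        #[test]
        fn pthread_mutex_matches_system_definition() {
            // `super::os::pthread_mutex_t` is the hand-written layout;
            // `libc::pthread_mutex_t` is generated from the system headers.
            assert_eq!(size_of::<super::os::pthread_mutex_t>(),
                       size_of::<libc::pthread_mutex_t>());
            assert_eq!(align_of::<super::os::pthread_mutex_t>(),
                       align_of::<libc::pthread_mutex_t>());
        }
    }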

@@ -0,0 +1,63 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use cell::UnsafeCell;
use libc::{mod, DWORD};
use os;
use sys::mutex::{mod, Mutex};
use sys::sync as ffi;
use time::Duration;

pub struct Condvar { inner: UnsafeCell<ffi::CONDITION_VARIABLE> }

pub const CONDVAR_INIT: Condvar = Condvar {
    inner: UnsafeCell { value: ffi::CONDITION_VARIABLE_INIT }
};

impl Condvar {
    #[inline]
    pub unsafe fn new() -> Condvar { CONDVAR_INIT }

    #[inline]
    pub unsafe fn wait(&self, mutex: &Mutex) {
        let r = ffi::SleepConditionVariableCS(self.inner.get(),
                                              mutex::raw(mutex),
                                              libc::INFINITE);
        debug_assert!(r != 0);
    }

    pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool {
        let r = ffi::SleepConditionVariableCS(self.inner.get(),
                                              mutex::raw(mutex),
                                              dur.num_milliseconds() as DWORD);
        if r == 0 {
            const ERROR_TIMEOUT: DWORD = 0x5B4;
            debug_assert_eq!(os::errno() as uint, ERROR_TIMEOUT as uint);
            false
        } else {
            true
        }
    }

    #[inline]
    pub unsafe fn notify_one(&self) {
        ffi::WakeConditionVariable(self.inner.get())
    }

    #[inline]
    pub unsafe fn notify_all(&self) {
        ffi::WakeAllConditionVariable(self.inner.get())
    }

    pub unsafe fn destroy(&self) {
        // ...
    }
}
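
Whatever the platform implementation, a condition variable wait can wake spuriously, so callers always re-check their predicate in a loop. A minimal usage sketch against the user-facing API (current syntax, illustration only, not part of this patch):

    use std::sync::{Arc, Condvar, Mutex};
    use std::thread;

    fn main() {
        let pair = Arc::new((Mutex::new(false), Condvar::new()));
        let pair2 = Arc::clone(&pair);

        thread::spawn(move || {
            let (lock, cvar) = &*pair2;
            // Flip the flag while holding the mutex, then wake the waiter.
            *lock.lock().unwrap() = true;
            cvar.notify_one();
        });

        let (lock, cvar) = &*pair;
        let mut ready = lock.lock().unwrap();
        while !*ready {
            // wait() atomically releases the mutex and re-acquires it before
            // returning; the loop guards against spurious wakeups.
            ready = cvar.wait(ready).unwrap();
        }
    }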

@@ -35,11 +35,15 @@ macro_rules! helper_init( (static $name:ident: Helper<$m:ty>) => (
pub mod c;
pub mod ext;
pub mod condvar;
pub mod fs;
pub mod helper_signal;
pub mod mutex;
pub mod os;
pub mod pipe;
pub mod process;
pub mod rwlock;
pub mod sync;
pub mod tcp;
pub mod thread_local;
pub mod timer;

@@ -0,0 +1,76 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use sync::atomic;
use alloc::{mod, heap};
use libc::DWORD;
use sys::sync as ffi;

const SPIN_COUNT: DWORD = 4000;

pub struct Mutex { inner: atomic::AtomicUint }

pub const MUTEX_INIT: Mutex = Mutex { inner: atomic::INIT_ATOMIC_UINT };

#[inline]
pub unsafe fn raw(m: &super::Mutex) -> ffi::LPCRITICAL_SECTION {
    m.0.get()
}

impl Mutex {
    #[inline]
    pub unsafe fn new() -> Mutex {
        Mutex { inner: atomic::AtomicUint::new(init_lock() as uint) }
    }

    #[inline]
    pub unsafe fn lock(&self) {
        ffi::EnterCriticalSection(self.get())
    }

    #[inline]
    pub unsafe fn try_lock(&self) -> bool {
        ffi::TryEnterCriticalSection(self.get()) != 0
    }

    #[inline]
    pub unsafe fn unlock(&self) {
        ffi::LeaveCriticalSection(self.get())
    }

    pub unsafe fn destroy(&self) {
        // Take ownership of the pointer (if any) and free the critical
        // section behind it.
        let lock = self.inner.swap(0, atomic::SeqCst);
        if lock != 0 { free_lock(lock as ffi::LPCRITICAL_SECTION) }
    }

    unsafe fn get(&self) -> ffi::LPCRITICAL_SECTION {
        // Fast path: the critical section has already been allocated.
        match self.inner.load(atomic::SeqCst) {
            0 => {}
            n => return n as ffi::LPCRITICAL_SECTION
        }
        // Slow path: allocate and initialize a candidate, then race to
        // install it. If another thread won the race, free our candidate and
        // use the winner's instead.
        let lock = init_lock();
        match self.inner.compare_and_swap(0, lock as uint, atomic::SeqCst) {
            0 => return lock as ffi::LPCRITICAL_SECTION,
            _ => {}
        }
        free_lock(lock);
        return self.inner.load(atomic::SeqCst) as ffi::LPCRITICAL_SECTION;
    }
}

unsafe fn init_lock() -> ffi::LPCRITICAL_SECTION {
    let block = heap::allocate(ffi::CRITICAL_SECTION_SIZE, 8)
                    as ffi::LPCRITICAL_SECTION;
    if block.is_null() { alloc::oom() }
    ffi::InitializeCriticalSectionAndSpinCount(block, SPIN_COUNT);
    return block;
}

unsafe fn free_lock(h: ffi::LPCRITICAL_SECTION) {
    ffi::DeleteCriticalSection(h);
    heap::deallocate(h as *mut _, ffi::CRITICAL_SECTION_SIZE, 8);
}
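
The `get()` path above is a lazy-initialization race: each contender may allocate a candidate, exactly one wins the compare-and-swap, and the losers free their allocation and adopt the winner's. A self-contained sketch of the same pattern (hypothetical names, current syntax, illustration only, not part of this patch):

    use std::sync::atomic::{AtomicUsize, Ordering};

    struct Lazy {
        ptr: AtomicUsize, // 0 = not yet initialized
    }

    impl Lazy {
        const fn new() -> Lazy {
            Lazy { ptr: AtomicUsize::new(0) }
        }

        fn get(&self) -> *mut u64 {
            // Fast path: already initialized.
            let cur = self.ptr.load(Ordering::SeqCst);
            if cur != 0 {
                return cur as *mut u64;
            }
            // Slow path: build a candidate and race to publish it.
            let candidate = Box::into_raw(Box::new(0u64));
            match self.ptr.compare_exchange(0, candidate as usize,
                                            Ordering::SeqCst, Ordering::SeqCst) {
                Ok(_) => candidate,
                Err(winner) => {
                    // Another thread got there first; discard our allocation.
                    unsafe { drop(Box::from_raw(candidate)); }
                    winner as *mut u64
                }
            }
        }
    }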

@@ -0,0 +1,53 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use cell::UnsafeCell;
use sys::sync as ffi;

pub struct RWLock { inner: UnsafeCell<ffi::SRWLOCK> }

pub const RWLOCK_INIT: RWLock = RWLock {
    inner: UnsafeCell { value: ffi::SRWLOCK_INIT }
};

impl RWLock {
    #[inline]
    pub unsafe fn new() -> RWLock { RWLOCK_INIT }

    #[inline]
    pub unsafe fn read(&self) {
        ffi::AcquireSRWLockShared(self.inner.get())
    }

    #[inline]
    pub unsafe fn try_read(&self) -> bool {
        ffi::TryAcquireSRWLockShared(self.inner.get()) != 0
    }

    #[inline]
    pub unsafe fn write(&self) {
        ffi::AcquireSRWLockExclusive(self.inner.get())
    }

    #[inline]
    pub unsafe fn try_write(&self) -> bool {
        ffi::TryAcquireSRWLockExclusive(self.inner.get()) != 0
    }

    #[inline]
    pub unsafe fn read_unlock(&self) {
        ffi::ReleaseSRWLockShared(self.inner.get())
    }

    #[inline]
    pub unsafe fn write_unlock(&self) {
        ffi::ReleaseSRWLockExclusive(self.inner.get())
    }

    #[inline]
    pub unsafe fn destroy(&self) {
        // ...
    }
}

@@ -0,0 +1,58 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use libc::{BOOL, DWORD, c_void, LPVOID};
use libc::types::os::arch::extra::BOOLEAN;

pub type LPCRITICAL_SECTION = *mut c_void;
pub type LPCONDITION_VARIABLE = *mut CONDITION_VARIABLE;
pub type LPSRWLOCK = *mut SRWLOCK;

#[cfg(target_arch = "x86")]
pub const CRITICAL_SECTION_SIZE: uint = 24;
#[cfg(target_arch = "x86_64")]
pub const CRITICAL_SECTION_SIZE: uint = 40;

#[repr(C)]
pub struct CONDITION_VARIABLE { pub ptr: LPVOID }

#[repr(C)]
pub struct SRWLOCK { pub ptr: LPVOID }

pub const CONDITION_VARIABLE_INIT: CONDITION_VARIABLE = CONDITION_VARIABLE {
    ptr: 0 as *mut _,
};

pub const SRWLOCK_INIT: SRWLOCK = SRWLOCK { ptr: 0 as *mut _ };

extern "system" {
    // critical sections
    pub fn InitializeCriticalSectionAndSpinCount(
                    lpCriticalSection: LPCRITICAL_SECTION,
                    dwSpinCount: DWORD) -> BOOL;
    pub fn DeleteCriticalSection(lpCriticalSection: LPCRITICAL_SECTION);
    pub fn EnterCriticalSection(lpCriticalSection: LPCRITICAL_SECTION);
    pub fn LeaveCriticalSection(lpCriticalSection: LPCRITICAL_SECTION);
    pub fn TryEnterCriticalSection(lpCriticalSection: LPCRITICAL_SECTION) -> BOOL;

    // condition variables
    pub fn SleepConditionVariableCS(ConditionVariable: LPCONDITION_VARIABLE,
                                    CriticalSection: LPCRITICAL_SECTION,
                                    dwMilliseconds: DWORD) -> BOOL;
    pub fn WakeConditionVariable(ConditionVariable: LPCONDITION_VARIABLE);
    pub fn WakeAllConditionVariable(ConditionVariable: LPCONDITION_VARIABLE);

    // slim rwlocks
    pub fn AcquireSRWLockExclusive(SRWLock: LPSRWLOCK);
    pub fn AcquireSRWLockShared(SRWLock: LPSRWLOCK);
    pub fn ReleaseSRWLockExclusive(SRWLock: LPSRWLOCK);
    pub fn ReleaseSRWLockShared(SRWLock: LPSRWLOCK);
    pub fn TryAcquireSRWLockExclusive(SRWLock: LPSRWLOCK) -> BOOLEAN;
    pub fn TryAcquireSRWLockShared(SRWLock: LPSRWLOCK) -> BOOLEAN;
}