Reimplement ARC::unwrap() and friends.

Ben Blum 2013-07-02 13:37:19 -04:00
parent 55adc4467b
commit 10a400ffaa
2 changed files with 283 additions and 27 deletions


@@ -50,9 +50,9 @@ use std::borrow;
/// As sync::condvar, a mechanism for unlock-and-descheduling and signaling.
pub struct Condvar<'self> {
-is_mutex: bool,
-failed: &'self mut bool,
-cond: &'self sync::Condvar<'self>
+priv is_mutex: bool,
+priv failed: &'self mut bool,
+priv cond: &'self sync::Condvar<'self>
}
impl<'self> Condvar<'self> {
@@ -108,7 +108,7 @@ impl<'self> Condvar<'self> {
****************************************************************************/
/// An atomically reference counted wrapper for shared immutable state.
-pub struct ARC<T> { x: UnsafeAtomicRcBox<T> }
+pub struct ARC<T> { priv x: UnsafeAtomicRcBox<T> }
/// Create an atomically reference counted wrapper.
pub fn ARC<T:Freeze + Send>(data: T) -> ARC<T> {
@@ -123,6 +123,20 @@ impl<T:Freeze+Send> ARC<T> {
pub fn get<'a>(&'a self) -> &'a T {
unsafe { &*self.x.get_immut() }
}
/**
* Retrieve the data back out of the ARC. This function blocks until the
* reference given to it is the last existing one, and then unwraps the data
* instead of destroying it.
*
* If multiple tasks call unwrap, all but the first will fail. Do not call
* unwrap from a task that holds another reference to the same ARC; it is
* guaranteed to deadlock.
*/
pub fn unwrap(self) -> T {
let ARC { x: x } = self;
unsafe { x.unwrap() }
}
}
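A minimal usage sketch of the new ARC::unwrap (not part of this commit; hypothetical, written in the same era syntax as the tests further down): the parent blocks until the child task drops its clone, then gets the data back by value.

fn arc_unwrap_sketch() {
    let arc = ARC(~"hello");
    let arc2 = arc.clone();
    do task::spawn {
        // Read through the clone; arc2 is dropped at the end of this
        // task, releasing the only other reference.
        assert!(*arc2.get() == ~"hello");
    }
    // Blocks until arc2 is gone, then yields the data itself.
    let s = arc.unwrap();
    assert!(s == ~"hello");
}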
/**
@@ -143,9 +157,9 @@ impl<T:Freeze + Send> Clone for ARC<T> {
****************************************************************************/
#[doc(hidden)]
-struct MutexARCInner<T> { lock: Mutex, failed: bool, data: T }
+struct MutexARCInner<T> { priv lock: Mutex, priv failed: bool, priv data: T }
/// An ARC with mutable data protected by a blocking mutex.
-struct MutexARC<T> { x: UnsafeAtomicRcBox<MutexARCInner<T>> }
+struct MutexARC<T> { priv x: UnsafeAtomicRcBox<MutexARCInner<T>> }
/// Create a mutex-protected ARC with the supplied data.
pub fn MutexARC<T:Send>(user_data: T) -> MutexARC<T> {
@@ -225,6 +239,22 @@ impl<T:Send> MutexARC<T> {
cond: cond })
}
}
/**
* Retrieves the data, blocking until all other references are dropped,
* exactly as arc::unwrap.
*
* Will additionally fail if another task has failed while accessing the arc.
*/
pub fn unwrap(self) -> T {
let MutexARC { x: x } = self;
let inner = unsafe { x.unwrap() };
let MutexARCInner { failed: failed, data: data, _ } = inner;
if failed {
fail!(~"Can't unwrap poisoned MutexARC - another task failed inside!");
}
data
}
}
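For contrast with the poison test further down, a happy-path sketch (hypothetical, not from this commit): when no task fails inside access, the poison check passes and unwrap hands the data back once the child's clone is dropped.

fn mutex_arc_unwrap_sketch() {
    let arc = MutexARC(~[1, 2, 3]);
    let arc2 = (&arc).clone();
    do task::spawn {
        // Mutate under the lock; arc2 is dropped on task exit.
        unsafe { do arc2.access |v| { v.push(4); } }
    }
    // Blocks until arc2 is dropped; nothing failed inside access,
    // so this returns the vector rather than fail!()ing.
    let v = arc.unwrap();
    assert!(v.len() == 4);
}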
// Common code for {mutex.access,rwlock.write}{,_cond}.
@@ -268,7 +298,7 @@ fn PoisonOnFail<'r>(failed: &'r mut bool) -> PoisonOnFail {
****************************************************************************/
#[doc(hidden)]
-struct RWARCInner<T> { lock: RWlock, failed: bool, data: T }
+struct RWARCInner<T> { priv lock: RWlock, priv failed: bool, priv data: T }
/**
* A dual-mode ARC protected by a reader-writer lock. The data can be accessed
* mutably or immutably, and immutably-accessing tasks may run concurrently.
@@ -278,7 +308,7 @@ struct RWARCInner<T> { lock: RWlock, failed: bool, data: T }
#[mutable] // XXX remove after snap
#[no_freeze]
struct RWARC<T> {
-x: UnsafeAtomicRcBox<RWARCInner<T>>,
+priv x: UnsafeAtomicRcBox<RWARCInner<T>>,
}
/// Create a reader/writer ARC with the supplied data.
@@ -429,6 +459,23 @@ impl<T:Freeze + Send> RWARC<T> {
}
}
}
/**
* Retrieves the data, blocking until all other references are dropped,
* exactly as arc::unwrap.
*
* Will additionally fail if another task has failed while accessing the arc
* in write mode.
*/
pub fn unwrap(self) -> T {
let RWARC { x: x, _ } = self;
let inner = unsafe { x.unwrap() };
let RWARCInner { failed: failed, data: data, _ } = inner;
if failed {
fail!(~"Can't unwrap poisoned RWARC - another task failed inside!")
}
data
}
}
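The same shape for the reader/writer variant (hypothetical sketch; assumes the usual read accessor on RWARC, which this diff does not show): readers run concurrently, and unwrap waits for every one of them to drop its clone.

fn rw_arc_unwrap_sketch() {
    let arc = RWARC(0);
    for 5.times {
        let arc2 = arc.clone();
        do task::spawn {
            // Concurrent immutable access through a clone.
            do arc2.read |n| { assert!(*n == 0); }
        }
    }
    // Blocks until all five reader tasks have dropped their clones.
    assert!(arc.unwrap() == 0);
}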
// Borrowck rightly complains about immutably aliasing the rwlock in order to
@@ -611,6 +658,23 @@ mod tests {
}
}
#[test] #[should_fail] #[ignore(cfg(windows))]
pub fn test_mutex_arc_unwrap_poison() {
let arc = MutexARC(1);
let arc2 = ~(&arc).clone();
let (p, c) = comm::stream();
do task::spawn {
unsafe {
do arc2.access |one| {
c.send(());
assert!(*one == 2);
}
}
}
let _ = p.recv();
let one = arc.unwrap();
assert!(one == 1);
}
#[test] #[should_fail] #[ignore(cfg(windows))]
fn test_rw_arc_poison_wr() {
let arc = ~RWARC(1);
let arc2 = (*arc).clone();


@@ -9,12 +9,15 @@
// except according to those terms.
use cast;
use cell::Cell;
use comm;
use libc;
use ptr;
use option::*;
use task;
use task::atomically;
use unstable::atomics::{AtomicOption,AtomicUint,Acquire,Release,SeqCst};
use unstable::finally::Finally;
use unstable::intrinsics;
use ops::Drop;
use clone::Clone;
use kinds::Send;
@@ -27,14 +30,22 @@ pub struct UnsafeAtomicRcBox<T> {
}
struct AtomicRcBoxData<T> {
-count: int,
+count: AtomicUint,
+// An unwrapper uses this protocol to communicate with the "other" task that
+// drops the last refcount on an arc. Unfortunately this can't be a proper
+// pipe protocol because the unwrapper has to access both stages at once.
+// FIXME(#7544): Maybe use AtomicPtr instead (to avoid xchg in take() later)?
+unwrapper: AtomicOption<(comm::ChanOne<()>, comm::PortOne<bool>)>,
// FIXME(#3224) should be able to make this non-option to save memory
data: Option<T>,
}
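The slot's handshake, pulled out as a standalone sketch (hypothetical, not part of this commit): the unwrapper keeps (p1, c2) and parks (c1, p2) in the unwrapper slot; whichever task drops the refcount to zero signals on c1 and then waits on p2 to learn whether the unwrapper actually took the data (it may have been killed while blocked).

fn handshake_sketch() {
    let (p1, c1) = comm::oneshot(); // carries (): "refcount just hit zero"
    let (p2, c2) = comm::oneshot(); // carries bool: "unwrapper took the data?"
    do task::spawn {
        // Plays the last-dropping task, which found (c1, p2) in the slot.
        c1.send(());
        if p2.recv() {
            // Unwrapper claimed the data; this side must not free it.
        } else {
            // Unwrapper was killed mid-wait; this side frees as usual.
        }
    }
    // Plays the unwrapper: block until woken, then report the outcome.
    p1.recv();
    c2.send(true);
}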
impl<T: Send> UnsafeAtomicRcBox<T> {
pub fn new(data: T) -> UnsafeAtomicRcBox<T> {
unsafe {
-let data = ~AtomicRcBoxData { count: 1, data: Some(data) };
+let data = ~AtomicRcBoxData { count: AtomicUint::new(1),
+unwrapper: AtomicOption::empty(),
+data: Some(data) };
let ptr = cast::transmute(data);
return UnsafeAtomicRcBox { data: ptr };
}
@@ -44,7 +55,7 @@ impl<T: Send> UnsafeAtomicRcBox<T> {
pub unsafe fn get(&self) -> *mut T
{
let mut data: ~AtomicRcBoxData<T> = cast::transmute(self.data);
-assert!(data.count > 0);
+assert!(data.count.load(Acquire) > 0); // no barrier is really needed
let r: *mut T = data.data.get_mut_ref();
cast::forget(data);
return r;
@@ -53,20 +64,88 @@ impl<T: Send> UnsafeAtomicRcBox<T> {
#[inline]
pub unsafe fn get_immut(&self) -> *T
{
-let mut data: ~AtomicRcBoxData<T> = cast::transmute(self.data);
-assert!(data.count > 0);
-let r: *T = cast::transmute_immut(data.data.get_mut_ref());
+let data: ~AtomicRcBoxData<T> = cast::transmute(self.data);
+assert!(data.count.load(Acquire) > 0); // no barrier is really needed
+let r: *T = data.data.get_ref();
cast::forget(data);
return r;
}
/// Wait until all other handles are dropped, then retrieve the enclosed
/// data. See extra::arc::ARC for specific semantics documentation.
/// If called when the task is already unkillable, unwrap will unkillably
/// block; otherwise, an unwrapping task can be killed by linked failure.
pub unsafe fn unwrap(self) -> T {
let this = Cell::new(self); // argh
do task::unkillable {
let mut this = this.take();
let mut data: ~AtomicRcBoxData<T> = cast::transmute(this.data);
// Set up the unwrap protocol.
let (p1,c1) = comm::oneshot(); // ()
let (p2,c2) = comm::oneshot(); // bool
// Try to put our server end in the unwrapper slot.
// This needs no barrier -- it's protected by the release barrier on
// the xadd, and the acquire+release barrier in the destructor's xadd.
// FIXME(#6598) Change Acquire to Relaxed.
if data.unwrapper.fill(~(c1,p2), Acquire).is_none() {
// Got in. Tell this handle's destructor not to run (we are now it).
this.data = ptr::mut_null();
// Drop our own reference.
let old_count = data.count.fetch_sub(1, Release);
assert!(old_count >= 1);
if old_count == 1 {
// We were the last owner. Can unwrap immediately.
// AtomicOption's destructor will free the server endpoint.
// FIXME(#3224): it should be like this
// let ~AtomicRcBoxData { data: user_data, _ } = data;
// user_data
data.data.take_unwrap()
} else {
// The *next* person who sees the refcount hit 0 will wake us.
let p1 = Cell::new(p1); // argh
// Unlike the above one, this cell is necessary. It will get
// taken either in the do block or in the finally block.
let c2_and_data = Cell::new((c2,data));
do (|| {
do task::rekillable { p1.take().recv(); }
// Got here. Back in the 'unkillable' without getting killed.
let (c2, data) = c2_and_data.take();
c2.send(true);
// FIXME(#3224): it should be like this
// let ~AtomicRcBoxData { data: user_data, _ } = data;
// user_data
let mut data = data;
data.data.take_unwrap()
}).finally {
if task::failing() {
// Killed during wait. Because this might happen while
// someone else still holds a reference, we can't free
// the data now; the "other" last refcount will free it.
let (c2, data) = c2_and_data.take();
c2.send(false);
cast::forget(data);
} else {
assert!(c2_and_data.is_empty());
}
}
}
} else {
// If 'fill' returns the server end back to us, we were rejected;
// someone else was trying to unwrap. Avoid guaranteed deadlock.
cast::forget(data);
fail!("Another task is already unwrapping this ARC!");
}
}
}
}
impl<T: Send> Clone for UnsafeAtomicRcBox<T> {
fn clone(&self) -> UnsafeAtomicRcBox<T> {
unsafe {
let mut data: ~AtomicRcBoxData<T> = cast::transmute(self.data);
-let new_count = intrinsics::atomic_xadd(&mut data.count, 1) + 1;
-assert!(new_count >= 2);
+// This barrier might be unnecessary, but I'm not sure...
+let old_count = data.count.fetch_add(1, Acquire);
+assert!(old_count >= 1);
cast::forget(data);
return UnsafeAtomicRcBox { data: self.data };
}
@@ -77,12 +156,37 @@ impl<T: Send> Clone for UnsafeAtomicRcBox<T> {
impl<T> Drop for UnsafeAtomicRcBox<T>{
fn drop(&self) {
unsafe {
if self.data.is_null() {
return; // Happens when destructing an unwrapper's handle.
}
do task::unkillable {
let mut data: ~AtomicRcBoxData<T> = cast::transmute(self.data);
-let new_count = intrinsics::atomic_xsub(&mut data.count, 1) - 1;
-assert!(new_count >= 0);
-if new_count == 0 {
-// drop glue takes over.
// Must be acquire+release, not just release, to make sure this
// doesn't get reordered to after the unwrapper pointer load.
let old_count = data.count.fetch_sub(1, SeqCst);
assert!(old_count >= 1);
if old_count == 1 {
// Were we really last, or should we hand off to an
// unwrapper? It's safe to not xchg because the unwrapper
// will set the unwrap lock *before* dropping his/her
// reference. In effect, being here means we're the only
// *awake* task with the data.
match data.unwrapper.take(Acquire) {
Some(~(message,response)) => {
// Send 'ready' and wait for a response.
message.send(());
// Unkillable wait. Message guaranteed to come.
if response.recv() {
// Other task got the data.
cast::forget(data);
} else {
// Other task was killed. drop glue takes over.
}
}
None => {
// drop glue takes over.
}
}
} else {
cast::forget(data);
}
@@ -139,6 +243,13 @@ struct ExData<T> {
/**
* An arc over mutable data that is protected by a lock. For library use only.
*
* # Safety note
*
* This uses a pthread mutex, not one that's aware of the userspace scheduler.
* The user of an exclusive must be careful not to invoke any functions that may
* reschedule the task while holding the lock, or deadlock may result. If you
* need to block or yield while accessing shared state, use extra::sync::RWARC.
*/
pub struct Exclusive<T> {
x: UnsafeAtomicRcBox<ExData<T>>
@@ -189,12 +300,13 @@ impl<T:Send> Exclusive<T> {
f(cast::transmute_immut(x))
}
}
}
-fn compare_and_swap(address: &mut int, oldval: int, newval: int) -> bool {
-unsafe {
-let old = intrinsics::atomic_cxchg(address, oldval, newval);
-old == oldval
+pub fn unwrap(self) -> T {
+let Exclusive { x: x } = self;
+// Someday we might need to unkillably unwrap an exclusive, but not today.
+let inner = unsafe { x.unwrap() };
+let ExData { data: user_data, _ } = inner; // will destroy the LittleLock
+user_data
}
}
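A sketch of the hazard the safety note on Exclusive warns about (hypothetical, not from this commit): any call that can deschedule the task inside with holds the pthread mutex across the reschedule.

fn exclusive_hazard_sketch() {
    let x = exclusive(0);
    let x2 = x.clone();
    do task::spawn {
        unsafe { do x2.with |n| { *n += 1; } } // ok: short and non-blocking
    }
    unsafe {
        do x.with |n| {
            // BAD: a task::yield() or any blocking call here would keep
            // the pthread lock held while descheduled, and can deadlock
            // the other task contending for this same exclusive.
            *n += 1;
        }
    }
}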
@@ -208,10 +320,13 @@ extern {
#[cfg(test)]
mod tests {
use super::*;
use cell::Cell;
use comm;
-use super::exclusive;
+use option::*;
+use super::{exclusive, UnsafeAtomicRcBox};
use task;
use uint;
use util;
#[test]
fn exclusive_arc() {
@@ -263,4 +378,81 @@ mod tests {
}
}
}
#[test]
fn unsafe_unwrap_basic() {
unsafe {
let x = UnsafeAtomicRcBox::new(~~"hello");
assert!(x.unwrap() == ~~"hello");
}
}
#[test]
fn exclusive_unwrap_basic() {
// Unlike the above, also tests no double-freeing of the LittleLock.
let x = exclusive(~~"hello");
assert!(x.unwrap() == ~~"hello");
}
#[test]
fn exclusive_unwrap_contended() {
let x = exclusive(~~"hello");
let x2 = Cell::new(x.clone());
do task::spawn {
let x2 = x2.take();
unsafe { do x2.with |_hello| { } }
task::yield();
}
assert!(x.unwrap() == ~~"hello");
// Now try the same thing, but with the child task blocking.
let x = exclusive(~~"hello");
let x2 = Cell::new(x.clone());
let mut res = None;
let mut builder = task::task();
builder.future_result(|r| res = Some(r));
do builder.spawn {
let x2 = x2.take();
assert!(x2.unwrap() == ~~"hello");
}
// Have to get rid of our reference before blocking.
util::ignore(x);
res.unwrap().recv();
}
#[test] #[should_fail] #[ignore(cfg(windows))]
fn exclusive_unwrap_conflict() {
let x = exclusive(~~"hello");
let x2 = Cell::new(x.clone());
let mut res = None;
let mut builder = task::task();
builder.future_result(|r| res = Some(r));
do builder.spawn {
let x2 = x2.take();
assert!(x2.unwrap() == ~~"hello");
}
assert!(x.unwrap() == ~~"hello");
// See #4689 for why this can't be just "res.recv()".
assert!(res.unwrap().recv() == task::Success);
}
#[test] #[ignore(cfg(windows))]
fn exclusive_unwrap_deadlock() {
// This is not guaranteed to get to the deadlock before being killed,
// but it will show up sometimes, and if the deadlock were not there,
// the test would nondeterministically fail.
let result = do task::try {
// a task that has two references to the same exclusive will
// deadlock when it unwraps. nothing to be done about that.
let x = exclusive(~~"hello");
let x2 = x.clone();
do task::spawn {
for 10.times { task::yield(); } // try to let the unwrapper go
fail!(); // punt it awake from its deadlock
}
let _z = x.unwrap();
unsafe { do x2.with |_hello| { } }
};
assert!(result.is_err());
}
}