auto merge of #8195 : bblum/rust/task-cleanup, r=brson

In the first commit it is obvious why some of the barriers can be changed to ```Relaxed```, but it is not as obvious for the ones I changed in ```kill.rs```. The rationale for those is documented as part of the documenting commit.

Also, the last commit is a temporary hack to prevent kill signals from being received in taskgroup cleanup code; this could be fixed in a more principled way once the old runtime is gone.
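
For readers unfamiliar with the argument, here is a minimal sketch of the publication pattern these relaxations rely on. It uses today's `std::sync::atomic` rather than the old `unstable::atomics` API, and the `enqueued`/`state`/`payload` names are invented for illustration: everything written before the Release "enqueue" becomes visible through the matching Acquire, so purely book-keeping stores need no ordering of their own.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    // `enqueued` stands in for handing the blocked task back to a scheduler;
    // `state` stands in for a book-keeping word like STATE_ONE in comm.rs.
    let enqueued = Arc::new(AtomicUsize::new(0));
    let state = Arc::new(AtomicUsize::new(0));
    let payload = Arc::new(AtomicUsize::new(0));

    let (e, s, p) = (enqueued.clone(), state.clone(), payload.clone());
    let sender = thread::spawn(move || {
        p.store(42, Ordering::Relaxed); // write the payload
        s.store(1, Ordering::Relaxed);  // book-keeping store: no barrier of its own
        e.store(1, Ordering::Release);  // the "enqueue" publishes everything above
    });

    // The receiver synchronizes on the enqueue alone...
    while enqueued.load(Ordering::Acquire) == 0 {
        std::hint::spin_loop();
    }
    // ...and can then read both the payload and the book-keeping word relaxed.
    assert_eq!(payload.load(Ordering::Relaxed), 42);
    assert_eq!(state.load(Ordering::Relaxed), 1);
    sender.join().unwrap();
}
```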
bors 2013-08-02 07:31:52 -07:00
commit 986df44753
4 changed files with 94 additions and 29 deletions


@ -18,7 +18,7 @@ use kinds::Send;
use rt::sched::Scheduler;
use rt::local::Local;
use rt::select::{Select, SelectPort};
use unstable::atomics::{AtomicUint, AtomicOption, Acquire, Release, SeqCst};
use unstable::atomics::{AtomicUint, AtomicOption, Acquire, Relaxed, SeqCst};
use unstable::sync::UnsafeAtomicRcBox;
use util::Void;
use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
@ -217,15 +217,15 @@ impl<T> Select for PortOne<T> {
}
STATE_ONE => {
// Re-record that we are the only owner of the packet.
// Release barrier needed in case the task gets reawoken
// on a different core (this is analogous to writing a
// payload; a barrier in enqueueing the task protects it).
// No barrier needed, even if the task gets reawoken
// on a different core -- this is analogous to writing a
// payload; a barrier in enqueueing the task protects it.
// NB(#8132). This *must* occur before the enqueue below.
// FIXME(#6842, #8130) This is usually only needed for the
// assertion in recv_ready, except in the case of select().
// This won't actually ever have cacheline contention, but
// maybe should be optimized out with a cfg(test) anyway?
(*self.packet()).state.store(STATE_ONE, Release);
(*self.packet()).state.store(STATE_ONE, Relaxed);
rtdebug!("rendezvous recv");
sched.metrics.rendezvous_recvs += 1;
@ -300,7 +300,7 @@ impl<T> SelectPort<T> for PortOne<T> {
unsafe {
// See corresponding store() above in block_on for rationale.
// FIXME(#8130) This can happen only in test builds.
assert!((*packet).state.load(Acquire) == STATE_ONE);
assert!((*packet).state.load(Relaxed) == STATE_ONE);
let payload = (*packet).payload.take();
@ -375,9 +375,7 @@ impl<T> Drop for PortOne<T> {
// receiver was killed awake. The task can't still be
// blocked (we are it), but we need to free the handle.
let recvr = BlockedTask::cast_from_uint(task_as_state);
// FIXME(#7554)(bblum): Make this cfg(test) dependent.
// in a later commit.
assert!(recvr.wake().is_none());
recvr.assert_already_awake();
}
}
}


@ -8,7 +8,63 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Task death: asynchronous killing, linked failure, exit code propagation.
/*!

Task death: asynchronous killing, linked failure, exit code propagation.

This file implements two orthogonal building blocks for communicating failure
between tasks. One is 'linked failure' or 'task killing', that is, a failing
task causing other tasks to fail promptly (even those that are blocked on
pipes or I/O). The other is 'exit code propagation', which affects the result
observed by the parent of a task::try task that itself spawns child tasks
(such as any #[test] function). In both cases the data structures live in
KillHandle.

I. Task killing.

The model for killing involves two atomic flags, the "kill flag" and the
"unkillable flag". Operations on the kill flag include:

- In the taskgroup code (task/spawn.rs), tasks store a clone of their
  KillHandle in their shared taskgroup. Another task in the group that fails
  will use that handle to call kill().
- When a task blocks, it turns its ~Task into a BlockedTask by storing the
  transmuted ~Task pointer inside the KillHandle's kill flag. A task trying to
  block and a task trying to kill it can simultaneously access the kill flag,
  after which the task will get scheduled and fail (no matter who wins the
  race). Likewise, a task trying to wake a blocked task normally and a task
  trying to kill it can simultaneously access the flag; only one will obtain
  the task and get to reschedule it.

Operations on the unkillable flag include:

- When a task becomes unkillable, it swaps on the flag to forbid any killer
  from waking it up while it's blocked inside the unkillable section. If a
  kill was already pending, the task fails instead of becoming unkillable.
- When a task is done being unkillable, it restores the flag to the normal
  running state. If a kill was received-but-blocked during the unkillable
  section, the task fails at this later point.
- When a task tries to kill another task, before swapping on the kill flag, it
  first swaps on the unkillable flag, to see if it's "allowed" to wake up the
  task. If it isn't, the killed task will receive the signal when it becomes
  killable again. (Of course, a task trying to wake the task normally, e.g. by
  sending on a channel, does not access the unkillable flag at all.)

Why do we not need acquire/release barriers on any of the kill-flag swaps?
Barriers establish orderings between accesses to different memory locations,
but each kill-related operation is only a swap on a single location, so
atomicity is all that matters. The exception is kill(), which does a swap on
both flags in sequence. kill() still needs no barriers, because it does not
matter if its two accesses are seen reordered on another CPU: if a killer
performs both writes, it saw KILL_RUNNING in the unkillable flag, which means
an unkillable task will see KILL_KILLED and fail immediately (rendering the
subsequent write to the kill flag unnecessary). (A simplified, standalone
sketch of this flag protocol appears just below.)

II. Exit code propagation.

FIXME(#7544): Decide on the ultimate model for this and document it.
*/
use cast;
use cell::Cell;
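
As promised above, here is a hypothetical, stripped-down model of the kill-flag race, written against modern `std` atomics. The constants, the `try_block`/`take_blocked_task` names, and the integer stand-in for the transmuted `~Task` pointer are all invented for illustration; this is not the runtime's real API.

```rust
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

const KILL_RUNNING: usize = 0;
const KILL_KILLED: usize = 1;
// Any other value is treated as the (transmuted) pointer to a blocked task.

/// A task about to block tries to park its pointer in the flag.
/// Returns true if it parked; false means a kill already landed and the
/// task should fail instead of blocking.
fn try_block(flag: &AtomicUsize, task_ptr: usize) -> bool {
    match flag.compare_exchange(KILL_RUNNING, task_ptr, Relaxed, Relaxed) {
        Ok(_) => true,
        Err(KILL_KILLED) => false,
        Err(other) => panic!("kill flag already holds a task: {:#x}", other),
    }
}

/// A killer (or a normal waker) swaps the flag. Because both racers use a
/// single atomic swap on the same word, exactly one of them can win the
/// parked task no matter how the race interleaves; plain atomicity is all
/// that is required, hence Relaxed.
fn take_blocked_task(flag: &AtomicUsize, new_value: usize) -> Option<usize> {
    match flag.swap(new_value, Relaxed) {
        KILL_RUNNING | KILL_KILLED => None, // nothing parked / already handled
        task_ptr => Some(task_ptr),         // we own the task; reschedule it
    }
}

fn main() {
    let flag = AtomicUsize::new(KILL_RUNNING);
    let fake_task = 0x1000; // stand-in for a transmuted ~Task pointer
    assert!(try_block(&flag, fake_task));
    // A killer arrives: it takes ownership of the parked task...
    assert_eq!(take_blocked_task(&flag, KILL_KILLED), Some(fake_task));
    // ...and any later waker finds nothing left to do.
    assert_eq!(take_blocked_task(&flag, KILL_KILLED), None);
}
```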
@ -16,8 +72,9 @@ use either::{Either, Left, Right};
use option::{Option, Some, None};
use prelude::*;
use rt::task::Task;
use task::spawn::Taskgroup;
use to_bytes::IterBytes;
use unstable::atomics::{AtomicUint, Acquire, SeqCst};
use unstable::atomics::{AtomicUint, Relaxed};
use unstable::sync::{UnsafeAtomicRcBox, LittleLock};
use util;
@ -95,7 +152,7 @@ impl Drop for KillFlag {
// Letting a KillFlag with a task inside get dropped would leak the task.
// We could free it here, but the task should get awoken by hand somehow.
fn drop(&self) {
match self.load(Acquire) {
match self.load(Relaxed) {
KILL_RUNNING | KILL_KILLED => { },
_ => rtabort!("can't drop kill flag with a blocked task inside!"),
}
@ -124,7 +181,7 @@ impl BlockedTask {
Unkillable(task) => Some(task),
Killable(flag_arc) => {
let flag = unsafe { &mut **flag_arc.get() };
match flag.swap(KILL_RUNNING, SeqCst) {
match flag.swap(KILL_RUNNING, Relaxed) {
KILL_RUNNING => None, // woken from select(), perhaps
KILL_KILLED => None, // a killer stole it already
task_ptr =>
@ -159,7 +216,7 @@ impl BlockedTask {
let flag = &mut **flag_arc.get();
let task_ptr = cast::transmute(task);
// Expect flag to contain RUNNING. If KILLED, it should stay KILLED.
match flag.compare_and_swap(KILL_RUNNING, task_ptr, SeqCst) {
match flag.compare_and_swap(KILL_RUNNING, task_ptr, Relaxed) {
KILL_RUNNING => Right(Killable(flag_arc)),
KILL_KILLED => Left(revive_task_ptr(task_ptr, Some(flag_arc))),
x => rtabort!("can't block task! kill flag = %?", x),
@ -257,7 +314,7 @@ impl KillHandle {
let inner = unsafe { &mut *self.get() };
// Expect flag to contain RUNNING. If KILLED, it should stay KILLED.
// FIXME(#7544)(bblum): is it really necessary to prohibit double kill?
match inner.unkillable.compare_and_swap(KILL_RUNNING, KILL_UNKILLABLE, SeqCst) {
match inner.unkillable.compare_and_swap(KILL_RUNNING, KILL_UNKILLABLE, Relaxed) {
KILL_RUNNING => { }, // normal case
KILL_KILLED => if !already_failing { fail!(KILLED_MSG) },
_ => rtabort!("inhibit_kill: task already unkillable"),
@ -270,7 +327,7 @@ impl KillHandle {
let inner = unsafe { &mut *self.get() };
// Expect flag to contain UNKILLABLE. If KILLED, it should stay KILLED.
// FIXME(#7544)(bblum): is it really necessary to prohibit double kill?
match inner.unkillable.compare_and_swap(KILL_UNKILLABLE, KILL_RUNNING, SeqCst) {
match inner.unkillable.compare_and_swap(KILL_UNKILLABLE, KILL_RUNNING, Relaxed) {
KILL_UNKILLABLE => { }, // normal case
KILL_KILLED => if !already_failing { fail!(KILLED_MSG) },
_ => rtabort!("allow_kill: task already killable"),
@ -281,10 +338,10 @@ impl KillHandle {
// if it was blocked and needs to be punted awake. To be called by other tasks.
pub fn kill(&mut self) -> Option<~Task> {
let inner = unsafe { &mut *self.get() };
if inner.unkillable.swap(KILL_KILLED, SeqCst) == KILL_RUNNING {
if inner.unkillable.swap(KILL_KILLED, Relaxed) == KILL_RUNNING {
// Got in. Allowed to try to punt the task awake.
let flag = unsafe { &mut *inner.killed.get() };
match flag.swap(KILL_KILLED, SeqCst) {
match flag.swap(KILL_KILLED, Relaxed) {
// Task either not blocked or already taken care of.
KILL_RUNNING | KILL_KILLED => None,
// Got ownership of the blocked task.
@ -306,8 +363,11 @@ impl KillHandle {
// is unkillable with a kill signal pending.
let inner = unsafe { &*self.get() };
let flag = unsafe { &*inner.killed.get() };
// FIXME(#6598): can use relaxed ordering (i think)
flag.load(Acquire) == KILL_KILLED
// A barrier-related concern here is that a task that gets killed
// awake needs to see the killer's write of KILLED to this flag. This
// is analogous to receiving a pipe payload; the appropriate barrier
// should happen when enqueueing the task.
flag.load(Relaxed) == KILL_KILLED
}
pub fn notify_immediate_failure(&mut self) {
@ -415,7 +475,7 @@ impl Death {
}
/// Collect failure exit codes from children and propagate them to a parent.
pub fn collect_failure(&mut self, mut success: bool) {
pub fn collect_failure(&mut self, mut success: bool, group: Option<Taskgroup>) {
// This may run after the task has already failed, so even though the
// task appears to need to be killed, the scheduler should not fail us
// when we block to unwrap.
@ -425,6 +485,10 @@ impl Death {
rtassert!(self.unkillable == 0);
self.unkillable = 1;
// FIXME(#7544): See corresponding fixme at the callsite in task.rs.
// NB(#8192): Doesn't work with "let _ = ..."
{ use util; util::ignore(group); }
// Step 1. Decide if we need to collect child failures synchronously.
do self.on_exit.take_map |on_exit| {
if success {


@ -212,8 +212,13 @@ impl Task {
pub fn run(&mut self, f: &fn()) {
rtdebug!("run called on task: %u", borrow::to_uint(self));
self.unwinder.try(f);
{ let _ = self.taskgroup.take(); }
self.death.collect_failure(!self.unwinder.unwinding);
// FIXME(#7544): We pass the taskgroup into death so that it can be
// dropped while the unkillable counter is set. This should not be
// necessary except for an extraneous clone() in task/spawn.rs that
// causes a killhandle to get dropped, which mustn't receive a kill
// signal since we're outside of the unwinder's try() scope.
// { let _ = self.taskgroup.take(); }
self.death.collect_failure(!self.unwinder.unwinding, self.taskgroup.take());
self.destroy();
}
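
A rough sketch of the ordering that FIXME is describing, with invented stand-ins for `Death` and `Taskgroup` (the real types live in the runtime and carry much more state): the group is handed into `collect_failure` so that its destructor, which may drop a `KillHandle`, runs only after the unkillable window has been entered.

```rust
// Invented, simplified stand-ins for the runtime's Death and Taskgroup types.
struct Taskgroup;                // dropping this may drop a KillHandle
struct Death { unkillable: u32 } // counts nested unkillable sections

impl Death {
    fn collect_failure(&mut self, success: bool, group: Option<Taskgroup>) {
        assert_eq!(self.unkillable, 0);
        self.unkillable = 1; // kill signals can no longer be delivered
        drop(group);         // taskgroup teardown happens inside that window
        let _ = success;     // ... collect child exit codes, notify parent (elided) ...
    }
}

fn main() {
    let mut death = Death { unkillable: 0 };
    death.collect_failure(true, Some(Taskgroup));
}
```

Here `drop(group)` plays the role of `util::ignore(group)` in the commit; per the NB(#8192) above, `let _ = group;` would not do, presumably because the wildcard pattern does not move the value, so the destructor would run later than intended.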


@ -16,7 +16,7 @@ use ptr;
use option::*;
use either::{Either, Left, Right};
use task;
use unstable::atomics::{AtomicOption,AtomicUint,Acquire,Release,SeqCst};
use unstable::atomics::{AtomicOption,AtomicUint,Acquire,Release,Relaxed,SeqCst};
use unstable::finally::Finally;
use ops::Drop;
use clone::Clone;
@ -95,8 +95,7 @@ impl<T: Send> UnsafeAtomicRcBox<T> {
pub fn get(&self) -> *mut T {
unsafe {
let mut data: ~AtomicRcBoxData<T> = cast::transmute(self.data);
// FIXME(#6598) Change Acquire to Relaxed.
assert!(data.count.load(Acquire) > 0);
assert!(data.count.load(Relaxed) > 0);
let r: *mut T = data.data.get_mut_ref();
cast::forget(data);
return r;
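
Relaxing this assertion is the usual atomic-refcount argument. A minimal sketch with modern `std` atomics (the `Shared`/`clone_handle`/`drop_handle`/`get` names are invented; this is not the actual `UnsafeAtomicRcBox` API): increments and sanity-check loads can be Relaxed because some live handle already keeps the allocation alive, while the Release decrement plus an Acquire fence before freeing provides the only ordering that matters.

```rust
use std::sync::atomic::{fence, AtomicUsize, Ordering};

struct Shared {
    count: AtomicUsize,
    data: i32,
}

/// Bumping the count only needs atomicity: the caller's own handle already
/// keeps the allocation alive, so Relaxed is enough.
unsafe fn clone_handle(p: *const Shared) {
    unsafe { (*p).count.fetch_add(1, Ordering::Relaxed) };
}

/// The decrement is Release, and the thread that drops the last handle does
/// an Acquire fence before freeing, so every other handle's last use
/// happens-before the deallocation.
unsafe fn drop_handle(p: *mut Shared) {
    unsafe {
        if (*p).count.fetch_sub(1, Ordering::Release) == 1 {
            fence(Ordering::Acquire);
            drop(Box::from_raw(p));
        }
    }
}

/// Reading through a live handle implies count > 0; the assertion is only a
/// sanity check, so it needs no ordering at all.
unsafe fn get(p: *const Shared) -> i32 {
    unsafe {
        debug_assert!((*p).count.load(Ordering::Relaxed) > 0);
        (*p).data
    }
}

fn main() {
    let p = Box::into_raw(Box::new(Shared { count: AtomicUsize::new(1), data: 7 }));
    unsafe {
        clone_handle(p); // count: 1 -> 2
        assert_eq!(get(p), 7);
        drop_handle(p);  // count: 2 -> 1
        drop_handle(p);  // count: 1 -> 0, frees the allocation
    }
}
```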
@ -107,7 +106,7 @@ impl<T: Send> UnsafeAtomicRcBox<T> {
pub fn get_immut(&self) -> *T {
unsafe {
let data: ~AtomicRcBoxData<T> = cast::transmute(self.data);
assert!(data.count.load(Acquire) > 0); // no barrier is really needed
assert!(data.count.load(Relaxed) > 0);
let r: *T = data.data.get_ref();
cast::forget(data);
return r;
@ -130,8 +129,7 @@ impl<T: Send> UnsafeAtomicRcBox<T> {
// Try to put our server end in the unwrapper slot.
// This needs no barrier -- it's protected by the release barrier on
// the xadd, and the acquire+release barrier in the destructor's xadd.
// FIXME(#6598) Change Acquire to Relaxed.
if data.unwrapper.fill(~(c1,p2), Acquire).is_none() {
if data.unwrapper.fill(~(c1,p2), Relaxed).is_none() {
// Got in. Tell this handle's destructor not to run (we are now it).
this.data = ptr::mut_null();
// Drop our own reference.