Implement KillHandle::kill() and friends (unkillable, atomically). Close #6377.
This commit is contained in:
parent
2a99320583
commit
629f6e8d68
@ -10,15 +10,38 @@
|
||||
|
||||
//! Task death: asynchronous killing, linked failure, exit code propagation.
|
||||
|
||||
use cast;
|
||||
use cell::Cell;
|
||||
use option::{Option, Some, None};
|
||||
use prelude::*;
|
||||
use rt::task::Task;
|
||||
use unstable::atomics::{AtomicUint, SeqCst};
|
||||
use unstable::sync::{UnsafeAtomicRcBox, LittleLock};
|
||||
use util;
|
||||
|
||||
static KILLED_MSG: &'static str = "killed by linked failure";
|
||||
|
||||
// State values for the 'killed' and 'unkillable' atomic flags below.
|
||||
static KILL_RUNNING: uint = 0;
|
||||
static KILL_KILLED: uint = 1;
|
||||
static KILL_UNKILLABLE: uint = 2;
|
||||
|
||||
// FIXME(#7544)(bblum): think about the cache efficiency of this
|
||||
struct KillHandleInner {
|
||||
// ((more fields to be added in a future commit))
|
||||
// Is the task running, blocked, or killed? Possible values:
|
||||
// * KILL_RUNNING - Not unkillable, no kill pending.
|
||||
// * KILL_KILLED - Kill pending.
|
||||
// * <ptr> - A transmuted blocked ~Task pointer.
|
||||
// This flag is refcounted because it may also be referenced by a blocking
|
||||
// concurrency primitive, used to wake the task normally, whose reference
|
||||
// may outlive the handle's if the task is killed.
|
||||
killed: UnsafeAtomicRcBox<AtomicUint>,
|
||||
// Has the task deferred kill signals? This flag guards the above one.
|
||||
// Possible values:
|
||||
// * KILL_RUNNING - Not unkillable, no kill pending.
|
||||
// * KILL_KILLED - Kill pending.
|
||||
// * KILL_UNKILLABLE - Kill signals deferred.
|
||||
unkillable: AtomicUint,
|
||||
|
||||
// Shared state between task and children for exit code propagation. These
|
||||
// are here so we can re-use the kill handle to implement watched children
|
||||
@ -47,13 +70,18 @@ pub struct Death {
|
||||
// Action to be done with the exit code. If set, also makes the task wait
|
||||
// until all its watched children exit before collecting the status.
|
||||
on_exit: Option<~fn(bool)>,
|
||||
// nesting level counter for task::unkillable calls (0 == killable).
|
||||
unkillable: int,
|
||||
// nesting level counter for task::atomically calls (0 == can yield).
|
||||
wont_sleep: int,
|
||||
}
|
||||
|
||||
impl KillHandle {
|
||||
pub fn new() -> KillHandle {
|
||||
KillHandle(UnsafeAtomicRcBox::new(KillHandleInner {
|
||||
// Linked failure fields
|
||||
// ((none yet))
|
||||
killed: UnsafeAtomicRcBox::new(AtomicUint::new(KILL_RUNNING)),
|
||||
unkillable: AtomicUint::new(KILL_RUNNING),
|
||||
// Exit code propagation fields
|
||||
any_child_failed: false,
|
||||
child_tombstones: None,
|
||||
@ -61,6 +89,54 @@ impl KillHandle {
|
||||
}))
|
||||
}
|
||||
|
||||
// Will begin unwinding if a kill signal was received, unless already_failing.
|
||||
// This can't be used recursively, because a task which sees a KILLED
|
||||
// signal must fail immediately, which an already-unkillable task can't do.
|
||||
#[inline]
|
||||
pub fn inhibit_kill(&mut self, already_failing: bool) {
|
||||
let inner = unsafe { &mut *self.get() };
|
||||
// Expect flag to contain RUNNING. If KILLED, it should stay KILLED.
|
||||
// FIXME(#7544)(bblum): is it really necessary to prohibit double kill?
|
||||
match inner.unkillable.compare_and_swap(KILL_RUNNING, KILL_UNKILLABLE, SeqCst) {
|
||||
KILL_RUNNING => { }, // normal case
|
||||
KILL_KILLED => if !already_failing { fail!(KILLED_MSG) },
|
||||
_ => rtabort!("inhibit_kill: task already unkillable"),
|
||||
}
|
||||
}
|
||||
|
||||
// Will begin unwinding if a kill signal was received, unless already_failing.
|
||||
#[inline]
|
||||
pub fn allow_kill(&mut self, already_failing: bool) {
|
||||
let inner = unsafe { &mut *self.get() };
|
||||
// Expect flag to contain UNKILLABLE. If KILLED, it should stay KILLED.
|
||||
// FIXME(#7544)(bblum): is it really necessary to prohibit double kill?
|
||||
match inner.unkillable.compare_and_swap(KILL_UNKILLABLE, KILL_RUNNING, SeqCst) {
|
||||
KILL_UNKILLABLE => { }, // normal case
|
||||
KILL_KILLED => if !already_failing { fail!(KILLED_MSG) },
|
||||
_ => rtabort!("allow_kill: task already killable"),
|
||||
}
|
||||
}
|
||||
|
||||
// Send a kill signal to the handle's owning task. Returns the task itself
|
||||
// if it was blocked and needs punted awake. To be called by other tasks.
|
||||
pub fn kill(&mut self) -> Option<~Task> {
|
||||
let inner = unsafe { &mut *self.get() };
|
||||
if inner.unkillable.swap(KILL_KILLED, SeqCst) == KILL_RUNNING {
|
||||
// Got in. Allowed to try to punt the task awake.
|
||||
let flag = unsafe { &mut *inner.killed.get() };
|
||||
match flag.swap(KILL_KILLED, SeqCst) {
|
||||
// Task either not blocked or already taken care of.
|
||||
KILL_RUNNING | KILL_KILLED => None,
|
||||
// Got ownership of the blocked task.
|
||||
task_ptr => Some(unsafe { cast::transmute(task_ptr) }),
|
||||
}
|
||||
} else {
|
||||
// Otherwise it was either unkillable or already killed. Somebody
|
||||
// else was here first who will deal with the kill signal.
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn notify_immediate_failure(&mut self) {
|
||||
// A benign data race may happen here if there are failing sibling
|
||||
// tasks that were also spawned-watched. The refcount's write barriers
|
||||
@ -123,6 +199,7 @@ impl KillHandle {
|
||||
}
|
||||
|
||||
// NB: Takes a pthread mutex -- 'blk' not allowed to reschedule.
|
||||
#[inline]
|
||||
fn add_lazy_tombstone(parent: &mut KillHandle,
|
||||
blk: &fn(Option<~fn() -> bool>) -> ~fn() -> bool) {
|
||||
|
||||
@ -144,6 +221,8 @@ impl Death {
|
||||
kill_handle: Some(KillHandle::new()),
|
||||
watching_parent: None,
|
||||
on_exit: None,
|
||||
unkillable: 0,
|
||||
wont_sleep: 0,
|
||||
}
|
||||
}
|
||||
|
||||
@ -153,11 +232,22 @@ impl Death {
|
||||
kill_handle: Some(KillHandle::new()),
|
||||
watching_parent: self.kill_handle.clone(),
|
||||
on_exit: None,
|
||||
unkillable: 0,
|
||||
wont_sleep: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Collect failure exit codes from children and propagate them to a parent.
|
||||
pub fn collect_failure(&mut self, mut success: bool) {
|
||||
// This may run after the task has already failed, so even though the
|
||||
// task appears to need to be killed, the scheduler should not fail us
|
||||
// when we block to unwrap.
|
||||
// (XXX: Another less-elegant reason for doing this is so that the use
|
||||
// of the LittleLock in reparent_children_to doesn't need to access the
|
||||
// unkillable flag in the kill_handle, since we'll have removed it.)
|
||||
rtassert!(self.unkillable == 0);
|
||||
self.unkillable = 1;
|
||||
|
||||
// Step 1. Decide if we need to collect child failures synchronously.
|
||||
do self.on_exit.take_map |on_exit| {
|
||||
if success {
|
||||
@ -191,6 +281,64 @@ impl Death {
|
||||
parent_handle.notify_immediate_failure();
|
||||
}
|
||||
};
|
||||
|
||||
// Can't use allow_kill directly; that would require the kill handle.
|
||||
rtassert!(self.unkillable == 1);
|
||||
self.unkillable = 0;
|
||||
}
|
||||
|
||||
/// Enter a possibly-nested unkillable section of code.
|
||||
/// All calls must be paired with a subsequent call to allow_kill.
|
||||
#[inline]
|
||||
pub fn inhibit_kill(&mut self, already_failing: bool) {
|
||||
if self.unkillable == 0 {
|
||||
rtassert!(self.kill_handle.is_some());
|
||||
self.kill_handle.get_mut_ref().inhibit_kill(already_failing);
|
||||
}
|
||||
self.unkillable += 1;
|
||||
}
|
||||
|
||||
/// Exit a possibly-nested unkillable section of code.
|
||||
/// All calls must be paired with a preceding call to inhibit_kill.
|
||||
#[inline]
|
||||
pub fn allow_kill(&mut self, already_failing: bool) {
|
||||
rtassert!(self.unkillable != 0);
|
||||
self.unkillable -= 1;
|
||||
if self.unkillable == 0 {
|
||||
rtassert!(self.kill_handle.is_some());
|
||||
self.kill_handle.get_mut_ref().allow_kill(already_failing);
|
||||
}
|
||||
}
|
||||
|
||||
/// Enter a possibly-nested "atomic" section of code. Just for assertions.
|
||||
/// All calls must be paired with a subsequent call to allow_yield.
|
||||
#[inline]
|
||||
pub fn inhibit_yield(&mut self) {
|
||||
self.wont_sleep += 1;
|
||||
}
|
||||
|
||||
/// Exit a possibly-nested "atomic" section of code. Just for assertions.
|
||||
/// All calls must be paired with a preceding call to inhibit_yield.
|
||||
#[inline]
|
||||
pub fn allow_yield(&mut self) {
|
||||
rtassert!(self.wont_sleep != 0);
|
||||
self.wont_sleep -= 1;
|
||||
}
|
||||
|
||||
/// Ensure that the task is allowed to become descheduled.
|
||||
#[inline]
|
||||
pub fn assert_may_sleep(&self) {
|
||||
if self.wont_sleep != 0 {
|
||||
rtabort!("illegal atomic-sleep: can't deschedule inside atomically()");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Death {
|
||||
fn drop(&self) {
|
||||
// Mustn't be in an atomic or unkillable section at task death.
|
||||
rtassert!(self.unkillable == 0);
|
||||
rtassert!(self.wont_sleep == 0);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -42,7 +42,8 @@ use cmp::Eq;
|
||||
use comm::{stream, Chan, GenericChan, GenericPort, Port};
|
||||
use result::Result;
|
||||
use result;
|
||||
use rt::{context, OldTaskContext};
|
||||
use rt::{context, OldTaskContext, TaskContext};
|
||||
use rt::local::Local;
|
||||
use task::rt::{task_id, sched_id};
|
||||
use unstable::finally::Finally;
|
||||
use util::replace;
|
||||
@ -526,8 +527,6 @@ pub fn yield() {
|
||||
pub fn failing() -> bool {
|
||||
//! True if the running task has failed
|
||||
|
||||
use rt::{context, OldTaskContext};
|
||||
use rt::local::Local;
|
||||
use rt::task::Task;
|
||||
|
||||
match context() {
|
||||
@ -572,33 +571,59 @@ pub fn get_scheduler() -> Scheduler {
|
||||
* ~~~
|
||||
*/
|
||||
pub unsafe fn unkillable<U>(f: &fn() -> U) -> U {
|
||||
if context() == OldTaskContext {
|
||||
let t = rt::rust_get_task();
|
||||
do (|| {
|
||||
rt::rust_task_inhibit_kill(t);
|
||||
f()
|
||||
}).finally {
|
||||
rt::rust_task_allow_kill(t);
|
||||
use rt::task::Task;
|
||||
|
||||
match context() {
|
||||
OldTaskContext => {
|
||||
let t = rt::rust_get_task();
|
||||
do (|| {
|
||||
rt::rust_task_inhibit_kill(t);
|
||||
f()
|
||||
}).finally {
|
||||
rt::rust_task_allow_kill(t);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// FIXME #6377
|
||||
f()
|
||||
TaskContext => {
|
||||
// The inhibits/allows might fail and need to borrow the task.
|
||||
let t = Local::unsafe_borrow::<Task>();
|
||||
do (|| {
|
||||
(*t).death.inhibit_kill((*t).unwinder.unwinding);
|
||||
f()
|
||||
}).finally {
|
||||
(*t).death.allow_kill((*t).unwinder.unwinding);
|
||||
}
|
||||
}
|
||||
// FIXME(#3095): This should be an rtabort as soon as the scheduler
|
||||
// no longer uses a workqueue implemented with an Exclusive.
|
||||
_ => f()
|
||||
}
|
||||
}
|
||||
|
||||
/// The inverse of unkillable. Only ever to be used nested in unkillable().
|
||||
pub unsafe fn rekillable<U>(f: &fn() -> U) -> U {
|
||||
if context() == OldTaskContext {
|
||||
let t = rt::rust_get_task();
|
||||
do (|| {
|
||||
rt::rust_task_allow_kill(t);
|
||||
f()
|
||||
}).finally {
|
||||
rt::rust_task_inhibit_kill(t);
|
||||
use rt::task::Task;
|
||||
|
||||
match context() {
|
||||
OldTaskContext => {
|
||||
let t = rt::rust_get_task();
|
||||
do (|| {
|
||||
rt::rust_task_allow_kill(t);
|
||||
f()
|
||||
}).finally {
|
||||
rt::rust_task_inhibit_kill(t);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// FIXME #6377
|
||||
f()
|
||||
TaskContext => {
|
||||
let t = Local::unsafe_borrow::<Task>();
|
||||
do (|| {
|
||||
(*t).death.allow_kill((*t).unwinder.unwinding);
|
||||
f()
|
||||
}).finally {
|
||||
(*t).death.inhibit_kill((*t).unwinder.unwinding);
|
||||
}
|
||||
}
|
||||
// FIXME(#3095): As in unkillable().
|
||||
_ => f()
|
||||
}
|
||||
}
|
||||
|
||||
@ -607,19 +632,36 @@ pub unsafe fn rekillable<U>(f: &fn() -> U) -> U {
|
||||
* For use with exclusive ARCs, which use pthread mutexes directly.
|
||||
*/
|
||||
pub unsafe fn atomically<U>(f: &fn() -> U) -> U {
|
||||
if context() == OldTaskContext {
|
||||
let t = rt::rust_get_task();
|
||||
do (|| {
|
||||
rt::rust_task_inhibit_kill(t);
|
||||
rt::rust_task_inhibit_yield(t);
|
||||
f()
|
||||
}).finally {
|
||||
rt::rust_task_allow_yield(t);
|
||||
rt::rust_task_allow_kill(t);
|
||||
use rt::task::Task;
|
||||
|
||||
match context() {
|
||||
OldTaskContext => {
|
||||
let t = rt::rust_get_task();
|
||||
do (|| {
|
||||
rt::rust_task_inhibit_kill(t);
|
||||
rt::rust_task_inhibit_yield(t);
|
||||
f()
|
||||
}).finally {
|
||||
rt::rust_task_allow_yield(t);
|
||||
rt::rust_task_allow_kill(t);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// FIXME #6377
|
||||
f()
|
||||
TaskContext => {
|
||||
let t = Local::unsafe_borrow::<Task>();
|
||||
do (|| {
|
||||
// It's important to inhibit kill after inhibiting yield, because
|
||||
// inhibit-kill might fail if we were already killed, and the
|
||||
// inhibit-yield must happen to match the finally's allow-yield.
|
||||
(*t).death.inhibit_yield();
|
||||
(*t).death.inhibit_kill((*t).unwinder.unwinding);
|
||||
f()
|
||||
}).finally {
|
||||
(*t).death.allow_kill((*t).unwinder.unwinding);
|
||||
(*t).death.allow_yield();
|
||||
}
|
||||
}
|
||||
// FIXME(#3095): As in unkillable().
|
||||
_ => f()
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user