From 629f6e8d68be06bf07f803db64be6a917a66b2cf Mon Sep 17 00:00:00 2001 From: Ben Blum Date: Mon, 8 Jul 2013 13:48:57 -0400 Subject: [PATCH] Implement KillHandle::kill() and friends (unkillable, atomically). Close #6377. --- src/libstd/rt/kill.rs | 152 ++++++++++++++++++++++++++++++++++++++++- src/libstd/task/mod.rs | 112 ++++++++++++++++++++---------- 2 files changed, 227 insertions(+), 37 deletions(-) diff --git a/src/libstd/rt/kill.rs b/src/libstd/rt/kill.rs index 2ae62fc91e8..929e69d6173 100644 --- a/src/libstd/rt/kill.rs +++ b/src/libstd/rt/kill.rs @@ -10,15 +10,38 @@ //! Task death: asynchronous killing, linked failure, exit code propagation. +use cast; use cell::Cell; use option::{Option, Some, None}; use prelude::*; +use rt::task::Task; +use unstable::atomics::{AtomicUint, SeqCst}; use unstable::sync::{UnsafeAtomicRcBox, LittleLock}; use util; +static KILLED_MSG: &'static str = "killed by linked failure"; + +// State values for the 'killed' and 'unkillable' atomic flags below. +static KILL_RUNNING: uint = 0; +static KILL_KILLED: uint = 1; +static KILL_UNKILLABLE: uint = 2; + // FIXME(#7544)(bblum): think about the cache efficiency of this struct KillHandleInner { - // ((more fields to be added in a future commit)) + // Is the task running, blocked, or killed? Possible values: + // * KILL_RUNNING - Not unkillable, no kill pending. + // * KILL_KILLED - Kill pending. + // * - A transmuted blocked ~Task pointer. + // This flag is refcounted because it may also be referenced by a blocking + // concurrency primitive, used to wake the task normally, whose reference + // may outlive the handle's if the task is killed. + killed: UnsafeAtomicRcBox, + // Has the task deferred kill signals? This flag guards the above one. + // Possible values: + // * KILL_RUNNING - Not unkillable, no kill pending. + // * KILL_KILLED - Kill pending. + // * KILL_UNKILLABLE - Kill signals deferred. + unkillable: AtomicUint, // Shared state between task and children for exit code propagation. These // are here so we can re-use the kill handle to implement watched children @@ -47,13 +70,18 @@ pub struct Death { // Action to be done with the exit code. If set, also makes the task wait // until all its watched children exit before collecting the status. on_exit: Option<~fn(bool)>, + // nesting level counter for task::unkillable calls (0 == killable). + unkillable: int, + // nesting level counter for task::atomically calls (0 == can yield). + wont_sleep: int, } impl KillHandle { pub fn new() -> KillHandle { KillHandle(UnsafeAtomicRcBox::new(KillHandleInner { // Linked failure fields - // ((none yet)) + killed: UnsafeAtomicRcBox::new(AtomicUint::new(KILL_RUNNING)), + unkillable: AtomicUint::new(KILL_RUNNING), // Exit code propagation fields any_child_failed: false, child_tombstones: None, @@ -61,6 +89,54 @@ impl KillHandle { })) } + // Will begin unwinding if a kill signal was received, unless already_failing. + // This can't be used recursively, because a task which sees a KILLED + // signal must fail immediately, which an already-unkillable task can't do. + #[inline] + pub fn inhibit_kill(&mut self, already_failing: bool) { + let inner = unsafe { &mut *self.get() }; + // Expect flag to contain RUNNING. If KILLED, it should stay KILLED. + // FIXME(#7544)(bblum): is it really necessary to prohibit double kill? + match inner.unkillable.compare_and_swap(KILL_RUNNING, KILL_UNKILLABLE, SeqCst) { + KILL_RUNNING => { }, // normal case + KILL_KILLED => if !already_failing { fail!(KILLED_MSG) }, + _ => rtabort!("inhibit_kill: task already unkillable"), + } + } + + // Will begin unwinding if a kill signal was received, unless already_failing. + #[inline] + pub fn allow_kill(&mut self, already_failing: bool) { + let inner = unsafe { &mut *self.get() }; + // Expect flag to contain UNKILLABLE. If KILLED, it should stay KILLED. + // FIXME(#7544)(bblum): is it really necessary to prohibit double kill? + match inner.unkillable.compare_and_swap(KILL_UNKILLABLE, KILL_RUNNING, SeqCst) { + KILL_UNKILLABLE => { }, // normal case + KILL_KILLED => if !already_failing { fail!(KILLED_MSG) }, + _ => rtabort!("allow_kill: task already killable"), + } + } + + // Send a kill signal to the handle's owning task. Returns the task itself + // if it was blocked and needs punted awake. To be called by other tasks. + pub fn kill(&mut self) -> Option<~Task> { + let inner = unsafe { &mut *self.get() }; + if inner.unkillable.swap(KILL_KILLED, SeqCst) == KILL_RUNNING { + // Got in. Allowed to try to punt the task awake. + let flag = unsafe { &mut *inner.killed.get() }; + match flag.swap(KILL_KILLED, SeqCst) { + // Task either not blocked or already taken care of. + KILL_RUNNING | KILL_KILLED => None, + // Got ownership of the blocked task. + task_ptr => Some(unsafe { cast::transmute(task_ptr) }), + } + } else { + // Otherwise it was either unkillable or already killed. Somebody + // else was here first who will deal with the kill signal. + None + } + } + pub fn notify_immediate_failure(&mut self) { // A benign data race may happen here if there are failing sibling // tasks that were also spawned-watched. The refcount's write barriers @@ -123,6 +199,7 @@ impl KillHandle { } // NB: Takes a pthread mutex -- 'blk' not allowed to reschedule. + #[inline] fn add_lazy_tombstone(parent: &mut KillHandle, blk: &fn(Option<~fn() -> bool>) -> ~fn() -> bool) { @@ -144,6 +221,8 @@ impl Death { kill_handle: Some(KillHandle::new()), watching_parent: None, on_exit: None, + unkillable: 0, + wont_sleep: 0, } } @@ -153,11 +232,22 @@ impl Death { kill_handle: Some(KillHandle::new()), watching_parent: self.kill_handle.clone(), on_exit: None, + unkillable: 0, + wont_sleep: 0, } } /// Collect failure exit codes from children and propagate them to a parent. pub fn collect_failure(&mut self, mut success: bool) { + // This may run after the task has already failed, so even though the + // task appears to need to be killed, the scheduler should not fail us + // when we block to unwrap. + // (XXX: Another less-elegant reason for doing this is so that the use + // of the LittleLock in reparent_children_to doesn't need to access the + // unkillable flag in the kill_handle, since we'll have removed it.) + rtassert!(self.unkillable == 0); + self.unkillable = 1; + // Step 1. Decide if we need to collect child failures synchronously. do self.on_exit.take_map |on_exit| { if success { @@ -191,6 +281,64 @@ impl Death { parent_handle.notify_immediate_failure(); } }; + + // Can't use allow_kill directly; that would require the kill handle. + rtassert!(self.unkillable == 1); + self.unkillable = 0; + } + + /// Enter a possibly-nested unkillable section of code. + /// All calls must be paired with a subsequent call to allow_kill. + #[inline] + pub fn inhibit_kill(&mut self, already_failing: bool) { + if self.unkillable == 0 { + rtassert!(self.kill_handle.is_some()); + self.kill_handle.get_mut_ref().inhibit_kill(already_failing); + } + self.unkillable += 1; + } + + /// Exit a possibly-nested unkillable section of code. + /// All calls must be paired with a preceding call to inhibit_kill. + #[inline] + pub fn allow_kill(&mut self, already_failing: bool) { + rtassert!(self.unkillable != 0); + self.unkillable -= 1; + if self.unkillable == 0 { + rtassert!(self.kill_handle.is_some()); + self.kill_handle.get_mut_ref().allow_kill(already_failing); + } + } + + /// Enter a possibly-nested "atomic" section of code. Just for assertions. + /// All calls must be paired with a subsequent call to allow_yield. + #[inline] + pub fn inhibit_yield(&mut self) { + self.wont_sleep += 1; + } + + /// Exit a possibly-nested "atomic" section of code. Just for assertions. + /// All calls must be paired with a preceding call to inhibit_yield. + #[inline] + pub fn allow_yield(&mut self) { + rtassert!(self.wont_sleep != 0); + self.wont_sleep -= 1; + } + + /// Ensure that the task is allowed to become descheduled. + #[inline] + pub fn assert_may_sleep(&self) { + if self.wont_sleep != 0 { + rtabort!("illegal atomic-sleep: can't deschedule inside atomically()"); + } + } +} + +impl Drop for Death { + fn drop(&self) { + // Mustn't be in an atomic or unkillable section at task death. + rtassert!(self.unkillable == 0); + rtassert!(self.wont_sleep == 0); } } diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs index 2fec9858c88..f2c1d2ffd9d 100644 --- a/src/libstd/task/mod.rs +++ b/src/libstd/task/mod.rs @@ -42,7 +42,8 @@ use cmp::Eq; use comm::{stream, Chan, GenericChan, GenericPort, Port}; use result::Result; use result; -use rt::{context, OldTaskContext}; +use rt::{context, OldTaskContext, TaskContext}; +use rt::local::Local; use task::rt::{task_id, sched_id}; use unstable::finally::Finally; use util::replace; @@ -526,8 +527,6 @@ pub fn yield() { pub fn failing() -> bool { //! True if the running task has failed - use rt::{context, OldTaskContext}; - use rt::local::Local; use rt::task::Task; match context() { @@ -572,33 +571,59 @@ pub fn get_scheduler() -> Scheduler { * ~~~ */ pub unsafe fn unkillable(f: &fn() -> U) -> U { - if context() == OldTaskContext { - let t = rt::rust_get_task(); - do (|| { - rt::rust_task_inhibit_kill(t); - f() - }).finally { - rt::rust_task_allow_kill(t); + use rt::task::Task; + + match context() { + OldTaskContext => { + let t = rt::rust_get_task(); + do (|| { + rt::rust_task_inhibit_kill(t); + f() + }).finally { + rt::rust_task_allow_kill(t); + } } - } else { - // FIXME #6377 - f() + TaskContext => { + // The inhibits/allows might fail and need to borrow the task. + let t = Local::unsafe_borrow::(); + do (|| { + (*t).death.inhibit_kill((*t).unwinder.unwinding); + f() + }).finally { + (*t).death.allow_kill((*t).unwinder.unwinding); + } + } + // FIXME(#3095): This should be an rtabort as soon as the scheduler + // no longer uses a workqueue implemented with an Exclusive. + _ => f() } } /// The inverse of unkillable. Only ever to be used nested in unkillable(). pub unsafe fn rekillable(f: &fn() -> U) -> U { - if context() == OldTaskContext { - let t = rt::rust_get_task(); - do (|| { - rt::rust_task_allow_kill(t); - f() - }).finally { - rt::rust_task_inhibit_kill(t); + use rt::task::Task; + + match context() { + OldTaskContext => { + let t = rt::rust_get_task(); + do (|| { + rt::rust_task_allow_kill(t); + f() + }).finally { + rt::rust_task_inhibit_kill(t); + } } - } else { - // FIXME #6377 - f() + TaskContext => { + let t = Local::unsafe_borrow::(); + do (|| { + (*t).death.allow_kill((*t).unwinder.unwinding); + f() + }).finally { + (*t).death.inhibit_kill((*t).unwinder.unwinding); + } + } + // FIXME(#3095): As in unkillable(). + _ => f() } } @@ -607,19 +632,36 @@ pub unsafe fn rekillable(f: &fn() -> U) -> U { * For use with exclusive ARCs, which use pthread mutexes directly. */ pub unsafe fn atomically(f: &fn() -> U) -> U { - if context() == OldTaskContext { - let t = rt::rust_get_task(); - do (|| { - rt::rust_task_inhibit_kill(t); - rt::rust_task_inhibit_yield(t); - f() - }).finally { - rt::rust_task_allow_yield(t); - rt::rust_task_allow_kill(t); + use rt::task::Task; + + match context() { + OldTaskContext => { + let t = rt::rust_get_task(); + do (|| { + rt::rust_task_inhibit_kill(t); + rt::rust_task_inhibit_yield(t); + f() + }).finally { + rt::rust_task_allow_yield(t); + rt::rust_task_allow_kill(t); + } } - } else { - // FIXME #6377 - f() + TaskContext => { + let t = Local::unsafe_borrow::(); + do (|| { + // It's important to inhibit kill after inhibiting yield, because + // inhibit-kill might fail if we were already killed, and the + // inhibit-yield must happen to match the finally's allow-yield. + (*t).death.inhibit_yield(); + (*t).death.inhibit_kill((*t).unwinder.unwinding); + f() + }).finally { + (*t).death.allow_kill((*t).unwinder.unwinding); + (*t).death.allow_yield(); + } + } + // FIXME(#3095): As in unkillable(). + _ => f() } }