Rollup merge of #47970 - vlovich:condvar_wait_until, r=dtolnay

Add Condvar APIs not susceptible to spurious wake

Provide wait_until and wait_timeout_until helper wrappers that aren't susceptible to spurious wake.
Additionally wait_timeout_until makes it possible to more easily write code that waits for a fixed amount of time in face of spurious wakes since otherwise each user would have to do math on adjusting the duration.

Implements #47960.
This commit is contained in:
kennytm 2018-02-25 15:54:39 +08:00 committed by GitHub
commit e253224541
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -14,7 +14,7 @@ use sync::{mutex, MutexGuard, PoisonError};
use sys_common::condvar as sys; use sys_common::condvar as sys;
use sys_common::mutex as sys_mutex; use sys_common::mutex as sys_mutex;
use sys_common::poison::{self, LockResult}; use sys_common::poison::{self, LockResult};
use time::Duration; use time::{Duration, Instant};
/// A type indicating whether a timed wait on a condition variable returned /// A type indicating whether a timed wait on a condition variable returned
/// due to a time out or not. /// due to a time out or not.
@ -221,6 +221,64 @@ impl Condvar {
} }
} }
/// Blocks the current thread until this condition variable receives a
/// notification and the required condition is met. Spurious wakeups are
/// ignored and this function will only return once the condition has been
/// met.
///
/// This function will atomically unlock the mutex specified (represented by
/// `guard`) and block the current thread. This means that any calls
/// to [`notify_one`] or [`notify_all`] which happen logically after the
/// mutex is unlocked are candidates to wake this thread up. When this
/// function call returns, the lock specified will have been re-acquired.
///
/// # Errors
///
/// This function will return an error if the mutex being waited on is
/// poisoned when this thread re-acquires the lock. For more information,
/// see information about [poisoning] on the [`Mutex`] type.
///
/// [`notify_one`]: #method.notify_one
/// [`notify_all`]: #method.notify_all
/// [poisoning]: ../sync/struct.Mutex.html#poisoning
/// [`Mutex`]: ../sync/struct.Mutex.html
///
/// # Examples
///
/// ```
/// #![feature(wait_until)]
///
/// use std::sync::{Arc, Mutex, Condvar};
/// use std::thread;
///
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
/// let pair2 = pair.clone();
///
/// thread::spawn(move|| {
/// let &(ref lock, ref cvar) = &*pair2;
/// let mut started = lock.lock().unwrap();
/// *started = true;
/// // We notify the condvar that the value has changed.
/// cvar.notify_one();
/// });
///
/// // Wait for the thread to start up.
/// let &(ref lock, ref cvar) = &*pair;
/// // As long as the value inside the `Mutex` is false, we wait.
/// let _guard = cvar.wait_until(lock.lock().unwrap(), |started| { *started }).unwrap();
/// ```
#[unstable(feature = "wait_until", issue = "47960")]
pub fn wait_until<'a, T, F>(&self, mut guard: MutexGuard<'a, T>,
mut condition: F)
-> LockResult<MutexGuard<'a, T>>
where F: FnMut(&mut T) -> bool {
while !condition(&mut *guard) {
guard = self.wait(guard)?;
}
Ok(guard)
}
/// Waits on this condition variable for a notification, timing out after a /// Waits on this condition variable for a notification, timing out after a
/// specified duration. /// specified duration.
/// ///
@ -295,7 +353,15 @@ impl Condvar {
/// ///
/// Note that the best effort is made to ensure that the time waited is /// Note that the best effort is made to ensure that the time waited is
/// measured with a monotonic clock, and not affected by the changes made to /// measured with a monotonic clock, and not affected by the changes made to
/// the system time. /// the system time. This function is susceptible to spurious wakeups.
/// Condition variables normally have a boolean predicate associated with
/// them, and the predicate must always be checked each time this function
/// returns to protect against spurious wakeups. Additionally, it is
/// typically desirable for the time-out to not exceed some duration in
/// spite of spurious wakes, thus the sleep-duration is decremented by the
/// amount slept. Alternatively, use the `wait_timeout_until` method
/// to wait until a condition is met with a total time-out regardless
/// of spurious wakes.
/// ///
/// The returned [`WaitTimeoutResult`] value indicates if the timeout is /// The returned [`WaitTimeoutResult`] value indicates if the timeout is
/// known to have elapsed. /// known to have elapsed.
@ -304,6 +370,7 @@ impl Condvar {
/// returns, regardless of whether the timeout elapsed or not. /// returns, regardless of whether the timeout elapsed or not.
/// ///
/// [`wait`]: #method.wait /// [`wait`]: #method.wait
/// [`wait_timeout_until`]: #method.wait_timeout_until
/// [`WaitTimeoutResult`]: struct.WaitTimeoutResult.html /// [`WaitTimeoutResult`]: struct.WaitTimeoutResult.html
/// ///
/// # Examples /// # Examples
@ -355,6 +422,80 @@ impl Condvar {
} }
} }
/// Waits on this condition variable for a notification, timing out after a
/// specified duration. Spurious wakes will not cause this function to
/// return.
///
/// The semantics of this function are equivalent to [`wait_until`] except
/// that the thread will be blocked for roughly no longer than `dur`. This
/// method should not be used for precise timing due to anomalies such as
/// preemption or platform differences that may not cause the maximum
/// amount of time waited to be precisely `dur`.
///
/// Note that the best effort is made to ensure that the time waited is
/// measured with a monotonic clock, and not affected by the changes made to
/// the system time.
///
/// The returned [`WaitTimeoutResult`] value indicates if the timeout is
/// known to have elapsed without the condition being met.
///
/// Like [`wait_until`], the lock specified will be re-acquired when this
/// function returns, regardless of whether the timeout elapsed or not.
///
/// [`wait_until`]: #method.wait_until
/// [`wait_timeout`]: #method.wait_timeout
/// [`WaitTimeoutResult`]: struct.WaitTimeoutResult.html
///
/// # Examples
///
/// ```
/// #![feature(wait_timeout_until)]
///
/// use std::sync::{Arc, Mutex, Condvar};
/// use std::thread;
/// use std::time::Duration;
///
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
/// let pair2 = pair.clone();
///
/// thread::spawn(move|| {
/// let &(ref lock, ref cvar) = &*pair2;
/// let mut started = lock.lock().unwrap();
/// *started = true;
/// // We notify the condvar that the value has changed.
/// cvar.notify_one();
/// });
///
/// // wait for the thread to start up
/// let &(ref lock, ref cvar) = &*pair;
/// let result = cvar.wait_timeout_until(
/// lock.lock().unwrap(),
/// Duration::from_millis(100),
/// |&mut started| started,
/// ).unwrap();
/// if result.1.timed_out() {
/// // timed-out without the condition ever evaluating to true.
/// }
/// // access the locked mutex via result.0
/// ```
#[unstable(feature = "wait_timeout_until", issue = "47960")]
pub fn wait_timeout_until<'a, T, F>(&self, mut guard: MutexGuard<'a, T>,
dur: Duration, mut condition: F)
-> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)>
where F: FnMut(&mut T) -> bool {
let start = Instant::now();
loop {
if condition(&mut *guard) {
return Ok((guard, WaitTimeoutResult(false)));
}
let timeout = match dur.checked_sub(start.elapsed()) {
Some(timeout) => timeout,
None => return Ok((guard, WaitTimeoutResult(true))),
};
guard = self.wait_timeout(guard, timeout)?.0;
}
}
/// Wakes up one blocked thread on this condvar. /// Wakes up one blocked thread on this condvar.
/// ///
/// If there is a blocked thread on this condition variable, then it will /// If there is a blocked thread on this condition variable, then it will
@ -480,6 +621,7 @@ impl Drop for Condvar {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
/// #![feature(wait_until)]
use sync::mpsc::channel; use sync::mpsc::channel;
use sync::{Condvar, Mutex, Arc}; use sync::{Condvar, Mutex, Arc};
use sync::atomic::{AtomicBool, Ordering}; use sync::atomic::{AtomicBool, Ordering};
@ -548,6 +690,29 @@ mod tests {
} }
} }
#[test]
#[cfg_attr(target_os = "emscripten", ignore)]
fn wait_until() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = pair.clone();
// Inside of our lock, spawn a new thread, and then wait for it to start.
thread::spawn(move|| {
let &(ref lock, ref cvar) = &*pair2;
let mut started = lock.lock().unwrap();
*started = true;
// We notify the condvar that the value has changed.
cvar.notify_one();
});
// Wait for the thread to start up.
let &(ref lock, ref cvar) = &*pair;
let guard = cvar.wait_until(lock.lock().unwrap(), |started| {
*started
});
assert!(*guard.unwrap());
}
#[test] #[test]
#[cfg_attr(target_os = "emscripten", ignore)] #[cfg_attr(target_os = "emscripten", ignore)]
fn wait_timeout_wait() { fn wait_timeout_wait() {
@ -567,6 +732,53 @@ mod tests {
} }
} }
#[test]
#[cfg_attr(target_os = "emscripten", ignore)]
fn wait_timeout_until_wait() {
let m = Arc::new(Mutex::new(()));
let c = Arc::new(Condvar::new());
let g = m.lock().unwrap();
let (_g, wait) = c.wait_timeout_until(g, Duration::from_millis(1), |_| { false }).unwrap();
// no spurious wakeups. ensure it timed-out
assert!(wait.timed_out());
}
#[test]
#[cfg_attr(target_os = "emscripten", ignore)]
fn wait_timeout_until_instant_satisfy() {
let m = Arc::new(Mutex::new(()));
let c = Arc::new(Condvar::new());
let g = m.lock().unwrap();
let (_g, wait) = c.wait_timeout_until(g, Duration::from_millis(0), |_| { true }).unwrap();
// ensure it didn't time-out even if we were not given any time.
assert!(!wait.timed_out());
}
#[test]
#[cfg_attr(target_os = "emscripten", ignore)]
fn wait_timeout_until_wake() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair_copy = pair.clone();
let &(ref m, ref c) = &*pair;
let g = m.lock().unwrap();
let _t = thread::spawn(move || {
let &(ref lock, ref cvar) = &*pair_copy;
let mut started = lock.lock().unwrap();
thread::sleep(Duration::from_millis(1));
*started = true;
cvar.notify_one();
});
let (g2, wait) = c.wait_timeout_until(g, Duration::from_millis(u64::MAX), |&mut notified| {
notified
}).unwrap();
// ensure it didn't time-out even if we were not given any time.
assert!(!wait.timed_out());
assert!(*g2);
}
#[test] #[test]
#[cfg_attr(target_os = "emscripten", ignore)] #[cfg_attr(target_os = "emscripten", ignore)]
fn wait_timeout_wake() { fn wait_timeout_wake() {