From 7d296c4843785f64a4246213375b21c1ab1e7462 Mon Sep 17 00:00:00 2001 From: Vitali Lovich Date: Fri, 2 Feb 2018 11:17:48 -0800 Subject: [PATCH 1/9] Add Condvar APIs not susceptible to spurious wake Provide wait_until and wait_timeout_until helper wrappers that aren't susceptible to spurious wake. --- src/libstd/sync/condvar.rs | 207 ++++++++++++++++++++++++++++++++++++- 1 file changed, 205 insertions(+), 2 deletions(-) diff --git a/src/libstd/sync/condvar.rs b/src/libstd/sync/condvar.rs index 56402175817..1e5beaaa342 100644 --- a/src/libstd/sync/condvar.rs +++ b/src/libstd/sync/condvar.rs @@ -14,7 +14,7 @@ use sync::{mutex, MutexGuard, PoisonError}; use sys_common::condvar as sys; use sys_common::mutex as sys_mutex; use sys_common::poison::{self, LockResult}; -use time::Duration; +use time::{Duration, Instant}; /// A type indicating whether a timed wait on a condition variable returned /// due to a time out or not. @@ -219,6 +219,61 @@ impl Condvar { } } + /// Blocks the current thread until this condition variable receives a + /// notification and the required condition is met. There are no spurious + /// wakeups when calling this. + /// + /// This function will atomically unlock the mutex specified (represented by + /// `guard`) and block the current thread. This means that any calls + /// to [`notify_one`] or [`notify_all`] which happen logically after the + /// mutex is unlocked are candidates to wake this thread up. When this + /// function call returns, the lock specified will have been re-acquired. + /// + /// # Errors + /// + /// This function will return an error if the mutex being waited on is + /// poisoned when this thread re-acquires the lock. For more information, + /// see information about [poisoning] on the [`Mutex`] type. + /// + /// [`notify_one`]: #method.notify_one + /// [`notify_all`]: #method.notify_all + /// [poisoning]: ../sync/struct.Mutex.html#poisoning + /// [`Mutex`]: ../sync/struct.Mutex.html + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, Mutex, Condvar}; + /// use std::thread; + /// + /// let pair = Arc::new((Mutex::new(false), Condvar::new())); + /// let pair2 = pair.clone(); + /// + /// thread::spawn(move|| { + /// let &(ref lock, ref cvar) = &*pair2; + /// let mut started = lock.lock().unwrap(); + /// *started = true; + /// // We notify the condvar that the value has changed. + /// cvar.notify_one(); + /// }); + /// + /// // Wait for the thread to start up. + /// let &(ref lock, ref cvar) = &*pair; + /// // As long as the value inside the `Mutex` is false, we wait. + /// cvar.wait_until(lock.lock().unwrap(), |ref started| { started }); + /// ``` + #[stable(feature = "wait_until", since = "1.24")] + pub fn wait_until<'a, T, F>(&self, mut guard: MutexGuard<'a, T>, + mut condition: F) + -> LockResult> + where F: FnMut(&T) -> bool { + while !condition(&*guard) { + guard = self.wait(guard)?; + } + Ok(guard) + } + + /// Waits on this condition variable for a notification, timing out after a /// specified duration. /// @@ -293,7 +348,15 @@ impl Condvar { /// /// Note that the best effort is made to ensure that the time waited is /// measured with a monotonic clock, and not affected by the changes made to - /// the system time. + /// the system time. This function is susceptible to spurious wakeups. + /// Condition variables normally have a boolean predicate associated with + /// them, and the predicate must always be checked each time this function + /// returns to protect against spurious wakeups. Additionally, it is + /// typically desirable for the time-out to not exceed some duration in + /// spite of spurious wakes, thus the sleep-duration is decremented by the + /// amount slept. Alternatively, use the `wait_timeout_until` method + /// to wait until a condition is met with a total time-out regardless + /// of spurious wakes. /// /// The returned [`WaitTimeoutResult`] value indicates if the timeout is /// known to have elapsed. @@ -302,6 +365,7 @@ impl Condvar { /// returns, regardless of whether the timeout elapsed or not. /// /// [`wait`]: #method.wait + /// [`wait_timeout_until`]: #method.wait_timeout_until /// [`WaitTimeoutResult`]: struct.WaitTimeoutResult.html /// /// # Examples @@ -353,6 +417,76 @@ impl Condvar { } } + /// Waits on this condition variable for a notification, timing out after a + /// specified duration. + /// + /// The semantics of this function are equivalent to [`wait_until`] except + /// that the thread will be blocked for roughly no longer than `dur`. This + /// method should not be used for precise timing due to anomalies such as + /// preemption or platform differences that may not cause the maximum + /// amount of time waited to be precisely `dur`. + /// + /// Note that the best effort is made to ensure that the time waited is + /// measured with a monotonic clock, and not affected by the changes made to + /// the system time. + /// + /// The returned [`WaitTimeoutResult`] value indicates if the timeout is + /// known to have elapsed without the condition being met. + /// + /// Like [`wait_until`], the lock specified will be re-acquired when this + /// function returns, regardless of whether the timeout elapsed or not. + /// + /// [`wait_until`]: #method.wait_until + /// [`wait_timeout`]: #method.wait_timeout + /// [`WaitTimeoutResult`]: struct.WaitTimeoutResult.html + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, Mutex, Condvar}; + /// use std::thread; + /// use std::time::Duration; + /// + /// let pair = Arc::new((Mutex::new(false), Condvar::new())); + /// let pair2 = pair.clone(); + /// + /// thread::spawn(move|| { + /// let &(ref lock, ref cvar) = &*pair2; + /// let mut started = lock.lock().unwrap(); + /// *started = true; + /// // We notify the condvar that the value has changed. + /// cvar.notify_one(); + /// }); + /// + /// // wait for the thread to start up + /// let &(ref lock, ref cvar) = &*pair; + /// let result = cvar.wait_timeout_until(lock, Duration::from_millis(100), |started| { + /// started + /// }).unwrap(); + /// if result.1.timed_out() { + /// // timed-out without the condition ever evaluating to true. + /// } + /// // access the locked mutex via result.0 + /// ``` + #[stable(feature = "wait_timeout_until", since = "1.24")] + pub fn wait_timeout_until<'a, T, F>(&self, mut guard: MutexGuard<'a, T>, + mut dur: Duration, mut condition: F) + -> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)> + where F: FnMut(&T) -> bool { + let timed_out = Duration::new(0, 0); + loop { + if !condition(&*guard) { + return Ok((guard, WaitTimeoutResult(false))); + } else if dur == timed_out { + return Ok((guard, WaitTimeoutResult(false))); + } + let wait_timer = Instant::now(); + let wait_result = self.wait_timeout(guard, dur)?; + dur = dur.checked_sub(wait_timer.elapsed()).unwrap_or(timed_out); + guard = wait_result.0; + } + } + /// Wakes up one blocked thread on this condvar. /// /// If there is a blocked thread on this condition variable, then it will @@ -546,6 +680,29 @@ mod tests { } } + #[test] + #[cfg_attr(target_os = "emscripten", ignore)] + fn wait_until() { + let pair = Arc::new((Mutex::new(false), Condvar::new())); + let pair2 = pair.clone(); + + // Inside of our lock, spawn a new thread, and then wait for it to start. + thread::spawn(move|| { + let &(ref lock, ref cvar) = &*pair2; + let mut started = lock.lock().unwrap(); + *started = true; + // We notify the condvar that the value has changed. + cvar.notify_one(); + }); + + // Wait for the thread to start up. + let &(ref lock, ref cvar) = &*pair; + let guard = cvar.wait_until(lock.lock().unwrap(), |started| { + started + }); + assert!(*guard); + } + #[test] #[cfg_attr(target_os = "emscripten", ignore)] fn wait_timeout_wait() { @@ -565,6 +722,52 @@ mod tests { } } + #[test] + #[cfg_attr(target_os = "emscripten", ignore)] + fn wait_timeout_until_wait() { + let m = Arc::new(Mutex::new(())); + let c = Arc::new(Condvar::new()); + + let g = m.lock().unwrap(); + let (_g, wait) = c.wait_timeout_until(g, Duration::from_millis(1), || { false }).unwrap(); + // no spurious wakeups. ensure it timed-out + assert!(wait.timed_out()); + } + + #[test] + #[cfg_attr(target_os = "emscripten", ignore)] + fn wait_timeout_until_instant_satisfy() { + let m = Arc::new(Mutex::new(())); + let c = Arc::new(Condvar::new()); + + let g = m.lock().unwrap(); + let (_g, wait) = c.wait_timeout_until(g, Duration::from_millis(0), || { true }).unwrap(); + // ensure it didn't time-out even if we were not given any time. + assert!(!wait.timed_out()); + } + + #[test] + #[cfg_attr(target_os = "emscripten", ignore)] + fn wait_timeout_until_wake() { + let pair = Arc::new((Mutex::new(false), Condvar::new())); + let pair_copy = pair.clone(); + + let g = m.lock().unwrap(); + let t = thread::spawn(move || { + let &(ref lock, ref cvar) = &*pair2; + let mut started = lock.lock().unwrap(); + thread::sleep(Duration::from_millis(1)); + started = true; + cvar.notify_one(); + }); + let (g2, wait) = c.wait_timeout_until(g, Duration::from_millis(u64::MAX), |¬ified| { + notified + }).unwrap(); + // ensure it didn't time-out even if we were not given any time. + assert!(!wait.timed_out()); + assert!(*g2); + } + #[test] #[cfg_attr(target_os = "emscripten", ignore)] fn wait_timeout_wake() { From 404e1a67007a254ab35e4fe8a8650e9335590a76 Mon Sep 17 00:00:00 2001 From: Vitali Lovich Date: Fri, 2 Feb 2018 11:52:16 -0800 Subject: [PATCH 2/9] Fix typo --- src/libstd/sync/condvar.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libstd/sync/condvar.rs b/src/libstd/sync/condvar.rs index 1e5beaaa342..c58f8fd2990 100644 --- a/src/libstd/sync/condvar.rs +++ b/src/libstd/sync/condvar.rs @@ -478,7 +478,7 @@ impl Condvar { if !condition(&*guard) { return Ok((guard, WaitTimeoutResult(false))); } else if dur == timed_out { - return Ok((guard, WaitTimeoutResult(false))); + return Ok((guard, WaitTimeoutResult(true))); } let wait_timer = Instant::now(); let wait_result = self.wait_timeout(guard, dur)?; From e72bd6df5398dd7ee02c6057b861537c49649b4e Mon Sep 17 00:00:00 2001 From: Vitali Lovich Date: Fri, 2 Feb 2018 12:07:16 -0800 Subject: [PATCH 3/9] Review response Make condition closure accept mut T&. Clarify spurious wakeup documentation. Cleanup doc example code. --- src/libstd/sync/condvar.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/libstd/sync/condvar.rs b/src/libstd/sync/condvar.rs index c58f8fd2990..76b68fc4f4f 100644 --- a/src/libstd/sync/condvar.rs +++ b/src/libstd/sync/condvar.rs @@ -220,8 +220,9 @@ impl Condvar { } /// Blocks the current thread until this condition variable receives a - /// notification and the required condition is met. There are no spurious - /// wakeups when calling this. + /// notification and the required condition is met. Spurious wakeups are + /// ignored and this function will only return once the condition has been + /// met. /// /// This function will atomically unlock the mutex specified (represented by /// `guard`) and block the current thread. This means that any calls @@ -260,14 +261,14 @@ impl Condvar { /// // Wait for the thread to start up. /// let &(ref lock, ref cvar) = &*pair; /// // As long as the value inside the `Mutex` is false, we wait. - /// cvar.wait_until(lock.lock().unwrap(), |ref started| { started }); + /// cvar.wait_until(lock.lock().unwrap(), |started| { started }); /// ``` #[stable(feature = "wait_until", since = "1.24")] pub fn wait_until<'a, T, F>(&self, mut guard: MutexGuard<'a, T>, mut condition: F) -> LockResult> - where F: FnMut(&T) -> bool { - while !condition(&*guard) { + where F: FnMut(&mut T) -> bool { + while !condition(&mut *guard) { guard = self.wait(guard)?; } Ok(guard) @@ -418,7 +419,8 @@ impl Condvar { } /// Waits on this condition variable for a notification, timing out after a - /// specified duration. + /// specified duration. Spurious wakes will not cause this function to + /// return. /// /// The semantics of this function are equivalent to [`wait_until`] except /// that the thread will be blocked for roughly no longer than `dur`. This @@ -472,10 +474,10 @@ impl Condvar { pub fn wait_timeout_until<'a, T, F>(&self, mut guard: MutexGuard<'a, T>, mut dur: Duration, mut condition: F) -> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)> - where F: FnMut(&T) -> bool { + where F: FnMut(&mut T) -> bool { let timed_out = Duration::new(0, 0); loop { - if !condition(&*guard) { + if !condition(&mut *guard) { return Ok((guard, WaitTimeoutResult(false))); } else if dur == timed_out { return Ok((guard, WaitTimeoutResult(true))); From 95e4dc2ad143c91d0930ea28634e6f5c54ac0812 Mon Sep 17 00:00:00 2001 From: Vitali Lovich Date: Mon, 5 Feb 2018 15:11:00 -0800 Subject: [PATCH 4/9] Simplify wait_timeout_until & fix condition typo --- src/libstd/sync/condvar.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/libstd/sync/condvar.rs b/src/libstd/sync/condvar.rs index 76b68fc4f4f..e6a3388aa25 100644 --- a/src/libstd/sync/condvar.rs +++ b/src/libstd/sync/condvar.rs @@ -475,17 +475,16 @@ impl Condvar { mut dur: Duration, mut condition: F) -> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)> where F: FnMut(&mut T) -> bool { - let timed_out = Duration::new(0, 0); + let start = Instant::now(); loop { - if !condition(&mut *guard) { + if condition(&mut *guard) { return Ok((guard, WaitTimeoutResult(false))); - } else if dur == timed_out { - return Ok((guard, WaitTimeoutResult(true))); } - let wait_timer = Instant::now(); - let wait_result = self.wait_timeout(guard, dur)?; - dur = dur.checked_sub(wait_timer.elapsed()).unwrap_or(timed_out); - guard = wait_result.0; + let timeout = match dur.checked_sub(start.elapsed()) { + Some(timeout) => timeout, + None => return Ok((guard, WaitTimeoutResult(true))), + } + guard = self.wait_timeout(guard, dur)?.0; } } From 97df227d19791b0e9d199be28851fb924c3c1e21 Mon Sep 17 00:00:00 2001 From: Vitali Lovich Date: Mon, 12 Feb 2018 21:08:14 -0800 Subject: [PATCH 5/9] Fix wait_timeout value --- src/libstd/sync/condvar.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libstd/sync/condvar.rs b/src/libstd/sync/condvar.rs index e6a3388aa25..65235aa62a1 100644 --- a/src/libstd/sync/condvar.rs +++ b/src/libstd/sync/condvar.rs @@ -484,7 +484,7 @@ impl Condvar { Some(timeout) => timeout, None => return Ok((guard, WaitTimeoutResult(true))), } - guard = self.wait_timeout(guard, dur)?.0; + guard = self.wait_timeout(guard, timeout)?.0; } } From 6fe2d1d765810c05ce2aa2184baa9f4aabf1a151 Mon Sep 17 00:00:00 2001 From: Vitali Lovich Date: Tue, 13 Feb 2018 11:32:04 -0800 Subject: [PATCH 6/9] Misc fixes Switch feature guards to unstable Add missing semicolon Remove mut that's no longer necessary --- src/libstd/sync/condvar.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/libstd/sync/condvar.rs b/src/libstd/sync/condvar.rs index 65235aa62a1..98fadbd3543 100644 --- a/src/libstd/sync/condvar.rs +++ b/src/libstd/sync/condvar.rs @@ -263,7 +263,7 @@ impl Condvar { /// // As long as the value inside the `Mutex` is false, we wait. /// cvar.wait_until(lock.lock().unwrap(), |started| { started }); /// ``` - #[stable(feature = "wait_until", since = "1.24")] + #[unstable(feature = "wait_until", issue = "47960")] pub fn wait_until<'a, T, F>(&self, mut guard: MutexGuard<'a, T>, mut condition: F) -> LockResult> @@ -470,9 +470,9 @@ impl Condvar { /// } /// // access the locked mutex via result.0 /// ``` - #[stable(feature = "wait_timeout_until", since = "1.24")] + #[unstable(feature = "wait_timeout_until", issue = "47960")] pub fn wait_timeout_until<'a, T, F>(&self, mut guard: MutexGuard<'a, T>, - mut dur: Duration, mut condition: F) + dur: Duration, mut condition: F) -> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)> where F: FnMut(&mut T) -> bool { let start = Instant::now(); @@ -483,7 +483,7 @@ impl Condvar { let timeout = match dur.checked_sub(start.elapsed()) { Some(timeout) => timeout, None => return Ok((guard, WaitTimeoutResult(true))), - } + }; guard = self.wait_timeout(guard, timeout)?.0; } } From b1f04a3a2e1a01ea1b77153dd8d22e7837542aa0 Mon Sep 17 00:00:00 2001 From: Vitali Lovich Date: Thu, 15 Feb 2018 09:28:25 -0800 Subject: [PATCH 7/9] Fix unit test compilation Also fix some code snippets in documentation. --- src/libstd/sync/condvar.rs | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/src/libstd/sync/condvar.rs b/src/libstd/sync/condvar.rs index 98fadbd3543..546e105deb7 100644 --- a/src/libstd/sync/condvar.rs +++ b/src/libstd/sync/condvar.rs @@ -244,6 +244,8 @@ impl Condvar { /// # Examples /// /// ``` + /// #![feature(wait_until)] + /// /// use std::sync::{Arc, Mutex, Condvar}; /// use std::thread; /// @@ -261,7 +263,7 @@ impl Condvar { /// // Wait for the thread to start up. /// let &(ref lock, ref cvar) = &*pair; /// // As long as the value inside the `Mutex` is false, we wait. - /// cvar.wait_until(lock.lock().unwrap(), |started| { started }); + /// let _guard = cvar.wait_until(lock.lock().unwrap(), |started| { *started }).unwrap(); /// ``` #[unstable(feature = "wait_until", issue = "47960")] pub fn wait_until<'a, T, F>(&self, mut guard: MutexGuard<'a, T>, @@ -445,6 +447,8 @@ impl Condvar { /// # Examples /// /// ``` + /// #![feature(wait_timeout_until)] + /// /// use std::sync::{Arc, Mutex, Condvar}; /// use std::thread; /// use std::time::Duration; @@ -462,8 +466,8 @@ impl Condvar { /// /// // wait for the thread to start up /// let &(ref lock, ref cvar) = &*pair; - /// let result = cvar.wait_timeout_until(lock, Duration::from_millis(100), |started| { - /// started + /// let result = cvar.wait_timeout_until(lock.lock().unwrap(), Duration::from_millis(100), |started| { + /// *started /// }).unwrap(); /// if result.1.timed_out() { /// // timed-out without the condition ever evaluating to true. @@ -613,6 +617,7 @@ impl Drop for Condvar { #[cfg(test)] mod tests { + /// #![feature(wait_until)] use sync::mpsc::channel; use sync::{Condvar, Mutex, Arc}; use sync::atomic::{AtomicBool, Ordering}; @@ -699,9 +704,9 @@ mod tests { // Wait for the thread to start up. let &(ref lock, ref cvar) = &*pair; let guard = cvar.wait_until(lock.lock().unwrap(), |started| { - started + *started }); - assert!(*guard); + assert!(*guard.unwrap()); } #[test] @@ -730,7 +735,7 @@ mod tests { let c = Arc::new(Condvar::new()); let g = m.lock().unwrap(); - let (_g, wait) = c.wait_timeout_until(g, Duration::from_millis(1), || { false }).unwrap(); + let (_g, wait) = c.wait_timeout_until(g, Duration::from_millis(1), |_| { false }).unwrap(); // no spurious wakeups. ensure it timed-out assert!(wait.timed_out()); } @@ -742,7 +747,7 @@ mod tests { let c = Arc::new(Condvar::new()); let g = m.lock().unwrap(); - let (_g, wait) = c.wait_timeout_until(g, Duration::from_millis(0), || { true }).unwrap(); + let (_g, wait) = c.wait_timeout_until(g, Duration::from_millis(0), |_| { true }).unwrap(); // ensure it didn't time-out even if we were not given any time. assert!(!wait.timed_out()); } @@ -753,15 +758,16 @@ mod tests { let pair = Arc::new((Mutex::new(false), Condvar::new())); let pair_copy = pair.clone(); + let &(ref m, ref c) = &*pair; let g = m.lock().unwrap(); - let t = thread::spawn(move || { - let &(ref lock, ref cvar) = &*pair2; + let _t = thread::spawn(move || { + let &(ref lock, ref cvar) = &*pair_copy; let mut started = lock.lock().unwrap(); thread::sleep(Duration::from_millis(1)); - started = true; + *started = true; cvar.notify_one(); }); - let (g2, wait) = c.wait_timeout_until(g, Duration::from_millis(u64::MAX), |¬ified| { + let (g2, wait) = c.wait_timeout_until(g, Duration::from_millis(u64::MAX), |&mut notified| { notified }).unwrap(); // ensure it didn't time-out even if we were not given any time. From d549db8031a07b79009f9efe8b3233cd8900c82b Mon Sep 17 00:00:00 2001 From: Vitali Lovich Date: Sat, 17 Feb 2018 09:17:58 -0800 Subject: [PATCH 8/9] Fix tidy violation --- src/libstd/sync/condvar.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/libstd/sync/condvar.rs b/src/libstd/sync/condvar.rs index 546e105deb7..7f9b5667628 100644 --- a/src/libstd/sync/condvar.rs +++ b/src/libstd/sync/condvar.rs @@ -466,9 +466,11 @@ impl Condvar { /// /// // wait for the thread to start up /// let &(ref lock, ref cvar) = &*pair; - /// let result = cvar.wait_timeout_until(lock.lock().unwrap(), Duration::from_millis(100), |started| { - /// *started - /// }).unwrap(); + /// let result = cvar.wait_timeout_until( + /// lock.lock().unwrap(), + /// Duration::from_millis(100), + /// |started| started, + /// ).unwrap(); /// if result.1.timed_out() { /// // timed-out without the condition ever evaluating to true. /// } From 14b403c91ab9a1e4b776e00adcd9d88153e3b736 Mon Sep 17 00:00:00 2001 From: Vitali Lovich Date: Tue, 20 Feb 2018 13:07:21 -0800 Subject: [PATCH 9/9] Fix doc compile error --- src/libstd/sync/condvar.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libstd/sync/condvar.rs b/src/libstd/sync/condvar.rs index 7f9b5667628..3e40cbb37e3 100644 --- a/src/libstd/sync/condvar.rs +++ b/src/libstd/sync/condvar.rs @@ -469,7 +469,7 @@ impl Condvar { /// let result = cvar.wait_timeout_until( /// lock.lock().unwrap(), /// Duration::from_millis(100), - /// |started| started, + /// |&mut started| started, /// ).unwrap(); /// if result.1.timed_out() { /// // timed-out without the condition ever evaluating to true.