From 08f6380a9f0b866796080094f44fe25ea5636547 Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Tue, 13 Jan 2015 21:24:26 -0800 Subject: [PATCH] Rewrite Condvar::wait_timeout and make it public **The implementation is a direct adaptation of libcxx's condition_variable implementation.** pthread_cond_timedwait uses the non-monotonic system clock. It's possible to change the clock to a monotonic via pthread_cond_attr, but this is incompatible with static initialization. To deal with this, we calculate the timeout using the system clock, and maintain a separate record of the start and end times with a monotonic clock to be used for calculation of the return value. --- src/libstd/sync/condvar.rs | 123 +++++++++++++++++++++++++++++--- src/libstd/sync/poison.rs | 17 ++++- src/libstd/sys/unix/condvar.rs | 56 +++++++++------ src/libstd/sys/unix/mod.rs | 1 + src/libstd/sys/unix/time.rs | 124 +++++++++++++++++++++++++++++++++ src/libstd/sys/windows/mod.rs | 1 + src/libstd/sys/windows/time.rs | 50 +++++++++++++ src/libstd/time/mod.rs | 68 +----------------- 8 files changed, 343 insertions(+), 97 deletions(-) create mode 100644 src/libstd/sys/unix/time.rs create mode 100644 src/libstd/sys/windows/time.rs diff --git a/src/libstd/sync/condvar.rs b/src/libstd/sync/condvar.rs index bcd5f56a353..b8b186f31e0 100644 --- a/src/libstd/sync/condvar.rs +++ b/src/libstd/sync/condvar.rs @@ -12,6 +12,7 @@ use prelude::v1::*; use sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT}; use sync::poison::{self, LockResult}; +use sys::time::SteadyTime; use sys_common::condvar as sys; use sys_common::mutex as sys_mutex; use time::Duration; @@ -153,13 +154,8 @@ impl Condvar { /// /// Like `wait`, the lock specified will be re-acquired when this function /// returns, regardless of whether the timeout elapsed or not. - // Note that this method is *not* public, and this is quite intentional - // because we're not quite sure about the semantics of relative vs absolute - // durations or how the timing guarantees play into what the system APIs - // provide. There are also additional concerns about the unix-specific - // implementation which may need to be addressed. - #[allow(dead_code)] - fn wait_timeout<'a, T>(&self, guard: MutexGuard<'a, T>, dur: Duration) + #[unstable] + pub fn wait_timeout<'a, T>(&self, guard: MutexGuard<'a, T>, dur: Duration) -> LockResult<(MutexGuard<'a, T>, bool)> { unsafe { let me: &'static Condvar = &*(self as *const _); @@ -167,6 +163,25 @@ impl Condvar { } } + /// Wait on this condition variable for a notification, timing out after a + /// specified duration. + /// + /// The semantics of this function are equivalent to `wait_timeout` except + /// that the implementation will repeatedly wait while the duration has not + /// passed and the provided function returns `false`. + #[unstable] + pub fn wait_timeout_with<'a, T, F>(&self, + guard: MutexGuard<'a, T>, + dur: Duration, + f: F) + -> LockResult<(MutexGuard<'a, T>, bool)> + where F: FnMut(LockResult<&mut T>) -> bool { + unsafe { + let me: &'static Condvar = &*(self as *const _); + me.inner.wait_timeout_with(guard, dur, f) + } + } + /// Wake up one blocked thread on this condvar. /// /// If there is a blocked thread on this condition variable, then it will @@ -220,9 +235,9 @@ impl StaticCondvar { /// specified duration. /// /// See `Condvar::wait_timeout`. - #[allow(dead_code)] // may want to stabilize this later, see wait_timeout above - fn wait_timeout<'a, T>(&'static self, guard: MutexGuard<'a, T>, dur: Duration) - -> LockResult<(MutexGuard<'a, T>, bool)> { + #[unstable = "may be merged with Condvar in the future"] + pub fn wait_timeout<'a, T>(&'static self, guard: MutexGuard<'a, T>, dur: Duration) + -> LockResult<(MutexGuard<'a, T>, bool)> { let (poisoned, success) = unsafe { let lock = mutex::guard_lock(&guard); self.verify(lock); @@ -236,6 +251,50 @@ impl StaticCondvar { } } + /// Wait on this condition variable for a notification, timing out after a + /// specified duration. + /// + /// The implementation will repeatedly wait while the duration has not + /// passed and the function returns `false`. + /// + /// See `Condvar::wait_timeout_with`. + #[unstable = "may be merged with Condvar in the future"] + pub fn wait_timeout_with<'a, T, F>(&'static self, + guard: MutexGuard<'a, T>, + dur: Duration, + mut f: F) + -> LockResult<(MutexGuard<'a, T>, bool)> + where F: FnMut(LockResult<&mut T>) -> bool { + // This could be made more efficient by pushing the implementation into sys::condvar + let start = SteadyTime::now(); + let mut guard_result: LockResult> = Ok(guard); + while !f(guard_result + .as_mut() + .map(|g| &mut **g) + .map_err(|e| poison::new_poison_error(&mut **e.get_mut()))) { + let now = SteadyTime::now(); + let consumed = &now - &start; + let guard = guard_result.unwrap_or_else(|e| e.into_inner()); + let (new_guard_result, no_timeout) = match self.wait_timeout(guard, dur - consumed) { + Ok((new_guard, no_timeout)) => (Ok(new_guard), no_timeout), + Err(err) => { + let (new_guard, no_timeout) = err.into_inner(); + (Err(poison::new_poison_error(new_guard)), no_timeout) + } + }; + guard_result = new_guard_result; + if !no_timeout { + let result = f(guard_result + .as_mut() + .map(|g| &mut **g) + .map_err(|e| poison::new_poison_error(&mut **e.get_mut()))); + return poison::map_result(guard_result, |g| (g, result)); + } + } + + poison::map_result(guard_result, |g| (g, true)) + } + /// Wake up one blocked thread on this condvar. /// /// See `Condvar::notify_one`. @@ -285,6 +344,7 @@ mod tests { use super::{StaticCondvar, CONDVAR_INIT}; use sync::mpsc::channel; use sync::{StaticMutex, MUTEX_INIT, Condvar, Mutex, Arc}; + use sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering}; use thread::Thread; use time::Duration; @@ -372,6 +432,49 @@ mod tests { unsafe { C.destroy(); M.destroy(); } } + #[test] + fn wait_timeout_with() { + static C: StaticCondvar = CONDVAR_INIT; + static M: StaticMutex = MUTEX_INIT; + static S: AtomicUsize = ATOMIC_USIZE_INIT; + + let g = M.lock().unwrap(); + let (g, success) = C.wait_timeout_with(g, Duration::nanoseconds(1000), |_| false).unwrap(); + assert!(!success); + + let (tx, rx) = channel(); + let _t = Thread::scoped(move || { + rx.recv().unwrap(); + let g = M.lock().unwrap(); + S.store(1, Ordering::SeqCst); + C.notify_one(); + drop(g); + + rx.recv().unwrap(); + let g = M.lock().unwrap(); + S.store(2, Ordering::SeqCst); + C.notify_one(); + drop(g); + + rx.recv().unwrap(); + let _g = M.lock().unwrap(); + S.store(3, Ordering::SeqCst); + C.notify_one(); + }); + + let mut state = 0; + let (_g, success) = C.wait_timeout_with(g, Duration::days(1), |_| { + assert_eq!(state, S.load(Ordering::SeqCst)); + tx.send(()).unwrap(); + state += 1; + match state { + 1|2 => false, + _ => true, + } + }).unwrap(); + assert!(success); + } + #[test] #[should_fail] fn two_mutexes() { diff --git a/src/libstd/sync/poison.rs b/src/libstd/sync/poison.rs index 385df45b400..cc8c331ef39 100644 --- a/src/libstd/sync/poison.rs +++ b/src/libstd/sync/poison.rs @@ -99,8 +99,23 @@ impl fmt::Show for PoisonError { impl PoisonError { /// Consumes this error indicating that a lock is poisoned, returning the /// underlying guard to allow access regardless. - #[stable] + #[deprecated="renamed to into_inner"] pub fn into_guard(self) -> T { self.guard } + + /// Consumes this error indicating that a lock is poisoned, returning the + /// underlying guard to allow access regardless. + #[unstable] + pub fn into_inner(self) -> T { self.guard } + + /// Reaches into this error indicating that a lock is poisoned, returning a + /// reference to the underlying guard to allow access regardless. + #[unstable] + pub fn get_ref(&self) -> &T { &self.guard } + + /// Reaches into this error indicating that a lock is poisoned, returning a + /// mutable reference to the underlying guard to allow access regardless. + #[unstable] + pub fn get_mut(&mut self) -> &mut T { &mut self.guard } } impl FromError> for TryLockError { diff --git a/src/libstd/sys/unix/condvar.rs b/src/libstd/sys/unix/condvar.rs index 52dd261824f..85a65bbef50 100644 --- a/src/libstd/sys/unix/condvar.rs +++ b/src/libstd/sys/unix/condvar.rs @@ -10,9 +10,12 @@ use cell::UnsafeCell; use libc; +use std::option::Option::{Some, None}; use sys::mutex::{self, Mutex}; +use sys::time; use sys::sync as ffi; use time::Duration; +use num::{Int, NumCast}; pub struct Condvar { inner: UnsafeCell } @@ -46,33 +49,46 @@ impl Condvar { debug_assert_eq!(r, 0); } + // This implementation is modeled after libcxx's condition_variable + // https://github.com/llvm-mirror/libcxx/blob/release_35/src/condition_variable.cpp#L46 + // https://github.com/llvm-mirror/libcxx/blob/release_35/include/__mutex_base#L367 pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool { - assert!(dur >= Duration::nanoseconds(0)); + if dur <= Duration::zero() { + return false; + } - // First, figure out what time it currently is - let mut tv = libc::timeval { tv_sec: 0, tv_usec: 0 }; - let r = ffi::gettimeofday(&mut tv, 0 as *mut _); + // First, figure out what time it currently is, in both system and stable time. + // pthread_cond_timedwait uses system time, but we want to report timeout based on stable + // time. + let mut sys_now = libc::timeval { tv_sec: 0, tv_usec: 0 }; + let stable_now = time::SteadyTime::now(); + let r = ffi::gettimeofday(&mut sys_now, 0 as *mut _); debug_assert_eq!(r, 0); - // Offset that time with the specified duration - let abs = Duration::seconds(tv.tv_sec as i64) + - Duration::microseconds(tv.tv_usec as i64) + - dur; - let ns = abs.num_nanoseconds().unwrap() as u64; - let timeout = libc::timespec { - tv_sec: (ns / 1000000000) as libc::time_t, - tv_nsec: (ns % 1000000000) as libc::c_long, + let seconds = NumCast::from(dur.num_seconds()); + let timeout = match seconds.and_then(|s| sys_now.tv_sec.checked_add(s)) { + Some(sec) => { + libc::timespec { + tv_sec: sec, + tv_nsec: (dur - Duration::seconds(dur.num_seconds())) + .num_nanoseconds().unwrap() as libc::c_long, + } + } + None => { + libc::timespec { + tv_sec: Int::max_value(), + tv_nsec: 1_000_000_000 - 1, + } + } }; // And wait! - let r = ffi::pthread_cond_timedwait(self.inner.get(), mutex::raw(mutex), - &timeout); - if r != 0 { - debug_assert_eq!(r as int, libc::ETIMEDOUT as int); - false - } else { - true - } + let r = ffi::pthread_cond_timedwait(self.inner.get(), mutex::raw(mutex), &timeout); + debug_assert!(r == libc::ETIMEDOUT || r == 0); + + // ETIMEDOUT is not a totally reliable method of determining timeout due to clock shifts, + // so do the check ourselves + &time::SteadyTime::now() - &stable_now < dur } #[inline] diff --git a/src/libstd/sys/unix/mod.rs b/src/libstd/sys/unix/mod.rs index 6a408aa60f0..bb98d1e052a 100644 --- a/src/libstd/sys/unix/mod.rs +++ b/src/libstd/sys/unix/mod.rs @@ -52,6 +52,7 @@ pub mod sync; pub mod tcp; pub mod thread; pub mod thread_local; +pub mod time; pub mod timer; pub mod tty; pub mod udp; diff --git a/src/libstd/sys/unix/time.rs b/src/libstd/sys/unix/time.rs new file mode 100644 index 00000000000..cc1e23fbca9 --- /dev/null +++ b/src/libstd/sys/unix/time.rs @@ -0,0 +1,124 @@ +// Copyright 2015 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +pub use self::inner::SteadyTime; + +#[cfg(any(target_os = "macos", target_os = "ios"))] +mod inner { + use libc; + use time::Duration; + use ops::Sub; + use sync::{Once, ONCE_INIT}; + + pub struct SteadyTime { + t: u64 + } + + extern { + pub fn mach_absolute_time() -> u64; + pub fn mach_timebase_info(info: *mut libc::mach_timebase_info) -> libc::c_int; + } + + impl SteadyTime { + pub fn now() -> SteadyTime { + SteadyTime { + t: unsafe { mach_absolute_time() }, + } + } + + pub fn ns(&self) -> u64 { + let info = info(); + self.t * info.numer as u64 / info.denom as u64 + } + } + + fn info() -> &'static libc::mach_timebase_info { + static mut INFO: libc::mach_timebase_info = libc::mach_timebase_info { + numer: 0, + denom: 0, + }; + static ONCE: Once = ONCE_INIT; + + unsafe { + ONCE.call_once(|| { + mach_timebase_info(&mut INFO); + }); + &INFO + } + } + + impl<'a> Sub for &'a SteadyTime { + type Output = Duration; + + fn sub(self, other: &SteadyTime) -> Duration { + unsafe { + let info = info(); + let diff = self.t as i64 - other.t as i64; + Duration::nanoseconds(diff * info.numer as i64 / info.denom as i64) + } + } + } +} + +#[cfg(not(any(target_os = "macos", target_os = "ios")))] +mod inner { + use libc; + use time::Duration; + use ops::Sub; + + const NSEC_PER_SEC: i64 = 1_000_000_000; + + pub struct SteadyTime { + t: libc::timespec, + } + + // Apparently android provides this in some other library? + #[cfg(not(target_os = "android"))] + #[link(name = "rt")] + extern {} + + extern { + fn clock_gettime(clk_id: libc::c_int, tp: *mut libc::timespec) -> libc::c_int; + } + + impl SteadyTime { + pub fn now() -> SteadyTime { + let mut t = SteadyTime { + t: libc::timespec { + tv_sec: 0, + tv_nsec: 0, + } + }; + unsafe { + assert_eq!(0, clock_gettime(libc::CLOCK_MONOTONIC, &mut t.t)); + } + t + } + + pub fn ns(&self) -> u64 { + self.t.tv_sec as u64 * NSEC_PER_SEC as u64 + self.t.tv_nsec as u64 + } + } + + impl<'a> Sub for &'a SteadyTime { + type Output = Duration; + + fn sub(self, other: &SteadyTime) -> Duration { + if self.t.tv_nsec >= other.t.tv_nsec { + Duration::seconds(self.t.tv_sec as i64 - other.t.tv_sec as i64) + + Duration::nanoseconds(self.t.tv_nsec as i64 - other.t.tv_nsec as i64) + } else { + Duration::seconds(self.t.tv_sec as i64 - 1 - other.t.tv_sec as i64) + + Duration::nanoseconds(self.t.tv_nsec as i64 + NSEC_PER_SEC - + other.t.tv_nsec as i64) + } + } + } +} diff --git a/src/libstd/sys/windows/mod.rs b/src/libstd/sys/windows/mod.rs index 0e706c3cc6a..72fc2f8700d 100644 --- a/src/libstd/sys/windows/mod.rs +++ b/src/libstd/sys/windows/mod.rs @@ -50,6 +50,7 @@ pub mod rwlock; pub mod sync; pub mod stack_overflow; pub mod tcp; +pub mod time; pub mod thread; pub mod thread_local; pub mod timer; diff --git a/src/libstd/sys/windows/time.rs b/src/libstd/sys/windows/time.rs new file mode 100644 index 00000000000..20ceff0aa69 --- /dev/null +++ b/src/libstd/sys/windows/time.rs @@ -0,0 +1,50 @@ +// Copyright 2015 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. +use libc; +use ops::Sub; +use time::Duration; +use sync::{Once, ONCE_INIT}; + +pub struct SteadyTime { + t: libc::LARGE_INTEGER, +} + +impl SteadyTime { + pub fn now() -> SteadyTime { + let mut t = SteadyTime { t: 0 }; + unsafe { libc::QueryPerformanceCounter(&mut t.t); } + t + } + + pub fn ns(&self) -> u64 { + self.t as u64 * 1_000_000_000 / frequency() as u64 + } +} + +fn frequency() -> libc::LARGE_INTEGER { + static mut FREQUENCY: libc::LARGE_INTEGER = 0; + static ONCE: Once = ONCE_INIT; + + unsafe { + ONCE.call_once(|| { + libc::QueryPerformanceFrequency(&mut FREQUENCY); + }); + FREQUENCY + } +} + +impl<'a> Sub for &'a SteadyTime { + type Output = Duration; + + fn sub(self, other: &SteadyTime) -> Duration { + let diff = self.t as i64 - other.t as i64; + Duration::microseconds(diff * 1_000_000 / frequency() as i64) + } +} diff --git a/src/libstd/time/mod.rs b/src/libstd/time/mod.rs index d6c94f27a8b..f62571942a7 100644 --- a/src/libstd/time/mod.rs +++ b/src/libstd/time/mod.rs @@ -10,7 +10,7 @@ //! Temporal quantification. -use libc; +use sys::time::SteadyTime; pub use self::duration::Duration; @@ -20,69 +20,5 @@ pub mod duration; /// in nanoseconds since an unspecified epoch. // NB: this is intentionally not public, this is not ready to stabilize its api. fn precise_time_ns() -> u64 { - return os_precise_time_ns(); - - #[cfg(windows)] - fn os_precise_time_ns() -> u64 { - let mut ticks_per_s = 0; - assert_eq!(unsafe { - libc::QueryPerformanceFrequency(&mut ticks_per_s) - }, 1); - let ticks_per_s = if ticks_per_s == 0 {1} else {ticks_per_s}; - let mut ticks = 0; - assert_eq!(unsafe { - libc::QueryPerformanceCounter(&mut ticks) - }, 1); - - return (ticks as u64 * 1000000000) / (ticks_per_s as u64); - } - - #[cfg(any(target_os = "macos", target_os = "ios"))] - fn os_precise_time_ns() -> u64 { - use sync; - - static mut TIMEBASE: libc::mach_timebase_info = libc::mach_timebase_info { numer: 0, - denom: 0 }; - static ONCE: sync::Once = sync::ONCE_INIT; - unsafe { - ONCE.call_once(|| { - imp::mach_timebase_info(&mut TIMEBASE); - }); - let time = imp::mach_absolute_time(); - time * TIMEBASE.numer as u64 / TIMEBASE.denom as u64 - } - } - - #[cfg(not(any(windows, target_os = "macos", target_os = "ios")))] - fn os_precise_time_ns() -> u64 { - let mut ts = libc::timespec { tv_sec: 0, tv_nsec: 0 }; - unsafe { - imp::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts); - } - return (ts.tv_sec as u64) * 1000000000 + (ts.tv_nsec as u64) - } -} - -#[cfg(all(unix, not(target_os = "macos"), not(target_os = "ios")))] -mod imp { - use libc::{c_int, timespec}; - - // Apparently android provides this in some other library? - #[cfg(not(target_os = "android"))] - #[link(name = "rt")] - extern {} - - extern { - pub fn clock_gettime(clk_id: c_int, tp: *mut timespec) -> c_int; - } - -} -#[cfg(any(target_os = "macos", target_os = "ios"))] -mod imp { - use libc::{c_int, mach_timebase_info}; - - extern { - pub fn mach_absolute_time() -> u64; - pub fn mach_timebase_info(info: *mut mach_timebase_info) -> c_int; - } + SteadyTime::now().ns() }