From 1464fc3a0cb2b8a92c54357fcff7c632b334cb29 Mon Sep 17 00:00:00 2001 From: Mara Bos Date: Sat, 19 Sep 2020 12:42:14 +0200 Subject: [PATCH 1/8] Move thread parker to a separate module. --- library/std/src/thread/mod.rs | 129 ++++----------------------- library/std/src/thread/parker/mod.rs | 125 ++++++++++++++++++++++++++ 2 files changed, 142 insertions(+), 112 deletions(-) create mode 100644 library/std/src/thread/parker/mod.rs diff --git a/library/std/src/thread/mod.rs b/library/std/src/thread/mod.rs index 8c353e2484e..a5a8d5c9fbb 100644 --- a/library/std/src/thread/mod.rs +++ b/library/std/src/thread/mod.rs @@ -149,6 +149,8 @@ #[cfg(all(test, not(target_os = "emscripten")))] mod tests; +mod parker; + use crate::any::Any; use crate::cell::UnsafeCell; use crate::ffi::{CStr, CString}; @@ -159,15 +161,14 @@ use crate::num::NonZeroU64; use crate::panic; use crate::panicking; use crate::str; -use crate::sync::atomic::AtomicUsize; -use crate::sync::atomic::Ordering::SeqCst; -use crate::sync::{Arc, Condvar, Mutex}; +use crate::sync::Arc; use crate::sys::thread as imp; use crate::sys_common::mutex; use crate::sys_common::thread; use crate::sys_common::thread_info; use crate::sys_common::{AsInner, IntoInner}; use crate::time::Duration; +use parker::Parker; //////////////////////////////////////////////////////////////////////////////// // Thread-local storage @@ -667,6 +668,8 @@ pub fn current() -> Thread { /// /// [`channel`]: crate::sync::mpsc /// [`join`]: JoinHandle::join +/// [`Condvar`]: crate::sync::Condvar +/// [`Mutex`]: crate::sync::Mutex #[stable(feature = "rust1", since = "1.0.0")] pub fn yield_now() { imp::Thread::yield_now() @@ -712,6 +715,8 @@ pub fn yield_now() { /// panic!() /// } /// ``` +/// +/// [Mutex]: crate::sync::Mutex #[inline] #[stable(feature = "rust1", since = "1.0.0")] pub fn panicking() -> bool { @@ -779,11 +784,6 @@ pub fn sleep(dur: Duration) { imp::Thread::sleep(dur) } -// constants for park/unpark -const EMPTY: usize = 0; -const PARKED: usize = 1; -const NOTIFIED: usize = 2; - /// Blocks unless or until the current thread's token is made available. /// /// A call to `park` does not guarantee that the thread will remain parked @@ -870,45 +870,11 @@ const NOTIFIED: usize = 2; /// /// [`unpark`]: Thread::unpark /// [`thread::park_timeout`]: park_timeout -// -// The implementation currently uses the trivial strategy of a Mutex+Condvar -// with wakeup flag, which does not actually allow spurious wakeups. In the -// future, this will be implemented in a more efficient way, perhaps along the lines of -// http://cr.openjdk.java.net/~stefank/6989984.1/raw_files/new/src/os/linux/vm/os_linux.cpp -// or futuxes, and in either case may allow spurious wakeups. #[stable(feature = "rust1", since = "1.0.0")] pub fn park() { - let thread = current(); - - // If we were previously notified then we consume this notification and - // return quickly. - if thread.inner.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() { - return; - } - - // Otherwise we need to coordinate going to sleep - let mut m = thread.inner.lock.lock().unwrap(); - match thread.inner.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { - Ok(_) => {} - Err(NOTIFIED) => { - // We must read here, even though we know it will be `NOTIFIED`. - // This is because `unpark` may have been called again since we read - // `NOTIFIED` in the `compare_exchange` above. We must perform an - // acquire operation that synchronizes with that `unpark` to observe - // any writes it made before the call to unpark. To do that we must - // read from the write it made to `state`. - let old = thread.inner.state.swap(EMPTY, SeqCst); - assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); - return; - } // should consume this notification, so prohibit spurious wakeups in next park. - Err(_) => panic!("inconsistent park state"), - } - loop { - m = thread.inner.cvar.wait(m).unwrap(); - match thread.inner.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) { - Ok(_) => return, // got a notification - Err(_) => {} // spurious wakeup, go back to sleep - } + // SAFETY: park_timeout is called on the parker owned by this thread. + unsafe { + current().inner.parker.park(); } } @@ -970,35 +936,9 @@ pub fn park_timeout_ms(ms: u32) { /// ``` #[stable(feature = "park_timeout", since = "1.4.0")] pub fn park_timeout(dur: Duration) { - let thread = current(); - - // Like `park` above we have a fast path for an already-notified thread, and - // afterwards we start coordinating for a sleep. - // return quickly. - if thread.inner.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() { - return; - } - let m = thread.inner.lock.lock().unwrap(); - match thread.inner.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { - Ok(_) => {} - Err(NOTIFIED) => { - // We must read again here, see `park`. - let old = thread.inner.state.swap(EMPTY, SeqCst); - assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); - return; - } // should consume this notification, so prohibit spurious wakeups in next park. - Err(_) => panic!("inconsistent park_timeout state"), - } - - // Wait with a timeout, and if we spuriously wake up or otherwise wake up - // from a notification we just want to unconditionally set the state back to - // empty, either consuming a notification or un-flagging ourselves as - // parked. - let (_m, _result) = thread.inner.cvar.wait_timeout(m, dur).unwrap(); - match thread.inner.state.swap(EMPTY, SeqCst) { - NOTIFIED => {} // got a notification, hurray! - PARKED => {} // no notification, alas - n => panic!("inconsistent park_timeout state: {}", n), + // SAFETY: park_timeout is called on the parker owned by this thread. + unsafe { + current().inner.parker.park_timeout(dur); } } @@ -1077,11 +1017,7 @@ impl ThreadId { struct Inner { name: Option, // Guaranteed to be UTF-8 id: ThreadId, - - // state for thread park/unpark - state: AtomicUsize, - lock: Mutex<()>, - cvar: Condvar, + parker: Parker, } #[derive(Clone)] @@ -1115,13 +1051,7 @@ impl Thread { let cname = name.map(|n| CString::new(n).expect("thread name may not contain interior null bytes")); Thread { - inner: Arc::new(Inner { - name: cname, - id: ThreadId::new(), - state: AtomicUsize::new(EMPTY), - lock: Mutex::new(()), - cvar: Condvar::new(), - }), + inner: Arc::new(Inner { name: cname, id: ThreadId::new(), parker: Parker::new() }), } } @@ -1157,32 +1087,7 @@ impl Thread { /// ``` #[stable(feature = "rust1", since = "1.0.0")] pub fn unpark(&self) { - // To ensure the unparked thread will observe any writes we made - // before this call, we must perform a release operation that `park` - // can synchronize with. To do that we must write `NOTIFIED` even if - // `state` is already `NOTIFIED`. That is why this must be a swap - // rather than a compare-and-swap that returns if it reads `NOTIFIED` - // on failure. - match self.inner.state.swap(NOTIFIED, SeqCst) { - EMPTY => return, // no one was waiting - NOTIFIED => return, // already unparked - PARKED => {} // gotta go wake someone up - _ => panic!("inconsistent state in unpark"), - } - - // There is a period between when the parked thread sets `state` to - // `PARKED` (or last checked `state` in the case of a spurious wake - // up) and when it actually waits on `cvar`. If we were to notify - // during this period it would be ignored and then when the parked - // thread went to sleep it would never wake up. Fortunately, it has - // `lock` locked at this stage so we can acquire `lock` to wait until - // it is ready to receive the notification. - // - // Releasing `lock` before the call to `notify_one` means that when the - // parked thread wakes it doesn't get woken only to have to wait for us - // to release `lock`. - drop(self.inner.lock.lock().unwrap()); - self.inner.cvar.notify_one() + self.inner.parker.unpark(); } /// Gets the thread's unique identifier. diff --git a/library/std/src/thread/parker/mod.rs b/library/std/src/thread/parker/mod.rs new file mode 100644 index 00000000000..c8b9b7b1c79 --- /dev/null +++ b/library/std/src/thread/parker/mod.rs @@ -0,0 +1,125 @@ +//! Parker implementaiton based on a Mutex and Condvar. +//! +//! The implementation currently uses the trivial strategy of a Mutex+Condvar +//! with wakeup flag, which does not actually allow spurious wakeups. In the +//! future, this will be implemented in a more efficient way, perhaps along the lines of +//! http://cr.openjdk.java.net/~stefank/6989984.1/raw_files/new/src/os/linux/vm/os_linux.cpp +//! or futuxes, and in either case may allow spurious wakeups. + +use crate::sync::atomic::AtomicUsize; +use crate::sync::atomic::Ordering::SeqCst; +use crate::sync::{Condvar, Mutex}; +use crate::time::Duration; + +const EMPTY: usize = 0; +const PARKED: usize = 1; +const NOTIFIED: usize = 2; + +pub struct Parker { + state: AtomicUsize, + lock: Mutex<()>, + cvar: Condvar, +} + +impl Parker { + pub fn new() -> Self { + Parker { state: AtomicUsize::new(EMPTY), lock: Mutex::new(()), cvar: Condvar::new() } + } + + // This implementaiton doesn't require `unsafe`, but other implementations + // may assume this is only called by the thread that owns the Parker. + pub unsafe fn park(&self) { + // If we were previously notified then we consume this notification and + // return quickly. + if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() { + return; + } + + // Otherwise we need to coordinate going to sleep + let mut m = self.lock.lock().unwrap(); + match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { + Ok(_) => {} + Err(NOTIFIED) => { + // We must read here, even though we know it will be `NOTIFIED`. + // This is because `unpark` may have been called again since we read + // `NOTIFIED` in the `compare_exchange` above. We must perform an + // acquire operation that synchronizes with that `unpark` to observe + // any writes it made before the call to unpark. To do that we must + // read from the write it made to `state`. + let old = self.state.swap(EMPTY, SeqCst); + assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); + return; + } // should consume this notification, so prohibit spurious wakeups in next park. + Err(_) => panic!("inconsistent park state"), + } + loop { + m = self.cvar.wait(m).unwrap(); + match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) { + Ok(_) => return, // got a notification + Err(_) => {} // spurious wakeup, go back to sleep + } + } + } + + // This implementaiton doesn't require `unsafe`, but other implementations + // may assume this is only called by the thread that owns the Parker. + pub unsafe fn park_timeout(&self, dur: Duration) { + // Like `park` above we have a fast path for an already-notified thread, and + // afterwards we start coordinating for a sleep. + // return quickly. + if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() { + return; + } + let m = self.lock.lock().unwrap(); + match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { + Ok(_) => {} + Err(NOTIFIED) => { + // We must read again here, see `park`. + let old = self.state.swap(EMPTY, SeqCst); + assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); + return; + } // should consume this notification, so prohibit spurious wakeups in next park. + Err(_) => panic!("inconsistent park_timeout state"), + } + + // Wait with a timeout, and if we spuriously wake up or otherwise wake up + // from a notification we just want to unconditionally set the state back to + // empty, either consuming a notification or un-flagging ourselves as + // parked. + let (_m, _result) = self.cvar.wait_timeout(m, dur).unwrap(); + match self.state.swap(EMPTY, SeqCst) { + NOTIFIED => {} // got a notification, hurray! + PARKED => {} // no notification, alas + n => panic!("inconsistent park_timeout state: {}", n), + } + } + + pub fn unpark(&self) { + // To ensure the unparked thread will observe any writes we made + // before this call, we must perform a release operation that `park` + // can synchronize with. To do that we must write `NOTIFIED` even if + // `state` is already `NOTIFIED`. That is why this must be a swap + // rather than a compare-and-swap that returns if it reads `NOTIFIED` + // on failure. + match self.state.swap(NOTIFIED, SeqCst) { + EMPTY => return, // no one was waiting + NOTIFIED => return, // already unparked + PARKED => {} // gotta go wake someone up + _ => panic!("inconsistent state in unpark"), + } + + // There is a period between when the parked thread sets `state` to + // `PARKED` (or last checked `state` in the case of a spurious wake + // up) and when it actually waits on `cvar`. If we were to notify + // during this period it would be ignored and then when the parked + // thread went to sleep it would never wake up. Fortunately, it has + // `lock` locked at this stage so we can acquire `lock` to wait until + // it is ready to receive the notification. + // + // Releasing `lock` before the call to `notify_one` means that when the + // parked thread wakes it doesn't get woken only to have to wait for us + // to release `lock`. + drop(self.lock.lock().unwrap()); + self.cvar.notify_one() + } +} From ec13df4ec4780b679dde227db2185a82cf31c93b Mon Sep 17 00:00:00 2001 From: Mara Bos Date: Sat, 19 Sep 2020 14:54:21 +0200 Subject: [PATCH 2/8] Add fast futex-based thread parker for Linux. --- library/std/src/thread/parker/generic.rs | 119 +++++++++++++++++++++ library/std/src/thread/parker/linux.rs | 105 ++++++++++++++++++ library/std/src/thread/parker/mod.rs | 130 ++--------------------- 3 files changed, 231 insertions(+), 123 deletions(-) create mode 100644 library/std/src/thread/parker/generic.rs create mode 100644 library/std/src/thread/parker/linux.rs diff --git a/library/std/src/thread/parker/generic.rs b/library/std/src/thread/parker/generic.rs new file mode 100644 index 00000000000..14cfa958e5e --- /dev/null +++ b/library/std/src/thread/parker/generic.rs @@ -0,0 +1,119 @@ +//! Parker implementaiton based on a Mutex and Condvar. + +use crate::sync::atomic::AtomicUsize; +use crate::sync::atomic::Ordering::SeqCst; +use crate::sync::{Condvar, Mutex}; +use crate::time::Duration; + +const EMPTY: usize = 0; +const PARKED: usize = 1; +const NOTIFIED: usize = 2; + +pub struct Parker { + state: AtomicUsize, + lock: Mutex<()>, + cvar: Condvar, +} + +impl Parker { + pub fn new() -> Self { + Parker { state: AtomicUsize::new(EMPTY), lock: Mutex::new(()), cvar: Condvar::new() } + } + + // This implementaiton doesn't require `unsafe`, but other implementations + // may assume this is only called by the thread that owns the Parker. + pub unsafe fn park(&self) { + // If we were previously notified then we consume this notification and + // return quickly. + if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() { + return; + } + + // Otherwise we need to coordinate going to sleep + let mut m = self.lock.lock().unwrap(); + match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { + Ok(_) => {} + Err(NOTIFIED) => { + // We must read here, even though we know it will be `NOTIFIED`. + // This is because `unpark` may have been called again since we read + // `NOTIFIED` in the `compare_exchange` above. We must perform an + // acquire operation that synchronizes with that `unpark` to observe + // any writes it made before the call to unpark. To do that we must + // read from the write it made to `state`. + let old = self.state.swap(EMPTY, SeqCst); + assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); + return; + } // should consume this notification, so prohibit spurious wakeups in next park. + Err(_) => panic!("inconsistent park state"), + } + loop { + m = self.cvar.wait(m).unwrap(); + match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) { + Ok(_) => return, // got a notification + Err(_) => {} // spurious wakeup, go back to sleep + } + } + } + + // This implementaiton doesn't require `unsafe`, but other implementations + // may assume this is only called by the thread that owns the Parker. + pub unsafe fn park_timeout(&self, dur: Duration) { + // Like `park` above we have a fast path for an already-notified thread, and + // afterwards we start coordinating for a sleep. + // return quickly. + if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() { + return; + } + let m = self.lock.lock().unwrap(); + match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { + Ok(_) => {} + Err(NOTIFIED) => { + // We must read again here, see `park`. + let old = self.state.swap(EMPTY, SeqCst); + assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); + return; + } // should consume this notification, so prohibit spurious wakeups in next park. + Err(_) => panic!("inconsistent park_timeout state"), + } + + // Wait with a timeout, and if we spuriously wake up or otherwise wake up + // from a notification we just want to unconditionally set the state back to + // empty, either consuming a notification or un-flagging ourselves as + // parked. + let (_m, _result) = self.cvar.wait_timeout(m, dur).unwrap(); + match self.state.swap(EMPTY, SeqCst) { + NOTIFIED => {} // got a notification, hurray! + PARKED => {} // no notification, alas + n => panic!("inconsistent park_timeout state: {}", n), + } + } + + pub fn unpark(&self) { + // To ensure the unparked thread will observe any writes we made + // before this call, we must perform a release operation that `park` + // can synchronize with. To do that we must write `NOTIFIED` even if + // `state` is already `NOTIFIED`. That is why this must be a swap + // rather than a compare-and-swap that returns if it reads `NOTIFIED` + // on failure. + match self.state.swap(NOTIFIED, SeqCst) { + EMPTY => return, // no one was waiting + NOTIFIED => return, // already unparked + PARKED => {} // gotta go wake someone up + _ => panic!("inconsistent state in unpark"), + } + + // There is a period between when the parked thread sets `state` to + // `PARKED` (or last checked `state` in the case of a spurious wake + // up) and when it actually waits on `cvar`. If we were to notify + // during this period it would be ignored and then when the parked + // thread went to sleep it would never wake up. Fortunately, it has + // `lock` locked at this stage so we can acquire `lock` to wait until + // it is ready to receive the notification. + // + // Releasing `lock` before the call to `notify_one` means that when the + // parked thread wakes it doesn't get woken only to have to wait for us + // to release `lock`. + drop(self.lock.lock().unwrap()); + self.cvar.notify_one() + } +} diff --git a/library/std/src/thread/parker/linux.rs b/library/std/src/thread/parker/linux.rs new file mode 100644 index 00000000000..05142b88c73 --- /dev/null +++ b/library/std/src/thread/parker/linux.rs @@ -0,0 +1,105 @@ +use crate::sync::atomic::AtomicI32; +use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release}; +use crate::time::Duration; + +const PARKED: i32 = -1; +const EMPTY: i32 = 0; +const NOTIFIED: i32 = 1; + +pub struct Parker { + state: AtomicI32, +} + +impl Parker { + #[inline] + pub const fn new() -> Self { + Parker { state: AtomicI32::new(EMPTY) } + } + + // Assumes this is only called by the thread that owns the Parker, + // which means that `self.state != PARKED`. + pub unsafe fn park(&self) { + // Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the + // first case. + if self.state.fetch_sub(1, Acquire) == NOTIFIED { + return; + } + loop { + // Wait for something to happen, assuming it's still set to PARKED. + futex_wait(&self.state, PARKED, None); + // Change NOTIFIED=>EMPTY and return in that case. + if self.state.compare_and_swap(NOTIFIED, EMPTY, Acquire) == NOTIFIED { + return; + } else { + // Spurious wake up. We loop to try again. + } + } + } + + // Assumes this is only called by the thread that owns the Parker, + // which means that `self.state != PARKED`. + pub unsafe fn park_timeout(&self, timeout: Duration) { + // Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the + // first case. + if self.state.fetch_sub(1, Acquire) == NOTIFIED { + return; + } + // Wait for something to happen, assuming it's still set to PARKED. + futex_wait(&self.state, PARKED, Some(timeout)); + // This is not just a store, because we need to establish a + // release-acquire ordering with unpark(). + if self.state.swap(EMPTY, Acquire) == NOTIFIED { + // Woke up because of unpark(). + } else { + // Timeout or spurious wake up. + // We return either way, because we can't easily tell if it was the + // timeout or not. + } + } + + pub fn unpark(&self) { + // Change PARKED=>NOTIFIED, EMPTY=>NOTIFIED, or NOTIFIED=>NOTIFIED, and + // wake the thread in the first case. + // + // Note that even NOTIFIED=>NOTIFIED results in a write. This is on + // purpose, to make sure every unpark() has a release-acquire ordering + // with park(). + if self.state.swap(NOTIFIED, Release) == PARKED { + futex_wake(&self.state); + } + } +} + +fn futex_wait(futex: &AtomicI32, expected: i32, timeout: Option) { + let timespec; + let timespec_ptr = match timeout { + Some(timeout) => { + timespec = libc::timespec { + tv_sec: timeout.as_secs() as _, + tv_nsec: timeout.subsec_nanos() as _, + }; + ×pec as *const libc::timespec + } + None => crate::ptr::null(), + }; + unsafe { + libc::syscall( + libc::SYS_futex, + futex as *const AtomicI32, + libc::FUTEX_WAIT | libc::FUTEX_PRIVATE_FLAG, + expected, + timespec_ptr, + ); + } +} + +fn futex_wake(futex: &AtomicI32) { + unsafe { + libc::syscall( + libc::SYS_futex, + futex as *const AtomicI32, + libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG, + 1, + ); + } +} diff --git a/library/std/src/thread/parker/mod.rs b/library/std/src/thread/parker/mod.rs index c8b9b7b1c79..4dc5e1aa271 100644 --- a/library/std/src/thread/parker/mod.rs +++ b/library/std/src/thread/parker/mod.rs @@ -1,125 +1,9 @@ -//! Parker implementaiton based on a Mutex and Condvar. -//! -//! The implementation currently uses the trivial strategy of a Mutex+Condvar -//! with wakeup flag, which does not actually allow spurious wakeups. In the -//! future, this will be implemented in a more efficient way, perhaps along the lines of -//! http://cr.openjdk.java.net/~stefank/6989984.1/raw_files/new/src/os/linux/vm/os_linux.cpp -//! or futuxes, and in either case may allow spurious wakeups. - -use crate::sync::atomic::AtomicUsize; -use crate::sync::atomic::Ordering::SeqCst; -use crate::sync::{Condvar, Mutex}; -use crate::time::Duration; - -const EMPTY: usize = 0; -const PARKED: usize = 1; -const NOTIFIED: usize = 2; - -pub struct Parker { - state: AtomicUsize, - lock: Mutex<()>, - cvar: Condvar, -} - -impl Parker { - pub fn new() -> Self { - Parker { state: AtomicUsize::new(EMPTY), lock: Mutex::new(()), cvar: Condvar::new() } - } - - // This implementaiton doesn't require `unsafe`, but other implementations - // may assume this is only called by the thread that owns the Parker. - pub unsafe fn park(&self) { - // If we were previously notified then we consume this notification and - // return quickly. - if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() { - return; - } - - // Otherwise we need to coordinate going to sleep - let mut m = self.lock.lock().unwrap(); - match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { - Ok(_) => {} - Err(NOTIFIED) => { - // We must read here, even though we know it will be `NOTIFIED`. - // This is because `unpark` may have been called again since we read - // `NOTIFIED` in the `compare_exchange` above. We must perform an - // acquire operation that synchronizes with that `unpark` to observe - // any writes it made before the call to unpark. To do that we must - // read from the write it made to `state`. - let old = self.state.swap(EMPTY, SeqCst); - assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); - return; - } // should consume this notification, so prohibit spurious wakeups in next park. - Err(_) => panic!("inconsistent park state"), - } - loop { - m = self.cvar.wait(m).unwrap(); - match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) { - Ok(_) => return, // got a notification - Err(_) => {} // spurious wakeup, go back to sleep - } - } - } - - // This implementaiton doesn't require `unsafe`, but other implementations - // may assume this is only called by the thread that owns the Parker. - pub unsafe fn park_timeout(&self, dur: Duration) { - // Like `park` above we have a fast path for an already-notified thread, and - // afterwards we start coordinating for a sleep. - // return quickly. - if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() { - return; - } - let m = self.lock.lock().unwrap(); - match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { - Ok(_) => {} - Err(NOTIFIED) => { - // We must read again here, see `park`. - let old = self.state.swap(EMPTY, SeqCst); - assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); - return; - } // should consume this notification, so prohibit spurious wakeups in next park. - Err(_) => panic!("inconsistent park_timeout state"), - } - - // Wait with a timeout, and if we spuriously wake up or otherwise wake up - // from a notification we just want to unconditionally set the state back to - // empty, either consuming a notification or un-flagging ourselves as - // parked. - let (_m, _result) = self.cvar.wait_timeout(m, dur).unwrap(); - match self.state.swap(EMPTY, SeqCst) { - NOTIFIED => {} // got a notification, hurray! - PARKED => {} // no notification, alas - n => panic!("inconsistent park_timeout state: {}", n), - } - } - - pub fn unpark(&self) { - // To ensure the unparked thread will observe any writes we made - // before this call, we must perform a release operation that `park` - // can synchronize with. To do that we must write `NOTIFIED` even if - // `state` is already `NOTIFIED`. That is why this must be a swap - // rather than a compare-and-swap that returns if it reads `NOTIFIED` - // on failure. - match self.state.swap(NOTIFIED, SeqCst) { - EMPTY => return, // no one was waiting - NOTIFIED => return, // already unparked - PARKED => {} // gotta go wake someone up - _ => panic!("inconsistent state in unpark"), - } - - // There is a period between when the parked thread sets `state` to - // `PARKED` (or last checked `state` in the case of a spurious wake - // up) and when it actually waits on `cvar`. If we were to notify - // during this period it would be ignored and then when the parked - // thread went to sleep it would never wake up. Fortunately, it has - // `lock` locked at this stage so we can acquire `lock` to wait until - // it is ready to receive the notification. - // - // Releasing `lock` before the call to `notify_one` means that when the - // parked thread wakes it doesn't get woken only to have to wait for us - // to release `lock`. - drop(self.lock.lock().unwrap()); - self.cvar.notify_one() +cfg_if::cfg_if! { + if #[cfg(any(target_os = "linux", target_os = "android"))] { + mod linux; + pub use linux::Parker; + } else { + mod generic; + pub use generic::Parker; } } From f18f93d44cb2d95388865f98bb5938e572f0fc94 Mon Sep 17 00:00:00 2001 From: Mara Bos Date: Sat, 19 Sep 2020 14:54:52 +0200 Subject: [PATCH 3/8] Mark unpark() as #[inline]. --- library/std/src/thread/mod.rs | 1 + library/std/src/thread/parker/linux.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/library/std/src/thread/mod.rs b/library/std/src/thread/mod.rs index a5a8d5c9fbb..45430e58cbb 100644 --- a/library/std/src/thread/mod.rs +++ b/library/std/src/thread/mod.rs @@ -1086,6 +1086,7 @@ impl Thread { /// parked_thread.join().unwrap(); /// ``` #[stable(feature = "rust1", since = "1.0.0")] + #[inline] pub fn unpark(&self) { self.inner.parker.unpark(); } diff --git a/library/std/src/thread/parker/linux.rs b/library/std/src/thread/parker/linux.rs index 05142b88c73..2e238dcdc0b 100644 --- a/library/std/src/thread/parker/linux.rs +++ b/library/std/src/thread/parker/linux.rs @@ -57,6 +57,7 @@ impl Parker { } } + #[inline] pub fn unpark(&self) { // Change PARKED=>NOTIFIED, EMPTY=>NOTIFIED, or NOTIFIED=>NOTIFIED, and // wake the thread in the first case. From 568d9696e90d6bd8125d59cebf13b3af624abd31 Mon Sep 17 00:00:00 2001 From: Mara Bos Date: Sat, 19 Sep 2020 15:17:51 +0200 Subject: [PATCH 4/8] Fix warning. --- library/std/src/thread/parker/linux.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/library/std/src/thread/parker/linux.rs b/library/std/src/thread/parker/linux.rs index 2e238dcdc0b..090c83fbb40 100644 --- a/library/std/src/thread/parker/linux.rs +++ b/library/std/src/thread/parker/linux.rs @@ -1,5 +1,5 @@ use crate::sync::atomic::AtomicI32; -use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release}; +use crate::sync::atomic::Ordering::{Acquire, Release}; use crate::time::Duration; const PARKED: i32 = -1; From 2cf0f64722a92dffa12d43a4c0383a9d76becbcc Mon Sep 17 00:00:00 2001 From: Mara Bos Date: Sat, 19 Sep 2020 18:03:10 +0200 Subject: [PATCH 5/8] Move linux-specific futex code into `sys` module. --- library/std/src/sys/unix/futex.rs | 38 +++++++++++++++++++ library/std/src/sys/unix/mod.rs | 1 + .../src/thread/parker/{linux.rs => futex.rs} | 35 +---------------- library/std/src/thread/parker/mod.rs | 4 +- 4 files changed, 42 insertions(+), 36 deletions(-) create mode 100644 library/std/src/sys/unix/futex.rs rename library/std/src/thread/parker/{linux.rs => futex.rs} (74%) diff --git a/library/std/src/sys/unix/futex.rs b/library/std/src/sys/unix/futex.rs new file mode 100644 index 00000000000..6af06aa5f7e --- /dev/null +++ b/library/std/src/sys/unix/futex.rs @@ -0,0 +1,38 @@ +#![cfg(any(target_os = "linux", target_os = "android"))] + +use crate::sync::atomic::AtomicI32; +use crate::time::Duration; + +pub fn futex_wait(futex: &AtomicI32, expected: i32, timeout: Option) { + let timespec; + let timespec_ptr = match timeout { + Some(timeout) => { + timespec = libc::timespec { + tv_sec: timeout.as_secs() as _, + tv_nsec: timeout.subsec_nanos() as _, + }; + ×pec as *const libc::timespec + } + None => crate::ptr::null(), + }; + unsafe { + libc::syscall( + libc::SYS_futex, + futex as *const AtomicI32, + libc::FUTEX_WAIT | libc::FUTEX_PRIVATE_FLAG, + expected, + timespec_ptr, + ); + } +} + +pub fn futex_wake(futex: &AtomicI32) { + unsafe { + libc::syscall( + libc::SYS_futex, + futex as *const AtomicI32, + libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG, + 1, + ); + } +} diff --git a/library/std/src/sys/unix/mod.rs b/library/std/src/sys/unix/mod.rs index eddf00d3979..6c623e21099 100644 --- a/library/std/src/sys/unix/mod.rs +++ b/library/std/src/sys/unix/mod.rs @@ -49,6 +49,7 @@ pub mod env; pub mod ext; pub mod fd; pub mod fs; +pub mod futex; pub mod io; #[cfg(target_os = "l4re")] mod l4re; diff --git a/library/std/src/thread/parker/linux.rs b/library/std/src/thread/parker/futex.rs similarity index 74% rename from library/std/src/thread/parker/linux.rs rename to library/std/src/thread/parker/futex.rs index 090c83fbb40..2d7a7770508 100644 --- a/library/std/src/thread/parker/linux.rs +++ b/library/std/src/thread/parker/futex.rs @@ -1,5 +1,6 @@ use crate::sync::atomic::AtomicI32; use crate::sync::atomic::Ordering::{Acquire, Release}; +use crate::sys::futex::{futex_wait, futex_wake}; use crate::time::Duration; const PARKED: i32 = -1; @@ -70,37 +71,3 @@ impl Parker { } } } - -fn futex_wait(futex: &AtomicI32, expected: i32, timeout: Option) { - let timespec; - let timespec_ptr = match timeout { - Some(timeout) => { - timespec = libc::timespec { - tv_sec: timeout.as_secs() as _, - tv_nsec: timeout.subsec_nanos() as _, - }; - ×pec as *const libc::timespec - } - None => crate::ptr::null(), - }; - unsafe { - libc::syscall( - libc::SYS_futex, - futex as *const AtomicI32, - libc::FUTEX_WAIT | libc::FUTEX_PRIVATE_FLAG, - expected, - timespec_ptr, - ); - } -} - -fn futex_wake(futex: &AtomicI32) { - unsafe { - libc::syscall( - libc::SYS_futex, - futex as *const AtomicI32, - libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG, - 1, - ); - } -} diff --git a/library/std/src/thread/parker/mod.rs b/library/std/src/thread/parker/mod.rs index 4dc5e1aa271..23c17c8e2cf 100644 --- a/library/std/src/thread/parker/mod.rs +++ b/library/std/src/thread/parker/mod.rs @@ -1,7 +1,7 @@ cfg_if::cfg_if! { if #[cfg(any(target_os = "linux", target_os = "android"))] { - mod linux; - pub use linux::Parker; + mod futex; + pub use futex::Parker; } else { mod generic; pub use generic::Parker; From 485f882d7713b0e0b864fc8b21368910e5b8b0a7 Mon Sep 17 00:00:00 2001 From: Mara Bos Date: Sun, 20 Sep 2020 00:17:05 +0200 Subject: [PATCH 6/8] Check conversion from Duration to timespec in futex_wait. --- library/std/src/sys/unix/futex.rs | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/library/std/src/sys/unix/futex.rs b/library/std/src/sys/unix/futex.rs index 6af06aa5f7e..e6f0c48c59b 100644 --- a/library/std/src/sys/unix/futex.rs +++ b/library/std/src/sys/unix/futex.rs @@ -1,27 +1,26 @@ #![cfg(any(target_os = "linux", target_os = "android"))] +use crate::convert::TryInto; +use crate::ptr::null; use crate::sync::atomic::AtomicI32; use crate::time::Duration; pub fn futex_wait(futex: &AtomicI32, expected: i32, timeout: Option) { - let timespec; - let timespec_ptr = match timeout { - Some(timeout) => { - timespec = libc::timespec { - tv_sec: timeout.as_secs() as _, - tv_nsec: timeout.subsec_nanos() as _, - }; - ×pec as *const libc::timespec - } - None => crate::ptr::null(), - }; + let timespec = timeout.and_then(|d| { + Some(libc::timespec { + // Sleep forever if the timeout is longer than fits in a timespec. + tv_sec: d.as_secs().try_into().ok()?, + // This conversion never truncates, as subsec_nanos is always <1e9. + tv_nsec: d.subsec_nanos() as _, + }) + }); unsafe { libc::syscall( libc::SYS_futex, futex as *const AtomicI32, libc::FUTEX_WAIT | libc::FUTEX_PRIVATE_FLAG, expected, - timespec_ptr, + timespec.as_ref().map_or(null(), |d| d as *const libc::timespec), ); } } From 4301b5c1cc9652247b1c465d304ff9c94df294ef Mon Sep 17 00:00:00 2001 From: Mara Bos Date: Sun, 20 Sep 2020 00:35:39 +0200 Subject: [PATCH 7/8] Add notes about memory ordering to futex parker implementation. --- library/std/src/thread/parker/futex.rs | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/library/std/src/thread/parker/futex.rs b/library/std/src/thread/parker/futex.rs index 2d7a7770508..a5d4927dcc5 100644 --- a/library/std/src/thread/parker/futex.rs +++ b/library/std/src/thread/parker/futex.rs @@ -11,6 +11,26 @@ pub struct Parker { state: AtomicI32, } +// Notes about memory ordering: +// +// Memory ordering is only relevant for the relative ordering of operations +// between different variables. Even Ordering::Relaxed guarantees a +// monotonic/consistent order when looking at just a single atomic variable. +// +// So, since this parker is just a single atomic variable, we only need to look +// at the ordering guarantees we need to provide to the 'outside world'. +// +// The only memory ordering guarantee that parking and unparking provide, is +// that things which happened before unpark() are visible on the thread +// returning from park() afterwards. Otherwise, it was effectively unparked +// before unpark() was called while still consuming the 'token'. +// +// In other words, unpark() needs to synchronize with the part of park() that +// consumes the token and returns. +// +// This is done with a release-acquire synchronization, by using +// Ordering::Release when writing NOTIFIED (the 'token') in unpark(), and using +// Ordering::Acquire when checking for this state in park(). impl Parker { #[inline] pub const fn new() -> Self { From 0b73fd7105db81a994a81b775a43bbdb1be3c76a Mon Sep 17 00:00:00 2001 From: Mara Bos Date: Sun, 27 Sep 2020 12:27:27 +0200 Subject: [PATCH 8/8] Move thread parker to sys_common. --- library/std/src/sys_common/mod.rs | 1 + .../src/{thread/parker => sys_common/thread_parker}/futex.rs | 0 .../{thread/parker => sys_common/thread_parker}/generic.rs | 0 .../src/{thread/parker => sys_common/thread_parker}/mod.rs | 0 library/std/src/thread/mod.rs | 4 +--- 5 files changed, 2 insertions(+), 3 deletions(-) rename library/std/src/{thread/parker => sys_common/thread_parker}/futex.rs (100%) rename library/std/src/{thread/parker => sys_common/thread_parker}/generic.rs (100%) rename library/std/src/{thread/parker => sys_common/thread_parker}/mod.rs (100%) diff --git a/library/std/src/sys_common/mod.rs b/library/std/src/sys_common/mod.rs index 28cdfefb12a..234b257aa92 100644 --- a/library/std/src/sys_common/mod.rs +++ b/library/std/src/sys_common/mod.rs @@ -66,6 +66,7 @@ pub mod thread; pub mod thread_info; pub mod thread_local_dtor; pub mod thread_local_key; +pub mod thread_parker; pub mod util; pub mod wtf8; diff --git a/library/std/src/thread/parker/futex.rs b/library/std/src/sys_common/thread_parker/futex.rs similarity index 100% rename from library/std/src/thread/parker/futex.rs rename to library/std/src/sys_common/thread_parker/futex.rs diff --git a/library/std/src/thread/parker/generic.rs b/library/std/src/sys_common/thread_parker/generic.rs similarity index 100% rename from library/std/src/thread/parker/generic.rs rename to library/std/src/sys_common/thread_parker/generic.rs diff --git a/library/std/src/thread/parker/mod.rs b/library/std/src/sys_common/thread_parker/mod.rs similarity index 100% rename from library/std/src/thread/parker/mod.rs rename to library/std/src/sys_common/thread_parker/mod.rs diff --git a/library/std/src/thread/mod.rs b/library/std/src/thread/mod.rs index 45430e58cbb..fb2fbb5bf2d 100644 --- a/library/std/src/thread/mod.rs +++ b/library/std/src/thread/mod.rs @@ -149,8 +149,6 @@ #[cfg(all(test, not(target_os = "emscripten")))] mod tests; -mod parker; - use crate::any::Any; use crate::cell::UnsafeCell; use crate::ffi::{CStr, CString}; @@ -166,9 +164,9 @@ use crate::sys::thread as imp; use crate::sys_common::mutex; use crate::sys_common::thread; use crate::sys_common::thread_info; +use crate::sys_common::thread_parker::Parker; use crate::sys_common::{AsInner, IntoInner}; use crate::time::Duration; -use parker::Parker; //////////////////////////////////////////////////////////////////////////////// // Thread-local storage