Rollup merge of #57992 - Matthias247:waker4, r=cramertj
Update the future/task API This change updates the future and task API as discussed in the stabilization RFC at https://github.com/rust-lang/rfcs/pull/2592. Changes: - Replacing UnsafeWake with RawWaker and RawWakerVtable - Removal of LocalWaker - Removal of Arc-based Wake trait
This commit is contained in:
commit
919cf42feb
|
@ -71,7 +71,7 @@ use core::ops::{
|
|||
CoerceUnsized, DispatchFromDyn, Deref, DerefMut, Receiver, Generator, GeneratorState
|
||||
};
|
||||
use core::ptr::{self, NonNull, Unique};
|
||||
use core::task::{LocalWaker, Poll};
|
||||
use core::task::{Waker, Poll};
|
||||
|
||||
use crate::vec::Vec;
|
||||
use crate::raw_vec::RawVec;
|
||||
|
@ -896,7 +896,7 @@ impl<G: ?Sized + Generator> Generator for Pin<Box<G>> {
|
|||
impl<F: ?Sized + Future + Unpin> Future for Box<F> {
|
||||
type Output = F::Output;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
|
||||
F::poll(Pin::new(&mut *self), lw)
|
||||
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
|
||||
F::poll(Pin::new(&mut *self), waker)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -132,10 +132,6 @@ mod macros;
|
|||
|
||||
pub mod alloc;
|
||||
|
||||
#[unstable(feature = "futures_api",
|
||||
reason = "futures in libcore are unstable",
|
||||
issue = "50547")]
|
||||
pub mod task;
|
||||
// Primitive types using the heaps above
|
||||
|
||||
// Need to conditionally define the mod from `boxed.rs` to avoid
|
||||
|
|
|
@ -1,130 +0,0 @@
|
|||
//! Types and Traits for working with asynchronous tasks.
|
||||
|
||||
pub use core::task::*;
|
||||
|
||||
#[cfg(all(target_has_atomic = "ptr", target_has_atomic = "cas"))]
|
||||
pub use if_arc::*;
|
||||
|
||||
#[cfg(all(target_has_atomic = "ptr", target_has_atomic = "cas"))]
|
||||
mod if_arc {
|
||||
use super::*;
|
||||
use core::marker::PhantomData;
|
||||
use core::mem;
|
||||
use core::ptr::{self, NonNull};
|
||||
use crate::sync::Arc;
|
||||
|
||||
/// A way of waking up a specific task.
|
||||
///
|
||||
/// Any task executor must provide a way of signaling that a task it owns
|
||||
/// is ready to be `poll`ed again. Executors do so by implementing this trait.
|
||||
pub trait Wake: Send + Sync {
|
||||
/// Indicates that the associated task is ready to make progress and should
|
||||
/// be `poll`ed.
|
||||
///
|
||||
/// Executors generally maintain a queue of "ready" tasks; `wake` should place
|
||||
/// the associated task onto this queue.
|
||||
fn wake(arc_self: &Arc<Self>);
|
||||
|
||||
/// Indicates that the associated task is ready to make progress and should
|
||||
/// be `poll`ed. This function is like `wake`, but can only be called from the
|
||||
/// thread on which this `Wake` was created.
|
||||
///
|
||||
/// Executors generally maintain a queue of "ready" tasks; `wake_local` should place
|
||||
/// the associated task onto this queue.
|
||||
#[inline]
|
||||
unsafe fn wake_local(arc_self: &Arc<Self>) {
|
||||
Self::wake(arc_self);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(target_has_atomic = "ptr", target_has_atomic = "cas"))]
|
||||
struct ArcWrapped<T>(PhantomData<T>);
|
||||
|
||||
unsafe impl<T: Wake + 'static> UnsafeWake for ArcWrapped<T> {
|
||||
#[inline]
|
||||
unsafe fn clone_raw(&self) -> Waker {
|
||||
let me: *const ArcWrapped<T> = self;
|
||||
let arc = (*(&me as *const *const ArcWrapped<T> as *const Arc<T>)).clone();
|
||||
Waker::from(arc)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
unsafe fn drop_raw(&self) {
|
||||
let mut me: *const ArcWrapped<T> = self;
|
||||
let me = &mut me as *mut *const ArcWrapped<T> as *mut Arc<T>;
|
||||
ptr::drop_in_place(me);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
unsafe fn wake(&self) {
|
||||
let me: *const ArcWrapped<T> = self;
|
||||
T::wake(&*(&me as *const *const ArcWrapped<T> as *const Arc<T>))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
unsafe fn wake_local(&self) {
|
||||
let me: *const ArcWrapped<T> = self;
|
||||
T::wake_local(&*(&me as *const *const ArcWrapped<T> as *const Arc<T>))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> From<Arc<T>> for Waker
|
||||
where T: Wake + 'static,
|
||||
{
|
||||
fn from(rc: Arc<T>) -> Self {
|
||||
unsafe {
|
||||
let ptr = mem::transmute::<Arc<T>, NonNull<ArcWrapped<T>>>(rc);
|
||||
Waker::new(ptr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a `LocalWaker` from a local `wake`.
|
||||
///
|
||||
/// This function requires that `wake` is "local" (created on the current thread).
|
||||
/// The resulting `LocalWaker` will call `wake.wake_local()` when awoken, and
|
||||
/// will call `wake.wake()` if awoken after being converted to a `Waker`.
|
||||
#[inline]
|
||||
pub unsafe fn local_waker<W: Wake + 'static>(wake: Arc<W>) -> LocalWaker {
|
||||
let ptr = mem::transmute::<Arc<W>, NonNull<ArcWrapped<W>>>(wake);
|
||||
LocalWaker::new(ptr)
|
||||
}
|
||||
|
||||
struct NonLocalAsLocal<T>(ArcWrapped<T>);
|
||||
|
||||
unsafe impl<T: Wake + 'static> UnsafeWake for NonLocalAsLocal<T> {
|
||||
#[inline]
|
||||
unsafe fn clone_raw(&self) -> Waker {
|
||||
self.0.clone_raw()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
unsafe fn drop_raw(&self) {
|
||||
self.0.drop_raw()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
unsafe fn wake(&self) {
|
||||
self.0.wake()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
unsafe fn wake_local(&self) {
|
||||
// Since we're nonlocal, we can't call wake_local
|
||||
self.0.wake()
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a `LocalWaker` from a non-local `wake`.
|
||||
///
|
||||
/// This function is similar to `local_waker`, but does not require that `wake`
|
||||
/// is local to the current thread. The resulting `LocalWaker` will call
|
||||
/// `wake.wake()` when awoken.
|
||||
#[inline]
|
||||
pub fn local_waker_from_nonlocal<W: Wake + 'static>(wake: Arc<W>) -> LocalWaker {
|
||||
unsafe {
|
||||
let ptr = mem::transmute::<Arc<W>, NonNull<NonLocalAsLocal<W>>>(wake);
|
||||
LocalWaker::new(ptr)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -5,7 +5,7 @@
|
|||
use marker::Unpin;
|
||||
use ops;
|
||||
use pin::Pin;
|
||||
use task::{Poll, LocalWaker};
|
||||
use task::{Poll, Waker};
|
||||
|
||||
/// A future represents an asynchronous computation.
|
||||
///
|
||||
|
@ -19,13 +19,14 @@ use task::{Poll, LocalWaker};
|
|||
/// final value. This method does not block if the value is not ready. Instead,
|
||||
/// the current task is scheduled to be woken up when it's possible to make
|
||||
/// further progress by `poll`ing again. The wake up is performed using
|
||||
/// `cx.waker()`, a handle for waking up the current task.
|
||||
/// the `waker` argument of the `poll()` method, which is a handle for waking
|
||||
/// up the current task.
|
||||
///
|
||||
/// When using a future, you generally won't call `poll` directly, but instead
|
||||
/// `await!` the value.
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
pub trait Future {
|
||||
/// The result of the `Future`.
|
||||
/// The type of value produced on completion.
|
||||
type Output;
|
||||
|
||||
/// Attempt to resolve the future to a final value, registering
|
||||
|
@ -42,16 +43,16 @@ pub trait Future {
|
|||
/// Once a future has finished, clients should not `poll` it again.
|
||||
///
|
||||
/// When a future is not ready yet, `poll` returns `Poll::Pending` and
|
||||
/// stores a clone of the [`LocalWaker`] to be woken once the future can
|
||||
/// stores a clone of the [`Waker`] to be woken once the future can
|
||||
/// make progress. For example, a future waiting for a socket to become
|
||||
/// readable would call `.clone()` on the [`LocalWaker`] and store it.
|
||||
/// readable would call `.clone()` on the [`Waker`] and store it.
|
||||
/// When a signal arrives elsewhere indicating that the socket is readable,
|
||||
/// `[LocalWaker::wake]` is called and the socket future's task is awoken.
|
||||
/// `[Waker::wake]` is called and the socket future's task is awoken.
|
||||
/// Once a task has been woken up, it should attempt to `poll` the future
|
||||
/// again, which may or may not produce a final value.
|
||||
///
|
||||
/// Note that on multiple calls to `poll`, only the most recent
|
||||
/// [`LocalWaker`] passed to `poll` should be scheduled to receive a
|
||||
/// [`Waker`] passed to `poll` should be scheduled to receive a
|
||||
/// wakeup.
|
||||
///
|
||||
/// # Runtime characteristics
|
||||
|
@ -67,44 +68,35 @@ pub trait Future {
|
|||
/// typically do *not* suffer the same problems of "all wakeups must poll
|
||||
/// all events"; they are more like `epoll(4)`.
|
||||
///
|
||||
/// An implementation of `poll` should strive to return quickly, and must
|
||||
/// *never* block. Returning quickly prevents unnecessarily clogging up
|
||||
/// An implementation of `poll` should strive to return quickly, and should
|
||||
/// not block. Returning quickly prevents unnecessarily clogging up
|
||||
/// threads or event loops. If it is known ahead of time that a call to
|
||||
/// `poll` may end up taking awhile, the work should be offloaded to a
|
||||
/// thread pool (or something similar) to ensure that `poll` can return
|
||||
/// quickly.
|
||||
///
|
||||
/// # [`LocalWaker`], [`Waker`] and thread-safety
|
||||
///
|
||||
/// The `poll` function takes a [`LocalWaker`], an object which knows how to
|
||||
/// awaken the current task. [`LocalWaker`] is not `Send` nor `Sync`, so in
|
||||
/// order to make thread-safe futures the [`LocalWaker::into_waker`] method
|
||||
/// should be used to convert the [`LocalWaker`] into a thread-safe version.
|
||||
/// [`LocalWaker::wake`] implementations have the ability to be more
|
||||
/// efficient, however, so when thread safety is not necessary,
|
||||
/// [`LocalWaker`] should be preferred.
|
||||
/// An implementation of `poll` may also never cause memory unsafety.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Once a future has completed (returned `Ready` from `poll`),
|
||||
/// then any future calls to `poll` may panic, block forever, or otherwise
|
||||
/// cause bad behavior. The `Future` trait itself provides no guarantees
|
||||
/// about the behavior of `poll` after a future has completed.
|
||||
/// cause any kind of bad behavior expect causing memory unsafety.
|
||||
/// The `Future` trait itself provides no guarantees about the behavior
|
||||
/// of `poll` after a future has completed.
|
||||
///
|
||||
/// [`Poll::Pending`]: ../task/enum.Poll.html#variant.Pending
|
||||
/// [`Poll::Ready(val)`]: ../task/enum.Poll.html#variant.Ready
|
||||
/// [`LocalWaker`]: ../task/struct.LocalWaker.html
|
||||
/// [`LocalWaker::into_waker`]: ../task/struct.LocalWaker.html#method.into_waker
|
||||
/// [`LocalWaker::wake`]: ../task/struct.LocalWaker.html#method.wake
|
||||
/// [`Waker`]: ../task/struct.Waker.html
|
||||
fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output>;
|
||||
/// [`Waker::wake`]: ../task/struct.Waker.html#method.wake
|
||||
fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output>;
|
||||
}
|
||||
|
||||
impl<'a, F: ?Sized + Future + Unpin> Future for &'a mut F {
|
||||
type Output = F::Output;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
|
||||
F::poll(Pin::new(&mut **self), lw)
|
||||
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
|
||||
F::poll(Pin::new(&mut **self), waker)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -115,7 +107,7 @@ where
|
|||
{
|
||||
type Output = <<P as ops::Deref>::Target as Future>::Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
|
||||
Pin::get_mut(self).as_mut().poll(lw)
|
||||
fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
|
||||
Pin::get_mut(self).as_mut().poll(waker)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,4 +8,4 @@ mod poll;
|
|||
pub use self::poll::Poll;
|
||||
|
||||
mod wake;
|
||||
pub use self::wake::{Waker, LocalWaker, UnsafeWake};
|
||||
pub use self::wake::{Waker, RawWaker, RawWakerVTable};
|
||||
|
|
|
@ -4,16 +4,92 @@
|
|||
|
||||
use fmt;
|
||||
use marker::Unpin;
|
||||
use ptr::NonNull;
|
||||
|
||||
/// A `RawWaker` allows the implementor of a task executor to create a [`Waker`]
|
||||
/// which provides customized wakeup behavior.
|
||||
///
|
||||
/// [vtable]: https://en.wikipedia.org/wiki/Virtual_method_table
|
||||
///
|
||||
/// It consists of a data pointer and a [virtual function pointer table (vtable)][vtable] that
|
||||
/// customizes the behavior of the `RawWaker`.
|
||||
#[derive(PartialEq, Debug)]
|
||||
pub struct RawWaker {
|
||||
/// A data pointer, which can be used to store arbitrary data as required
|
||||
/// by the executor. This could be e.g. a type-erased pointer to an `Arc`
|
||||
/// that is associated with the task.
|
||||
/// The value of this field gets passed to all functions that are part of
|
||||
/// the vtable as the first parameter.
|
||||
data: *const (),
|
||||
/// Virtual function pointer table that customizes the behavior of this waker.
|
||||
vtable: &'static RawWakerVTable,
|
||||
}
|
||||
|
||||
impl RawWaker {
|
||||
/// Creates a new `RawWaker` from the provided `data` pointer and `vtable`.
|
||||
///
|
||||
/// The `data` pointer can be used to store arbitrary data as required
|
||||
/// by the executor. This could be e.g. a type-erased pointer to an `Arc`
|
||||
/// that is associated with the task.
|
||||
/// The value of this poiner will get passed to all functions that are part
|
||||
/// of the `vtable` as the first parameter.
|
||||
///
|
||||
/// The `vtable` customizes the behavior of a `Waker` which gets created
|
||||
/// from a `RawWaker`. For each operation on the `Waker`, the associated
|
||||
/// function in the `vtable` of the underlying `RawWaker` will be called.
|
||||
pub const fn new(data: *const (), vtable: &'static RawWakerVTable) -> RawWaker {
|
||||
RawWaker {
|
||||
data,
|
||||
vtable,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A virtual function pointer table (vtable) that specifies the behavior
|
||||
/// of a [`RawWaker`].
|
||||
///
|
||||
/// The pointer passed to all functions inside the vtable is the `data` pointer
|
||||
/// from the enclosing [`RawWaker`] object.
|
||||
///
|
||||
/// The functions inside this struct are only intended be called on the `data`
|
||||
/// pointer of a properly constructed [`RawWaker`] object from inside the
|
||||
/// [`RawWaker`] implementation. Calling one of the contained functions using
|
||||
/// any other `data` pointer will cause undefined behavior.
|
||||
#[derive(PartialEq, Copy, Clone, Debug)]
|
||||
pub struct RawWakerVTable {
|
||||
/// This function will be called when the [`RawWaker`] gets cloned, e.g. when
|
||||
/// the [`Waker`] in which the [`RawWaker`] is stored gets cloned.
|
||||
///
|
||||
/// The implementation of this function must retain all resources that are
|
||||
/// required for this additional instance of a [`RawWaker`] and associated
|
||||
/// task. Calling `wake` on the resulting [`RawWaker`] should result in a wakeup
|
||||
/// of the same task that would have been awoken by the original [`RawWaker`].
|
||||
pub clone: unsafe fn(*const ()) -> RawWaker,
|
||||
|
||||
/// This function will be called when `wake` is called on the [`Waker`].
|
||||
/// It must wake up the task associated with this [`RawWaker`].
|
||||
///
|
||||
/// The implemention of this function must not consume the provided data
|
||||
/// pointer.
|
||||
pub wake: unsafe fn(*const ()),
|
||||
|
||||
/// This function gets called when a [`RawWaker`] gets dropped.
|
||||
///
|
||||
/// The implementation of this function must make sure to release any
|
||||
/// resources that are associated with this instance of a [`RawWaker`] and
|
||||
/// associated task.
|
||||
pub drop: unsafe fn(*const ()),
|
||||
}
|
||||
|
||||
/// A `Waker` is a handle for waking up a task by notifying its executor that it
|
||||
/// is ready to be run.
|
||||
///
|
||||
/// This handle contains a trait object pointing to an instance of the `UnsafeWake`
|
||||
/// trait, allowing notifications to get routed through it.
|
||||
/// This handle encapsulates a [`RawWaker`] instance, which defines the
|
||||
/// executor-specific wakeup behavior.
|
||||
///
|
||||
/// Implements [`Clone`], [`Send`], and [`Sync`].
|
||||
#[repr(transparent)]
|
||||
pub struct Waker {
|
||||
inner: NonNull<dyn UnsafeWake>,
|
||||
waker: RawWaker,
|
||||
}
|
||||
|
||||
impl Unpin for Waker {}
|
||||
|
@ -21,264 +97,66 @@ unsafe impl Send for Waker {}
|
|||
unsafe impl Sync for Waker {}
|
||||
|
||||
impl Waker {
|
||||
/// Constructs a new `Waker` directly.
|
||||
///
|
||||
/// Note that most code will not need to call this. Implementers of the
|
||||
/// `UnsafeWake` trait will typically provide a wrapper that calls this
|
||||
/// but you otherwise shouldn't call it directly.
|
||||
///
|
||||
/// If you're working with the standard library then it's recommended to
|
||||
/// use the `Waker::from` function instead which works with the safe
|
||||
/// `Arc` type and the safe `Wake` trait.
|
||||
#[inline]
|
||||
pub unsafe fn new(inner: NonNull<dyn UnsafeWake>) -> Self {
|
||||
Waker { inner }
|
||||
}
|
||||
|
||||
/// Wake up the task associated with this `Waker`.
|
||||
#[inline]
|
||||
pub fn wake(&self) {
|
||||
unsafe { self.inner.as_ref().wake() }
|
||||
// The actual wakeup call is delegated through a virtual function call
|
||||
// to the implementation which is defined by the executor.
|
||||
|
||||
// SAFETY: This is safe because `Waker::new_unchecked` is the only way
|
||||
// to initialize `wake` and `data` requiring the user to acknowledge
|
||||
// that the contract of `RawWaker` is upheld.
|
||||
unsafe { (self.waker.vtable.wake)(self.waker.data) }
|
||||
}
|
||||
|
||||
/// Returns `true` if or not this `Waker` and `other` awaken the same task.
|
||||
/// Returns whether or not this `Waker` and other `Waker` have awaken the same task.
|
||||
///
|
||||
/// This function works on a best-effort basis, and may return false even
|
||||
/// when the `Waker`s would awaken the same task. However, if this function
|
||||
/// returns `true`, it is guaranteed that the `Waker`s will awaken the same
|
||||
/// task.
|
||||
/// returns `true`, it is guaranteed that the `Waker`s will awaken the same task.
|
||||
///
|
||||
/// This function is primarily used for optimization purposes.
|
||||
#[inline]
|
||||
pub fn will_wake(&self, other: &Waker) -> bool {
|
||||
self.inner == other.inner
|
||||
self.waker == other.waker
|
||||
}
|
||||
|
||||
/// Returns `true` if or not this `Waker` and `other` `LocalWaker` awaken
|
||||
/// the same task.
|
||||
/// Creates a new `Waker` from [`RawWaker`].
|
||||
///
|
||||
/// This function works on a best-effort basis, and may return false even
|
||||
/// when the `Waker`s would awaken the same task. However, if this function
|
||||
/// returns true, it is guaranteed that the `Waker`s will awaken the same
|
||||
/// task.
|
||||
///
|
||||
/// This function is primarily used for optimization purposes.
|
||||
#[inline]
|
||||
pub fn will_wake_local(&self, other: &LocalWaker) -> bool {
|
||||
self.will_wake(&other.0)
|
||||
/// The behavior of the returned `Waker` is undefined if the contract defined
|
||||
/// in [`RawWaker`]'s and [`RawWakerVTable`]'s documentation is not upheld.
|
||||
/// Therefore this method is unsafe.
|
||||
pub unsafe fn new_unchecked(waker: RawWaker) -> Waker {
|
||||
Waker {
|
||||
waker,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for Waker {
|
||||
#[inline]
|
||||
fn clone(&self) -> Self {
|
||||
unsafe {
|
||||
self.inner.as_ref().clone_raw()
|
||||
Waker {
|
||||
// SAFETY: This is safe because `Waker::new_unchecked` is the only way
|
||||
// to initialize `clone` and `data` requiring the user to acknowledge
|
||||
// that the contract of [`RawWaker`] is upheld.
|
||||
waker: unsafe { (self.waker.vtable.clone)(self.waker.data) },
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Waker {
|
||||
fn drop(&mut self) {
|
||||
// SAFETY: This is safe because `Waker::new_unchecked` is the only way
|
||||
// to initialize `drop` and `data` requiring the user to acknowledge
|
||||
// that the contract of `RawWaker` is upheld.
|
||||
unsafe { (self.waker.vtable.drop)(self.waker.data) }
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for Waker {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
let vtable_ptr = self.waker.vtable as *const RawWakerVTable;
|
||||
f.debug_struct("Waker")
|
||||
.field("data", &self.waker.data)
|
||||
.field("vtable", &vtable_ptr)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Waker {
|
||||
#[inline]
|
||||
fn drop(&mut self) {
|
||||
unsafe {
|
||||
self.inner.as_ref().drop_raw()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A `LocalWaker` is a handle for waking up a task by notifying its executor that it
|
||||
/// is ready to be run.
|
||||
///
|
||||
/// This is similar to the `Waker` type, but cannot be sent across threads.
|
||||
/// Task executors can use this type to implement more optimized single-threaded wakeup
|
||||
/// behavior.
|
||||
#[repr(transparent)]
|
||||
#[derive(Clone)]
|
||||
pub struct LocalWaker(Waker);
|
||||
|
||||
impl Unpin for LocalWaker {}
|
||||
impl !Send for LocalWaker {}
|
||||
impl !Sync for LocalWaker {}
|
||||
|
||||
impl LocalWaker {
|
||||
/// Constructs a new `LocalWaker` directly.
|
||||
///
|
||||
/// Note that most code will not need to call this. Implementers of the
|
||||
/// `UnsafeWake` trait will typically provide a wrapper that calls this
|
||||
/// but you otherwise shouldn't call it directly.
|
||||
///
|
||||
/// If you're working with the standard library then it's recommended to
|
||||
/// use the `local_waker_from_nonlocal` or `local_waker` to convert a `Waker`
|
||||
/// into a `LocalWaker`.
|
||||
///
|
||||
/// For this function to be used safely, it must be sound to call `inner.wake_local()`
|
||||
/// on the current thread.
|
||||
#[inline]
|
||||
pub unsafe fn new(inner: NonNull<dyn UnsafeWake>) -> Self {
|
||||
LocalWaker(Waker::new(inner))
|
||||
}
|
||||
|
||||
/// Borrows this `LocalWaker` as a `Waker`.
|
||||
///
|
||||
/// `Waker` is nearly identical to `LocalWaker`, but is threadsafe
|
||||
/// (implements `Send` and `Sync`).
|
||||
#[inline]
|
||||
pub fn as_waker(&self) -> &Waker {
|
||||
&self.0
|
||||
}
|
||||
|
||||
/// Converts this `LocalWaker` into a `Waker`.
|
||||
///
|
||||
/// `Waker` is nearly identical to `LocalWaker`, but is threadsafe
|
||||
/// (implements `Send` and `Sync`).
|
||||
#[inline]
|
||||
pub fn into_waker(self) -> Waker {
|
||||
self.0
|
||||
}
|
||||
|
||||
/// Wake up the task associated with this `LocalWaker`.
|
||||
#[inline]
|
||||
pub fn wake(&self) {
|
||||
unsafe { self.0.inner.as_ref().wake_local() }
|
||||
}
|
||||
|
||||
/// Returns `true` if or not this `LocalWaker` and `other` `LocalWaker` awaken the same task.
|
||||
///
|
||||
/// This function works on a best-effort basis, and may return false even
|
||||
/// when the `LocalWaker`s would awaken the same task. However, if this function
|
||||
/// returns true, it is guaranteed that the `LocalWaker`s will awaken the same
|
||||
/// task.
|
||||
///
|
||||
/// This function is primarily used for optimization purposes.
|
||||
#[inline]
|
||||
pub fn will_wake(&self, other: &LocalWaker) -> bool {
|
||||
self.0.will_wake(&other.0)
|
||||
}
|
||||
|
||||
/// Returns `true` if or not this `LocalWaker` and `other` `Waker` awaken the same task.
|
||||
///
|
||||
/// This function works on a best-effort basis, and may return false even
|
||||
/// when the `Waker`s would awaken the same task. However, if this function
|
||||
/// returns true, it is guaranteed that the `LocalWaker`s will awaken the same
|
||||
/// task.
|
||||
///
|
||||
/// This function is primarily used for optimization purposes.
|
||||
#[inline]
|
||||
pub fn will_wake_nonlocal(&self, other: &Waker) -> bool {
|
||||
self.0.will_wake(other)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<LocalWaker> for Waker {
|
||||
/// Converts a `LocalWaker` into a `Waker`.
|
||||
///
|
||||
/// This conversion turns a `!Sync` `LocalWaker` into a `Sync` `Waker`, allowing a wakeup
|
||||
/// object to be sent to another thread, but giving up its ability to do specialized
|
||||
/// thread-local wakeup behavior.
|
||||
#[inline]
|
||||
fn from(local_waker: LocalWaker) -> Self {
|
||||
local_waker.0
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for LocalWaker {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
f.debug_struct("LocalWaker")
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
/// An unsafe trait for implementing custom memory management for a `Waker` or `LocalWaker`.
|
||||
///
|
||||
/// A `Waker` conceptually is a cloneable trait object for `Wake`, and is
|
||||
/// most often essentially just `Arc<dyn Wake>`. However, in some contexts
|
||||
/// (particularly `no_std`), it's desirable to avoid `Arc` in favor of some
|
||||
/// custom memory management strategy. This trait is designed to allow for such
|
||||
/// customization.
|
||||
///
|
||||
/// When using `std`, a default implementation of the `UnsafeWake` trait is provided for
|
||||
/// `Arc<T>` where `T: Wake`.
|
||||
pub unsafe trait UnsafeWake: Send + Sync {
|
||||
/// Creates a clone of this `UnsafeWake` and stores it behind a `Waker`.
|
||||
///
|
||||
/// This function will create a new uniquely owned handle that under the
|
||||
/// hood references the same notification instance. In other words calls
|
||||
/// to `wake` on the returned handle should be equivalent to calls to
|
||||
/// `wake` on this handle.
|
||||
///
|
||||
/// # Unsafety
|
||||
///
|
||||
/// This function is unsafe to call because it's asserting the `UnsafeWake`
|
||||
/// value is in a consistent state, i.e., hasn't been dropped.
|
||||
unsafe fn clone_raw(&self) -> Waker;
|
||||
|
||||
/// Drops this instance of `UnsafeWake`, deallocating resources
|
||||
/// associated with it.
|
||||
///
|
||||
// FIXME(cramertj):
|
||||
/// This method is intended to have a signature such as:
|
||||
///
|
||||
/// ```ignore (not-a-doctest)
|
||||
/// fn drop_raw(self: *mut Self);
|
||||
/// ```
|
||||
///
|
||||
/// Unfortunately, in Rust today that signature is not object safe.
|
||||
/// Nevertheless it's recommended to implement this function *as if* that
|
||||
/// were its signature. As such it is not safe to call on an invalid
|
||||
/// pointer, nor is the validity of the pointer guaranteed after this
|
||||
/// function returns.
|
||||
///
|
||||
/// # Unsafety
|
||||
///
|
||||
/// This function is unsafe to call because it's asserting the `UnsafeWake`
|
||||
/// value is in a consistent state, i.e., hasn't been dropped.
|
||||
unsafe fn drop_raw(&self);
|
||||
|
||||
/// Indicates that the associated task is ready to make progress and should
|
||||
/// be `poll`ed.
|
||||
///
|
||||
/// Executors generally maintain a queue of "ready" tasks; `wake` should place
|
||||
/// the associated task onto this queue.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Implementations should avoid panicking, but clients should also be prepared
|
||||
/// for panics.
|
||||
///
|
||||
/// # Unsafety
|
||||
///
|
||||
/// This function is unsafe to call because it's asserting the `UnsafeWake`
|
||||
/// value is in a consistent state, i.e., hasn't been dropped.
|
||||
unsafe fn wake(&self);
|
||||
|
||||
/// Indicates that the associated task is ready to make progress and should
|
||||
/// be `poll`ed. This function is the same as `wake`, but can only be called
|
||||
/// from the thread that this `UnsafeWake` is "local" to. This allows for
|
||||
/// implementors to provide specialized wakeup behavior specific to the current
|
||||
/// thread. This function is called by `LocalWaker::wake`.
|
||||
///
|
||||
/// Executors generally maintain a queue of "ready" tasks; `wake_local` should place
|
||||
/// the associated task onto this queue.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Implementations should avoid panicking, but clients should also be prepared
|
||||
/// for panics.
|
||||
///
|
||||
/// # Unsafety
|
||||
///
|
||||
/// This function is unsafe to call because it's asserting the `UnsafeWake`
|
||||
/// value is in a consistent state, i.e., hasn't been dropped, and that the
|
||||
/// `UnsafeWake` hasn't moved from the thread on which it was created.
|
||||
unsafe fn wake_local(&self) {
|
||||
self.wake()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,7 +5,7 @@ use core::marker::Unpin;
|
|||
use core::pin::Pin;
|
||||
use core::option::Option;
|
||||
use core::ptr::NonNull;
|
||||
use core::task::{LocalWaker, Poll};
|
||||
use core::task::{Waker, Poll};
|
||||
use core::ops::{Drop, Generator, GeneratorState};
|
||||
|
||||
#[doc(inline)]
|
||||
|
@ -32,10 +32,10 @@ impl<T: Generator<Yield = ()>> !Unpin for GenFuture<T> {}
|
|||
#[unstable(feature = "gen_future", issue = "50547")]
|
||||
impl<T: Generator<Yield = ()>> Future for GenFuture<T> {
|
||||
type Output = T::Return;
|
||||
fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
|
||||
fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
|
||||
// Safe because we're !Unpin + !Drop mapping to a ?Unpin value
|
||||
let gen = unsafe { Pin::map_unchecked_mut(self, |s| &mut s.0) };
|
||||
set_task_waker(lw, || match gen.resume() {
|
||||
set_task_waker(waker, || match gen.resume() {
|
||||
GeneratorState::Yielded(()) => Poll::Pending,
|
||||
GeneratorState::Complete(x) => Poll::Ready(x),
|
||||
})
|
||||
|
@ -43,10 +43,10 @@ impl<T: Generator<Yield = ()>> Future for GenFuture<T> {
|
|||
}
|
||||
|
||||
thread_local! {
|
||||
static TLS_WAKER: Cell<Option<NonNull<LocalWaker>>> = Cell::new(None);
|
||||
static TLS_WAKER: Cell<Option<NonNull<Waker>>> = Cell::new(None);
|
||||
}
|
||||
|
||||
struct SetOnDrop(Option<NonNull<LocalWaker>>);
|
||||
struct SetOnDrop(Option<NonNull<Waker>>);
|
||||
|
||||
impl Drop for SetOnDrop {
|
||||
fn drop(&mut self) {
|
||||
|
@ -58,12 +58,12 @@ impl Drop for SetOnDrop {
|
|||
|
||||
#[unstable(feature = "gen_future", issue = "50547")]
|
||||
/// Sets the thread-local task context used by async/await futures.
|
||||
pub fn set_task_waker<F, R>(lw: &LocalWaker, f: F) -> R
|
||||
pub fn set_task_waker<F, R>(waker: &Waker, f: F) -> R
|
||||
where
|
||||
F: FnOnce() -> R
|
||||
{
|
||||
let old_waker = TLS_WAKER.with(|tls_waker| {
|
||||
tls_waker.replace(Some(NonNull::from(lw)))
|
||||
tls_waker.replace(Some(NonNull::from(waker)))
|
||||
});
|
||||
let _reset_waker = SetOnDrop(old_waker);
|
||||
f()
|
||||
|
@ -78,7 +78,7 @@ where
|
|||
/// retrieved by a surrounding call to get_task_waker.
|
||||
pub fn get_task_waker<F, R>(f: F) -> R
|
||||
where
|
||||
F: FnOnce(&LocalWaker) -> R
|
||||
F: FnOnce(&Waker) -> R
|
||||
{
|
||||
let waker_ptr = TLS_WAKER.with(|tls_waker| {
|
||||
// Clear the entry so that nested `get_task_waker` calls
|
||||
|
@ -88,7 +88,7 @@ where
|
|||
let _reset_waker = SetOnDrop(waker_ptr);
|
||||
|
||||
let waker_ptr = waker_ptr.expect(
|
||||
"TLS LocalWaker not set. This is a rustc bug. \
|
||||
"TLS Waker not set. This is a rustc bug. \
|
||||
Please file an issue on https://github.com/rust-lang/rust.");
|
||||
unsafe { f(waker_ptr.as_ref()) }
|
||||
}
|
||||
|
@ -99,5 +99,5 @@ pub fn poll_with_tls_waker<F>(f: Pin<&mut F>) -> Poll<F::Output>
|
|||
where
|
||||
F: Future
|
||||
{
|
||||
get_task_waker(|lw| F::poll(f, lw))
|
||||
get_task_waker(|waker| F::poll(f, waker))
|
||||
}
|
||||
|
|
|
@ -463,8 +463,6 @@ pub mod task {
|
|||
//! Types and Traits for working with asynchronous tasks.
|
||||
#[doc(inline)]
|
||||
pub use core::task::*;
|
||||
#[doc(inline)]
|
||||
pub use alloc_crate::task::*;
|
||||
}
|
||||
|
||||
#[unstable(feature = "futures_api",
|
||||
|
|
|
@ -12,7 +12,7 @@ use panicking;
|
|||
use ptr::{Unique, NonNull};
|
||||
use rc::Rc;
|
||||
use sync::{Arc, Mutex, RwLock, atomic};
|
||||
use task::{LocalWaker, Poll};
|
||||
use task::{Waker, Poll};
|
||||
use thread::Result;
|
||||
|
||||
#[stable(feature = "panic_hooks", since = "1.10.0")]
|
||||
|
@ -323,9 +323,9 @@ impl<T: fmt::Debug> fmt::Debug for AssertUnwindSafe<T> {
|
|||
impl<'a, F: Future> Future for AssertUnwindSafe<F> {
|
||||
type Output = F::Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
|
||||
fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
|
||||
let pinned_field = unsafe { Pin::map_unchecked_mut(self, |x| &mut x.0) };
|
||||
F::poll(pinned_field, lw)
|
||||
F::poll(pinned_field, waker)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
use std::iter::Iterator;
|
||||
use std::future::Future;
|
||||
|
||||
use std::task::{Poll, LocalWaker};
|
||||
use std::task::{Poll, Waker};
|
||||
use std::pin::Pin;
|
||||
use std::unimplemented;
|
||||
|
||||
|
@ -13,7 +13,7 @@ struct MyFuture;
|
|||
impl Future for MyFuture {
|
||||
type Output = u32;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<u32> {
|
||||
fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll<u32> {
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,10 @@
|
|||
// edition:2018
|
||||
// aux-build:arc_wake.rs
|
||||
|
||||
#![feature(arbitrary_self_types, async_await, await_macro, futures_api)]
|
||||
|
||||
extern crate arc_wake;
|
||||
|
||||
use std::pin::Pin;
|
||||
use std::future::Future;
|
||||
use std::sync::{
|
||||
|
@ -9,17 +12,17 @@ use std::sync::{
|
|||
atomic::{self, AtomicUsize},
|
||||
};
|
||||
use std::task::{
|
||||
LocalWaker, Poll, Wake,
|
||||
local_waker_from_nonlocal,
|
||||
Poll, Waker,
|
||||
};
|
||||
use arc_wake::ArcWake;
|
||||
|
||||
struct Counter {
|
||||
wakes: AtomicUsize,
|
||||
}
|
||||
|
||||
impl Wake for Counter {
|
||||
fn wake(this: &Arc<Self>) {
|
||||
this.wakes.fetch_add(1, atomic::Ordering::SeqCst);
|
||||
impl ArcWake for Counter {
|
||||
fn wake(arc_self: &Arc<Self>) {
|
||||
arc_self.wakes.fetch_add(1, atomic::Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -29,11 +32,11 @@ fn wake_and_yield_once() -> WakeOnceThenComplete { WakeOnceThenComplete(false) }
|
|||
|
||||
impl Future for WakeOnceThenComplete {
|
||||
type Output = ();
|
||||
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<()> {
|
||||
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<()> {
|
||||
if self.0 {
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
lw.wake();
|
||||
waker.wake();
|
||||
self.0 = true;
|
||||
Poll::Pending
|
||||
}
|
||||
|
@ -130,7 +133,7 @@ where
|
|||
{
|
||||
let mut fut = Box::pin(f(9));
|
||||
let counter = Arc::new(Counter { wakes: AtomicUsize::new(0) });
|
||||
let waker = local_waker_from_nonlocal(counter.clone());
|
||||
let waker = ArcWake::into_waker(counter.clone());
|
||||
assert_eq!(0, counter.wakes.load(atomic::Ordering::SeqCst));
|
||||
assert_eq!(Poll::Pending, fut.as_mut().poll(&waker));
|
||||
assert_eq!(1, counter.wakes.load(atomic::Ordering::SeqCst));
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
// edition:2018
|
||||
|
||||
#![feature(arbitrary_self_types, futures_api)]
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::task::{
|
||||
Poll, Waker, RawWaker, RawWakerVTable,
|
||||
};
|
||||
|
||||
macro_rules! waker_vtable {
|
||||
($ty:ident) => {
|
||||
&RawWakerVTable {
|
||||
clone: clone_arc_raw::<$ty>,
|
||||
drop: drop_arc_raw::<$ty>,
|
||||
wake: wake_arc_raw::<$ty>,
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
pub trait ArcWake {
|
||||
fn wake(arc_self: &Arc<Self>);
|
||||
|
||||
fn into_waker(wake: Arc<Self>) -> Waker where Self: Sized
|
||||
{
|
||||
let ptr = Arc::into_raw(wake) as *const();
|
||||
|
||||
unsafe {
|
||||
Waker::new_unchecked(RawWaker::new(ptr, waker_vtable!(Self)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
unsafe fn increase_refcount<T: ArcWake>(data: *const()) {
|
||||
// Retain Arc by creating a copy
|
||||
let arc: Arc<T> = Arc::from_raw(data as *const T);
|
||||
let arc_clone = arc.clone();
|
||||
// Forget the Arcs again, so that the refcount isn't decrased
|
||||
let _ = Arc::into_raw(arc);
|
||||
let _ = Arc::into_raw(arc_clone);
|
||||
}
|
||||
|
||||
unsafe fn clone_arc_raw<T: ArcWake>(data: *const()) -> RawWaker {
|
||||
increase_refcount::<T>(data);
|
||||
RawWaker::new(data, waker_vtable!(T))
|
||||
}
|
||||
|
||||
unsafe fn drop_arc_raw<T: ArcWake>(data: *const()) {
|
||||
// Drop Arc
|
||||
let _: Arc<T> = Arc::from_raw(data as *const T);
|
||||
}
|
||||
|
||||
unsafe fn wake_arc_raw<T: ArcWake>(data: *const()) {
|
||||
let arc: Arc<T> = Arc::from_raw(data as *const T);
|
||||
ArcWake::wake(&arc);
|
||||
let _ = Arc::into_raw(arc);
|
||||
}
|
|
@ -1,30 +1,28 @@
|
|||
// aux-build:arc_wake.rs
|
||||
|
||||
#![feature(arbitrary_self_types, futures_api)]
|
||||
#![allow(unused)]
|
||||
|
||||
extern crate arc_wake;
|
||||
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::rc::Rc;
|
||||
use std::sync::{
|
||||
Arc,
|
||||
atomic::{self, AtomicUsize},
|
||||
};
|
||||
use std::task::{
|
||||
Poll, Wake, Waker, LocalWaker,
|
||||
local_waker, local_waker_from_nonlocal,
|
||||
Poll, Waker,
|
||||
};
|
||||
use arc_wake::ArcWake;
|
||||
|
||||
struct Counter {
|
||||
local_wakes: AtomicUsize,
|
||||
nonlocal_wakes: AtomicUsize,
|
||||
wakes: AtomicUsize,
|
||||
}
|
||||
|
||||
impl Wake for Counter {
|
||||
fn wake(this: &Arc<Self>) {
|
||||
this.nonlocal_wakes.fetch_add(1, atomic::Ordering::SeqCst);
|
||||
}
|
||||
|
||||
unsafe fn wake_local(this: &Arc<Self>) {
|
||||
this.local_wakes.fetch_add(1, atomic::Ordering::SeqCst);
|
||||
impl ArcWake for Counter {
|
||||
fn wake(arc_self: &Arc<Self>) {
|
||||
arc_self.wakes.fetch_add(1, atomic::Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -32,40 +30,28 @@ struct MyFuture;
|
|||
|
||||
impl Future for MyFuture {
|
||||
type Output = ();
|
||||
fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
|
||||
// Wake once locally
|
||||
lw.wake();
|
||||
// Wake twice non-locally
|
||||
let waker = lw.clone().into_waker();
|
||||
fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
|
||||
// Wake twice
|
||||
waker.wake();
|
||||
waker.wake();
|
||||
Poll::Ready(())
|
||||
}
|
||||
}
|
||||
|
||||
fn test_local_waker() {
|
||||
fn test_waker() {
|
||||
let counter = Arc::new(Counter {
|
||||
local_wakes: AtomicUsize::new(0),
|
||||
nonlocal_wakes: AtomicUsize::new(0),
|
||||
wakes: AtomicUsize::new(0),
|
||||
});
|
||||
let waker = unsafe { local_waker(counter.clone()) };
|
||||
assert_eq!(Poll::Ready(()), Pin::new(&mut MyFuture).poll(&waker));
|
||||
assert_eq!(1, counter.local_wakes.load(atomic::Ordering::SeqCst));
|
||||
assert_eq!(2, counter.nonlocal_wakes.load(atomic::Ordering::SeqCst));
|
||||
}
|
||||
let waker = ArcWake::into_waker(counter.clone());
|
||||
assert_eq!(2, Arc::strong_count(&counter));
|
||||
|
||||
fn test_local_as_nonlocal_waker() {
|
||||
let counter = Arc::new(Counter {
|
||||
local_wakes: AtomicUsize::new(0),
|
||||
nonlocal_wakes: AtomicUsize::new(0),
|
||||
});
|
||||
let waker: LocalWaker = local_waker_from_nonlocal(counter.clone());
|
||||
assert_eq!(Poll::Ready(()), Pin::new(&mut MyFuture).poll(&waker));
|
||||
assert_eq!(0, counter.local_wakes.load(atomic::Ordering::SeqCst));
|
||||
assert_eq!(3, counter.nonlocal_wakes.load(atomic::Ordering::SeqCst));
|
||||
assert_eq!(2, counter.wakes.load(atomic::Ordering::SeqCst));
|
||||
|
||||
drop(waker);
|
||||
assert_eq!(1, Arc::strong_count(&counter));
|
||||
}
|
||||
|
||||
fn main() {
|
||||
test_local_waker();
|
||||
test_local_as_nonlocal_waker();
|
||||
test_waker();
|
||||
}
|
||||
|
|
|
@ -1,10 +0,0 @@
|
|||
// exact-check
|
||||
|
||||
const QUERY = 'waker_from';
|
||||
|
||||
const EXPECTED = {
|
||||
'others': [
|
||||
{ 'path': 'std::task', 'name': 'local_waker_from_nonlocal' },
|
||||
{ 'path': 'alloc::task', 'name': 'local_waker_from_nonlocal' },
|
||||
],
|
||||
};
|
Loading…
Reference in New Issue