Auto merge of #60921 - cuviper:remove-mpsc_select, r=SimonSapin
Remove the unstable and deprecated mpsc_select This removes macro `select!` and `std::sync::mpsc::{Handle, Select}`, which were all unstable and have been deprecated since 1.32. Closes #27800 r? @SimonSapin
This commit is contained in:
commit
589beb979c
@ -357,61 +357,6 @@ macro_rules! dbg {
|
||||
};
|
||||
}
|
||||
|
||||
/// Selects the first successful receive event from a number of receivers.
|
||||
///
|
||||
/// This macro is used to wait for the first event to occur on a number of
|
||||
/// receivers. It places no restrictions on the types of receivers given to
|
||||
/// this macro, this can be viewed as a heterogeneous select.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// #![feature(mpsc_select)]
|
||||
///
|
||||
/// use std::thread;
|
||||
/// use std::sync::mpsc;
|
||||
///
|
||||
/// // two placeholder functions for now
|
||||
/// fn long_running_thread() {}
|
||||
/// fn calculate_the_answer() -> u32 { 42 }
|
||||
///
|
||||
/// let (tx1, rx1) = mpsc::channel();
|
||||
/// let (tx2, rx2) = mpsc::channel();
|
||||
///
|
||||
/// thread::spawn(move|| { long_running_thread(); tx1.send(()).unwrap(); });
|
||||
/// thread::spawn(move|| { tx2.send(calculate_the_answer()).unwrap(); });
|
||||
///
|
||||
/// select! {
|
||||
/// _ = rx1.recv() => println!("the long running thread finished first"),
|
||||
/// answer = rx2.recv() => {
|
||||
/// println!("the answer was: {}", answer.unwrap());
|
||||
/// }
|
||||
/// }
|
||||
/// # drop(rx1.recv());
|
||||
/// # drop(rx2.recv());
|
||||
/// ```
|
||||
///
|
||||
/// For more information about select, see the `std::sync::mpsc::Select` structure.
|
||||
#[macro_export]
|
||||
#[unstable(feature = "mpsc_select", issue = "27800")]
|
||||
#[rustc_deprecated(since = "1.32.0",
|
||||
reason = "channel selection will be removed in a future release")]
|
||||
macro_rules! select {
|
||||
(
|
||||
$($name:pat = $rx:ident.$meth:ident() => $code:expr),+
|
||||
) => ({
|
||||
use $crate::sync::mpsc::Select;
|
||||
let sel = Select::new();
|
||||
$( let mut $rx = sel.handle(&$rx); )+
|
||||
unsafe {
|
||||
$( $rx.add(); )+
|
||||
}
|
||||
let ret = sel.wait();
|
||||
$( if ret == $rx.id() { let $name = $rx.$meth(); $code } else )+
|
||||
{ unreachable!() }
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
macro_rules! assert_approx_eq {
|
||||
($a:expr, $b:expr) => ({
|
||||
|
@ -116,7 +116,6 @@
|
||||
//! ```
|
||||
|
||||
#![stable(feature = "rust1", since = "1.0.0")]
|
||||
#![allow(deprecated)] // for mpsc_select
|
||||
|
||||
// A description of how Rust's channel implementation works
|
||||
//
|
||||
@ -263,6 +262,8 @@
|
||||
// believe that there is anything fundamental that needs to change about these
|
||||
// channels, however, in order to support a more efficient select().
|
||||
//
|
||||
// FIXME: Select is now removed, so these factors are ready to be cleaned up!
|
||||
//
|
||||
// # Conclusion
|
||||
//
|
||||
// And now that you've seen all the races that I found and attempted to fix,
|
||||
@ -275,18 +276,8 @@ use crate::mem;
|
||||
use crate::cell::UnsafeCell;
|
||||
use crate::time::{Duration, Instant};
|
||||
|
||||
#[unstable(feature = "mpsc_select", issue = "27800")]
|
||||
pub use self::select::{Select, Handle};
|
||||
use self::select::StartResult;
|
||||
use self::select::StartResult::*;
|
||||
use self::blocking::SignalToken;
|
||||
|
||||
#[cfg(all(test, not(target_os = "emscripten")))]
|
||||
mod select_tests;
|
||||
|
||||
mod blocking;
|
||||
mod oneshot;
|
||||
mod select;
|
||||
mod shared;
|
||||
mod stream;
|
||||
mod sync;
|
||||
@ -1514,78 +1505,6 @@ impl<T> Receiver<T> {
|
||||
|
||||
}
|
||||
|
||||
impl<T> select::Packet for Receiver<T> {
|
||||
fn can_recv(&self) -> bool {
|
||||
loop {
|
||||
let new_port = match *unsafe { self.inner() } {
|
||||
Flavor::Oneshot(ref p) => {
|
||||
match p.can_recv() {
|
||||
Ok(ret) => return ret,
|
||||
Err(upgrade) => upgrade,
|
||||
}
|
||||
}
|
||||
Flavor::Stream(ref p) => {
|
||||
match p.can_recv() {
|
||||
Ok(ret) => return ret,
|
||||
Err(upgrade) => upgrade,
|
||||
}
|
||||
}
|
||||
Flavor::Shared(ref p) => return p.can_recv(),
|
||||
Flavor::Sync(ref p) => return p.can_recv(),
|
||||
};
|
||||
unsafe {
|
||||
mem::swap(self.inner_mut(),
|
||||
new_port.inner_mut());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn start_selection(&self, mut token: SignalToken) -> StartResult {
|
||||
loop {
|
||||
let (t, new_port) = match *unsafe { self.inner() } {
|
||||
Flavor::Oneshot(ref p) => {
|
||||
match p.start_selection(token) {
|
||||
oneshot::SelSuccess => return Installed,
|
||||
oneshot::SelCanceled => return Abort,
|
||||
oneshot::SelUpgraded(t, rx) => (t, rx),
|
||||
}
|
||||
}
|
||||
Flavor::Stream(ref p) => {
|
||||
match p.start_selection(token) {
|
||||
stream::SelSuccess => return Installed,
|
||||
stream::SelCanceled => return Abort,
|
||||
stream::SelUpgraded(t, rx) => (t, rx),
|
||||
}
|
||||
}
|
||||
Flavor::Shared(ref p) => return p.start_selection(token),
|
||||
Flavor::Sync(ref p) => return p.start_selection(token),
|
||||
};
|
||||
token = t;
|
||||
unsafe {
|
||||
mem::swap(self.inner_mut(), new_port.inner_mut());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn abort_selection(&self) -> bool {
|
||||
let mut was_upgrade = false;
|
||||
loop {
|
||||
let result = match *unsafe { self.inner() } {
|
||||
Flavor::Oneshot(ref p) => p.abort_selection(),
|
||||
Flavor::Stream(ref p) => p.abort_selection(was_upgrade),
|
||||
Flavor::Shared(ref p) => return p.abort_selection(was_upgrade),
|
||||
Flavor::Sync(ref p) => return p.abort_selection(),
|
||||
};
|
||||
let new_port = match result { Ok(b) => return b, Err(p) => p };
|
||||
was_upgrade = true;
|
||||
unsafe {
|
||||
mem::swap(self.inner_mut(),
|
||||
new_port.inner_mut());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[stable(feature = "rust1", since = "1.0.0")]
|
||||
impl<'a, T> Iterator for Iter<'a, T> {
|
||||
type Item = T;
|
||||
|
@ -24,7 +24,6 @@
|
||||
|
||||
pub use self::Failure::*;
|
||||
pub use self::UpgradeResult::*;
|
||||
pub use self::SelectionResult::*;
|
||||
use self::MyUpgrade::*;
|
||||
|
||||
use crate::sync::mpsc::Receiver;
|
||||
@ -66,12 +65,6 @@ pub enum UpgradeResult {
|
||||
UpWoke(SignalToken),
|
||||
}
|
||||
|
||||
pub enum SelectionResult<T> {
|
||||
SelCanceled,
|
||||
SelUpgraded(SignalToken, Receiver<T>),
|
||||
SelSuccess,
|
||||
}
|
||||
|
||||
enum MyUpgrade<T> {
|
||||
NothingSent,
|
||||
SendUsed,
|
||||
@ -264,71 +257,6 @@ impl<T> Packet<T> {
|
||||
// select implementation
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// If Ok, the value is whether this port has data, if Err, then the upgraded
|
||||
// port needs to be checked instead of this one.
|
||||
pub fn can_recv(&self) -> Result<bool, Receiver<T>> {
|
||||
unsafe {
|
||||
match self.state.load(Ordering::SeqCst) {
|
||||
EMPTY => Ok(false), // Welp, we tried
|
||||
DATA => Ok(true), // we have some un-acquired data
|
||||
DISCONNECTED if (*self.data.get()).is_some() => Ok(true), // we have data
|
||||
DISCONNECTED => {
|
||||
match ptr::replace(self.upgrade.get(), SendUsed) {
|
||||
// The other end sent us an upgrade, so we need to
|
||||
// propagate upwards whether the upgrade can receive
|
||||
// data
|
||||
GoUp(upgrade) => Err(upgrade),
|
||||
|
||||
// If the other end disconnected without sending an
|
||||
// upgrade, then we have data to receive (the channel is
|
||||
// disconnected).
|
||||
up => { ptr::write(self.upgrade.get(), up); Ok(true) }
|
||||
}
|
||||
}
|
||||
_ => unreachable!(), // we're the "one blocker"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Attempts to start selection on this port. This can either succeed, fail
|
||||
// because there is data, or fail because there is an upgrade pending.
|
||||
pub fn start_selection(&self, token: SignalToken) -> SelectionResult<T> {
|
||||
unsafe {
|
||||
let ptr = token.cast_to_usize();
|
||||
match self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) {
|
||||
EMPTY => SelSuccess,
|
||||
DATA => {
|
||||
drop(SignalToken::cast_from_usize(ptr));
|
||||
SelCanceled
|
||||
}
|
||||
DISCONNECTED if (*self.data.get()).is_some() => {
|
||||
drop(SignalToken::cast_from_usize(ptr));
|
||||
SelCanceled
|
||||
}
|
||||
DISCONNECTED => {
|
||||
match ptr::replace(self.upgrade.get(), SendUsed) {
|
||||
// The other end sent us an upgrade, so we need to
|
||||
// propagate upwards whether the upgrade can receive
|
||||
// data
|
||||
GoUp(upgrade) => {
|
||||
SelUpgraded(SignalToken::cast_from_usize(ptr), upgrade)
|
||||
}
|
||||
|
||||
// If the other end disconnected without sending an
|
||||
// upgrade, then we have data to receive (the channel is
|
||||
// disconnected).
|
||||
up => {
|
||||
ptr::write(self.upgrade.get(), up);
|
||||
drop(SignalToken::cast_from_usize(ptr));
|
||||
SelCanceled
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => unreachable!(), // we're the "one blocker"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Remove a previous selecting thread from this port. This ensures that the
|
||||
// blocked thread will no longer be visible to any other threads.
|
||||
//
|
||||
|
@ -1,352 +0,0 @@
|
||||
//! Selection over an array of receivers
|
||||
//!
|
||||
//! This module contains the implementation machinery necessary for selecting
|
||||
//! over a number of receivers. One large goal of this module is to provide an
|
||||
//! efficient interface to selecting over any receiver of any type.
|
||||
//!
|
||||
//! This is achieved through an architecture of a "receiver set" in which
|
||||
//! receivers are added to a set and then the entire set is waited on at once.
|
||||
//! The set can be waited on multiple times to prevent re-adding each receiver
|
||||
//! to the set.
|
||||
//!
|
||||
//! Usage of this module is currently encouraged to go through the use of the
|
||||
//! `select!` macro. This macro allows naturally binding of variables to the
|
||||
//! received values of receivers in a much more natural syntax then usage of the
|
||||
//! `Select` structure directly.
|
||||
//!
|
||||
//! # Examples
|
||||
//!
|
||||
//! ```rust
|
||||
//! #![feature(mpsc_select)]
|
||||
//!
|
||||
//! use std::sync::mpsc::channel;
|
||||
//!
|
||||
//! let (tx1, rx1) = channel();
|
||||
//! let (tx2, rx2) = channel();
|
||||
//!
|
||||
//! tx1.send(1).unwrap();
|
||||
//! tx2.send(2).unwrap();
|
||||
//!
|
||||
//! select! {
|
||||
//! val = rx1.recv() => {
|
||||
//! assert_eq!(val.unwrap(), 1);
|
||||
//! },
|
||||
//! val = rx2.recv() => {
|
||||
//! assert_eq!(val.unwrap(), 2);
|
||||
//! }
|
||||
//! }
|
||||
//! ```
|
||||
|
||||
#![allow(dead_code)]
|
||||
#![unstable(feature = "mpsc_select",
|
||||
reason = "This implementation, while likely sufficient, is unsafe and \
|
||||
likely to be error prone. At some point in the future this \
|
||||
module will be removed.",
|
||||
issue = "27800")]
|
||||
#![rustc_deprecated(since = "1.32.0",
|
||||
reason = "channel selection will be removed in a future release")]
|
||||
|
||||
use core::cell::{Cell, UnsafeCell};
|
||||
use core::marker;
|
||||
use core::ptr;
|
||||
use core::usize;
|
||||
|
||||
use crate::fmt;
|
||||
use crate::sync::mpsc::{Receiver, RecvError};
|
||||
use crate::sync::mpsc::blocking::{self, SignalToken};
|
||||
|
||||
/// The "receiver set" of the select interface. This structure is used to manage
|
||||
/// a set of receivers which are being selected over.
|
||||
pub struct Select {
|
||||
inner: UnsafeCell<SelectInner>,
|
||||
next_id: Cell<usize>,
|
||||
}
|
||||
|
||||
struct SelectInner {
|
||||
head: *mut Handle<'static, ()>,
|
||||
tail: *mut Handle<'static, ()>,
|
||||
}
|
||||
|
||||
impl !marker::Send for Select {}
|
||||
|
||||
/// A handle to a receiver which is currently a member of a `Select` set of
|
||||
/// receivers. This handle is used to keep the receiver in the set as well as
|
||||
/// interact with the underlying receiver.
|
||||
pub struct Handle<'rx, T:Send+'rx> {
|
||||
/// The ID of this handle, used to compare against the return value of
|
||||
/// `Select::wait()`.
|
||||
id: usize,
|
||||
selector: *mut SelectInner,
|
||||
next: *mut Handle<'static, ()>,
|
||||
prev: *mut Handle<'static, ()>,
|
||||
added: bool,
|
||||
packet: &'rx (dyn Packet+'rx),
|
||||
|
||||
// due to our fun transmutes, we be sure to place this at the end. (nothing
|
||||
// previous relies on T)
|
||||
rx: &'rx Receiver<T>,
|
||||
}
|
||||
|
||||
struct Packets { cur: *mut Handle<'static, ()> }
|
||||
|
||||
#[doc(hidden)]
|
||||
#[derive(PartialEq, Eq)]
|
||||
pub enum StartResult {
|
||||
Installed,
|
||||
Abort,
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
pub trait Packet {
|
||||
fn can_recv(&self) -> bool;
|
||||
fn start_selection(&self, token: SignalToken) -> StartResult;
|
||||
fn abort_selection(&self) -> bool;
|
||||
}
|
||||
|
||||
impl Select {
|
||||
/// Creates a new selection structure. This set is initially empty.
|
||||
///
|
||||
/// Usage of this struct directly can sometimes be burdensome, and usage is much easier through
|
||||
/// the `select!` macro.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// #![feature(mpsc_select)]
|
||||
///
|
||||
/// use std::sync::mpsc::Select;
|
||||
///
|
||||
/// let select = Select::new();
|
||||
/// ```
|
||||
pub fn new() -> Select {
|
||||
Select {
|
||||
inner: UnsafeCell::new(SelectInner {
|
||||
head: ptr::null_mut(),
|
||||
tail: ptr::null_mut(),
|
||||
}),
|
||||
next_id: Cell::new(1),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new handle into this receiver set for a new receiver. Note
|
||||
/// that this does *not* add the receiver to the receiver set, for that you
|
||||
/// must call the `add` method on the handle itself.
|
||||
pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> {
|
||||
let id = self.next_id.get();
|
||||
self.next_id.set(id + 1);
|
||||
Handle {
|
||||
id,
|
||||
selector: self.inner.get(),
|
||||
next: ptr::null_mut(),
|
||||
prev: ptr::null_mut(),
|
||||
added: false,
|
||||
rx,
|
||||
packet: rx,
|
||||
}
|
||||
}
|
||||
|
||||
/// Waits for an event on this receiver set. The returned value is *not* an
|
||||
/// index, but rather an ID. This ID can be queried against any active
|
||||
/// `Handle` structures (each one has an `id` method). The handle with
|
||||
/// the matching `id` will have some sort of event available on it. The
|
||||
/// event could either be that data is available or the corresponding
|
||||
/// channel has been closed.
|
||||
pub fn wait(&self) -> usize {
|
||||
self.wait2(true)
|
||||
}
|
||||
|
||||
/// Helper method for skipping the preflight checks during testing
|
||||
pub(super) fn wait2(&self, do_preflight_checks: bool) -> usize {
|
||||
// Note that this is currently an inefficient implementation. We in
|
||||
// theory have knowledge about all receivers in the set ahead of time,
|
||||
// so this method shouldn't really have to iterate over all of them yet
|
||||
// again. The idea with this "receiver set" interface is to get the
|
||||
// interface right this time around, and later this implementation can
|
||||
// be optimized.
|
||||
//
|
||||
// This implementation can be summarized by:
|
||||
//
|
||||
// fn select(receivers) {
|
||||
// if any receiver ready { return ready index }
|
||||
// deschedule {
|
||||
// block on all receivers
|
||||
// }
|
||||
// unblock on all receivers
|
||||
// return ready index
|
||||
// }
|
||||
//
|
||||
// Most notably, the iterations over all of the receivers shouldn't be
|
||||
// necessary.
|
||||
unsafe {
|
||||
// Stage 1: preflight checks. Look for any packets ready to receive
|
||||
if do_preflight_checks {
|
||||
for handle in self.iter() {
|
||||
if (*handle).packet.can_recv() {
|
||||
return (*handle).id();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stage 2: begin the blocking process
|
||||
//
|
||||
// Create a number of signal tokens, and install each one
|
||||
// sequentially until one fails. If one fails, then abort the
|
||||
// selection on the already-installed tokens.
|
||||
let (wait_token, signal_token) = blocking::tokens();
|
||||
for (i, handle) in self.iter().enumerate() {
|
||||
match (*handle).packet.start_selection(signal_token.clone()) {
|
||||
StartResult::Installed => {}
|
||||
StartResult::Abort => {
|
||||
// Go back and abort the already-begun selections
|
||||
for handle in self.iter().take(i) {
|
||||
(*handle).packet.abort_selection();
|
||||
}
|
||||
return (*handle).id;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stage 3: no messages available, actually block
|
||||
wait_token.wait();
|
||||
|
||||
// Stage 4: there *must* be message available; find it.
|
||||
//
|
||||
// Abort the selection process on each receiver. If the abort
|
||||
// process returns `true`, then that means that the receiver is
|
||||
// ready to receive some data. Note that this also means that the
|
||||
// receiver may have yet to have fully read the `to_wake` field and
|
||||
// woken us up (although the wakeup is guaranteed to fail).
|
||||
//
|
||||
// This situation happens in the window of where a sender invokes
|
||||
// increment(), sees -1, and then decides to wake up the thread. After
|
||||
// all this is done, the sending thread will set `selecting` to
|
||||
// `false`. Until this is done, we cannot return. If we were to
|
||||
// return, then a sender could wake up a receiver which has gone
|
||||
// back to sleep after this call to `select`.
|
||||
//
|
||||
// Note that it is a "fairly small window" in which an increment()
|
||||
// views that it should wake a thread up until the `selecting` bit
|
||||
// is set to false. For now, the implementation currently just spins
|
||||
// in a yield loop. This is very distasteful, but this
|
||||
// implementation is already nowhere near what it should ideally be.
|
||||
// A rewrite should focus on avoiding a yield loop, and for now this
|
||||
// implementation is tying us over to a more efficient "don't
|
||||
// iterate over everything every time" implementation.
|
||||
let mut ready_id = usize::MAX;
|
||||
for handle in self.iter() {
|
||||
if (*handle).packet.abort_selection() {
|
||||
ready_id = (*handle).id;
|
||||
}
|
||||
}
|
||||
|
||||
// We must have found a ready receiver
|
||||
assert!(ready_id != usize::MAX);
|
||||
return ready_id;
|
||||
}
|
||||
}
|
||||
|
||||
fn iter(&self) -> Packets { Packets { cur: unsafe { &*self.inner.get() }.head } }
|
||||
}
|
||||
|
||||
impl<'rx, T: Send> Handle<'rx, T> {
|
||||
/// Retrieves the ID of this handle.
|
||||
#[inline]
|
||||
pub fn id(&self) -> usize { self.id }
|
||||
|
||||
/// Blocks to receive a value on the underlying receiver, returning `Some` on
|
||||
/// success or `None` if the channel disconnects. This function has the same
|
||||
/// semantics as `Receiver.recv`
|
||||
pub fn recv(&mut self) -> Result<T, RecvError> { self.rx.recv() }
|
||||
|
||||
/// Adds this handle to the receiver set that the handle was created from. This
|
||||
/// method can be called multiple times, but it has no effect if `add` was
|
||||
/// called previously.
|
||||
///
|
||||
/// This method is unsafe because it requires that the `Handle` is not moved
|
||||
/// while it is added to the `Select` set.
|
||||
pub unsafe fn add(&mut self) {
|
||||
if self.added { return }
|
||||
let selector = &mut *self.selector;
|
||||
let me = self as *mut Handle<'rx, T> as *mut Handle<'static, ()>;
|
||||
|
||||
if selector.head.is_null() {
|
||||
selector.head = me;
|
||||
selector.tail = me;
|
||||
} else {
|
||||
(*me).prev = selector.tail;
|
||||
assert!((*me).next.is_null());
|
||||
(*selector.tail).next = me;
|
||||
selector.tail = me;
|
||||
}
|
||||
self.added = true;
|
||||
}
|
||||
|
||||
/// Removes this handle from the `Select` set. This method is unsafe because
|
||||
/// it has no guarantee that the `Handle` was not moved since `add` was
|
||||
/// called.
|
||||
pub unsafe fn remove(&mut self) {
|
||||
if !self.added { return }
|
||||
|
||||
let selector = &mut *self.selector;
|
||||
let me = self as *mut Handle<'rx, T> as *mut Handle<'static, ()>;
|
||||
|
||||
if self.prev.is_null() {
|
||||
assert_eq!(selector.head, me);
|
||||
selector.head = self.next;
|
||||
} else {
|
||||
(*self.prev).next = self.next;
|
||||
}
|
||||
if self.next.is_null() {
|
||||
assert_eq!(selector.tail, me);
|
||||
selector.tail = self.prev;
|
||||
} else {
|
||||
(*self.next).prev = self.prev;
|
||||
}
|
||||
|
||||
self.next = ptr::null_mut();
|
||||
self.prev = ptr::null_mut();
|
||||
|
||||
self.added = false;
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Select {
|
||||
fn drop(&mut self) {
|
||||
unsafe {
|
||||
assert!((&*self.inner.get()).head.is_null());
|
||||
assert!((&*self.inner.get()).tail.is_null());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Send> Drop for Handle<'_, T> {
|
||||
fn drop(&mut self) {
|
||||
unsafe { self.remove() }
|
||||
}
|
||||
}
|
||||
|
||||
impl Iterator for Packets {
|
||||
type Item = *mut Handle<'static, ()>;
|
||||
|
||||
fn next(&mut self) -> Option<*mut Handle<'static, ()>> {
|
||||
if self.cur.is_null() {
|
||||
None
|
||||
} else {
|
||||
let ret = Some(self.cur);
|
||||
unsafe { self.cur = (*self.cur).next; }
|
||||
ret
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for Select {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("Select").finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Send> fmt::Debug for Handle<'_, T> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("Handle").finish()
|
||||
}
|
||||
}
|
@ -1,413 +0,0 @@
|
||||
#![allow(unused_imports)]
|
||||
|
||||
/// This file exists to hack around https://github.com/rust-lang/rust/issues/47238
|
||||
|
||||
use crate::thread;
|
||||
use crate::sync::mpsc::*;
|
||||
|
||||
// Don't use the libstd version so we can pull in the right Select structure
|
||||
// (std::comm points at the wrong one)
|
||||
macro_rules! select {
|
||||
(
|
||||
$($name:pat = $rx:ident.$meth:ident() => $code:expr),+
|
||||
) => ({
|
||||
let sel = Select::new();
|
||||
$( let mut $rx = sel.handle(&$rx); )+
|
||||
unsafe {
|
||||
$( $rx.add(); )+
|
||||
}
|
||||
let ret = sel.wait();
|
||||
$( if ret == $rx.id() { let $name = $rx.$meth(); $code } else )+
|
||||
{ unreachable!() }
|
||||
})
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn smoke() {
|
||||
let (tx1, rx1) = channel::<i32>();
|
||||
let (tx2, rx2) = channel::<i32>();
|
||||
tx1.send(1).unwrap();
|
||||
select! {
|
||||
foo = rx1.recv() => { assert_eq!(foo.unwrap(), 1); },
|
||||
_bar = rx2.recv() => { panic!() }
|
||||
}
|
||||
tx2.send(2).unwrap();
|
||||
select! {
|
||||
_foo = rx1.recv() => { panic!() },
|
||||
bar = rx2.recv() => { assert_eq!(bar.unwrap(), 2) }
|
||||
}
|
||||
drop(tx1);
|
||||
select! {
|
||||
foo = rx1.recv() => { assert!(foo.is_err()); },
|
||||
_bar = rx2.recv() => { panic!() }
|
||||
}
|
||||
drop(tx2);
|
||||
select! {
|
||||
bar = rx2.recv() => { assert!(bar.is_err()); }
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn smoke2() {
|
||||
let (_tx1, rx1) = channel::<i32>();
|
||||
let (_tx2, rx2) = channel::<i32>();
|
||||
let (_tx3, rx3) = channel::<i32>();
|
||||
let (_tx4, rx4) = channel::<i32>();
|
||||
let (tx5, rx5) = channel::<i32>();
|
||||
tx5.send(4).unwrap();
|
||||
select! {
|
||||
_foo = rx1.recv() => { panic!("1") },
|
||||
_foo = rx2.recv() => { panic!("2") },
|
||||
_foo = rx3.recv() => { panic!("3") },
|
||||
_foo = rx4.recv() => { panic!("4") },
|
||||
foo = rx5.recv() => { assert_eq!(foo.unwrap(), 4); }
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn closed() {
|
||||
let (_tx1, rx1) = channel::<i32>();
|
||||
let (tx2, rx2) = channel::<i32>();
|
||||
drop(tx2);
|
||||
|
||||
select! {
|
||||
_a1 = rx1.recv() => { panic!() },
|
||||
a2 = rx2.recv() => { assert!(a2.is_err()); }
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unblocks() {
|
||||
let (tx1, rx1) = channel::<i32>();
|
||||
let (_tx2, rx2) = channel::<i32>();
|
||||
let (tx3, rx3) = channel::<i32>();
|
||||
|
||||
let _t = thread::spawn(move|| {
|
||||
for _ in 0..20 { thread::yield_now(); }
|
||||
tx1.send(1).unwrap();
|
||||
rx3.recv().unwrap();
|
||||
for _ in 0..20 { thread::yield_now(); }
|
||||
});
|
||||
|
||||
select! {
|
||||
a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
|
||||
_b = rx2.recv() => { panic!() }
|
||||
}
|
||||
tx3.send(1).unwrap();
|
||||
select! {
|
||||
a = rx1.recv() => { assert!(a.is_err()) },
|
||||
_b = rx2.recv() => { panic!() }
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn both_ready() {
|
||||
let (tx1, rx1) = channel::<i32>();
|
||||
let (tx2, rx2) = channel::<i32>();
|
||||
let (tx3, rx3) = channel::<()>();
|
||||
|
||||
let _t = thread::spawn(move|| {
|
||||
for _ in 0..20 { thread::yield_now(); }
|
||||
tx1.send(1).unwrap();
|
||||
tx2.send(2).unwrap();
|
||||
rx3.recv().unwrap();
|
||||
});
|
||||
|
||||
select! {
|
||||
a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
|
||||
a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
|
||||
}
|
||||
select! {
|
||||
a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
|
||||
a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
|
||||
}
|
||||
assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
|
||||
assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty));
|
||||
tx3.send(()).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stress() {
|
||||
const AMT: i32 = 10000;
|
||||
let (tx1, rx1) = channel::<i32>();
|
||||
let (tx2, rx2) = channel::<i32>();
|
||||
let (tx3, rx3) = channel::<()>();
|
||||
|
||||
let _t = thread::spawn(move|| {
|
||||
for i in 0..AMT {
|
||||
if i % 2 == 0 {
|
||||
tx1.send(i).unwrap();
|
||||
} else {
|
||||
tx2.send(i).unwrap();
|
||||
}
|
||||
rx3.recv().unwrap();
|
||||
}
|
||||
});
|
||||
|
||||
for i in 0..AMT {
|
||||
select! {
|
||||
i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1.unwrap()); },
|
||||
i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2.unwrap()); }
|
||||
}
|
||||
tx3.send(()).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unused_must_use)]
|
||||
#[test]
|
||||
fn cloning() {
|
||||
let (tx1, rx1) = channel::<i32>();
|
||||
let (_tx2, rx2) = channel::<i32>();
|
||||
let (tx3, rx3) = channel::<()>();
|
||||
|
||||
let _t = thread::spawn(move|| {
|
||||
rx3.recv().unwrap();
|
||||
tx1.clone();
|
||||
assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
|
||||
tx1.send(2).unwrap();
|
||||
rx3.recv().unwrap();
|
||||
});
|
||||
|
||||
tx3.send(()).unwrap();
|
||||
select! {
|
||||
_i1 = rx1.recv() => {},
|
||||
_i2 = rx2.recv() => panic!()
|
||||
}
|
||||
tx3.send(()).unwrap();
|
||||
}
|
||||
|
||||
#[allow(unused_must_use)]
|
||||
#[test]
|
||||
fn cloning2() {
|
||||
let (tx1, rx1) = channel::<i32>();
|
||||
let (_tx2, rx2) = channel::<i32>();
|
||||
let (tx3, rx3) = channel::<()>();
|
||||
|
||||
let _t = thread::spawn(move|| {
|
||||
rx3.recv().unwrap();
|
||||
tx1.clone();
|
||||
assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
|
||||
tx1.send(2).unwrap();
|
||||
rx3.recv().unwrap();
|
||||
});
|
||||
|
||||
tx3.send(()).unwrap();
|
||||
select! {
|
||||
_i1 = rx1.recv() => {},
|
||||
_i2 = rx2.recv() => panic!()
|
||||
}
|
||||
tx3.send(()).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cloning3() {
|
||||
let (tx1, rx1) = channel::<()>();
|
||||
let (tx2, rx2) = channel::<()>();
|
||||
let (tx3, rx3) = channel::<()>();
|
||||
let _t = thread::spawn(move|| {
|
||||
let s = Select::new();
|
||||
let mut h1 = s.handle(&rx1);
|
||||
let mut h2 = s.handle(&rx2);
|
||||
unsafe { h2.add(); }
|
||||
unsafe { h1.add(); }
|
||||
assert_eq!(s.wait(), h2.id());
|
||||
tx3.send(()).unwrap();
|
||||
});
|
||||
|
||||
for _ in 0..1000 { thread::yield_now(); }
|
||||
drop(tx1.clone());
|
||||
tx2.send(()).unwrap();
|
||||
rx3.recv().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn preflight1() {
|
||||
let (tx, rx) = channel();
|
||||
tx.send(()).unwrap();
|
||||
select! {
|
||||
_n = rx.recv() => {}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn preflight2() {
|
||||
let (tx, rx) = channel();
|
||||
tx.send(()).unwrap();
|
||||
tx.send(()).unwrap();
|
||||
select! {
|
||||
_n = rx.recv() => {}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn preflight3() {
|
||||
let (tx, rx) = channel();
|
||||
drop(tx.clone());
|
||||
tx.send(()).unwrap();
|
||||
select! {
|
||||
_n = rx.recv() => {}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn preflight4() {
|
||||
let (tx, rx) = channel();
|
||||
tx.send(()).unwrap();
|
||||
let s = Select::new();
|
||||
let mut h = s.handle(&rx);
|
||||
unsafe { h.add(); }
|
||||
assert_eq!(s.wait2(false), h.id());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn preflight5() {
|
||||
let (tx, rx) = channel();
|
||||
tx.send(()).unwrap();
|
||||
tx.send(()).unwrap();
|
||||
let s = Select::new();
|
||||
let mut h = s.handle(&rx);
|
||||
unsafe { h.add(); }
|
||||
assert_eq!(s.wait2(false), h.id());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn preflight6() {
|
||||
let (tx, rx) = channel();
|
||||
drop(tx.clone());
|
||||
tx.send(()).unwrap();
|
||||
let s = Select::new();
|
||||
let mut h = s.handle(&rx);
|
||||
unsafe { h.add(); }
|
||||
assert_eq!(s.wait2(false), h.id());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn preflight7() {
|
||||
let (tx, rx) = channel::<()>();
|
||||
drop(tx);
|
||||
let s = Select::new();
|
||||
let mut h = s.handle(&rx);
|
||||
unsafe { h.add(); }
|
||||
assert_eq!(s.wait2(false), h.id());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn preflight8() {
|
||||
let (tx, rx) = channel();
|
||||
tx.send(()).unwrap();
|
||||
drop(tx);
|
||||
rx.recv().unwrap();
|
||||
let s = Select::new();
|
||||
let mut h = s.handle(&rx);
|
||||
unsafe { h.add(); }
|
||||
assert_eq!(s.wait2(false), h.id());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn preflight9() {
|
||||
let (tx, rx) = channel();
|
||||
drop(tx.clone());
|
||||
tx.send(()).unwrap();
|
||||
drop(tx);
|
||||
rx.recv().unwrap();
|
||||
let s = Select::new();
|
||||
let mut h = s.handle(&rx);
|
||||
unsafe { h.add(); }
|
||||
assert_eq!(s.wait2(false), h.id());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_data_waiting() {
|
||||
let (tx1, rx1) = channel();
|
||||
let (tx2, rx2) = channel();
|
||||
let _t = thread::spawn(move|| {
|
||||
select! {
|
||||
_n = rx1.recv() => {}
|
||||
}
|
||||
tx2.send(()).unwrap();
|
||||
});
|
||||
|
||||
for _ in 0..100 { thread::yield_now() }
|
||||
tx1.send(()).unwrap();
|
||||
rx2.recv().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stream_data_waiting() {
|
||||
let (tx1, rx1) = channel();
|
||||
let (tx2, rx2) = channel();
|
||||
tx1.send(()).unwrap();
|
||||
tx1.send(()).unwrap();
|
||||
rx1.recv().unwrap();
|
||||
rx1.recv().unwrap();
|
||||
let _t = thread::spawn(move|| {
|
||||
select! {
|
||||
_n = rx1.recv() => {}
|
||||
}
|
||||
tx2.send(()).unwrap();
|
||||
});
|
||||
|
||||
for _ in 0..100 { thread::yield_now() }
|
||||
tx1.send(()).unwrap();
|
||||
rx2.recv().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn shared_data_waiting() {
|
||||
let (tx1, rx1) = channel();
|
||||
let (tx2, rx2) = channel();
|
||||
drop(tx1.clone());
|
||||
tx1.send(()).unwrap();
|
||||
rx1.recv().unwrap();
|
||||
let _t = thread::spawn(move|| {
|
||||
select! {
|
||||
_n = rx1.recv() => {}
|
||||
}
|
||||
tx2.send(()).unwrap();
|
||||
});
|
||||
|
||||
for _ in 0..100 { thread::yield_now() }
|
||||
tx1.send(()).unwrap();
|
||||
rx2.recv().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sync1() {
|
||||
let (tx, rx) = sync_channel::<i32>(1);
|
||||
tx.send(1).unwrap();
|
||||
select! {
|
||||
n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sync2() {
|
||||
let (tx, rx) = sync_channel::<i32>(0);
|
||||
let _t = thread::spawn(move|| {
|
||||
for _ in 0..100 { thread::yield_now() }
|
||||
tx.send(1).unwrap();
|
||||
});
|
||||
select! {
|
||||
n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sync3() {
|
||||
let (tx1, rx1) = sync_channel::<i32>(0);
|
||||
let (tx2, rx2): (Sender<i32>, Receiver<i32>) = channel();
|
||||
let _t = thread::spawn(move|| { tx1.send(1).unwrap(); });
|
||||
let _t = thread::spawn(move|| { tx2.send(2).unwrap(); });
|
||||
select! {
|
||||
n = rx1.recv() => {
|
||||
let n = n.unwrap();
|
||||
assert_eq!(n, 1);
|
||||
assert_eq!(rx2.recv().unwrap(), 2);
|
||||
},
|
||||
n = rx2.recv() => {
|
||||
let n = n.unwrap();
|
||||
assert_eq!(n, 2);
|
||||
assert_eq!(rx1.recv().unwrap(), 1);
|
||||
}
|
||||
}
|
||||
}
|
@ -9,6 +9,7 @@
|
||||
/// channels are quite similar, and this is no coincidence!
|
||||
|
||||
pub use self::Failure::*;
|
||||
use self::StartResult::*;
|
||||
|
||||
use core::cmp;
|
||||
use core::intrinsics::abort;
|
||||
@ -19,8 +20,6 @@ use crate::ptr;
|
||||
use crate::sync::atomic::{AtomicUsize, AtomicIsize, AtomicBool, Ordering};
|
||||
use crate::sync::mpsc::blocking::{self, SignalToken};
|
||||
use crate::sync::mpsc::mpsc_queue as mpsc;
|
||||
use crate::sync::mpsc::select::StartResult::*;
|
||||
use crate::sync::mpsc::select::StartResult;
|
||||
use crate::sync::{Mutex, MutexGuard};
|
||||
use crate::thread;
|
||||
use crate::time::Instant;
|
||||
@ -57,6 +56,12 @@ pub enum Failure {
|
||||
Disconnected,
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Eq)]
|
||||
enum StartResult {
|
||||
Installed,
|
||||
Abort,
|
||||
}
|
||||
|
||||
impl<T> Packet<T> {
|
||||
// Creation of a packet *must* be followed by a call to postinit_lock
|
||||
// and later by inherit_blocker
|
||||
@ -394,16 +399,6 @@ impl<T> Packet<T> {
|
||||
// select implementation
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// Helper function for select, tests whether this port can receive without
|
||||
// blocking (obviously not an atomic decision).
|
||||
//
|
||||
// This is different than the stream version because there's no need to peek
|
||||
// at the queue, we can just look at the local count.
|
||||
pub fn can_recv(&self) -> bool {
|
||||
let cnt = self.cnt.load(Ordering::SeqCst);
|
||||
cnt == DISCONNECTED || cnt - unsafe { *self.steals.get() } > 0
|
||||
}
|
||||
|
||||
// increment the count on the channel (used for selection)
|
||||
fn bump(&self, amt: isize) -> isize {
|
||||
match self.cnt.fetch_add(amt, Ordering::SeqCst) {
|
||||
@ -415,22 +410,6 @@ impl<T> Packet<T> {
|
||||
}
|
||||
}
|
||||
|
||||
// Inserts the signal token for selection on this port, returning true if
|
||||
// blocking should proceed.
|
||||
//
|
||||
// The code here is the same as in stream.rs, except that it doesn't need to
|
||||
// peek at the channel to see if an upgrade is pending.
|
||||
pub fn start_selection(&self, token: SignalToken) -> StartResult {
|
||||
match self.decrement(token) {
|
||||
Installed => Installed,
|
||||
Abort => {
|
||||
let prev = self.bump(1);
|
||||
assert!(prev == DISCONNECTED || prev >= 0);
|
||||
Abort
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Cancels a previous thread waiting on this port, returning whether there's
|
||||
// data on the port.
|
||||
//
|
||||
|
@ -9,7 +9,6 @@
|
||||
|
||||
pub use self::Failure::*;
|
||||
pub use self::UpgradeResult::*;
|
||||
pub use self::SelectionResult::*;
|
||||
use self::Message::*;
|
||||
|
||||
use core::cmp;
|
||||
@ -60,12 +59,6 @@ pub enum UpgradeResult {
|
||||
UpWoke(SignalToken),
|
||||
}
|
||||
|
||||
pub enum SelectionResult<T> {
|
||||
SelSuccess,
|
||||
SelCanceled,
|
||||
SelUpgraded(SignalToken, Receiver<T>),
|
||||
}
|
||||
|
||||
// Any message could contain an "upgrade request" to a new shared port, so the
|
||||
// internal queue it's a queue of T, but rather Message<T>
|
||||
enum Message<T> {
|
||||
@ -338,27 +331,6 @@ impl<T> Packet<T> {
|
||||
// select implementation
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// Tests to see whether this port can receive without blocking. If Ok is
|
||||
// returned, then that's the answer. If Err is returned, then the returned
|
||||
// port needs to be queried instead (an upgrade happened)
|
||||
pub fn can_recv(&self) -> Result<bool, Receiver<T>> {
|
||||
// We peek at the queue to see if there's anything on it, and we use
|
||||
// this return value to determine if we should pop from the queue and
|
||||
// upgrade this channel immediately. If it looks like we've got an
|
||||
// upgrade pending, then go through the whole recv rigamarole to update
|
||||
// the internal state.
|
||||
match self.queue.peek() {
|
||||
Some(&mut GoUp(..)) => {
|
||||
match self.recv(None) {
|
||||
Err(Upgraded(port)) => Err(port),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
Some(..) => Ok(true),
|
||||
None => Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
// increment the count on the channel (used for selection)
|
||||
fn bump(&self, amt: isize) -> isize {
|
||||
match self.queue.producer_addition().cnt.fetch_add(amt, Ordering::SeqCst) {
|
||||
@ -370,31 +342,6 @@ impl<T> Packet<T> {
|
||||
}
|
||||
}
|
||||
|
||||
// Attempts to start selecting on this port. Like a oneshot, this can fail
|
||||
// immediately because of an upgrade.
|
||||
pub fn start_selection(&self, token: SignalToken) -> SelectionResult<T> {
|
||||
match self.decrement(token) {
|
||||
Ok(()) => SelSuccess,
|
||||
Err(token) => {
|
||||
let ret = match self.queue.peek() {
|
||||
Some(&mut GoUp(..)) => {
|
||||
match self.queue.pop() {
|
||||
Some(GoUp(port)) => SelUpgraded(token, port),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
Some(..) => SelCanceled,
|
||||
None => SelCanceled,
|
||||
};
|
||||
// Undo our decrement above, and we should be guaranteed that the
|
||||
// previous value is positive because we're not going to sleep
|
||||
let prev = self.bump(1);
|
||||
assert!(prev == DISCONNECTED || prev >= 0);
|
||||
ret
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Removes a previous thread from being blocked in this port
|
||||
pub fn abort_selection(&self,
|
||||
was_upgrade: bool) -> Result<bool, Receiver<T>> {
|
||||
|
@ -33,7 +33,6 @@ use core::ptr;
|
||||
|
||||
use crate::sync::atomic::{Ordering, AtomicUsize};
|
||||
use crate::sync::mpsc::blocking::{self, WaitToken, SignalToken};
|
||||
use crate::sync::mpsc::select::StartResult::{self, Installed, Abort};
|
||||
use crate::sync::{Mutex, MutexGuard};
|
||||
use crate::time::Instant;
|
||||
|
||||
@ -406,42 +405,6 @@ impl<T> Packet<T> {
|
||||
while let Some(token) = queue.dequeue() { token.signal(); }
|
||||
waiter.map(|t| t.signal());
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
// select implementation
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// If Ok, the value is whether this port has data, if Err, then the upgraded
|
||||
// port needs to be checked instead of this one.
|
||||
pub fn can_recv(&self) -> bool {
|
||||
let guard = self.lock.lock().unwrap();
|
||||
guard.disconnected || guard.buf.size() > 0
|
||||
}
|
||||
|
||||
// Attempts to start selection on this port. This can either succeed or fail
|
||||
// because there is data waiting.
|
||||
pub fn start_selection(&self, token: SignalToken) -> StartResult {
|
||||
let mut guard = self.lock.lock().unwrap();
|
||||
if guard.disconnected || guard.buf.size() > 0 {
|
||||
Abort
|
||||
} else {
|
||||
match mem::replace(&mut guard.blocker, BlockedReceiver(token)) {
|
||||
NoneBlocked => {}
|
||||
BlockedSender(..) => unreachable!(),
|
||||
BlockedReceiver(..) => unreachable!(),
|
||||
}
|
||||
Installed
|
||||
}
|
||||
}
|
||||
|
||||
// Remove a previous selecting thread from this port. This ensures that the
|
||||
// blocked thread will no longer be visible to any other threads.
|
||||
//
|
||||
// The return value indicates whether there's data on this port.
|
||||
pub fn abort_selection(&self) -> bool {
|
||||
let mut guard = self.lock.lock().unwrap();
|
||||
abort_selection(&mut guard)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Drop for Packet<T> {
|
||||
|
@ -1,35 +0,0 @@
|
||||
// run-pass
|
||||
#![allow(unused_must_use)]
|
||||
// ignore-emscripten no threads support
|
||||
|
||||
// This test may not always fail, but it can be flaky if the race it used to
|
||||
// expose is still present.
|
||||
|
||||
#![feature(mpsc_select)]
|
||||
#![allow(deprecated)]
|
||||
|
||||
use std::sync::mpsc::{channel, Sender, Receiver};
|
||||
use std::thread;
|
||||
|
||||
fn helper(rx: Receiver<Sender<()>>) {
|
||||
for tx in rx.iter() {
|
||||
let _ = tx.send(());
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let (tx, rx) = channel();
|
||||
let t = thread::spawn(move|| { helper(rx) });
|
||||
let (snd, rcv) = channel::<isize>();
|
||||
for _ in 1..100000 {
|
||||
snd.send(1).unwrap();
|
||||
let (tx2, rx2) = channel();
|
||||
tx.send(tx2).unwrap();
|
||||
select! {
|
||||
_ = rx2.recv() => (),
|
||||
_ = rcv.recv() => ()
|
||||
}
|
||||
}
|
||||
drop(tx);
|
||||
t.join();
|
||||
}
|
@ -233,8 +233,6 @@ fn println() {
|
||||
println!("hello {}", "world",);
|
||||
}
|
||||
|
||||
// select! is too troublesome and unlikely to be stabilized
|
||||
|
||||
// stringify! is N/A
|
||||
|
||||
#[cfg(std)]
|
||||
|
Loading…
Reference in New Issue
Block a user