Add blocking support module for channels

This commit is contained in:
Aaron Turon 2014-12-01 08:14:35 -08:00
parent 84cb6cd938
commit 7fd7ce682d
7 changed files with 359 additions and 310 deletions

View File

@ -0,0 +1,81 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Generic support for building blocking abstractions.
use thread::Thread;
use sync::atomic::{AtomicBool, INIT_ATOMIC_BOOL, Ordering};
use sync::Arc;
use kinds::marker::NoSend;
use mem;
use clone::Clone;
struct Inner {
thread: Thread,
woken: AtomicBool,
}
#[deriving(Clone)]
pub struct SignalToken {
inner: Arc<Inner>,
}
pub struct WaitToken {
inner: Arc<Inner>,
no_send: NoSend,
}
fn token() -> (WaitToken, SignalToken) {
let inner = Arc::new(Inner {
thread: Thread::current(),
woken: INIT_ATOMIC_BOOL,
});
let wait_token = WaitToken {
inner: inner.clone(),
no_send: NoSend,
};
let signal_token = SignalToken {
inner: inner
};
(wait_token, signal_token)
}
impl SignalToken {
fn signal(&self) -> bool {
let wake = !self.inner.woken.compare_and_swap(false, true, Ordering::SeqCst);
if wake {
self.inner.thread.unpark();
}
wake
}
/// Convert to an unsafe uint value. Useful for storing in a pipe's state
/// flag.
#[inline]
pub unsafe fn cast_to_uint(self) -> uint {
mem::transmute(self.inner)
}
/// Convert from an unsafe uint value. Useful for retrieving a pipe's state
/// flag.
#[inline]
pub unsafe fn cast_from_uint(signal_ptr: uint) -> SignalToken {
SignalToken { inner: mem::transmute(signal_ptr) }
}
}
impl WaitToken {
fn wait(self) {
while !self.inner.woken.load(Ordering::SeqCst) {
Thread::park()
}
}
}

View File

@ -54,17 +54,6 @@
//! There are methods on both of senders and receivers to perform their
//! respective operations without panicking, however.
//!
//! ## Runtime Requirements
//!
//! The channel types defined in this module generally have very few runtime
//! requirements in order to operate. The major requirement they have is for a
//! local rust `Task` to be available if any *blocking* operation is performed.
//!
//! If a local `Task` is not available (for example an FFI callback), then the
//! `send` operation is safe on a `Sender` (as well as a `send_opt`) as well as
//! the `try_send` method on a `SyncSender`, but no other operations are
//! guaranteed to be safe.
//!
//! # Example
//!
//! Simple usage:
@ -327,9 +316,9 @@ use alloc::arc::Arc;
use core::kinds::marker;
use core::mem;
use core::cell::UnsafeCell;
use rt::task::BlockedTask;
pub use comm::select::{Select, Handle};
use comm::select::StartResult::*;
macro_rules! test {
{ fn $name:ident() $b:block $(#[$a:meta])*} => (
@ -348,6 +337,7 @@ macro_rules! test {
)
}
mod blocking;
mod oneshot;
mod select;
mod shared;
@ -947,34 +937,33 @@ impl<T: Send> select::Packet for Receiver<T> {
}
}
fn start_selection(&self, mut task: BlockedTask) -> Result<(), BlockedTask>{
fn start_selection(&self, mut token: SignalToken) -> bool {
loop {
let (t, new_port) = match *unsafe { self.inner() } {
Oneshot(ref p) => {
match unsafe { (*p.get()).start_selection(task) } {
oneshot::SelSuccess => return Ok(()),
oneshot::SelCanceled(task) => return Err(task),
match unsafe { (*p.get()).start_selection(token) } {
oneshot::SelSuccess => return Installed,
oneshot::SelCanceled => return Abort,
oneshot::SelUpgraded(t, rx) => (t, rx),
}
}
Stream(ref p) => {
match unsafe { (*p.get()).start_selection(task) } {
stream::SelSuccess => return Ok(()),
stream::SelCanceled(task) => return Err(task),
match unsafe { (*p.get()).start_selection(token) } {
stream::SelSuccess => return Installed,
stream::SelCanceled => return Abort,
stream::SelUpgraded(t, rx) => (t, rx),
}
}
Shared(ref p) => {
return unsafe { (*p.get()).start_selection(task) };
return unsafe { (*p.get()).start_selection(token) };
}
Sync(ref p) => {
return unsafe { (*p.get()).start_selection(task) };
return unsafe { (*p.get()).start_selection(token) };
}
};
task = t;
token = t;
unsafe {
mem::swap(self.inner_mut(),
new_port.inner_mut());
mem::swap(self.inner_mut(), new_port.inner_mut());
}
}
}

View File

@ -40,17 +40,20 @@ use self::MyUpgrade::*;
use core::prelude::*;
use alloc::boxed::Box;
use core::mem;
use rt::local::Local;
use rt::task::{Task, BlockedTask};
use sync::atomic;
use comm::Receiver;
use comm::blocking::{mod, WaitToken, SignalToken};
use core::mem;
use sync::atomic;
// Various states you can find a port in.
const EMPTY: uint = 0;
const DATA: uint = 1;
const DISCONNECTED: uint = 2;
const EMPTY: uint = 0; // initial state: no data, no blocked reciever
const DATA: uint = 1; // data ready for receiver to take
const DISCONNECTED: uint = 2; // channel is disconnected OR upgraded
// Any other value represents a pointer to a SignalToken value. The
// protocol ensures that when the state moves *to* a pointer,
// ownership of the token is given to the packet, and when the state
// moves *from* a pointer, ownership of the token is transferred to
// whoever changed the state.
pub struct Packet<T> {
// Internal state of the chan/port pair (stores the blocked task as well)
@ -71,12 +74,12 @@ pub enum Failure<T> {
pub enum UpgradeResult {
UpSuccess,
UpDisconnected,
UpWoke(BlockedTask),
UpWoke(SignalToken),
}
pub enum SelectionResult<T> {
SelCanceled(BlockedTask),
SelUpgraded(BlockedTask, Receiver<T>),
SelCanceled,
SelUpgraded(SignalToken, Receiver<T>),
SelSuccess,
}
@ -118,12 +121,10 @@ impl<T: Send> Packet<T> {
// Not possible, these are one-use channels
DATA => unreachable!(),
// Anything else means that there was a task waiting on the other
// end. We leave the 'DATA' state inside so it'll pick it up on the
// other end.
n => unsafe {
let t = BlockedTask::cast_from_uint(n);
t.wake().map(|t| t.reawaken());
// There is a thread waiting on the other end. We leave the 'DATA'
// state inside so it'll pick it up on the other end.
ptr => unsafe {
SignalToken::cast_from_uint(ptr).signal();
Ok(())
}
}
@ -142,23 +143,17 @@ impl<T: Send> Packet<T> {
// Attempt to not block the task (it's a little expensive). If it looks
// like we're not empty, then immediately go through to `try_recv`.
if self.state.load(atomic::SeqCst) == EMPTY {
let t: Box<Task> = Local::take();
t.deschedule(1, |task| {
let n = unsafe { task.cast_to_uint() };
match self.state.compare_and_swap(EMPTY, n, atomic::SeqCst) {
// Nothing on the channel, we legitimately block
EMPTY => Ok(()),
let (wait_token, signal_token) = blocking::token();
let ptr = unsafe { signal_token.cast_to_uint() };
// If there's data or it's a disconnected channel, then we
// failed the cmpxchg, so we just wake ourselves back up
DATA | DISCONNECTED => {
unsafe { Err(BlockedTask::cast_from_uint(n)) }
}
// Only one thread is allowed to sleep on this port
_ => unreachable!()
}
});
// race with senders to enter the blocking state
if self.state.compare_and_swap(EMPTY, ptr, atomic::SeqCst) == EMPTY {
wait_token.wait();
debug_assert!(self.state.load(atomic::SeqCst) != EMPTY);
} else {
// drop the signal token, since we never blocked
drop(unsafe { SignalToken::cast_from_uint(ptr) });
}
}
self.try_recv()
@ -197,6 +192,9 @@ impl<T: Send> Packet<T> {
}
}
}
// We are the sole receiver; there cannot be a blocking
// receiver already.
_ => unreachable!()
}
}
@ -223,7 +221,7 @@ impl<T: Send> Packet<T> {
DISCONNECTED => { self.upgrade = prev; UpDisconnected }
// If someone's waiting, we gotta wake them up
n => UpWoke(unsafe { BlockedTask::cast_from_uint(n) })
ptr => UpWoke(unsafe { SignalToken::cast_from_uint(ptr) })
}
}
@ -232,9 +230,8 @@ impl<T: Send> Packet<T> {
DATA | DISCONNECTED | EMPTY => {}
// If someone's waiting, we gotta wake them up
n => unsafe {
let t = BlockedTask::cast_from_uint(n);
t.wake().map(|t| t.reawaken());
ptr => unsafe {
SignalToken::cast_from_uint(ptr).signal();
}
}
}
@ -286,13 +283,17 @@ impl<T: Send> Packet<T> {
// Attempts to start selection on this port. This can either succeed, fail
// because there is data, or fail because there is an upgrade pending.
pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> {
let n = unsafe { task.cast_to_uint() };
match self.state.compare_and_swap(EMPTY, n, atomic::SeqCst) {
pub fn start_selection(&mut self, token: SignalToken) -> SelectionResult<T> {
let ptr = unsafe { token.cast_to_uint() };
match self.state.compare_and_swap(EMPTY, ptr, atomic::SeqCst) {
EMPTY => SelSuccess,
DATA => SelCanceled(unsafe { BlockedTask::cast_from_uint(n) }),
DATA => {
drop(unsafe { SignalToken::cast_from_uint(ptr) });
SelCanceled
}
DISCONNECTED if self.data.is_some() => {
SelCanceled(unsafe { BlockedTask::cast_from_uint(n) })
drop(unsafe { SignalToken::cast_from_uint(ptr) });
SelCanceled
}
DISCONNECTED => {
match mem::replace(&mut self.upgrade, SendUsed) {
@ -300,8 +301,7 @@ impl<T: Send> Packet<T> {
// propagate upwards whether the upgrade can receive
// data
GoUp(upgrade) => {
SelUpgraded(unsafe { BlockedTask::cast_from_uint(n) },
upgrade)
SelUpgraded(unsafe { SignalToken::cast_from_uint(ptr) }, upgrade)
}
// If the other end disconnected without sending an
@ -309,7 +309,8 @@ impl<T: Send> Packet<T> {
// disconnected).
up => {
self.upgrade = up;
SelCanceled(unsafe { BlockedTask::cast_from_uint(n) })
drop(unsafe { SignalToken::cast_from_uint(ptr) });
SelCanceled
}
}
}
@ -331,7 +332,7 @@ impl<T: Send> Packet<T> {
// If we've got a blocked task, then use an atomic to gain ownership
// of it (may fail)
n => self.state.compare_and_swap(n, EMPTY, atomic::SeqCst)
BLOCKED => self.state.compare_and_swap(BLOCKED, EMPTY, atomic::SeqCst)
};
// Now that we've got ownership of our state, figure out what to do
@ -358,11 +359,9 @@ impl<T: Send> Packet<T> {
}
}
// We woke ourselves up from select. Assert that the task should be
// trashed and returned that we don't have any data.
n => {
let t = unsafe { BlockedTask::cast_from_uint(n) };
t.trash();
// We woke ourselves up from select.
ptr => unsafe {
drop(SignalToken::cast_from_uint(ptr));
Ok(false)
}
}

View File

@ -59,10 +59,11 @@ use core::cell::Cell;
use core::kinds::marker;
use core::mem;
use core::uint;
use rt::local::Local;
use rt::task::{Task, BlockedTask};
use comm::Receiver;
use comm::blocking::{mod, SignalToken};
use self::StartResult::*;
/// The "receiver set" of the select interface. This structure is used to manage
/// a set of receivers which are being selected over.
@ -93,10 +94,17 @@ pub struct Handle<'rx, T:'rx> {
struct Packets { cur: *mut Handle<'static, ()> }
#[doc(hidden)]
#[deriving(PartialEq)]
pub enum StartResult {
Installed,
Abort,
}
#[doc(hidden)]
pub trait Packet {
fn can_recv(&self) -> bool;
fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>;
fn start_selection(&self, token: SignalToken) -> StartResult;
fn abort_selection(&self) -> bool;
}
@ -165,36 +173,39 @@ impl Select {
// Most notably, the iterations over all of the receivers shouldn't be
// necessary.
unsafe {
let mut amt = 0;
for p in self.iter() {
amt += 1;
if do_preflight_checks && (*p).packet.can_recv() {
return (*p).id;
}
}
assert!(amt > 0);
let mut ready_index = amt;
let mut ready_id = uint::MAX;
let mut iter = self.iter().enumerate();
// Acquire a number of blocking contexts, and block on each one
// sequentially until one fails. If one fails, then abort
// immediately so we can go unblock on all the other receivers.
let task: Box<Task> = Local::take();
task.deschedule(amt, |task| {
// Prepare for the block
let (i, handle) = iter.next().unwrap();
match (*handle).packet.start_selection(task) {
Ok(()) => Ok(()),
Err(task) => {
ready_index = i;
ready_id = (*handle).id;
Err(task)
// Stage 1: preflight checks. Look for any packets ready to receive
if do_preflight_checks {
for handle in self.iter() {
if (*handle).packet.can_recv() {
return (*handle).id();
}
}
});
}
// Stage 2: begin the blocking process
//
// Create a number of signal tokens, and install each one
// sequentially until one fails. If one fails, then abort the
// selection on the already-installed tokens.
let (wait_token, signal_token) = blocking::tokens();
for (i, handle) in self.iter().enumerate() {
match (*handle).packet.start_selection(signal_token.clone()) {
Installed => {}
Abort => {
// Go back and abort the already-begun selections
for handle in self.iter().take(i) {
(*handle).packet.abort_selection();
}
return (*handle).id;
}
}
}
// Stage 3: no messages available, actually block
wait_token.wait();
// Stage 4: there *must* be message available; find it.
//
// Abort the selection process on each receiver. If the abort
// process returns `true`, then that means that the receiver is
// ready to receive some data. Note that this also means that the
@ -216,12 +227,14 @@ impl Select {
// A rewrite should focus on avoiding a yield loop, and for now this
// implementation is tying us over to a more efficient "don't
// iterate over everything every time" implementation.
for handle in self.iter().take(ready_index) {
let mut ready_id = uint::MAX;
for handle in self.iter() {
if (*handle).packet.abort_selection() {
ready_id = (*handle).id;
}
}
// We must have found a ready receiver
assert!(ready_id != uint::MAX);
return ready_id;
}

View File

@ -25,12 +25,12 @@ use core::prelude::*;
use alloc::boxed::Box;
use core::cmp;
use core::int;
use rt::local::Local;
use rt::task::{Task, BlockedTask};
use rt::thread::Thread;
use sync::{atomic, Mutex, MutexGuard};
use comm::mpsc_queue as mpsc;
use comm::blocking::{mod, SignalToken};
use comm::select::StartResult;
use comm::select::StartResult::*;
const DISCONNECTED: int = int::MIN;
const FUDGE: int = 1024;
@ -43,7 +43,7 @@ pub struct Packet<T> {
queue: mpsc::Queue<T>,
cnt: atomic::AtomicInt, // How many items are on this channel
steals: int, // How many times has a port received without blocking?
to_wake: atomic::AtomicUint, // Task to wake up
to_wake: atomic::AtomicUint, // SignalToken for wake up
// The number of channels which are currently using this packet.
channels: atomic::AtomicInt,
@ -95,41 +95,34 @@ impl<T: Send> Packet<T> {
//
// This can only be called at channel-creation time
pub fn inherit_blocker(&mut self,
task: Option<BlockedTask>,
token: Option<SignalToken>,
guard: MutexGuard<()>) {
match task {
Some(task) => {
assert_eq!(self.cnt.load(atomic::SeqCst), 0);
assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
self.to_wake.store(unsafe { task.cast_to_uint() },
atomic::SeqCst);
self.cnt.store(-1, atomic::SeqCst);
token.map(|token| {
assert_eq!(self.cnt.load(atomic::SeqCst), 0);
assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
self.to_wake.store(unsafe { token.cast_to_uint() }, atomic::SeqCst);
self.cnt.store(-1, atomic::SeqCst);
// This store is a little sketchy. What's happening here is
// that we're transferring a blocker from a oneshot or stream
// channel to this shared channel. In doing so, we never
// spuriously wake them up and rather only wake them up at the
// appropriate time. This implementation of shared channels
// assumes that any blocking recv() will undo the increment of
// steals performed in try_recv() once the recv is complete.
// This thread that we're inheriting, however, is not in the
// middle of recv. Hence, the first time we wake them up,
// they're going to wake up from their old port, move on to the
// upgraded port, and then call the block recv() function.
//
// When calling this function, they'll find there's data
// immediately available, counting it as a steal. This in fact
// wasn't a steal because we appropriately blocked them waiting
// for data.
//
// To offset this bad increment, we initially set the steal
// count to -1. You'll find some special code in
// abort_selection() as well to ensure that this -1 steal count
// doesn't escape too far.
self.steals = -1;
}
None => {}
}
// This store is a little sketchy. What's happening here is that
// we're transferring a blocker from a oneshot or stream channel to
// this shared channel. In doing so, we never spuriously wake them
// up and rather only wake them up at the appropriate time. This
// implementation of shared channels assumes that any blocking
// recv() will undo the increment of steals performed in try_recv()
// once the recv is complete. This thread that we're inheriting,
// however, is not in the middle of recv. Hence, the first time we
// wake them up, they're going to wake up from their old port, move
// on to the upgraded port, and then call the block recv() function.
//
// When calling this function, they'll find there's data immediately
// available, counting it as a steal. This in fact wasn't a steal
// because we appropriately blocked them waiting for data.
//
// To offset this bad increment, we initially set the steal count to
// -1. You'll find some special code in abort_selection() as well to
// ensure that this -1 steal count doesn't escape too far.
self.steals = -1;
});
// When the shared packet is constructed, we grabbed this lock. The
// purpose of this lock is to ensure that abort_selection() doesn't
@ -175,7 +168,7 @@ impl<T: Send> Packet<T> {
self.queue.push(t);
match self.cnt.fetch_add(1, atomic::SeqCst) {
-1 => {
self.take_to_wake().wake().map(|t| t.reawaken());
self.take_to_wake().signal();
}
// In this case, we have possibly failed to send our data, and
@ -232,10 +225,10 @@ impl<T: Send> Packet<T> {
data => return data,
}
let task: Box<Task> = Local::take();
task.deschedule(1, |task| {
self.decrement(task)
});
let (wait_token, signal_token) = blocking::tokens();
if self.decrement(signal_token) == Installed {
wait_token.wait()
}
match self.try_recv() {
data @ Ok(..) => { self.steals -= 1; data }
@ -244,10 +237,11 @@ impl<T: Send> Packet<T> {
}
// Essentially the exact same thing as the stream decrement function.
fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> {
// Returns true if blocking should proceed.
fn decrement(&mut self, token: SignalToken) -> StartResult {
assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
let n = unsafe { task.cast_to_uint() };
self.to_wake.store(n, atomic::SeqCst);
let ptr = unsafe { token.cast_to_uint() };
self.to_wake.store(ptr, atomic::SeqCst);
let steals = self.steals;
self.steals = 0;
@ -258,12 +252,13 @@ impl<T: Send> Packet<T> {
// data, we successfully sleep
n => {
assert!(n >= 0);
if n - steals <= 0 { return Ok(()) }
if n - steals <= 0 { return Installed }
}
}
self.to_wake.store(0, atomic::SeqCst);
Err(unsafe { BlockedTask::cast_from_uint(n) })
drop(unsafe { SignalToken::cast_from_uint(ptr) });
Abort
}
pub fn try_recv(&mut self) -> Result<T, Failure> {
@ -271,20 +266,19 @@ impl<T: Send> Packet<T> {
mpsc::Data(t) => Some(t),
mpsc::Empty => None,
// This is a bit of an interesting case. The channel is
// reported as having data available, but our pop() has
// failed due to the queue being in an inconsistent state.
// This means that there is some pusher somewhere which has
// yet to complete, but we are guaranteed that a pop will
// eventually succeed. In this case, we spin in a yield loop
// because the remote sender should finish their enqueue
// This is a bit of an interesting case. The channel is reported as
// having data available, but our pop() has failed due to the queue
// being in an inconsistent state. This means that there is some
// pusher somewhere which has yet to complete, but we are guaranteed
// that a pop will eventually succeed. In this case, we spin in a
// yield loop because the remote sender should finish their enqueue
// operation "very quickly".
//
// Avoiding this yield loop would require a different queue
// abstraction which provides the guarantee that after M
// pushes have succeeded, at least M pops will succeed. The
// current queues guarantee that if there are N active
// pushes, you can pop N times once all N have finished.
// abstraction which provides the guarantee that after M pushes have
// succeeded, at least M pops will succeed. The current queues
// guarantee that if there are N active pushes, you can pop N times
// once all N have finished.
mpsc::Inconsistent => {
let data;
loop {
@ -354,7 +348,7 @@ impl<T: Send> Packet<T> {
}
match self.cnt.swap(DISCONNECTED, atomic::SeqCst) {
-1 => { self.take_to_wake().wake().map(|t| t.reawaken()); }
-1 => { self.take_to_wake().signal(); }
DISCONNECTED => {}
n => { assert!(n >= 0); }
}
@ -366,8 +360,7 @@ impl<T: Send> Packet<T> {
self.port_dropped.store(true, atomic::SeqCst);
let mut steals = self.steals;
while {
let cnt = self.cnt.compare_and_swap(
steals, DISCONNECTED, atomic::SeqCst);
let cnt = self.cnt.compare_and_swap(steals, DISCONNECTED, atomic::SeqCst);
cnt != DISCONNECTED && cnt != steals
} {
// See the discussion in 'try_recv' for why we yield
@ -382,11 +375,11 @@ impl<T: Send> Packet<T> {
}
// Consumes ownership of the 'to_wake' field.
fn take_to_wake(&mut self) -> BlockedTask {
let task = self.to_wake.load(atomic::SeqCst);
fn take_to_wake(&mut self) -> SignalToken {
let ptr = self.to_wake.load(atomic::SeqCst);
self.to_wake.store(0, atomic::SeqCst);
assert!(task != 0);
unsafe { BlockedTask::cast_from_uint(task) }
assert!(ptr != 0);
unsafe { SignalToken::cast_from_uint(ptr) }
}
////////////////////////////////////////////////////////////////////////////
@ -414,19 +407,18 @@ impl<T: Send> Packet<T> {
}
}
// Inserts the blocked task for selection on this port, returning it back if
// the port already has data on it.
// Inserts the signal token for selection on this port, returning true if
// blocking should proceed.
//
// The code here is the same as in stream.rs, except that it doesn't need to
// peek at the channel to see if an upgrade is pending.
pub fn start_selection(&mut self,
task: BlockedTask) -> Result<(), BlockedTask> {
match self.decrement(task) {
Ok(()) => Ok(()),
Err(task) => {
pub fn start_selection(&mut self, token: SignalToken) -> StartResult {
match self.decrement(token) {
Installed => Installed,
Abort => {
let prev = self.bump(1);
assert!(prev == DISCONNECTED || prev >= 0);
return Err(task);
Abort
}
}
}
@ -464,7 +456,7 @@ impl<T: Send> Packet<T> {
let cur = prev + steals + 1;
assert!(cur >= 0);
if prev < 0 {
self.take_to_wake().trash();
drop(self.take_to_wake());
} else {
while self.to_wake.load(atomic::SeqCst) != 0 {
Thread::yield_now();

View File

@ -27,13 +27,12 @@ use core::prelude::*;
use alloc::boxed::Box;
use core::cmp;
use core::int;
use rt::local::Local;
use rt::task::{Task, BlockedTask};
use rt::thread::Thread;
use thread::Thread;
use sync::atomic;
use comm::spsc_queue as spsc;
use comm::Receiver;
use comm::blocking::{mod, WaitToken, SignalToken};
const DISCONNECTED: int = int::MIN;
#[cfg(test)]
@ -46,7 +45,7 @@ pub struct Packet<T> {
cnt: atomic::AtomicInt, // How many items are on this channel
steals: int, // How many times has a port received without blocking?
to_wake: atomic::AtomicUint, // Task to wake up
to_wake: atomic::AtomicUint, // SignalToken for the blocked thread to wake up
port_dropped: atomic::AtomicBool, // flag if the channel has been destroyed.
}
@ -60,13 +59,13 @@ pub enum Failure<T> {
pub enum UpgradeResult {
UpSuccess,
UpDisconnected,
UpWoke(BlockedTask),
UpWoke(SignalToken),
}
pub enum SelectionResult<T> {
SelSuccess,
SelCanceled(BlockedTask),
SelUpgraded(BlockedTask, Receiver<T>),
SelCanceled,
SelUpgraded(SignalToken, Receiver<T>),
}
// Any message could contain an "upgrade request" to a new shared port, so the
@ -89,7 +88,6 @@ impl<T: Send> Packet<T> {
}
}
pub fn send(&mut self, t: T) -> Result<(), T> {
// If the other port has deterministically gone away, then definitely
// must return the data back up the stack. Otherwise, the data is
@ -98,10 +96,11 @@ impl<T: Send> Packet<T> {
match self.do_send(Data(t)) {
UpSuccess | UpDisconnected => {},
UpWoke(task) => { task.wake().map(|t| t.reawaken()); }
UpWoke(token) => { token.signal(); }
}
Ok(())
}
pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult {
// If the port has gone away, then there's no need to proceed any
// further.
@ -144,20 +143,20 @@ impl<T: Send> Packet<T> {
}
// Consumes ownership of the 'to_wake' field.
fn take_to_wake(&mut self) -> BlockedTask {
let task = self.to_wake.load(atomic::SeqCst);
fn take_to_wake(&mut self) -> SignalToken {
let ptr = self.to_wake.load(atomic::SeqCst);
self.to_wake.store(0, atomic::SeqCst);
assert!(task != 0);
unsafe { BlockedTask::cast_from_uint(task) }
assert!(ptr != 0);
unsafe { SignaToken::cast_from_uint(ptr) }
}
// Decrements the count on the channel for a sleeper, returning the sleeper
// back if it shouldn't sleep. Note that this is the location where we take
// steals into account.
fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> {
fn decrement(&mut self, token: SignalToken) -> Result<(), SignalToken> {
assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
let n = unsafe { task.cast_to_uint() };
self.to_wake.store(n, atomic::SeqCst);
let ptr = unsafe { token.cast_to_uint() };
self.to_wake.store(ptr, atomic::SeqCst);
let steals = self.steals;
self.steals = 0;
@ -173,7 +172,7 @@ impl<T: Send> Packet<T> {
}
self.to_wake.store(0, atomic::SeqCst);
Err(unsafe { BlockedTask::cast_from_uint(n) })
Err(unsafe { SignalToken::cast_from_uint(ptr) })
}
pub fn recv(&mut self) -> Result<T, Failure<T>> {
@ -185,10 +184,10 @@ impl<T: Send> Packet<T> {
// Welp, our channel has no data. Deschedule the current task and
// initiate the blocking protocol.
let task: Box<Task> = Local::take();
task.deschedule(1, |task| {
self.decrement(task)
});
let (wait_token, signal_token) = blocking::tokens();
if self.decrement(signal_token).is_ok() {
wait_token.wait()
}
match self.try_recv() {
// Messages which actually popped from the queue shouldn't count as
@ -269,7 +268,7 @@ impl<T: Send> Packet<T> {
// Dropping a channel is pretty simple, we just flag it as disconnected
// and then wakeup a blocker if there is one.
match self.cnt.swap(DISCONNECTED, atomic::SeqCst) {
-1 => { self.take_to_wake().wake().map(|t| t.reawaken()); }
-1 => { self.take_to_wake().signal(); }
DISCONNECTED => {}
n => { assert!(n >= 0); }
}
@ -364,19 +363,19 @@ impl<T: Send> Packet<T> {
// Attempts to start selecting on this port. Like a oneshot, this can fail
// immediately because of an upgrade.
pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> {
match self.decrement(task) {
pub fn start_selection(&mut self, token: SignalToken) -> SelectionResult<T> {
match self.decrement(token) {
Ok(()) => SelSuccess,
Err(task) => {
Err(token) => {
let ret = match self.queue.peek() {
Some(&GoUp(..)) => {
match self.queue.pop() {
Some(GoUp(port)) => SelUpgraded(task, port),
Some(GoUp(port)) => SelUpgraded(token, port),
_ => unreachable!(),
}
}
Some(..) => SelCanceled(task),
None => SelCanceled(task),
Some(..) => SelCanceled,
None => SelCanceled,
};
// Undo our decrement above, and we should be guaranteed that the
// previous value is positive because we're not going to sleep
@ -439,7 +438,7 @@ impl<T: Send> Packet<T> {
// final solution but rather out of necessity for now to get
// something working.
if prev < 0 {
self.take_to_wake().trash();
drop(self.take_to_wake());
} else {
while self.to_wake.load(atomic::SeqCst) != 0 {
Thread::yield_now();

View File

@ -42,11 +42,10 @@ use alloc::boxed::Box;
use vec::Vec;
use core::mem;
use core::cell::UnsafeCell;
use rt::local::Local;
use rt::mutex::{NativeMutex, LockGuard};
use rt::task::{Task, BlockedTask};
use sync::atomic;
use sync::{atomic, Mutex, MutexGuard};
use comm::blocking::{mod, WaitToken, SignalToken};
use comm::select::StartResult::{mod, Installed, Abort};
pub struct Packet<T> {
/// Only field outside of the mutex. Just done for kicks, but mainly because
@ -74,10 +73,10 @@ struct State<T> {
canceled: Option<&'static mut bool>,
}
/// Possible flavors of tasks who can be blocked on this channel.
/// Possible flavors of threads who can be blocked on this channel.
enum Blocker {
BlockedSender(BlockedTask),
BlockedReceiver(BlockedTask),
BlockedSender(SignalToken),
BlockedReceiver(SignalToken),
NoneBlocked
}
@ -89,7 +88,7 @@ struct Queue {
}
struct Node {
task: Option<BlockedTask>,
token: Option<SignalToken>,
next: *mut Node,
}
@ -106,28 +105,15 @@ pub enum Failure {
Disconnected,
}
/// Atomically blocks the current task, placing it into `slot`, unlocking `lock`
/// Atomically blocks the current thread, placing it into `slot`, unlocking `lock`
/// in the meantime. This re-locks the mutex upon returning.
fn wait(slot: &mut Blocker, f: fn(BlockedTask) -> Blocker,
lock: &NativeMutex) {
let me: Box<Task> = Local::take();
me.deschedule(1, |task| {
match mem::replace(slot, f(task)) {
NoneBlocked => {}
_ => unreachable!(),
}
unsafe { lock.unlock_noguard(); }
Ok(())
});
unsafe { lock.lock_noguard(); }
}
/// Wakes up a task, dropping the lock at the correct time
fn wakeup(task: BlockedTask, guard: LockGuard) {
/// Wakes up a thread, dropping the lock at the correct time
fn wakeup<T>(token: SignalToken, guard: MutexGuard<State<T>>) {
// We need to be careful to wake up the waiting task *outside* of the mutex
// in case it incurs a context switch.
mem::drop(guard);
task.wake().map(|t| t.reawaken());
drop(guard);
token.signal();
}
impl<T: Send> Packet<T> {
@ -153,29 +139,27 @@ impl<T: Send> Packet<T> {
}
}
// Locks this channel, returning a guard for the state and the mutable state
// itself. Care should be taken to ensure that the state does not escape the
// guard!
//
// Note that we're ok promoting an & reference to an &mut reference because
// the lock ensures that we're the only ones in the world with a pointer to
// the state.
fn lock<'a>(&'a self) -> (LockGuard<'a>, &'a mut State<T>) {
unsafe {
let guard = self.lock.lock();
(guard, &mut *self.state.get())
// wait until a send slot is available, returning locked access to
// the channel state.
fn acquire_send_slot(&self) -> MutexGuard<State<T>> {
let mut node = Node { token: None, next: 0 as *mut Node };
loop {
let mut guard = self.lock.lock();
// are we ready to go?
if guard.disconnected || guard.buf.size() < guard.buf.cap() {
return guard;
}
// no room; actually block
let wait_token = guard.queue.enqueue(&mut node);
drop(guard);
wait_token.wait();
}
}
pub fn send(&self, t: T) -> Result<(), T> {
let (guard, state) = self.lock();
// wait for a slot to become available, and enqueue the data
while !state.disconnected && state.buf.size() == state.buf.cap() {
state.queue.enqueue(&self.lock);
}
if state.disconnected { return Err(t) }
state.buf.enqueue(t);
let guard = self.acquire_send_slot();
if guard.disconnected { return Err(t) }
guard.buf.enqueue(t);
match mem::replace(&mut state.blocker, NoneBlocked) {
// if our capacity is 0, then we need to wait for a receiver to be
@ -194,7 +178,7 @@ impl<T: Send> Packet<T> {
NoneBlocked => Ok(()),
// success, someone's about to receive our buffered data.
BlockedReceiver(task) => { wakeup(task, guard); Ok(()) }
BlockedReceiver(token) => { wakeup(token, guard); Ok(()) }
BlockedSender(..) => panic!("lolwut"),
}
@ -212,9 +196,9 @@ impl<T: Send> Packet<T> {
match mem::replace(&mut state.blocker, NoneBlocked) {
NoneBlocked => Err(super::Full(t)),
BlockedSender(..) => unreachable!(),
BlockedReceiver(task) => {
state.buf.enqueue(t);
wakeup(task, guard);
BlockedReceiver(token) => {
guard.buf.enqueue(t);
wakeup(token, guard);
Ok(())
}
}
@ -222,10 +206,10 @@ impl<T: Send> Packet<T> {
// If the buffer has some space and the capacity isn't 0, then we
// just enqueue the data for later retrieval, ensuring to wake up
// any blocked receiver if there is one.
assert!(state.buf.size() < state.buf.cap());
state.buf.enqueue(t);
match mem::replace(&mut state.blocker, NoneBlocked) {
BlockedReceiver(task) => wakeup(task, guard),
assert!(guard.buf.size() < guard.buf.cap());
guard.buf.enqueue(t);
match mem::replace(&mut guard.blocker, NoneBlocked) {
BlockedReceiver(token) => wakeup(token, guard),
NoneBlocked => {}
BlockedSender(..) => unreachable!(),
}
@ -238,7 +222,7 @@ impl<T: Send> Packet<T> {
// When reading this, remember that there can only ever be one receiver at
// time.
pub fn recv(&self) -> Result<T, ()> {
let (guard, state) = self.lock();
let guard = self.lock.lock();
// Wait for the buffer to have something in it. No need for a while loop
// because we're the only receiver.
@ -275,10 +259,8 @@ impl<T: Send> Packet<T> {
// * `waited` - flag if the receiver blocked to receive some data, or if it
// just picked up some data on the way out
// * `guard` - the lock guard that is held over this channel's lock
fn wakeup_senders(&self, waited: bool,
guard: LockGuard,
state: &mut State<T>) {
let pending_sender1: Option<BlockedTask> = state.queue.dequeue();
fn wakeup_senders(&self, waited: bool, guard: MutexGuard<State<T>>) {
let pending_sender1: Option<SignalToken> = guard.queue.dequeue();
// If this is a no-buffer channel (cap == 0), then if we didn't wait we
// need to ACK the sender. If we waited, then the sender waking us up
@ -287,9 +269,9 @@ impl<T: Send> Packet<T> {
match mem::replace(&mut state.blocker, NoneBlocked) {
NoneBlocked => None,
BlockedReceiver(..) => unreachable!(),
BlockedSender(task) => {
state.canceled.take();
Some(task)
BlockedSender(token) => {
guard.canceled.take();
Some(token)
}
}
} else {
@ -298,8 +280,8 @@ impl<T: Send> Packet<T> {
mem::drop((state, guard));
// only outside of the lock do we wake up the pending tasks
pending_sender1.map(|t| t.wake().map(|t| t.reawaken()));
pending_sender2.map(|t| t.wake().map(|t| t.reawaken()));
pending_sender1.map(|t| t.signal());
pending_sender2.map(|t| t.signal());
}
// Prepares this shared packet for a channel clone, essentially just bumping
@ -322,7 +304,7 @@ impl<T: Send> Packet<T> {
match mem::replace(&mut state.blocker, NoneBlocked) {
NoneBlocked => {}
BlockedSender(..) => unreachable!(),
BlockedReceiver(task) => wakeup(task, guard),
BlockedReceiver(token) => wakeup(token, guard),
}
}
@ -349,9 +331,9 @@ impl<T: Send> Packet<T> {
let waiter = match mem::replace(&mut state.blocker, NoneBlocked) {
NoneBlocked => None,
BlockedSender(task) => {
*state.canceled.take().unwrap() = true;
Some(task)
BlockedSender(token) => {
*guard.canceled.take().unwrap() = true;
Some(token)
}
BlockedReceiver(..) => unreachable!(),
};
@ -359,11 +341,11 @@ impl<T: Send> Packet<T> {
loop {
match queue.dequeue() {
Some(task) => { task.wake().map(|t| t.reawaken()); }
Some(token) => { token.signal(); }
None => break,
}
}
waiter.map(|t| t.wake().map(|t| t.reawaken()));
waiter.map(|t| t.signal());
}
////////////////////////////////////////////////////////////////////////////
@ -379,17 +361,17 @@ impl<T: Send> Packet<T> {
// Attempts to start selection on this port. This can either succeed or fail
// because there is data waiting.
pub fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>{
let (_g, state) = self.lock();
if state.disconnected || state.buf.size() > 0 {
Err(task)
pub fn start_selection(&self, token: SignalToken) -> StartResult {
let guard = self.lock();
if guard.disconnected || guard.buf.size() > 0 {
Abort
} else {
match mem::replace(&mut state.blocker, BlockedReceiver(task)) {
match mem::replace(&mut guard.blocker, BlockedReceiver(token)) {
NoneBlocked => {}
BlockedSender(..) => unreachable!(),
BlockedReceiver(..) => unreachable!(),
}
Ok(())
Installed
}
}
@ -401,11 +383,11 @@ impl<T: Send> Packet<T> {
let (_g, state) = self.lock();
match mem::replace(&mut state.blocker, NoneBlocked) {
NoneBlocked => true,
BlockedSender(task) => {
state.blocker = BlockedSender(task);
BlockedSender(token) => {
guard.blocker = BlockedSender(token);
true
}
BlockedReceiver(task) => { task.trash(); false }
BlockedReceiver(token) => { drop(token); false }
}
}
}
@ -449,31 +431,25 @@ impl<T> Buffer<T> {
////////////////////////////////////////////////////////////////////////////////
impl Queue {
fn enqueue(&mut self, lock: &NativeMutex) {
let task: Box<Task> = Local::take();
let mut node = Node {
task: None,
next: 0 as *mut Node,
};
task.deschedule(1, |task| {
node.task = Some(task);
if self.tail.is_null() {
self.head = &mut node as *mut Node;
self.tail = &mut node as *mut Node;
} else {
unsafe {
(*self.tail).next = &mut node as *mut Node;
self.tail = &mut node as *mut Node;
}
fn enqueue(&mut self, node: &mut Node) -> WaitToken {
let (wait_token, signal_token) = blocking::tokens();
node.token = Some(signal_token);
node.next = 0 as *mut Node;
if self.tail.is_null() {
self.head = node as *mut Node;
self.tail = node as *mut Node;
} else {
unsafe {
(*self.tail).next = node as *mut Node;
self.tail = node as *mut Node;
}
unsafe { lock.unlock_noguard(); }
Ok(())
});
unsafe { lock.lock_noguard(); }
assert!(node.next.is_null());
}
wait_token
}
fn dequeue(&mut self) -> Option<BlockedTask> {
fn dequeue(&mut self) -> Option<SignalToken> {
if self.head.is_null() {
return None
}
@ -484,7 +460,7 @@ impl Queue {
}
unsafe {
(*node).next = 0 as *mut Node;
Some((*node).task.take().unwrap())
Some((*node).token.take().unwrap())
}
}
}