Implement a lock-free work-stealing deque

This adds an implementation of the Chase-Lev work-stealing deque to libstd
under std::rt::deque. I've been unable to break the implementation of the deque
itself, and it's not highly optimized just yet (every atomic operation currently
uses a SeqCst memory ordering).
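
For reference, here is a minimal sketch of the new API as it appears in this
commit (the pushed values are purely illustrative):

    use std::rt::deque::{BufferPool, Data, Abort, Empty};

    let mut pool = BufferPool::init();
    let (mut worker, mut stealer) = pool.deque();

    // Only the worker may push/pop; it owns one end of the deque.
    worker.push(1);
    assert_eq!(worker.pop(), Some(1));

    // Stealers can be cloned and race for items on the other end.
    worker.push(2);
    match stealer.steal() {
        Data(i) => assert_eq!(i, 2), // won the race and took the item
        Abort => {}                  // lost a race with another stealer; retry
        Empty => {}                  // deque was empty at the time of the steal
    }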

The major snag in implementing the Chase-Lev deque is that the buffers used to
store data internally cannot simply be freed back to the OS, because a stealer
may still be reading from a buffer after the worker has swapped in a new one.
In the meantime, a shared buffer pool (synchronized by a normal mutex) is used
to allocate and reclaim buffers, in the hope of not overcommitting too much
memory. It is in theory possible to eventually free the buffers, but one must
be very careful in doing so.
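
To sketch how the pool is meant to be shared (mirroring the scheduler setup in
this patch; the element type and the nscheds count below are just placeholders),
every deque created from one pool recycles its buffers through that same
mutex-protected list rather than calling free() directly:

    use std::rt::deque::BufferPool;
    use std::vec;

    let nscheds = 4u;
    let mut pool = BufferPool::<int>::init();
    // One Worker/Stealer pair per scheduler; buffers retired by any of these
    // deques go back into (and come out of) the one shared pool.
    let deques = vec::from_fn(nscheds, |_| pool.deque());
    let (workers, stealers) = vec::unzip(deques.move_iter());
    // Each scheduler then takes one Worker and a clone of all the Stealers.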

I was unable to get meaningful numbers from the src/test/bench tests (I don't
think many of them hammer the work queue that hard), but I did get good numbers
from one of my own tests. In a recent rewrite of select::select(), I found that
my implementation was incredibly slow due to contention on the shared work
queue. After switching to the parallel deque, the contention dropped to zero and
the runtime went from 1.6s to 0.9s, with most of the remaining time spent in
libuv waking the schedulers (plus allocations).

Closes #4877
Alex Crichton 2013-11-26 09:40:24 -08:00
parent 08f4d1ff9f
commit a70f9d7324
7 changed files with 723 additions and 141 deletions


@ -1076,21 +1076,22 @@ mod test {
use std::rt::task::Task;
use std::rt::task::UnwindResult;
use std::rt::thread::Thread;
use std::rt::work_queue::WorkQueue;
use std::rt::deque::BufferPool;
use std::unstable::run_in_bare_thread;
use uvio::UvEventLoop;
do run_in_bare_thread {
let sleepers = SleeperList::new();
let work_queue1 = WorkQueue::new();
let work_queue2 = WorkQueue::new();
let queues = ~[work_queue1.clone(), work_queue2.clone()];
let mut pool = BufferPool::init();
let (worker1, stealer1) = pool.deque();
let (worker2, stealer2) = pool.deque();
let queues = ~[stealer1, stealer2];
let loop1 = ~UvEventLoop::new() as ~EventLoop;
let mut sched1 = ~Scheduler::new(loop1, work_queue1, queues.clone(),
let mut sched1 = ~Scheduler::new(loop1, worker1, queues.clone(),
sleepers.clone());
let loop2 = ~UvEventLoop::new() as ~EventLoop;
let mut sched2 = ~Scheduler::new(loop2, work_queue2, queues.clone(),
let mut sched2 = ~Scheduler::new(loop2, worker2, queues.clone(),
sleepers.clone());
let handle1 = Cell::new(sched1.make_handle());

src/libstd/rt/deque.rs (new file, 658 lines added)

@ -0,0 +1,658 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! A (mostly) lock-free concurrent work-stealing deque
//!
//! This module contains an implementation of the Chase-Lev work stealing deque
//! described in "Dynamic Circular Work-Stealing Deque". The implementation is
//! heavily based on the pseudocode found in the paper.
//!
//! Rather than relying on a garbage collector for reclamation of buffers, this
//! implementation uses a shared pool of buffers. This shared pool is required
//! for correctness in this implementation.
//!
//! The only lock-synchronized portions of this deque are the buffer allocation
//! and deallocation portions. Otherwise all operations are lock-free.
//!
//! # Example
//!
//! use std::rt::deque::BufferPool;
//!
//! let mut pool = BufferPool::init();
//! let (mut worker, mut stealer) = pool.deque();
//!
//! // Only the worker may push/pop
//! worker.push(1);
//! worker.pop();
//!
//! // Stealers take data from the other end of the deque
//! worker.push(1);
//! stealer.steal();
//!
//! // Stealers can be cloned to have many stealers stealing in parallel
//! worker.push(1);
//! let mut stealer2 = stealer.clone();
//! stealer2.steal();
// NB: the "buffer pool" strategy is not done for speed, but rather for
// correctness. For more info, see the comment on `swap_buffer`
// XXX: all atomic operations in this module use a SeqCst ordering. That is
// probably overkill
use cast;
use clone::Clone;
use iter::range;
use kinds::Send;
use libc;
use mem;
use ops::Drop;
use option::{Option, Some, None};
use ptr;
use unstable::atomics::{AtomicInt, AtomicPtr, SeqCst};
use unstable::sync::{UnsafeArc, Exclusive};
// Once the queue is less than 1/K full, then it will be downsized. Note that
// the deque requires that K be greater than 2 so that a shrunken buffer can
// still hold all remaining elements.
static K: int = 4;
// Minimum number of bits that a buffer size should be. No buffer will resize to
// under this value, and all deques will initially contain a buffer of this
// size.
//
// The size in question is 1 << MIN_BITS
static MIN_BITS: int = 7;
struct Deque<T> {
bottom: AtomicInt,
top: AtomicInt,
array: AtomicPtr<Buffer<T>>,
pool: BufferPool<T>,
}
/// Worker half of the work-stealing deque. This worker has exclusive access to
/// one side of the deque, and uses the `push` and `pop` methods to manipulate it.
///
/// There may only be one worker per deque.
pub struct Worker<T> {
priv deque: UnsafeArc<Deque<T>>,
}
/// The stealing half of the work-stealing deque. Stealers have access to the
/// opposite end of the deque from the worker, and they only have access to the
/// `steal` method.
pub struct Stealer<T> {
priv deque: UnsafeArc<Deque<T>>,
}
/// When stealing some data, this is an enumeration of the possible outcomes.
#[deriving(Eq)]
pub enum Stolen<T> {
/// The deque was empty at the time of stealing
Empty,
/// The stealer lost the race for stealing data, and a retry may return more
/// data.
Abort,
/// The stealer has successfully stolen some data.
Data(T),
}
/// The allocation pool for buffers used by work-stealing deques. Right now this
/// structure is used for reclamation of memory after it is no longer in use by
/// deques.
///
/// This data structure is protected by a mutex, but it is rarely used. Deques
/// will only use this structure when allocating a new buffer or deallocating a
/// previous one.
pub struct BufferPool<T> {
priv pool: Exclusive<~[~Buffer<T>]>,
}
/// An internal buffer used by the chase-lev deque. This structure is actually
/// implemented as a circular buffer, and is used as the intermediate storage of
/// the data in the deque.
///
/// This type is implemented with *T instead of ~[T] for two reasons:
///
/// 1. There is nothing safe about using this buffer. This easily allows the
/// same value to be read twice into Rust, and there is nothing to
/// prevent this. The usage by the deque must ensure that one of the
/// values is forgotten. Furthermore, we only ever want to manually run
/// destructors for values in this buffer (on drop) because the bounds
/// are defined by the deque it's owned by.
///
/// 2. We can certainly avoid bounds checks using *T instead of ~[T], although
/// LLVM is probably pretty good at doing this already.
struct Buffer<T> {
storage: *T,
log_size: int,
}
impl<T: Send> BufferPool<T> {
/// Allocates a new buffer pool which in turn can be used to allocate new
/// deques.
pub fn init() -> BufferPool<T> {
BufferPool { pool: Exclusive::new(~[]) }
}
/// Allocates a new work-stealing deque which will send/receive memory to
/// and from this buffer pool.
pub fn deque(&mut self) -> (Worker<T>, Stealer<T>) {
let (a, b) = UnsafeArc::new2(Deque::init(self.clone()));
(Worker { deque: a }, Stealer { deque: b })
}
fn alloc(&mut self, bits: int) -> ~Buffer<T> {
unsafe {
self.pool.with(|pool| {
match pool.iter().position(|x| x.size() >= (1 << bits)) {
Some(i) => pool.remove(i),
None => ~Buffer::init(bits)
}
})
}
}
fn free(&mut self, buf: ~Buffer<T>) {
unsafe {
use cell::Cell;
let buf = Cell::new(buf);
self.pool.with(|pool| {
let buf = buf.take();
match pool.iter().position(|v| v.size() > buf.size()) {
Some(i) => pool.insert(i, buf),
None => pool.push(buf),
}
})
}
}
}
impl<T: Send> Clone for BufferPool<T> {
fn clone(&self) -> BufferPool<T> { BufferPool { pool: self.pool.clone() } }
}
impl<T: Send> Worker<T> {
/// Pushes data onto the front of this work queue.
pub fn push(&mut self, t: T) {
unsafe { (*self.deque.get()).push(t) }
}
/// Pops data off the front of the work queue, returning `None` on an empty
/// queue.
pub fn pop(&mut self) -> Option<T> {
unsafe { (*self.deque.get()).pop() }
}
/// Gets access to the buffer pool that this worker is attached to. This can
/// be used to create more deques which share the same buffer pool as this
/// deque.
pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool<T> {
unsafe { &mut (*self.deque.get()).pool }
}
}
impl<T: Send> Stealer<T> {
/// Steals work off the end of the queue (opposite of the worker's end)
pub fn steal(&mut self) -> Stolen<T> {
unsafe { (*self.deque.get()).steal() }
}
/// Gets access to the buffer pool that this stealer is attached to. This
/// can be used to create more deques which share the same buffer pool as
/// this deque.
pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool<T> {
unsafe { &mut (*self.deque.get()).pool }
}
}
impl<T: Send> Clone for Stealer<T> {
fn clone(&self) -> Stealer<T> { Stealer { deque: self.deque.clone() } }
}
// Almost all of this code can be found directly in the paper so I'm not
// personally going to heavily comment what's going on here.
impl<T: Send> Deque<T> {
fn init(mut pool: BufferPool<T>) -> Deque<T> {
let buf = pool.alloc(MIN_BITS);
Deque {
bottom: AtomicInt::new(0),
top: AtomicInt::new(0),
array: AtomicPtr::new(unsafe { cast::transmute(buf) }),
pool: pool,
}
}
unsafe fn push(&mut self, data: T) {
let mut b = self.bottom.load(SeqCst);
let t = self.top.load(SeqCst);
let mut a = self.array.load(SeqCst);
let size = b - t;
if size >= (*a).size() - 1 {
// You won't find this code in the chase-lev deque paper. This is
// alluded to in a small footnote, however. We always free a buffer
// when growing in order to prevent leaks.
a = self.swap_buffer(b, a, (*a).resize(b, t, 1));
b = self.bottom.load(SeqCst);
}
(*a).put(b, data);
self.bottom.store(b + 1, SeqCst);
}
unsafe fn pop(&mut self) -> Option<T> {
let b = self.bottom.load(SeqCst);
let a = self.array.load(SeqCst);
let b = b - 1;
self.bottom.store(b, SeqCst);
let t = self.top.load(SeqCst);
let size = b - t;
if size < 0 {
self.bottom.store(t, SeqCst);
return None;
}
let data = (*a).get(b);
if size > 0 {
self.maybe_shrink(b, t);
return Some(data);
}
if self.top.compare_and_swap(t, t + 1, SeqCst) == t {
self.bottom.store(t + 1, SeqCst);
return Some(data);
} else {
self.bottom.store(t + 1, SeqCst);
cast::forget(data); // someone else stole this value
return None;
}
}
unsafe fn steal(&mut self) -> Stolen<T> {
let t = self.top.load(SeqCst);
let old = self.array.load(SeqCst);
let b = self.bottom.load(SeqCst);
let a = self.array.load(SeqCst);
let size = b - t;
if size <= 0 { return Empty }
if size % (*a).size() == 0 {
if a == old && t == self.top.load(SeqCst) {
return Empty
}
return Abort
}
let data = (*a).get(t);
if self.top.compare_and_swap(t, t + 1, SeqCst) == t {
Data(data)
} else {
cast::forget(data); // someone else stole this value
Abort
}
}
unsafe fn maybe_shrink(&mut self, b: int, t: int) {
let a = self.array.load(SeqCst);
if b - t < (*a).size() / K && b - t > (1 << MIN_BITS) {
self.swap_buffer(b, a, (*a).resize(b, t, -1));
}
}
// Helper routine not mentioned in the paper which is used in growing and
// shrinking buffers to swap a new buffer into place. As a bit of a recap,
// the whole reason we need a buffer pool rather than just calling
// malloc/free directly is that stealers can continue using a buffer after
// this method has called 'free' on it. The continued usage is simply a read
// followed by a forget, but we must make sure that the memory can continue
// to be read after we flag this buffer for reclamation.
unsafe fn swap_buffer(&mut self, b: int, old: *mut Buffer<T>,
buf: Buffer<T>) -> *mut Buffer<T> {
let newbuf: *mut Buffer<T> = cast::transmute(~buf);
self.array.store(newbuf, SeqCst);
let ss = (*newbuf).size();
self.bottom.store(b + ss, SeqCst);
let t = self.top.load(SeqCst);
if self.top.compare_and_swap(t, t + ss, SeqCst) != t {
self.bottom.store(b, SeqCst);
}
self.pool.free(cast::transmute(old));
return newbuf;
}
}
#[unsafe_destructor]
impl<T: Send> Drop for Deque<T> {
fn drop(&mut self) {
let t = self.top.load(SeqCst);
let b = self.bottom.load(SeqCst);
let a = self.array.load(SeqCst);
// Free whatever is left over in the deque, and then move the buffer
// back into the pool.
for i in range(t, b) {
let _: T = unsafe { (*a).get(i) };
}
self.pool.free(unsafe { cast::transmute(a) });
}
}
impl<T: Send> Buffer<T> {
unsafe fn init(log_size: int) -> Buffer<T> {
let size = (1 << log_size) * mem::size_of::<T>();
let buffer = libc::malloc(size as libc::size_t);
assert!(!buffer.is_null());
Buffer {
storage: buffer as *T,
log_size: log_size,
}
}
fn size(&self) -> int { 1 << self.log_size }
// Apparently LLVM cannot optimize (foo % (1 << bar)) into this implicitly
fn mask(&self) -> int { (1 << self.log_size) - 1 }
// This does not protect against loading duplicate values of the same cell,
// nor does this clear out the contents contained within. Hence, this is a
// very unsafe method which the caller needs to treat specially in case a
// race is lost.
unsafe fn get(&self, i: int) -> T {
ptr::read_ptr(self.storage.offset(i & self.mask()))
}
// Unsafe because this unsafely overwrites possibly uninitialized or
// initialized data.
unsafe fn put(&mut self, i: int, t: T) {
let ptr = self.storage.offset(i & self.mask());
ptr::copy_nonoverlapping_memory(ptr as *mut T, &t as *T, 1);
cast::forget(t);
}
// Again, unsafe because this has incredibly dubious ownership violations.
// It is assumed that this buffer is immediately dropped.
unsafe fn resize(&self, b: int, t: int, delta: int) -> Buffer<T> {
let mut buf = Buffer::init(self.log_size + delta);
for i in range(t, b) {
buf.put(i, self.get(i));
}
return buf;
}
}
#[unsafe_destructor]
impl<T: Send> Drop for Buffer<T> {
fn drop(&mut self) {
// It is assumed that all buffers are empty on drop.
unsafe { libc::free(self.storage as *libc::c_void) }
}
}
#[cfg(test)]
mod tests {
use prelude::*;
use super::{Data, BufferPool, Abort, Empty, Worker, Stealer};
use cast;
use rt::thread::Thread;
use rand;
use rand::Rng;
use unstable::atomics::{AtomicBool, INIT_ATOMIC_BOOL, SeqCst,
AtomicUint, INIT_ATOMIC_UINT};
use vec;
#[test]
fn smoke() {
let mut pool = BufferPool::init();
let (mut w, mut s) = pool.deque();
assert_eq!(w.pop(), None);
assert_eq!(s.steal(), Empty);
w.push(1);
assert_eq!(w.pop(), Some(1));
w.push(1);
assert_eq!(s.steal(), Data(1));
w.push(1);
assert_eq!(s.clone().steal(), Data(1));
}
#[test]
fn stealpush() {
static AMT: int = 100000;
let mut pool = BufferPool::<int>::init();
let (mut w, s) = pool.deque();
let t = do Thread::start {
let mut s = s;
let mut left = AMT;
while left > 0 {
match s.steal() {
Data(i) => {
assert_eq!(i, 1);
left -= 1;
}
Abort | Empty => {}
}
}
};
for _ in range(0, AMT) {
w.push(1);
}
t.join();
}
#[test]
fn stealpush_large() {
static AMT: int = 100000;
let mut pool = BufferPool::<(int, int)>::init();
let (mut w, s) = pool.deque();
let t = do Thread::start {
let mut s = s;
let mut left = AMT;
while left > 0 {
match s.steal() {
Data((1, 10)) => { left -= 1; }
Data(..) => fail!(),
Abort | Empty => {}
}
}
};
for _ in range(0, AMT) {
w.push((1, 10));
}
t.join();
}
fn stampede(mut w: Worker<~int>, s: Stealer<~int>,
nthreads: int, amt: uint) {
for _ in range(0, amt) {
w.push(~20);
}
let mut remaining = AtomicUint::new(amt);
let unsafe_remaining: *mut AtomicUint = &mut remaining;
let threads = range(0, nthreads).map(|_| {
let s = s.clone();
do Thread::start {
unsafe {
let mut s = s;
while (*unsafe_remaining).load(SeqCst) > 0 {
match s.steal() {
Data(~20) => {
(*unsafe_remaining).fetch_sub(1, SeqCst);
}
Data(..) => fail!(),
Abort | Empty => {}
}
}
}
}
}).to_owned_vec();
while remaining.load(SeqCst) > 0 {
match w.pop() {
Some(~20) => { remaining.fetch_sub(1, SeqCst); }
Some(..) => fail!(),
None => {}
}
}
for thread in threads.move_iter() {
thread.join();
}
}
#[test]
fn run_stampede() {
let mut pool = BufferPool::<~int>::init();
let (w, s) = pool.deque();
stampede(w, s, 8, 10000);
}
#[test]
fn many_stampede() {
static AMT: uint = 4;
let mut pool = BufferPool::<~int>::init();
let threads = range(0, AMT).map(|_| {
let (w, s) = pool.deque();
do Thread::start {
stampede(w, s, 4, 10000);
}
}).to_owned_vec();
for thread in threads.move_iter() {
thread.join();
}
}
#[test]
fn stress() {
static AMT: int = 100000;
static NTHREADS: int = 8;
static mut DONE: AtomicBool = INIT_ATOMIC_BOOL;
static mut HITS: AtomicUint = INIT_ATOMIC_UINT;
let mut pool = BufferPool::<int>::init();
let (mut w, s) = pool.deque();
let threads = range(0, NTHREADS).map(|_| {
let s = s.clone();
do Thread::start {
unsafe {
let mut s = s;
loop {
match s.steal() {
Data(2) => { HITS.fetch_add(1, SeqCst); }
Data(..) => fail!(),
_ if DONE.load(SeqCst) => break,
_ => {}
}
}
}
}
}).to_owned_vec();
let mut rng = rand::task_rng();
let mut expected = 0;
while expected < AMT {
if rng.gen_range(0, 3) == 2 {
match w.pop() {
None => {}
Some(2) => unsafe { HITS.fetch_add(1, SeqCst); },
Some(_) => fail!(),
}
} else {
expected += 1;
w.push(2);
}
}
unsafe {
while HITS.load(SeqCst) < AMT as uint {
match w.pop() {
None => {}
Some(2) => { HITS.fetch_add(1, SeqCst); },
Some(_) => fail!(),
}
}
DONE.store(true, SeqCst);
}
for thread in threads.move_iter() {
thread.join();
}
assert_eq!(unsafe { HITS.load(SeqCst) }, expected as uint);
}
#[test]
fn no_starvation() {
static AMT: int = 10000;
static NTHREADS: int = 4;
static mut DONE: AtomicBool = INIT_ATOMIC_BOOL;
let mut pool = BufferPool::<(int, uint)>::init();
let (mut w, s) = pool.deque();
let (threads, hits) = vec::unzip(range(0, NTHREADS).map(|_| {
let s = s.clone();
let box = ~AtomicUint::new(0);
let thread_box = unsafe {
*cast::transmute::<&~AtomicUint, **mut AtomicUint>(&box)
};
(do Thread::start {
unsafe {
let mut s = s;
loop {
match s.steal() {
Data((1, 2)) => {
(*thread_box).fetch_add(1, SeqCst);
}
Data(..) => fail!(),
_ if DONE.load(SeqCst) => break,
_ => {}
}
}
}
}, box)
}));
let mut rng = rand::task_rng();
let mut myhit = false;
let mut iter = 0;
'outer: loop {
for _ in range(0, rng.gen_range(0, AMT)) {
if !myhit && rng.gen_range(0, 3) == 2 {
match w.pop() {
None => {}
Some((1, 2)) => myhit = true,
Some(_) => fail!(),
}
} else {
w.push((1, 2));
}
}
iter += 1;
debug!("loop iteration {}", iter);
for (i, slot) in hits.iter().enumerate() {
let amt = slot.load(SeqCst);
debug!("thread {}: {}", i, amt);
if amt == 0 { continue 'outer; }
}
if myhit {
break
}
}
unsafe { DONE.store(true, SeqCst); }
for thread in threads.move_iter() {
thread.join();
}
}
}


@ -75,7 +75,6 @@ use vec::{OwnedVector, MutableVector, ImmutableVector};
use vec;
use self::thread::Thread;
use self::work_queue::WorkQueue;
// the os module needs to reach into this helper, so allow general access
// through this reexport.
@ -130,9 +129,6 @@ pub mod rtio;
/// or task-local storage.
pub mod local;
/// A parallel work-stealing deque.
pub mod work_queue;
/// A parallel queue.
pub mod message_queue;
@ -142,6 +138,9 @@ mod mpsc_queue;
/// A lock-free multi-producer, multi-consumer bounded queue.
mod mpmc_bounded_queue;
/// A parallel work-stealing deque
pub mod deque;
/// A parallel data structure for tracking sleeping schedulers.
pub mod sleeper_list;
@ -287,7 +286,9 @@ fn run_(main: proc(), use_main_sched: bool) -> int {
// Create a work queue for each scheduler, ntimes. Create an extra
// for the main thread if that flag is set. We won't steal from it.
let work_queues: ~[WorkQueue<~Task>] = vec::from_fn(nscheds, |_| WorkQueue::new());
let mut pool = deque::BufferPool::init();
let arr = vec::from_fn(nscheds, |_| pool.deque());
let (workers, stealers) = vec::unzip(arr.move_iter());
// The schedulers.
let mut scheds = ~[];
@ -295,14 +296,14 @@ fn run_(main: proc(), use_main_sched: bool) -> int {
// sent the Shutdown message to terminate the schedulers.
let mut handles = ~[];
for work_queue in work_queues.iter() {
for worker in workers.move_iter() {
rtdebug!("inserting a regular scheduler");
// Every scheduler is driven by an I/O event loop.
let loop_ = new_event_loop();
let mut sched = ~Scheduler::new(loop_,
work_queue.clone(),
work_queues.clone(),
worker,
stealers.clone(),
sleepers.clone());
let handle = sched.make_handle();
@ -321,12 +322,12 @@ fn run_(main: proc(), use_main_sched: bool) -> int {
// This scheduler needs a queue that isn't part of the stealee
// set.
let work_queue = WorkQueue::new();
let (worker, _) = pool.deque();
let main_loop = new_event_loop();
let mut main_sched = ~Scheduler::new_special(main_loop,
work_queue,
work_queues.clone(),
worker,
stealers.clone(),
sleepers.clone(),
false,
Some(friend_handle));


@ -13,13 +13,13 @@ use cast::{transmute, transmute_mut_region, transmute_mut_unsafe};
use clone::Clone;
use unstable::raw;
use super::sleeper_list::SleeperList;
use super::work_queue::WorkQueue;
use super::stack::{StackPool};
use super::rtio::EventLoop;
use super::context::Context;
use super::task::{Task, AnySched, Sched};
use super::message_queue::MessageQueue;
use rt::kill::BlockedTask;
use rt::deque;
use rt::local_ptr;
use rt::local::Local;
use rt::rtio::{RemoteCallback, PausibleIdleCallback, Callback};
@ -39,14 +39,14 @@ use vec::{OwnedVector};
/// in too much allocation and too many events.
pub struct Scheduler {
/// There are N work queues, one per scheduler.
priv work_queue: WorkQueue<~Task>,
work_queue: deque::Worker<~Task>,
/// Work queues for the other schedulers. These are created by
/// cloning the core work queues.
work_queues: ~[WorkQueue<~Task>],
work_queues: ~[deque::Stealer<~Task>],
/// The queue of incoming messages from other schedulers.
/// These are enqueued by SchedHandles after which a remote callback
/// is triggered to handle the message.
priv message_queue: MessageQueue<SchedMessage>,
message_queue: MessageQueue<SchedMessage>,
/// A shared list of sleeping schedulers. We'll use this to wake
/// up schedulers when pushing work onto the work queue.
sleeper_list: SleeperList,
@ -56,33 +56,33 @@ pub struct Scheduler {
/// not active since there are multiple event sources that may
/// wake the scheduler. It just prevents the scheduler from pushing
/// multiple handles onto the sleeper list.
priv sleepy: bool,
sleepy: bool,
/// A flag to indicate we've received the shutdown message and should
/// no longer try to go to sleep, but exit instead.
no_sleep: bool,
stack_pool: StackPool,
/// The scheduler runs on a special task. When it is not running
/// it is stored here instead of the work queue.
priv sched_task: Option<~Task>,
sched_task: Option<~Task>,
/// An action performed after a context switch on behalf of the
/// code running before the context switch
priv cleanup_job: Option<CleanupJob>,
cleanup_job: Option<CleanupJob>,
/// Should this scheduler run any task, or only pinned tasks?
run_anything: bool,
/// If the scheduler shouldn't run some tasks, a friend to send
/// them to.
priv friend_handle: Option<SchedHandle>,
friend_handle: Option<SchedHandle>,
/// A fast XorShift rng for scheduler use
rng: XorShiftRng,
/// A toggleable idle callback
priv idle_callback: Option<~PausibleIdleCallback>,
idle_callback: Option<~PausibleIdleCallback>,
/// A countdown that starts at a random value and is decremented
/// every time a yield check is performed. When it hits 0 a task
/// will yield.
priv yield_check_count: uint,
yield_check_count: uint,
/// A flag to tell the scheduler loop it needs to do some stealing
/// in order to introduce randomness as part of a yield
priv steal_for_yield: bool,
steal_for_yield: bool,
// n.b. currently destructors of an object are run in top-to-bottom in order
// of field declaration. Due to its nature, the pausible idle callback
@ -115,8 +115,8 @@ impl Scheduler {
// * Initialization Functions
pub fn new(event_loop: ~EventLoop,
work_queue: WorkQueue<~Task>,
work_queues: ~[WorkQueue<~Task>],
work_queue: deque::Worker<~Task>,
work_queues: ~[deque::Stealer<~Task>],
sleeper_list: SleeperList)
-> Scheduler {
@ -127,8 +127,8 @@ impl Scheduler {
}
pub fn new_special(event_loop: ~EventLoop,
work_queue: WorkQueue<~Task>,
work_queues: ~[WorkQueue<~Task>],
work_queue: deque::Worker<~Task>,
work_queues: ~[deque::Stealer<~Task>],
sleeper_list: SleeperList,
run_anything: bool,
friend: Option<SchedHandle>)
@ -440,11 +440,11 @@ impl Scheduler {
let start_index = self.rng.gen_range(0, len);
for index in range(0, len).map(|i| (i + start_index) % len) {
match work_queues[index].steal() {
Some(task) => {
deque::Data(task) => {
rtdebug!("found task by stealing");
return Some(task)
}
None => ()
_ => ()
}
};
rtdebug!("giving up on stealing");
@ -889,6 +889,7 @@ mod test {
use borrow::to_uint;
use rt::sched::{Scheduler};
use cell::Cell;
use rt::deque::BufferPool;
use rt::thread::Thread;
use rt::task::{Task, Sched};
use rt::basic;
@ -994,7 +995,6 @@ mod test {
#[test]
fn test_schedule_home_states() {
use rt::sleeper_list::SleeperList;
use rt::work_queue::WorkQueue;
use rt::sched::Shutdown;
use borrow;
use rt::comm::*;
@ -1002,14 +1002,15 @@ mod test {
do run_in_bare_thread {
let sleepers = SleeperList::new();
let normal_queue = WorkQueue::new();
let special_queue = WorkQueue::new();
let queues = ~[normal_queue.clone(), special_queue.clone()];
let mut pool = BufferPool::init();
let (normal_worker, normal_stealer) = pool.deque();
let (special_worker, special_stealer) = pool.deque();
let queues = ~[normal_stealer, special_stealer];
// Our normal scheduler
let mut normal_sched = ~Scheduler::new(
basic::event_loop(),
normal_queue,
normal_worker,
queues.clone(),
sleepers.clone());
@ -1020,7 +1021,7 @@ mod test {
// Our special scheduler
let mut special_sched = ~Scheduler::new_special(
basic::event_loop(),
special_queue.clone(),
special_worker,
queues.clone(),
sleepers.clone(),
false,
@ -1169,7 +1170,6 @@ mod test {
// Used to deadlock because Shutdown was never recvd.
#[test]
fn no_missed_messages() {
use rt::work_queue::WorkQueue;
use rt::sleeper_list::SleeperList;
use rt::stack::StackPool;
use rt::sched::{Shutdown, TaskFromFriend};
@ -1178,13 +1178,13 @@ mod test {
do run_in_bare_thread {
stress_factor().times(|| {
let sleepers = SleeperList::new();
let queue = WorkQueue::new();
let queues = ~[queue.clone()];
let mut pool = BufferPool::init();
let (worker, stealer) = pool.deque();
let mut sched = ~Scheduler::new(
basic::event_loop(),
queue,
queues.clone(),
worker,
~[stealer],
sleepers.clone());
let mut handle = sched.make_handle();


@ -23,24 +23,25 @@ use rand;
use result::{Result, Ok, Err};
use rt::basic;
use rt::comm::oneshot;
use rt::deque::BufferPool;
use rt::new_event_loop;
use rt::sched::Scheduler;
use rt::sleeper_list::SleeperList;
use rt::task::Task;
use rt::task::UnwindResult;
use rt::thread::Thread;
use rt::work_queue::WorkQueue;
use unstable::{run_in_bare_thread};
use vec;
use vec::{OwnedVector, MutableVector, ImmutableVector};
pub fn new_test_uv_sched() -> Scheduler {
let queue = WorkQueue::new();
let queues = ~[queue.clone()];
let mut pool = BufferPool::init();
let (worker, stealer) = pool.deque();
let mut sched = Scheduler::new(new_event_loop(),
queue,
queues,
worker,
~[stealer],
SleeperList::new());
// Don't wait for the Shutdown message
@ -50,13 +51,12 @@ pub fn new_test_uv_sched() -> Scheduler {
}
pub fn new_test_sched() -> Scheduler {
let queue = WorkQueue::new();
let queues = ~[queue.clone()];
let mut pool = BufferPool::init();
let (worker, stealer) = pool.deque();
let mut sched = Scheduler::new(basic::event_loop(),
queue,
queues,
worker,
~[stealer],
SleeperList::new());
// Don't wait for the Shutdown message
@ -227,18 +227,16 @@ pub fn run_in_mt_newsched_task(f: proc()) {
let mut handles = ~[];
let mut scheds = ~[];
let mut work_queues = ~[];
for _ in range(0u, nthreads) {
let work_queue = WorkQueue::new();
work_queues.push(work_queue);
}
let mut pool = BufferPool::<~Task>::init();
let workers = range(0, nthreads).map(|_| pool.deque());
let (workers, stealers) = vec::unzip(workers);
for i in range(0u, nthreads) {
for worker in workers.move_iter() {
let loop_ = new_event_loop();
let mut sched = ~Scheduler::new(loop_,
work_queues[i].clone(),
work_queues.clone(),
worker,
stealers.clone(),
sleepers.clone());
let handle = sched.make_handle();


@ -1,75 +0,0 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use container::Container;
use option::*;
use vec::OwnedVector;
use unstable::sync::Exclusive;
use cell::Cell;
use kinds::Send;
use clone::Clone;
pub struct WorkQueue<T> {
// XXX: Another mystery bug fixed by boxing this lock
priv queue: ~Exclusive<~[T]>
}
impl<T: Send> WorkQueue<T> {
pub fn new() -> WorkQueue<T> {
WorkQueue {
queue: ~Exclusive::new(~[])
}
}
pub fn push(&mut self, value: T) {
unsafe {
let value = Cell::new(value);
self.queue.with(|q| q.unshift(value.take()) );
}
}
pub fn pop(&mut self) -> Option<T> {
unsafe {
self.queue.with(|q| {
if !q.is_empty() {
Some(q.shift())
} else {
None
}
})
}
}
pub fn steal(&mut self) -> Option<T> {
unsafe {
self.queue.with(|q| {
if !q.is_empty() {
Some(q.pop())
} else {
None
}
})
}
}
pub fn is_empty(&self) -> bool {
unsafe {
self.queue.with_imm(|q| q.is_empty() )
}
}
}
impl<T> Clone for WorkQueue<T> {
fn clone(&self) -> WorkQueue<T> {
WorkQueue {
queue: self.queue.clone()
}
}
}


@ -84,7 +84,6 @@ use rt::sched::{Scheduler, Shutdown, TaskFromFriend};
use rt::task::{Task, Sched};
use rt::task::UnwindResult;
use rt::thread::Thread;
use rt::work_queue::WorkQueue;
use rt::{in_green_task_context, new_event_loop};
use task::SingleThreaded;
use task::TaskOpts;
@ -111,11 +110,11 @@ pub fn spawn_raw(mut opts: TaskOpts, f: proc()) {
// Since this is a 1:1 scheduler we create a queue not in
// the stealee set. The run_anything flag is set false
// which will disable stealing.
let work_queue = WorkQueue::new();
let (worker, _stealer) = (*sched).work_queue.pool().deque();
// Create a new scheduler to hold the new task
let mut new_sched = ~Scheduler::new_special(new_event_loop(),
work_queue,
worker,
(*sched).work_queues.clone(),
(*sched).sleeper_list.clone(),
false,