diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs
index 3a485bf2ea9..a7feb6db923 100644
--- a/src/librustuv/net.rs
+++ b/src/librustuv/net.rs
@@ -1076,21 +1076,22 @@ mod test {
         use std::rt::task::Task;
         use std::rt::task::UnwindResult;
         use std::rt::thread::Thread;
-        use std::rt::work_queue::WorkQueue;
+        use std::rt::deque::BufferPool;
         use std::unstable::run_in_bare_thread;
         use uvio::UvEventLoop;
 
         do run_in_bare_thread {
             let sleepers = SleeperList::new();
-            let work_queue1 = WorkQueue::new();
-            let work_queue2 = WorkQueue::new();
-            let queues = ~[work_queue1.clone(), work_queue2.clone()];
+            let mut pool = BufferPool::init();
+            let (worker1, stealer1) = pool.deque();
+            let (worker2, stealer2) = pool.deque();
+            let queues = ~[stealer1, stealer2];
 
             let loop1 = ~UvEventLoop::new() as ~EventLoop;
-            let mut sched1 = ~Scheduler::new(loop1, work_queue1, queues.clone(),
+            let mut sched1 = ~Scheduler::new(loop1, worker1, queues.clone(),
                                              sleepers.clone());
             let loop2 = ~UvEventLoop::new() as ~EventLoop;
-            let mut sched2 = ~Scheduler::new(loop2, work_queue2, queues.clone(),
+            let mut sched2 = ~Scheduler::new(loop2, worker2, queues.clone(),
                                              sleepers.clone());
 
             let handle1 = Cell::new(sched1.make_handle());
diff --git a/src/libstd/rt/deque.rs b/src/libstd/rt/deque.rs
new file mode 100644
index 00000000000..94d4523b2e2
--- /dev/null
+++ b/src/libstd/rt/deque.rs
@@ -0,0 +1,658 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! A (mostly) lock-free concurrent work-stealing deque
+//!
+//! This module contains an implementation of the Chase-Lev work stealing deque
+//! described in "Dynamic Circular Work-Stealing Deque". The implementation is
+//! heavily based on the pseudocode found in the paper.
+//!
+//! This implementation does not want to have the restriction of a garbage
+//! collector for reclamation of buffers, and instead it uses a shared pool of
+//! buffers. This shared pool is required for correctness in this
+//! implementation.
+//!
+//! The only lock-synchronized portions of this deque are the buffer allocation
+//! and deallocation portions. Otherwise all operations are lock-free.
+//!
+//! # Example
+//!
+//!     use std::rt::deque::BufferPool;
+//!
+//!     let mut pool = BufferPool::init();
+//!     let (mut worker, mut stealer) = pool.deque();
+//!
+//!     // Only the worker may push/pop
+//!     worker.push(1);
+//!     worker.pop();
+//!
+//!     // Stealers take data from the other end of the deque
+//!     worker.push(1);
+//!     stealer.steal();
+//!
+//!     // Stealers can be cloned to have many stealers stealing in parallel
+//!     worker.push(1);
+//!     let mut stealer2 = stealer.clone();
+//!     stealer2.steal();
+
+// NB: the "buffer pool" strategy is not done for speed, but rather for
+//     correctness. For more info, see the comment on `swap_buffer`
+
+// XXX: all atomic operations in this module use a SeqCst ordering. That is
+//      probably overkill
+
+use cast;
+use clone::Clone;
+use iter::range;
+use kinds::Send;
+use libc;
+use mem;
+use ops::Drop;
+use option::{Option, Some, None};
+use ptr;
+use unstable::atomics::{AtomicInt, AtomicPtr, SeqCst};
+use unstable::sync::{UnsafeArc, Exclusive};
+
+// Once the queue is less than 1/K full, then it will be downsized. Note that
+// the deque requires that this number be less than 2.
+static K: int = 4;
+
+// Minimum number of bits that a buffer size should be. No buffer will resize to
+// under this value, and all deques will initially contain a buffer of this
+// size.
+//
+// The size in question is 1 << MIN_BITS
+static MIN_BITS: int = 7;
+
+struct Deque<T> {
+    bottom: AtomicInt,
+    top: AtomicInt,
+    array: AtomicPtr<Buffer<T>>,
+    pool: BufferPool<T>,
+}
+
+/// Worker half of the work-stealing deque. This worker has exclusive access to
+/// one side of the deque, and uses `push` and `pop` methods to manipulate it.
+///
+/// There may only be one worker per deque.
+pub struct Worker<T> {
+    priv deque: UnsafeArc<Deque<T>>,
+}
+
+/// The stealing half of the work-stealing deque. Stealers have access to the
+/// opposite end of the deque from the worker, and they only have access to the
+/// `steal` method.
+pub struct Stealer<T> {
+    priv deque: UnsafeArc<Deque<T>>,
+}
+
+/// When stealing some data, this is an enumeration of the possible outcomes.
+#[deriving(Eq)]
+pub enum Stolen<T> {
+    /// The deque was empty at the time of stealing
+    Empty,
+    /// The stealer lost the race for stealing data, and a retry may return more
+    /// data.
+    Abort,
+    /// The stealer has successfully stolen some data.
+    Data(T),
+}
+
+/// The allocation pool for buffers used by work-stealing deques. Right now this
+/// structure is used for reclamation of memory after it is no longer in use by
+/// deques.
+///
+/// This data structure is protected by a mutex, but it is rarely used. Deques
+/// will only use this structure when allocating a new buffer or deallocating a
+/// previous one.
+pub struct BufferPool<T> {
+    priv pool: Exclusive<~[~Buffer<T>]>,
+}
+
+/// An internal buffer used by the chase-lev deque. This structure is actually
+/// implemented as a circular buffer, and is used as the intermediate storage of
+/// the data in the deque.
+///
+/// This type is implemented with *T instead of ~[T] for two reasons:
+///
+///   1. There is nothing safe about using this buffer. This easily allows the
+///      same value to be read twice into Rust, and there is nothing to
+///      prevent this. The usage by the deque must ensure that one of the
+///      values is forgotten. Furthermore, we only ever want to manually run
+///      destructors for values in this buffer (on drop) because the bounds
+///      are defined by the deque it's owned by.
+///
+///   2. We can certainly avoid bounds checks using *T instead of ~[T], although
+///      LLVM is probably pretty good at doing this already.
+struct Buffer<T> {
+    storage: *T,
+    log_size: int,
+}
+
+impl<T: Send> BufferPool<T> {
+    /// Allocates a new buffer pool which in turn can be used to allocate new
+    /// deques.
+    pub fn init() -> BufferPool<T> {
+        BufferPool { pool: Exclusive::new(~[]) }
+    }
+
+    /// Allocates a new work-stealing deque which will send/receive memory to
+    /// and from this buffer pool.
+    pub fn deque(&mut self) -> (Worker<T>, Stealer<T>) {
+        let (a, b) = UnsafeArc::new2(Deque::init(self.clone()));
+        (Worker { deque: a }, Stealer { deque: b })
+    }
+
+    fn alloc(&mut self, bits: int) -> ~Buffer<T> {
+        unsafe {
+            self.pool.with(|pool| {
+                match pool.iter().position(|x| x.size() >= (1 << bits)) {
+                    Some(i) => pool.remove(i),
+                    None => ~Buffer::init(bits)
+                }
+            })
+        }
+    }
+
+    fn free(&mut self, buf: ~Buffer<T>) {
+        unsafe {
+            use cell::Cell;
+            let buf = Cell::new(buf);
+            self.pool.with(|pool| {
+                let buf = buf.take();
+                match pool.iter().position(|v| v.size() > buf.size()) {
+                    Some(i) => pool.insert(i, buf),
+                    None => pool.push(buf),
+                }
+            })
+        }
+    }
+}
+
+impl<T: Send> Clone for BufferPool<T> {
+    fn clone(&self) -> BufferPool<T> { BufferPool { pool: self.pool.clone() } }
+}
+
+impl<T: Send> Worker<T> {
+    /// Pushes data onto the front of this work queue.
+    pub fn push(&mut self, t: T) {
+        unsafe { (*self.deque.get()).push(t) }
+    }
+    /// Pops data off the front of the work queue, returning `None` on an empty
+    /// queue.
+    pub fn pop(&mut self) -> Option<T> {
+        unsafe { (*self.deque.get()).pop() }
+    }
+
+    /// Gets access to the buffer pool that this worker is attached to. This can
+    /// be used to create more deques which share the same buffer pool as this
+    /// deque.
+    pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool<T> {
+        unsafe { &mut (*self.deque.get()).pool }
+    }
+}
+
+impl<T: Send> Stealer<T> {
+    /// Steals work off the end of the queue (opposite of the worker's end)
+    pub fn steal(&mut self) -> Stolen<T> {
+        unsafe { (*self.deque.get()).steal() }
+    }
+
+    /// Gets access to the buffer pool that this stealer is attached to. This
+    /// can be used to create more deques which share the same buffer pool as
+    /// this deque.
+    pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool<T> {
+        unsafe { &mut (*self.deque.get()).pool }
+    }
+}
+
+impl<T: Send> Clone for Stealer<T> {
+    fn clone(&self) -> Stealer<T> { Stealer { deque: self.deque.clone() } }
+}
+
+// Almost all of this code can be found directly in the paper so I'm not
+// personally going to heavily comment what's going on here.
+
+impl<T: Send> Deque<T> {
+    fn init(mut pool: BufferPool<T>) -> Deque<T> {
+        let buf = pool.alloc(MIN_BITS);
+        Deque {
+            bottom: AtomicInt::new(0),
+            top: AtomicInt::new(0),
+            array: AtomicPtr::new(unsafe { cast::transmute(buf) }),
+            pool: pool,
+        }
+    }
+
+    unsafe fn push(&mut self, data: T) {
+        let mut b = self.bottom.load(SeqCst);
+        let t = self.top.load(SeqCst);
+        let mut a = self.array.load(SeqCst);
+        let size = b - t;
+        if size >= (*a).size() - 1 {
+            // You won't find this code in the chase-lev deque paper. This is
+            // alluded to in a small footnote, however. We always free a buffer
+            // when growing in order to prevent leaks.
+            a = self.swap_buffer(b, a, (*a).resize(b, t, 1));
+            b = self.bottom.load(SeqCst);
+        }
+        (*a).put(b, data);
+        self.bottom.store(b + 1, SeqCst);
+    }
+
+    unsafe fn pop(&mut self) -> Option<T> {
+        let b = self.bottom.load(SeqCst);
+        let a = self.array.load(SeqCst);
+        let b = b - 1;
+        self.bottom.store(b, SeqCst);
+        let t = self.top.load(SeqCst);
+        let size = b - t;
+        if size < 0 {
+            self.bottom.store(t, SeqCst);
+            return None;
+        }
+        let data = (*a).get(b);
+        if size > 0 {
+            self.maybe_shrink(b, t);
+            return Some(data);
+        }
+        if self.top.compare_and_swap(t, t + 1, SeqCst) == t {
+            self.bottom.store(t + 1, SeqCst);
+            return Some(data);
+        } else {
+            self.bottom.store(t + 1, SeqCst);
+            cast::forget(data); // someone else stole this value
+            return None;
+        }
+    }
+
+    unsafe fn steal(&mut self) -> Stolen<T> {
+        let t = self.top.load(SeqCst);
+        let old = self.array.load(SeqCst);
+        let b = self.bottom.load(SeqCst);
+        let a = self.array.load(SeqCst);
+        let size = b - t;
+        if size <= 0 { return Empty }
+        if size % (*a).size() == 0 {
+            if a == old && t == self.top.load(SeqCst) {
+                return Empty
+            }
+            return Abort
+        }
+        let data = (*a).get(t);
+        if self.top.compare_and_swap(t, t + 1, SeqCst) == t {
+            Data(data)
+        } else {
+            cast::forget(data); // someone else stole this value
+            Abort
+        }
+    }
+
+    unsafe fn maybe_shrink(&mut self, b: int, t: int) {
+        let a = self.array.load(SeqCst);
+        if b - t < (*a).size() / K && b - t > (1 << MIN_BITS) {
+            self.swap_buffer(b, a, (*a).resize(b, t, -1));
+        }
+    }
+
+    // Helper routine not mentioned in the paper which is used in growing and
+    // shrinking buffers to swap a new buffer into place. As a bit of a
+    // recap, the whole point that we need a buffer pool rather than just
+    // calling malloc/free directly is that stealers can continue using buffers
+    // after this method has called 'free' on it. The continued usage is simply
+    // a read followed by a forget, but we must make sure that the memory can
+    // continue to be read after we flag this buffer for reclamation.
+    unsafe fn swap_buffer(&mut self, b: int, old: *mut Buffer<T>,
+                          buf: Buffer<T>) -> *mut Buffer<T> {
+        let newbuf: *mut Buffer<T> = cast::transmute(~buf);
+        self.array.store(newbuf, SeqCst);
+        let ss = (*newbuf).size();
+        self.bottom.store(b + ss, SeqCst);
+        let t = self.top.load(SeqCst);
+        if self.top.compare_and_swap(t, t + ss, SeqCst) != t {
+            self.bottom.store(b, SeqCst);
+        }
+        self.pool.free(cast::transmute(old));
+        return newbuf;
+    }
+}
+
+
+#[unsafe_destructor]
+impl<T: Send> Drop for Deque<T> {
+    fn drop(&mut self) {
+        let t = self.top.load(SeqCst);
+        let b = self.bottom.load(SeqCst);
+        let a = self.array.load(SeqCst);
+        // Free whatever is leftover in the deque, and then move the buffer
+        // back into the pool.
+        for i in range(t, b) {
+            let _: T = unsafe { (*a).get(i) };
+        }
+        self.pool.free(unsafe { cast::transmute(a) });
+    }
+}
+
+impl<T: Send> Buffer<T> {
+    unsafe fn init(log_size: int) -> Buffer<T> {
+        let size = (1 << log_size) * mem::size_of::<T>();
+        let buffer = libc::malloc(size as libc::size_t);
+        assert!(!buffer.is_null());
+        Buffer {
+            storage: buffer as *T,
+            log_size: log_size,
+        }
+    }
+
+    fn size(&self) -> int { 1 << self.log_size }
+
+    // Apparently LLVM cannot optimize (foo % (1 << bar)) into this implicitly
+    fn mask(&self) -> int { (1 << self.log_size) - 1 }
+
+    // This does not protect against loading duplicate values of the same cell,
+    // nor does this clear out the contents contained within. Hence, this is a
+    // very unsafe method which the caller needs to treat specially in case a
+    // race is lost.
+    unsafe fn get(&self, i: int) -> T {
+        ptr::read_ptr(self.storage.offset(i & self.mask()))
+    }
+
+    // Unsafe because this unsafely overwrites possibly uninitialized or
+    // initialized data.
+    unsafe fn put(&mut self, i: int, t: T) {
+        let ptr = self.storage.offset(i & self.mask());
+        ptr::copy_nonoverlapping_memory(ptr as *mut T, &t as *T, 1);
+        cast::forget(t);
+    }
+
+    // Again, unsafe because this has incredibly dubious ownership violations.
+    // It is assumed that this buffer is immediately dropped.
+    unsafe fn resize(&self, b: int, t: int, delta: int) -> Buffer<T> {
+        let mut buf = Buffer::init(self.log_size + delta);
+        for i in range(t, b) {
+            buf.put(i, self.get(i));
+        }
+        return buf;
+    }
+}
+
+#[unsafe_destructor]
+impl<T: Send> Drop for Buffer<T> {
+    fn drop(&mut self) {
+        // It is assumed that all buffers are empty on drop.
+        unsafe { libc::free(self.storage as *libc::c_void) }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use prelude::*;
+    use super::{Data, BufferPool, Abort, Empty, Worker, Stealer};
+
+    use cast;
+    use rt::thread::Thread;
+    use rand;
+    use rand::Rng;
+    use unstable::atomics::{AtomicBool, INIT_ATOMIC_BOOL, SeqCst,
+                            AtomicUint, INIT_ATOMIC_UINT};
+    use vec;
+
+    #[test]
+    fn smoke() {
+        let mut pool = BufferPool::init();
+        let (mut w, mut s) = pool.deque();
+        assert_eq!(w.pop(), None);
+        assert_eq!(s.steal(), Empty);
+        w.push(1);
+        assert_eq!(w.pop(), Some(1));
+        w.push(1);
+        assert_eq!(s.steal(), Data(1));
+        w.push(1);
+        assert_eq!(s.clone().steal(), Data(1));
+    }
+
+    #[test]
+    fn stealpush() {
+        static AMT: int = 100000;
+        let mut pool = BufferPool::<int>::init();
+        let (mut w, s) = pool.deque();
+        let t = do Thread::start {
+            let mut s = s;
+            let mut left = AMT;
+            while left > 0 {
+                match s.steal() {
+                    Data(i) => {
+                        assert_eq!(i, 1);
+                        left -= 1;
+                    }
+                    Abort | Empty => {}
+                }
+            }
+        };
+
+        for _ in range(0, AMT) {
+            w.push(1);
+        }
+
+        t.join();
+    }
+
+    #[test]
+    fn stealpush_large() {
+        static AMT: int = 100000;
+        let mut pool = BufferPool::<(int, int)>::init();
+        let (mut w, s) = pool.deque();
+        let t = do Thread::start {
+            let mut s = s;
+            let mut left = AMT;
+            while left > 0 {
+                match s.steal() {
+                    Data((1, 10)) => { left -= 1; }
+                    Data(..) => fail!(),
+                    Abort | Empty => {}
+                }
+            }
+        };
+
+        for _ in range(0, AMT) {
+            w.push((1, 10));
+        }
+
+        t.join();
+    }
+
+    fn stampede(mut w: Worker<~int>, s: Stealer<~int>,
+                nthreads: int, amt: uint) {
+        for _ in range(0, amt) {
+            w.push(~20);
+        }
+        let mut remaining = AtomicUint::new(amt);
+        let unsafe_remaining: *mut AtomicUint = &mut remaining;
+
+        let threads = range(0, nthreads).map(|_| {
+            let s = s.clone();
+            do Thread::start {
+                unsafe {
+                    let mut s = s;
+                    while (*unsafe_remaining).load(SeqCst) > 0 {
+                        match s.steal() {
+                            Data(~20) => {
+                                (*unsafe_remaining).fetch_sub(1, SeqCst);
+                            }
+                            Data(..) => fail!(),
+                            Abort | Empty => {}
+                        }
+                    }
+                }
+            }
+        }).to_owned_vec();
+
+        while remaining.load(SeqCst) > 0 {
+            match w.pop() {
+                Some(~20) => { remaining.fetch_sub(1, SeqCst); }
+                Some(..) => fail!(),
+                None => {}
+            }
+        }
+
+        for thread in threads.move_iter() {
+            thread.join();
+        }
+    }
+
+    #[test]
+    fn run_stampede() {
+        let mut pool = BufferPool::<~int>::init();
+        let (w, s) = pool.deque();
+        stampede(w, s, 8, 10000);
+    }
+
+    #[test]
+    fn many_stampede() {
+        static AMT: uint = 4;
+        let mut pool = BufferPool::<~int>::init();
+        let threads = range(0, AMT).map(|_| {
+            let (w, s) = pool.deque();
+            do Thread::start {
+                stampede(w, s, 4, 10000);
+            }
+        }).to_owned_vec();
+
+        for thread in threads.move_iter() {
+            thread.join();
+        }
+    }
+
+    #[test]
+    fn stress() {
+        static AMT: int = 100000;
+        static NTHREADS: int = 8;
+        static mut DONE: AtomicBool = INIT_ATOMIC_BOOL;
+        static mut HITS: AtomicUint = INIT_ATOMIC_UINT;
+        let mut pool = BufferPool::<int>::init();
+        let (mut w, s) = pool.deque();
+
+        let threads = range(0, NTHREADS).map(|_| {
+            let s = s.clone();
+            do Thread::start {
+                unsafe {
+                    let mut s = s;
+                    loop {
+                        match s.steal() {
+                            Data(2) => { HITS.fetch_add(1, SeqCst); }
+                            Data(..) => fail!(),
+                            _ if DONE.load(SeqCst) => break,
+                            _ => {}
+                        }
+                    }
+                }
+            }
+        }).to_owned_vec();
+
+        let mut rng = rand::task_rng();
+        let mut expected = 0;
+        while expected < AMT {
+            if rng.gen_range(0, 3) == 2 {
+                match w.pop() {
+                    None => {}
+                    Some(2) => unsafe { HITS.fetch_add(1, SeqCst); },
+                    Some(_) => fail!(),
+                }
+            } else {
+                expected += 1;
+                w.push(2);
+            }
+        }
+
+        unsafe {
+            while HITS.load(SeqCst) < AMT as uint {
+                match w.pop() {
+                    None => {}
+                    Some(2) => { HITS.fetch_add(1, SeqCst); }
+                    Some(_) => fail!(),
+                }
+            }
+            DONE.store(true, SeqCst);
+        }
+
+        for thread in threads.move_iter() {
+            thread.join();
+        }
+
+        assert_eq!(unsafe { HITS.load(SeqCst) }, expected as uint);
+    }
+
+    #[test]
+    fn no_starvation() {
+        static AMT: int = 10000;
+        static NTHREADS: int = 4;
+        static mut DONE: AtomicBool = INIT_ATOMIC_BOOL;
+        let mut pool = BufferPool::<(int, uint)>::init();
+        let (mut w, s) = pool.deque();
+
+        let (threads, hits) = vec::unzip(range(0, NTHREADS).map(|_| {
+            let s = s.clone();
+            let box = ~AtomicUint::new(0);
+            let thread_box = unsafe {
+                *cast::transmute::<&~AtomicUint, **mut AtomicUint>(&box)
+            };
+            (do Thread::start {
+                unsafe {
+                    let mut s = s;
+                    loop {
+                        match s.steal() {
+                            Data((1, 2)) => {
+                                (*thread_box).fetch_add(1, SeqCst);
+                            }
+                            Data(..) => fail!(),
+                            _ if DONE.load(SeqCst) => break,
+                            _ => {}
+                        }
+                    }
+                }
+            }, box)
+        }));
+
+        let mut rng = rand::task_rng();
+        let mut myhit = false;
+        let mut iter = 0;
+        'outer: loop {
+            for _ in range(0, rng.gen_range(0, AMT)) {
+                if !myhit && rng.gen_range(0, 3) == 2 {
+                    match w.pop() {
+                        None => {}
+                        Some((1, 2)) => myhit = true,
+                        Some(_) => fail!(),
+                    }
+                } else {
+                    w.push((1, 2));
+                }
+            }
+            iter += 1;
+
+            debug!("loop iteration {}", iter);
+            for (i, slot) in hits.iter().enumerate() {
+                let amt = slot.load(SeqCst);
+                debug!("thread {}: {}", i, amt);
+                if amt == 0 { continue 'outer; }
+            }
+            if myhit {
+                break
+            }
+        }
+
+        unsafe { DONE.store(true, SeqCst); }
+
+        for thread in threads.move_iter() {
+            thread.join();
+        }
+    }
+}
diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs
index 78ec32ead3c..be1de6c5bdb 100644
--- a/src/libstd/rt/mod.rs
+++ b/src/libstd/rt/mod.rs
@@ -75,7 +75,6 @@ use vec::{OwnedVector, MutableVector, ImmutableVector};
 use vec;
 
 use self::thread::Thread;
-use self::work_queue::WorkQueue;
 
 // the os module needs to reach into this helper, so allow general access
 // through this reexport.
@@ -130,9 +129,6 @@ pub mod rtio;
 /// or task-local storage.
 pub mod local;
 
-/// A parallel work-stealing deque.
-pub mod work_queue;
-
 /// A parallel queue.
 pub mod message_queue;
 
@@ -142,6 +138,9 @@ mod mpsc_queue;
 /// A lock-free multi-producer, multi-consumer bounded queue.
 mod mpmc_bounded_queue;
 
+/// A parallel work-stealing deque
+pub mod deque;
+
 /// A parallel data structure for tracking sleeping schedulers.
 pub mod sleeper_list;
 
@@ -287,7 +286,9 @@ fn run_(main: proc(), use_main_sched: bool) -> int {
 
     // Create a work queue for each scheduler, ntimes. Create an extra
     // for the main thread if that flag is set. We won't steal from it.
-    let work_queues: ~[WorkQueue<~Task>] = vec::from_fn(nscheds, |_| WorkQueue::new());
+    let mut pool = deque::BufferPool::init();
+    let arr = vec::from_fn(nscheds, |_| pool.deque());
+    let (workers, stealers) = vec::unzip(arr.move_iter());
 
     // The schedulers.
     let mut scheds = ~[];
@@ -295,14 +296,14 @@ fn run_(main: proc(), use_main_sched: bool) -> int {
     // sent the Shutdown message to terminate the schedulers.
     let mut handles = ~[];
 
-    for work_queue in work_queues.iter() {
+    for worker in workers.move_iter() {
         rtdebug!("inserting a regular scheduler");
 
         // Every scheduler is driven by an I/O event loop.
        let loop_ = new_event_loop();
         let mut sched = ~Scheduler::new(loop_,
-                                        work_queue.clone(),
-                                        work_queues.clone(),
+                                        worker,
+                                        stealers.clone(),
                                         sleepers.clone());
         let handle = sched.make_handle();
 
@@ -321,12 +322,12 @@ fn run_(main: proc(), use_main_sched: bool) -> int {
         // This scheduler needs a queue that isn't part of the stealee
         // set.
-        let work_queue = WorkQueue::new();
+        let (worker, _) = pool.deque();
 
         let main_loop = new_event_loop();
         let mut main_sched = ~Scheduler::new_special(main_loop,
-                                                     work_queue,
-                                                     work_queues.clone(),
+                                                     worker,
+                                                     stealers.clone(),
                                                      sleepers.clone(),
                                                      false,
                                                      Some(friend_handle));
diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs
index d66bd1e4135..a231bea5e27 100644
--- a/src/libstd/rt/sched.rs
+++ b/src/libstd/rt/sched.rs
@@ -13,13 +13,13 @@ use cast::{transmute, transmute_mut_region, transmute_mut_unsafe};
 use clone::Clone;
 use unstable::raw;
 use super::sleeper_list::SleeperList;
-use super::work_queue::WorkQueue;
 use super::stack::{StackPool};
 use super::rtio::EventLoop;
 use super::context::Context;
 use super::task::{Task, AnySched, Sched};
 use super::message_queue::MessageQueue;
 use rt::kill::BlockedTask;
+use rt::deque;
 use rt::local_ptr;
 use rt::local::Local;
 use rt::rtio::{RemoteCallback, PausibleIdleCallback, Callback};
@@ -39,14 +39,14 @@ use vec::{OwnedVector};
 /// in too much allocation and too many events.
 pub struct Scheduler {
     /// There are N work queues, one per scheduler.
-    priv work_queue: WorkQueue<~Task>,
+    work_queue: deque::Worker<~Task>,
     /// Work queues for the other schedulers. These are created by
     /// cloning the core work queues.
-    work_queues: ~[WorkQueue<~Task>],
+    work_queues: ~[deque::Stealer<~Task>],
     /// The queue of incoming messages from other schedulers.
     /// These are enqueued by SchedHandles after which a remote callback
     /// is triggered to handle the message.
-    priv message_queue: MessageQueue<SchedMessage>,
+    message_queue: MessageQueue<SchedMessage>,
     /// A shared list of sleeping schedulers. We'll use this to wake
     /// up schedulers when pushing work onto the work queue.
     sleeper_list: SleeperList,
@@ -56,33 +56,33 @@ pub struct Scheduler {
     /// not active since there are multiple event sources that may
     /// wake the scheduler. It just prevents the scheduler from pushing
     /// multiple handles onto the sleeper list.
-    priv sleepy: bool,
+    sleepy: bool,
     /// A flag to indicate we've received the shutdown message and should
     /// no longer try to go to sleep, but exit instead.
     no_sleep: bool,
     stack_pool: StackPool,
     /// The scheduler runs on a special task. When it is not running
     /// it is stored here instead of the work queue.
-    priv sched_task: Option<~Task>,
+    sched_task: Option<~Task>,
     /// An action performed after a context switch on behalf of the
     /// code running before the context switch
-    priv cleanup_job: Option<CleanupJob>,
+    cleanup_job: Option<CleanupJob>,
     /// Should this scheduler run any task, or only pinned tasks?
     run_anything: bool,
     /// If the scheduler shouldn't run some tasks, a friend to send
     /// them to.
-    priv friend_handle: Option<SchedHandle>,
+    friend_handle: Option<SchedHandle>,
     /// A fast XorShift rng for scheduler use
     rng: XorShiftRng,
     /// A toggleable idle callback
-    priv idle_callback: Option<~PausibleIdleCallback>,
+    idle_callback: Option<~PausibleIdleCallback>,
     /// A countdown that starts at a random value and is decremented
     /// every time a yield check is performed. When it hits 0 a task
     /// will yield.
-    priv yield_check_count: uint,
+    yield_check_count: uint,
     /// A flag to tell the scheduler loop it needs to do some stealing
     /// in order to introduce randomness as part of a yield
-    priv steal_for_yield: bool,
+    steal_for_yield: bool,
 
     // n.b. currently destructors of an object are run in top-to-bottom in order
     // of field declaration. Due to its nature, the pausible idle callback
@@ -115,8 +115,8 @@ impl Scheduler {
 
     // * Initialization Functions
 
     pub fn new(event_loop: ~EventLoop,
-               work_queue: WorkQueue<~Task>,
-               work_queues: ~[WorkQueue<~Task>],
+               work_queue: deque::Worker<~Task>,
+               work_queues: ~[deque::Stealer<~Task>],
                sleeper_list: SleeperList)
                -> Scheduler {
 
@@ -127,8 +127,8 @@ impl Scheduler {
     }
 
     pub fn new_special(event_loop: ~EventLoop,
-                       work_queue: WorkQueue<~Task>,
-                       work_queues: ~[WorkQueue<~Task>],
+                       work_queue: deque::Worker<~Task>,
+                       work_queues: ~[deque::Stealer<~Task>],
                        sleeper_list: SleeperList,
                        run_anything: bool,
                        friend: Option<SchedHandle>)
@@ -440,11 +440,11 @@ impl Scheduler {
         let start_index = self.rng.gen_range(0, len);
         for index in range(0, len).map(|i| (i + start_index) % len) {
             match work_queues[index].steal() {
-                Some(task) => {
+                deque::Data(task) => {
                     rtdebug!("found task by stealing");
                     return Some(task)
                 }
-                None => ()
+                _ => ()
             }
         };
         rtdebug!("giving up on stealing");
@@ -889,6 +889,7 @@ mod test {
     use borrow::to_uint;
     use rt::sched::{Scheduler};
     use cell::Cell;
+    use rt::deque::BufferPool;
     use rt::thread::Thread;
     use rt::task::{Task, Sched};
     use rt::basic;
@@ -994,7 +995,6 @@ mod test {
     #[test]
     fn test_schedule_home_states() {
         use rt::sleeper_list::SleeperList;
-        use rt::work_queue::WorkQueue;
         use rt::sched::Shutdown;
         use borrow;
         use rt::comm::*;
@@ -1002,14 +1002,15 @@ mod test {
         do run_in_bare_thread {
             let sleepers = SleeperList::new();
-            let normal_queue = WorkQueue::new();
-            let special_queue = WorkQueue::new();
-            let queues = ~[normal_queue.clone(), special_queue.clone()];
+            let mut pool = BufferPool::init();
+            let (normal_worker, normal_stealer) = pool.deque();
+            let (special_worker, special_stealer) = pool.deque();
+            let queues = ~[normal_stealer, special_stealer];
 
             // Our normal scheduler
             let mut normal_sched = ~Scheduler::new(
                 basic::event_loop(),
-                normal_queue,
+                normal_worker,
                 queues.clone(),
                 sleepers.clone());
 
@@ -1020,7 +1021,7 @@ mod test {
             // Our special scheduler
             let mut special_sched = ~Scheduler::new_special(
                 basic::event_loop(),
-                special_queue.clone(),
+                special_worker,
                 queues.clone(),
                 sleepers.clone(),
                 false,
@@ -1169,7 +1170,6 @@ mod test {
     // Used to deadlock because Shutdown was never recvd.
     #[test]
     fn no_missed_messages() {
-        use rt::work_queue::WorkQueue;
        use rt::sleeper_list::SleeperList;
         use rt::stack::StackPool;
         use rt::sched::{Shutdown, TaskFromFriend};
@@ -1178,13 +1178,13 @@ mod test {
         do run_in_bare_thread {
             stress_factor().times(|| {
                 let sleepers = SleeperList::new();
-                let queue = WorkQueue::new();
-                let queues = ~[queue.clone()];
+                let mut pool = BufferPool::init();
+                let (worker, stealer) = pool.deque();
 
                 let mut sched = ~Scheduler::new(
                     basic::event_loop(),
-                    queue,
-                    queues.clone(),
+                    worker,
+                    ~[stealer],
                     sleepers.clone());
 
                 let mut handle = sched.make_handle();
diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs
index 2a6d30cf810..51ad37a2583 100644
--- a/src/libstd/rt/test.rs
+++ b/src/libstd/rt/test.rs
@@ -23,24 +23,25 @@ use rand;
 use result::{Result, Ok, Err};
 use rt::basic;
 use rt::comm::oneshot;
+use rt::deque::BufferPool;
 use rt::new_event_loop;
 use rt::sched::Scheduler;
 use rt::sleeper_list::SleeperList;
 use rt::task::Task;
 use rt::task::UnwindResult;
 use rt::thread::Thread;
-use rt::work_queue::WorkQueue;
 use unstable::{run_in_bare_thread};
+use vec;
 use vec::{OwnedVector, MutableVector, ImmutableVector};
 
 pub fn new_test_uv_sched() -> Scheduler {
-    let queue = WorkQueue::new();
-    let queues = ~[queue.clone()];
+    let mut pool = BufferPool::init();
+    let (worker, stealer) = pool.deque();
 
     let mut sched = Scheduler::new(new_event_loop(),
-                                   queue,
-                                   queues,
+                                   worker,
+                                   ~[stealer],
                                    SleeperList::new());
 
     // Don't wait for the Shutdown message
@@ -50,13 +51,12 @@ pub fn new_test_uv_sched() -> Scheduler {
 }
 
 pub fn new_test_sched() -> Scheduler {
-
-    let queue = WorkQueue::new();
-    let queues = ~[queue.clone()];
+    let mut pool = BufferPool::init();
+    let (worker, stealer) = pool.deque();
 
     let mut sched = Scheduler::new(basic::event_loop(),
-                                   queue,
-                                   queues,
+                                   worker,
+                                   ~[stealer],
                                    SleeperList::new());
 
     // Don't wait for the Shutdown message
@@ -227,18 +227,16 @@ pub fn run_in_mt_newsched_task(f: proc()) {
     let mut handles = ~[];
     let mut scheds = ~[];
 
-    let mut work_queues = ~[];
-    for _ in range(0u, nthreads) {
-        let work_queue = WorkQueue::new();
-        work_queues.push(work_queue);
-    }
+    let mut pool = BufferPool::<~Task>::init();
+    let workers = range(0, nthreads).map(|_| pool.deque());
+    let (workers, stealers) = vec::unzip(workers);
 
-    for i in range(0u, nthreads) {
+    for worker in workers.move_iter() {
         let loop_ = new_event_loop();
         let mut sched = ~Scheduler::new(loop_,
-                                        work_queues[i].clone(),
-                                        work_queues.clone(),
+                                        worker,
+                                        stealers.clone(),
                                         sleepers.clone());
         let handle = sched.make_handle();
diff --git a/src/libstd/rt/work_queue.rs b/src/libstd/rt/work_queue.rs
deleted file mode 100644
index 02ea8ab4f50..00000000000
--- a/src/libstd/rt/work_queue.rs
+++ /dev/null
@@ -1,75 +0,0 @@
-// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
-// file at the top-level directory of this distribution and at
-// http://rust-lang.org/COPYRIGHT.
-//
-// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
-// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
-// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
-// option. This file may not be copied, modified, or distributed
-// except according to those terms.
-
-use container::Container;
-use option::*;
-use vec::OwnedVector;
-use unstable::sync::Exclusive;
-use cell::Cell;
-use kinds::Send;
-use clone::Clone;
-
-pub struct WorkQueue<T> {
-    // XXX: Another mystery bug fixed by boxing this lock
-    priv queue: ~Exclusive<~[T]>
-}
-
-impl<T: Send> WorkQueue<T> {
-    pub fn new() -> WorkQueue<T> {
-        WorkQueue {
-            queue: ~Exclusive::new(~[])
-        }
-    }
-
-    pub fn push(&mut self, value: T) {
-        unsafe {
-            let value = Cell::new(value);
-            self.queue.with(|q| q.unshift(value.take()) );
-        }
-    }
-
-    pub fn pop(&mut self) -> Option<T> {
-        unsafe {
-            self.queue.with(|q| {
-                if !q.is_empty() {
-                    Some(q.shift())
-                } else {
-                    None
-                }
-            })
-        }
-    }
-
-    pub fn steal(&mut self) -> Option<T> {
-        unsafe {
-            self.queue.with(|q| {
-                if !q.is_empty() {
-                    Some(q.pop())
-                } else {
-                    None
-                }
-            })
-        }
-    }
-
-    pub fn is_empty(&self) -> bool {
-        unsafe {
-            self.queue.with_imm(|q| q.is_empty() )
-        }
-    }
-}
-
-impl<T: Send> Clone for WorkQueue<T> {
-    fn clone(&self) -> WorkQueue<T> {
-        WorkQueue {
-            queue: self.queue.clone()
-        }
-    }
-}
diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs
index 198fe596a89..153b3e4ce25 100644
--- a/src/libstd/task/spawn.rs
+++ b/src/libstd/task/spawn.rs
@@ -84,7 +84,6 @@ use rt::sched::{Scheduler, Shutdown, TaskFromFriend};
 use rt::task::{Task, Sched};
 use rt::task::UnwindResult;
 use rt::thread::Thread;
-use rt::work_queue::WorkQueue;
 use rt::{in_green_task_context, new_event_loop};
 use task::SingleThreaded;
 use task::TaskOpts;
@@ -111,11 +110,11 @@ pub fn spawn_raw(mut opts: TaskOpts, f: proc()) {
             // Since this is a 1:1 scheduler we create a queue not in
             // the stealee set. The run_anything flag is set false
             // which will disable stealing.
-            let work_queue = WorkQueue::new();
+            let (worker, _stealer) = (*sched).work_queue.pool().deque();
 
             // Create a new scheduler to hold the new task
             let mut new_sched = ~Scheduler::new_special(new_event_loop(),
-                                                        work_queue,
+                                                        worker,
                                                         (*sched).work_queues.clone(),
                                                         (*sched).sleeper_list.clone(),
                                                         false,
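
Usage note (not part of the patch): the sketch below shows how the new `std::rt::deque` API introduced above is meant to be used, following the module docs and the `smoke` test. It assumes the 2013-era runtime API exactly as added in this diff (`BufferPool::init`, `Worker::push`/`pop`, and `Stealer::steal` returning `Data`/`Empty`/`Abort`); it is illustrative only.

    use std::rt::deque::{BufferPool, Data, Empty, Abort};

    fn main() {
        // One pool owns buffer reclamation for every deque created from it.
        let mut pool = BufferPool::init();
        let (mut worker, stealer) = pool.deque();

        // Only the worker end may push and pop.
        worker.push(1);
        assert_eq!(worker.pop(), Some(1));

        // Stealers take from the opposite end and may be cloned freely;
        // a steal can lose a race (Abort) and simply be retried.
        worker.push(2);
        let mut stealer2 = stealer.clone();
        match stealer2.steal() {
            Data(i) => assert_eq!(i, 2), // won the race for the value
            Empty => {}                  // deque had no work
            Abort => {}                  // lost a race; retry if desired
        }
    }

The worker/stealer split mirrors the Chase-Lev design: the single worker manipulates `bottom` without contention, while any number of stealers contend on `top` with a compare-and-swap.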