diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index 0cf223f3029..33b4b307af8 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -225,9 +225,10 @@ impl Select for PortOne { fn optimistic_check(&mut self) -> bool { // The optimistic check is never necessary for correctness. For testing // purposes, making it randomly return false simulates a racing sender. - use rand::{Rand, rng}; - let mut rng = rng(); - let actually_check = Rand::rand(&mut rng); + use rand::{Rand}; + let actually_check = do Local::borrow:: |sched| { + Rand::rand(&mut sched.rng) + }; if actually_check { unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE } } else { diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 147c75e5c41..01a52892f63 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -63,8 +63,7 @@ Several modules in `core` are clients of `rt`: use cell::Cell; use clone::Clone; use container::Container; -use iter::Times; -use iterator::{Iterator, IteratorUtil}; +use iterator::{Iterator, IteratorUtil, range}; use option::{Some, None}; use ptr::RawPtr; use rt::local::Local; @@ -247,11 +246,16 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { let main = Cell::new(main); - // The shared list of sleeping schedulers. Schedulers wake each other - // occassionally to do new work. + // The shared list of sleeping schedulers. let sleepers = SleeperList::new(); - // The shared work queue. Temporary until work stealing is implemented. - let work_queue = WorkQueue::new(); + + // Create a work queue for each scheduler, ntimes. Create an extra + // for the main thread if that flag is set. We won't steal from it. + let mut work_queues = ~[]; + for _ in range(0u, nscheds) { + let work_queue: WorkQueue<~Task> = WorkQueue::new(); + work_queues.push(work_queue); + } // The schedulers. let mut scheds = ~[]; @@ -259,12 +263,15 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { // sent the Shutdown message to terminate the schedulers. let mut handles = ~[]; - do nscheds.times { + for i in range(0u, nscheds) { rtdebug!("inserting a regular scheduler"); // Every scheduler is driven by an I/O event loop. let loop_ = ~UvEventLoop::new(); - let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone()); + let mut sched = ~Scheduler::new(loop_, + work_queues[i].clone(), + work_queues.clone(), + sleepers.clone()); let handle = sched.make_handle(); scheds.push(sched); @@ -280,9 +287,14 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { let friend_handle = friend_sched.make_handle(); scheds.push(friend_sched); + // This scheduler needs a queue that isn't part of the stealee + // set. + let work_queue = WorkQueue::new(); + let main_loop = ~UvEventLoop::new(); let mut main_sched = ~Scheduler::new_special(main_loop, - work_queue.clone(), + work_queue, + work_queues.clone(), sleepers.clone(), false, Some(friend_handle)); @@ -371,7 +383,7 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { let mut main_task = ~Task::new_root_homed(&mut main_sched.stack_pool, None, home, main.take()); main_task.death.on_exit = Some(on_exit.take()); - rtdebug!("boostrapping main_task"); + rtdebug!("bootstrapping main_task"); main_sched.bootstrap(main_task); } diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 990e1a4a3de..ce4e64c47d2 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -13,7 +13,6 @@ use option::{Option, Some, None}; use cast::{transmute, transmute_mut_region, transmute_mut_unsafe}; use clone::Clone; use unstable::raw; - use super::sleeper_list::SleeperList; use super::work_queue::WorkQueue; use super::stack::{StackPool}; @@ -28,6 +27,9 @@ use rt::rtio::RemoteCallback; use rt::metrics::SchedMetrics; use borrow::{to_uint}; use cell::Cell; +use rand::{XorShiftRng, RngUtil}; +use iterator::{range}; +use vec::{OwnedVector}; /// The Scheduler is responsible for coordinating execution of Coroutines /// on a single thread. When the scheduler is running it is owned by @@ -37,9 +39,11 @@ use cell::Cell; /// XXX: This creates too many callbacks to run_sched_once, resulting /// in too much allocation and too many events. pub struct Scheduler { - /// A queue of available work. Under a work-stealing policy there - /// is one per Scheduler. - work_queue: WorkQueue<~Task>, + /// There are N work queues, one per scheduler. + priv work_queue: WorkQueue<~Task>, + /// Work queues for the other schedulers. These are created by + /// cloning the core work queues. + work_queues: ~[WorkQueue<~Task>], /// The queue of incoming messages from other schedulers. /// These are enqueued by SchedHandles after which a remote callback /// is triggered to handle the message. @@ -70,7 +74,10 @@ pub struct Scheduler { run_anything: bool, /// If the scheduler shouldn't run some tasks, a friend to send /// them to. - friend_handle: Option + friend_handle: Option, + /// A fast XorShift rng for scheduler use + rng: XorShiftRng + } pub struct SchedHandle { @@ -97,10 +104,13 @@ impl Scheduler { pub fn new(event_loop: ~EventLoopObject, work_queue: WorkQueue<~Task>, + work_queues: ~[WorkQueue<~Task>], sleeper_list: SleeperList) -> Scheduler { - Scheduler::new_special(event_loop, work_queue, sleeper_list, true, None) + Scheduler::new_special(event_loop, work_queue, + work_queues, + sleeper_list, true, None) } @@ -108,6 +118,7 @@ impl Scheduler { // task field is None. pub fn new_special(event_loop: ~EventLoopObject, work_queue: WorkQueue<~Task>, + work_queues: ~[WorkQueue<~Task>], sleeper_list: SleeperList, run_anything: bool, friend: Option) @@ -120,12 +131,14 @@ impl Scheduler { no_sleep: false, event_loop: event_loop, work_queue: work_queue, + work_queues: work_queues, stack_pool: StackPool::new(), sched_task: None, cleanup_job: None, metrics: SchedMetrics::new(), run_anything: run_anything, - friend_handle: friend + friend_handle: friend, + rng: XorShiftRng::new() } } @@ -248,7 +261,7 @@ impl Scheduler { // Second activity is to try resuming a task from the queue. - let result = sched.resume_task_from_queue(); + let result = sched.do_work(); let mut sched = match result { Some(sched) => { // Failed to dequeue a task, so we return. @@ -415,47 +428,98 @@ impl Scheduler { } } - // Resume a task from the queue - but also take into account that - // it might not belong here. + // Workstealing: In this iteration of the runtime each scheduler + // thread has a distinct work queue. When no work is available + // locally, make a few attempts to steal work from the queues of + // other scheduler threads. If a few steals fail we end up in the + // old "no work" path which is fine. - // If we perform a scheduler action we give away the scheduler ~ - // pointer, if it is still available we return it. - - fn resume_task_from_queue(~self) -> Option<~Scheduler> { - - let mut this = self; - - match this.work_queue.pop() { + // First step in the process is to find a task. This function does + // that by first checking the local queue, and if there is no work + // there, trying to steal from the remote work queues. + fn find_work(&mut self) -> Option<~Task> { + rtdebug!("scheduler looking for work"); + match self.work_queue.pop() { Some(task) => { - let mut task = task; - let home = task.take_unwrap_home(); - match home { - Sched(home_handle) => { - if home_handle.sched_id != this.sched_id() { - task.give_home(Sched(home_handle)); - Scheduler::send_task_home(task); - return Some(this); - } else { - this.event_loop.callback(Scheduler::run_sched_once); - task.give_home(Sched(home_handle)); - this.resume_task_immediately(task); - return None; - } - } - AnySched if this.run_anything => { - this.event_loop.callback(Scheduler::run_sched_once); - task.give_home(AnySched); - this.resume_task_immediately(task); - return None; - } - AnySched => { - task.give_home(AnySched); - this.send_to_friend(task); - return Some(this); - } - } + rtdebug!("found a task locally"); + return Some(task) } None => { + // Our naive stealing, try kinda hard. + rtdebug!("scheduler trying to steal"); + let _len = self.work_queues.len(); + return self.try_steals(2); + } + } + } + + // With no backoff try stealing n times from the queues the + // scheduler knows about. This naive implementation can steal from + // our own queue or from other special schedulers. + fn try_steals(&mut self, n: uint) -> Option<~Task> { + for _ in range(0, n) { + let index = self.rng.gen_uint_range(0, self.work_queues.len()); + let work_queues = &mut self.work_queues; + match work_queues[index].steal() { + Some(task) => { + rtdebug!("found task by stealing"); return Some(task) + } + None => () + } + }; + rtdebug!("giving up on stealing"); + return None; + } + + // Given a task, execute it correctly. + fn process_task(~self, task: ~Task) -> Option<~Scheduler> { + let mut this = self; + let mut task = task; + + rtdebug!("processing a task"); + + let home = task.take_unwrap_home(); + match home { + Sched(home_handle) => { + if home_handle.sched_id != this.sched_id() { + rtdebug!("sending task home"); + task.give_home(Sched(home_handle)); + Scheduler::send_task_home(task); + return Some(this); + } else { + rtdebug!("running task here"); + task.give_home(Sched(home_handle)); + this.resume_task_immediately(task); + return None; + } + } + AnySched if this.run_anything => { + rtdebug!("running anysched task here"); + task.give_home(AnySched); + this.resume_task_immediately(task); + return None; + } + AnySched => { + rtdebug!("sending task to friend"); + task.give_home(AnySched); + this.send_to_friend(task); + return Some(this); + } + } + } + + // Bundle the helpers together. + fn do_work(~self) -> Option<~Scheduler> { + let mut this = self; + + rtdebug!("scheduler calling do work"); + match this.find_work() { + Some(task) => { + rtdebug!("found some work! processing the task"); + return this.process_task(task); + } + None => { + rtdebug!("no work was found, returning the scheduler struct"); return Some(this); } } @@ -711,7 +775,6 @@ impl Scheduler { GiveTask(task, f) => f.to_fn()(self, task) } } - } // The cases for the below function. @@ -745,6 +808,8 @@ impl ClosureConverter for UnsafeTaskReceiver { #[cfg(test)] mod test { + extern mod extra; + use prelude::*; use rt::test::*; use unstable::run_in_bare_thread; @@ -862,12 +927,15 @@ mod test { do run_in_bare_thread { let sleepers = SleeperList::new(); - let work_queue = WorkQueue::new(); + let normal_queue = WorkQueue::new(); + let special_queue = WorkQueue::new(); + let queues = ~[normal_queue.clone(), special_queue.clone()]; // Our normal scheduler let mut normal_sched = ~Scheduler::new( ~UvEventLoop::new(), - work_queue.clone(), + normal_queue, + queues.clone(), sleepers.clone()); let normal_handle = Cell::new(normal_sched.make_handle()); @@ -877,7 +945,8 @@ mod test { // Our special scheduler let mut special_sched = ~Scheduler::new_special( ~UvEventLoop::new(), - work_queue.clone(), + special_queue.clone(), + queues.clone(), sleepers.clone(), false, Some(friend_handle)); diff --git a/src/libstd/rt/select.rs b/src/libstd/rt/select.rs index 006b777b71b..07f8ca77d9d 100644 --- a/src/libstd/rt/select.rs +++ b/src/libstd/rt/select.rs @@ -182,6 +182,7 @@ mod test { fn select_stream() { use util; use comm::GenericChan; + use iter::Times; // Sends 10 buffered packets, and uses select to retrieve them all. // Puts the port in a different spot in the vector each time. @@ -265,6 +266,7 @@ mod test { fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) { use rt::test::spawntask_random; + use iter::Times; do run_in_newsched_task { // A bit of stress, since ordinarily this is just smoke and mirrors. diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index 792ea5eb33f..92366d5187f 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -15,8 +15,8 @@ use cell::Cell; use clone::Clone; use container::Container; use iterator::{Iterator, range}; -use vec::{OwnedVector, MutableVector}; use super::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr}; +use vec::{OwnedVector, MutableVector, ImmutableVector}; use rt::sched::Scheduler; use unstable::run_in_bare_thread; use rt::thread::Thread; @@ -29,8 +29,12 @@ use result::{Result, Ok, Err}; pub fn new_test_uv_sched() -> Scheduler { + let queue = WorkQueue::new(); + let queues = ~[queue.clone()]; + let mut sched = Scheduler::new(~UvEventLoop::new(), - WorkQueue::new(), + queue, + queues, SleeperList::new()); // Don't wait for the Shutdown message @@ -164,15 +168,21 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { }; let sleepers = SleeperList::new(); - let work_queue = WorkQueue::new(); let mut handles = ~[]; let mut scheds = ~[]; + let mut work_queues = ~[]; for _ in range(0u, nthreads) { + let work_queue = WorkQueue::new(); + work_queues.push(work_queue); + } + + for i in range(0u, nthreads) { let loop_ = ~UvEventLoop::new(); let mut sched = ~Scheduler::new(loop_, - work_queue.clone(), + work_queues[i].clone(), + work_queues.clone(), sleepers.clone()); let handle = sched.make_handle(); diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index 2d0a2d98e9f..05a17f8539c 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -98,6 +98,7 @@ use rt::kill::KillHandle; use rt::sched::Scheduler; use rt::uv::uvio::UvEventLoop; use rt::thread::Thread; +use rt::work_queue::WorkQueue; #[cfg(test)] use task::default_task_opts; #[cfg(test)] use comm; @@ -722,10 +723,16 @@ fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) { let sched = Local::unsafe_borrow::(); let sched_handle = (*sched).make_handle(); + // Since this is a 1:1 scheduler we create a queue not in + // the stealee set. The run_anything flag is set false + // which will disable stealing. + let work_queue = WorkQueue::new(); + // Create a new scheduler to hold the new task let new_loop = ~UvEventLoop::new(); let mut new_sched = ~Scheduler::new_special(new_loop, - (*sched).work_queue.clone(), + work_queue, + (*sched).work_queues.clone(), (*sched).sleeper_list.clone(), false, Some(sched_handle)); diff --git a/src/test/bench/rt-messaging-ping-pong.rs b/src/test/bench/rt-messaging-ping-pong.rs new file mode 100644 index 00000000000..3d38d61bc2e --- /dev/null +++ b/src/test/bench/rt-messaging-ping-pong.rs @@ -0,0 +1,82 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +extern mod extra; + +use std::task::spawn; +use std::os; +use std::uint; +use std::rt::test::spawntask_later; +use std::cell::Cell; + +// This is a simple bench that creates M pairs of of tasks. These +// tasks ping-pong back and forth over a pair of streams. This is a +// cannonical message-passing benchmark as it heavily strains message +// passing and almost nothing else. + +fn ping_pong_bench(n: uint, m: uint) { + + // Create pairs of tasks that pingpong back and forth. + fn run_pair(n: uint) { + // Create a stream A->B + let (pa,ca) = stream::<()>(); + // Create a stream B->A + let (pb,cb) = stream::<()>(); + + let pa = Cell::new(pa); + let ca = Cell::new(ca); + let pb = Cell::new(pb); + let cb = Cell::new(cb); + + do spawntask_later() || { + let chan = ca.take(); + let port = pb.take(); + do n.times { + chan.send(()); + port.recv(); + } + } + + do spawntask_later() || { + let chan = cb.take(); + let port = pa.take(); + do n.times { + port.recv(); + chan.send(()); + } + } + } + + do m.times { + run_pair(n) + } + +} + + + +fn main() { + + let args = os::args(); + let n = if args.len() == 3 { + uint::from_str(args[1]).unwrap() + } else { + 10000 + }; + + let m = if args.len() == 3 { + uint::from_str(args[2]).unwrap() + } else { + 4 + }; + + ping_pong_bench(n, m); + +} diff --git a/src/test/bench/rt-parfib.rs b/src/test/bench/rt-parfib.rs new file mode 100644 index 00000000000..6669342f511 --- /dev/null +++ b/src/test/bench/rt-parfib.rs @@ -0,0 +1,49 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +extern mod extra; + +use std::task::spawn; +use std::os; +use std::uint; +use std::rt::test::spawntask_later; +use std::cell::Cell; +use std::comm::*; + +// A simple implementation of parfib. One subtree is found in a new +// task and communicated over a oneshot pipe, the other is found +// locally. There is no sequential-mode threshold. + +fn parfib(n: uint) -> uint { + if(n == 0 || n == 1) { + return 1; + } + + let (port,chan) = oneshot::(); + let chan = Cell::new(chan); + do spawntask_later { + chan.take().send(parfib(n-1)); + }; + let m2 = parfib(n-2); + return (port.recv() + m2); +} + +fn main() { + + let args = os::args(); + let n = if args.len() == 2 { + uint::from_str(args[1]).unwrap() + } else { + 10 + }; + + parfib(n); + +} diff --git a/src/test/bench/rt-spawn-rate.rs b/src/test/bench/rt-spawn-rate.rs new file mode 100644 index 00000000000..ff578ed70b9 --- /dev/null +++ b/src/test/bench/rt-spawn-rate.rs @@ -0,0 +1,33 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +extern mod extra; + +use std::task::spawn; +use std::os; +use std::uint; + +// Very simple spawn rate test. Spawn N tasks that do nothing and +// return. + +fn main() { + + let args = os::args(); + let n = if args.len() == 2 { + uint::from_str(args[1]).unwrap() + } else { + 100000 + }; + + do n.times { + do spawn || {}; + } + +}