std::rt: Reduce MessageQueue contention
It's not a huge win but it does reduce the amount of time spent contesting the message queue when the schedulers are under load
This commit is contained in:
parent
5d04234868
commit
0fff8b6549
@ -16,32 +16,66 @@ use kinds::Send;
|
||||
use vec::OwnedVector;
|
||||
use cell::Cell;
|
||||
use option::*;
|
||||
use unstable::sync::Exclusive;
|
||||
use unstable::sync::{UnsafeAtomicRcBox, LittleLock};
|
||||
use clone::Clone;
|
||||
|
||||
pub struct MessageQueue<T> {
|
||||
priv queue: Exclusive<~[T]>
|
||||
priv state: UnsafeAtomicRcBox<State<T>>
|
||||
}
|
||||
|
||||
struct State<T> {
|
||||
count: uint,
|
||||
queue: ~[T],
|
||||
lock: LittleLock
|
||||
}
|
||||
|
||||
impl<T: Send> MessageQueue<T> {
|
||||
pub fn new() -> MessageQueue<T> {
|
||||
MessageQueue {
|
||||
queue: Exclusive::new(~[])
|
||||
state: UnsafeAtomicRcBox::new(State {
|
||||
count: 0,
|
||||
queue: ~[],
|
||||
lock: LittleLock::new()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub fn push(&mut self, value: T) {
|
||||
unsafe {
|
||||
let value = Cell::new(value);
|
||||
self.queue.with(|q| q.push(value.take()) );
|
||||
let state = self.state.get();
|
||||
do (*state).lock.lock {
|
||||
(*state).count += 1;
|
||||
(*state).queue.push(value.take());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn pop(&mut self) -> Option<T> {
|
||||
unsafe {
|
||||
do self.queue.with |q| {
|
||||
if !q.is_empty() {
|
||||
Some(q.shift())
|
||||
let state = self.state.get();
|
||||
do (*state).lock.lock {
|
||||
if !(*state).queue.is_empty() {
|
||||
(*state).count += 1;
|
||||
Some((*state).queue.shift())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A pop that may sometimes miss enqueued elements, but is much faster
|
||||
/// to give up without doing any synchronization
|
||||
pub fn casual_pop(&mut self) -> Option<T> {
|
||||
unsafe {
|
||||
let state = self.state.get();
|
||||
// NB: Unsynchronized check
|
||||
if (*state).count == 0 { return None; }
|
||||
do (*state).lock.lock {
|
||||
if !(*state).queue.is_empty() {
|
||||
(*state).count += 1;
|
||||
Some((*state).queue.shift())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
@ -53,7 +87,7 @@ impl<T: Send> MessageQueue<T> {
|
||||
impl<T: Send> Clone for MessageQueue<T> {
|
||||
fn clone(&self) -> MessageQueue<T> {
|
||||
MessageQueue {
|
||||
queue: self.queue.clone()
|
||||
state: self.state.clone()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -83,6 +83,14 @@ pub struct Scheduler {
|
||||
idle_callback: Option<~PausibleIdleCallback>
|
||||
}
|
||||
|
||||
/// An indication of how hard to work on a given operation, the difference
|
||||
/// mainly being whether memory is synchronized or not
|
||||
#[deriving(Eq)]
|
||||
enum EffortLevel {
|
||||
DontTryTooHard,
|
||||
GiveItYourBest
|
||||
}
|
||||
|
||||
impl Scheduler {
|
||||
|
||||
// * Initialization Functions
|
||||
@ -237,14 +245,21 @@ impl Scheduler {
|
||||
|
||||
// First we check for scheduler messages, these are higher
|
||||
// priority than regular tasks.
|
||||
let sched = match sched.interpret_message_queue() {
|
||||
let sched = match sched.interpret_message_queue(DontTryTooHard) {
|
||||
Some(sched) => sched,
|
||||
None => return
|
||||
};
|
||||
|
||||
// This helper will use a randomized work-stealing algorithm
|
||||
// to find work.
|
||||
let mut sched = match sched.do_work() {
|
||||
let sched = match sched.do_work() {
|
||||
Some(sched) => sched,
|
||||
None => return
|
||||
};
|
||||
|
||||
// Now, before sleeping we need to find out if there really
|
||||
// were any messages. Give it your best!
|
||||
let mut sched = match sched.interpret_message_queue(GiveItYourBest) {
|
||||
Some(sched) => sched,
|
||||
None => return
|
||||
};
|
||||
@ -277,10 +292,18 @@ impl Scheduler {
|
||||
// returns the still-available scheduler. At this point all
|
||||
// message-handling will count as a turn of work, and as a result
|
||||
// return None.
|
||||
fn interpret_message_queue(~self) -> Option<~Scheduler> {
|
||||
fn interpret_message_queue(~self, effort: EffortLevel) -> Option<~Scheduler> {
|
||||
|
||||
let mut this = self;
|
||||
match this.message_queue.pop() {
|
||||
|
||||
let msg = if effort == DontTryTooHard {
|
||||
// Do a cheap check that may miss messages
|
||||
this.message_queue.casual_pop()
|
||||
} else {
|
||||
this.message_queue.pop()
|
||||
};
|
||||
|
||||
match msg {
|
||||
Some(PinnedTask(task)) => {
|
||||
let mut task = task;
|
||||
task.give_home(Sched(this.make_handle()));
|
||||
|
Loading…
Reference in New Issue
Block a user