From c62d604531456e96de506b835207223136361dc2 Mon Sep 17 00:00:00 2001 From: Jason Toffaletti Date: Thu, 3 Oct 2013 19:23:47 -0700 Subject: [PATCH 01/11] lock-free queue for scheduler message queue --- src/libstd/rt/mod.rs | 3 + src/libstd/rt/mpsc_queue.rs | 203 ++++++++++++++++++++++++++++++++++++ src/libstd/rt/sched.rs | 8 +- 3 files changed, 210 insertions(+), 4 deletions(-) create mode 100644 src/libstd/rt/mpsc_queue.rs diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 5113c28aa08..771b15588d0 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -136,6 +136,9 @@ mod work_queue; /// A parallel queue. mod message_queue; +/// A mostly lock-free multi-producer, single consumer queue. +mod mpsc_queue; + /// A parallel data structure for tracking sleeping schedulers. mod sleeper_list; diff --git a/src/libstd/rt/mpsc_queue.rs b/src/libstd/rt/mpsc_queue.rs new file mode 100644 index 00000000000..57b7d4f469b --- /dev/null +++ b/src/libstd/rt/mpsc_queue.rs @@ -0,0 +1,203 @@ +/* Multi-producer/single-consumer queue + * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT + * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE + * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and documentation are + * those of the authors and should not be interpreted as representing official + * policies, either expressed or implied, of Dmitry Vyukov. + */ + +//! A mostly lock-free multi-producer, single consumer queue. 
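The module added here is Dmitry Vyukov's non-intrusive MPSC node-based queue: a producer swaps itself in as the new head and then links the previous head to itself, while the single consumer walks next pointers from tail, with a permanently allocated stub node standing in for the empty queue. It is only "mostly" lock-free because a producer preempted between its swap and its next-store leaves the chain momentarily unlinked, so the consumer may see an apparently empty queue until that link lands. A minimal sketch of the same push/pop scheme, as an illustration only: it uses today's std::sync::atomic rather than the unstable::atomics API in this patch, Box in place of ~, a single-threaded driver, and no Drop impl, so unconsumed nodes are leaked.

    use std::ptr;
    use std::sync::atomic::{AtomicPtr, Ordering::{AcqRel, Acquire, Release}};

    struct Node<T> {
        next: AtomicPtr<Node<T>>,
        value: Option<T>,
    }

    struct MpscQueue<T> {
        head: AtomicPtr<Node<T>>, // producer side: most recently pushed node
        tail: *mut Node<T>,       // consumer side: last node already drained (starts at the stub)
    }

    impl<T> MpscQueue<T> {
        fn new() -> Self {
            // The stub gives head and tail something to point at while the queue is empty.
            let stub = Box::into_raw(Box::new(Node {
                next: AtomicPtr::new(ptr::null_mut()),
                value: None,
            }));
            MpscQueue { head: AtomicPtr::new(stub), tail: stub }
        }

        // Any number of producers may call this concurrently (given suitable sharing).
        fn push(&self, value: T) {
            let node = Box::into_raw(Box::new(Node {
                next: AtomicPtr::new(ptr::null_mut()),
                value: Some(value),
            }));
            // Swap ourselves in as the newest node, then link the old newest node
            // to us. Being preempted between these two steps is the "mostly"
            // lock-free window: pop() reports empty until the store below runs.
            let prev = self.head.swap(node, AcqRel);
            unsafe { (*prev).next.store(node, Release) };
        }

        // Single consumer only.
        fn pop(&mut self) -> Option<T> {
            unsafe {
                let next = (*self.tail).next.load(Acquire);
                if next.is_null() {
                    return None; // empty, or a producer has not finished linking yet
                }
                // Step over the old tail (the stub or an already drained node),
                // free it, and take the value out of the new tail.
                let old = self.tail;
                self.tail = next;
                drop(Box::from_raw(old));
                (*next).value.take()
            }
        }
    }

    fn main() {
        let mut q = MpscQueue::new();
        assert_eq!(q.pop(), None);
        q.push(1);
        q.push(2);
        assert_eq!(q.pop(), Some(1));
        assert_eq!(q.pop(), Some(2));
        assert_eq!(q.pop(), None);
    }

The code that follows keeps the stub inline in State and re-enqueues it from pop() instead of boxing it, which is why its pop() has the extra tail == stub and tail == head cases.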
+ +use unstable::sync::UnsafeArc; +use unstable::atomics::{AtomicPtr,Relaxed,Release,Acquire}; +use ptr::{mut_null, to_mut_unsafe_ptr}; +use cast; +use option::*; +use clone::Clone; +use default::Default; +use kinds::Send; +use fmt; + +struct Node { + next: AtomicPtr>, + value: Option, +} + +impl Node { + fn new(value: T) -> Node { + Node{next: AtomicPtr::new(mut_null()), value: Some(value)} + } +} + +impl Default for Node { + fn default() -> Node { + Node{next: AtomicPtr::new(mut_null()), value: None} + } +} + +struct State { + stub: Node, + head: AtomicPtr>, + tail: *mut Node, +} + +struct Queue { + priv state: UnsafeArc>, +} + +impl Clone for Queue { + fn clone(&self) -> Queue { + Queue { + state: self.state.clone() + } + } +} + +impl fmt::Default for Queue { + fn fmt(value: &Queue, f: &mut fmt::Formatter) { + write!(f.buf, "Queue({})", value.state.get()); + } +} + +impl Queue { + pub fn new() -> Queue { + let mut q = Queue{state: UnsafeArc::new(State { + stub: Default::default(), + head: AtomicPtr::new(mut_null()), + tail: mut_null(), + })}; + let stub = q.get_stub_unsafe(); + q.get_head().store(stub, Relaxed); + q.set_tail(stub); + q + } + + pub fn push(&mut self, value: T) { + unsafe { + let node = cast::transmute(~Node::new(value)); + self.push_node(node); + } + } + + fn push_node(&mut self, node: *mut Node) { + unsafe { + (*node).next.store(mut_null(), Release); + let prev = (*self.state.get()).head.swap(node, Relaxed); + (*prev).next.store(node, Release); + } + } + + fn get_stub_unsafe(&mut self) -> *mut Node { + unsafe { to_mut_unsafe_ptr(&mut (*self.state.get()).stub) } + } + + fn get_head(&mut self) -> &mut AtomicPtr> { + unsafe { &mut (*self.state.get()).head } + } + + fn get_tail(&mut self) -> *mut Node { + unsafe { (*self.state.get()).tail } + } + + fn set_tail(&mut self, tail: *mut Node) { + unsafe { (*self.state.get()).tail = tail } + } + + pub fn casual_pop(&mut self) -> Option { + self.pop() + } + + pub fn pop(&mut self) -> Option { + unsafe { + let mut tail = self.get_tail(); + let mut next = (*tail).next.load(Acquire); + let stub = self.get_stub_unsafe(); + if tail == stub { + if mut_null() == next { + return None + } + self.set_tail(next); + tail = next; + next = (*next).next.load(Acquire); + } + if next != mut_null() { + let tail: ~Node = cast::transmute(tail); + self.set_tail(next); + return tail.value + } + let head = self.get_head().load(Relaxed); + if tail != head { + return None + } + self.push_node(stub); + next = (*tail).next.load(Acquire); + if next != mut_null() { + let tail: ~Node = cast::transmute(tail); + self.set_tail(next); + return tail.value + } + } + None + } +} + +#[cfg(test)] +mod tests { + use prelude::*; + use option::*; + use task; + use comm; + use fmt; + use super::Queue; + + #[test] + fn test() { + let nthreads = 8u; + let nmsgs = 1000u; + let mut q = Queue::new(); + assert_eq!(None, q.pop()); + + for _ in range(0, nthreads) { + let (port, chan) = comm::stream(); + chan.send(q.clone()); + do task::spawn_sched(task::SingleThreaded) { + let mut q = port.recv(); + for i in range(0, nmsgs) { + q.push(i); + } + } + } + + let mut i = 0u; + loop { + match q.pop() { + None => {}, + Some(_) => { + i += 1; + if i == nthreads*nmsgs { break } + } + } + } + } +} + diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index b008a8a74f2..e739eed32fe 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -19,7 +19,7 @@ use super::stack::{StackPool}; use super::rtio::EventLoop; use super::context::Context; use super::task::{Task, AnySched, 
Sched}; -use super::message_queue::MessageQueue; +use super::mpsc_queue::Queue; use rt::kill::BlockedTask; use rt::local_ptr; use rt::local::Local; @@ -47,7 +47,7 @@ pub struct Scheduler { /// The queue of incoming messages from other schedulers. /// These are enqueued by SchedHandles after which a remote callback /// is triggered to handle the message. - priv message_queue: MessageQueue, + priv message_queue: Queue, /// A shared list of sleeping schedulers. We'll use this to wake /// up schedulers when pushing work onto the work queue. sleeper_list: SleeperList, @@ -137,7 +137,7 @@ impl Scheduler { let mut sched = Scheduler { sleeper_list: sleeper_list, - message_queue: MessageQueue::new(), + message_queue: Queue::new(), sleepy: false, no_sleep: false, event_loop: event_loop, @@ -802,7 +802,7 @@ pub enum SchedMessage { pub struct SchedHandle { priv remote: ~RemoteCallback, - priv queue: MessageQueue, + priv queue: Queue, sched_id: uint } From bf0e6eb346665d779ad012f7def9b4948c5c6b26 Mon Sep 17 00:00:00 2001 From: Jason Toffaletti Date: Fri, 4 Oct 2013 01:25:55 -0700 Subject: [PATCH 02/11] add cache line padding --- src/libstd/rt/mpsc_queue.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/libstd/rt/mpsc_queue.rs b/src/libstd/rt/mpsc_queue.rs index 57b7d4f469b..09b148940f8 100644 --- a/src/libstd/rt/mpsc_queue.rs +++ b/src/libstd/rt/mpsc_queue.rs @@ -56,9 +56,13 @@ impl Default for Node { } struct State { - stub: Node, + pad0: [u8, ..64], head: AtomicPtr>, + pad1: [u8, ..64], + stub: Node, + pad2: [u8, ..64], tail: *mut Node, + pad3: [u8, ..64], } struct Queue { @@ -82,9 +86,13 @@ impl fmt::Default for Queue { impl Queue { pub fn new() -> Queue { let mut q = Queue{state: UnsafeArc::new(State { - stub: Default::default(), + pad0: [0, ..64], head: AtomicPtr::new(mut_null()), + pad1: [0, ..64], + stub: Default::default(), + pad2: [0, ..64], tail: mut_null(), + pad3: [0, ..64], })}; let stub = q.get_stub_unsafe(); q.get_head().store(stub, Relaxed); @@ -102,7 +110,7 @@ impl Queue { fn push_node(&mut self, node: *mut Node) { unsafe { (*node).next.store(mut_null(), Release); - let prev = (*self.state.get()).head.swap(node, Relaxed); + let prev = self.get_head().swap(node, Relaxed); (*prev).next.store(node, Release); } } @@ -167,7 +175,6 @@ mod tests { use option::*; use task; use comm; - use fmt; use super::Queue; #[test] From 5876e21225f0cf34e8caa40b18db56fa716e8c92 Mon Sep 17 00:00:00 2001 From: Jason Toffaletti Date: Mon, 7 Oct 2013 00:07:04 -0700 Subject: [PATCH 03/11] add multi-producer multi-consumer bounded queue to use for sleeper list --- src/libstd/rt/mod.rs | 3 + src/libstd/rt/mpmc_bounded_queue.rs | 199 ++++++++++++++++++++++++++++ src/libstd/rt/sleeper_list.rs | 65 ++------- 3 files changed, 211 insertions(+), 56 deletions(-) create mode 100644 src/libstd/rt/mpmc_bounded_queue.rs diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 771b15588d0..d87580c83bf 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -139,6 +139,9 @@ mod message_queue; /// A mostly lock-free multi-producer, single consumer queue. mod mpsc_queue; +/// A lock-free multi-producer, multi-consumer bounded queue. +mod mpmc_bounded_queue; + /// A parallel data structure for tracking sleeping schedulers. 
mod sleeper_list; diff --git a/src/libstd/rt/mpmc_bounded_queue.rs b/src/libstd/rt/mpmc_bounded_queue.rs new file mode 100644 index 00000000000..8e6ac8f79c7 --- /dev/null +++ b/src/libstd/rt/mpmc_bounded_queue.rs @@ -0,0 +1,199 @@ +/* Multi-producer/multi-consumer bounded queue + * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT + * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE + * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and documentation are + * those of the authors and should not be interpreted as representing official + * policies, either expressed or implied, of Dmitry Vyukov. 
+ */ + +use unstable::sync::UnsafeArc; +use unstable::atomics::{AtomicUint,Relaxed,Release,Acquire}; +use option::*; +use vec; +use clone::Clone; +use kinds::Send; +use num::{Exponential,Algebraic,Round}; + +struct Node { + sequence: AtomicUint, + value: Option, +} + +struct State { + buffer: ~[Node], + mask: uint, + enqueue_pos: AtomicUint, + dequeue_pos: AtomicUint, +} + +struct Queue { + priv state: UnsafeArc>, +} + +impl State { + fn with_capacity(capacity: uint) -> State { + let capacity = if capacity < 2 || (capacity & (capacity - 1)) != 0 { + // use next power of 2 as capacity + 2f64.pow(&((capacity as f64).log2().floor()+1f64)) as uint + } else { + capacity + }; + let buffer = do vec::from_fn(capacity) |i:uint| { + Node{sequence:AtomicUint::new(i),value:None} + }; + State{ + buffer: buffer, + mask: capacity-1, + enqueue_pos: AtomicUint::new(0), + dequeue_pos: AtomicUint::new(0), + } + } + + fn push(&mut self, value: T) -> bool { + let mask = self.mask; + let mut pos = self.enqueue_pos.load(Relaxed); + loop { + let node = &mut self.buffer[pos & mask]; + let seq = node.sequence.load(Acquire); + let diff: int = seq as int - pos as int; + + if diff == 0 { + let enqueue_pos = self.enqueue_pos.compare_and_swap(pos, pos+1, Relaxed); + if enqueue_pos == pos { + node.value = Some(value); + node.sequence.store(pos+1, Release); + break + } else { + pos = enqueue_pos; + } + } else if (diff < 0) { + return false + } else { + pos = self.enqueue_pos.load(Relaxed); + } + } + true + } + + fn pop(&mut self) -> Option { + let mask = self.mask; + let mut pos = self.dequeue_pos.load(Relaxed); + loop { + let node = &mut self.buffer[pos & mask]; + let seq = node.sequence.load(Acquire); + let diff: int = seq as int - (pos + 1) as int; + if diff == 0 { + let dequeue_pos = self.dequeue_pos.compare_and_swap(pos, pos+1, Relaxed); + if dequeue_pos == pos { + let value = node.value.take(); + node.sequence.store(pos + mask + 1, Release); + return value + } else { + pos = dequeue_pos; + } + } else if diff < 0 { + return None + } else { + pos = self.dequeue_pos.load(Relaxed); + } + } + } +} + +impl Queue { + pub fn with_capacity(capacity: uint) -> Queue { + Queue{ + state: UnsafeArc::new(State::with_capacity(capacity)) + } + } + + pub fn push(&mut self, value: T) -> bool { + unsafe { (*self.state.get()).push(value) } + } + + pub fn pop(&mut self) -> Option { + unsafe { (*self.state.get()).pop() } + } +} + +impl Clone for Queue { + fn clone(&self) -> Queue { + Queue { + state: self.state.clone() + } + } +} + +#[cfg(test)] +mod tests { + use prelude::*; + use option::*; + use task; + use comm; + use super::Queue; + + #[test] + fn test() { + let nthreads = 8u; + let nmsgs = 1000u; + let mut q = Queue::with_capacity(nthreads*nmsgs); + assert_eq!(None, q.pop()); + + for _ in range(0, nthreads) { + let (port, chan) = comm::stream(); + chan.send(q.clone()); + do task::spawn_sched(task::SingleThreaded) { + let mut q = port.recv(); + for i in range(0, nmsgs) { + assert!(q.push(i)); + } + } + } + + let mut completion_ports = ~[]; + for _ in range(0, nthreads) { + let (completion_port, completion_chan) = comm::stream(); + completion_ports.push(completion_port); + let (port, chan) = comm::stream(); + chan.send(q.clone()); + do task::spawn_sched(task::SingleThreaded) { + let mut q = port.recv(); + let mut i = 0u; + loop { + match q.pop() { + None => {}, + Some(_) => { + i += 1; + if i == nmsgs { break } + } + } + } + completion_chan.send(i); + } + } + + for completion_port in completion_ports.iter() { + assert_eq!(nmsgs, 
completion_port.recv()); + } + } +} diff --git a/src/libstd/rt/sleeper_list.rs b/src/libstd/rt/sleeper_list.rs index f4fdf15cda6..39c7431837f 100644 --- a/src/libstd/rt/sleeper_list.rs +++ b/src/libstd/rt/sleeper_list.rs @@ -11,84 +11,37 @@ //! Maintains a shared list of sleeping schedulers. Schedulers //! use this to wake each other up. -use container::Container; -use vec::OwnedVector; -use option::{Option, Some, None}; -use cell::Cell; -use unstable::sync::{UnsafeArc, LittleLock}; use rt::sched::SchedHandle; +use rt::mpmc_bounded_queue::Queue; +use option::*; use clone::Clone; pub struct SleeperList { - priv state: UnsafeArc -} - -struct State { - count: uint, - stack: ~[SchedHandle], - lock: LittleLock + priv q: Queue, } impl SleeperList { pub fn new() -> SleeperList { - SleeperList { - state: UnsafeArc::new(State { - count: 0, - stack: ~[], - lock: LittleLock::new() - }) - } + SleeperList{q: Queue::with_capacity(8*1024)} } - pub fn push(&mut self, handle: SchedHandle) { - let handle = Cell::new(handle); - unsafe { - let state = self.state.get(); - do (*state).lock.lock { - (*state).count += 1; - (*state).stack.push(handle.take()); - } - } + pub fn push(&mut self, value: SchedHandle) { + assert!(self.q.push(value)) } pub fn pop(&mut self) -> Option { - unsafe { - let state = self.state.get(); - do (*state).lock.lock { - if !(*state).stack.is_empty() { - (*state).count -= 1; - Some((*state).stack.pop()) - } else { - None - } - } - } + self.q.pop() } - /// A pop that may sometimes miss enqueued elements, but is much faster - /// to give up without doing any synchronization pub fn casual_pop(&mut self) -> Option { - unsafe { - let state = self.state.get(); - // NB: Unsynchronized check - if (*state).count == 0 { return None; } - do (*state).lock.lock { - if !(*state).stack.is_empty() { - // NB: count is also protected by the lock - (*state).count -= 1; - Some((*state).stack.pop()) - } else { - None - } - } - } + self.q.pop() } } impl Clone for SleeperList { fn clone(&self) -> SleeperList { SleeperList { - state: self.state.clone() + q: self.q.clone() } } } From c372fa55560f1cdfdcb566f3027689ba88c46da5 Mon Sep 17 00:00:00 2001 From: Jason Toffaletti Date: Mon, 7 Oct 2013 00:43:34 -0700 Subject: [PATCH 04/11] add padding to prevent false sharing --- src/libstd/rt/mpmc_bounded_queue.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/libstd/rt/mpmc_bounded_queue.rs b/src/libstd/rt/mpmc_bounded_queue.rs index 8e6ac8f79c7..c0c7d281136 100644 --- a/src/libstd/rt/mpmc_bounded_queue.rs +++ b/src/libstd/rt/mpmc_bounded_queue.rs @@ -40,10 +40,14 @@ struct Node { } struct State { + pad0: [u8, ..64], buffer: ~[Node], mask: uint, + pad1: [u8, ..64], enqueue_pos: AtomicUint, + pad2: [u8, ..64], dequeue_pos: AtomicUint, + pad3: [u8, ..64], } struct Queue { @@ -62,10 +66,14 @@ impl State { Node{sequence:AtomicUint::new(i),value:None} }; State{ + pad0: [0, ..64], buffer: buffer, mask: capacity-1, + pad1: [0, ..64], enqueue_pos: AtomicUint::new(0), + pad2: [0, ..64], dequeue_pos: AtomicUint::new(0), + pad3: [0, ..64], } } From 89c91208a7e1e2a5ce77dcb2032601393d861128 Mon Sep 17 00:00:00 2001 From: Jason Toffaletti Date: Mon, 7 Oct 2013 00:43:51 -0700 Subject: [PATCH 05/11] clean up --- src/libstd/rt/mpsc_queue.rs | 74 +++++++++++++++++++------------------ 1 file changed, 38 insertions(+), 36 deletions(-) diff --git a/src/libstd/rt/mpsc_queue.rs b/src/libstd/rt/mpsc_queue.rs index 09b148940f8..7dd050f5a0c 100644 --- a/src/libstd/rt/mpsc_queue.rs +++ b/src/libstd/rt/mpsc_queue.rs @@ -83,9 +83,9 
@@ impl fmt::Default for Queue { } } -impl Queue { - pub fn new() -> Queue { - let mut q = Queue{state: UnsafeArc::new(State { +impl State { + pub fn new() -> State { + let mut state = State { pad0: [0, ..64], head: AtomicPtr::new(mut_null()), pad1: [0, ..64], @@ -93,14 +93,18 @@ impl Queue { pad2: [0, ..64], tail: mut_null(), pad3: [0, ..64], - })}; - let stub = q.get_stub_unsafe(); - q.get_head().store(stub, Relaxed); - q.set_tail(stub); - q + }; + let stub = state.get_stub_unsafe(); + state.head.store(stub, Relaxed); + state.tail = stub; + state } - pub fn push(&mut self, value: T) { + fn get_stub_unsafe(&mut self) -> *mut Node { + unsafe { to_mut_unsafe_ptr(&mut self.stub) } + } + + fn push(&mut self, value: T) { unsafe { let node = cast::transmute(~Node::new(value)); self.push_node(node); @@ -110,50 +114,30 @@ impl Queue { fn push_node(&mut self, node: *mut Node) { unsafe { (*node).next.store(mut_null(), Release); - let prev = self.get_head().swap(node, Relaxed); + let prev = self.head.swap(node, Relaxed); (*prev).next.store(node, Release); } } - fn get_stub_unsafe(&mut self) -> *mut Node { - unsafe { to_mut_unsafe_ptr(&mut (*self.state.get()).stub) } - } - - fn get_head(&mut self) -> &mut AtomicPtr> { - unsafe { &mut (*self.state.get()).head } - } - - fn get_tail(&mut self) -> *mut Node { - unsafe { (*self.state.get()).tail } - } - - fn set_tail(&mut self, tail: *mut Node) { - unsafe { (*self.state.get()).tail = tail } - } - - pub fn casual_pop(&mut self) -> Option { - self.pop() - } - - pub fn pop(&mut self) -> Option { + fn pop(&mut self) -> Option { unsafe { - let mut tail = self.get_tail(); + let mut tail = self.tail; let mut next = (*tail).next.load(Acquire); let stub = self.get_stub_unsafe(); if tail == stub { if mut_null() == next { return None } - self.set_tail(next); + self.tail = next; tail = next; next = (*next).next.load(Acquire); } if next != mut_null() { let tail: ~Node = cast::transmute(tail); - self.set_tail(next); + self.tail = next; return tail.value } - let head = self.get_head().load(Relaxed); + let head = self.head.load(Relaxed); if tail != head { return None } @@ -161,7 +145,7 @@ impl Queue { next = (*tail).next.load(Acquire); if next != mut_null() { let tail: ~Node = cast::transmute(tail); - self.set_tail(next); + self.tail = next; return tail.value } } @@ -169,6 +153,24 @@ impl Queue { } } +impl Queue { + pub fn new() -> Queue { + Queue{state: UnsafeArc::new(State::new())} + } + + pub fn push(&mut self, value: T) { + unsafe { (*self.state.get()).push(value) } + } + + pub fn casual_pop(&mut self) -> Option { + unsafe { (*self.state.get()).pop() } + } + + pub fn pop(&mut self) -> Option { + unsafe{ (*self.state.get()).pop() } + } +} + #[cfg(test)] mod tests { use prelude::*; From 1916732cfd7efa916da3877ca6e823d18221b04b Mon Sep 17 00:00:00 2001 From: Jason Toffaletti Date: Mon, 7 Oct 2013 01:17:09 -0700 Subject: [PATCH 06/11] fix bug introduced by previous clean up. more clean up. 
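The bug: the previous commit's State::new() wired head and tail to to_mut_unsafe_ptr(&mut self.stub) on a local value and then returned that State by value, so after the move (and the second move into UnsafeArc::new) both pointers still referred to the stub's old stack address. Construction is now split in two, with Queue::new() calling init() only once the state sits behind its UnsafeArc and will not move again; roughly:

    unsafe {
        let mut q = Queue{state: UnsafeArc::new(State::new())};
        (*q.state.get()).init(); // head and tail now point at the stub's final address
        q
    }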
--- src/libstd/rt/mpsc_queue.rs | 44 +++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/src/libstd/rt/mpsc_queue.rs b/src/libstd/rt/mpsc_queue.rs index 7dd050f5a0c..e10ec7de017 100644 --- a/src/libstd/rt/mpsc_queue.rs +++ b/src/libstd/rt/mpsc_queue.rs @@ -34,9 +34,7 @@ use ptr::{mut_null, to_mut_unsafe_ptr}; use cast; use option::*; use clone::Clone; -use default::Default; use kinds::Send; -use fmt; struct Node { next: AtomicPtr>, @@ -44,15 +42,13 @@ struct Node { } impl Node { - fn new(value: T) -> Node { - Node{next: AtomicPtr::new(mut_null()), value: Some(value)} - } -} - -impl Default for Node { - fn default() -> Node { + fn empty() -> Node { Node{next: AtomicPtr::new(mut_null()), value: None} } + + fn with_value(value: T) -> Node { + Node{next: AtomicPtr::new(mut_null()), value: Some(value)} + } } struct State { @@ -77,27 +73,23 @@ impl Clone for Queue { } } -impl fmt::Default for Queue { - fn fmt(value: &Queue, f: &mut fmt::Formatter) { - write!(f.buf, "Queue({})", value.state.get()); - } -} - impl State { pub fn new() -> State { - let mut state = State { + State{ pad0: [0, ..64], head: AtomicPtr::new(mut_null()), pad1: [0, ..64], - stub: Default::default(), + stub: Node::::empty(), pad2: [0, ..64], tail: mut_null(), pad3: [0, ..64], - }; - let stub = state.get_stub_unsafe(); - state.head.store(stub, Relaxed); - state.tail = stub; - state + } + } + + fn init(&mut self) { + let stub = self.get_stub_unsafe(); + self.head.store(stub, Relaxed); + self.tail = stub; } fn get_stub_unsafe(&mut self) -> *mut Node { @@ -106,7 +98,7 @@ impl State { fn push(&mut self, value: T) { unsafe { - let node = cast::transmute(~Node::new(value)); + let node = cast::transmute(~Node::with_value(value)); self.push_node(node); } } @@ -155,7 +147,11 @@ impl State { impl Queue { pub fn new() -> Queue { - Queue{state: UnsafeArc::new(State::new())} + unsafe { + let mut q = Queue{state: UnsafeArc::new(State::new())}; + (*q.state.get()).init(); + q + } } pub fn push(&mut self, value: T) { From 8c95f558d0f3acb5ea301acb631d0d616d61c7b6 Mon Sep 17 00:00:00 2001 From: Jason Toffaletti Date: Mon, 7 Oct 2013 14:51:23 -0700 Subject: [PATCH 07/11] minor --- src/libstd/rt/mpsc_queue.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/libstd/rt/mpsc_queue.rs b/src/libstd/rt/mpsc_queue.rs index e10ec7de017..93e66838a42 100644 --- a/src/libstd/rt/mpsc_queue.rs +++ b/src/libstd/rt/mpsc_queue.rs @@ -93,7 +93,7 @@ impl State { } fn get_stub_unsafe(&mut self) -> *mut Node { - unsafe { to_mut_unsafe_ptr(&mut self.stub) } + to_mut_unsafe_ptr(&mut self.stub) } fn push(&mut self, value: T) { @@ -148,7 +148,7 @@ impl State { impl Queue { pub fn new() -> Queue { unsafe { - let mut q = Queue{state: UnsafeArc::new(State::new())}; + let q = Queue{state: UnsafeArc::new(State::new())}; (*q.state.get()).init(); q } From 5e91ac10b65a4c20915868d3af3c06f1d3d3cada Mon Sep 17 00:00:00 2001 From: Jason Toffaletti Date: Tue, 8 Oct 2013 08:09:13 -0700 Subject: [PATCH 08/11] minor --- src/libstd/rt/mpmc_bounded_queue.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/libstd/rt/mpmc_bounded_queue.rs b/src/libstd/rt/mpmc_bounded_queue.rs index c0c7d281136..ea5d1050c18 100644 --- a/src/libstd/rt/mpmc_bounded_queue.rs +++ b/src/libstd/rt/mpmc_bounded_queue.rs @@ -57,8 +57,12 @@ struct Queue { impl State { fn with_capacity(capacity: uint) -> State { let capacity = if capacity < 2 || (capacity & (capacity - 1)) != 0 { - // use next power of 2 as capacity 
- 2f64.pow(&((capacity as f64).log2().floor()+1f64)) as uint + if capacity < 2 { + 2u + } else { + // use next power of 2 as capacity + 2f64.pow(&((capacity as f64).log2().ceil())) as uint + } } else { capacity }; From 49d9135eeaefc5ab267c2ee2bf1c28e245320709 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Fri, 25 Oct 2013 18:33:05 -0700 Subject: [PATCH 09/11] Tidy --- src/etc/licenseck.py | 2 ++ src/libstd/rt/mpmc_bounded_queue.rs | 6 +++--- src/libstd/rt/mpsc_queue.rs | 6 +++--- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/etc/licenseck.py b/src/etc/licenseck.py index 1e0c541cd89..91231430d2a 100644 --- a/src/etc/licenseck.py +++ b/src/etc/licenseck.py @@ -76,6 +76,8 @@ exceptions = [ "rt/isaac/randport.cpp", # public domain "rt/isaac/rand.h", # public domain "rt/isaac/standard.h", # public domain + "libstd/rt/mpsc_queue.rs", # BSD + "libstd/rt/mpmc_bounded_queue.rs", # BSD ] def check_license(name, contents): diff --git a/src/libstd/rt/mpmc_bounded_queue.rs b/src/libstd/rt/mpmc_bounded_queue.rs index ea5d1050c18..a8ef5364276 100644 --- a/src/libstd/rt/mpmc_bounded_queue.rs +++ b/src/libstd/rt/mpmc_bounded_queue.rs @@ -2,14 +2,14 @@ * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: - * + * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. - * + * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. - * + * * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT diff --git a/src/libstd/rt/mpsc_queue.rs b/src/libstd/rt/mpsc_queue.rs index 93e66838a42..38186aa7e8c 100644 --- a/src/libstd/rt/mpsc_queue.rs +++ b/src/libstd/rt/mpsc_queue.rs @@ -2,14 +2,14 @@ * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: - * + * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. - * + * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. - * + * * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT From 1ce5081f4d7a8d636f67204e0e62fe0e9164b560 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Fri, 25 Oct 2013 19:46:35 -0700 Subject: [PATCH 10/11] Add links to original mpmc and mpsc implementations --- src/libstd/rt/mpmc_bounded_queue.rs | 2 ++ src/libstd/rt/mpsc_queue.rs | 1 + 2 files changed, 3 insertions(+) diff --git a/src/libstd/rt/mpmc_bounded_queue.rs b/src/libstd/rt/mpmc_bounded_queue.rs index a8ef5364276..2f61a433983 100644 --- a/src/libstd/rt/mpmc_bounded_queue.rs +++ b/src/libstd/rt/mpmc_bounded_queue.rs @@ -26,6 +26,8 @@ * policies, either expressed or implied, of Dmitry Vyukov. */ +// http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue + use unstable::sync::UnsafeArc; use unstable::atomics::{AtomicUint,Relaxed,Release,Acquire}; use option::*; diff --git a/src/libstd/rt/mpsc_queue.rs b/src/libstd/rt/mpsc_queue.rs index 38186aa7e8c..4ddd5e066cd 100644 --- a/src/libstd/rt/mpsc_queue.rs +++ b/src/libstd/rt/mpsc_queue.rs @@ -27,6 +27,7 @@ */ //! A mostly lock-free multi-producer, single consumer queue. +// http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue use unstable::sync::UnsafeArc; use unstable::atomics::{AtomicPtr,Relaxed,Release,Acquire}; From a849c476f5a62bdf5af546b603a5d7038fcb5e52 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Fri, 25 Oct 2013 19:52:02 -0700 Subject: [PATCH 11/11] Encapsulate the lock-free mpsc queue in the MessageQueue type --- src/libstd/rt/message_queue.rs | 60 +++++++--------------------------- src/libstd/rt/mpsc_queue.rs | 4 --- src/libstd/rt/sched.rs | 8 ++--- 3 files changed, 15 insertions(+), 57 deletions(-) diff --git a/src/libstd/rt/message_queue.rs b/src/libstd/rt/message_queue.rs index 99b5156b319..10e457368f0 100644 --- a/src/libstd/rt/message_queue.rs +++ b/src/libstd/rt/message_queue.rs @@ -11,83 +11,45 @@ //! A concurrent queue that supports multiple producers and a //! single consumer. 
-use container::Container; use kinds::Send; use vec::OwnedVector; -use cell::Cell; -use option::*; -use unstable::sync::{UnsafeArc, LittleLock}; +use option::Option; use clone::Clone; +use rt::mpsc_queue::Queue; pub struct MessageQueue { - priv state: UnsafeArc> -} - -struct State { - count: uint, - queue: ~[T], - lock: LittleLock + priv queue: Queue } impl MessageQueue { pub fn new() -> MessageQueue { MessageQueue { - state: UnsafeArc::new(State { - count: 0, - queue: ~[], - lock: LittleLock::new() - }) + queue: Queue::new() } } + #[inline] pub fn push(&mut self, value: T) { - unsafe { - let value = Cell::new(value); - let state = self.state.get(); - do (*state).lock.lock { - (*state).count += 1; - (*state).queue.push(value.take()); - } - } + self.queue.push(value) } + #[inline] pub fn pop(&mut self) -> Option { - unsafe { - let state = self.state.get(); - do (*state).lock.lock { - if !(*state).queue.is_empty() { - (*state).count += 1; - Some((*state).queue.shift()) - } else { - None - } - } - } + self.queue.pop() } /// A pop that may sometimes miss enqueued elements, but is much faster /// to give up without doing any synchronization + #[inline] pub fn casual_pop(&mut self) -> Option { - unsafe { - let state = self.state.get(); - // NB: Unsynchronized check - if (*state).count == 0 { return None; } - do (*state).lock.lock { - if !(*state).queue.is_empty() { - (*state).count += 1; - Some((*state).queue.shift()) - } else { - None - } - } - } + self.queue.pop() } } impl Clone for MessageQueue { fn clone(&self) -> MessageQueue { MessageQueue { - state: self.state.clone() + queue: self.queue.clone() } } } diff --git a/src/libstd/rt/mpsc_queue.rs b/src/libstd/rt/mpsc_queue.rs index 4ddd5e066cd..4f39a1df4fa 100644 --- a/src/libstd/rt/mpsc_queue.rs +++ b/src/libstd/rt/mpsc_queue.rs @@ -159,10 +159,6 @@ impl Queue { unsafe { (*self.state.get()).push(value) } } - pub fn casual_pop(&mut self) -> Option { - unsafe { (*self.state.get()).pop() } - } - pub fn pop(&mut self) -> Option { unsafe{ (*self.state.get()).pop() } } diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index e739eed32fe..b008a8a74f2 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -19,7 +19,7 @@ use super::stack::{StackPool}; use super::rtio::EventLoop; use super::context::Context; use super::task::{Task, AnySched, Sched}; -use super::mpsc_queue::Queue; +use super::message_queue::MessageQueue; use rt::kill::BlockedTask; use rt::local_ptr; use rt::local::Local; @@ -47,7 +47,7 @@ pub struct Scheduler { /// The queue of incoming messages from other schedulers. /// These are enqueued by SchedHandles after which a remote callback /// is triggered to handle the message. - priv message_queue: Queue, + priv message_queue: MessageQueue, /// A shared list of sleeping schedulers. We'll use this to wake /// up schedulers when pushing work onto the work queue. sleeper_list: SleeperList, @@ -137,7 +137,7 @@ impl Scheduler { let mut sched = Scheduler { sleeper_list: sleeper_list, - message_queue: Queue::new(), + message_queue: MessageQueue::new(), sleepy: false, no_sleep: false, event_loop: event_loop, @@ -802,7 +802,7 @@ pub enum SchedMessage { pub struct SchedHandle { priv remote: ~RemoteCallback, - priv queue: Queue, + priv queue: MessageQueue, sched_id: uint }
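At the API surface the series ends where it began: sched.rs talks to MessageQueue again, which now just wraps the lock-free mpsc queue, while SleeperList wraps the bounded MPMC queue. For reference, the slot protocol that the bounded queue from patches 03, 04 and 08 relies on: every slot carries a sequence number starting at its own index; a producer may claim slot pos when sequence == pos and publishes with sequence = pos + 1 after writing; a consumer may claim the slot when sequence == pos + 1 and releases it to the next lap with sequence = pos + mask + 1 after taking the value. A compact sketch of that protocol, as an illustration under stated assumptions rather than the patch's code: current Rust std atomics, Vec instead of ~[], UnsafeCell around the payload, capacity required up front to be a power of two, a single-threaded driver, and none of the padding or Send/Sync plumbing.

    use std::cell::UnsafeCell;
    use std::sync::atomic::{AtomicUsize, Ordering::{Acquire, Relaxed, Release}};

    struct Slot<T> {
        sequence: AtomicUsize,
        value: UnsafeCell<Option<T>>,
    }

    struct BoundedQueue<T> {
        buffer: Vec<Slot<T>>,
        mask: usize,
        enqueue_pos: AtomicUsize,
        dequeue_pos: AtomicUsize,
    }

    impl<T> BoundedQueue<T> {
        fn with_capacity(capacity: usize) -> Self {
            assert!(capacity >= 2 && capacity.is_power_of_two());
            let buffer: Vec<Slot<T>> = (0..capacity)
                .map(|i| Slot { sequence: AtomicUsize::new(i), value: UnsafeCell::new(None) })
                .collect();
            BoundedQueue {
                buffer,
                mask: capacity - 1,
                enqueue_pos: AtomicUsize::new(0),
                dequeue_pos: AtomicUsize::new(0),
            }
        }

        fn push(&self, value: T) -> bool {
            let mut pos = self.enqueue_pos.load(Relaxed);
            loop {
                let slot = &self.buffer[pos & self.mask];
                let seq = slot.sequence.load(Acquire);
                let diff = seq as isize - pos as isize;
                if diff == 0 {
                    // Slot is free for this lap; race other producers for it.
                    match self.enqueue_pos.compare_exchange_weak(pos, pos + 1, Relaxed, Relaxed) {
                        Ok(_) => {
                            unsafe { *slot.value.get() = Some(value) };
                            slot.sequence.store(pos + 1, Release); // publish to consumers
                            return true;
                        }
                        Err(current) => pos = current,
                    }
                } else if diff < 0 {
                    return false; // slot still holds last lap's value: queue is full
                } else {
                    pos = self.enqueue_pos.load(Relaxed); // another producer won this slot
                }
            }
        }

        fn pop(&self) -> Option<T> {
            let mut pos = self.dequeue_pos.load(Relaxed);
            loop {
                let slot = &self.buffer[pos & self.mask];
                let seq = slot.sequence.load(Acquire);
                let diff = seq as isize - (pos + 1) as isize;
                if diff == 0 {
                    match self.dequeue_pos.compare_exchange_weak(pos, pos + 1, Relaxed, Relaxed) {
                        Ok(_) => {
                            let value = unsafe { (*slot.value.get()).take() };
                            // Hand the slot to the producer of the next lap.
                            slot.sequence.store(pos + self.mask + 1, Release);
                            return value;
                        }
                        Err(current) => pos = current,
                    }
                } else if diff < 0 {
                    return None; // nothing published in this slot yet: queue is empty
                } else {
                    pos = self.dequeue_pos.load(Relaxed);
                }
            }
        }
    }

    fn main() {
        let q = BoundedQueue::with_capacity(4);
        assert!(q.push(1) && q.push(2) && q.push(3) && q.push(4));
        assert!(!q.push(5)); // full
        assert_eq!(q.pop(), Some(1));
        assert!(q.push(5)); // a slot opened up
        assert_eq!(q.pop(), Some(2));
    }

Capacity rounding is left to the caller here; the patch instead rounds any request up to the next power of two (with patch 08 special-casing requests below 2), which is what lets mask = capacity - 1 stand in for a modulo.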