Adding a module with parallel vector operations.

This should go in libstd, but currently resolve bugs make this not
work.
This commit is contained in:
Eric Holk 2012-05-16 09:47:00 -07:00
parent d485f23a1a
commit a785f3fc95
3 changed files with 217 additions and 4 deletions

103
src/libstd/par.rs Normal file
View File

@ -0,0 +1,103 @@
import comm::port;
import comm::chan;
import comm::send;
import comm::recv;
import task::spawn;
export future;
export map;
export alli;
iface future<T: send> {
fn get() -> T;
}
type future_<T: send> = {
mut slot : option<T>,
port : port<T>,
};
impl<T: send> of future<T> for future_<T> {
fn get() -> T {
alt(self.slot) {
some(x) { x }
none {
let x = recv(self.port);
self.slot = some(x);
x
}
}
}
}
#[doc="Executes a bit of code asynchronously.
Returns a handle that can be used to retrieve the result at your
leisure."]
fn future<T: send>(thunk : fn~() -> T) -> future<T> {
let p = port();
let c = chan(p);
spawn() {||
send(c, thunk());
}
{mut slot: none::<T>, port : p} as future::<T>
}
#[doc="The maximum number of tasks this module will spawn for a single
operationg."]
const max_tasks : uint = 32u;
#[doc="The minimum number of elements each task will process."]
const min_granularity : uint = 1024u;
#[doc="An internal helper to map a function over a large vector and
return the intermediate results.
This is used to build most of the other parallel vector functions,
like map or alli."]
fn map_slices<A: send, B: send>(xs: [A], f: fn~(uint, [A]) -> B) -> [B] {
let len = xs.len();
if len < min_granularity {
// This is a small vector, fall back on the normal map.
[f(0u, xs)]
}
else {
let num_tasks = uint::min(max_tasks, len / min_granularity);
let items_per_task = len / num_tasks;
let mut futures = [];
let mut base = 0u;
while base < len {
let slice = vec::slice(xs, base,
uint::min(len, base + items_per_task));
futures += [future() {|copy base|
f(base, slice)
}];
base += items_per_task;
}
futures.map() {|ys|
ys.get()
}
}
}
#[doc="A parallel version of map."]
fn map<A: send, B: send>(xs: [A], f: fn~(A) -> B) -> [B] {
vec::concat(map_slices(xs) {|_base, slice|
map(slice, f)
})
}
#[doc="Returns true if the function holds for all elements in the vector."]
fn alli<A: send>(xs: [A], f: fn~(uint, A) -> bool) -> bool {
vec::all(map_slices(xs) {|base, slice|
slice.alli() {|i, x|
f(i + base, x)
}
}) {|x| x }
}

View File

@ -19,6 +19,7 @@ export bitv, deque, fun_treemap, list, map, smallintmap, sort, treemap;
export rope, arena;
export ebml, dbg, getopts, json, rand, sha1, term, time, prettyprint;
export test, tempfile, serialization;
export par;
// General io and system-services modules
@ -58,6 +59,7 @@ mod getopts;
mod json;
mod sha1;
mod md4;
mod par;
mod tempfile;
mod term;
mod time;

View File

@ -10,6 +10,7 @@ import std::map;
import std::map::hashmap;
import std::deque;
import std::deque::t;
//import std::par;
import io::writer_util;
import comm::*;
import int::abs;
@ -221,13 +222,11 @@ fn validate(edges: [(node_id, node_id)],
log(info, "Verifying tree and graph edges...");
let status = tree.alli() {|u, v|
let status = par::alli(tree) {|u, v|
if v == -1 || u as int == root {
true
}
else {
log(info, #fmt("Checking for %? or %?",
(u, v), (v, u)));
edges.contains((u as int, v)) || edges.contains((v, u as int))
}
};
@ -269,9 +268,118 @@ fn main() {
stop - start));
let start = time::precise_time_s();
assert(validate(graph, edges, root, bfs_tree));
assert(validate(edges, root, bfs_tree));
let stop = time::precise_time_s();
io::stdout().write_line(#fmt("Validation completed in %? seconds.",
stop - start));
}
// par stuff /////////////////////////////////////////////////////////
mod par {
import comm::port;
import comm::chan;
import comm::send;
import comm::recv;
import task::spawn;
iface future<T: send> {
fn get() -> T;
}
type future_<T: send> = {
mut slot : option<T>,
port : port<T>,
};
impl<T: send> of future<T> for future_<T> {
fn get() -> T {
get(self)
}
}
fn get<T: send>(f: future_<T>) -> T {
alt(f.slot) {
some(x) { x }
none {
let x = recv(f.port);
f.slot = some(x);
x
}
}
}
#[doc="Executes a bit of code asynchronously.
Returns a handle that can be used to retrieve the result at your
leisure."]
fn future<T: send>(thunk : fn~() -> T) -> future<T> {
let p = port();
let c = chan(p);
spawn() {||
send(c, thunk());
}
{mut slot: none::<T>, port : p} as future::<T>
}
#[doc="The maximum number of tasks this module will spawn for a single
operationg."]
const max_tasks : uint = 32u;
#[doc="The minimum number of elements each task will process."]
const min_granularity : uint = 1024u;
#[doc="An internal helper to map a function over a large vector and
return the intermediate results.
This is used to build most of the other parallel vector functions,
like map or alli."]
fn map_slices<A: send, B: send>(xs: [A], f: fn~(uint, [A]) -> B) -> [B] {
let len = xs.len();
if len < min_granularity {
// This is a small vector, fall back on the normal map.
[f(0u, xs)]
}
else {
let num_tasks = uint::min(max_tasks, len / min_granularity);
let items_per_task = len / num_tasks;
let mut futures = [];
let mut base = 0u;
while base < len {
let slice = vec::slice(xs, base,
uint::min(len, base + items_per_task));
futures += [future() {|copy base|
f(base, slice)
}];
base += items_per_task;
}
futures.map() {|ys|
ys.get()
}
}
}
#[doc="A parallel version of map."]
fn map<A: send, B: send>(xs: [A], f: fn~(A) -> B) -> [B] {
vec::concat(map_slices(xs) {|_base, slice|
map(slice, f)
})
}
#[doc="Returns true if the function holds for all elements in the vector."]
fn alli<A: send>(xs: [A], f: fn~(uint, A) -> bool) -> bool {
vec::all(map_slices(xs) {|base, slice|
slice.alli() {|i, x|
f(i + base, x)
}
}) {|x| x }
}
}