Task-ified the word count program.

This meant most of the generic-ness of it had to go away, since our
type system doesn't quite support it yet. Hopefully someday...

This version has lots of memory management errors. My next commit will
hopefully fix these.
This commit is contained in:
Eric Holk 2011-07-12 11:13:15 -07:00
parent 0cacbe901d
commit c7e967148c
1 changed files with 119 additions and 55 deletions

View File

@ -18,6 +18,35 @@ import std::str;
import std::vec;
import std::map;
fn map(str filename, map_reduce::putter emit) {
auto f = io::file_reader(filename);
while(true) {
alt(read_word(f)) {
case (some(?w)) {
emit(w, "1");
}
case (none) {
break;
}
}
}
}
fn reduce(str word, map_reduce::getter get) {
auto count = 0;
while(true) {
alt(get()) {
case(some(_)) { count += 1 }
case(none) { break }
}
}
auto out = io::stdout();
out.write_line(#fmt("%s: %d", word, count));
}
mod map_reduce {
export putter;
export getter;
@ -33,42 +62,106 @@ mod map_reduce {
type reducer = fn(str, getter);
tag ctrl_proto {
find_reducer(str, chan[chan[reduce_proto]]);
mapper_done;
}
fn map_reduce (vec[str] inputs,
mapper f,
reducer reduce) {
auto intermediates = map::new_str_hash[vec[str]]();
tag reduce_proto {
emit_val(str);
done;
}
fn emit(&map::hashmap[str, vec[str]] im,
fn start_mappers(chan[ctrl_proto] ctrl,
vec[str] inputs) {
for(str i in inputs) {
spawn map_task(ctrl, i);
}
}
fn map_task(chan[ctrl_proto] ctrl,
str input) {
auto intermediates = map::new_str_hash();
fn emit(&map::hashmap[str, chan[reduce_proto]] im,
chan[ctrl_proto] ctrl,
str key, str val) {
auto old = [];
alt(im.remove(key)) {
case (some(?v)) {
old = v;
auto c;
alt(im.find(key)) {
case(some(?_c)) {
c = _c
}
case(none) {
auto p = port[chan[reduce_proto]]();
ctrl <| find_reducer(key, chan(p));
p |> c;
im.insert(key, c);
}
case (none) { }
}
im.insert(key, old + [val]);
c <| emit_val(val);
}
for (str i in inputs) {
f(i, bind emit(intermediates, _, _));
}
map(input, bind emit(intermediates, ctrl, _, _));
ctrl <| mapper_done;
}
fn get(vec[str] vals, &mutable uint i) -> option[str] {
i += 1u;
if(i <= vec::len(vals)) {
some(vals.(i - 1u))
}
else {
none
fn reduce_task(str key, chan[chan[reduce_proto]] out) {
auto p = port();
out <| chan(p);
fn get(port[reduce_proto] p) -> option[str] {
auto m;
p |> m;
alt(m) {
case(emit_val(?v)) { ret some(v); }
case(done) { ret none; }
}
}
for each (@tup(str, vec[str]) kv in intermediates.items()) {
auto i = 0u;
reduce(kv._0, bind get(kv._1, i));
reduce(key, bind get(p));
}
fn map_reduce (vec[str] inputs) {
auto ctrl = port[ctrl_proto]();
// This task becomes the master control task. It spawns others
// to do the rest.
let map::hashmap[str, chan[reduce_proto]] reducers;
reducers = map::new_str_hash();
start_mappers(chan(ctrl), inputs);
auto num_mappers = vec::len(inputs) as int;
while(num_mappers > 0) {
auto m;
ctrl |> m;
alt(m) {
case(mapper_done) { num_mappers -= 1; }
case(find_reducer(?k, ?cc)) {
auto c;
alt(reducers.find(k)) {
case(some(?_c)) { c = _c; }
case(none) {
auto p = port();
spawn reduce_task(k, chan(p));
p |> c;
reducers.insert(k, c);
}
}
cc <| c;
}
}
}
for each(@tup(str, chan[reduce_proto]) kv in reducers.items()) {
kv._1 <| done;
}
}
}
@ -81,36 +174,7 @@ fn main(vec[str] argv) {
fail;
}
fn map(str filename, map_reduce::putter emit) {
auto f = io::file_reader(filename);
while(true) {
alt(read_word(f)) {
case (some(?w)) {
emit(w, "1");
}
case (none) {
break;
}
}
}
}
fn reduce(str word, map_reduce::getter get) {
auto count = 0;
while(true) {
alt(get()) {
case(some(_)) { count += 1 }
case(none) { break }
}
}
auto out = io::stdout();
out.write_line(#fmt("%s: %d", word, count));
}
map_reduce::map_reduce(vec::slice(argv, 1u, vec::len(argv)), map, reduce);
map_reduce::map_reduce(vec::slice(argv, 1u, vec::len(argv)));
}
fn read_word(io::reader r) -> option[str] {