rewrite to use old C++-based mechanism

This commit is contained in:
Niko Matsakis 2012-01-06 22:02:05 -08:00
parent a1ef79c9d2
commit d4410a9f9b
3 changed files with 26 additions and 10 deletions

View File

@ -28,8 +28,6 @@ Example:
*/
import cast = unsafe::reinterpret_cast;
import comm;
import option::{some, none};
import option = option::t;
import ptr;
import c = ctypes;
@ -112,10 +110,23 @@ Returns:
A handle to the new task
*/
fn spawn(-f: sendfn()) -> task unsafe {
fn spawn(-f: sendfn()) -> task {
spawn_inner(f, none)
}
fn spawn_inner(-f: sendfn(),
notify: option<comm::chan<task_notification>>) -> task unsafe {
let closure: *rust_closure = unsafe::reinterpret_cast(ptr::addr_of(f));
#debug("spawn: closure={%x,%x}", (*closure).fnptr, (*closure).envptr);
let id = rustrt::new_task();
// set up notifications if they are enabled.
option::may(notify) {|c|
let task_ptr <- rust_task_ptr(rustrt::get_task_pointer(id));
(**task_ptr).notify_enabled = 1;
(**task_ptr).notify_chan = c;
}
rustrt::start_task(id, closure);
unsafe::leak(f);
ret id;
@ -129,6 +140,11 @@ A task that sends notification upon termination
type joinable_task = (task, comm::port<task_notification>);
fn spawn_joinable(-f: sendfn()) -> joinable_task {
let notify_port = comm::port();
let notify_chan = comm::chan(notify_port);
let task = spawn_inner(f, some(notify_chan));
ret (task, notify_port);
/*
resource notify_rsrc(data: (comm::chan<task_notification>,
task,
@mutable task_result)) {
@ -148,6 +164,7 @@ fn spawn_joinable(-f: sendfn()) -> joinable_task {
};
let task = spawn(g);
ret (task, notify_port);
*/
}
/*

View File

@ -12,7 +12,7 @@ fn f(&&n: uint) {
}
}
fn g(&&_i: ()) { }
fn g() { }
fn main(args: [str]) {
let n =

View File

@ -71,13 +71,12 @@ mod map_reduce {
[joinable_task] {
let tasks = [];
for i: str in inputs {
tasks += [task::spawn_joinable((ctrl, i), map_task)];
tasks += [task::spawn_joinable {|| map_task(ctrl, i)}];
}
ret tasks;
}
fn map_task(args: (chan<ctrl_proto>, str)) {
let (ctrl, input) = args;
fn map_task(ctrl: chan<ctrl_proto>, input: str) {
// log(error, "map_task " + input);
let intermediates = map::new_str_hash();
@ -106,8 +105,7 @@ mod map_reduce {
send(ctrl, mapper_done);
}
fn reduce_task(args: (str, chan<chan<reduce_proto>>)) {
let (key, out) = args;
fn reduce_task(key: str, out: chan<chan<reduce_proto>>) {
let p = port();
send(out, chan(p));
@ -168,8 +166,9 @@ mod map_reduce {
none. {
// log(error, "creating new reducer for " + k);
let p = port();
let ch = chan(p);
tasks +=
[task::spawn_joinable((k, chan(p)), reduce_task)];
[task::spawn_joinable{||reduce_task(k, ch)}];
c = recv(p);
reducers.insert(k, c);
}