core: New task API

This commit is contained in:
Brian Anderson 2012-02-18 16:34:42 -08:00
parent fbc95ba018
commit 4220dcf1e9
43 changed files with 1037 additions and 843 deletions

View File

@ -2375,10 +2375,10 @@ module `task`. Let's begin with the simplest one, `task::spawn()`:
~~~~ ~~~~
let some_value = 22; let some_value = 22;
let child_task = task::spawn {|| task::spawn {||
std::io::println("This executes in the child task."); std::io::println("This executes in the child task.");
std::io::println(#fmt("%d", some_value)); std::io::println(#fmt("%d", some_value));
}; }
~~~~ ~~~~
The argument to `task::spawn()` is a [unique The argument to `task::spawn()` is a [unique
@ -2456,26 +2456,27 @@ let result = comm::recv(port);
## Creating a task with a bi-directional communication path ## Creating a task with a bi-directional communication path
A very common thing to do is to spawn a child task where the parent A very common thing to do is to spawn a child task where the parent
and child both need to exchange messages with each other. The function and child both need to exchange messages with each
`task::spawn_connected()` supports this pattern. We'll look briefly at other. The function `task::spawn_listener()` supports this pattern. We'll look
how it is used. briefly at how it is used.
To see how `spawn_connected()` works, we will create a child task To see how `spawn_listener()` works, we will create a child task
which receives `uint` messages, converts them to a string, and sends which receives `uint` messages, converts them to a string, and sends
the string in response. The child terminates when `0` is received. the string in response. The child terminates when `0` is received.
Here is the function which implements the child task: Here is the function which implements the child task:
~~~~ ~~~~
fn stringifier(from_par: comm::port<uint>, fn stringifier(from_parent: comm::port<uint>,
to_par: comm::chan<str>) { to_parent: comm::chan<str>) {
let value: uint; let value: uint;
do { do {
value = comm::recv(from_par); value = comm::recv(from_parent);
comm::send(to_par, uint::to_str(value, 10u)); comm::send(to_parent, uint::to_str(value, 10u));
} while value != 0u; } while value != 0u;
} }
~~~~ ~~~~
You can see that the function takes two parameters. The first is a You can see that the function takes two parameters. The first is a
port used to receive messages from the parent, and the second is a port used to receive messages from the parent, and the second is a
channel used to send messages to the parent. The body itself simply channel used to send messages to the parent. The body itself simply
@ -2484,42 +2485,37 @@ to the `to_par` channel. The actual response itself is simply the
strified version of the received value, `uint::to_str(value)`. strified version of the received value, `uint::to_str(value)`.
Here is the code for the parent task: Here is the code for the parent task:
~~~~ ~~~~
# fn stringifier(from_par: comm::port<uint>, # fn stringifier(from_parent: comm::port<uint>,
# to_par: comm::chan<str>) { # to_parent: comm::chan<str>) {
# comm::send(to_par, "22"); # comm::send(to_parent, "22");
# comm::send(to_par, "23"); # comm::send(to_parent, "23");
# comm::send(to_par, "0"); # comm::send(to_parent, "0");
# } # }
fn main() { fn main() {
let t = task::spawn_connected(stringifier); let from_child = comm::port();
comm::send(t.to_child, 22u); let to_parent = comm::chan(from_child);
assert comm::recv(t.from_child) == "22"; let to_child = task::spawn_listener {|from_parent|
comm::send(t.to_child, 23u); stringifier(from_parent, to_parent);
assert comm::recv(t.from_child) == "23"; };
comm::send(t.to_child, 0u); comm::send(to_child, 22u);
assert comm::recv(t.from_child) == "0"; assert comm::recv(from_child) == "22";
comm::send(to_child, 23u);
assert comm::recv(from_child) == "23";
comm::send(to_child, 0u);
assert comm::recv(from_child) == "0";
} }
~~~~ ~~~~
The call to `spawn_connected()` on the first line will instantiate the The parent first sets up a port to receive data from and a channel
various ports and channels and startup the child task. The returned that the child can use to send data to that port. The call to
value, `t`, is a record of type `task::connected_task<uint,str>`. In `spawn_listener()` will spawn the child task, providing it with a port
addition to the task id of the child, this record defines two fields, on which to receive data from its parent, and returning to the parent
`from_child` and `to_child`, which contain the port and channel the associated channel. Finally, the closure passed to
respectively for communicating with the child. Those fields are used `spawn_listener()` that forms the body of the child task captures the
here to send and receive three messages from the child task. `to_parent` channel in its environment, so both parent and child
can send and receive data to and from the other.
## Joining a task
The function `spawn_joinable()` is used to spawn a task that can later
be joined. This is implemented by having the child task send a message
when it has completed (either successfully or by failing). Therefore,
`spawn_joinable()` returns a structure containing both the task ID and
the port where this message will be sent---this structure type is
called `task::joinable_task`. The structure can be passed to
`task::join()`, which simply blocks on the port, waiting to receive
the message from the child task.
## The supervisor relationship ## The supervisor relationship

View File

@ -143,8 +143,6 @@ fn monitor(f: fn~(diagnostic::emitter)) {
alt task::try {|| alt task::try {||
task::unsupervise();
// The 'diagnostics emitter'. Every error, warning, etc. should // The 'diagnostics emitter'. Every error, warning, etc. should
// go through this function. // go through this function.
let demitter = fn@(cmsp: option<(codemap::codemap, codemap::span)>, let demitter = fn@(cmsp: option<(codemap::codemap, codemap::span)>,

View File

@ -54,11 +54,11 @@ fn run(lib_path: str, prog: str, args: [str],
writeclose(pipe_in.out, input); writeclose(pipe_in.out, input);
let p = comm::port(); let p = comm::port();
let ch = comm::chan(p); let ch = comm::chan(p);
task::spawn_sched(1u) {|| task::spawn_sched(task::single_threaded) {||
let errput = readclose(pipe_err.in); let errput = readclose(pipe_err.in);
comm::send(ch, (2, errput)); comm::send(ch, (2, errput));
}; };
task::spawn_sched(1u) {|| task::spawn_sched(task::single_threaded) {||
let output = readclose(pipe_out.in); let output = readclose(pipe_out.in);
comm::send(ch, (1, output)); comm::send(ch, (1, output));
}; };

View File

@ -35,8 +35,9 @@ enum rust_port {}
#[abi = "cdecl"] #[abi = "cdecl"]
native mod rustrt { native mod rustrt {
fn get_task_id() -> task_id;
fn chan_id_send<T: send>(t: *sys::type_desc, fn chan_id_send<T: send>(t: *sys::type_desc,
target_task: task::task, target_port: port_id, target_task: task_id, target_port: port_id,
data: T) -> ctypes::uintptr_t; data: T) -> ctypes::uintptr_t;
fn new_port(unit_sz: ctypes::size_t) -> *rust_port; fn new_port(unit_sz: ctypes::size_t) -> *rust_port;
@ -58,6 +59,7 @@ native mod rusti {
fn call_with_retptr<T: send>(&&f: fn@(*uint)) -> T; fn call_with_retptr<T: send>(&&f: fn@(*uint)) -> T;
} }
type task_id = int;
type port_id = int; type port_id = int;
// It's critical that this only have one variant, so it has a record // It's critical that this only have one variant, so it has a record
@ -75,7 +77,7 @@ type port_id = int;
over other channels." over other channels."
)] )]
enum chan<T: send> { enum chan<T: send> {
chan_t(task::task, port_id) chan_t(task_id, port_id)
} }
resource port_ptr<T: send>(po: *rust_port) { resource port_ptr<T: send>(po: *rust_port) {
@ -208,7 +210,7 @@ fn peek<T: send>(p: port<T>) -> bool {
port used to construct it." port used to construct it."
)] )]
fn chan<T: send>(p: port<T>) -> chan<T> { fn chan<T: send>(p: port<T>) -> chan<T> {
chan_t(task::get_task(), rustrt::get_port_id(***p)) chan_t(rustrt::get_task_id(), rustrt::get_port_id(***p))
} }
#[test] #[test]

File diff suppressed because it is too large Load Diff

View File

@ -1929,16 +1929,10 @@ mod tests {
} }
#[test] #[test]
// FIXME: Windows can't undwind #[should_fail]
#[ignore(cfg(target_os = "win32"))] #[ignore(cfg(target_os = "win32"))]
fn test_init_empty() { fn test_init_empty() {
let r = task::join(
task::spawn_joinable {||
task::unsupervise();
init::<int>([]); init::<int>([]);
});
assert r == task::tr_failure
} }
#[test] #[test]

View File

@ -316,13 +316,12 @@ fn run_test(+test: test_desc, monitor_ch: comm::chan<monitor_msg>) {
task::spawn {|| task::spawn {||
let testfn = test.fn; let testfn = test.fn;
let test_task = task::spawn_joinable {|| let builder = task::mk_task_builder();
configure_test_task(); let result_future = task::future_result(builder);
testfn(); task::unsupervise(builder);
}; task::run(builder, testfn);
let task_result = future::get(result_future);
let task_result = task::join(test_task); let test_result = calc_result(test, task_result == task::success);
let test_result = calc_result(test, task_result == task::tr_success);
comm::send(monitor_ch, (test, test_result)); comm::send(monitor_ch, (test, test_result));
}; };
} }
@ -337,13 +336,6 @@ fn calc_result(test: test_desc, task_succeeded: bool) -> test_result {
} }
} }
// Call from within a test task to make sure it's set up correctly
fn configure_test_task() {
// If this task fails we don't want that failure to propagate to the
// test runner or else we couldn't keep running tests
task::unsupervise();
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {

View File

@ -539,6 +539,11 @@ chan_id_send(type_desc *t, rust_task_id target_task_id,
// FIXME: make sure this is thread-safe // FIXME: make sure this is thread-safe
bool sent = false; bool sent = false;
rust_task *task = rust_task_thread::get_task(); rust_task *task = rust_task_thread::get_task();
LOG(task, comm, "chan_id_send task: 0x%" PRIxPTR
" port: 0x%" PRIxPTR, (uintptr_t) target_task_id,
(uintptr_t) target_port_id);
rust_task *target_task = task->kernel->get_task_by_id(target_task_id); rust_task *target_task = task->kernel->get_task_by_id(target_task_id);
if(target_task) { if(target_task) {
rust_port *port = target_task->get_port_by_id(target_port_id); rust_port *port = target_task->get_port_by_id(target_port_id);
@ -547,8 +552,12 @@ chan_id_send(type_desc *t, rust_task_id target_task_id,
scoped_lock with(target_task->lock); scoped_lock with(target_task->lock);
port->deref(); port->deref();
sent = true; sent = true;
} else {
LOG(task, comm, "didn't get the port");
} }
target_task->deref(); target_task->deref();
} else {
LOG(task, comm, "didn't get the task");
} }
return (uintptr_t)sent; return (uintptr_t)sent;
} }

View File

@ -28,22 +28,28 @@ fn server(requests: comm::port<request>, responses: comm::chan<uint>) {
} }
fn run(args: [str]) { fn run(args: [str]) {
let server = task::spawn_connected(server); let from_child = comm::port();
let to_parent = comm::chan(from_child);
let to_child = task::spawn_listener {|po|
server(po, to_parent);
};
let size = uint::from_str(args[1]); let size = uint::from_str(args[1]);
let workers = uint::from_str(args[2]); let workers = uint::from_str(args[2]);
let start = std::time::precise_time_s(); let start = std::time::precise_time_s();
let to_child = server.to_child; let to_child = to_child;
let worker_tasks = []; let worker_results = [];
uint::range(0u, workers) {|_i| uint::range(0u, workers) {|_i|
worker_tasks += [task::spawn_joinable {|| let builder = task::mk_task_builder();
worker_results += [task::future_result(builder)];
task::run(builder) {||
uint::range(0u, size / workers) {|_i| uint::range(0u, size / workers) {|_i|
comm::send(to_child, bytes(100u)); comm::send(to_child, bytes(100u));
} }
}]; };
} }
vec::iter(worker_tasks) {|t| task::join(t); } vec::iter(worker_results) {|r| future::get(r); }
comm::send(server.to_child, stop); comm::send(to_child, stop);
let result = comm::recv(server.from_child); let result = comm::recv(from_child);
let end = std::time::precise_time_s(); let end = std::time::precise_time_s();
let elapsed = end - start; let elapsed = end - start;
std::io::stdout().write_str(#fmt("Count is %?\n", result)); std::io::stdout().write_str(#fmt("Count is %?\n", result));

View File

@ -69,11 +69,13 @@ fn stress_task(&&id: int) {
} }
fn stress(num_tasks: int) { fn stress(num_tasks: int) {
let tasks = []; let results = [];
range(0, num_tasks) {|i| range(0, num_tasks) {|i|
tasks += [task::spawn_joinable {|| stress_task(i); }]; let builder = task::mk_task_builder();
results += [task::future_result(builder)];
task::run(builder) {|| stress_task(i); }
} }
for t in tasks { task::join(t); } for r in results { future::get(r); }
} }
fn main(argv: [str]) { fn main(argv: [str]) {

View File

@ -10,7 +10,7 @@ fn start(+token: int) {
let ch = iter::foldl(bind int::range(2, n_threads + 1, _), let ch = iter::foldl(bind int::range(2, n_threads + 1, _),
comm::chan(p)) { |ch, i| comm::chan(p)) { |ch, i|
let id = n_threads + 2 - i; let id = n_threads + 2 - i;
let {to_child, _} = task::spawn_connected::<int, int> {|p, _ch| let to_child = task::spawn_listener::<int> {|p|
roundtrip(id, p, ch) roundtrip(id, p, ch)
}; };
to_child to_child

View File

@ -7,7 +7,7 @@ import str;
fn f(&&n: uint) { fn f(&&n: uint) {
let i = 0u; let i = 0u;
while i < n { while i < n {
task::join(task::spawn_joinable {|| g(); }); task::try {|| g() };
i += 1u; i += 1u;
} }
} }

View File

@ -15,7 +15,6 @@ import option::{some, none};
import std::{map, io, time}; import std::{map, io, time};
import io::reader_util; import io::reader_util;
import task::joinable_task;
import comm::chan; import comm::chan;
import comm::port; import comm::port;
import comm::recv; import comm::recv;
@ -59,12 +58,14 @@ mod map_reduce {
enum reduce_proto { emit_val(int), done, ref, release, } enum reduce_proto { emit_val(int), done, ref, release, }
fn start_mappers(ctrl: chan<ctrl_proto>, -inputs: [str]) -> fn start_mappers(ctrl: chan<ctrl_proto>, -inputs: [str]) ->
[joinable_task] { [future::future<task::task_result>] {
let tasks = []; let results = [];
for i: str in inputs { for i: str in inputs {
tasks += [task::spawn_joinable {|| map_task(ctrl, i)}]; let builder = task::mk_task_builder();
results += [task::future_result(builder)];
task::run(builder) {|| map_task(ctrl, i)}
} }
ret tasks; ret results;
} }
fn map_task(ctrl: chan<ctrl_proto>, input: str) { fn map_task(ctrl: chan<ctrl_proto>, input: str) {
@ -137,7 +138,7 @@ mod map_reduce {
reducers = map::new_str_hash(); reducers = map::new_str_hash();
let num_mappers = vec::len(inputs) as int; let num_mappers = vec::len(inputs) as int;
let tasks = start_mappers(chan(ctrl), inputs); let results = start_mappers(chan(ctrl), inputs);
while num_mappers > 0 { while num_mappers > 0 {
alt recv(ctrl) { alt recv(ctrl) {
@ -158,8 +159,9 @@ mod map_reduce {
// log(error, "creating new reducer for " + k); // log(error, "creating new reducer for " + k);
let p = port(); let p = port();
let ch = chan(p); let ch = chan(p);
tasks += let builder = task::mk_task_builder();
[task::spawn_joinable{||reduce_task(k, ch)}]; results += [task::future_result(builder)];
task::run(builder) {||reduce_task(k, ch)}
c = recv(p); c = recv(p);
reducers.insert(k, c); reducers.insert(k, c);
} }
@ -171,7 +173,7 @@ mod map_reduce {
reducers.values {|v| send(v, done); } reducers.values {|v| send(v, done); }
for t in tasks { task::join(t); } for r in results { future::get(r); }
} }
} }

View File

@ -96,16 +96,6 @@ fn test_ptr() unsafe {
assert p1 >= p2; assert p1 >= p2;
} }
fn test_task() {
fn f() { }
let f1 = f, f2 = f;
let t1 = task::spawn {|| f1(); };
let t2 = task::spawn {|| f2(); };
assert (t1 == t1);
assert (t1 != t2);
}
fn test_fn() { fn test_fn() {
fn f() { } fn f() { }
fn g() { } fn g() { }
@ -147,7 +137,6 @@ fn main() {
test_port(); test_port();
test_chan(); test_chan();
test_ptr(); test_ptr();
test_task();
test_fn(); test_fn();
test_native_fn(); test_native_fn();
} }

View File

@ -8,7 +8,6 @@
use std; use std;
import task; import task;
import task::join;
import comm; import comm;
import comm::chan; import comm::chan;
import comm::send; import comm::send;
@ -18,21 +17,18 @@ import comm::recv;
fn grandchild(c: chan<int>) { send(c, 42); } fn grandchild(c: chan<int>) { send(c, 42); }
fn child(c: chan<int>) { fn child(c: chan<int>) {
let _grandchild = task::spawn_joinable {|| grandchild(c); }; task::spawn {|| grandchild(c); }
join(_grandchild);
} }
fn main() { fn main() {
let p = comm::port(); let p = comm::port();
let ch = chan(p); let ch = chan(p);
let _child = task::spawn_joinable {|| child(ch); }; task::spawn {|| child(ch); }
let x: int = recv(p); let x: int = recv(p);
log(debug, x); log(debug, x);
assert (x == 42); assert (x == 42);
join(_child);
} }

View File

@ -21,6 +21,7 @@ fn a() {
} }
fn main() { fn main() {
let t = spawn_joinable {|| a(); }; iter::repeat(100u) {||
join(t); spawn {|| a(); }
}
} }

View File

@ -1,15 +0,0 @@
// -*- rust -*-
use std;
import task::*;
fn main() {
let other = spawn_joinable {|| child(); };
#error("1");
yield();
join(other);
#error("3");
}
fn child() { #error("2"); }

View File

@ -1,21 +0,0 @@
// -*- rust -*-
// xfail-win32
use std;
import task;
import comm::port;
import comm::recv;
fn child() { assert (1 == 2); }
fn parent() {
// Since this task isn't supervised it won't bring down the whole
// process
task::unsupervise();
let p = port::<int>();
task::spawn {|| child(); };
let x = recv(p);
}
fn main() {
task::spawn {|| parent(); };
}

View File

@ -9,12 +9,13 @@ fn die() {
} }
fn iloop() { fn iloop() {
task::unsupervise();
task::spawn {|| die(); }; task::spawn {|| die(); };
} }
fn main() { fn main() {
uint::range(0u, 100u) {|_i| uint::range(0u, 100u) {|_i|
task::spawn {|| iloop(); }; let builder = task::mk_task_builder();
task::unsupervise(builder);
task::run(builder) {|| iloop(); };
} }
} }

View File

@ -12,7 +12,7 @@ fn getbig(&&i: int) {
fn main() { fn main() {
let sz = 400u; let sz = 400u;
while sz < 500u { while sz < 500u {
task::join(task::spawn_joinable {|| getbig(200) }); task::try {|| getbig(200) };
sz += 1u; sz += 1u;
} }
} }

View File

@ -61,6 +61,6 @@ fn main() {
for f in fns { for f in fns {
let sz = rng.next() % 256u32 + 256u32; let sz = rng.next() % 256u32 + 256u32;
let frame_backoff = rng.next() % 10u32 + 1u32; let frame_backoff = rng.next() % 10u32 + 1u32;
task::join(task::spawn_joinable {|| runtest(f, frame_backoff);}); task::try {|| runtest(f, frame_backoff) };
} }
} }

View File

@ -9,7 +9,6 @@ fn die() {
} }
fn iloop() { fn iloop() {
task::unsupervise();
task::spawn {|| die(); }; task::spawn {|| die(); };
let p = comm::port::<()>(); let p = comm::port::<()>();
let c = comm::chan(p); let c = comm::chan(p);
@ -23,6 +22,8 @@ fn iloop() {
fn main() { fn main() {
uint::range(0u, 16u) {|_i| uint::range(0u, 16u) {|_i|
task::spawn {|| iloop(); }; let builder = task::mk_task_builder();
task::unsupervise(builder);
task::run(builder) {|| iloop(); }
} }
} }

View File

@ -1,9 +0,0 @@
use std;
import task::join;
import task::spawn_joinable;
fn main() { let x = spawn_joinable {|| m::child(10); }; join(x); }
mod m {
fn child(&&i: int) { log(debug, i); }
}

View File

@ -5,8 +5,7 @@ use std;
import task; import task;
fn main() { fn main() {
let t = task::spawn_joinable {|| child(10); }; task::spawn {|| child(10); };
task::join(t);
} }
fn child(&&i: int) { log(error, i); assert (i == 10); } fn child(&&i: int) { log(error, i); assert (i == 10); }

View File

@ -1,14 +1,8 @@
use std;
import task::spawn_joinable;
import task::join;
fn main() { test00(); } fn main() { test00(); }
fn start() { #debug("Started / Finished task."); } fn start() { #debug("Started / Finished task."); }
fn test00() { fn test00() {
let t = spawn_joinable {|| start(); }; task::try {|| start() };
join(t);
#debug("Completing."); #debug("Completing.");
} }

View File

@ -7,7 +7,9 @@ fn start(&&task_number: int) { #debug("Started / Finished task."); }
fn test00() { fn test00() {
let i: int = 0; let i: int = 0;
let t = task::spawn_joinable {|| start(i); }; let builder = task::mk_task_builder();
let r = task::future_result(builder);
task::run(builder) {|| start(i); };
// Sleep long enough for the task to finish. // Sleep long enough for the task to finish.
let i = 0; let i = 0;
@ -17,7 +19,7 @@ fn test00() {
} }
// Try joining tasks that have already finished. // Try joining tasks that have already finished.
task::join(t); future::get(r);
#debug("Joined task."); #debug("Joined task.");
} }

View File

@ -12,7 +12,6 @@ fn main() {
#debug("Check that we don't deadlock."); #debug("Check that we don't deadlock.");
let p = comm::port::<int>(); let p = comm::port::<int>();
let ch = comm::chan(p); let ch = comm::chan(p);
let a = task::spawn_joinable {|| start(ch, 0, 10); }; task::try {|| start(ch, 0, 10) };
task::join(a);
#debug("Joined task"); #debug("Joined task");
} }

View File

@ -1,31 +0,0 @@
// xfail-win32
use std;
import task;
fn main() {
#debug("===== SPAWNING and JOINING THREAD TASKS =====");
test00();
}
fn start(&&task_number: int) {
#debug("Started task.");
let i: int = 0;
while i < 10000 { i = i + 1; }
#debug("Finished task.");
}
fn test00() {
let number_of_tasks: int = 8;
let i: int = 0;
let tasks = [];
while i < number_of_tasks {
i = i + 1;
tasks += [task::spawn_joinable {|| start(i); }];
}
for t in tasks { task::join(t); }
#debug("Joined all task.");
}

View File

@ -30,17 +30,19 @@ fn test00() {
let i: int = 0; let i: int = 0;
// Create and spawn tasks... // Create and spawn tasks...
let tasks = []; let results = [];
while i < number_of_tasks { while i < number_of_tasks {
tasks += [task::spawn_joinable {|| let builder = task::mk_task_builder();
results += [task::future_result(builder)];
task::run(builder) {||
test00_start(ch, i, number_of_messages) test00_start(ch, i, number_of_messages)
}]; }
i = i + 1; i = i + 1;
} }
// Read from spawned tasks... // Read from spawned tasks...
let sum = 0; let sum = 0;
for t in tasks { for r in results {
i = 0; i = 0;
while i < number_of_messages { while i < number_of_messages {
let value = recv(po); let value = recv(po);
@ -50,7 +52,7 @@ fn test00() {
} }
// Join spawned tasks... // Join spawned tasks...
for t in tasks { task::join(t); } for r in results { future::get(r); }
#debug("Completed: Final number is: "); #debug("Completed: Final number is: ");
log(error, sum); log(error, sum);

View File

@ -1,9 +1,6 @@
use std; use std;
import task; import task;
import comm; import comm;
import comm::chan;
import comm::recv;
import comm::port;
fn main() { test00(); } fn main() { test00(); }
@ -15,40 +12,35 @@ fn test00_start(c: comm::chan<int>, start: int, number_of_messages: int) {
fn test00() { fn test00() {
let r: int = 0; let r: int = 0;
let sum: int = 0; let sum: int = 0;
let p = port(); let p = comm::port();
let number_of_messages: int = 10; let number_of_messages: int = 10;
let c = chan(p); let c = comm::chan(p);
let t0 = task::spawn_joinable {|| task::spawn {||
test00_start(c, number_of_messages * 0, number_of_messages); test00_start(c, number_of_messages * 0, number_of_messages);
}; }
let t1 = task::spawn_joinable {|| task::spawn {||
test00_start(c, number_of_messages * 1, number_of_messages); test00_start(c, number_of_messages * 1, number_of_messages);
}; }
let t2 = task::spawn_joinable {|| task::spawn {||
test00_start(c, number_of_messages * 2, number_of_messages); test00_start(c, number_of_messages * 2, number_of_messages);
}; }
let t3 = task::spawn_joinable {|| task::spawn {||
test00_start(c, number_of_messages * 3, number_of_messages); test00_start(c, number_of_messages * 3, number_of_messages);
}; }
let i: int = 0; let i: int = 0;
while i < number_of_messages { while i < number_of_messages {
r = recv(p); r = comm::recv(p);
sum += r; sum += r;
r = recv(p); r = comm::recv(p);
sum += r; sum += r;
r = recv(p); r = comm::recv(p);
sum += r; sum += r;
r = recv(p); r = comm::recv(p);
sum += r; sum += r;
i += 1; i += 1;
} }
task::join(t0);
task::join(t1);
task::join(t2);
task::join(t3);
assert (sum == number_of_messages * 4 * (number_of_messages * 4 - 1) / 2); assert (sum == number_of_messages * 4 * (number_of_messages * 4 - 1) / 2);
} }

View File

@ -1,51 +0,0 @@
use std;
import task;
import comm;
fn main() { test00(); }
fn test00_start(c: comm::chan<int>, start: int, number_of_messages: int) {
let i: int = 0;
while i < number_of_messages { comm::send(c, start + i); i += 1; }
}
fn test00() {
let r: int = 0;
let sum: int = 0;
let p = comm::port();
let c = comm::chan(p);
let number_of_messages: int = 10;
let t0 = task::spawn_joinable {||
test00_start(c, number_of_messages * 0, number_of_messages);
};
let t1 = task::spawn_joinable {||
test00_start(c, number_of_messages * 1, number_of_messages);
};
let t2 = task::spawn_joinable {||
test00_start(c, number_of_messages * 2, number_of_messages);
};
let t3 = task::spawn_joinable {||
test00_start(c, number_of_messages * 3, number_of_messages);
};
let i: int = 0;
while i < number_of_messages {
r = comm::recv(p);
sum += r;
r = comm::recv(p);
sum += r;
r = comm::recv(p);
sum += r;
r = comm::recv(p);
sum += r;
i += 1;
}
task::join(t0);
task::join(t1);
task::join(t2);
task::join(t3);
assert (sum == number_of_messages * 4 * (number_of_messages * 4 - 1) / 2);
}

View File

@ -16,9 +16,11 @@ fn test00() {
let number_of_messages: int = 10; let number_of_messages: int = 10;
let ch = comm::chan(p); let ch = comm::chan(p);
let t0 = task::spawn_joinable {|| let builder = task::mk_task_builder();
let r = task::future_result(builder);
task::run(builder) {||
test00_start(ch, number_of_messages); test00_start(ch, number_of_messages);
}; }
let i: int = 0; let i: int = 0;
while i < number_of_messages { while i < number_of_messages {
@ -27,7 +29,7 @@ fn test00() {
i += 1; i += 1;
} }
task::join(t0); future::get(r);
assert (sum == number_of_messages * (number_of_messages - 1) / 2); assert (sum == number_of_messages * (number_of_messages - 1) / 2);
} }

View File

@ -38,20 +38,20 @@ fn test00() {
let i: int = 0; let i: int = 0;
let tasks = []; let results = [];
while i < number_of_tasks { while i < number_of_tasks {
i = i + 1; i = i + 1;
tasks += [ let builder = task::mk_task_builder();
task::spawn_joinable {|| test00_start(ch, i, number_of_messages);} results += [task::future_result(builder)];
]; task::run(builder) {|| test00_start(ch, i, number_of_messages);}
} }
let sum: int = 0; let sum: int = 0;
for t in tasks { for r in results {
i = 0; i = 0;
while i < number_of_messages { sum += recv(po); i = i + 1; } while i < number_of_messages { sum += recv(po); i = i + 1; }
} }
for t in tasks { task::join(t); } for r in results { future::get(r); }
#debug("Completed: Final number is: "); #debug("Completed: Final number is: ");
assert (sum == assert (sum ==
@ -123,14 +123,16 @@ fn test06() {
let i: int = 0; let i: int = 0;
let tasks = []; let results = [];
while i < number_of_tasks { while i < number_of_tasks {
i = i + 1; i = i + 1;
tasks += [task::spawn_joinable {|| test06_start(i);}]; let builder = task::mk_task_builder();
results += [task::future_result(builder)];
task::run(builder) {|| test06_start(i);};
} }
for t in tasks { task::join(t); } for r in results { future::get(r); }
} }

View File

@ -1,18 +1,17 @@
// xfail-win32 // xfail-win32
// Create a task that is supervised by another task,
// join the supervised task from the supervising task, // Create a task that is supervised by another task, join the supervised task
// then fail the supervised task. The supervised task // from the supervising task, then fail the supervised task. The supervised
// will kill the supervising task, waking it up. The // task will kill the supervising task, waking it up. The supervising task no
// supervising task no longer needs to be wakened when // longer needs to be wakened when the supervised task exits.
// the supervised task exits.
use std; use std;
import task; import task;
fn supervised() { fn supervised() {
// Yield to make sure the supervisor joins before we // Yield to make sure the supervisor joins before we fail. This is
// fail. This is currently not needed because the supervisor // currently not needed because the supervisor runs first, but I can
// runs first, but I can imagine that changing. // imagine that changing.
task::yield(); task::yield();
fail; fail;
} }
@ -20,15 +19,14 @@ fn supervised() {
fn supervisor() { fn supervisor() {
// Unsupervise this task so the process doesn't return a failure status as // Unsupervise this task so the process doesn't return a failure status as
// a result of the main task being killed. // a result of the main task being killed.
task::unsupervise();
let f = supervised; let f = supervised;
let t = task::spawn_joinable {|| supervised(); }; task::try {|| supervised() };
task::join(t);
} }
fn main() { fn main() {
let dom2 = task::spawn_joinable {|| supervisor(); }; let builder = task::mk_task_builder();
task::join(dom2); task::unsupervise(builder);
task::run(builder) {|| supervisor(); }
} }
// Local Variables: // Local Variables:

View File

@ -1,18 +0,0 @@
fn stringifier(from_par: comm::port<uint>,
to_par: comm::chan<str>) {
let value: uint;
do {
value = comm::recv(from_par);
comm::send(to_par, uint::to_str(value, 10u));
} while value != 0u;
}
fn main() {
let t = task::spawn_connected(stringifier);
comm::send(t.to_child, 22u);
assert comm::recv(t.from_child) == "22";
comm::send(t.to_child, 23u);
assert comm::recv(t.from_child) == "23";
comm::send(t.to_child, 0u);
assert comm::recv(t.from_child) == "0";
}

View File

@ -11,14 +11,14 @@ fn test_cont() { let i = 0; while i < 1 { i += 1; let x: @int = cont; } }
fn test_ret() { let x: @int = ret; } fn test_ret() { let x: @int = ret; }
fn test_fail() { fn test_fail() {
fn f() { task::unsupervise(); let x: @int = fail; } fn f() { let x: @int = fail; }
task::spawn {|| f(); }; task::try {|| f() };
} }
fn test_fail_indirect() { fn test_fail_indirect() {
fn f() -> ! { fail; } fn f() -> ! { fail; }
fn g() { task::unsupervise(); let x: @int = f(); } fn g() { let x: @int = f(); }
task::spawn {|| g(); }; task::try {|| g() };
} }
fn main() { fn main() {

View File

@ -5,8 +5,9 @@
// that it doesn't bring down the whole proc // that it doesn't bring down the whole proc
fn main() { fn main() {
task::spawn {|| let builder = task::mk_task_builder();
task::unsupervise(); task::unsupervise(builder);
task::run(builder) {||
fn f() { f() }; fn f() { f() };
f(); f();
}; };

View File

@ -3,11 +3,12 @@ use std;
import task; import task;
fn f() { fn f() {
task::unsupervise();
let a = @0; let a = @0;
fail; fail;
} }
fn main() { fn main() {
task::spawn {|| f(); }; let builder = task::mk_task_builder();
task::unsupervise(builder);
task::run(builder) {|| f(); }
} }

View File

@ -8,7 +8,6 @@ resource complainer(c: comm::chan<bool>) {
} }
fn f(c: comm::chan<bool>) { fn f(c: comm::chan<bool>) {
task::unsupervise();
let c <- complainer(c); let c <- complainer(c);
fail; fail;
} }
@ -16,6 +15,8 @@ fn f(c: comm::chan<bool>) {
fn main() { fn main() {
let p = comm::port(); let p = comm::port();
let c = comm::chan(p); let c = comm::chan(p);
task::spawn {|| f(c); }; let builder = task::mk_task_builder();
task::unsupervise(builder);
task::run(builder) {|| f(c); }
assert comm::recv(p); assert comm::recv(p);
} }

View File

@ -7,11 +7,12 @@ resource complainer(c: @int) {
} }
fn f() { fn f() {
task::unsupervise();
let c <- complainer(@0); let c <- complainer(@0);
fail; fail;
} }
fn main() { fn main() {
task::spawn {|| f(); }; let builder = task::mk_task_builder();
task::unsupervise(builder);
task::run(builder) {|| f(); }
} }

View File

@ -3,11 +3,12 @@ use std;
import task; import task;
fn f() { fn f() {
task::unsupervise();
let a = ~0; let a = ~0;
fail; fail;
} }
fn main() { fn main() {
task::spawn {|| f(); }; let builder = task::mk_task_builder();
task::unsupervise(builder);
task::run(builder) {|| f(); }
} }

View File

@ -4,13 +4,15 @@ import task;
import task::*; import task::*;
fn main() { fn main() {
let other = task::spawn_joinable {|| child(); }; let builder = task::mk_task_builder();
let result = task::future_result(builder);
task::run(builder) {|| child(); }
#error("1"); #error("1");
yield(); yield();
#error("2"); #error("2");
yield(); yield();
#error("3"); #error("3");
join(other); future::get(result);
} }
fn child() { fn child() {

View File

@ -4,10 +4,12 @@ import task;
import task::*; import task::*;
fn main() { fn main() {
let other = task::spawn_joinable {|| child(); }; let builder = task::mk_task_builder();
let result = task::future_result(builder);
task::run(builder) {|| child(); }
#error("1"); #error("1");
yield(); yield();
join(other); future::get(result);
} }
fn child() { #error("2"); } fn child() { #error("2"); }