diff --git a/doc/tutorial.md b/doc/tutorial.md index 4cad3cd2009..1039f25df3f 100644 --- a/doc/tutorial.md +++ b/doc/tutorial.md @@ -2375,10 +2375,10 @@ module `task`. Let's begin with the simplest one, `task::spawn()`: ~~~~ let some_value = 22; -let child_task = task::spawn {|| +task::spawn {|| std::io::println("This executes in the child task."); std::io::println(#fmt("%d", some_value)); -}; +} ~~~~ The argument to `task::spawn()` is a [unique @@ -2456,70 +2456,66 @@ let result = comm::recv(port); ## Creating a task with a bi-directional communication path A very common thing to do is to spawn a child task where the parent -and child both need to exchange messages with each other. The function -`task::spawn_connected()` supports this pattern. We'll look briefly at -how it is used. +and child both need to exchange messages with each +other. The function `task::spawn_listener()` supports this pattern. We'll look +briefly at how it is used. -To see how `spawn_connected()` works, we will create a child task +To see how `spawn_listener()` works, we will create a child task which receives `uint` messages, converts them to a string, and sends the string in response. The child terminates when `0` is received. Here is the function which implements the child task: ~~~~ -fn stringifier(from_par: comm::port, - to_par: comm::chan) { +fn stringifier(from_parent: comm::port, + to_parent: comm::chan) { let value: uint; do { - value = comm::recv(from_par); - comm::send(to_par, uint::to_str(value, 10u)); + value = comm::recv(from_parent); + comm::send(to_parent, uint::to_str(value, 10u)); } while value != 0u; } ~~~~ + You can see that the function takes two parameters. The first is a port used to receive messages from the parent, and the second is a channel used to send messages to the parent. The body itself simply loops, reading from the `from_par` port and then sending its response to the `to_par` channel. The actual response itself is simply the strified version of the received value, `uint::to_str(value)`. - + Here is the code for the parent task: + ~~~~ -# fn stringifier(from_par: comm::port, -# to_par: comm::chan) { -# comm::send(to_par, "22"); -# comm::send(to_par, "23"); -# comm::send(to_par, "0"); +# fn stringifier(from_parent: comm::port, +# to_parent: comm::chan) { +# comm::send(to_parent, "22"); +# comm::send(to_parent, "23"); +# comm::send(to_parent, "0"); # } fn main() { - let t = task::spawn_connected(stringifier); - comm::send(t.to_child, 22u); - assert comm::recv(t.from_child) == "22"; - comm::send(t.to_child, 23u); - assert comm::recv(t.from_child) == "23"; - comm::send(t.to_child, 0u); - assert comm::recv(t.from_child) == "0"; + let from_child = comm::port(); + let to_parent = comm::chan(from_child); + let to_child = task::spawn_listener {|from_parent| + stringifier(from_parent, to_parent); + }; + comm::send(to_child, 22u); + assert comm::recv(from_child) == "22"; + comm::send(to_child, 23u); + assert comm::recv(from_child) == "23"; + comm::send(to_child, 0u); + assert comm::recv(from_child) == "0"; } ~~~~ -The call to `spawn_connected()` on the first line will instantiate the -various ports and channels and startup the child task. The returned -value, `t`, is a record of type `task::connected_task`. In -addition to the task id of the child, this record defines two fields, -`from_child` and `to_child`, which contain the port and channel -respectively for communicating with the child. Those fields are used -here to send and receive three messages from the child task. - -## Joining a task - -The function `spawn_joinable()` is used to spawn a task that can later -be joined. This is implemented by having the child task send a message -when it has completed (either successfully or by failing). Therefore, -`spawn_joinable()` returns a structure containing both the task ID and -the port where this message will be sent---this structure type is -called `task::joinable_task`. The structure can be passed to -`task::join()`, which simply blocks on the port, waiting to receive -the message from the child task. +The parent first sets up a port to receive data from and a channel +that the child can use to send data to that port. The call to +`spawn_listener()` will spawn the child task, providing it with a port +on which to receive data from its parent, and returning to the parent +the associated channel. Finally, the closure passed to +`spawn_listener()` that forms the body of the child task captures the +`to_parent` channel in its environment, so both parent and child +can send and receive data to and from the other. ## The supervisor relationship diff --git a/src/comp/driver/rustc.rs b/src/comp/driver/rustc.rs index 0239f3921de..5186290e4a4 100644 --- a/src/comp/driver/rustc.rs +++ b/src/comp/driver/rustc.rs @@ -143,8 +143,6 @@ fn monitor(f: fn~(diagnostic::emitter)) { alt task::try {|| - task::unsupervise(); - // The 'diagnostics emitter'. Every error, warning, etc. should // go through this function. let demitter = fn@(cmsp: option<(codemap::codemap, codemap::span)>, diff --git a/src/compiletest/procsrv.rs b/src/compiletest/procsrv.rs index b856b092e7a..abdd0fb9ab0 100644 --- a/src/compiletest/procsrv.rs +++ b/src/compiletest/procsrv.rs @@ -54,11 +54,11 @@ fn run(lib_path: str, prog: str, args: [str], writeclose(pipe_in.out, input); let p = comm::port(); let ch = comm::chan(p); - task::spawn_sched(1u) {|| + task::spawn_sched(task::single_threaded) {|| let errput = readclose(pipe_err.in); comm::send(ch, (2, errput)); }; - task::spawn_sched(1u) {|| + task::spawn_sched(task::single_threaded) {|| let output = readclose(pipe_out.in); comm::send(ch, (1, output)); }; diff --git a/src/libcore/comm.rs b/src/libcore/comm.rs index 884e920fd95..636fc702d35 100644 --- a/src/libcore/comm.rs +++ b/src/libcore/comm.rs @@ -35,8 +35,9 @@ enum rust_port {} #[abi = "cdecl"] native mod rustrt { + fn get_task_id() -> task_id; fn chan_id_send(t: *sys::type_desc, - target_task: task::task, target_port: port_id, + target_task: task_id, target_port: port_id, data: T) -> ctypes::uintptr_t; fn new_port(unit_sz: ctypes::size_t) -> *rust_port; @@ -58,6 +59,7 @@ native mod rusti { fn call_with_retptr(&&f: fn@(*uint)) -> T; } +type task_id = int; type port_id = int; // It's critical that this only have one variant, so it has a record @@ -75,7 +77,7 @@ type port_id = int; over other channels." )] enum chan { - chan_t(task::task, port_id) + chan_t(task_id, port_id) } resource port_ptr(po: *rust_port) { @@ -208,7 +210,7 @@ fn peek(p: port) -> bool { port used to construct it." )] fn chan(p: port) -> chan { - chan_t(task::get_task(), rustrt::get_port_id(***p)) + chan_t(rustrt::get_task_id(), rustrt::get_port_id(***p)) } #[test] diff --git a/src/libcore/task.rs b/src/libcore/task.rs index ad1f4a7fc7a..79316d5310d 100644 --- a/src/libcore/task.rs +++ b/src/libcore/task.rs @@ -1,5 +1,4 @@ -/* -Module: task +#[doc = " Task management. @@ -13,58 +12,561 @@ true: when a parent task fails its children will continue executing. When the root (main) task fails, all tasks fail, and then so does the entire process. -A task may remove itself from this failure propagation mechanism by -calling the function, after which failure will only -result in the termination of that task. - Tasks may execute in parallel and are scheduled automatically by the runtime. Example: -> spawn {|| -> log(debug, "Hello, World!"); -> }; + spawn {|| + log(error, \"Hello, World!\"); + } -*/ -import cast = unsafe::reinterpret_cast; -import comm; -import ptr; -import c = ctypes; +"]; export task; -export joinable_task; -export yield; -export task_notification; -export join; -export unsupervise; export task_result; -export tr_success; -export tr_failure; -export get_task; +export notification; +export sched_mode; +export sched_opts; +export task_opts; +export task_builder::{}; + +export default_task_opts; +export mk_task_builder; +export get_opts; +export set_opts; +export add_wrapper; +export run; + +export future_result; +export future_task; +export unsupervise; +export run_listener; + export spawn; -export spawn_joinable; -export spawn_connected; +export spawn_listener; export spawn_sched; -export connected_fn; -export connected_task; -export currently_unwinding; export try; +export yield; +export failing; +export get_task; + + +/* Data types */ + +#[doc = "A handle to a task"] +enum task = task_id; + +#[doc = " + +Indicates the manner in which a task exited. + +A task that completes without failing and whose supervised children complete +without failing is considered to exit successfully. + +FIXME: This description does not indicate the current behavior for linked +failure. + +"] +enum task_result { + success, + failure, +} + +#[doc = " + +A message type for notifying of task lifecycle events + +"] +enum notification { + #[doc = "Sent when a task exits with the task handle and result"] + exit(task, task_result) +} + +#[doc = "Scheduler modes"] +enum sched_mode { + #[doc = "All tasks run in the same OS thread"] + single_threaded, + #[doc = "Tasks are distributed among available CPUs"] + thread_per_core, + #[doc = "Each task runs in its own OS thread"] + thread_per_task, + #[doc = "Tasks are distributed among a fixed number of OS threads"] + manual_threads(uint), +} + +#[doc = " + +Scheduler configuration options + +Fields: + +* sched_mode - The operating mode of the scheduler + +* native_stack_size - The size of the native stack, in bytes + + Rust code runs on Rust-specific stacks. When Rust code calls native code + (via functions in native modules) it switches to a typical, large stack + appropriate for running code written in languages like C. By default these + native stacks have unspecified size, but with this option their size can + be precisely specified. + +"] +type sched_opts = { + mode: sched_mode, + native_stack_size: option, +}; + +#[doc = " + +Task configuration options + +Fields: + +* supervise - Do not propagate failure to the parent task + + All tasks are linked together via a tree, from parents to children. By + default children are 'supervised' by their parent and when they fail + so too will their parents. Settings this flag to false disables that + behavior. + +* notify_chan - Enable lifecycle notifications on the given channel + +* sched - Specify the configuration of a new scheduler to create the task in + + By default, every task is created in the same scheduler as its + parent, where it is scheduled cooperatively with all other tasks + in that scheduler. Some specialized applications may want more + control over their scheduling, in which case they can be spawned + into a new scheduler with the specific properties required. + + This is of particular importance for libraries which want to call + into native code that blocks. Without doing so in a different + scheduler other tasks will be impeded or even blocked indefinitely. + +"] +type task_opts = { + supervise: bool, + notify_chan: option>, + sched: option, +}; + +#[doc = " + +The task builder type. + +Provides detailed control over the properties and behavior of new tasks. + +"] +// NB: Builders are designed to be single-use because they do stateful +// things that get weird when reusing - e.g. if you create a result future +// it only applies to a single task, so then you have to maintain some +// potentially tricky state to ensure that everything behaves correctly +// when you try to reuse the builder to spawn a new task. We'll just +// sidestep that whole issue by making builder's uncopyable and making +// the run function move them in. +enum task_builder = { + mutable opts: task_opts, + mutable gen_body: fn@(+fn~()) -> fn~(), + can_not_copy: option> +}; + + +/* Task construction */ + +fn default_task_opts() -> task_opts { + #[doc = " + + The default task options + + By default all tasks are supervised by their parent, are spawned + into the same scheduler, and do not post lifecycle notifications. + + "]; + + { + supervise: true, + notify_chan: none, + sched: none + } +} + +fn mk_task_builder() -> task_builder { + #[doc = "Construct a task_builder"]; + + let body_identity = fn@(+body: fn~()) -> fn~() { body }; + + task_builder({ + mutable opts: default_task_opts(), + mutable gen_body: body_identity, + can_not_copy: none + }) +} + +fn get_opts(builder: task_builder) -> task_opts { + #[doc = "Get the task_opts associated with a task_builder"]; + + builder.opts +} + +fn set_opts(builder: task_builder, opts: task_opts) { + #[doc = " + + Set the task_opts associated with a task_builder + + To update a single option use a pattern like the following: + + set_opts(builder, { + supervise: false + with get_opts(builder) + }); + + "]; + + builder.opts = opts; +} + +fn add_wrapper(builder: task_builder, gen_body: fn@(+fn~()) -> fn~()) { + #[doc = " + + Add a wrapper to the body of the spawned task. + + Before the task is spawned it is passed through a 'body generator' + function that may perform local setup operations as well as wrap + the task body in remote setup operations. With this the behavior + of tasks can be extended in simple ways. + + This function augments the current body generator with a new body + generator by applying the task body which results from the + existing body generator to the new body generator. + + "]; + + let prev_gen_body = builder.gen_body; + builder.gen_body = fn@(+body: fn~()) -> fn~() { + gen_body(prev_gen_body(body)) + }; +} + +fn run(-builder: task_builder, +f: fn~()) { + #[doc(desc = " + + Creates and exucutes a new child task + + Sets up a new task with its own call stack and schedules it to run + the provided unique closure. The task has the properties and behavior + specified by `builder`. + + ", failure = " + + When spawning into a new scheduler, the number of threads requested + must be greater than zero. + + ")]; + + let body = builder.gen_body(f); + spawn_raw(builder.opts, body); +} + + +/* Builder convenience functions */ + +fn future_result(builder: task_builder) -> future::future { + #[doc = " + + Get a future representing the exit status of the task. + + Taking the value of the future will block until the child task terminates. + + Note that the future returning by this function is only useful for + obtaining the value of the next task to be spawning with the + builder. If additional tasks are spawned with the same builder + then a new result future must be obtained prior to spawning each + task. + + "]; + + // FIXME (1087, 1857): Once linked failure and notification are + // handled in the library, I can imagine implementing this by just + // registering an arbitrary number of task::on_exit handlers and + // sending out messages. + + let po = comm::port(); + let ch = comm::chan(po); + + set_opts(builder, { + notify_chan: some(ch) + with get_opts(builder) + }); + + future::from_fn {|| + alt comm::recv(po) { + exit(_, result) { result } + } + } +} + +fn future_task(builder: task_builder) -> future::future { + #[doc = "Get a future representing the handle to the new task"]; + + let po = comm::port(); + let ch = comm::chan(po); + add_wrapper(builder) {|body| + fn~[move body]() { + comm::send(ch, get_task()); + body(); + } + } + future::from_port(po) +} + +fn unsupervise(builder: task_builder) { + #[doc = "Configures the new task to not propagate failure to its parent"]; + + set_opts(builder, { + supervise: false + with get_opts(builder) + }); +} + +fn run_listener(-builder: task_builder, + +f: fn~(comm::port)) -> comm::chan { + #[doc = " + + Runs a new task while providing a channel from the parent to the child + + Sets up a communication channel from the current task to the new + child task, passes the port to child's body, and returns a channel + linked to the port to the parent. + + This encapsulates some boilerplate handshaking logic that would + otherwise be required to establish communication from the parent + to the child. + "]; + + let setup_po = comm::port(); + let setup_ch = comm::chan(setup_po); + + run(builder, fn~[move f]() { + let po = comm::port(); + let ch = comm::chan(po); + comm::send(setup_ch, ch); + f(po); + }); + + comm::recv(setup_po) +} + + +/* Spawn convenience functions */ + +fn spawn(+f: fn~()) { + #[doc = " + + Creates and exucutes a new child task + + Sets up a new task with its own call stack and schedules it to run + the provided unique closure. + + This function is equivalent to `run(mk_task_builder(), f)`. + "]; + + run(mk_task_builder(), f); +} + +fn spawn_listener(+f: fn~(comm::port)) -> comm::chan { + #[doc = " + + Runs a new task while providing a channel from the parent to the child + + Sets up a communication channel from the current task to the new + child task, passes the port to child's body, and returns a channel + linked to the port to the parent. + + This encapsulates some boilerplate handshaking logic that would + otherwise be required to establish communication from the parent + to the child. + + The simplest way to establish bidirectional communication between + a parent in child is as follows: + + let po = comm::port(); + let ch = comm::chan(po); + let ch = spawn_listener {|po| + // Now the child has a port called 'po' to read from and + // an environment-captured channel called 'ch'. + }; + // Likewise, the parent has both a 'po' and 'ch' + + This function is equivalent to `run_listener(mk_task_builder(), f)`. + + "]; + + run_listener(mk_task_builder(), f) +} + +fn spawn_sched(mode: sched_mode, +f: fn~()) { + #[doc(desc = " + + Creates a new scheduler and executes a task on it + + Tasks subsequently spawned by that task will also execute on + the new scheduler. When there are no more tasks to execute the + scheduler terminates. + + ", failure = " + + In manual threads mode the number of threads requested must be + greater than zero. + + ")]; + + let builder = mk_task_builder(); + set_opts(builder, { + sched: some({ + mode: mode, + native_stack_size: none + }) + with get_opts(builder) + }); + run(builder, f); +} + +fn try(+f: fn~() -> T) -> result::t { + #[doc(desc = " + + Execute a function in another task and return either the return value + of the function or result::err. + + ", return = " + + If the function executed successfully then try returns result::ok + containing the value returned by the function. If the function fails + then try returns result::err containing nil. + + ")]; + + let po = comm::port(); + let ch = comm::chan(po); + let builder = mk_task_builder(); + unsupervise(builder); + let result = future_result(builder); + run(builder, fn~[move f]() { + comm::send(ch, f()); + }); + alt future::get(result) { + success { result::ok(comm::recv(po)) } + failure { result::err(()) } + } +} + + +/* Lifecycle functions */ + +fn yield() { + #[doc = "Yield control to the task scheduler"]; + + let task_ = rustrt::rust_get_task(); + let killed = false; + rusti::task_yield(task_, killed); + if killed && !failing() { + fail "killed"; + } +} + +fn failing() -> bool { + #[doc = "True if the running task has failed"]; + + rustrt::rust_task_is_unwinding(rustrt::rust_get_task()) +} + +fn get_task() -> task { + #[doc = "Get a handle to the running task"]; + + task(rustrt::get_task_id()) +} + + +/* Internal */ + +type sched_id = int; +type task_id = int; + +// These are both opaque runtime/compiler types that we don't know the +// structure of and should only deal with via unsafe pointer +type rust_task = ctypes::void; +type rust_closure = ctypes::void; + +fn spawn_raw(opts: task_opts, +f: fn~()) unsafe { + + let f = if opts.supervise { + f + } else { + // FIXME: The runtime supervision API is weird here because it + // was designed to let the child unsupervise itself, when what + // we actually want is for parents to unsupervise new + // children. + fn~[move f]() { + rustrt::unsupervise(); + f(); + } + }; + + let fptr = ptr::addr_of(f); + let closure: *rust_closure = unsafe::reinterpret_cast(fptr); + + let task_id = alt opts.sched { + none { + rustrt::new_task() + } + some(sched_opts) { + new_task_in_new_sched(sched_opts) + } + }; + + option::may(opts.notify_chan) {|c| + // FIXME (1087): Would like to do notification in Rust + rustrt::rust_task_config_notify(task_id, c); + } + + rustrt::start_task(task_id, closure); + unsafe::leak(f); + + fn new_task_in_new_sched(opts: sched_opts) -> task_id { + if opts.native_stack_size != none { + fail "native_stack_size scheduler option unimplemented"; + } + + let num_threads = alt opts.mode { + single_threaded { 1u } + thread_per_core { + fail "thread_per_core scheduling mode unimplemented" + } + thread_per_task { + fail "thread_per_task scheduling mode unimplemented" + } + manual_threads(threads) { + if threads == 0u { + fail "can not create a scheduler with no threads"; + } + threads + } + }; + + let sched_id = rustrt::rust_new_sched(num_threads); + rustrt::rust_new_task_in_sched(sched_id) + } + +} + #[abi = "rust-intrinsic"] native mod rusti { - // these must run on the Rust stack so that they can swap stacks etc: fn task_yield(task: *rust_task, &killed: bool); } -type rust_closure = { - fnptr: c::intptr_t, envptr: c::intptr_t -}; - -#[link_name = "rustrt"] -#[abi = "cdecl"] native mod rustrt { fn rust_get_sched_id() -> sched_id; - fn rust_new_sched(num_threads: c::uintptr_t) -> sched_id; + fn rust_new_sched(num_threads: ctypes::uintptr_t) -> sched_id; fn get_task_id() -> task_id; fn rust_get_task() -> *rust_task; @@ -73,504 +575,353 @@ native mod rustrt { fn rust_new_task_in_sched(id: sched_id) -> task_id; fn rust_task_config_notify( - id: task_id, &&chan: comm::chan); + id: task_id, &&chan: comm::chan); - fn start_task(id: task, closure: *rust_closure); + fn start_task(id: task_id, closure: *rust_closure); fn rust_task_is_unwinding(rt: *rust_task) -> bool; fn unsupervise(); } -/* Section: Types */ -type rust_task = *ctypes::void; - -type sched_id = int; -type task_id = int; - -/* -Type: task - -A handle to a task -*/ -type task = task_id; - -/* -Function: spawn - -Creates and executes a new child task - -Sets up a new task with its own call stack and schedules it to be -executed. Upon execution, the closure `f()` will be invoked. - -Parameters: - -f - A function to execute in the new task - -Returns: - -A handle to the new task -*/ -fn spawn(+f: fn~()) -> task { - spawn_inner(f, none, new_task_in_this_sched) -} - -fn spawn_inner( - -f: fn~(), - notify: option>, - new_task: fn() -> task_id -) -> task unsafe { - let closure: *rust_closure = unsafe::reinterpret_cast(ptr::addr_of(f)); - #debug("spawn: closure={%x,%x}", (*closure).fnptr, (*closure).envptr); - let id = new_task(); - - // set up notifications if they are enabled. - option::may(notify) {|c| - rustrt::rust_task_config_notify(id, c); +#[test] +fn test_spawn_raw_simple() { + let po = comm::port(); + let ch = comm::chan(po); + spawn_raw(default_task_opts()) {|| + comm::send(ch, ()); } - - rustrt::start_task(id, closure); - unsafe::leak(f); - ret id; + comm::recv(po); } -fn new_task_in_this_sched() -> task_id { - rustrt::new_task() -} - -fn new_task_in_new_sched(num_threads: uint) -> task_id { - let sched_id = rustrt::rust_new_sched(num_threads); - rustrt::rust_new_task_in_sched(sched_id) -} - -/* -Function: spawn_sched - -Creates a new scheduler and executes a task on it. Tasks subsequently -spawned by that task will also execute on the new scheduler. When -there are no more tasks to execute the scheduler terminates. - -Arguments: - -num_threads - The number of OS threads to dedicate schedule tasks on -f - A unique closure to execute as a task on the new scheduler - -Failure: - -The number of threads must be greater than 0 - -*/ -fn spawn_sched(num_threads: uint, +f: fn~()) -> task { - if num_threads < 1u { - fail "Can not create a scheduler with no threads"; - } - spawn_inner(f, none, bind new_task_in_new_sched(num_threads)) -} - -/* -Type: joinable_task - -A task that sends notification upon termination -*/ -type joinable_task = (task, comm::port); - -fn spawn_joinable(+f: fn~()) -> joinable_task { - let notify_port = comm::port(); - let notify_chan = comm::chan(notify_port); - let task = spawn_inner(f, some(notify_chan), new_task_in_this_sched); - ret (task, notify_port); - /* - resource notify_rsrc(data: (comm::chan, - task, - @mutable task_result)) { - let (chan, task, tr) = data; - let msg = exit(task, *tr); - comm::send(chan, msg); - } - - let notify_port = comm::port(); - let notify_chan = comm::chan(notify_port); - let g = fn~[copy notify_chan; move f]() { - let this_task = rustrt::get_task_id(); - let result = @mutable tr_failure; - let _rsrc = notify_rsrc((notify_chan, this_task, result)); - f(); - *result = tr_success; // rsrc will fire msg when fn returns +#[test] +#[ignore(cfg(target_os = "win32"))] +fn test_spawn_raw_unsupervise() { + let opts = { + supervise: false + with default_task_opts() }; - let task = spawn(g); - ret (task, notify_port); - */ -} - -/* -Tag: task_result - -Indicates the manner in which a task exited -*/ -enum task_result { - /* Variant: tr_success */ - tr_success, - /* Variant: tr_failure */ - tr_failure, -} - -/* -Tag: task_notification - -Message sent upon task exit to indicate normal or abnormal termination -*/ -enum task_notification { - /* Variant: exit */ - exit(task, task_result), -} - -/* -Type: connected_fn - -The prototype for a connected child task function. Such a function will be -supplied with a channel to send messages to the parent and a port to receive -messages from the parent. The type parameter `ToCh` is the type for messages -sent from the parent to the child and `FrCh` is the type for messages sent -from the child to the parent. */ -type connected_fn = fn~(comm::port, comm::chan); - -/* -Type: connected_fn - -The result type of -*/ -type connected_task = { - from_child: comm::port, - to_child: comm::chan, - task: task -}; - -/* -Function: spawn_connected - -Spawns a child task along with a port/channel for exchanging messages -with the parent task. The type `ToCh` represents messages sent to the child -and `FrCh` messages received from the child. - -Parameters: - -f - the child function to execute - -Returns: - -The new child task along with the port to receive messages and the channel -to send messages. -*/ -fn spawn_connected(+f: connected_fn) - -> connected_task { - let from_child_port = comm::port::(); - let from_child_chan = comm::chan(from_child_port); - let get_to_child_port = comm::port::>(); - let get_to_child_chan = comm::chan(get_to_child_port); - let child_task = spawn(fn~[move f]() { - let to_child_port = comm::port::(); - comm::send(get_to_child_chan, comm::chan(to_child_port)); - f(to_child_port, from_child_chan); - }); - let to_child_chan = comm::recv(get_to_child_port); - ret {from_child: from_child_port, - to_child: to_child_chan, - task: child_task}; -} - -/* Section: Operations */ - -/* -Type: get_task - -Retreives a handle to the currently executing task -*/ -fn get_task() -> task { rustrt::get_task_id() } - -/* -Function: yield - -Yield control to the task scheduler - -The scheduler may schedule another task to execute. -*/ -fn yield() { - let task = rustrt::rust_get_task(); - let killed = false; - rusti::task_yield(task, killed); - if killed && !currently_unwinding() { - fail "killed"; + spawn_raw(opts) {|| + fail; } } -/* -Function: join +#[test] +#[ignore(cfg(target_os = "win32"))] +fn test_spawn_raw_notify() { + let task_po = comm::port(); + let task_ch = comm::chan(task_po); + let notify_po = comm::port(); + let notify_ch = comm::chan(notify_po); -Wait for a child task to exit + let opts = { + notify_chan: some(notify_ch) + with default_task_opts() + }; + spawn_raw(opts) {|| + comm::send(task_ch, get_task()); + } + let task_ = comm::recv(task_po); + assert comm::recv(notify_po) == exit(task_, success); -The child task must have been spawned with , which -produces a notification port that the child uses to communicate its -exit status. + let opts = { + supervise: false, + notify_chan: some(notify_ch) + with default_task_opts() + }; + spawn_raw(opts) {|| + comm::send(task_ch, get_task()); + fail; + } + let task_ = comm::recv(task_po); + assert comm::recv(notify_po) == exit(task_, failure); +} -Returns: +#[test] +fn test_run_basic() { + let po = comm::port(); + let ch = comm::chan(po); + let builder = mk_task_builder(); + run(builder) {|| + comm::send(ch, ()); + } + comm::recv(po); +} -A task_result indicating whether the task terminated normally or failed -*/ -fn join(task_port: joinable_task) -> task_result { - let (id, port) = task_port; - alt comm::recv::(port) { - exit(_id, res) { - if _id == id { - ret res - } else { - fail #fmt["join received id %d, expected %d", _id, id] +#[test] +fn test_add_wrapper() { + let po = comm::port(); + let ch = comm::chan(po); + let builder = mk_task_builder(); + add_wrapper(builder) {|body| + fn~() { + body(); + comm::send(ch, ()); } - } + } + run(builder) {||} + comm::recv(po); +} + +#[test] +#[ignore(cfg(target_os = "win32"))] +fn test_future_result() { + let builder = mk_task_builder(); + let result = future_result(builder); + run(builder) {||} + assert future::get(result) == success; + + let builder = mk_task_builder(); + let result = future_result(builder); + unsupervise(builder); + run(builder) {|| fail } + assert future::get(result) == failure; +} + +#[test] +fn test_future_task() { + let po = comm::port(); + let ch = comm::chan(po); + let builder = mk_task_builder(); + let task1 = future_task(builder); + run(builder) {|| comm::send(ch, get_task()) } + assert future::get(task1) == comm::recv(po); +} + +#[test] +fn test_spawn_listiner_bidi() { + let po = comm::port(); + let ch = comm::chan(po); + let ch = spawn_listener {|po| + // Now the child has a port called 'po' to read from and + // an environment-captured channel called 'ch'. + let res = comm::recv(po); + assert res == "ping"; + comm::send(ch, "pong"); + }; + // Likewise, the parent has both a 'po' and 'ch' + comm::send(ch, "ping"); + let res = comm::recv(po); + assert res == "pong"; +} + +#[test] +fn test_try_success() { + alt try {|| + "Success!" + } { + result::ok("Success!") { } + _ { fail; } } } -/* -Function: unsupervise - -Detaches this task from its parent in the task tree - -An unsupervised task will not propagate its failure up the task tree -*/ -fn unsupervise() { - rustrt::unsupervise(); +#[test] +#[ignore(cfg(target_os = "win32"))] +fn test_try_fail() { + alt try {|| + fail + } { + result::err(()) { } + _ { fail; } + } } -/* -Function: currently_unwinding() - -True if we are currently unwinding after a failure. -*/ -fn currently_unwinding() -> bool { - rustrt::rust_task_is_unwinding(rustrt::rust_get_task()) +#[test] +#[should_fail] +#[ignore(cfg(target_os = "win32"))] +fn test_spawn_sched_no_threads() { + spawn_sched(manual_threads(0u)) {|| }; } -/* -Function: try +#[test] +fn test_spawn_sched() { + let po = comm::port(); + let ch = comm::chan(po); -Execute a function in another task and return either the return value -of the function or result::err. + fn f(i: int, ch: comm::chan<()>) { + let parent_sched_id = rustrt::rust_get_sched_id(); -Returns: + spawn_sched(single_threaded) {|| + let child_sched_id = rustrt::rust_get_sched_id(); + assert parent_sched_id != child_sched_id; -If the function executed successfully then try returns result::ok -containing the value returned by the function. If the function fails -then try returns result::err containing nil. -*/ -fn try(+f: fn~() -> T) -> result::t { - let p = comm::port(); - let ch = comm::chan(p); - alt join(spawn_joinable {|| - unsupervise(); - comm::send(ch, f()); - }) { - tr_success { result::ok(comm::recv(p)) } - tr_failure { result::err(()) } + if (i == 0) { + comm::send(ch, ()); + } else { + f(i - 1, ch); + } + }; + + } + f(10, ch); + comm::recv(po); +} + +#[test] +fn test_spawn_sched_childs_on_same_sched() { + let po = comm::port(); + let ch = comm::chan(po); + + spawn_sched(single_threaded) {|| + let parent_sched_id = rustrt::rust_get_sched_id(); + spawn {|| + let child_sched_id = rustrt::rust_get_sched_id(); + // This should be on the same scheduler + assert parent_sched_id == child_sched_id; + comm::send(ch, ()); + }; + }; + + comm::recv(po); +} + +#[nolink] +#[cfg(test)] +native mod testrt { + fn rust_dbg_lock_create() -> *ctypes::void; + fn rust_dbg_lock_destroy(lock: *ctypes::void); + fn rust_dbg_lock_lock(lock: *ctypes::void); + fn rust_dbg_lock_unlock(lock: *ctypes::void); + fn rust_dbg_lock_wait(lock: *ctypes::void); + fn rust_dbg_lock_signal(lock: *ctypes::void); +} + +#[test] +fn test_spawn_sched_blocking() { + + // Testing that a task in one scheduler can block natively + // without affecting other schedulers + iter::repeat(20u) {|| + + let start_po = comm::port(); + let start_ch = comm::chan(start_po); + let fin_po = comm::port(); + let fin_ch = comm::chan(fin_po); + + let lock = testrt::rust_dbg_lock_create(); + + spawn_sched(single_threaded) {|| + testrt::rust_dbg_lock_lock(lock); + + comm::send(start_ch, ()); + + // Block the scheduler thread + testrt::rust_dbg_lock_wait(lock); + testrt::rust_dbg_lock_unlock(lock); + + comm::send(fin_ch, ()); + }; + + // Wait until the other task has its lock + comm::recv(start_po); + + fn pingpong(po: comm::port, ch: comm::chan) { + let val = 20; + while val > 0 { + val = comm::recv(po); + comm::send(ch, val - 1); + } + } + + let setup_po = comm::port(); + let setup_ch = comm::chan(setup_po); + let parent_po = comm::port(); + let parent_ch = comm::chan(parent_po); + spawn {|| + let child_po = comm::port(); + comm::send(setup_ch, comm::chan(child_po)); + pingpong(child_po, parent_ch); + }; + + let child_ch = comm::recv(setup_po); + comm::send(child_ch, 20); + pingpong(parent_po, child_ch); + testrt::rust_dbg_lock_lock(lock); + testrt::rust_dbg_lock_signal(lock); + testrt::rust_dbg_lock_unlock(lock); + comm::recv(fin_po); + testrt::rust_dbg_lock_destroy(lock); } } #[cfg(test)] -mod tests { - // FIXME: Leaks on windows - #[test] - #[ignore(cfg(target_os = "win32"))] - fn test_unsupervise() { - fn f() { unsupervise(); fail; } - spawn {|| f();}; - } +fn avoid_copying_the_body(spawnfn: fn(+fn~())) { + let p = comm::port::(); + let ch = comm::chan(p); - #[test] - fn test_lib_spawn() { - fn foo() { #error("Hello, World!"); } - spawn {|| foo();}; - } + let x = ~1; + let x_in_parent = ptr::addr_of(*x) as uint; - #[test] - fn test_lib_spawn2() { - fn foo(x: int) { assert (x == 42); } - spawn {|| foo(42);}; - } - - #[test] - fn test_join_chan() { - fn winner() { } - - let t = spawn_joinable {|| winner();}; - alt join(t) { - tr_success {/* yay! */ } - _ { fail "invalid task status received" } - } - } - - // FIXME: Leaks on windows - #[test] - #[ignore(cfg(target_os = "win32"))] - fn test_join_chan_fail() { - fn failer() { unsupervise(); fail } - - let t = spawn_joinable {|| failer();}; - alt join(t) { - tr_failure {/* yay! */ } - _ { fail "invalid task status received" } - } - } - - #[test] - fn spawn_polymorphic() { - fn foo(x: T) { log(error, x); } - spawn {|| foo(true);}; - spawn {|| foo(42);}; - } - - #[test] - fn try_success() { - alt try {|| - "Success!" - } { - result::ok("Success!") { } - _ { fail; } - } - } - - #[test] - #[ignore(cfg(target_os = "win32"))] - fn try_fail() { - alt try {|| - fail - } { - result::err(()) { } - _ { fail; } - } - } - - #[test] - #[should_fail] - #[ignore(cfg(target_os = "win32"))] - fn spawn_sched_no_threads() { - spawn_sched(0u) {|| }; - } - - #[test] - fn spawn_sched_1() { - let po = comm::port(); - let ch = comm::chan(po); - - fn f(i: int, ch: comm::chan<()>) { - let parent_sched_id = rustrt::rust_get_sched_id(); - - spawn_sched(1u) {|| - let child_sched_id = rustrt::rust_get_sched_id(); - assert parent_sched_id != child_sched_id; - - if (i == 0) { - comm::send(ch, ()); - } else { - f(i - 1, ch); - } - }; - - } - f(10, ch); - comm::recv(po); - } - - #[test] - fn spawn_sched_childs_on_same_sched() { - let po = comm::port(); - let ch = comm::chan(po); - - spawn_sched(1u) {|| - let parent_sched_id = rustrt::rust_get_sched_id(); - spawn {|| - let child_sched_id = rustrt::rust_get_sched_id(); - // This should be on the same scheduler - assert parent_sched_id == child_sched_id; - comm::send(ch, ()); - }; - }; - - comm::recv(po); - } - - #[nolink] - native mod rt { - fn rust_dbg_lock_create() -> *ctypes::void; - fn rust_dbg_lock_destroy(lock: *ctypes::void); - fn rust_dbg_lock_lock(lock: *ctypes::void); - fn rust_dbg_lock_unlock(lock: *ctypes::void); - fn rust_dbg_lock_wait(lock: *ctypes::void); - fn rust_dbg_lock_signal(lock: *ctypes::void); - } - - #[test] - fn spawn_sched_blocking() { - - // Testing that a task in one scheduler can block natively - // without affecting other schedulers - iter::repeat(20u) {|| - - let start_po = comm::port(); - let start_ch = comm::chan(start_po); - let fin_po = comm::port(); - let fin_ch = comm::chan(fin_po); - - let lock = rt::rust_dbg_lock_create(); - - spawn_sched(1u) {|| - rt::rust_dbg_lock_lock(lock); - - comm::send(start_ch, ()); - - // Block the scheduler thread - rt::rust_dbg_lock_wait(lock); - rt::rust_dbg_lock_unlock(lock); - - comm::send(fin_ch, ()); - }; - - // Wait until the other task has its lock - comm::recv(start_po); - - fn pingpong(po: comm::port, ch: comm::chan) { - let val = 20; - while val > 0 { - val = comm::recv(po); - comm::send(ch, val - 1); - } - } - - let setup_po = comm::port(); - let setup_ch = comm::chan(setup_po); - let parent_po = comm::port(); - let parent_ch = comm::chan(parent_po); - spawn {|| - let child_po = comm::port(); - comm::send(setup_ch, comm::chan(child_po)); - pingpong(child_po, parent_ch); - }; - - let child_ch = comm::recv(setup_po); - comm::send(child_ch, 20); - pingpong(parent_po, child_ch); - rt::rust_dbg_lock_lock(lock); - rt::rust_dbg_lock_signal(lock); - rt::rust_dbg_lock_unlock(lock); - comm::recv(fin_po); - rt::rust_dbg_lock_destroy(lock); - } - } + spawnfn(fn~[move x]() { + let x_in_child = ptr::addr_of(*x) as uint; + comm::send(ch, x_in_child); + }); + let x_in_child = comm::recv(p); + assert x_in_parent == x_in_child; } +#[test] +fn test_avoid_copying_the_body_spawn() { + avoid_copying_the_body(spawn); +} -// Local Variables: -// mode: rust; -// fill-column: 78; -// indent-tabs-mode: nil -// c-basic-offset: 4 -// buffer-file-coding-system: utf-8-unix -// End: +#[test] +fn test_avoid_copying_the_body_spawn_listener() { + avoid_copying_the_body {|f| + spawn_listener(fn~[move f](_po: comm::port) { + f(); + }); + } +} + +#[test] +fn test_avoid_copying_the_body_run() { + avoid_copying_the_body {|f| + let builder = mk_task_builder(); + run(builder, fn~[move f]() { + f(); + }); + } +} + +#[test] +fn test_avoid_copying_the_body_run_listener() { + avoid_copying_the_body {|f| + let builder = mk_task_builder(); + run_listener(builder,fn~[move f](_po: comm::port) { + f(); + }); + } +} + +#[test] +fn test_avoid_copying_the_body_try() { + avoid_copying_the_body {|f| + try(fn~[move f]() { + f(); + }); + } +} + +#[test] +fn test_avoid_copying_the_body_future_task() { + avoid_copying_the_body {|f| + let builder = mk_task_builder(); + future_task(builder); + run(builder, fn~[move f]() { + f(); + }); + } +} + +#[test] +fn test_avoid_copying_the_body_unsupervise() { + avoid_copying_the_body {|f| + let builder = mk_task_builder(); + unsupervise(builder); + run(builder, fn~[move f]() { + f(); + }); + } +} \ No newline at end of file diff --git a/src/libcore/vec.rs b/src/libcore/vec.rs index 99e7db9a7ad..1dc9ad9a574 100644 --- a/src/libcore/vec.rs +++ b/src/libcore/vec.rs @@ -1929,16 +1929,10 @@ mod tests { } #[test] - // FIXME: Windows can't undwind + #[should_fail] #[ignore(cfg(target_os = "win32"))] fn test_init_empty() { - - let r = task::join( - task::spawn_joinable {|| - task::unsupervise(); - init::([]); - }); - assert r == task::tr_failure + init::([]); } #[test] diff --git a/src/libstd/test.rs b/src/libstd/test.rs index 003949a9180..9f25916778f 100644 --- a/src/libstd/test.rs +++ b/src/libstd/test.rs @@ -316,13 +316,12 @@ fn run_test(+test: test_desc, monitor_ch: comm::chan) { task::spawn {|| let testfn = test.fn; - let test_task = task::spawn_joinable {|| - configure_test_task(); - testfn(); - }; - - let task_result = task::join(test_task); - let test_result = calc_result(test, task_result == task::tr_success); + let builder = task::mk_task_builder(); + let result_future = task::future_result(builder); + task::unsupervise(builder); + task::run(builder, testfn); + let task_result = future::get(result_future); + let test_result = calc_result(test, task_result == task::success); comm::send(monitor_ch, (test, test_result)); }; } @@ -337,13 +336,6 @@ fn calc_result(test: test_desc, task_succeeded: bool) -> test_result { } } -// Call from within a test task to make sure it's set up correctly -fn configure_test_task() { - // If this task fails we don't want that failure to propagate to the - // test runner or else we couldn't keep running tests - task::unsupervise(); -} - #[cfg(test)] mod tests { diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index 6c0cd5e5fc0..29940492f79 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -539,6 +539,11 @@ chan_id_send(type_desc *t, rust_task_id target_task_id, // FIXME: make sure this is thread-safe bool sent = false; rust_task *task = rust_task_thread::get_task(); + + LOG(task, comm, "chan_id_send task: 0x%" PRIxPTR + " port: 0x%" PRIxPTR, (uintptr_t) target_task_id, + (uintptr_t) target_port_id); + rust_task *target_task = task->kernel->get_task_by_id(target_task_id); if(target_task) { rust_port *port = target_task->get_port_by_id(target_port_id); @@ -547,8 +552,12 @@ chan_id_send(type_desc *t, rust_task_id target_task_id, scoped_lock with(target_task->lock); port->deref(); sent = true; + } else { + LOG(task, comm, "didn't get the port"); } target_task->deref(); + } else { + LOG(task, comm, "didn't get the task"); } return (uintptr_t)sent; } diff --git a/src/test/bench/msgsend.rs b/src/test/bench/msgsend.rs index 250a942fa68..73db81c88a0 100644 --- a/src/test/bench/msgsend.rs +++ b/src/test/bench/msgsend.rs @@ -28,22 +28,28 @@ fn server(requests: comm::port, responses: comm::chan) { } fn run(args: [str]) { - let server = task::spawn_connected(server); + let from_child = comm::port(); + let to_parent = comm::chan(from_child); + let to_child = task::spawn_listener {|po| + server(po, to_parent); + }; let size = uint::from_str(args[1]); let workers = uint::from_str(args[2]); let start = std::time::precise_time_s(); - let to_child = server.to_child; - let worker_tasks = []; + let to_child = to_child; + let worker_results = []; uint::range(0u, workers) {|_i| - worker_tasks += [task::spawn_joinable {|| + let builder = task::mk_task_builder(); + worker_results += [task::future_result(builder)]; + task::run(builder) {|| uint::range(0u, size / workers) {|_i| comm::send(to_child, bytes(100u)); } - }]; + }; } - vec::iter(worker_tasks) {|t| task::join(t); } - comm::send(server.to_child, stop); - let result = comm::recv(server.from_child); + vec::iter(worker_results) {|r| future::get(r); } + comm::send(to_child, stop); + let result = comm::recv(from_child); let end = std::time::precise_time_s(); let elapsed = end - start; std::io::stdout().write_str(#fmt("Count is %?\n", result)); diff --git a/src/test/bench/shootout-pfib.rs b/src/test/bench/shootout-pfib.rs index 04f55d4d0b7..edf9ed71754 100644 --- a/src/test/bench/shootout-pfib.rs +++ b/src/test/bench/shootout-pfib.rs @@ -69,11 +69,13 @@ fn stress_task(&&id: int) { } fn stress(num_tasks: int) { - let tasks = []; + let results = []; range(0, num_tasks) {|i| - tasks += [task::spawn_joinable {|| stress_task(i); }]; + let builder = task::mk_task_builder(); + results += [task::future_result(builder)]; + task::run(builder) {|| stress_task(i); } } - for t in tasks { task::join(t); } + for r in results { future::get(r); } } fn main(argv: [str]) { diff --git a/src/test/bench/shootout-threadring.rs b/src/test/bench/shootout-threadring.rs index 4b73413ae56..3e0150fc45c 100644 --- a/src/test/bench/shootout-threadring.rs +++ b/src/test/bench/shootout-threadring.rs @@ -10,7 +10,7 @@ fn start(+token: int) { let ch = iter::foldl(bind int::range(2, n_threads + 1, _), comm::chan(p)) { |ch, i| let id = n_threads + 2 - i; - let {to_child, _} = task::spawn_connected:: {|p, _ch| + let to_child = task::spawn_listener:: {|p| roundtrip(id, p, ch) }; to_child diff --git a/src/test/bench/task-perf-spawnalot.rs b/src/test/bench/task-perf-spawnalot.rs index ae247f1c54d..3e38ff5ea22 100644 --- a/src/test/bench/task-perf-spawnalot.rs +++ b/src/test/bench/task-perf-spawnalot.rs @@ -7,7 +7,7 @@ import str; fn f(&&n: uint) { let i = 0u; while i < n { - task::join(task::spawn_joinable {|| g(); }); + task::try {|| g() }; i += 1u; } } diff --git a/src/test/bench/task-perf-word-count.rs b/src/test/bench/task-perf-word-count.rs index 67b1505afd9..9cf427f0300 100644 --- a/src/test/bench/task-perf-word-count.rs +++ b/src/test/bench/task-perf-word-count.rs @@ -15,7 +15,6 @@ import option::{some, none}; import std::{map, io, time}; import io::reader_util; -import task::joinable_task; import comm::chan; import comm::port; import comm::recv; @@ -59,12 +58,14 @@ mod map_reduce { enum reduce_proto { emit_val(int), done, ref, release, } fn start_mappers(ctrl: chan, -inputs: [str]) -> - [joinable_task] { - let tasks = []; + [future::future] { + let results = []; for i: str in inputs { - tasks += [task::spawn_joinable {|| map_task(ctrl, i)}]; + let builder = task::mk_task_builder(); + results += [task::future_result(builder)]; + task::run(builder) {|| map_task(ctrl, i)} } - ret tasks; + ret results; } fn map_task(ctrl: chan, input: str) { @@ -137,7 +138,7 @@ mod map_reduce { reducers = map::new_str_hash(); let num_mappers = vec::len(inputs) as int; - let tasks = start_mappers(chan(ctrl), inputs); + let results = start_mappers(chan(ctrl), inputs); while num_mappers > 0 { alt recv(ctrl) { @@ -158,8 +159,9 @@ mod map_reduce { // log(error, "creating new reducer for " + k); let p = port(); let ch = chan(p); - tasks += - [task::spawn_joinable{||reduce_task(k, ch)}]; + let builder = task::mk_task_builder(); + results += [task::future_result(builder)]; + task::run(builder) {||reduce_task(k, ch)} c = recv(p); reducers.insert(k, c); } @@ -171,7 +173,7 @@ mod map_reduce { reducers.values {|v| send(v, done); } - for t in tasks { task::join(t); } + for r in results { future::get(r); } } } diff --git a/src/test/run-pass/binops.rs b/src/test/run-pass/binops.rs index 3fc83c5759d..e841345cba8 100644 --- a/src/test/run-pass/binops.rs +++ b/src/test/run-pass/binops.rs @@ -96,16 +96,6 @@ fn test_ptr() unsafe { assert p1 >= p2; } -fn test_task() { - fn f() { } - let f1 = f, f2 = f; - let t1 = task::spawn {|| f1(); }; - let t2 = task::spawn {|| f2(); }; - - assert (t1 == t1); - assert (t1 != t2); -} - fn test_fn() { fn f() { } fn g() { } @@ -147,7 +137,6 @@ fn main() { test_port(); test_chan(); test_ptr(); - test_task(); test_fn(); test_native_fn(); } diff --git a/src/test/run-pass/issue-507.rs b/src/test/run-pass/issue-507.rs index 3cb790063a9..7490f06d548 100644 --- a/src/test/run-pass/issue-507.rs +++ b/src/test/run-pass/issue-507.rs @@ -8,7 +8,6 @@ use std; import task; -import task::join; import comm; import comm::chan; import comm::send; @@ -18,21 +17,18 @@ import comm::recv; fn grandchild(c: chan) { send(c, 42); } fn child(c: chan) { - let _grandchild = task::spawn_joinable {|| grandchild(c); }; - join(_grandchild); + task::spawn {|| grandchild(c); } } fn main() { let p = comm::port(); let ch = chan(p); - let _child = task::spawn_joinable {|| child(ch); }; + task::spawn {|| child(ch); } let x: int = recv(p); log(debug, x); assert (x == 42); - - join(_child); } diff --git a/src/test/run-pass/issue-783.rs b/src/test/run-pass/issue-783.rs index 27384b102eb..0dc78cbaec3 100644 --- a/src/test/run-pass/issue-783.rs +++ b/src/test/run-pass/issue-783.rs @@ -21,6 +21,7 @@ fn a() { } fn main() { - let t = spawn_joinable {|| a(); }; - join(t); + iter::repeat(100u) {|| + spawn {|| a(); } + } } diff --git a/src/test/run-pass/join.rs b/src/test/run-pass/join.rs deleted file mode 100644 index ef4073406b4..00000000000 --- a/src/test/run-pass/join.rs +++ /dev/null @@ -1,15 +0,0 @@ -// -*- rust -*- - -use std; - -import task::*; - -fn main() { - let other = spawn_joinable {|| child(); }; - #error("1"); - yield(); - join(other); - #error("3"); -} - -fn child() { #error("2"); } diff --git a/src/test/run-pass/linked-failure.rs b/src/test/run-pass/linked-failure.rs deleted file mode 100644 index b82aa7c2cb0..00000000000 --- a/src/test/run-pass/linked-failure.rs +++ /dev/null @@ -1,21 +0,0 @@ -// -*- rust -*- -// xfail-win32 -use std; -import task; -import comm::port; -import comm::recv; - -fn child() { assert (1 == 2); } - -fn parent() { - // Since this task isn't supervised it won't bring down the whole - // process - task::unsupervise(); - let p = port::(); - task::spawn {|| child(); }; - let x = recv(p); -} - -fn main() { - task::spawn {|| parent(); }; -} \ No newline at end of file diff --git a/src/test/run-pass/lots-a-fail.rs b/src/test/run-pass/lots-a-fail.rs index 9f51b688b11..924c0ea8a7a 100644 --- a/src/test/run-pass/lots-a-fail.rs +++ b/src/test/run-pass/lots-a-fail.rs @@ -9,12 +9,13 @@ fn die() { } fn iloop() { - task::unsupervise(); task::spawn {|| die(); }; } fn main() { uint::range(0u, 100u) {|_i| - task::spawn {|| iloop(); }; + let builder = task::mk_task_builder(); + task::unsupervise(builder); + task::run(builder) {|| iloop(); }; } } \ No newline at end of file diff --git a/src/test/run-pass/morestack5.rs b/src/test/run-pass/morestack5.rs index ecd187ae44a..db67db86626 100644 --- a/src/test/run-pass/morestack5.rs +++ b/src/test/run-pass/morestack5.rs @@ -12,7 +12,7 @@ fn getbig(&&i: int) { fn main() { let sz = 400u; while sz < 500u { - task::join(task::spawn_joinable {|| getbig(200) }); + task::try {|| getbig(200) }; sz += 1u; } } \ No newline at end of file diff --git a/src/test/run-pass/morestack6.rs b/src/test/run-pass/morestack6.rs index 87ff60f13a7..6d8ac3c38f8 100644 --- a/src/test/run-pass/morestack6.rs +++ b/src/test/run-pass/morestack6.rs @@ -61,6 +61,6 @@ fn main() { for f in fns { let sz = rng.next() % 256u32 + 256u32; let frame_backoff = rng.next() % 10u32 + 1u32; - task::join(task::spawn_joinable {|| runtest(f, frame_backoff);}); + task::try {|| runtest(f, frame_backoff) }; } } \ No newline at end of file diff --git a/src/test/run-pass/send-iloop.rs b/src/test/run-pass/send-iloop.rs index 5fb4e4e9bd9..ece3dc4ce64 100644 --- a/src/test/run-pass/send-iloop.rs +++ b/src/test/run-pass/send-iloop.rs @@ -9,7 +9,6 @@ fn die() { } fn iloop() { - task::unsupervise(); task::spawn {|| die(); }; let p = comm::port::<()>(); let c = comm::chan(p); @@ -23,6 +22,8 @@ fn iloop() { fn main() { uint::range(0u, 16u) {|_i| - task::spawn {|| iloop(); }; + let builder = task::mk_task_builder(); + task::unsupervise(builder); + task::run(builder) {|| iloop(); } } } \ No newline at end of file diff --git a/src/test/run-pass/spawn-module-qualified.rs b/src/test/run-pass/spawn-module-qualified.rs deleted file mode 100644 index cbc3b8b2ea6..00000000000 --- a/src/test/run-pass/spawn-module-qualified.rs +++ /dev/null @@ -1,9 +0,0 @@ -use std; -import task::join; -import task::spawn_joinable; - -fn main() { let x = spawn_joinable {|| m::child(10); }; join(x); } - -mod m { - fn child(&&i: int) { log(debug, i); } -} diff --git a/src/test/run-pass/spawn.rs b/src/test/run-pass/spawn.rs index 7a395f677aa..600cbdb06b8 100644 --- a/src/test/run-pass/spawn.rs +++ b/src/test/run-pass/spawn.rs @@ -5,8 +5,7 @@ use std; import task; fn main() { - let t = task::spawn_joinable {|| child(10); }; - task::join(t); + task::spawn {|| child(10); }; } fn child(&&i: int) { log(error, i); assert (i == 10); } diff --git a/src/test/run-pass/task-comm-1.rs b/src/test/run-pass/task-comm-1.rs index 96901b9ae56..713d487f3d8 100644 --- a/src/test/run-pass/task-comm-1.rs +++ b/src/test/run-pass/task-comm-1.rs @@ -1,14 +1,8 @@ -use std; - -import task::spawn_joinable; -import task::join; - fn main() { test00(); } fn start() { #debug("Started / Finished task."); } fn test00() { - let t = spawn_joinable {|| start(); }; - join(t); + task::try {|| start() }; #debug("Completing."); } diff --git a/src/test/run-pass/task-comm-12.rs b/src/test/run-pass/task-comm-12.rs index beaa93c02e0..4e690a034ce 100644 --- a/src/test/run-pass/task-comm-12.rs +++ b/src/test/run-pass/task-comm-12.rs @@ -7,7 +7,9 @@ fn start(&&task_number: int) { #debug("Started / Finished task."); } fn test00() { let i: int = 0; - let t = task::spawn_joinable {|| start(i); }; + let builder = task::mk_task_builder(); + let r = task::future_result(builder); + task::run(builder) {|| start(i); }; // Sleep long enough for the task to finish. let i = 0; @@ -17,7 +19,7 @@ fn test00() { } // Try joining tasks that have already finished. - task::join(t); + future::get(r); #debug("Joined task."); } diff --git a/src/test/run-pass/task-comm-13.rs b/src/test/run-pass/task-comm-13.rs index dc2393d13f4..5998dd15cf2 100644 --- a/src/test/run-pass/task-comm-13.rs +++ b/src/test/run-pass/task-comm-13.rs @@ -12,7 +12,6 @@ fn main() { #debug("Check that we don't deadlock."); let p = comm::port::(); let ch = comm::chan(p); - let a = task::spawn_joinable {|| start(ch, 0, 10); }; - task::join(a); + task::try {|| start(ch, 0, 10) }; #debug("Joined task"); } diff --git a/src/test/run-pass/task-comm-2.rs b/src/test/run-pass/task-comm-2.rs deleted file mode 100644 index c17b1c8a11e..00000000000 --- a/src/test/run-pass/task-comm-2.rs +++ /dev/null @@ -1,31 +0,0 @@ -// xfail-win32 -use std; - -import task; - -fn main() { - #debug("===== SPAWNING and JOINING THREAD TASKS ====="); - test00(); -} - -fn start(&&task_number: int) { - #debug("Started task."); - let i: int = 0; - while i < 10000 { i = i + 1; } - #debug("Finished task."); -} - -fn test00() { - let number_of_tasks: int = 8; - - let i: int = 0; - let tasks = []; - while i < number_of_tasks { - i = i + 1; - tasks += [task::spawn_joinable {|| start(i); }]; - } - - for t in tasks { task::join(t); } - - #debug("Joined all task."); -} diff --git a/src/test/run-pass/task-comm-3.rs b/src/test/run-pass/task-comm-3.rs index d5826309356..bb1044d39e3 100644 --- a/src/test/run-pass/task-comm-3.rs +++ b/src/test/run-pass/task-comm-3.rs @@ -30,17 +30,19 @@ fn test00() { let i: int = 0; // Create and spawn tasks... - let tasks = []; + let results = []; while i < number_of_tasks { - tasks += [task::spawn_joinable {|| + let builder = task::mk_task_builder(); + results += [task::future_result(builder)]; + task::run(builder) {|| test00_start(ch, i, number_of_messages) - }]; + } i = i + 1; } // Read from spawned tasks... let sum = 0; - for t in tasks { + for r in results { i = 0; while i < number_of_messages { let value = recv(po); @@ -50,7 +52,7 @@ fn test00() { } // Join spawned tasks... - for t in tasks { task::join(t); } + for r in results { future::get(r); } #debug("Completed: Final number is: "); log(error, sum); diff --git a/src/test/run-pass/task-comm-7.rs b/src/test/run-pass/task-comm-7.rs index c7c15830955..64649b85459 100644 --- a/src/test/run-pass/task-comm-7.rs +++ b/src/test/run-pass/task-comm-7.rs @@ -1,9 +1,6 @@ use std; import task; import comm; -import comm::chan; -import comm::recv; -import comm::port; fn main() { test00(); } @@ -15,40 +12,35 @@ fn test00_start(c: comm::chan, start: int, number_of_messages: int) { fn test00() { let r: int = 0; let sum: int = 0; - let p = port(); + let p = comm::port(); let number_of_messages: int = 10; - let c = chan(p); + let c = comm::chan(p); - let t0 = task::spawn_joinable {|| + task::spawn {|| test00_start(c, number_of_messages * 0, number_of_messages); - }; - let t1 = task::spawn_joinable {|| + } + task::spawn {|| test00_start(c, number_of_messages * 1, number_of_messages); - }; - let t2 = task::spawn_joinable {|| + } + task::spawn {|| test00_start(c, number_of_messages * 2, number_of_messages); - }; - let t3 = task::spawn_joinable {|| + } + task::spawn {|| test00_start(c, number_of_messages * 3, number_of_messages); - }; + } let i: int = 0; while i < number_of_messages { - r = recv(p); + r = comm::recv(p); sum += r; - r = recv(p); + r = comm::recv(p); sum += r; - r = recv(p); + r = comm::recv(p); sum += r; - r = recv(p); + r = comm::recv(p); sum += r; i += 1; } - task::join(t0); - task::join(t1); - task::join(t2); - task::join(t3); - assert (sum == number_of_messages * 4 * (number_of_messages * 4 - 1) / 2); } diff --git a/src/test/run-pass/task-comm-8.rs b/src/test/run-pass/task-comm-8.rs deleted file mode 100644 index 8259fde9fad..00000000000 --- a/src/test/run-pass/task-comm-8.rs +++ /dev/null @@ -1,51 +0,0 @@ -use std; -import task; -import comm; - -fn main() { test00(); } - -fn test00_start(c: comm::chan, start: int, number_of_messages: int) { - let i: int = 0; - while i < number_of_messages { comm::send(c, start + i); i += 1; } -} - -fn test00() { - let r: int = 0; - let sum: int = 0; - let p = comm::port(); - let c = comm::chan(p); - let number_of_messages: int = 10; - - let t0 = task::spawn_joinable {|| - test00_start(c, number_of_messages * 0, number_of_messages); - }; - let t1 = task::spawn_joinable {|| - test00_start(c, number_of_messages * 1, number_of_messages); - }; - let t2 = task::spawn_joinable {|| - test00_start(c, number_of_messages * 2, number_of_messages); - }; - let t3 = task::spawn_joinable {|| - test00_start(c, number_of_messages * 3, number_of_messages); - }; - - let i: int = 0; - while i < number_of_messages { - r = comm::recv(p); - sum += r; - r = comm::recv(p); - sum += r; - r = comm::recv(p); - sum += r; - r = comm::recv(p); - sum += r; - i += 1; - } - - task::join(t0); - task::join(t1); - task::join(t2); - task::join(t3); - - assert (sum == number_of_messages * 4 * (number_of_messages * 4 - 1) / 2); -} diff --git a/src/test/run-pass/task-comm-9.rs b/src/test/run-pass/task-comm-9.rs index f6cc1dae6f0..85d1e03c09c 100644 --- a/src/test/run-pass/task-comm-9.rs +++ b/src/test/run-pass/task-comm-9.rs @@ -16,9 +16,11 @@ fn test00() { let number_of_messages: int = 10; let ch = comm::chan(p); - let t0 = task::spawn_joinable {|| + let builder = task::mk_task_builder(); + let r = task::future_result(builder); + task::run(builder) {|| test00_start(ch, number_of_messages); - }; + } let i: int = 0; while i < number_of_messages { @@ -27,7 +29,7 @@ fn test00() { i += 1; } - task::join(t0); + future::get(r); assert (sum == number_of_messages * (number_of_messages - 1) / 2); } diff --git a/src/test/run-pass/task-comm.rs b/src/test/run-pass/task-comm.rs index f029dcd61a2..d710b50e2b0 100644 --- a/src/test/run-pass/task-comm.rs +++ b/src/test/run-pass/task-comm.rs @@ -38,20 +38,20 @@ fn test00() { let i: int = 0; - let tasks = []; + let results = []; while i < number_of_tasks { i = i + 1; - tasks += [ - task::spawn_joinable {|| test00_start(ch, i, number_of_messages);} - ]; + let builder = task::mk_task_builder(); + results += [task::future_result(builder)]; + task::run(builder) {|| test00_start(ch, i, number_of_messages);} } let sum: int = 0; - for t in tasks { + for r in results { i = 0; while i < number_of_messages { sum += recv(po); i = i + 1; } } - for t in tasks { task::join(t); } + for r in results { future::get(r); } #debug("Completed: Final number is: "); assert (sum == @@ -123,14 +123,16 @@ fn test06() { let i: int = 0; - let tasks = []; + let results = []; while i < number_of_tasks { i = i + 1; - tasks += [task::spawn_joinable {|| test06_start(i);}]; + let builder = task::mk_task_builder(); + results += [task::future_result(builder)]; + task::run(builder) {|| test06_start(i);}; } - for t in tasks { task::join(t); } + for r in results { future::get(r); } } diff --git a/src/test/run-pass/task-killjoin.rs b/src/test/run-pass/task-killjoin.rs index 313d9bd3702..872639bc8e7 100644 --- a/src/test/run-pass/task-killjoin.rs +++ b/src/test/run-pass/task-killjoin.rs @@ -1,18 +1,17 @@ // xfail-win32 -// Create a task that is supervised by another task, -// join the supervised task from the supervising task, -// then fail the supervised task. The supervised task -// will kill the supervising task, waking it up. The -// supervising task no longer needs to be wakened when -// the supervised task exits. + +// Create a task that is supervised by another task, join the supervised task +// from the supervising task, then fail the supervised task. The supervised +// task will kill the supervising task, waking it up. The supervising task no +// longer needs to be wakened when the supervised task exits. use std; import task; fn supervised() { - // Yield to make sure the supervisor joins before we - // fail. This is currently not needed because the supervisor - // runs first, but I can imagine that changing. + // Yield to make sure the supervisor joins before we fail. This is + // currently not needed because the supervisor runs first, but I can + // imagine that changing. task::yield(); fail; } @@ -20,15 +19,14 @@ fn supervised() { fn supervisor() { // Unsupervise this task so the process doesn't return a failure status as // a result of the main task being killed. - task::unsupervise(); let f = supervised; - let t = task::spawn_joinable {|| supervised(); }; - task::join(t); + task::try {|| supervised() }; } fn main() { - let dom2 = task::spawn_joinable {|| supervisor(); }; - task::join(dom2); + let builder = task::mk_task_builder(); + task::unsupervise(builder); + task::run(builder) {|| supervisor(); } } // Local Variables: diff --git a/src/test/run-pass/task-spawn-connected.rs b/src/test/run-pass/task-spawn-connected.rs deleted file mode 100644 index 053dab56a05..00000000000 --- a/src/test/run-pass/task-spawn-connected.rs +++ /dev/null @@ -1,18 +0,0 @@ -fn stringifier(from_par: comm::port, - to_par: comm::chan) { - let value: uint; - do { - value = comm::recv(from_par); - comm::send(to_par, uint::to_str(value, 10u)); - } while value != 0u; -} - -fn main() { - let t = task::spawn_connected(stringifier); - comm::send(t.to_child, 22u); - assert comm::recv(t.from_child) == "22"; - comm::send(t.to_child, 23u); - assert comm::recv(t.from_child) == "23"; - comm::send(t.to_child, 0u); - assert comm::recv(t.from_child) == "0"; -} \ No newline at end of file diff --git a/src/test/run-pass/terminate-in-initializer.rs b/src/test/run-pass/terminate-in-initializer.rs index 23597b3ed62..9b722a947c0 100644 --- a/src/test/run-pass/terminate-in-initializer.rs +++ b/src/test/run-pass/terminate-in-initializer.rs @@ -11,14 +11,14 @@ fn test_cont() { let i = 0; while i < 1 { i += 1; let x: @int = cont; } } fn test_ret() { let x: @int = ret; } fn test_fail() { - fn f() { task::unsupervise(); let x: @int = fail; } - task::spawn {|| f(); }; + fn f() { let x: @int = fail; } + task::try {|| f() }; } fn test_fail_indirect() { fn f() -> ! { fail; } - fn g() { task::unsupervise(); let x: @int = f(); } - task::spawn {|| g(); }; + fn g() { let x: @int = f(); } + task::try {|| g() }; } fn main() { diff --git a/src/test/run-pass/too-much-recursion.rs b/src/test/run-pass/too-much-recursion.rs index 03d537d0ca6..04119e296cf 100644 --- a/src/test/run-pass/too-much-recursion.rs +++ b/src/test/run-pass/too-much-recursion.rs @@ -5,8 +5,9 @@ // that it doesn't bring down the whole proc fn main() { - task::spawn {|| - task::unsupervise(); + let builder = task::mk_task_builder(); + task::unsupervise(builder); + task::run(builder) {|| fn f() { f() }; f(); }; diff --git a/src/test/run-pass/unwind-box.rs b/src/test/run-pass/unwind-box.rs index ca4221e0763..5281ddb05c7 100644 --- a/src/test/run-pass/unwind-box.rs +++ b/src/test/run-pass/unwind-box.rs @@ -3,11 +3,12 @@ use std; import task; fn f() { - task::unsupervise(); let a = @0; fail; } fn main() { - task::spawn {|| f(); }; + let builder = task::mk_task_builder(); + task::unsupervise(builder); + task::run(builder) {|| f(); } } \ No newline at end of file diff --git a/src/test/run-pass/unwind-resource.rs b/src/test/run-pass/unwind-resource.rs index 5bb70236b7c..0c64af20b40 100644 --- a/src/test/run-pass/unwind-resource.rs +++ b/src/test/run-pass/unwind-resource.rs @@ -8,7 +8,6 @@ resource complainer(c: comm::chan) { } fn f(c: comm::chan) { - task::unsupervise(); let c <- complainer(c); fail; } @@ -16,6 +15,8 @@ fn f(c: comm::chan) { fn main() { let p = comm::port(); let c = comm::chan(p); - task::spawn {|| f(c); }; + let builder = task::mk_task_builder(); + task::unsupervise(builder); + task::run(builder) {|| f(c); } assert comm::recv(p); } \ No newline at end of file diff --git a/src/test/run-pass/unwind-resource2.rs b/src/test/run-pass/unwind-resource2.rs index dbf91f31a3f..b4d2294fbfe 100644 --- a/src/test/run-pass/unwind-resource2.rs +++ b/src/test/run-pass/unwind-resource2.rs @@ -7,11 +7,12 @@ resource complainer(c: @int) { } fn f() { - task::unsupervise(); let c <- complainer(@0); fail; } fn main() { - task::spawn {|| f(); }; + let builder = task::mk_task_builder(); + task::unsupervise(builder); + task::run(builder) {|| f(); } } \ No newline at end of file diff --git a/src/test/run-pass/unwind-unique.rs b/src/test/run-pass/unwind-unique.rs index 9fab3c54609..222343e1dc4 100644 --- a/src/test/run-pass/unwind-unique.rs +++ b/src/test/run-pass/unwind-unique.rs @@ -3,11 +3,12 @@ use std; import task; fn f() { - task::unsupervise(); let a = ~0; fail; } fn main() { - task::spawn {|| f(); }; + let builder = task::mk_task_builder(); + task::unsupervise(builder); + task::run(builder) {|| f(); } } \ No newline at end of file diff --git a/src/test/run-pass/yield.rs b/src/test/run-pass/yield.rs index eb4a74c141a..3194676aae7 100644 --- a/src/test/run-pass/yield.rs +++ b/src/test/run-pass/yield.rs @@ -4,13 +4,15 @@ import task; import task::*; fn main() { - let other = task::spawn_joinable {|| child(); }; + let builder = task::mk_task_builder(); + let result = task::future_result(builder); + task::run(builder) {|| child(); } #error("1"); yield(); #error("2"); yield(); #error("3"); - join(other); + future::get(result); } fn child() { diff --git a/src/test/run-pass/yield1.rs b/src/test/run-pass/yield1.rs index a5234cf8d12..c309e5b1b0c 100644 --- a/src/test/run-pass/yield1.rs +++ b/src/test/run-pass/yield1.rs @@ -4,10 +4,12 @@ import task; import task::*; fn main() { - let other = task::spawn_joinable {|| child(); }; + let builder = task::mk_task_builder(); + let result = task::future_result(builder); + task::run(builder) {|| child(); } #error("1"); yield(); - join(other); + future::get(result); } fn child() { #error("2"); }