Add spawn_conversation

This commit is contained in:
Jesse Ruderman 2012-08-08 16:38:26 -04:00
parent 166cb1b28b
commit a76e4334b3
5 changed files with 67 additions and 51 deletions

View File

@ -2960,7 +2960,11 @@ do spawn {
~~~~
This child will perform the expensive computation send the result
over the channel. Finally, the parent continues by performing
over the channel. (Under the hood, `chan` was captured by the
closure that forms the body of the child task. This capture is
allowed because channels are sendable.)
Finally, the parent continues by performing
some other expensive computation and then waiting for the child's result
to arrive on the port:
@ -2978,10 +2982,10 @@ let result = port.recv();
A very common thing to do is to spawn a child task where the parent
and child both need to exchange messages with each
other. The function `task::spawn_listener()` supports this pattern. We'll look
briefly at how it is used.
other. The function `task::spawn_conversation()` supports this pattern.
We'll look briefly at how it is used.
To see how `spawn_listener()` works, we will create a child task
To see how `spawn_conversation()` works, we will create a child task
that receives `uint` messages, converts them to a string, and sends
the string in response. The child terminates when `0` is received.
Here is the function that implements the child task:
@ -3010,7 +3014,7 @@ simply the strified version of the received value,
Here is the code for the parent task:
~~~~
# import task::{spawn_listener};
# import task::{spawn_conversation};
# import comm::{chan, port, methods};
# fn stringifier(from_parent: comm::port<uint>,
# to_parent: comm::chan<~str>) {
@ -3020,9 +3024,7 @@ Here is the code for the parent task:
# }
# fn main() {
let from_child = port();
let to_parent = from_child.chan();
let to_child = do spawn_listener |from_parent| {
let (from_child, to_child) = do spawn_conversation |from_parent, to_parent| {
stringifier(from_parent, to_parent);
};
@ -3030,22 +3032,22 @@ to_child.send(22u);
assert from_child.recv() == ~"22";
to_child.send(23u);
assert from_child.recv() == ~"23";
to_child.send(0u);
assert from_child.recv() == ~"23";
assert from_child.recv() == ~"0";
# }
~~~~
The parent first sets up a port to receive data from and a channel
that the child can use to send data to that port. The call to
`spawn_listener()` will spawn the child task, providing it with a port
on which to receive data from its parent, and returning to the parent
the associated channel. Finally, the closure passed to
`spawn_listener()` that forms the body of the child task captures the
`to_parent` channel in its environment, so both parent and child
can send and receive data to and from the other.
The parent task calls `spawn_conversation` with a function that takes
a `from_parent` port and a `to_parent` channel. In return, it gets a
`from_child` channel and a `to_child` port. As a result, both parent
and child can send and receive data to and from the other.
`spawn_conversation`
will create two port/channel pairs, passing one set to the child task
and returning the other set to the caller.
# Testing

View File

@ -40,9 +40,8 @@ unsafe fn chan_from_global_ptr<T: send>(
log(debug,~"is probably zero...");
// There's no global channel. We must make it
let setup_po = comm::port();
let setup_ch = comm::chan(setup_po);
let setup_ch = do task_fn().spawn_listener |setup_po| {
let (setup_po, setup_ch) = do task_fn().spawn_conversation
|setup_po, setup_ch| {
let po = comm::port::<T>();
let ch = comm::chan(po);
comm::send(setup_ch, ch);

View File

@ -52,6 +52,7 @@ export spawn_unlinked;
export spawn_supervised;
export spawn_with;
export spawn_listener;
export spawn_conversation;
export spawn_sched;
export try;
@ -376,6 +377,20 @@ impl task_builder for task_builder {
comm::recv(setup_po)
}
/**
* Runs a new task, setting up communication in both directions
*/
fn spawn_conversation<A: send, B: send>
(+f: fn~(comm::port<A>, comm::chan<B>))
-> (comm::port<B>, comm::chan<A>) {
let from_child = comm::port();
let to_parent = comm::chan(from_child);
let to_child = do self.spawn_listener |from_parent| {
f(from_parent, to_parent)
};
(from_child, to_child)
}
/**
* Execute a function in another task and return either the return value
* of the function or result::err.
@ -474,31 +489,24 @@ fn spawn_listener<A:send>(+f: fn~(comm::port<A>)) -> comm::chan<A> {
/*!
* Runs a new task while providing a channel from the parent to the child
*
* Sets up a communication channel from the current task to the new
* child task, passes the port to child's body, and returns a channel
* linked to the port to the parent.
*
* This encapsulates some boilerplate handshaking logic that would
* otherwise be required to establish communication from the parent
* to the child.
*
* The simplest way to establish bidirectional communication between
* a parent in child is as follows:
*
* let po = comm::port();
* let ch = comm::chan(po);
* let ch = do spawn_listener |po| {
* // Now the child has a port called 'po' to read from and
* // an environment-captured channel called 'ch'.
* };
* // Likewise, the parent has both a 'po' and 'ch'
*
* This function is equivalent to `task().spawn_listener(f)`.
*/
task().spawn_listener(f)
}
fn spawn_conversation<A: send, B: send>
(+f: fn~(comm::port<A>, comm::chan<B>))
-> (comm::port<B>, comm::chan<A>) {
/*!
* Runs a new task, setting up communication in both directions
*
* This function is equivalent to `task().spawn_conversation(f)`.
*/
task().spawn_conversation(f)
}
fn spawn_sched(mode: sched_mode, +f: fn~()) {
/*!
* Creates a new scheduler and executes a task on it
@ -1716,6 +1724,17 @@ fn test_spawn_listiner_bidi() {
assert res == ~"pong";
}
#[test]
fn test_spawn_conversation() {
let (recv_str, send_int) = do spawn_conversation |recv_int, send_str| {
let input = comm::recv(recv_int);
let output = int::str(input);
comm::send(send_str, output);
};
comm::send(send_int, 1);
assert comm::recv(recv_str) == ~"1";
}
#[test]
fn test_try_success() {
match do try {

View File

@ -29,9 +29,8 @@ fn run(
return doc;
}
let result_port = comm::port();
let result_chan = comm::chan(result_port);
let page_chan = do task::spawn_listener |page_port| {
let (result_port, page_chan) = do task::spawn_conversation
|page_port, result_chan| {
comm::send(result_chan, make_doc_from_pages(page_port));
};

View File

@ -28,15 +28,12 @@ fn server(requests: comm::port<request>, responses: comm::chan<uint>) {
}
fn run(args: ~[~str]) {
let from_child = comm::port();
let to_parent = comm::chan(from_child);
let to_child = do task::spawn_listener |po| {
server(po, to_parent);
let (from_child, to_child) = do task::spawn_conversation |po, ch| {
server(po, ch);
};
let size = option::get(uint::from_str(args[1]));
let workers = option::get(uint::from_str(args[2]));
let start = std::time::precise_time_s();
let to_child = to_child;
let mut worker_results = ~[];
for uint::range(0u, workers) |_i| {
do task::task().future_result(|+r| {