Removing locked queue port/chan prototype.

This commit is contained in:
Eric Holk 2012-07-06 11:05:28 -07:00
parent 7b03832c95
commit 604f7c66ff
3 changed files with 1 additions and 164 deletions

View File

@ -39,7 +39,7 @@ export float, f32, f64;
export box, char, str, ptr, vec, bool;
export either, option, result, iter;
export libc, os, io, run, rand, sys, unsafe, logging;
export arc, newcomm, comm, task, future, pipes;
export arc, comm, task, future, pipes;
export extfmt;
export tuple;
export to_str, to_bytes;
@ -183,7 +183,6 @@ mod dlist_iter {
// Concurrency
mod arc;
mod newcomm;
mod comm;
mod task;
mod future;

View File

@ -1,85 +0,0 @@
/**
* A new implementation of communication.
*
* This should be implementing almost entirely in Rust, and hopefully
* avoid needing a single global lock.
*/
import arc::methods;
import dvec::dvec;
import dvec::extensions;
import sys::methods;
export port;
export chan;
export send, recv;
export methods;
type raw_port<T: send> = arc::exclusive<dvec<T>>;
enum port<T: send> {
port_(raw_port<T>)
}
enum chan<T: send> {
chan_(raw_port<T>)
}
fn port<T: send>() -> port<T> {
port_(arc::exclusive(dvec()))
}
fn chan<T: send>(p: port<T>) -> chan<T> {
chan_((*p).clone())
}
fn send<T: send>(c: chan<T>, -x: T) {
let mut x <- some(x);
do (*c).with |cond, data| {
let mut xx = none;
xx <-> x;
(*data).push(option::unwrap(xx));
cond.signal();
}
}
fn recv<T: send>(p: port<T>) -> T {
do (*p).with |cond, data| {
if (*data).len() == 0u {
cond.wait();
}
assert (*data).len() > 0u;
(*data).shift()
}
}
impl methods<T: send> for chan<T> {
fn send(-x: T) {
send(self, x)
}
fn clone() -> chan<T> {
chan_((*self).clone())
}
}
impl methods<T: send> for port<T> {
fn recv() -> T {
recv(self)
}
fn chan() -> chan<T> {
chan(self)
}
}
#[cfg(test)]
mod test {
#[test]
fn newport_simple() {
let p = port();
let c = chan(p);
c.send(42);
assert p.recv() == 42;
}
}

View File

@ -1,77 +0,0 @@
// This test creates a bunch of tasks that simultaneously send to each
// other in a ring. The messages should all be basically
// independent. It's designed to hammer the global kernel lock, so
// that things will look really good once we get that lock out of the
// message path.
import newcomm::*;
import future::future;
import future::extensions;
use std;
import std::time;
fn thread_ring(i: uint,
count: uint,
num_chan: chan<uint>,
num_port: port<uint>) {
// Send/Receive lots of messages.
for uint::range(0u, count) |j| {
num_chan.send(i * j);
num_port.recv();
};
}
fn main(args: ~[str]) {
let args = if os::getenv("RUST_BENCH").is_some() {
~["", "100", "10000"]
} else if args.len() <= 1u {
~["", "100", "1000"]
} else {
args
};
let num_tasks = option::get(uint::from_str(args[1]));
let msg_per_task = option::get(uint::from_str(args[2]));
let num_port = port();
let mut num_chan = chan(num_port);
let start = time::precise_time_s();
// create the ring
let mut futures = ~[];
for uint::range(1u, num_tasks) |i| {
let get_chan = port();
let get_chan_chan = chan(get_chan);
{
let num_chan = num_chan.clone();
futures += ~[do future::spawn |move num_chan, move get_chan_chan| {
let p = port();
get_chan_chan.send(chan(p));
thread_ring(i, msg_per_task, num_chan, p)
}];
}
num_chan = get_chan.recv();
};
// do our iteration
thread_ring(0u, msg_per_task, num_chan, num_port);
// synchronize
for futures.each |f| { f.get() };
let stop = time::precise_time_s();
// all done, report stats.
let num_msgs = num_tasks * msg_per_task;
let elapsed = (stop - start);
let rate = (num_msgs as float) / elapsed;
io::println(#fmt("Sent %? messages in %? seconds",
num_msgs, elapsed));
io::println(#fmt(" %? messages / second", rate));
io::println(#fmt(" %? μs / message", 1000000. / rate));
}