Basic functionality for new ports and chans

The first benchmark shows about twice the throughput of the old system.
This commit is contained in:
Eric Holk 2012-06-07 12:18:34 -07:00
parent f54829cf13
commit e4c291530e
5 changed files with 149 additions and 33 deletions

View File

@ -28,6 +28,7 @@ resource arc_destruct<T>(data: *libc::c_void) {
unsafe { unsafe {
let data: ~arc_data<T> = unsafe::reinterpret_cast(data); let data: ~arc_data<T> = unsafe::reinterpret_cast(data);
let new_count = rustrt::rust_atomic_decrement(&mut data.count); let new_count = rustrt::rust_atomic_decrement(&mut data.count);
let data_ptr : *() = unsafe::reinterpret_cast(data);
assert new_count >= 0; assert new_count >= 0;
if new_count == 0 { if new_count == 0 {
// drop glue takes over. // drop glue takes over.
@ -68,17 +69,19 @@ allowing them to share the underlying data."]
fn clone<T: const>(rc: &arc<T>) -> arc<T> { fn clone<T: const>(rc: &arc<T>) -> arc<T> {
unsafe { unsafe {
let ptr: ~arc_data<T> = unsafe::reinterpret_cast(**rc); let ptr: ~arc_data<T> = unsafe::reinterpret_cast(**rc);
rustrt::rust_atomic_increment(&mut ptr.count); let new_count = rustrt::rust_atomic_increment(&mut ptr.count);
let data_ptr : *() = unsafe::reinterpret_cast(ptr);
assert new_count >= 2;
unsafe::forget(ptr); unsafe::forget(ptr);
} }
arc_destruct(**rc) arc_destruct(**rc)
} }
// An arc over mutable data that is protected by a lock. // An arc over mutable data that is protected by a lock.
type ex_data<T> = {lock: sys::lock_and_signal, data: T}; type ex_data<T: send> = {lock: sys::lock_and_signal, data: T};
type exclusive<T> = arc_destruct<ex_data<T>>; type exclusive<T: send> = arc_destruct<ex_data<T>>;
fn exclusive<T>(-data: T) -> exclusive<T> { fn exclusive<T:send >(-data: T) -> exclusive<T> {
let data = ~{mut count: 1, data: {lock: sys::create_lock(), let data = ~{mut count: 1, data: {lock: sys::create_lock(),
data: data}}; data: data}};
unsafe { unsafe {
@ -88,12 +91,14 @@ fn exclusive<T>(-data: T) -> exclusive<T> {
} }
} }
impl methods<T> for exclusive<T> { impl methods<T: send> for exclusive<T> {
fn clone() -> exclusive<T> { fn clone() -> exclusive<T> {
unsafe { unsafe {
// this makes me nervous... // this makes me nervous...
let ptr: ~arc_data<ex_data<T>> = unsafe::reinterpret_cast(*self); let ptr: ~arc_data<ex_data<T>> = unsafe::reinterpret_cast(*self);
rustrt::rust_atomic_increment(&mut ptr.count); let new_count = rustrt::rust_atomic_increment(&mut ptr.count);
let data_ptr : *() = unsafe::reinterpret_cast(ptr);
assert new_count > 1;
unsafe::forget(ptr); unsafe::forget(ptr);
} }
arc_destruct(*self) arc_destruct(*self)

View File

@ -39,7 +39,7 @@ export float, f32, f64;
export box, char, str, ptr, vec, bool; export box, char, str, ptr, vec, bool;
export either, option, result, iter; export either, option, result, iter;
export libc, os, io, run, rand, sys, unsafe, logging; export libc, os, io, run, rand, sys, unsafe, logging;
export arc, comm, task, future; export arc, newcomm, comm, task, future;
export extfmt; export extfmt;
export tuple; export tuple;
export to_str; export to_str;
@ -176,6 +176,7 @@ mod dvec_iter {
// Concurrency // Concurrency
mod arc; mod arc;
mod newcomm;
mod comm; mod comm;
mod task; mod task;
mod future; mod future;

View File

@ -132,14 +132,6 @@ impl extensions<A> for dvec<A> {
self.check_not_borrowed(); self.check_not_borrowed();
self.data <- w; self.data <- w;
} }
}
impl extensions<A:copy> for dvec<A> {
#[doc = "Append a single item to the end of the list"]
fn push(t: A) {
self.check_not_borrowed();
vec::push(self.data, t);
}
#[doc = "Remove and return the last element"] #[doc = "Remove and return the last element"]
fn pop() -> A { fn pop() -> A {
@ -151,6 +143,38 @@ impl extensions<A:copy> for dvec<A> {
} }
} }
#[doc = "Insert a single item at the front of the list"]
fn unshift(-t: A) {
unsafe {
let mut data = unsafe::reinterpret_cast(null::<()>());
data <-> self.data;
let data_ptr: *() = unsafe::reinterpret_cast(data);
if data_ptr.is_null() { fail "Recursive use of dvec"; }
log(error, "a");
self.data <- [mut t] + data;
log(error, "b");
}
}
#[doc = "Append a single item to the end of the list"]
fn push(+t: A) {
self.check_not_borrowed();
vec::push(self.data, t);
}
#[doc = "Remove and return the first element"]
fn shift() -> A {
self.borrow { |v|
let mut v = vec::from_mut(v);
let result = vec::shift(v);
self.return(vec::to_mut(v));
result
}
}
}
impl extensions<A:copy> for dvec<A> {
#[doc = " #[doc = "
Append all elements of a vector to the end of the list Append all elements of a vector to the end of the list
@ -213,16 +237,6 @@ impl extensions<A:copy> for dvec<A> {
} }
} }
#[doc = "Remove and return the first element"]
fn shift() -> A {
self.borrow { |v|
let mut v = vec::from_mut(v);
let result = vec::shift(v);
self.return(vec::to_mut(v));
result
}
}
#[doc = "Copy out an individual element"] #[doc = "Copy out an individual element"]
#[inline(always)] #[inline(always)]
fn [](idx: uint) -> A { fn [](idx: uint) -> A {

88
src/libcore/newcomm.rs Normal file
View File

@ -0,0 +1,88 @@
#[doc="A new implementation of communication.
This should be implementing almost entirely in Rust, and hopefully
avoid needing a single global lock."]
import arc::methods;
import dvec::dvec;
import dvec::{extensions};
export port;
export chan;
export send, recv;
export methods;
type raw_port<T: send> = arc::exclusive<dvec<T>>;
enum port<T: send> {
port_(raw_port<T>)
}
enum chan<T: send> {
chan_(raw_port<T>)
}
fn port<T: send>() -> port<T> {
port_(arc::exclusive(dvec()))
}
fn chan<T: send>(p: port<T>) -> chan<T> {
chan_((*p).clone())
}
fn send<T: send>(c: chan<T>, -x: T) {
let mut x <- some(x);
(*c).with {|cond, data|
let mut xx = none;
xx <-> x;
alt xx {
some(y) {
let mut x <- y;
(*data).push(x);
cond.signal();
}
none { fail }
};
}
}
fn recv<T: send>(p: port<T>) -> T {
(*p).with {|cond, data|
if (*data).len() == 0u {
cond.wait();
}
assert (*data).len() > 0u;
(*data).shift()
}
}
impl methods<T: send> for chan<T> {
fn send(-x: T) {
send(self, x)
}
fn clone() -> chan<T> {
chan_((*self).clone())
}
}
impl methods<T: send> for port<T> {
fn recv() -> T {
recv(self)
}
fn chan() -> chan<T> {
chan(self)
}
}
#[cfg(test)]
mod test {
#[test]
fn newport_simple() {
let p = port();
let c = chan(p);
c.send(42);
assert p.recv() == 42;
}
}

View File

@ -371,17 +371,25 @@ fn rsplitn<T: copy>(v: [T]/&, n: uint, f: fn(T) -> bool) -> [[T]] {
// Mutators // Mutators
#[doc = "Removes the first element from a vector and return it"] #[doc = "Removes the first element from a vector and return it"]
fn shift<T: copy>(&v: [T]) -> T { fn shift<T>(&v: [T]) -> T {
let ln = len::<T>(v); let ln = len::<T>(v);
assert (ln > 0u); assert (ln > 0u);
let e = v[0];
v = slice::<T>(v, 1u, ln); let mut vv = [];
ret e; v <-> vv;
unsafe {
let vv = unsafe::to_ptr(vv);
let r <- *vv;
for uint::range(1u, ln) {|i|
// FIXME: this isn't legal, per se...
let r <- *ptr::offset(vv, i);
push(v, r);
} }
#[doc = "Prepend an element to a vector"] r
fn unshift<T: copy>(&v: [T], +t: T) { }
v = [t] + v;
} }
#[doc = "Remove the last element from a vector and return it"] #[doc = "Remove the last element from a vector and return it"]