Fix an apparent race in pipes.
Also removed some unsafety in pipes and added vec::consume_mut.
This commit is contained in:
parent
110ff312df
commit
bd195518c7
@ -117,6 +117,8 @@ struct packet_header {
|
|||||||
}
|
}
|
||||||
|
|
||||||
unsafe fn unblock() {
|
unsafe fn unblock() {
|
||||||
|
assert self.state != blocked || self.blocked_task != none;
|
||||||
|
self.blocked_task = none;
|
||||||
alt swap_state_acq(self.state, empty) {
|
alt swap_state_acq(self.state, empty) {
|
||||||
empty | blocked { }
|
empty | blocked { }
|
||||||
terminated { self.state = terminated; }
|
terminated { self.state = terminated; }
|
||||||
@ -322,7 +324,7 @@ fn send<T: send, Tbuffer: send>(-p: send_packet_buffered<T, Tbuffer>,
|
|||||||
rustrt::task_signal_event(
|
rustrt::task_signal_event(
|
||||||
task, ptr::addr_of(p.header) as *libc::c_void);
|
task, ptr::addr_of(p.header) as *libc::c_void);
|
||||||
}
|
}
|
||||||
none { fail ~"blocked packet has no task" }
|
none { debug!{"just kidding!"} }
|
||||||
}
|
}
|
||||||
|
|
||||||
// The receiver will eventually clean this up.
|
// The receiver will eventually clean this up.
|
||||||
@ -878,23 +880,28 @@ struct port_set<T: send> : recv<T> {
|
|||||||
|
|
||||||
fn try_recv() -> option<T> {
|
fn try_recv() -> option<T> {
|
||||||
let mut result = none;
|
let mut result = none;
|
||||||
while result == none && self.ports.len() > 0 {
|
// we have to swap the ports array so we aren't borrowing
|
||||||
let i = wait_many(self.ports.map(|p| p.header()));
|
// aliasable mutable memory.
|
||||||
// dereferencing an unsafe pointer nonsense to appease the
|
let mut ports = ~[];
|
||||||
// borrowchecker.
|
ports <-> self.ports;
|
||||||
alt move unsafe {(*ptr::addr_of(self.ports[i])).try_recv()} {
|
while result == none && ports.len() > 0 {
|
||||||
some(m) {
|
let i = wait_many(ports.map(|p| p.header()));
|
||||||
result = some(move_it!{m});
|
alt move ports[i].try_recv() {
|
||||||
}
|
some(m) {
|
||||||
none {
|
result = some(move m);
|
||||||
// Remove this port.
|
}
|
||||||
let mut ports = ~[];
|
none {
|
||||||
self.ports <-> ports;
|
// Remove this port.
|
||||||
vec::consume(ports,
|
let mut ports_ = ~[];
|
||||||
|j, x| if i != j { vec::push(self.ports, x) });
|
ports <-> ports_;
|
||||||
}
|
vec::consume(ports_,
|
||||||
|
|j, x| if i != j {
|
||||||
|
vec::push(ports, x)
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
ports <-> self.ports;
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6,7 +6,7 @@ import libc::size_t;
|
|||||||
|
|
||||||
export append;
|
export append;
|
||||||
export append_one;
|
export append_one;
|
||||||
export consume;
|
export consume, consume_mut;
|
||||||
export init_op;
|
export init_op;
|
||||||
export is_empty;
|
export is_empty;
|
||||||
export is_not_empty;
|
export is_not_empty;
|
||||||
@ -490,6 +490,17 @@ fn consume<T>(+v: ~[T], f: fn(uint, +T)) unsafe {
|
|||||||
unsafe::set_len(v, 0);
|
unsafe::set_len(v, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn consume_mut<T>(+v: ~[mut T], f: fn(uint, +T)) unsafe {
|
||||||
|
do as_buf(v) |p, ln| {
|
||||||
|
for uint::range(0, ln) |i| {
|
||||||
|
let x <- *ptr::offset(p, i);
|
||||||
|
f(i, x);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe::set_len(v, 0);
|
||||||
|
}
|
||||||
|
|
||||||
/// Remove the last element from a vector and return it
|
/// Remove the last element from a vector and return it
|
||||||
fn pop<T>(&v: ~[const T]) -> T {
|
fn pop<T>(&v: ~[const T]) -> T {
|
||||||
let ln = len(v);
|
let ln = len(v);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user