Pipes sleep and wake properly.
This commit is contained in:
parent
a4838c93aa
commit
e5c9cb2b3d
@ -11,7 +11,7 @@ enum state {
|
||||
|
||||
type packet<T: send> = {
|
||||
mut state: state,
|
||||
mut blocked_task: option<task::task>,
|
||||
mut blocked_task: option<*rust_task>,
|
||||
mut payload: option<T>
|
||||
};
|
||||
|
||||
@ -31,6 +31,19 @@ native mod rusti {
|
||||
fn atomic_xchng_rel(&dst: int, src: int) -> int;
|
||||
}
|
||||
|
||||
type rust_task = libc::c_void;
|
||||
|
||||
native mod rustrt {
|
||||
#[rust_stack]
|
||||
fn rust_get_task() -> *rust_task;
|
||||
|
||||
#[rust_stack]
|
||||
fn task_clear_event_reject(task: *rust_task);
|
||||
|
||||
fn task_wait_event(this: *rust_task) -> *libc::c_void;
|
||||
fn task_signal_event(target: *rust_task, event: *libc::c_void);
|
||||
}
|
||||
|
||||
// We should consider moving this to core::unsafe, although I
|
||||
// suspect graydon would want us to use void pointers instead.
|
||||
unsafe fn uniquify<T>(x: *T) -> ~T {
|
||||
@ -54,8 +67,8 @@ fn swap_state_rel(&dst: state, src: state) -> state {
|
||||
}
|
||||
|
||||
fn send<T: send>(-p: send_packet<T>, -payload: T) {
|
||||
let p = p.unwrap();
|
||||
let p = unsafe { uniquify(p) };
|
||||
let p_ = p.unwrap();
|
||||
let p = unsafe { uniquify(p_) };
|
||||
assert (*p).payload == none;
|
||||
(*p).payload <- some(payload);
|
||||
let old_state = swap_state_rel((*p).state, full);
|
||||
@ -68,8 +81,13 @@ fn send<T: send>(-p: send_packet<T>, -payload: T) {
|
||||
}
|
||||
full { fail "duplicate send" }
|
||||
blocked {
|
||||
// TODO: once the target will actually block, tell the
|
||||
// scheduler to wake it up.
|
||||
#debug("waking up task for %?", p_);
|
||||
alt p.blocked_task {
|
||||
some(task) {
|
||||
rustrt::task_signal_event(task, p_ as *libc::c_void);
|
||||
}
|
||||
none { fail "blocked packet has no task" }
|
||||
}
|
||||
|
||||
// The receiver will eventually clean this up.
|
||||
unsafe { forget(p); }
|
||||
@ -82,16 +100,32 @@ fn send<T: send>(-p: send_packet<T>, -payload: T) {
|
||||
}
|
||||
|
||||
fn recv<T: send>(-p: recv_packet<T>) -> option<T> {
|
||||
let p = p.unwrap();
|
||||
let p = unsafe { uniquify(p) };
|
||||
let p_ = p.unwrap();
|
||||
let p = unsafe { uniquify(p_) };
|
||||
let this = rustrt::rust_get_task();
|
||||
rustrt::task_clear_event_reject(this);
|
||||
p.blocked_task = some(this);
|
||||
loop {
|
||||
let old_state = swap_state_acq((*p).state,
|
||||
blocked);
|
||||
#debug("%?", old_state);
|
||||
alt old_state {
|
||||
empty | blocked { task::yield(); }
|
||||
empty {
|
||||
#debug("no data available on %?, going to sleep.", p_);
|
||||
rustrt::task_wait_event(this);
|
||||
#debug("woke up, p.state = %?", p.state);
|
||||
if p.state == full {
|
||||
let mut payload = none;
|
||||
payload <-> (*p).payload;
|
||||
p.state = terminated;
|
||||
ret some(option::unwrap(payload))
|
||||
}
|
||||
}
|
||||
blocked { fail "blocking on already blocked packet" }
|
||||
full {
|
||||
let mut payload = none;
|
||||
payload <-> (*p).payload;
|
||||
p.state = terminated;
|
||||
ret some(option::unwrap(payload))
|
||||
}
|
||||
terminated {
|
||||
|
@ -782,6 +782,7 @@ extern mod rustrt {
|
||||
fn rust_new_sched(num_threads: libc::uintptr_t) -> sched_id;
|
||||
|
||||
fn get_task_id() -> task_id;
|
||||
#[rust_stack]
|
||||
fn rust_get_task() -> *rust_task;
|
||||
|
||||
fn new_task() -> *rust_task;
|
||||
|
@ -922,6 +922,26 @@ rust_task_local_data_atexit(rust_task *task, void (*cleanup_fn)(void *data)) {
|
||||
task->task_local_data_cleanup = cleanup_fn;
|
||||
}
|
||||
|
||||
extern "C" void
|
||||
task_clear_event_reject(rust_task *task) {
|
||||
task->clear_event_reject();
|
||||
}
|
||||
|
||||
// Waits on an event, returning the pointer to the event that unblocked this
|
||||
// task.
|
||||
extern "C" void *
|
||||
task_wait_event(rust_task *task) {
|
||||
// TODO: we should assert that the passed in task is the currently running
|
||||
// task. We wouldn't want to wait some other task.
|
||||
|
||||
return task->wait_event();
|
||||
}
|
||||
|
||||
extern "C" void
|
||||
task_signal_event(rust_task *target, void *event) {
|
||||
target->signal_event(event);
|
||||
}
|
||||
|
||||
//
|
||||
// Local Variables:
|
||||
// mode: C++
|
||||
|
@ -36,6 +36,8 @@ rust_task::rust_task(rust_sched_loop *sched_loop, rust_task_state state,
|
||||
state(state),
|
||||
cond(NULL),
|
||||
cond_name("none"),
|
||||
event_reject(false),
|
||||
event(NULL),
|
||||
killed(false),
|
||||
reentered_rust_stack(false),
|
||||
disallow_kill(0),
|
||||
@ -407,13 +409,20 @@ rust_task::free(void *p)
|
||||
void
|
||||
rust_task::transition(rust_task_state src, rust_task_state dst,
|
||||
rust_cond *cond, const char* cond_name) {
|
||||
scoped_lock with(state_lock);
|
||||
transition_locked(src, dst, cond, cond_name);
|
||||
}
|
||||
|
||||
void rust_task::transition_locked(rust_task_state src, rust_task_state dst,
|
||||
rust_cond *cond, const char* cond_name) {
|
||||
state_lock.must_have_lock();
|
||||
sched_loop->transition(this, src, dst, cond, cond_name);
|
||||
}
|
||||
|
||||
void
|
||||
rust_task::set_state(rust_task_state state,
|
||||
rust_cond *cond, const char* cond_name) {
|
||||
scoped_lock with(state_lock);
|
||||
state_lock.must_have_lock();
|
||||
this->state = state;
|
||||
this->cond = cond;
|
||||
this->cond_name = cond_name;
|
||||
@ -422,7 +431,11 @@ rust_task::set_state(rust_task_state state,
|
||||
bool
|
||||
rust_task::block(rust_cond *on, const char* name) {
|
||||
scoped_lock with(kill_lock);
|
||||
return block_locked(on, name);
|
||||
}
|
||||
|
||||
bool
|
||||
rust_task::block_locked(rust_cond *on, const char* name) {
|
||||
if (must_fail_from_being_killed_unlocked()) {
|
||||
// We're already going to die. Don't block. Tell the task to fail
|
||||
return false;
|
||||
@ -433,19 +446,25 @@ rust_task::block(rust_cond *on, const char* name) {
|
||||
assert(cond == NULL && "Cannot block an already blocked task.");
|
||||
assert(on != NULL && "Cannot block on a NULL object.");
|
||||
|
||||
transition(task_state_running, task_state_blocked, on, name);
|
||||
transition_locked(task_state_running, task_state_blocked, on, name);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
rust_task::wakeup(rust_cond *from) {
|
||||
scoped_lock with(state_lock);
|
||||
wakeup_locked(from);
|
||||
}
|
||||
|
||||
void
|
||||
rust_task::wakeup_locked(rust_cond *from) {
|
||||
assert(cond != NULL && "Cannot wake up unblocked task.");
|
||||
LOG(this, task, "Blocked on 0x%" PRIxPTR " woken up on 0x%" PRIxPTR,
|
||||
(uintptr_t) cond, (uintptr_t) from);
|
||||
assert(cond == from && "Cannot wake up blocked task on wrong condition.");
|
||||
|
||||
transition(task_state_blocked, task_state_running, NULL, "none");
|
||||
transition_locked(task_state_blocked, task_state_running, NULL, "none");
|
||||
}
|
||||
|
||||
void
|
||||
@ -693,6 +712,34 @@ rust_task::allow_kill() {
|
||||
disallow_kill--;
|
||||
}
|
||||
|
||||
void *
|
||||
rust_task::wait_event() {
|
||||
scoped_lock with(state_lock);
|
||||
|
||||
if(!event_reject) {
|
||||
block_locked(&event_cond, "waiting on event");
|
||||
bool killed = false;
|
||||
state_lock.unlock();
|
||||
yield(&killed);
|
||||
state_lock.lock();
|
||||
// TODO: what is the right thing to do if we are killed?
|
||||
}
|
||||
|
||||
event_reject = false;
|
||||
return event;
|
||||
}
|
||||
|
||||
void
|
||||
rust_task::signal_event(void *event) {
|
||||
scoped_lock with(state_lock);
|
||||
|
||||
this->event = event;
|
||||
event_reject = true;
|
||||
if(task_state_blocked == state) {
|
||||
wakeup_locked(&event_cond);
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Local Variables:
|
||||
// mode: C++
|
||||
|
@ -175,6 +175,10 @@ private:
|
||||
rust_cond *cond;
|
||||
const char *cond_name;
|
||||
|
||||
bool event_reject;
|
||||
rust_cond event_cond;
|
||||
void *event;
|
||||
|
||||
// Protects the killed flag, disallow_kill flag, reentered_rust_stack
|
||||
lock_and_signal kill_lock;
|
||||
// Indicates that the task was killed and needs to unwind
|
||||
@ -205,6 +209,8 @@ private:
|
||||
|
||||
void transition(rust_task_state src, rust_task_state dst,
|
||||
rust_cond *cond, const char* cond_name);
|
||||
void transition_locked(rust_task_state src, rust_task_state dst,
|
||||
rust_cond *cond, const char* cond_name);
|
||||
|
||||
bool must_fail_from_being_killed_unlocked();
|
||||
// Called by rust_task_fail to unwind on failure
|
||||
@ -221,6 +227,9 @@ private:
|
||||
char const *file,
|
||||
size_t line);
|
||||
|
||||
bool block_locked(rust_cond *on, const char* name);
|
||||
void wakeup_locked(rust_cond *from);
|
||||
|
||||
public:
|
||||
|
||||
// Only a pointer to 'name' is kept, so it must live as long as this task.
|
||||
@ -303,6 +312,13 @@ public:
|
||||
rust_cond *get_cond() { return cond; }
|
||||
const char *get_cond_name() { return cond_name; }
|
||||
|
||||
void clear_event_reject() {
|
||||
this->event_reject = false;
|
||||
}
|
||||
|
||||
void *wait_event();
|
||||
void signal_event(void *event);
|
||||
|
||||
void cleanup_after_turn();
|
||||
|
||||
void inhibit_kill();
|
||||
|
@ -64,6 +64,9 @@ start_task
|
||||
vec_reserve_shared
|
||||
str_reserve_shared
|
||||
vec_from_buf_shared
|
||||
task_clear_event_reject
|
||||
task_wait_event
|
||||
task_signal_event
|
||||
unsupervise
|
||||
upcall_cmp_type
|
||||
upcall_fail
|
||||
|
Loading…
Reference in New Issue
Block a user