diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs index ae141cedc82..d6c5b32de59 100644 --- a/src/libcore/pipes.rs +++ b/src/libcore/pipes.rs @@ -11,7 +11,7 @@ enum state { type packet = { mut state: state, - mut blocked_task: option, + mut blocked_task: option<*rust_task>, mut payload: option }; @@ -31,6 +31,19 @@ native mod rusti { fn atomic_xchng_rel(&dst: int, src: int) -> int; } +type rust_task = libc::c_void; + +native mod rustrt { + #[rust_stack] + fn rust_get_task() -> *rust_task; + + #[rust_stack] + fn task_clear_event_reject(task: *rust_task); + + fn task_wait_event(this: *rust_task) -> *libc::c_void; + fn task_signal_event(target: *rust_task, event: *libc::c_void); +} + // We should consider moving this to core::unsafe, although I // suspect graydon would want us to use void pointers instead. unsafe fn uniquify(x: *T) -> ~T { @@ -54,8 +67,8 @@ fn swap_state_rel(&dst: state, src: state) -> state { } fn send(-p: send_packet, -payload: T) { - let p = p.unwrap(); - let p = unsafe { uniquify(p) }; + let p_ = p.unwrap(); + let p = unsafe { uniquify(p_) }; assert (*p).payload == none; (*p).payload <- some(payload); let old_state = swap_state_rel((*p).state, full); @@ -68,8 +81,13 @@ fn send(-p: send_packet, -payload: T) { } full { fail "duplicate send" } blocked { - // TODO: once the target will actually block, tell the - // scheduler to wake it up. + #debug("waking up task for %?", p_); + alt p.blocked_task { + some(task) { + rustrt::task_signal_event(task, p_ as *libc::c_void); + } + none { fail "blocked packet has no task" } + } // The receiver will eventually clean this up. unsafe { forget(p); } @@ -82,16 +100,32 @@ fn send(-p: send_packet, -payload: T) { } fn recv(-p: recv_packet) -> option { - let p = p.unwrap(); - let p = unsafe { uniquify(p) }; + let p_ = p.unwrap(); + let p = unsafe { uniquify(p_) }; + let this = rustrt::rust_get_task(); + rustrt::task_clear_event_reject(this); + p.blocked_task = some(this); loop { let old_state = swap_state_acq((*p).state, blocked); + #debug("%?", old_state); alt old_state { - empty | blocked { task::yield(); } + empty { + #debug("no data available on %?, going to sleep.", p_); + rustrt::task_wait_event(this); + #debug("woke up, p.state = %?", p.state); + if p.state == full { + let mut payload = none; + payload <-> (*p).payload; + p.state = terminated; + ret some(option::unwrap(payload)) + } + } + blocked { fail "blocking on already blocked packet" } full { let mut payload = none; payload <-> (*p).payload; + p.state = terminated; ret some(option::unwrap(payload)) } terminated { diff --git a/src/libcore/task.rs b/src/libcore/task.rs index fd69525d63e..7730ef8adfe 100644 --- a/src/libcore/task.rs +++ b/src/libcore/task.rs @@ -782,6 +782,7 @@ extern mod rustrt { fn rust_new_sched(num_threads: libc::uintptr_t) -> sched_id; fn get_task_id() -> task_id; + #[rust_stack] fn rust_get_task() -> *rust_task; fn new_task() -> *rust_task; diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index a33d3cb90fe..732dbaa3293 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -922,6 +922,26 @@ rust_task_local_data_atexit(rust_task *task, void (*cleanup_fn)(void *data)) { task->task_local_data_cleanup = cleanup_fn; } +extern "C" void +task_clear_event_reject(rust_task *task) { + task->clear_event_reject(); +} + +// Waits on an event, returning the pointer to the event that unblocked this +// task. +extern "C" void * +task_wait_event(rust_task *task) { + // TODO: we should assert that the passed in task is the currently running + // task. We wouldn't want to wait some other task. + + return task->wait_event(); +} + +extern "C" void +task_signal_event(rust_task *target, void *event) { + target->signal_event(event); +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index d7d43bcd27d..6a9f5cf5001 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -36,6 +36,8 @@ rust_task::rust_task(rust_sched_loop *sched_loop, rust_task_state state, state(state), cond(NULL), cond_name("none"), + event_reject(false), + event(NULL), killed(false), reentered_rust_stack(false), disallow_kill(0), @@ -407,13 +409,20 @@ rust_task::free(void *p) void rust_task::transition(rust_task_state src, rust_task_state dst, rust_cond *cond, const char* cond_name) { + scoped_lock with(state_lock); + transition_locked(src, dst, cond, cond_name); +} + +void rust_task::transition_locked(rust_task_state src, rust_task_state dst, + rust_cond *cond, const char* cond_name) { + state_lock.must_have_lock(); sched_loop->transition(this, src, dst, cond, cond_name); } void rust_task::set_state(rust_task_state state, rust_cond *cond, const char* cond_name) { - scoped_lock with(state_lock); + state_lock.must_have_lock(); this->state = state; this->cond = cond; this->cond_name = cond_name; @@ -422,7 +431,11 @@ rust_task::set_state(rust_task_state state, bool rust_task::block(rust_cond *on, const char* name) { scoped_lock with(kill_lock); + return block_locked(on, name); +} +bool +rust_task::block_locked(rust_cond *on, const char* name) { if (must_fail_from_being_killed_unlocked()) { // We're already going to die. Don't block. Tell the task to fail return false; @@ -433,19 +446,25 @@ rust_task::block(rust_cond *on, const char* name) { assert(cond == NULL && "Cannot block an already blocked task."); assert(on != NULL && "Cannot block on a NULL object."); - transition(task_state_running, task_state_blocked, on, name); + transition_locked(task_state_running, task_state_blocked, on, name); return true; } void rust_task::wakeup(rust_cond *from) { + scoped_lock with(state_lock); + wakeup_locked(from); +} + +void +rust_task::wakeup_locked(rust_cond *from) { assert(cond != NULL && "Cannot wake up unblocked task."); LOG(this, task, "Blocked on 0x%" PRIxPTR " woken up on 0x%" PRIxPTR, (uintptr_t) cond, (uintptr_t) from); assert(cond == from && "Cannot wake up blocked task on wrong condition."); - transition(task_state_blocked, task_state_running, NULL, "none"); + transition_locked(task_state_blocked, task_state_running, NULL, "none"); } void @@ -693,6 +712,34 @@ rust_task::allow_kill() { disallow_kill--; } +void * +rust_task::wait_event() { + scoped_lock with(state_lock); + + if(!event_reject) { + block_locked(&event_cond, "waiting on event"); + bool killed = false; + state_lock.unlock(); + yield(&killed); + state_lock.lock(); + // TODO: what is the right thing to do if we are killed? + } + + event_reject = false; + return event; +} + +void +rust_task::signal_event(void *event) { + scoped_lock with(state_lock); + + this->event = event; + event_reject = true; + if(task_state_blocked == state) { + wakeup_locked(&event_cond); + } +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h index 96b0bddd4e0..c5bdc50e430 100644 --- a/src/rt/rust_task.h +++ b/src/rt/rust_task.h @@ -175,6 +175,10 @@ private: rust_cond *cond; const char *cond_name; + bool event_reject; + rust_cond event_cond; + void *event; + // Protects the killed flag, disallow_kill flag, reentered_rust_stack lock_and_signal kill_lock; // Indicates that the task was killed and needs to unwind @@ -205,6 +209,8 @@ private: void transition(rust_task_state src, rust_task_state dst, rust_cond *cond, const char* cond_name); + void transition_locked(rust_task_state src, rust_task_state dst, + rust_cond *cond, const char* cond_name); bool must_fail_from_being_killed_unlocked(); // Called by rust_task_fail to unwind on failure @@ -221,6 +227,9 @@ private: char const *file, size_t line); + bool block_locked(rust_cond *on, const char* name); + void wakeup_locked(rust_cond *from); + public: // Only a pointer to 'name' is kept, so it must live as long as this task. @@ -303,6 +312,13 @@ public: rust_cond *get_cond() { return cond; } const char *get_cond_name() { return cond_name; } + void clear_event_reject() { + this->event_reject = false; + } + + void *wait_event(); + void signal_event(void *event); + void cleanup_after_turn(); void inhibit_kill(); diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index a218782dcbe..e674d6fa197 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -64,6 +64,9 @@ start_task vec_reserve_shared str_reserve_shared vec_from_buf_shared +task_clear_event_reject +task_wait_event +task_signal_event unsupervise upcall_cmp_type upcall_fail