rt: Add rust_port_select function

This commit is contained in:
Brian Anderson 2012-02-14 22:23:16 -08:00
parent e62ddf4898
commit b2cfb7ef82
8 changed files with 153 additions and 9 deletions

View File

@ -52,6 +52,7 @@ RUNTIME_CS_$(1) := \
rt/rust_uv.cpp \
rt/rust_uvtmp.cpp \
rt/rust_log.cpp \
rt/rust_port_selector.cpp \
rt/circular_buffer.cpp \
rt/isaac/randport.cpp \
rt/rust_srv.cpp \
@ -88,6 +89,7 @@ RUNTIME_HDR_$(1) := rt/globals.h \
rt/rust_stack.h \
rt/rust_task_list.h \
rt/rust_log.h \
rt/rust_port_selector.h \
rt/circular_buffer.h \
rt/util/array_list.h \
rt/util/indexed_list.h \

View File

@ -593,6 +593,14 @@ port_recv(uintptr_t *dptr, rust_port *port,
return;
}
extern "C" CDECL void
rust_port_select(rust_port **dptr, rust_port **ports,
size_t n_ports, uintptr_t *yield) {
rust_task *task = rust_task_thread::get_task();
rust_port_selector *selector = task->get_port_selector();
selector->select(task, dptr, ports, n_ports, yield);
}
extern "C" CDECL void
rust_set_exit_status(intptr_t code) {
rust_task *task = rust_task_thread::get_task();

View File

@ -30,18 +30,34 @@ void rust_port::detach() {
void rust_port::send(void *sptr) {
I(task->thread, !lock.lock_held_by_current_thread());
scoped_lock with(lock);
bool did_rendezvous = false;
{
scoped_lock with(lock);
buffer.enqueue(sptr);
buffer.enqueue(sptr);
A(kernel, !buffer.is_empty(),
"rust_chan::transmit with nothing to send.");
A(kernel, !buffer.is_empty(),
"rust_chan::transmit with nothing to send.");
if (task->blocked_on(this)) {
KLOG(kernel, comm, "dequeued in rendezvous_ptr");
buffer.dequeue(task->rendezvous_ptr);
task->rendezvous_ptr = 0;
task->wakeup(this);
if (task->blocked_on(this)) {
KLOG(kernel, comm, "dequeued in rendezvous_ptr");
buffer.dequeue(task->rendezvous_ptr);
task->rendezvous_ptr = 0;
task->wakeup(this);
did_rendezvous = true;
}
}
if (!did_rendezvous) {
// If the task wasn't waiting specifically on this port,
// it may be waiting on a group of ports
rust_port_selector *port_selector = task->get_port_selector();
// This check is not definitive. The port selector will take a lock
// and check again whether the task is still blocked.
if (task->blocked_on(port_selector)) {
port_selector->msg_sent_on(this);
}
}
}

View File

@ -1,6 +1,8 @@
#ifndef RUST_PORT_H
#define RUST_PORT_H
#include "rust_internal.h"
class rust_port : public kernel_owned<rust_port>, public rust_cond {
public:
RUST_REFCOUNTED(rust_port)

View File

@ -0,0 +1,83 @@
#include "rust_port.h"
#include "rust_port_selector.h"
rust_port_selector::rust_port_selector()
: ports(NULL), n_ports(0) {
}
void
rust_port_selector::select(rust_task *task, rust_port **dptr,
rust_port **ports,
size_t n_ports, uintptr_t *yield) {
I(task->thread, this->ports == NULL);
I(task->thread, this->n_ports == 0);
I(task->thread, dptr != NULL);
I(task->thread, ports != NULL);
I(task->thread, n_ports != 0);
I(task->thread, yield != NULL);
*yield = false;
size_t locks_taken = 0;
bool found_msg = false;
// Take each port's lock as we iterate through them because
// if none of them contain a usable message then we need to
// block the task before any of them can try to send another
// message.
for (size_t i = 0; i < n_ports; i++) {
rust_port *port = ports[i];
I(task->thread, port != NULL);
port->lock.lock();
locks_taken++;
if (port->buffer.size() > 0) {
*dptr = port;
found_msg = true;
break;
}
}
if (!found_msg) {
this->ports = ports;
this->n_ports = n_ports;
I(task->thread, task->rendezvous_ptr == NULL);
task->rendezvous_ptr = (uintptr_t*)dptr;
*yield = true;
task->block(this, "waiting for select rendezvous");
}
for (size_t i = 0; i < locks_taken; i++) {
rust_port *port = ports[i];
port->lock.unlock();
}
}
void
rust_port_selector::msg_sent_on(rust_port *port) {
rust_task *task = port->task;
I(task->thread, !task->lock.lock_held_by_current_thread());
I(task->thread, !port->lock.lock_held_by_current_thread());
I(task->thread, !rendezvous_lock.lock_held_by_current_thread());
// Prevent two ports from trying to wake up the task
// simultaneously
scoped_lock with(rendezvous_lock);
if (task->blocked_on(this)) {
for (size_t i = 0; i < n_ports; i++) {
if (port == ports[i]) {
// This was one of the ports we were waiting on
ports = NULL;
n_ports = 0;
*task->rendezvous_ptr = (uintptr_t) port;
task->rendezvous_ptr = NULL;
task->wakeup(this);
return;
}
}
}
}

View File

@ -0,0 +1,27 @@
#ifndef RUST_PORT_SELECTOR_H
#define RUST_PORT_SELECTOR_H
#include "rust_internal.h"
struct rust_task;
class rust_port;
class rust_port_selector : public rust_cond {
private:
rust_port **ports;
size_t n_ports;
lock_and_signal rendezvous_lock;
public:
rust_port_selector();
void select(rust_task *task,
rust_port **dptr,
rust_port **ports,
size_t n_ports,
uintptr_t *yield);
void msg_sent_on(rust_port *port);
};
#endif /* RUST_PORT_SELECTOR_H */

View File

@ -16,6 +16,7 @@
#include "rust_obstack.h"
#include "boxed_region.h"
#include "rust_stack.h"
#include "rust_port_selector.h"
// Corresponds to the rust chan (currently _chan) type.
struct chan_handle {
@ -116,6 +117,8 @@ private:
uintptr_t next_c_sp;
uintptr_t next_rust_sp;
rust_port_selector port_selector;
// Called when the atomic refcount reaches zero
void delete_this();
@ -206,6 +209,8 @@ public:
void call_on_c_stack(void *args, void *fn_ptr);
void call_on_rust_stack(void *args, void *fn_ptr);
bool have_c_stack() { return c_stack != NULL; }
rust_port_selector *get_port_selector() { return &port_selector; }
};
// This stuff is on the stack-switching fast path

View File

@ -17,6 +17,7 @@ nano_time
new_port
new_task
port_recv
rust_port_select
rand_free
rand_new
rand_next