Removing proxies and message queues.

Channels now point at their ports directly, ports carry their own reference count and lock, and the kernel no longer runs a message-pump thread; rust_proxy.h, rust_message.cpp, and rust_message.h are deleted outright.

Eric Holk 2011-07-29 11:00:44 -07:00 committed by Graydon Hoare
parent bc4e9afe25
commit d1dbb99984
17 changed files with 64 additions and 690 deletions

mk/rt.mk

@ -18,7 +18,6 @@ RUNTIME_CS := rt/sync/timer.cpp \
rt/rust_port.cpp \
rt/rust_upcall.cpp \
rt/rust_log.cpp \
rt/rust_message.cpp \
rt/rust_timer.cpp \
rt/circular_buffer.cpp \
rt/isaac/randport.cpp \
@ -44,9 +43,7 @@ RUNTIME_HDR := rt/globals.h \
rt/rust_scheduler.h \
rt/rust_task.h \
rt/rust_task_list.h \
rt/rust_proxy.h \
rt/rust_log.h \
rt/rust_message.h \
rt/circular_buffer.h \
rt/util/array_list.h \
rt/util/indexed_list.h \

rt/rust.cpp

@ -93,7 +93,6 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) {
rust_srv *srv = new rust_srv(env);
rust_kernel *kernel = new rust_kernel(srv, env->num_sched_threads);
kernel->start();
rust_task *root_task = kernel->create_task(NULL, "main");
rust_scheduler *sched = root_task->sched;
command_line_args *args

rt/rust_chan.cpp

@ -4,7 +4,7 @@
/**
* Create a new rust channel and associate it with the specified port.
*/
rust_chan::rust_chan(rust_kernel *kernel, maybe_proxy<rust_port> *port,
rust_chan::rust_chan(rust_kernel *kernel, rust_port *port,
size_t unit_sz)
: ref_count(1),
kernel(kernel),
@ -29,18 +29,16 @@ rust_chan::~rust_chan() {
/**
* Link this channel with the specified port.
*/
void rust_chan::associate(maybe_proxy<rust_port> *port) {
void rust_chan::associate(rust_port *port) {
this->port = port;
if (port->is_proxy() == false) {
scoped_lock with(port->referent()->lock);
KLOG(kernel, task,
"associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR,
this, port);
++this->ref_count;
this->task = port->referent()->task;
this->task->ref();
this->port->referent()->chans.push(this);
}
scoped_lock with(port->lock);
KLOG(kernel, task,
"associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR,
this, port);
++this->ref_count;
this->task = port->task;
this->task->ref();
this->port->chans.push(this);
}
bool rust_chan::is_associated() {
@ -52,19 +50,17 @@ bool rust_chan::is_associated() {
*/
void rust_chan::disassociate() {
A(kernel,
port->referent()->lock.lock_held_by_current_thread(),
port->lock.lock_held_by_current_thread(),
"Port referent lock must be held to call rust_chan::disassociate");
A(kernel, is_associated(),
"Channel must be associated with a port.");
if (port->is_proxy() == false) {
KLOG(kernel, task,
"disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR,
this, port->referent());
--this->ref_count;
task->deref();
this->task = NULL;
port->referent()->chans.swap_delete(this);
}
KLOG(kernel, task,
"disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR,
this, port);
--this->ref_count;
task->deref();
this->task = NULL;
port->chans.swap_delete(this);
// Delete reference to the port.
port = NULL;
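With maybe_proxy gone, association no longer branches on is_proxy(): the caller takes the port's lock, bumps the channel's refcount, references the owning task, and pushes the channel onto the port's list, while disassociate() undoes each step under the same lock. Below is a minimal standalone sketch of that pairing, with std::mutex standing in for the runtime's lock; every assoc_* name is illustrative, not runtime API.

#include <cstddef>
#include <mutex>
#include <vector>

struct assoc_task {
    int refs = 1;
    void ref() { ++refs; }
    void deref() { --refs; }
};

struct assoc_port {
    std::mutex lock;
    assoc_task *task;
    std::vector<struct assoc_chan *> chans;
};

struct assoc_chan {
    int ref_count = 1;
    assoc_port *port = nullptr;
    assoc_task *task = nullptr;

    void associate(assoc_port *p) {
        port = p;
        std::lock_guard<std::mutex> with(p->lock);
        ++ref_count;               // the port's chan list holds a reference
        task = p->task;
        task->ref();               // keep the owning task alive too
        p->chans.push_back(this);
    }

    void disassociate() {          // caller must already hold port->lock
        --ref_count;
        task->deref();
        task = nullptr;
        std::vector<struct assoc_chan *> &cs = port->chans;
        for (size_t i = 0; i < cs.size(); ++i) {   // swap_delete: O(1),
            if (cs[i] == this) {                   // order not preserved
                cs[i] = cs.back();
                cs.pop_back();
                break;
            }
        }
        port = nullptr;            // drop the back-pointer to the port
    }
};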
@ -74,12 +70,7 @@ void rust_chan::disassociate() {
* Attempt to send data to the associated port.
*/
void rust_chan::send(void *sptr) {
I(kernel, !port->is_proxy());
rust_port *target_port = port->referent();
// TODO: We can probably avoid this lock by using atomic operations in
// circular_buffer.
scoped_lock with(target_port->lock);
scoped_lock with(port->lock);
buffer.enqueue(sptr);
@ -92,28 +83,17 @@ void rust_chan::send(void *sptr) {
A(kernel, !buffer.is_empty(),
"rust_chan::transmit with nothing to send.");
if (port->is_proxy()) {
data_message::send(buffer.peek(), buffer.unit_sz, "send data",
task->get_handle(), port->as_proxy()->handle());
buffer.dequeue(NULL);
} else {
if (target_port->task->blocked_on(target_port)) {
KLOG(kernel, comm, "dequeued in rendezvous_ptr");
buffer.dequeue(target_port->task->rendezvous_ptr);
target_port->task->rendezvous_ptr = 0;
target_port->task->wakeup(target_port);
return;
}
if (port->task->blocked_on(port)) {
KLOG(kernel, comm, "dequeued in rendezvous_ptr");
buffer.dequeue(port->task->rendezvous_ptr);
port->task->rendezvous_ptr = 0;
port->task->wakeup(port);
}
return;
}
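The send path above is the heart of the rendezvous protocol: the sender enqueues under the port lock, and if the receiving task is blocked on this port, it dequeues straight into that task's rendezvous pointer and wakes it. Here is a simplified, self-contained rendition using std::mutex and std::condition_variable in place of the runtime's port lock and task wakeup; rdv_port, send, and recv are assumed names, not the runtime's.

#include <condition_variable>
#include <deque>
#include <mutex>

struct rdv_port {
    std::mutex lock;                 // plays the role of rust_port::lock
    std::condition_variable wakeup;  // stands in for task->wakeup(port)
    std::deque<int> buffer;          // stands in for circular_buffer
    int *rendezvous_ptr = nullptr;   // non-null while a receiver is blocked
};

void send(rdv_port &p, int value) {
    std::lock_guard<std::mutex> with(p.lock);  // scoped_lock with(port->lock)
    p.buffer.push_back(value);                 // buffer.enqueue(sptr)
    if (p.rendezvous_ptr) {                    // receiver blocked on the port:
        *p.rendezvous_ptr = p.buffer.front();  // dequeue straight into its
        p.buffer.pop_front();                  // rendezvous pointer,
        p.rendezvous_ptr = nullptr;
        p.wakeup.notify_one();                 // then wake it
    }
}

int recv(rdv_port &p) {
    std::unique_lock<std::mutex> with(p.lock);
    if (!p.buffer.empty()) {                   // data already queued
        int v = p.buffer.front();
        p.buffer.pop_front();
        return v;
    }
    int v = 0;
    p.rendezvous_ptr = &v;                     // block until a sender fills v
    p.wakeup.wait(with, [&] { return p.rendezvous_ptr == nullptr; });
    return v;
}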
rust_chan *rust_chan::clone(rust_task *target) {
size_t unit_sz = buffer.unit_sz;
maybe_proxy<rust_port> *port = this->port;
return new (target->kernel, "cloned chan")
rust_chan(kernel, port, unit_sz);
rust_chan(kernel, port, buffer.unit_sz);
}
/**
@ -125,30 +105,21 @@ void rust_chan::destroy() {
"Channel's ref count should be zero.");
if (is_associated()) {
if (port->is_proxy()) {
// Here is a good place to delete the port proxy we allocated
// in upcall_clone_chan.
rust_proxy<rust_port> *proxy = port->as_proxy();
scoped_lock with(port->referent()->lock);
disassociate();
delete proxy;
} else {
// We're trying to delete a channel that another task may be
// reading from. We have two options:
//
// 1. We can flush the channel by blocking in upcall_flush_chan()
// and resuming only when the channel is flushed. The problem
// here is that we can get ourselves in a deadlock if the
// parent task tries to join us.
//
// 2. We can leave the channel in a "dormant" state by not freeing
// it and letting the receiver task delete it for us instead.
if (buffer.is_empty() == false) {
return;
}
scoped_lock with(port->referent()->lock);
disassociate();
// We're trying to delete a channel that another task may be
// reading from. We have two options:
//
// 1. We can flush the channel by blocking in upcall_flush_chan()
// and resuming only when the channel is flushed. The problem
// here is that we can get ourselves in a deadlock if the
// parent task tries to join us.
//
// 2. We can leave the channel in a "dormant" state by not freeing
// it and letting the receiver task delete it for us instead.
if (buffer.is_empty() == false) {
return;
}
scoped_lock with(port->lock);
disassociate();
}
delete this;
}
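Only the non-proxy case of destroy() survives: a channel whose buffer still holds data is left "dormant" for the receiving task to free, per option 2 in the comment above. A tiny sketch of that ownership hand-off, with hypothetical names:

struct dormant_chan {
    int pending = 0;          // stands in for circular_buffer contents
    bool associated = true;

    void disassociate() { associated = false; }

    void destroy() {          // called when the ref count hits zero
        if (associated) {
            if (pending != 0)
                return;       // option 2: go dormant; the receiver task
                              // deletes the channel after draining it
            disassociate();
        }
        delete this;
    }
};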

rt/rust_chan.h

@ -5,18 +5,18 @@ class rust_chan : public kernel_owned<rust_chan>,
public rust_cond {
public:
RUST_REFCOUNTED_WITH_DTOR(rust_chan, destroy())
rust_chan(rust_kernel *kernel, maybe_proxy<rust_port> *port,
rust_chan(rust_kernel *kernel, rust_port *port,
size_t unit_sz);
~rust_chan();
rust_kernel *kernel;
rust_task *task;
maybe_proxy<rust_port> *port;
rust_port *port;
size_t idx;
circular_buffer buffer;
void associate(maybe_proxy<rust_port> *port);
void associate(rust_port *port);
void disassociate();
bool is_associated();
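RUST_REFCOUNTED_WITH_DTOR(rust_chan, destroy()) gives the channel an intrusive reference count whose final deref calls destroy() instead of delete. A rough sketch of the pattern such a macro expands to; the body here is assumed, not copied from the runtime:

#include <cstddef>

struct rc_chan {
    size_t ref_count = 1;     // born with one reference
    void ref() { ++ref_count; }
    void deref() {
        if (--ref_count == 0)
            destroy();        // custom teardown hook instead of delete
    }
    void destroy() {
        // rust_chan::destroy() additionally checks association state
        // and may leave the channel dormant; see rust_chan.cpp above.
        delete this;
    }
};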

rt/rust_internal.h

@ -219,9 +219,7 @@ public:
#include "memory_region.h"
#include "rust_srv.h"
#include "rust_log.h"
#include "rust_proxy.h"
#include "rust_kernel.h"
#include "rust_message.h"
#include "rust_scheduler.h"
struct rust_timer {

rt/rust_kernel.cpp

@ -1,15 +1,14 @@
#include "rust_internal.h"
#define KLOG_(...) \
KLOG(this, kern, __VA_ARGS__)
#define KLOG_ERR_(field, ...) \
KLOG_LVL(this, field, log_err, __VA_ARGS__)
rust_kernel::rust_kernel(rust_srv *srv, size_t num_threads) :
_region(srv, true),
_log(srv, NULL),
srv(srv),
_interrupt_kernel_loop(FALSE),
num_threads(num_threads),
rval(0),
live_tasks(0),
@ -22,15 +21,9 @@ rust_kernel::rust_kernel(rust_srv *srv, size_t num_threads) :
rust_scheduler *
rust_kernel::create_scheduler(int id) {
_kernel_lock.lock();
rust_message_queue *message_queue =
new (this, "rust_message_queue") rust_message_queue(srv, this);
rust_srv *srv = this->srv->clone();
rust_scheduler *sched =
new (this, "rust_scheduler")
rust_scheduler(this, message_queue, srv, id);
rust_handle<rust_scheduler> *handle = internal_get_sched_handle(sched);
message_queue->associate(handle);
message_queues.append(message_queue);
new (this, "rust_scheduler") rust_scheduler(this, srv, id);
KLOG_("created scheduler: " PTR ", id: %d, index: %d",
sched, id, sched->list_index);
_kernel_lock.signal_all();
@ -43,7 +36,6 @@ rust_kernel::destroy_scheduler(rust_scheduler *sched) {
_kernel_lock.lock();
KLOG_("deleting scheduler: " PTR ", name: %s, index: %d",
sched, sched->name, sched->list_index);
sched->message_queue->disassociate();
rust_srv *srv = sched->srv;
delete sched;
delete srv;
@ -65,46 +57,6 @@ void rust_kernel::destroy_schedulers() {
}
}
rust_handle<rust_scheduler> *
rust_kernel::internal_get_sched_handle(rust_scheduler *sched) {
rust_handle<rust_scheduler> *handle = NULL;
if (_sched_handles.get(sched, &handle) == false) {
handle = new (this, "rust_handle<rust_scheduler>")
rust_handle<rust_scheduler>(this, sched->message_queue, sched);
_sched_handles.put(sched, handle);
}
return handle;
}
rust_handle<rust_task> *
rust_kernel::get_task_handle(rust_task *task) {
_kernel_lock.lock();
rust_handle<rust_task> *handle = NULL;
if (_task_handles.get(task, &handle) == false) {
handle =
new (this, "rust_handle<rust_task>")
rust_handle<rust_task>(this, task->sched->message_queue, task);
_task_handles.put(task, handle);
}
_kernel_lock.unlock();
return handle;
}
rust_handle<rust_port> *
rust_kernel::get_port_handle(rust_port *port) {
_kernel_lock.lock();
rust_handle<rust_port> *handle = NULL;
if (_port_handles.get(port, &handle) == false) {
handle = new (this, "rust_handle<rust_port>")
rust_handle<rust_port>(this,
port->task->sched->message_queue,
port);
_port_handles.put(port, handle);
}
_kernel_lock.unlock();
return handle;
}
void
rust_kernel::log_all_scheduler_state() {
for(size_t i = 0; i < num_threads; ++i) {
@ -141,73 +93,8 @@ rust_kernel::fatal(char const *fmt, ...) {
va_end(args);
}
void
rust_kernel::pump_message_queues() {
for (size_t i = 0; i < message_queues.length(); i++) {
rust_message_queue *queue = message_queues[i];
if (queue->is_associated() == false) {
rust_message *message = NULL;
while (queue->dequeue(&message)) {
message->kernel_process();
delete message;
}
}
}
}
void
rust_kernel::start_kernel_loop() {
_kernel_lock.lock();
while (_interrupt_kernel_loop == false) {
_kernel_lock.wait();
pump_message_queues();
}
_kernel_lock.unlock();
}
void
rust_kernel::run() {
KLOG_("started kernel loop");
start_kernel_loop();
KLOG_("finished kernel loop");
}
void
rust_kernel::terminate_kernel_loop() {
KLOG_("terminating kernel loop");
_interrupt_kernel_loop = true;
signal_kernel_lock();
join();
}
rust_kernel::~rust_kernel() {
destroy_schedulers();
terminate_kernel_loop();
// It's possible that the message pump misses some messages because
// of races, so pump any remaining messages here. By now all domain
// threads should have been joined, so we shouldn't miss any more
// messages.
pump_message_queues();
KLOG_("freeing handles");
free_handles(_task_handles);
KLOG_("..task handles freed");
free_handles(_port_handles);
KLOG_("..port handles freed");
free_handles(_sched_handles);
KLOG_("..sched handles freed");
KLOG_("freeing queues");
rust_message_queue *queue = NULL;
while (message_queues.pop(&queue)) {
K(srv, queue->is_empty(), "Kernel message queue should be empty "
"before killing the kernel.");
delete queue;
}
}
void *
@ -224,26 +111,6 @@ void rust_kernel::free(void *mem) {
_region.free(mem);
}
template<class T> void
rust_kernel::free_handles(hash_map<T*, rust_handle<T>* > &map) {
T* key;
rust_handle<T> *value;
while (map.pop(&key, &value)) {
KLOG_("...freeing " PTR, value);
delete value;
}
}
void
rust_kernel::notify_message_enqueued(rust_message_queue *queue,
rust_message *message) {
// The message pump needs to handle this message if the queue is not
// associated with a domain, therefore signal the message pump.
if (queue->is_associated() == false) {
signal_kernel_lock();
}
}
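For reference, the machinery deleted above amounted to a kernel-side message pump: sleep on the kernel lock, and on each wakeup drain every queue that is no longer associated with a live scheduler. A condensed, standalone approximation in ordinary C++; all pump_* names are illustrative:

#include <condition_variable>
#include <deque>
#include <mutex>
#include <vector>

struct pump_msg {
    virtual void kernel_process() = 0;   // was rust_message::kernel_process
    virtual ~pump_msg() {}
};

struct pump_queue {
    bool associated = false;             // still owned by a scheduler thread?
    std::deque<pump_msg *> msgs;
};

struct pump_kernel {
    std::mutex lock;                     // _kernel_lock
    std::condition_variable cv;
    bool interrupt = false;              // _interrupt_kernel_loop
    std::vector<pump_queue *> queues;

    void run() {                         // was start_kernel_loop()
        std::unique_lock<std::mutex> with(lock);
        while (!interrupt) {
            cv.wait(with);               // woken by signal_kernel_lock()
            for (pump_queue *q : queues) {
                if (q->associated)
                    continue;            // a scheduler thread drains it
                while (!q->msgs.empty()) {
                    pump_msg *m = q->msgs.front();
                    q->msgs.pop_front();
                    m->kernel_process();
                    delete m;
                }
            }
        }
    }
};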
void
rust_kernel::signal_kernel_lock() {
_kernel_lock.lock();

rt/rust_kernel.h

@ -2,77 +2,22 @@
#ifndef RUST_KERNEL_H
#define RUST_KERNEL_H
/**
* A handle object for Rust tasks. We need a reference to the message queue
* of the referent's domain, which we can safely hang on to since it's a
* kernel object. We use the referent pointer as a label that we stash in
* messages sent via this proxy.
*/
class rust_kernel;
class rust_message;
template <typename T> class
rust_handle :
public rust_cond,
public rc_base<rust_handle<T> >,
public kernel_owned<rust_handle<T> > {
public:
rust_kernel *kernel;
rust_message_queue *message_queue;
T *_referent;
T * referent() {
return _referent;
}
rust_handle(rust_kernel *kernel,
rust_message_queue *message_queue,
T *referent) :
kernel(kernel),
message_queue(message_queue),
_referent(referent) {
// Nop.
}
};
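This rust_handle template is precisely what the commit deletes: a kernel-owned pairing of a referent pointer with the message queue of the referent's domain, so that other threads enqueue messages instead of touching the referent directly. Stripped to its essence, with refcounting omitted and handle_* names hypothetical:

template <typename T>
struct handle_sketch {
    void *message_queue;   // the referent's domain queue; kernel-owned, so
                           // it safely outlives the referent itself
    T *referent;           // used as a label stashed in messages, never
                           // dereferenced from a foreign thread
};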
class rust_task_thread;
/**
* A global object shared by all thread domains. Most of the data structures
* in this class are synchronized since they are accessed from multiple
* threads.
*/
class rust_kernel : public rust_thread {
class rust_kernel {
memory_region _region;
rust_log _log;
public:
rust_srv *srv;
private:
/**
* Task proxy objects are kernel owned handles to Rust objects.
*/
hash_map<rust_task *, rust_handle<rust_task> *> _task_handles;
hash_map<rust_port *, rust_handle<rust_port> *> _port_handles;
hash_map<rust_scheduler *, rust_handle<rust_scheduler> *> _sched_handles;
template<class T> void free_handles(hash_map<T*, rust_handle<T>* > &map);
void run();
void start_kernel_loop();
bool _interrupt_kernel_loop;
lock_and_signal _kernel_lock;
const size_t num_threads;
void terminate_kernel_loop();
void pump_message_queues();
rust_handle<rust_scheduler> *
internal_get_sched_handle(rust_scheduler *sched);
array_list<rust_scheduler *> threads;
randctx rctx;
@ -89,20 +34,8 @@ public:
volatile int live_tasks;
/**
* Message queues are kernel objects and are associated with domains.
* Their lifetime is not bound to the lifetime of a domain; in fact, they
* live on after their associated domain has died. This way we can safely
* communicate with domains that may have died.
*
*/
indexed_list<rust_message_queue> message_queues;
struct rust_env *env;
rust_handle<rust_task> *get_task_handle(rust_task *task);
rust_handle<rust_port> *get_port_handle(rust_port *port);
rust_kernel(rust_srv *srv, size_t num_threads);
bool is_deadlocked();
@ -110,14 +43,6 @@ public:
void signal_kernel_lock();
void wakeup_schedulers();
/**
* Notifies the kernel whenever a message has been enqueued. This gives
* the kernel the opportunity to wake up the message pump thread if the
* message queue is not associated.
*/
void
notify_message_enqueued(rust_message_queue *queue, rust_message *message);
void log_all_scheduler_state();
void log(uint32_t level, char const *fmt, ...);
void fatal(char const *fmt, ...);

rt/rust_message.cpp

@ -1,125 +0,0 @@
#include "rust_internal.h"
#include "rust_message.h"
rust_message::
rust_message(memory_region *region, const char* label,
rust_handle<rust_task> *source, rust_handle<rust_task> *target) :
label(label), region(region), _source(source), _target(target) {
}
rust_message::~rust_message() {
}
void rust_message::process() {
}
void rust_message::kernel_process() {
}
notify_message::
notify_message(memory_region *region, notification_type type,
const char* label, rust_handle<rust_task> *source,
rust_handle<rust_task> *target) :
rust_message(region, label, source, target), type(type) {
}
data_message::
data_message(memory_region *region, uint8_t *buffer, size_t buffer_sz,
const char* label, rust_handle<rust_task> *source,
rust_handle<rust_port> *port) :
rust_message(region, label, source, NULL),
_buffer_sz(buffer_sz), _port(port) {
_buffer = (uint8_t *)malloc(buffer_sz);
memcpy(_buffer, buffer, buffer_sz);
}
data_message::~data_message() {
free (_buffer);
}
/**
* Sends a message to the target task via a proxy. The message is allocated
* in the target task's domain, along with a proxy that points back to the
* source task.
*/
void notify_message::
send(notification_type type, const char* label,
rust_handle<rust_task> *source, rust_handle<rust_task> *target) {
memory_region *region = &target->message_queue->region;
notify_message *message =
new (region, "notify_message")
notify_message(region, type, label, source, target);
target->message_queue->enqueue(message);
}
void notify_message::process() {
rust_task *task = _target->referent();
switch (type) {
case KILL:
// task->ref_count--;
task->kill();
break;
case JOIN: {
if (task->dead() == false) {
// FIXME: this should be dead code.
assert(false);
} else {
send(WAKEUP, "wakeup", _target, _source);
}
break;
}
case WAKEUP:
task->wakeup(_source);
break;
}
}
void notify_message::kernel_process() {
switch(type) {
case WAKEUP:
case KILL:
// Ignore.
break;
case JOIN:
send(WAKEUP, "wakeup", _target, _source);
break;
}
}
void data_message::
send(uint8_t *buffer, size_t buffer_sz, const char* label,
rust_handle<rust_task> *source, rust_handle<rust_port> *port) {
memory_region *region = &port->message_queue->region;
data_message *message =
new (region, "data_message")
data_message(region, buffer, buffer_sz, label, source, port);
LOG(source->referent(), comm, "==> sending \"%s\"" PTR " in queue " PTR,
label, message, &port->message_queue);
port->message_queue->enqueue(message);
}
void data_message::process() {
_port->referent()->remote_channel->send(_buffer);
}
void data_message::kernel_process() {
}
rust_message_queue::rust_message_queue(rust_srv *srv, rust_kernel *kernel)
: region(srv, true),
kernel(kernel),
sched_handle(NULL) {
}
//
// Local Variables:
// mode: C++
// fill-column: 78;
// indent-tabs-mode: nil
// c-basic-offset: 4
// buffer-file-coding-system: utf-8-unix
// compile-command: "make -k -C $RBUILD 2>&1 | sed -e 's/\\/x\\//x:\\//g'";
// End:
//

rt/rust_message.h

@ -1,135 +0,0 @@
#ifndef RUST_MESSAGE_H
#define RUST_MESSAGE_H
/**
* Rust messages are used for inter-thread communication. They are allocated
* and enqueued in the target domain.
*/
/**
* Abstract base class for all message types.
*/
class rust_message : public region_owned<rust_message> {
public:
const char* label;
memory_region *region;
private:
protected:
rust_handle<rust_task> *_source;
rust_handle<rust_task> *_target;
public:
rust_message(memory_region *region,
const char* label,
rust_handle<rust_task> *source,
rust_handle<rust_task> *target);
virtual ~rust_message();
/**
* Processes the message in the target domain.
*/
virtual void process();
/**
* Processes the message in the kernel.
*/
virtual void kernel_process();
};
/**
* Notify messages are simple argument-less messages.
*/
class notify_message : public rust_message {
public:
enum notification_type {
KILL, JOIN, WAKEUP
};
const notification_type type;
notify_message(memory_region *region, notification_type type,
const char* label, rust_handle<rust_task> *source,
rust_handle<rust_task> *target);
void process();
void kernel_process();
/**
* This code executes in the sending domain's thread.
*/
static void
send(notification_type type, const char* label,
rust_handle<rust_task> *source, rust_handle<rust_task> *target);
};
/**
* Data messages carry a buffer.
*/
class data_message : public rust_message {
private:
uint8_t *_buffer;
size_t _buffer_sz;
rust_handle<rust_port> *_port;
public:
data_message(memory_region *region, uint8_t *buffer, size_t buffer_sz,
const char* label, rust_handle<rust_task> *source,
rust_handle<rust_port> *port);
virtual ~data_message();
void process();
void kernel_process();
/**
* This code executes in the sending domain's thread.
*/
static void
send(uint8_t *buffer, size_t buffer_sz, const char* label,
rust_handle<rust_task> *source, rust_handle<rust_port> *port);
};
class rust_message_queue : public lock_free_queue<rust_message*>,
public kernel_owned<rust_message_queue> {
public:
memory_region region;
rust_kernel *kernel;
rust_handle<rust_scheduler> *sched_handle;
int32_t list_index;
rust_message_queue(rust_srv *srv, rust_kernel *kernel);
void associate(rust_handle<rust_scheduler> *sched_handle) {
this->sched_handle = sched_handle;
}
/**
* The Rust domain relinquishes control to the Rust kernel.
*/
void disassociate() {
this->sched_handle = NULL;
}
/**
* Checks if a Rust domain is responsible for draining the message queue.
*/
bool is_associated() {
return this->sched_handle != NULL;
}
void enqueue(rust_message* message) {
lock_free_queue<rust_message*>::enqueue(message);
kernel->notify_message_enqueued(this, message);
}
};
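The scheme this header implemented allocated each message out of the target domain's memory region before enqueueing it, so the receiver frees from its own region with no cross-thread allocator traffic. A simplified sketch of that allocate-in-target pattern; msg_* types are assumed, the real queue was lock-free, and the real region tracked its allocations:

#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <deque>

struct msg_region {                         // stands in for memory_region
    void *alloc(size_t sz) { return std::malloc(sz); }
    void free(void *p) { std::free(p); }
};

struct msg_data {
    uint8_t *buf;
    size_t sz;
};

struct msg_target_queue {
    msg_region region;                      // owned by the target domain
    std::deque<msg_data *> q;
};

// Runs on the sending thread, yet allocates from the target's region.
void msg_send(msg_target_queue &target, const uint8_t *data, size_t sz) {
    msg_data *m =
        static_cast<msg_data *>(target.region.alloc(sizeof(msg_data)));
    m->buf = static_cast<uint8_t *>(target.region.alloc(sz));
    std::memcpy(m->buf, data, sz);
    m->sz = sz;
    target.q.push_back(m);                  // receiver frees buf and m later
}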
//
// Local Variables:
// mode: C++
// fill-column: 78;
// indent-tabs-mode: nil
// c-basic-offset: 4
// buffer-file-coding-system: utf-8-unix
// compile-command: "make -k -C $RBUILD 2>&1 | sed -e 's/\\/x\\//x:\\//g'";
// End:
//
#endif /* RUST_MESSAGE_H */

rt/rust_port.cpp

@ -2,7 +2,7 @@
#include "rust_port.h"
rust_port::rust_port(rust_task *task, size_t unit_sz)
: maybe_proxy<rust_port>(this), kernel(task->kernel), task(task),
: ref_count(1), kernel(task->kernel), task(task),
unit_sz(unit_sz), writers(task), chans(task) {
LOG(task, comm,
@ -17,11 +17,9 @@ rust_port::rust_port(rust_task *task, size_t unit_sz)
rust_port::~rust_port() {
LOG(task, comm, "~rust_port 0x%" PRIxPTR, (uintptr_t) this);
// log_state();
// Disassociate channels from this port.
while (chans.is_empty() == false) {
scoped_lock with(referent()->lock);
scoped_lock with(lock);
rust_chan *chan = chans.peek();
chan->disassociate();
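The destructor loop above detaches each remaining channel under the port's own lock, now that there is no referent() indirection. A compilable miniature of the same drain-and-detach pattern, using hypothetical drain_* types:

#include <cstddef>
#include <mutex>
#include <vector>

struct drain_chan;

struct drain_port {
    std::mutex lock;
    std::vector<drain_chan *> chans;
    ~drain_port();
};

struct drain_chan {
    drain_port *port;
    void disassociate() {            // caller must hold port->lock
        std::vector<drain_chan *> &cs = port->chans;
        for (size_t i = 0; i < cs.size(); ++i) {
            if (cs[i] == this) {     // swap_delete from the port's list
                cs[i] = cs.back();
                cs.pop_back();
                break;
            }
        }
        port = nullptr;
    }
};

drain_port::~drain_port() {
    while (!chans.empty()) {         // detach channels one at a time,
        std::lock_guard<std::mutex> with(lock);   // each under the lock
        chans.back()->disassociate();
    }
}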

rt/rust_port.h

@ -1,10 +1,11 @@
#ifndef RUST_PORT_H
#define RUST_PORT_H
class rust_port : public maybe_proxy<rust_port>,
public kernel_owned<rust_port> {
class rust_port : public kernel_owned<rust_port>, public rust_cond {
public:
RUST_REFCOUNTED(rust_port);
rust_kernel *kernel;
rust_task *task;
size_t unit_sz;

rt/rust_proxy.h

@ -1,77 +0,0 @@
#ifndef RUST_PROXY_H
#define RUST_PROXY_H
/**
* A proxy object is a wrapper for remote objects. Proxy objects are domain
* owned and provide a way to distinguish between local and remote objects.
*/
template <typename T> struct rust_proxy;
/**
* The base class of all objects that may delegate.
*/
template <typename T> struct
maybe_proxy : public rc_base<T>, public rust_cond {
protected:
T *_referent;
public:
maybe_proxy(T *referent) : _referent(referent) {
}
T *referent() {
return (T *)_referent;
}
bool is_proxy() {
return _referent != this;
}
rust_proxy<T> *as_proxy() {
return (rust_proxy<T> *) this;
}
T *as_referent() {
return (T *) this;
}
};
template <typename T> class rust_handle;
/**
* A proxy object that delegates to another.
*/
template <typename T> struct
rust_proxy : public maybe_proxy<T> {
private:
bool _strong;
rust_handle<T> *_handle;
public:
rust_proxy(rust_handle<T> *handle) :
maybe_proxy<T> (NULL), _strong(FALSE), _handle(handle) {
}
rust_proxy(T *referent) :
maybe_proxy<T> (referent), _strong(FALSE), _handle(NULL) {
}
rust_handle<T> *handle() {
return _handle;
}
};
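The trick behind maybe_proxy is that a local object registers itself as its own referent, so is_proxy() reduces to a single pointer comparison. A minimal CRTP rendition with placeholder proxy_* names:

template <typename T>
struct proxy_maybe {
    T *_referent;
    proxy_maybe(T *referent) : _referent(referent) {}
    T *referent() { return _referent; }
    bool is_proxy() { return _referent != static_cast<T *>(this); }
};

struct proxy_port : proxy_maybe<proxy_port> {
    proxy_port() : proxy_maybe<proxy_port>(this) {}  // local object:
                                                     // referent == this,
                                                     // so is_proxy() is false
};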
class rust_message_queue;
struct rust_task;
//
// Local Variables:
// mode: C++
// fill-column: 78;
// indent-tabs-mode: nil
// c-basic-offset: 4
// buffer-file-coding-system: utf-8-unix
// compile-command: "make -k -C $RBUILD 2>&1 | sed -e 's/\\/x\\//x:\\//g'";
// End:
//
#endif /* RUST_PROXY_H */

rt/rust_scheduler.cpp

@ -4,7 +4,6 @@
#include "globals.h"
rust_scheduler::rust_scheduler(rust_kernel *kernel,
rust_message_queue *message_queue,
rust_srv *srv,
int id) :
interrupt_flag(0),
@ -19,7 +18,6 @@ rust_scheduler::rust_scheduler(rust_kernel *kernel,
dead_tasks(this, "dead"),
cache(this),
kernel(kernel),
message_queue(message_queue),
id(id),
min_stack_size(kernel->env->min_stack_size),
env(kernel->env)
@ -108,21 +106,6 @@ rust_scheduler::reap_dead_tasks(int id) {
}
}
/**
* Drains and processes incoming pending messages.
*/
void rust_scheduler::drain_incoming_message_queue(bool process) {
rust_message *message;
while (message_queue->dequeue(&message)) {
DLOG(this, comm, "<== receiving \"%s\" " PTR,
message->label, message);
if (process) {
message->process();
}
delete message;
}
}
/**
* Schedules a running task for execution. Only running tasks can be
* activated. Blocked tasks have to be unblocked before they can be
@ -207,8 +190,6 @@ rust_scheduler::start_main_loop() {
DLOG(this, dom, "worker %d, number_of_live_tasks = %d, total = %d",
id, number_of_live_tasks(), kernel->live_tasks);
drain_incoming_message_queue(true);
rust_task *scheduled_task = schedule_task(id);
if (scheduled_task == NULL) {
@ -259,18 +240,14 @@ rust_scheduler::start_main_loop() {
"terminated scheduler loop, reaping dead tasks ...");
while (dead_tasks.length() > 0) {
if (message_queue->is_empty()) {
DLOG(this, dom,
"waiting for %d dead tasks to become dereferenced, "
"scheduler yielding ...",
dead_tasks.length());
log_state();
lock.unlock();
sync::yield();
lock.lock();
} else {
drain_incoming_message_queue(true);
}
DLOG(this, dom,
"waiting for %d dead tasks to become dereferenced, "
"scheduler yielding ...",
dead_tasks.length());
log_state();
lock.unlock();
sync::yield();
lock.lock();
reap_dead_tasks(id);
}
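After this change the shutdown loop has a single behavior: with no message queue left to drain, the scheduler unlocks, yields, relocks, and reaps until every dead task has been dereferenced. A condensed sketch of that loop, with std::mutex and std::this_thread::yield standing in for the runtime's lock and sync::yield, and reap_* names assumed:

#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

struct reap_task { int refs; };

void reap_loop(std::mutex &lock, std::vector<reap_task *> &dead_tasks) {
    std::unique_lock<std::mutex> held(lock);
    while (!dead_tasks.empty()) {
        held.unlock();
        std::this_thread::yield();          // let other threads drop refs
        held.lock();
        for (size_t i = 0; i < dead_tasks.size();) {
            if (dead_tasks[i]->refs == 0) { // fully dereferenced: reap it
                delete dead_tasks[i];
                dead_tasks.erase(dead_tasks.begin() + i);
            } else {
                ++i;
            }
        }
    }
}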

rt/rust_scheduler.h

@ -51,11 +51,8 @@ struct rust_scheduler : public kernel_owned<rust_scheduler>,
rust_kernel *kernel;
int32_t list_index;
hash_map<rust_task *, rust_proxy<rust_task> *> _task_proxies;
hash_map<rust_port *, rust_proxy<rust_port> *> _port_proxies;
// Incoming messages from other domains.
rust_message_queue *message_queue;
hash_map<rust_task *, rust_task *> _task_proxies;
hash_map<rust_port *, rust_port *> _port_proxies;
const int id;
@ -70,17 +67,13 @@ struct rust_scheduler : public kernel_owned<rust_scheduler>,
// Only a pointer to 'name' is kept, so it must live as long as this
// domain.
rust_scheduler(rust_kernel *kernel,
rust_message_queue *message_queue, rust_srv *srv,
int id);
rust_scheduler(rust_kernel *kernel, rust_srv *srv, int id);
~rust_scheduler();
void activate(rust_task *task);
void log(rust_task *task, uint32_t level, char const *fmt, ...);
rust_log & get_log();
void fail();
void drain_incoming_message_queue(bool process);
rust_crate_cache *get_cache();
size_t number_of_live_tasks();

rt/rust_task.cpp

@ -76,7 +76,6 @@ rust_task::rust_task(rust_scheduler *sched, rust_task_list *state,
supervisor(spawner),
list_index(-1),
rendezvous_ptr(0),
handle(NULL),
running_on(-1),
pinned_on(-1),
local_region(&sched->srv->local_region),
@ -464,14 +463,6 @@ rust_task::backtrace() {
#endif
}
rust_handle<rust_task> *
rust_task::get_handle() {
if (handle == NULL) {
handle = sched->kernel->get_task_handle(this);
}
return handle;
}
bool rust_task::can_schedule(int id)
{
return yield_timer.has_timed_out() &&

rt/rust_task.h

@ -80,8 +80,6 @@ public:
// List of tasks waiting for this task to finish.
array_list<rust_task *> tasks_waiting_to_join;
rust_handle<rust_task> *handle;
context ctx;
// This flag indicates that a worker is either currently running the task
@ -157,8 +155,6 @@ public:
// Notify tasks waiting for us that we are about to die.
void notify_tasks_waiting_to_join();
rust_handle<rust_task> * get_handle();
frame_glue_fns *get_frame_glue_fns(uintptr_t fp);
rust_crate_cache * get_crate_cache();

rt/rust_upcall.cpp

@ -169,9 +169,7 @@ upcall_clone_chan(rust_task *task, rust_task *target,
extern "C" CDECL rust_task *
upcall_chan_target_task(rust_task *task, rust_chan *chan) {
LOG_UPCALL_ENTRY(task);
I(task->sched, !chan->port->is_proxy());
return chan->port->referent()->task;
return chan->port->task;
}
extern "C" CDECL void
@ -204,9 +202,9 @@ upcall_send(rust_task *task, rust_chan *chan, void *sptr) {
extern "C" CDECL void
upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) {
{
scoped_lock with(port->lock);
LOG_UPCALL_ENTRY(task);
LOG(task, comm, "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR
", size: 0x%" PRIxPTR ", chan_no: %d",