diff --git a/mk/rt.mk b/mk/rt.mk index 6d5fb862cf5..3c0d9f86ce7 100644 --- a/mk/rt.mk +++ b/mk/rt.mk @@ -18,7 +18,6 @@ RUNTIME_CS := rt/sync/timer.cpp \ rt/rust_port.cpp \ rt/rust_upcall.cpp \ rt/rust_log.cpp \ - rt/rust_message.cpp \ rt/rust_timer.cpp \ rt/circular_buffer.cpp \ rt/isaac/randport.cpp \ @@ -44,9 +43,7 @@ RUNTIME_HDR := rt/globals.h \ rt/rust_scheduler.h \ rt/rust_task.h \ rt/rust_task_list.h \ - rt/rust_proxy.h \ rt/rust_log.h \ - rt/rust_message.h \ rt/circular_buffer.h \ rt/util/array_list.h \ rt/util/indexed_list.h \ diff --git a/src/rt/rust.cpp b/src/rt/rust.cpp index 0d25d2c3248..42deca7b2cd 100644 --- a/src/rt/rust.cpp +++ b/src/rt/rust.cpp @@ -93,7 +93,6 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) { rust_srv *srv = new rust_srv(env); rust_kernel *kernel = new rust_kernel(srv, env->num_sched_threads); - kernel->start(); rust_task *root_task = kernel->create_task(NULL, "main"); rust_scheduler *sched = root_task->sched; command_line_args *args diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp index 9017e9a2bc1..3ade237c25a 100644 --- a/src/rt/rust_chan.cpp +++ b/src/rt/rust_chan.cpp @@ -4,7 +4,7 @@ /** * Create a new rust channel and associate it with the specified port. */ -rust_chan::rust_chan(rust_kernel *kernel, maybe_proxy *port, +rust_chan::rust_chan(rust_kernel *kernel, rust_port *port, size_t unit_sz) : ref_count(1), kernel(kernel), @@ -29,18 +29,16 @@ rust_chan::~rust_chan() { /** * Link this channel with the specified port. */ -void rust_chan::associate(maybe_proxy *port) { +void rust_chan::associate(rust_port *port) { this->port = port; - if (port->is_proxy() == false) { - scoped_lock with(port->referent()->lock); - KLOG(kernel, task, - "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR, - this, port); - ++this->ref_count; - this->task = port->referent()->task; - this->task->ref(); - this->port->referent()->chans.push(this); - } + scoped_lock with(port->lock); + KLOG(kernel, task, + "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR, + this, port); + ++this->ref_count; + this->task = port->task; + this->task->ref(); + this->port->chans.push(this); } bool rust_chan::is_associated() { @@ -52,19 +50,17 @@ bool rust_chan::is_associated() { */ void rust_chan::disassociate() { A(kernel, - port->referent()->lock.lock_held_by_current_thread(), + port->lock.lock_held_by_current_thread(), "Port referent lock must be held to call rust_chan::disassociate"); A(kernel, is_associated(), "Channel must be associated with a port."); - if (port->is_proxy() == false) { - KLOG(kernel, task, - "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR, - this, port->referent()); - --this->ref_count; - task->deref(); - this->task = NULL; - port->referent()->chans.swap_delete(this); - } + KLOG(kernel, task, + "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR, + this, port); + --this->ref_count; + task->deref(); + this->task = NULL; + port->chans.swap_delete(this); // Delete reference to the port. port = NULL; @@ -74,12 +70,7 @@ void rust_chan::disassociate() { * Attempt to send data to the associated port. */ void rust_chan::send(void *sptr) { - I(kernel, !port->is_proxy()); - - rust_port *target_port = port->referent(); - // TODO: We can probably avoid this lock by using atomic operations in - // circular_buffer. - scoped_lock with(target_port->lock); + scoped_lock with(port->lock); buffer.enqueue(sptr); @@ -92,28 +83,17 @@ void rust_chan::send(void *sptr) { A(kernel, !buffer.is_empty(), "rust_chan::transmit with nothing to send."); - if (port->is_proxy()) { - data_message::send(buffer.peek(), buffer.unit_sz, "send data", - task->get_handle(), port->as_proxy()->handle()); - buffer.dequeue(NULL); - } else { - if (target_port->task->blocked_on(target_port)) { - KLOG(kernel, comm, "dequeued in rendezvous_ptr"); - buffer.dequeue(target_port->task->rendezvous_ptr); - target_port->task->rendezvous_ptr = 0; - target_port->task->wakeup(target_port); - return; - } + if (port->task->blocked_on(port)) { + KLOG(kernel, comm, "dequeued in rendezvous_ptr"); + buffer.dequeue(port->task->rendezvous_ptr); + port->task->rendezvous_ptr = 0; + port->task->wakeup(port); } - - return; } rust_chan *rust_chan::clone(rust_task *target) { - size_t unit_sz = buffer.unit_sz; - maybe_proxy *port = this->port; return new (target->kernel, "cloned chan") - rust_chan(kernel, port, unit_sz); + rust_chan(kernel, port, buffer.unit_sz); } /** @@ -125,30 +105,21 @@ void rust_chan::destroy() { "Channel's ref count should be zero."); if (is_associated()) { - if (port->is_proxy()) { - // Here is a good place to delete the port proxy we allocated - // in upcall_clone_chan. - rust_proxy *proxy = port->as_proxy(); - scoped_lock with(port->referent()->lock); - disassociate(); - delete proxy; - } else { - // We're trying to delete a channel that another task may be - // reading from. We have two options: - // - // 1. We can flush the channel by blocking in upcall_flush_chan() - // and resuming only when the channel is flushed. The problem - // here is that we can get ourselves in a deadlock if the - // parent task tries to join us. - // - // 2. We can leave the channel in a "dormnat" state by not freeing - // it and letting the receiver task delete it for us instead. - if (buffer.is_empty() == false) { - return; - } - scoped_lock with(port->referent()->lock); - disassociate(); + // We're trying to delete a channel that another task may be + // reading from. We have two options: + // + // 1. We can flush the channel by blocking in upcall_flush_chan() + // and resuming only when the channel is flushed. The problem + // here is that we can get ourselves in a deadlock if the + // parent task tries to join us. + // + // 2. We can leave the channel in a "dormnat" state by not freeing + // it and letting the receiver task delete it for us instead. + if (buffer.is_empty() == false) { + return; } + scoped_lock with(port->lock); + disassociate(); } delete this; } diff --git a/src/rt/rust_chan.h b/src/rt/rust_chan.h index 68cdd31b3cc..9b30b8acbb1 100644 --- a/src/rt/rust_chan.h +++ b/src/rt/rust_chan.h @@ -5,18 +5,18 @@ class rust_chan : public kernel_owned, public rust_cond { public: RUST_REFCOUNTED_WITH_DTOR(rust_chan, destroy()) - rust_chan(rust_kernel *kernel, maybe_proxy *port, + rust_chan(rust_kernel *kernel, rust_port *port, size_t unit_sz); ~rust_chan(); rust_kernel *kernel; rust_task *task; - maybe_proxy *port; + rust_port *port; size_t idx; circular_buffer buffer; - void associate(maybe_proxy *port); + void associate(rust_port *port); void disassociate(); bool is_associated(); diff --git a/src/rt/rust_internal.h b/src/rt/rust_internal.h index 10822f02395..0b983bb8806 100644 --- a/src/rt/rust_internal.h +++ b/src/rt/rust_internal.h @@ -219,9 +219,7 @@ public: #include "memory_region.h" #include "rust_srv.h" #include "rust_log.h" -#include "rust_proxy.h" #include "rust_kernel.h" -#include "rust_message.h" #include "rust_scheduler.h" struct rust_timer { diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index a4f56c10be2..1eeb9b2c018 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -1,15 +1,14 @@ #include "rust_internal.h" -#define KLOG_(...) \ +#define KLOG_(...) \ KLOG(this, kern, __VA_ARGS__) -#define KLOG_ERR_(field, ...) \ +#define KLOG_ERR_(field, ...) \ KLOG_LVL(this, field, log_err, __VA_ARGS__) rust_kernel::rust_kernel(rust_srv *srv, size_t num_threads) : _region(srv, true), _log(srv, NULL), srv(srv), - _interrupt_kernel_loop(FALSE), num_threads(num_threads), rval(0), live_tasks(0), @@ -22,15 +21,9 @@ rust_kernel::rust_kernel(rust_srv *srv, size_t num_threads) : rust_scheduler * rust_kernel::create_scheduler(int id) { _kernel_lock.lock(); - rust_message_queue *message_queue = - new (this, "rust_message_queue") rust_message_queue(srv, this); rust_srv *srv = this->srv->clone(); rust_scheduler *sched = - new (this, "rust_scheduler") - rust_scheduler(this, message_queue, srv, id); - rust_handle *handle = internal_get_sched_handle(sched); - message_queue->associate(handle); - message_queues.append(message_queue); + new (this, "rust_scheduler") rust_scheduler(this, srv, id); KLOG_("created scheduler: " PTR ", id: %d, index: %d", sched, id, sched->list_index); _kernel_lock.signal_all(); @@ -43,7 +36,6 @@ rust_kernel::destroy_scheduler(rust_scheduler *sched) { _kernel_lock.lock(); KLOG_("deleting scheduler: " PTR ", name: %s, index: %d", sched, sched->name, sched->list_index); - sched->message_queue->disassociate(); rust_srv *srv = sched->srv; delete sched; delete srv; @@ -65,46 +57,6 @@ void rust_kernel::destroy_schedulers() { } } -rust_handle * -rust_kernel::internal_get_sched_handle(rust_scheduler *sched) { - rust_handle *handle = NULL; - if (_sched_handles.get(sched, &handle) == false) { - handle = new (this, "rust_handle(this, sched->message_queue, sched); - _sched_handles.put(sched, handle); - } - return handle; -} - -rust_handle * -rust_kernel::get_task_handle(rust_task *task) { - _kernel_lock.lock(); - rust_handle *handle = NULL; - if (_task_handles.get(task, &handle) == false) { - handle = - new (this, "rust_handle") - rust_handle(this, task->sched->message_queue, task); - _task_handles.put(task, handle); - } - _kernel_lock.unlock(); - return handle; -} - -rust_handle * -rust_kernel::get_port_handle(rust_port *port) { - _kernel_lock.lock(); - rust_handle *handle = NULL; - if (_port_handles.get(port, &handle) == false) { - handle = new (this, "rust_handle") - rust_handle(this, - port->task->sched->message_queue, - port); - _port_handles.put(port, handle); - } - _kernel_lock.unlock(); - return handle; -} - void rust_kernel::log_all_scheduler_state() { for(size_t i = 0; i < num_threads; ++i) { @@ -141,73 +93,8 @@ rust_kernel::fatal(char const *fmt, ...) { va_end(args); } -void -rust_kernel::pump_message_queues() { - for (size_t i = 0; i < message_queues.length(); i++) { - rust_message_queue *queue = message_queues[i]; - if (queue->is_associated() == false) { - rust_message *message = NULL; - while (queue->dequeue(&message)) { - message->kernel_process(); - delete message; - } - } - } -} - -void -rust_kernel::start_kernel_loop() { - _kernel_lock.lock(); - while (_interrupt_kernel_loop == false) { - _kernel_lock.wait(); - pump_message_queues(); - } - _kernel_lock.unlock(); -} - -void -rust_kernel::run() { - KLOG_("started kernel loop"); - start_kernel_loop(); - KLOG_("finished kernel loop"); -} - -void -rust_kernel::terminate_kernel_loop() { - KLOG_("terminating kernel loop"); - _interrupt_kernel_loop = true; - signal_kernel_lock(); - join(); -} - rust_kernel::~rust_kernel() { destroy_schedulers(); - - terminate_kernel_loop(); - - // It's possible that the message pump misses some messages because - // of races, so pump any remaining messages here. By now all domain - // threads should have been joined, so we shouldn't miss any more - // messages. - pump_message_queues(); - - KLOG_("freeing handles"); - - free_handles(_task_handles); - KLOG_("..task handles freed"); - free_handles(_port_handles); - KLOG_("..port handles freed"); - free_handles(_sched_handles); - KLOG_("..sched handles freed"); - - KLOG_("freeing queues"); - - rust_message_queue *queue = NULL; - while (message_queues.pop(&queue)) { - K(srv, queue->is_empty(), "Kernel message queue should be empty " - "before killing the kernel."); - delete queue; - } } void * @@ -224,26 +111,6 @@ void rust_kernel::free(void *mem) { _region.free(mem); } -template void -rust_kernel::free_handles(hash_map* > &map) { - T* key; - rust_handle *value; - while (map.pop(&key, &value)) { - KLOG_("...freeing " PTR, value); - delete value; - } -} - -void -rust_kernel::notify_message_enqueued(rust_message_queue *queue, - rust_message *message) { - // The message pump needs to handle this message if the queue is not - // associated with a domain, therefore signal the message pump. - if (queue->is_associated() == false) { - signal_kernel_lock(); - } -} - void rust_kernel::signal_kernel_lock() { _kernel_lock.lock(); diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h index d54c6a65430..66196e19684 100644 --- a/src/rt/rust_kernel.h +++ b/src/rt/rust_kernel.h @@ -2,77 +2,22 @@ #ifndef RUST_KERNEL_H #define RUST_KERNEL_H -/** - * A handle object for Rust tasks. We need a reference to the message queue - * of the referent's domain which we can safely hang on to since it's a - * kernel object. We use the referent reference as a label we stash in - * messages sent via this proxy. - */ - -class rust_kernel; -class rust_message; - -template class -rust_handle : - public rust_cond, - public rc_base >, - public kernel_owned > { -public: - rust_kernel *kernel; - rust_message_queue *message_queue; - T *_referent; - T * referent() { - return _referent; - } - rust_handle(rust_kernel *kernel, - rust_message_queue *message_queue, - T *referent) : - kernel(kernel), - message_queue(message_queue), - _referent(referent) { - // Nop. - } -}; - -class rust_task_thread; - - /** * A global object shared by all thread domains. Most of the data structures * in this class are synchronized since they are accessed from multiple * threads. */ -class rust_kernel : public rust_thread { +class rust_kernel { memory_region _region; rust_log _log; public: rust_srv *srv; private: - - /** - * Task proxy objects are kernel owned handles to Rust objects. - */ - hash_map *> _task_handles; - hash_map *> _port_handles; - hash_map *> _sched_handles; - - template void free_handles(hash_map* > &map); - - void run(); - void start_kernel_loop(); - bool _interrupt_kernel_loop; - lock_and_signal _kernel_lock; const size_t num_threads; - void terminate_kernel_loop(); - void pump_message_queues(); - - rust_handle * - internal_get_sched_handle(rust_scheduler *sched); - array_list threads; randctx rctx; @@ -89,20 +34,8 @@ public: volatile int live_tasks; - /** - * Message queues are kernel objects and are associated with domains. - * Their lifetime is not bound to the lifetime of a domain and in fact - * live on after their associated domain has died. This way we can safely - * communicate with domains that may have died. - * - */ - indexed_list message_queues; - struct rust_env *env; - rust_handle *get_task_handle(rust_task *task); - rust_handle *get_port_handle(rust_port *port); - rust_kernel(rust_srv *srv, size_t num_threads); bool is_deadlocked(); @@ -110,14 +43,6 @@ public: void signal_kernel_lock(); void wakeup_schedulers(); - /** - * Notifies the kernel whenever a message has been enqueued . This gives - * the kernel the opportunity to wake up the message pump thread if the - * message queue is not associated. - */ - void - notify_message_enqueued(rust_message_queue *queue, rust_message *message); - void log_all_scheduler_state(); void log(uint32_t level, char const *fmt, ...); void fatal(char const *fmt, ...); diff --git a/src/rt/rust_message.cpp b/src/rt/rust_message.cpp deleted file mode 100644 index f8001a17193..00000000000 --- a/src/rt/rust_message.cpp +++ /dev/null @@ -1,125 +0,0 @@ -#include "rust_internal.h" -#include "rust_message.h" - -rust_message:: -rust_message(memory_region *region, const char* label, - rust_handle *source, rust_handle *target) : - label(label), region(region), _source(source), _target(target) { -} - -rust_message::~rust_message() { -} - -void rust_message::process() { -} - -void rust_message::kernel_process() { -} - -notify_message:: -notify_message(memory_region *region, notification_type type, - const char* label, rust_handle *source, - rust_handle *target) : - rust_message(region, label, source, target), type(type) { -} - -data_message:: -data_message(memory_region *region, uint8_t *buffer, size_t buffer_sz, - const char* label, rust_handle *source, - rust_handle *port) : - rust_message(region, label, source, NULL), - _buffer_sz(buffer_sz), _port(port) { - _buffer = (uint8_t *)malloc(buffer_sz); - memcpy(_buffer, buffer, buffer_sz); -} - -data_message::~data_message() { - free (_buffer); -} - -/** - * Sends a message to the target task via a proxy. The message is allocated - * in the target task domain along with a proxy which points back to the - * source task. - */ -void notify_message:: -send(notification_type type, const char* label, - rust_handle *source, rust_handle *target) { - memory_region *region = &target->message_queue->region; - notify_message *message = - new (region, "notify_message") - notify_message(region, type, label, source, target); - target->message_queue->enqueue(message); -} - -void notify_message::process() { - rust_task *task = _target->referent(); - switch (type) { - case KILL: - // task->ref_count--; - task->kill(); - break; - case JOIN: { - if (task->dead() == false) { - // FIXME: this should be dead code. - assert(false); - } else { - send(WAKEUP, "wakeup", _target, _source); - } - break; - } - case WAKEUP: - task->wakeup(_source); - break; - } -} - -void notify_message::kernel_process() { - switch(type) { - case WAKEUP: - case KILL: - // Ignore. - break; - case JOIN: - send(WAKEUP, "wakeup", _target, _source); - break; - } -} - -void data_message:: -send(uint8_t *buffer, size_t buffer_sz, const char* label, - rust_handle *source, rust_handle *port) { - - memory_region *region = &port->message_queue->region; - data_message *message = - new (region, "data_message") - data_message(region, buffer, buffer_sz, label, source, port); - LOG(source->referent(), comm, "==> sending \"%s\"" PTR " in queue " PTR, - label, message, &port->message_queue); - port->message_queue->enqueue(message); -} - -void data_message::process() { - _port->referent()->remote_channel->send(_buffer); -} - -void data_message::kernel_process() { - -} - -rust_message_queue::rust_message_queue(rust_srv *srv, rust_kernel *kernel) - : region(srv, true), - kernel(kernel), - sched_handle(NULL) { -} - -// -// Local Variables: -// mode: C++ -// fill-column: 78; -// indent-tabs-mode: nil -// c-basic-offset: 4 -// buffer-file-coding-system: utf-8-unix -// compile-command: "make -k -C $RBUILD 2>&1 | sed -e 's/\\/x\\//x:\\//g'"; -// End: -// diff --git a/src/rt/rust_message.h b/src/rt/rust_message.h deleted file mode 100644 index 88c34119e19..00000000000 --- a/src/rt/rust_message.h +++ /dev/null @@ -1,135 +0,0 @@ -#ifndef RUST_MESSAGE_H -#define RUST_MESSAGE_H - -/** - * Rust messages are used for inter-thread communication. They are enqueued - * and allocated in the target domain. - */ - -/** - * Abstract base class for all message types. - */ -class rust_message : public region_owned { -public: - const char* label; - memory_region *region; -private: -protected: - rust_handle *_source; - rust_handle *_target; -public: - rust_message(memory_region *region, - const char* label, - rust_handle *source, - rust_handle *target); - - virtual ~rust_message(); - - /** - * Processes the message in the target domain. - */ - virtual void process(); - - /** - * Processes the message in the kernel. - */ - virtual void kernel_process(); -}; - -/** - * Notify messages are simple argument-less messages. - */ -class notify_message : public rust_message { -public: - enum notification_type { - KILL, JOIN, WAKEUP - }; - - const notification_type type; - - notify_message(memory_region *region, notification_type type, - const char* label, rust_handle *source, - rust_handle *target); - - void process(); - void kernel_process(); - - /** - * This code executes in the sending domain's thread. - */ - static void - send(notification_type type, const char* label, - rust_handle *source, rust_handle *target); -}; - -/** - * Data messages carry a buffer. - */ -class data_message : public rust_message { -private: - uint8_t *_buffer; - size_t _buffer_sz; - rust_handle *_port; - -public: - data_message(memory_region *region, uint8_t *buffer, size_t buffer_sz, - const char* label, rust_handle *source, - rust_handle *port); - - virtual ~data_message(); - void process(); - void kernel_process(); - - /** - * This code executes in the sending domain's thread. - */ - static void - send(uint8_t *buffer, size_t buffer_sz, const char* label, - rust_handle *source, rust_handle *port); -}; - -class rust_message_queue : public lock_free_queue, - public kernel_owned { -public: - memory_region region; - rust_kernel *kernel; - rust_handle *sched_handle; - int32_t list_index; - rust_message_queue(rust_srv *srv, rust_kernel *kernel); - - void associate(rust_handle *sched_handle) { - this->sched_handle = sched_handle; - } - - /** - * The Rust domain relinquishes control to the Rust kernel. - */ - void disassociate() { - this->sched_handle = NULL; - } - - /** - * Checks if a Rust domain is responsible for draining the message queue. - */ - bool is_associated() { - return this->sched_handle != NULL; - } - - void enqueue(rust_message* message) { - lock_free_queue::enqueue(message); - kernel->notify_message_enqueued(this, message); - } -}; - -// -// Local Variables: -// mode: C++ -// fill-column: 78; -// indent-tabs-mode: nil -// c-basic-offset: 4 -// buffer-file-coding-system: utf-8-unix -// compile-command: "make -k -C $RBUILD 2>&1 | sed -e 's/\\/x\\//x:\\//g'"; -// End: -// - -#endif /* RUST_MESSAGE_H */ diff --git a/src/rt/rust_port.cpp b/src/rt/rust_port.cpp index 18acb38cc1d..bd4cff4f235 100644 --- a/src/rt/rust_port.cpp +++ b/src/rt/rust_port.cpp @@ -2,7 +2,7 @@ #include "rust_port.h" rust_port::rust_port(rust_task *task, size_t unit_sz) - : maybe_proxy(this), kernel(task->kernel), task(task), + : ref_count(1), kernel(task->kernel), task(task), unit_sz(unit_sz), writers(task), chans(task) { LOG(task, comm, @@ -17,11 +17,9 @@ rust_port::rust_port(rust_task *task, size_t unit_sz) rust_port::~rust_port() { LOG(task, comm, "~rust_port 0x%" PRIxPTR, (uintptr_t) this); - // log_state(); - // Disassociate channels from this port. while (chans.is_empty() == false) { - scoped_lock with(referent()->lock); + scoped_lock with(lock); rust_chan *chan = chans.peek(); chan->disassociate(); diff --git a/src/rt/rust_port.h b/src/rt/rust_port.h index fd76b20861d..5a3da51ac94 100644 --- a/src/rt/rust_port.h +++ b/src/rt/rust_port.h @@ -1,10 +1,11 @@ #ifndef RUST_PORT_H #define RUST_PORT_H -class rust_port : public maybe_proxy, - public kernel_owned { +class rust_port : public kernel_owned, public rust_cond { public: + RUST_REFCOUNTED(rust_port); + rust_kernel *kernel; rust_task *task; size_t unit_sz; diff --git a/src/rt/rust_proxy.h b/src/rt/rust_proxy.h deleted file mode 100644 index 2ab080be45e..00000000000 --- a/src/rt/rust_proxy.h +++ /dev/null @@ -1,77 +0,0 @@ -#ifndef RUST_PROXY_H -#define RUST_PROXY_H - -/** - * A proxy object is a wrapper for remote objects. Proxy objects are domain - * owned and provide a way distinguish between local and remote objects. - */ - -template struct rust_proxy; - -/** - * The base class of all objects that may delegate. - */ -template struct -maybe_proxy : public rc_base, public rust_cond { -protected: - T *_referent; -public: - maybe_proxy(T *referent) : _referent(referent) { - } - - T *referent() { - return (T *)_referent; - } - - bool is_proxy() { - return _referent != this; - } - - rust_proxy *as_proxy() { - return (rust_proxy *) this; - } - - T *as_referent() { - return (T *) this; - } -}; - -template class rust_handle; - -/** - * A proxy object that delegates to another. - */ -template struct -rust_proxy : public maybe_proxy { -private: - bool _strong; - rust_handle *_handle; -public: - rust_proxy(rust_handle *handle) : - maybe_proxy (NULL), _strong(FALSE), _handle(handle) { - } - - rust_proxy(T *referent) : - maybe_proxy (referent), _strong(FALSE), _handle(NULL) { - } - - rust_handle *handle() { - return _handle; - } -}; - -class rust_message_queue; -struct rust_task; - -// -// Local Variables: -// mode: C++ -// fill-column: 78; -// indent-tabs-mode: nil -// c-basic-offset: 4 -// buffer-file-coding-system: utf-8-unix -// compile-command: "make -k -C $RBUILD 2>&1 | sed -e 's/\\/x\\//x:\\//g'"; -// End: -// - -#endif /* RUST_PROXY_H */ diff --git a/src/rt/rust_scheduler.cpp b/src/rt/rust_scheduler.cpp index c4987bbd3f1..6f91d17f9b8 100644 --- a/src/rt/rust_scheduler.cpp +++ b/src/rt/rust_scheduler.cpp @@ -4,7 +4,6 @@ #include "globals.h" rust_scheduler::rust_scheduler(rust_kernel *kernel, - rust_message_queue *message_queue, rust_srv *srv, int id) : interrupt_flag(0), @@ -19,7 +18,6 @@ rust_scheduler::rust_scheduler(rust_kernel *kernel, dead_tasks(this, "dead"), cache(this), kernel(kernel), - message_queue(message_queue), id(id), min_stack_size(kernel->env->min_stack_size), env(kernel->env) @@ -108,21 +106,6 @@ rust_scheduler::reap_dead_tasks(int id) { } } -/** - * Drains and processes incoming pending messages. - */ -void rust_scheduler::drain_incoming_message_queue(bool process) { - rust_message *message; - while (message_queue->dequeue(&message)) { - DLOG(this, comm, "<== receiving \"%s\" " PTR, - message->label, message); - if (process) { - message->process(); - } - delete message; - } -} - /** * Schedules a running task for execution. Only running tasks can be * activated. Blocked tasks have to be unblocked before they can be @@ -207,8 +190,6 @@ rust_scheduler::start_main_loop() { DLOG(this, dom, "worker %d, number_of_live_tasks = %d, total = %d", id, number_of_live_tasks(), kernel->live_tasks); - drain_incoming_message_queue(true); - rust_task *scheduled_task = schedule_task(id); if (scheduled_task == NULL) { @@ -259,18 +240,14 @@ rust_scheduler::start_main_loop() { "terminated scheduler loop, reaping dead tasks ..."); while (dead_tasks.length() > 0) { - if (message_queue->is_empty()) { - DLOG(this, dom, - "waiting for %d dead tasks to become dereferenced, " - "scheduler yielding ...", - dead_tasks.length()); - log_state(); - lock.unlock(); - sync::yield(); - lock.lock(); - } else { - drain_incoming_message_queue(true); - } + DLOG(this, dom, + "waiting for %d dead tasks to become dereferenced, " + "scheduler yielding ...", + dead_tasks.length()); + log_state(); + lock.unlock(); + sync::yield(); + lock.lock(); reap_dead_tasks(id); } diff --git a/src/rt/rust_scheduler.h b/src/rt/rust_scheduler.h index 561807d44e0..87ee1f1b206 100644 --- a/src/rt/rust_scheduler.h +++ b/src/rt/rust_scheduler.h @@ -51,11 +51,8 @@ struct rust_scheduler : public kernel_owned, rust_kernel *kernel; int32_t list_index; - hash_map *> _task_proxies; - hash_map *> _port_proxies; - - // Incoming messages from other domains. - rust_message_queue *message_queue; + hash_map _task_proxies; + hash_map _port_proxies; const int id; @@ -70,17 +67,13 @@ struct rust_scheduler : public kernel_owned, // Only a pointer to 'name' is kept, so it must live as long as this // domain. - rust_scheduler(rust_kernel *kernel, - rust_message_queue *message_queue, rust_srv *srv, - int id); + rust_scheduler(rust_kernel *kernel, rust_srv *srv, int id); ~rust_scheduler(); void activate(rust_task *task); void log(rust_task *task, uint32_t level, char const *fmt, ...); rust_log & get_log(); void fail(); - void drain_incoming_message_queue(bool process); - rust_crate_cache *get_cache(); size_t number_of_live_tasks(); diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index 6f3e0202472..37a5e64172e 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -76,7 +76,6 @@ rust_task::rust_task(rust_scheduler *sched, rust_task_list *state, supervisor(spawner), list_index(-1), rendezvous_ptr(0), - handle(NULL), running_on(-1), pinned_on(-1), local_region(&sched->srv->local_region), @@ -464,14 +463,6 @@ rust_task::backtrace() { #endif } -rust_handle * -rust_task::get_handle() { - if (handle == NULL) { - handle = sched->kernel->get_task_handle(this); - } - return handle; -} - bool rust_task::can_schedule(int id) { return yield_timer.has_timed_out() && diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h index 8b55c0028a9..133c10a95e5 100644 --- a/src/rt/rust_task.h +++ b/src/rt/rust_task.h @@ -80,8 +80,6 @@ public: // List of tasks waiting for this task to finish. array_list tasks_waiting_to_join; - rust_handle *handle; - context ctx; // This flag indicates that a worker is either currently running the task @@ -157,8 +155,6 @@ public: // Notify tasks waiting for us that we are about to die. void notify_tasks_waiting_to_join(); - rust_handle * get_handle(); - frame_glue_fns *get_frame_glue_fns(uintptr_t fp); rust_crate_cache * get_crate_cache(); diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp index 794bbc9c244..048be072e12 100644 --- a/src/rt/rust_upcall.cpp +++ b/src/rt/rust_upcall.cpp @@ -169,9 +169,7 @@ upcall_clone_chan(rust_task *task, rust_task *target, extern "C" CDECL rust_task * upcall_chan_target_task(rust_task *task, rust_chan *chan) { LOG_UPCALL_ENTRY(task); - I(task->sched, !chan->port->is_proxy()); - - return chan->port->referent()->task; + return chan->port->task; } extern "C" CDECL void @@ -204,9 +202,9 @@ upcall_send(rust_task *task, rust_chan *chan, void *sptr) { extern "C" CDECL void upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) { + LOG_UPCALL_ENTRY(task); { scoped_lock with(port->lock); - LOG_UPCALL_ENTRY(task); LOG(task, comm, "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR ", size: 0x%" PRIxPTR ", chan_no: %d",