Lots of design changes around proxies and message passing. Made it so that domains can only talk to other domains via handles, and with the help of the rust_kernel.

This commit is contained in:
Michael Bebenita 2010-09-07 18:39:07 -07:00
parent a6aebdaedd
commit de611a3090
23 changed files with 650 additions and 444 deletions

View File

@ -380,33 +380,10 @@ self: $(CFG_COMPILER)
# Temporarily xfail the entire multi-tasking system, pending resolution
# of inter-task shutdown races introduced with notification proxies.
TASK_XFAILS := test/run-pass/acyclic-unwind.rs \
test/run-pass/alt-type-simple.rs \
test/run-pass/basic.rs \
test/run-pass/clone-with-exterior.rs \
test/run-pass/comm.rs \
test/run-pass/lazychan.rs \
test/run-pass/many.rs \
test/run-pass/obj-dtor.rs \
test/run-pass/preempt.rs \
test/run-pass/spawn-fn.rs \
test/run-pass/spawn-module-qualified.rs \
test/run-pass/spawn.rs \
test/run-pass/task-comm-0.rs \
test/run-pass/task-comm-1.rs \
test/run-pass/task-comm-2.rs \
test/run-pass/task-comm-3.rs \
test/run-pass/task-comm-7.rs \
test/run-pass/task-comm-8.rs \
test/run-pass/task-comm-9.rs \
test/run-pass/task-comm-10.rs \
test/run-pass/task-comm-11.rs \
test/run-pass/task-life-0.rs \
test/run-pass/task-comm.rs \
test/run-pass/threads.rs \
test/run-pass/yield.rs \
TASK_XFAILS := test/run-pass/task-comm-10.rs \
test/run-pass/task-comm-15.rs \
test/run-pass/task-life-0.rs
test/run-pass/task-life-0.rs \
test/run-pass/alt-type-simple.rs
TEST_XFAILS_X86 := $(TASK_XFAILS) \
test/run-pass/child-outlives-parent.rs \
@ -425,6 +402,7 @@ TEST_XFAILS_X86 := $(TASK_XFAILS) \
test/run-pass/task-comm.rs \
test/run-pass/vec-slice.rs \
test/run-pass/task-comm-3.rs \
test/run-fail/task-comm-14.rs \
test/compile-fail/bad-recv.rs \
test/compile-fail/bad-send.rs \
test/compile-fail/infinite-tag-type-recursion.rs \

View File

@ -20,7 +20,7 @@ let task_field_gc_alloc_chain = task_field_rust_sp + 1;;
let task_field_dom = task_field_gc_alloc_chain + 1;;
let n_visible_task_fields = task_field_dom + 1;;
let dom_field_interrupt_flag = 0;;
let dom_field_interrupt_flag = 1;;
let frame_glue_fns_field_mark = 0;;
let frame_glue_fns_field_drop = 1;;

View File

@ -1,11 +1,6 @@
/*
*
*/
#ifndef MEMORY_H
#define MEMORY_H
inline void *operator new(size_t size, void *mem) {
return mem;
}

View File

@ -1,7 +1,3 @@
/*
*
*/
#include "rust_internal.h"
#include "memory_region.h"
@ -20,6 +16,7 @@ memory_region::memory_region(memory_region *parent) :
}
void memory_region::free(void *mem) {
// printf("free: ptr 0x%" PRIxPTR"\n", (uintptr_t) mem);
if (_synchronized) { _lock.lock(); }
#ifdef TRACK_ALLOCATIONS
if (_allocation_list.replace(mem, NULL) == false) {
@ -34,7 +31,6 @@ void memory_region::free(void *mem) {
_live_allocations--;
_srv->free(mem);
if (_synchronized) { _lock.unlock(); }
}
void *
@ -63,6 +59,7 @@ memory_region::malloc(size_t size) {
#ifdef TRACK_ALLOCATIONS
_allocation_list.append(mem);
#endif
// printf("malloc: ptr 0x%" PRIxPTR "\n", (uintptr_t) mem);
if (_synchronized) { _lock.unlock(); }
return mem;
}

View File

@ -1,16 +1,16 @@
#include "rust_internal.h"
struct
command_line_args
command_line_args : public dom_owned<command_line_args>
{
rust_dom &dom;
rust_dom *dom;
int argc;
char **argv;
// vec[str] passed to rust_task::start.
rust_vec *args;
command_line_args(rust_dom &dom,
command_line_args(rust_dom *dom,
int sys_argc,
char **sys_argv)
: dom(dom),
@ -21,29 +21,29 @@ command_line_args
#if defined(__WIN32__)
LPCWSTR cmdline = GetCommandLineW();
LPWSTR *wargv = CommandLineToArgvW(cmdline, &argc);
dom.win32_require("CommandLineToArgvW", wargv != NULL);
argv = (char **) dom.malloc(sizeof(char*) * argc);
dom->win32_require("CommandLineToArgvW", wargv != NULL);
argv = (char **) dom->malloc(sizeof(char*) * argc);
for (int i = 0; i < argc; ++i) {
int n_chars = WideCharToMultiByte(CP_UTF8, 0, wargv[i], -1,
NULL, 0, NULL, NULL);
dom.win32_require("WideCharToMultiByte(0)", n_chars != 0);
argv[i] = (char *) dom.malloc(n_chars);
dom->win32_require("WideCharToMultiByte(0)", n_chars != 0);
argv[i] = (char *) dom->malloc(n_chars);
n_chars = WideCharToMultiByte(CP_UTF8, 0, wargv[i], -1,
argv[i], n_chars, NULL, NULL);
dom.win32_require("WideCharToMultiByte(1)", n_chars != 0);
dom->win32_require("WideCharToMultiByte(1)", n_chars != 0);
}
LocalFree(wargv);
#endif
size_t vec_fill = sizeof(rust_str *) * argc;
size_t vec_alloc = next_power_of_two(sizeof(rust_vec) + vec_fill);
void *mem = dom.malloc(vec_alloc);
args = new (mem) rust_vec(&dom, vec_alloc, 0, NULL);
void *mem = dom->malloc(vec_alloc);
args = new (mem) rust_vec(dom, vec_alloc, 0, NULL);
rust_str **strs = (rust_str**) &args->data[0];
for (int i = 0; i < argc; ++i) {
size_t str_fill = strlen(argv[i]) + 1;
size_t str_alloc = next_power_of_two(sizeof(rust_str) + str_fill);
mem = dom.malloc(str_alloc);
strs[i] = new (mem) rust_str(&dom, str_alloc, str_fill,
mem = dom->malloc(str_alloc);
strs[i] = new (mem) rust_str(dom, str_alloc, str_fill,
(uint8_t const *)argv[i]);
}
args->fill = vec_fill;
@ -58,50 +58,55 @@ command_line_args
// Drop the args we've had pinned here.
rust_str **strs = (rust_str**) &args->data[0];
for (int i = 0; i < argc; ++i)
dom.free(strs[i]);
dom.free(args);
dom->free(strs[i]);
dom->free(args);
}
#ifdef __WIN32__
for (int i = 0; i < argc; ++i) {
dom.free(argv[i]);
dom->free(argv[i]);
}
dom.free(argv);
dom->free(argv);
#endif
}
};
/**
* Main entry point into the Rust runtime. Here we create a Rust service,
* initialize the kernel, create the root domain and run it.
*/
extern "C" CDECL int
rust_start(uintptr_t main_fn, rust_crate const *crate, int argc, char **argv)
{
int ret;
{
rust_srv srv;
rust_dom dom(&srv, crate, "main");
srv.kernel->register_domain(&dom);
command_line_args args(dom, argc, argv);
rust_start(uintptr_t main_fn, rust_crate const *crate, int argc,
char **argv) {
dom.log(rust_log::DOM, "startup: %d args", args.argc);
for (int i = 0; i < args.argc; ++i)
dom.log(rust_log::DOM,
"startup: arg[%d] = '%s'", i, args.argv[i]);
rust_srv *srv = new rust_srv();
rust_kernel *kernel = new rust_kernel(srv);
kernel->start();
rust_handle<rust_dom> *handle = kernel->create_domain(crate, "main");
rust_dom *dom = handle->referent();
command_line_args *args = new (dom) command_line_args(dom, argc, argv);
if (dom._log.is_tracing(rust_log::DWARF)) {
rust_crate_reader rdr(&dom, crate);
}
uintptr_t main_args[4] = { 0, 0, 0, (uintptr_t)args.args };
dom.root_task->start(crate->get_exit_task_glue(),
main_fn,
(uintptr_t)&main_args,
sizeof(main_args));
ret = dom.start_main_loop();
srv.kernel->deregister_domain(&dom);
dom->log(rust_log::DOM, "startup: %d args", args->argc);
for (int i = 0; i < args->argc; i++) {
dom->log(rust_log::DOM,
"startup: arg[%d] = '%s'", i, args->argv[i]);
}
if (dom->_log.is_tracing(rust_log::DWARF)) {
rust_crate_reader create_reader(dom, crate);
}
uintptr_t main_args[4] = {0, 0, 0, (uintptr_t)args->args};
dom->root_task->start(crate->get_exit_task_glue(),
main_fn, (uintptr_t)&main_args, sizeof(main_args));
int ret = dom->start_main_loop();
delete args;
kernel->destroy_domain(dom);
kernel->join_all_domains();
delete kernel;
delete srv;
#if !defined(__WIN32__)
// Don't take down the process if the main thread exits without an
// error.

View File

@ -4,13 +4,15 @@
/**
* Create a new rust channel and associate it with the specified port.
*/
rust_chan::rust_chan(rust_task *task, maybe_proxy<rust_port> *port) :
task(task), port(port), buffer(task->dom, port->delegate()->unit_sz) {
rust_chan::rust_chan(rust_task *task,
maybe_proxy<rust_port> *port,
size_t unit_sz) :
task(task),
port(port),
buffer(task->dom, unit_sz) {
if (port) {
associate(port);
}
task->log(rust_log::MEM | rust_log::COMM,
"new rust_chan(task=0x%" PRIxPTR
", port=0x%" PRIxPTR ") -> chan=0x%" PRIxPTR,
@ -34,7 +36,7 @@ void rust_chan::associate(maybe_proxy<rust_port> *port) {
task->log(rust_log::TASK,
"associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR,
this, port);
this->port->delegate()->chans.push(this);
this->port->referent()->chans.push(this);
}
}
@ -51,8 +53,8 @@ void rust_chan::disassociate() {
if (port->is_proxy() == false) {
task->log(rust_log::TASK,
"disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR,
this, port->delegate());
port->delegate()->chans.swap_delete(this);
this, port->referent());
port->referent()->chans.swap_delete(this);
}
// Delete reference to the port.
@ -76,14 +78,11 @@ void rust_chan::send(void *sptr) {
"rust_chan::transmit with nothing to send.");
if (port->is_proxy()) {
// TODO: Cache port task locally.
rust_proxy<rust_task> *port_task =
dom->get_task_proxy(port->delegate()->task);
data_message::send(buffer.peek(), buffer.unit_sz,
"send data", task, port_task, port->as_proxy());
data_message::send(buffer.peek(), buffer.unit_sz, "send data",
task->get_handle(), port->as_proxy()->handle());
buffer.dequeue(NULL);
} else {
rust_port *target_port = port->delegate();
rust_port *target_port = port->referent();
if (target_port->task->blocked_on(target_port)) {
dom->log(rust_log::COMM, "dequeued in rendezvous_ptr");
buffer.dequeue(target_port->task->rendezvous_ptr);

View File

@ -5,7 +5,8 @@ class rust_chan : public rc_base<rust_chan>,
public task_owned<rust_chan>,
public rust_cond {
public:
rust_chan(rust_task *task, maybe_proxy<rust_port> *port);
rust_chan(rust_task *task, maybe_proxy<rust_port> *port, size_t unit_sz);
~rust_chan();
rust_task *task;

View File

@ -4,8 +4,9 @@
template class ptr_vec<rust_task>;
rust_dom::rust_dom(rust_srv *srv, rust_crate const *root_crate,
const char *name) :
rust_dom::rust_dom(rust_kernel *kernel,
rust_message_queue *message_queue, rust_srv *srv,
rust_crate const *root_crate, const char *name) :
interrupt_flag(0),
root_crate(root_crate),
_log(srv, this),
@ -20,7 +21,8 @@ rust_dom::rust_dom(rust_srv *srv, rust_crate const *root_crate,
root_task(NULL),
curr_task(NULL),
rval(0),
_kernel(srv->kernel)
kernel(kernel),
message_queue(message_queue)
{
logptr("new dom", (uintptr_t)this);
isaac_init(this, &rctx);
@ -42,33 +44,9 @@ del_all_tasks(rust_dom *dom, ptr_vec<rust_task> *v) {
}
}
void
rust_dom::delete_proxies() {
rust_task *task;
rust_proxy<rust_task> *task_proxy;
while (_task_proxies.pop(&task, &task_proxy)) {
log(rust_log::TASK,
"deleting proxy 0x%" PRIxPTR " in dom %s 0x%" PRIxPTR,
task_proxy, task_proxy->dom->name, task_proxy->dom);
delete task_proxy;
}
rust_port *port;
rust_proxy<rust_port> *port_proxy;
while (_port_proxies.pop(&port, &port_proxy)) {
log(rust_log::TASK,
"deleting proxy 0x%" PRIxPTR " in dom %s 0x%" PRIxPTR,
port_proxy, port_proxy->dom->name, port_proxy->dom);
delete port_proxy;
}
}
rust_dom::~rust_dom() {
log(rust_log::MEM | rust_log::DOM,
"~rust_dom %s @0x%" PRIxPTR, name, (uintptr_t)this);
log(rust_log::TASK, "deleting all proxies");
delete_proxies();
log(rust_log::TASK, "deleting all running tasks");
del_all_tasks(this, &running_tasks);
log(rust_log::TASK, "deleting all blocked tasks");
@ -78,8 +56,9 @@ rust_dom::~rust_dom() {
#ifndef __WIN32__
pthread_attr_destroy(&attr);
#endif
while (caches.length())
while (caches.length()) {
delete caches.pop();
}
}
void
@ -275,70 +254,21 @@ rust_dom::reap_dead_tasks() {
}
}
/**
* Enqueues a message in this domain's incoming message queue. It's the
* responsibility of the receiver to free the message once it's processed.
*/
void rust_dom::send_message(rust_message *message) {
log(rust_log::COMM, "==> enqueueing \"%s\" 0x%" PRIxPTR
" in queue 0x%" PRIxPTR
" in domain 0x%" PRIxPTR,
message->label,
message,
&_incoming_message_queue,
this);
_incoming_message_queue.enqueue(message);
}
/**
* Drains and processes incoming pending messages.
*/
void rust_dom::drain_incoming_message_queue(bool process) {
rust_message *message;
while (_incoming_message_queue.dequeue(&message)) {
log(rust_log::COMM, "<== processing incoming message \"%s\" 0x%"
PRIxPTR, message->label, message);
while (message_queue->dequeue(&message)) {
log(rust_log::COMM, "<== receiving \"%s\" " PTR,
message->label, message);
if (process) {
message->process();
}
message->~rust_message();
this->synchronized_region.free(message);
delete message;
}
}
rust_proxy<rust_task> *
rust_dom::get_task_proxy(rust_task *task) {
rust_proxy<rust_task> *proxy = NULL;
if (_task_proxies.get(task, &proxy)) {
return proxy;
}
log(rust_log::COMM, "no proxy for %s @0x%" PRIxPTR, task->name, task);
proxy = new (this) rust_proxy<rust_task> (this, task, false);
_task_proxies.put(task, proxy);
return proxy;
}
/**
* Gets a proxy for this port.
*
* TODO: This method needs to be synchronized since it's usually called
* during upcall_clone_chan in a different thread. However, for now
* since this usually happens before the thread actually starts,
* we may get lucky without synchronizing.
*
*/
rust_proxy<rust_port> *
rust_dom::get_port_proxy_synchronized(rust_port *port) {
rust_proxy<rust_port> *proxy = NULL;
if (_port_proxies.get(port, &proxy)) {
return proxy;
}
log(rust_log::COMM, "no proxy for 0x%" PRIxPTR, port);
proxy = new (this) rust_proxy<rust_port> (this, port, false);
_port_proxies.put(port, proxy);
return proxy;
}
/**
* Schedules a running task for execution. Only running tasks can be
* activated. Blocked tasks have to be unblocked before they can be
@ -362,31 +292,6 @@ rust_dom::schedule_task() {
return NULL;
}
/**
* Checks for simple deadlocks.
*/
bool
rust_dom::is_deadlocked() {
if (_kernel->domains.length() != 1) {
// We cannot tell if we are deadlocked if other domains exists.
return false;
}
if (running_tasks.length() != 0) {
// We are making progress and therefore we are not deadlocked.
return false;
}
if (_incoming_message_queue.is_empty() && blocked_tasks.length() > 0) {
// We have no messages to process, no running tasks to schedule
// and some blocked tasks therefore we are likely in a deadlock.
_kernel->log_all_domain_state();
return true;
}
return false;
}
void
rust_dom::log_state() {
if (!running_tasks.is_empty()) {
@ -439,7 +344,7 @@ rust_dom::start_main_loop()
logptr("exit-task glue", root_crate->get_exit_task_glue());
while (n_live_tasks() > 0) {
A(this, is_deadlocked() == false, "deadlock");
A(this, kernel->is_deadlocked() == false, "deadlock");
drain_incoming_message_queue(true);
@ -496,7 +401,7 @@ rust_dom::start_main_loop()
log(rust_log::DOM, "terminated scheduler loop, reaping dead tasks ...");
while (dead_tasks.length() > 0) {
if (_incoming_message_queue.is_empty()) {
if (message_queue->is_empty()) {
log(rust_log::DOM,
"waiting for %d dead tasks to become dereferenced, "
"scheduler yielding ...",

View File

@ -1,7 +1,7 @@
#ifndef RUST_DOM_H
#define RUST_DOM_H
struct rust_dom
struct rust_dom : public kernel_owned<rust_dom>, rc_base<rust_dom>
{
// Fields known to the compiler:
uintptr_t interrupt_flag;
@ -27,14 +27,14 @@ struct rust_dom
rust_task *curr_task;
int rval;
rust_kernel *_kernel;
rust_kernel *kernel;
int32_t list_index;
hash_map<rust_task *, rust_proxy<rust_task> *> _task_proxies;
hash_map<rust_port *, rust_proxy<rust_port> *> _port_proxies;
// Incoming messages from other domains.
lock_free_queue<rust_message*> _incoming_message_queue;
rust_message_queue *message_queue;
#ifndef __WIN32__
pthread_attr_t attr;
@ -42,9 +42,10 @@ struct rust_dom
// Only a pointer to 'name' is kept, so it must live as long as this
// domain.
rust_dom(rust_srv *srv, rust_crate const *root_crate, const char *name);
rust_dom(rust_kernel *kernel,
rust_message_queue *message_queue, rust_srv *srv,
rust_crate const *root_crate, const char *name);
~rust_dom();
void activate(rust_task *task);
void log(rust_task *task, uint32_t logbit, char const *fmt, ...);
void log(uint32_t logbit, char const *fmt, ...);
@ -63,11 +64,7 @@ struct rust_dom
void free(void *mem);
void free(void *mem, memory_region::memory_region_type type);
void send_message(rust_message *message);
void drain_incoming_message_queue(bool process);
rust_proxy<rust_task> *get_task_proxy(rust_task *task);
void delete_proxies();
rust_proxy<rust_port> *get_port_proxy_synchronized(rust_port *port);
#ifdef __WIN32__
void win32_require(LPCTSTR fn, BOOL ok);
@ -81,7 +78,7 @@ struct rust_dom
void reap_dead_tasks();
rust_task *schedule_task();
bool is_deadlocked();
int start_main_loop();
void log_state();

View File

@ -72,6 +72,11 @@ struct frame_glue_fns;
#define A(dom, e, s, ...) ((e) ? (void)0 : \
(dom)->srv->fatal(#e, __FILE__, __LINE__, s, ## __VA_ARGS__))
#define K(srv, e, s, ...) ((e) ? (void)0 : \
srv->fatal(#e, __FILE__, __LINE__, s, ## __VA_ARGS__))
#define PTR "0x%" PRIxPTR
// This drives our preemption scheme.
static size_t const TIME_SLICE_IN_MS = 10;
@ -96,25 +101,29 @@ template <typename T> struct rc_base {
};
template <typename T> struct dom_owned {
rust_dom *get_dom() const {
return ((T*)this)->dom;
}
void operator delete(void *ptr) {
((T *)ptr)->dom->free(ptr);
}
};
template <typename T> struct task_owned {
rust_dom *get_dom() const {
return ((T *)this)->task->dom;
}
void operator delete(void *ptr) {
((T *)ptr)->task->dom->free(ptr);
}
};
template <typename T> struct kernel_owned {
void operator delete(void *ptr) {
((T *)ptr)->kernel->free(ptr);
}
};
template <typename T> struct region_owned {
void operator delete(void *ptr) {
((T *)ptr)->region->free(ptr);
}
};
// A cond(ition) is something we can block on. This can be a channel
// (writing), a port (reading) or a task (waiting).
@ -152,8 +161,8 @@ public:
#include "rust_srv.h"
#include "rust_log.h"
#include "rust_proxy.h"
#include "rust_message.h"
#include "rust_kernel.h"
#include "rust_message.h"
#include "rust_dom.h"
#include "memory.h"
@ -552,6 +561,9 @@ struct gc_alloc {
#include "rust_chan.h"
#include "rust_port.h"
#include "test/rust_test_harness.h"
#include "test/rust_test_util.h"
//
// Local Variables:
// mode: C++

View File

@ -1,25 +1,81 @@
#include "rust_internal.h"
rust_kernel::rust_kernel(rust_srv *srv) :
_region(srv->local_region),
_region(&srv->local_region),
_log(srv, NULL),
domains(srv->local_region),
message_queues(srv->local_region) {
_srv(srv),
_interrupt_kernel_loop(FALSE),
domains(&srv->local_region),
message_queues(&srv->local_region) {
// Nop.
}
rust_kernel::~rust_kernel() {
// Nop.
}
void
rust_kernel::register_domain(rust_dom *dom) {
rust_handle<rust_dom> *
rust_kernel::create_domain(const rust_crate *crate, const char *name) {
rust_message_queue *message_queue =
new (this) rust_message_queue(_srv, this);
rust_srv *srv = _srv->clone();
rust_dom *dom =
new (this) rust_dom(this, message_queue, srv, crate, name);
rust_handle<rust_dom> *handle = get_dom_handle(dom);
message_queue->associate(handle);
domains.append(dom);
message_queues.append(message_queue);
return handle;
}
void
rust_kernel::deregister_domain(rust_dom *dom) {
rust_kernel::destroy_domain(rust_dom *dom) {
log(rust_log::KERN, "deleting domain: " PTR ", index: %d, domains %d",
dom, dom->list_index, domains.length());
domains.remove(dom);
dom->message_queue->disassociate();
rust_srv *srv = dom->srv;
delete dom;
delete srv;
}
rust_handle<rust_dom> *
rust_kernel::get_dom_handle(rust_dom *dom) {
rust_handle<rust_dom> *handle = NULL;
if (_dom_handles.get(dom, &handle)) {
return handle;
}
handle = new (this) rust_handle<rust_dom>(this, dom->message_queue, dom);
_dom_handles.put(dom, handle);
return handle;
}
rust_handle<rust_task> *
rust_kernel::get_task_handle(rust_task *task) {
rust_handle<rust_task> *handle = NULL;
if (_task_handles.get(task, &handle)) {
return handle;
}
handle = new (this) rust_handle<rust_task>(this, task->dom->message_queue,
task);
_task_handles.put(task, handle);
return handle;
}
rust_handle<rust_port> *
rust_kernel::get_port_handle(rust_port *port) {
rust_handle<rust_port> *handle = NULL;
if (_port_handles.get(port, &handle)) {
return handle;
}
handle = new (this) rust_handle<rust_port>(this,
port->task->dom->message_queue, port);
_port_handles.put(port, handle);
return handle;
}
void
rust_kernel::join_all_domains() {
// TODO: Perhaps we can do this a little smarter. Just spin wait for now.
while (domains.length() > 0) {
sync::yield();
}
}
void
@ -30,6 +86,36 @@ rust_kernel::log_all_domain_state() {
}
}
/**
* Checks for simple deadlocks.
*/
bool
rust_kernel::is_deadlocked() {
return false;
// _lock.lock();
// if (domains.length() != 1) {
// // We can only check for deadlocks when only one domain exists.
// return false;
// }
//
// if (domains[0]->running_tasks.length() != 0) {
// // We are making progress and therefore we are not deadlocked.
// return false;
// }
//
// if (domains[0]->message_queue->is_empty() &&
// domains[0]->blocked_tasks.length() > 0) {
// // We have no messages to process, no running tasks to schedule
// // and some blocked tasks therefore we are likely in a deadlock.
// log_all_domain_state();
// return true;
// }
//
// _lock.unlock();
// return false;
}
void
rust_kernel::log(uint32_t type_bits, char const *fmt, ...) {
char buf[256];
@ -41,3 +127,69 @@ rust_kernel::log(uint32_t type_bits, char const *fmt, ...) {
va_end(args);
}
}
void
rust_kernel::start_kernel_loop() {
while (_interrupt_kernel_loop == false) {
message_queues.global.lock();
for (size_t i = 0; i < message_queues.length(); i++) {
rust_message_queue *queue = message_queues[i];
if (queue->is_associated() == false) {
rust_message *message = NULL;
while (queue->dequeue(&message)) {
message->kernel_process();
delete message;
}
}
}
message_queues.global.unlock();
}
}
void
rust_kernel::run() {
log(rust_log::KERN, "started kernel loop");
start_kernel_loop();
log(rust_log::KERN, "finished kernel loop");
}
rust_kernel::~rust_kernel() {
K(_srv, domains.length() == 0,
"Kernel has %d live domain(s), join all domains before killing "
"the kernel.", domains.length());
// If the kernel loop is running, interrupt it, join and exit.
if (is_running()) {
_interrupt_kernel_loop = true;
join();
}
free_handles(_task_handles);
free_handles(_port_handles);
free_handles(_dom_handles);
rust_message_queue *queue = NULL;
while (message_queues.pop(&queue)) {
K(_srv, queue->is_empty(), "Kernel message queue should be empty "
"before killing the kernel.");
delete queue;
}
}
void *
rust_kernel::malloc(size_t size) {
return _region->malloc(size);
}
void rust_kernel::free(void *mem) {
_region->free(mem);
}
template<class T> void
rust_kernel::free_handles(hash_map<T*, rust_handle<T>* > &map) {
T* key;
rust_handle<T> *value;
while (map.pop(&key, &value)) {
delete value;
}
}

View File

@ -2,20 +2,113 @@
#define RUST_KERNEL_H
/**
* A global object shared by all domains.
* A handle object for Rust tasks. We need a reference to the message queue
* of the referent's domain which we can safely hang on to since it's a
* kernel object. We use the referent reference as a label we stash in
* messages sent via this proxy.
*/
class rust_kernel {
memory_region &_region;
rust_log _log;
class rust_kernel;
template <typename T> class
rust_handle :
public rust_cond,
public rc_base<rust_handle<T> >,
public kernel_owned<rust_handle<T> > {
public:
rust_kernel *kernel;
rust_message_queue *message_queue;
T *_referent;
T * referent() {
return _referent;
}
rust_handle(rust_kernel *kernel,
rust_message_queue *message_queue,
T *referent) :
kernel(kernel),
message_queue(message_queue),
_referent(referent) {
// Nop.
}
};
/**
* A global object shared by all thread domains. Most of the data structures
* in this class are synchronized since they are accessed from multiple
* threads.
*/
class rust_kernel : public rust_thread {
memory_region *_region;
rust_log _log;
rust_srv *_srv;
/**
* Task proxy objects are kernel owned handles to Rust objects.
*/
hash_map<rust_task *, rust_handle<rust_task> *> _task_handles;
hash_map<rust_port *, rust_handle<rust_port> *> _port_handles;
hash_map<rust_dom *, rust_handle<rust_dom> *> _dom_handles;
template<class T> void free_handles(hash_map<T*, rust_handle<T>* > &map);
void run();
void start_kernel_loop();
bool volatile _interrupt_kernel_loop;
/**
* Lock for the message queue list, so we can safely
*/
spin_lock _message_queues_lock;
public:
/**
* List of domains that are currently executing.
*/
synchronized_indexed_list<rust_dom> domains;
synchronized_indexed_list<lock_free_queue<rust_message*> > message_queues;
/**
* Message queues are kernel objects and are associated with domains.
* Their lifetime is not bound to the lifetime of a domain and in fact
* live on after their associated domain has died. This way we can safely
* communicate with domains that may have died.
*
* Although the message_queues list is synchronized, each individual
* message queue is lock free.
*/
synchronized_indexed_list<rust_message_queue> message_queues;
rust_handle<rust_dom> *get_dom_handle(rust_dom *dom);
rust_handle<rust_task> *get_task_handle(rust_task *task);
rust_handle<rust_port> *get_port_handle(rust_port *port);
rust_kernel(rust_srv *srv);
void register_domain(rust_dom *dom);
void deregister_domain(rust_dom *dom);
rust_handle<rust_dom> *create_domain(rust_crate const *root_crate,
const char *name);
void destroy_domain(rust_dom *dom);
bool is_deadlocked();
/**
* Blocks until all domains have terminated.
*/
void join_all_domains();
void log_all_domain_state();
void log(uint32_t type_bits, char const *fmt, ...);
virtual ~rust_kernel();
void *malloc(size_t size);
void free(void *mem);
};
inline void *operator new(size_t size, rust_kernel *kernel) {
return kernel->malloc(size);
}
inline void *operator new(size_t size, rust_kernel &kernel) {
return kernel.malloc(size);
}
#endif /* RUST_KERNEL_H */

View File

@ -2,36 +2,35 @@
#include "rust_message.h"
rust_message::
rust_message(const char* label, rust_task *source, rust_task *target) :
label(label),
_dom(target->dom),
_source(source),
_target(target) {
rust_message(memory_region *region, const char* label,
rust_handle<rust_task> *source, rust_handle<rust_task> *target) :
label(label), region(region), _source(source), _target(target) {
}
rust_message::~rust_message() {
// Nop.
}
void rust_message::process() {
I(_dom, false);
// Nop.
}
rust_proxy<rust_task> *
rust_message::get_source_proxy() {
return _dom->get_task_proxy(_source);
void rust_message::kernel_process() {
// Nop.
}
notify_message::
notify_message(notification_type type, const char* label,
rust_task *source,
rust_task *target) :
rust_message(label, source, target), type(type) {
notify_message(memory_region *region, notification_type type,
const char* label, rust_handle<rust_task> *source,
rust_handle<rust_task> *target) :
rust_message(region, label, source, target), type(type) {
}
data_message::
data_message(uint8_t *buffer, size_t buffer_sz, const char* label,
rust_task *source, rust_task *target, rust_port *port) :
rust_message(label, source, target),
data_message(memory_region *region, uint8_t *buffer, size_t buffer_sz,
const char* label, rust_handle<rust_task> *source,
rust_handle<rust_port> *port) :
rust_message(region, label, source, NULL),
_buffer_sz(buffer_sz), _port(port) {
_buffer = (uint8_t *)malloc(buffer_sz);
memcpy(_buffer, buffer, buffer_sz);
@ -47,54 +46,79 @@ data_message::~data_message() {
* source task.
*/
void notify_message::
send(notification_type type, const char* label, rust_task *source,
rust_proxy<rust_task> *target) {
rust_task *target_task = target->delegate();
rust_dom *target_domain = target_task->dom;
send(notification_type type, const char* label,
rust_handle<rust_task> *source, rust_handle<rust_task> *target) {
memory_region *region = &target->message_queue->region;
notify_message *message =
new (target_domain, memory_region::SYNCHRONIZED) notify_message(type,
label, source, target_task);
target_domain->send_message(message);
new (region) notify_message(region, type, label, source, target);
// target->referent()->log(rust_log::COMM,
// "==> sending \"%s\" " PTR " in queue " PTR,
// label, message, &target->message_queue);
target->message_queue->enqueue(message);
}
void notify_message::process() {
rust_task *task = _target;
rust_task *task = _target->referent();
switch (type) {
case KILL:
task->ref_count--;
// task->ref_count--;
task->kill();
break;
case JOIN: {
if (task->dead() == false) {
task->tasks_waiting_to_join.append(get_source_proxy());
rust_proxy<rust_task> *proxy = new rust_proxy<rust_task>(_source);
task->tasks_waiting_to_join.append(proxy);
} else {
send(WAKEUP, "wakeup", task, get_source_proxy());
send(WAKEUP, "wakeup", _target, _source);
}
break;
}
case WAKEUP:
task->wakeup(get_source_proxy()->delegate());
task->wakeup(_source);
break;
}
}
void notify_message::kernel_process() {
switch(type) {
case WAKEUP:
case KILL:
// Ignore.
break;
case JOIN:
send(WAKEUP, "wakeup", _target, _source);
break;
}
}
void data_message::
send(uint8_t *buffer, size_t buffer_sz, const char* label, rust_task *source,
rust_proxy<rust_task> *target, rust_proxy<rust_port> *port) {
send(uint8_t *buffer, size_t buffer_sz, const char* label,
rust_handle<rust_task> *source, rust_handle<rust_port> *port) {
rust_task *target_task = target->delegate();
rust_port *target_port = port->delegate();
rust_dom *target_domain = target_task->dom;
memory_region *region = &port->message_queue->region;
data_message *message =
new (target_domain, memory_region::SYNCHRONIZED)
data_message(buffer, buffer_sz, label, source,
target_task, target_port);
target_domain->send_message(message);
new (region) data_message(region, buffer, buffer_sz, label, source,
port);
source->referent()->log(rust_log::COMM,
"==> sending \"%s\"" PTR " in queue " PTR,
label, message, &port->message_queue);
port->message_queue->enqueue(message);
}
void data_message::process() {
_port->remote_channel->send(_buffer);
_target->log(rust_log::COMM, "<=== received data via message ===");
_port->referent()->remote_channel->send(_buffer);
// _target->referent()->log(rust_log::COMM,
// "<=== received data via message ===");
}
void data_message::kernel_process() {
}
rust_message_queue::rust_message_queue(rust_srv *srv, rust_kernel *kernel) :
region (srv, true), kernel(kernel),
dom_handle(NULL) {
// Nop.
}
//

View File

@ -9,28 +9,31 @@
/**
* Abstract base class for all message types.
*/
class rust_message {
class rust_message : public region_owned<rust_message> {
public:
const char* label;
memory_region *region;
private:
rust_dom *_dom;
rust_task *_source;
protected:
rust_task *_target;
rust_handle<rust_task> *_source;
rust_handle<rust_task> *_target;
public:
rust_message(const char* label, rust_task *source, rust_task *target);
rust_message(memory_region *region,
const char* label,
rust_handle<rust_task> *source,
rust_handle<rust_task> *target);
virtual ~rust_message();
/**
* We can only access the source task through a proxy, so create one
* on demand if we need it.
*/
rust_proxy<rust_task> *get_source_proxy();
/**
* Processes the message in the target domain thread.
* Processes the message in the target domain.
*/
virtual void process();
/**
* Processes the message in the kernel.
*/
virtual void kernel_process();
};
/**
@ -44,17 +47,19 @@ public:
const notification_type type;
notify_message(notification_type type, const char* label,
rust_task *source, rust_task *target);
notify_message(memory_region *region, notification_type type,
const char* label, rust_handle<rust_task> *source,
rust_handle<rust_task> *target);
void process();
void kernel_process();
/**
* This code executes in the sending domain's thread.
*/
static void
send(notification_type type, const char* label, rust_task *source,
rust_proxy<rust_task> *target);
send(notification_type type, const char* label,
rust_handle<rust_task> *source, rust_handle<rust_task> *target);
};
/**
@ -64,21 +69,51 @@ class data_message : public rust_message {
private:
uint8_t *_buffer;
size_t _buffer_sz;
rust_port *_port;
public:
rust_handle<rust_port> *_port;
public:
data_message(memory_region *region, uint8_t *buffer, size_t buffer_sz,
const char* label, rust_handle<rust_task> *source,
rust_handle<rust_port> *port);
data_message(uint8_t *buffer, size_t buffer_sz, const char* label,
rust_task *source, rust_task *target, rust_port *port);
virtual ~data_message();
void process();
void kernel_process();
/**
* This code executes in the sending domain's thread.
*/
static void
send(uint8_t *buffer, size_t buffer_sz, const char* label,
rust_task *source, rust_proxy<rust_task> *target,
rust_proxy<rust_port> *port);
rust_handle<rust_task> *source, rust_handle<rust_port> *port);
};
class rust_message_queue : public lock_free_queue<rust_message*>,
public kernel_owned<rust_message_queue> {
public:
memory_region region;
rust_kernel *kernel;
rust_handle<rust_dom> *dom_handle;
int32_t list_index;
rust_message_queue(rust_srv *srv, rust_kernel *kernel);
void associate(rust_handle<rust_dom> *dom_handle) {
this->dom_handle = dom_handle;
}
/**
* The Rust domain relinquishes control to the Rust kernel.
*/
void disassociate() {
this->dom_handle = NULL;
}
/**
* Checks if a Rust domain is responsible for draining the message queue.
*/
bool is_associated() {
return this->dom_handle != NULL;
}
};
//

View File

@ -2,15 +2,15 @@
#include "rust_port.h"
rust_port::rust_port(rust_task *task, size_t unit_sz) :
maybe_proxy<rust_port>(this), task(task), unit_sz(unit_sz),
writers(task->dom), chans(task->dom) {
maybe_proxy<rust_port>(this), task(task),
unit_sz(unit_sz), writers(task->dom), chans(task->dom) {
task->log(rust_log::MEM | rust_log::COMM,
"new rust_port(task=0x%" PRIxPTR ", unit_sz=%d) -> port=0x%"
PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this);
// Allocate a remote channel, for remote channel data.
remote_channel = new (task->dom) rust_chan(task, this);
remote_channel = new (task->dom) rust_chan(task, this, unit_sz);
}
rust_port::~rust_port() {

View File

@ -2,57 +2,70 @@
#define RUST_PROXY_H
/**
* A proxy object is a wrapper around other Rust objects. One use of the proxy
* object is to mitigate access between tasks in different thread domains.
* A proxy object is a wrapper for remote objects. Proxy objects are domain
* owned and provide a way distinguish between local and remote objects.
*/
template <typename T> struct rust_proxy;
/**
* The base class of all objects that may delegate.
*/
template <typename T> struct
maybe_proxy : public rc_base<T>, public rust_cond {
protected:
T *_delegate;
T *_referent;
public:
maybe_proxy(T * delegate) : _delegate(delegate) {
maybe_proxy(T *referent) : _referent(referent) {
// Nop.
}
T *referent() {
return (T *)_referent;
}
T *delegate() {
return _delegate;
}
bool is_proxy() {
return _delegate != this;
return _referent != this;
}
rust_proxy<T> *as_proxy() {
return (rust_proxy<T> *) this;
}
T *as_delegate() {
I(_delegate->get_dom(), !is_proxy());
T *as_referent() {
return (T *) this;
}
};
template <typename T> class rust_handle;
/**
* A proxy object that delegates to another.
*/
template <typename T> struct
rust_proxy : public maybe_proxy<T>,
public dom_owned<rust_proxy<T> > {
rust_proxy : public maybe_proxy<T> {
private:
bool _strong;
rust_handle<T> *_handle;
public:
rust_dom *dom;
rust_proxy(rust_dom *dom, T *delegate, bool strong) :
maybe_proxy<T> (delegate), _strong(strong), dom(dom) {
this->dom->log(rust_log::COMM,
"new proxy: 0x%" PRIxPTR " => 0x%" PRIxPTR, this, delegate);
if (strong) {
delegate->ref();
}
rust_proxy(rust_handle<T> *handle) :
maybe_proxy<T> (NULL), _strong(FALSE), _handle(handle) {
// Nop.
}
rust_proxy(T *referent) :
maybe_proxy<T> (referent), _strong(FALSE), _handle(NULL) {
// Nop.
}
rust_handle<T> *handle() {
return _handle;
}
};
class rust_message_queue;
class rust_task;
//
// Local Variables:
// mode: C++

View File

@ -7,13 +7,14 @@
rust_srv::rust_srv() :
local_region(this, false),
synchronized_region(this, true),
kernel(new rust_kernel(this)) {
synchronized_region(this, true) {
// Nop.
}
rust_srv::~rust_srv() {
// Nop.
// char msg[1024];
// snprintf(msg, sizeof(msg), "~rust_srv %" PRIxPTR, (uintptr_t) this);
// log(msg);
}
void
@ -74,3 +75,8 @@ rust_srv::warning(char const *expression,
expression, file, (int)line, buf);
log(msg);
}
rust_srv *
rust_srv::clone() {
return new rust_srv();
}

View File

@ -7,7 +7,6 @@ class rust_srv {
public:
memory_region local_region;
memory_region synchronized_region;
rust_kernel *kernel;
virtual void log(char const *msg);
virtual void fatal(char const *expression,
char const *file,
@ -24,6 +23,7 @@ public:
virtual void *realloc(void *, size_t);
rust_srv();
virtual ~rust_srv();
virtual rust_srv *clone();
};
#endif /* RUST_SRV_H */

View File

@ -404,9 +404,10 @@ rust_task::notify_tasks_waiting_to_join() {
tasks_waiting_to_join.pop(&waiting_task);
if (waiting_task->is_proxy()) {
notify_message::send(notify_message::WAKEUP, "wakeup",
this, waiting_task->as_proxy());
get_handle(), waiting_task->as_proxy()->handle());
delete waiting_task;
} else {
rust_task *task = waiting_task->delegate();
rust_task *task = waiting_task->referent();
if (task->dead() == false) {
task->wakeup(this);
}
@ -563,8 +564,7 @@ rust_task::transition(ptr_vec<rust_task> *src, ptr_vec<rust_task> *dst)
}
void
rust_task::block(rust_cond *on, const char* name)
{
rust_task::block(rust_cond *on, const char* name) {
log(rust_log::TASK, "Blocking on 0x%" PRIxPTR ", cond: 0x%" PRIxPTR,
(uintptr_t) on, (uintptr_t) cond);
A(dom, cond == NULL, "Cannot block an already blocked task.");
@ -631,6 +631,11 @@ rust_task::log(uint32_t type_bits, char const *fmt, ...) {
}
}
rust_handle<rust_task> *
rust_task::get_handle() {
return dom->kernel->get_task_handle(this);
}
//
// Local Variables:
// mode: C++

View File

@ -50,6 +50,7 @@ rust_task : public maybe_proxy<rust_task>,
rust_task(rust_dom *dom,
rust_task *spawner,
const char *name);
~rust_task();
void start(uintptr_t exit_task_glue,
@ -110,6 +111,8 @@ rust_task : public maybe_proxy<rust_task>,
// Notify tasks waiting for us that we are about to die.
void notify_tasks_waiting_to_join();
rust_handle<rust_task> * get_handle();
uintptr_t get_fp();
uintptr_t get_previous_fp(uintptr_t fp);
frame_glue_fns *get_frame_glue_fns(uintptr_t fp);

View File

@ -23,20 +23,6 @@
(task)->dom->get_log().indent();
#endif
void
log_task_state(rust_task *task, maybe_proxy<rust_task> *target) {
rust_task *delegate = target->delegate();
if (target->is_proxy()) {
task->log(rust_log::TASK,
"remote task: 0x%" PRIxPTR ", ref_count: %d state: %s",
delegate, delegate->ref_count, delegate->state_str());
} else {
task->log(rust_log::TASK,
"local task: 0x%" PRIxPTR ", ref_count: %d state: %s",
delegate, delegate->ref_count, delegate->state_str());
}
}
extern "C" CDECL char const *
str_buf(rust_task *task, rust_str *s);
@ -104,7 +90,7 @@ upcall_new_chan(rust_task *task, rust_port *port) {
"task=0x%" PRIxPTR " (%s), port=0x%" PRIxPTR ")",
(uintptr_t) task, task->name, port);
I(dom, port);
return new (dom) rust_chan(task, port);
return new (dom) rust_chan(task, port, port->unit_sz);
}
/**
@ -135,43 +121,55 @@ void upcall_del_chan(rust_task *task, rust_chan *chan) {
"Channel's ref count should be zero.");
if (chan->is_associated()) {
// We're trying to delete a channel that another task may be reading
// from. We have two options:
//
// 1. We can flush the channel by blocking in upcall_flush_chan()
// and resuming only when the channel is flushed. The problem
// here is that we can get ourselves in a deadlock if the parent
// task tries to join us.
//
// 2. We can leave the channel in a "dormnat" state by not freeing
// it and letting the receiver task delete it for us instead.
if (chan->buffer.is_empty() == false) {
return;
if (chan->port->is_proxy()) {
// Here is a good place to delete the port proxy we allocated
// in upcall_clone_chan.
rust_proxy<rust_port> *proxy = chan->port->as_proxy();
chan->disassociate();
delete proxy;
} else {
// We're trying to delete a channel that another task may be
// reading from. We have two options:
//
// 1. We can flush the channel by blocking in upcall_flush_chan()
// and resuming only when the channel is flushed. The problem
// here is that we can get ourselves in a deadlock if the
// parent task tries to join us.
//
// 2. We can leave the channel in a "dormnat" state by not freeing
// it and letting the receiver task delete it for us instead.
if (chan->buffer.is_empty() == false) {
return;
}
chan->disassociate();
}
chan->disassociate();
}
delete chan;
}
/**
* Clones a channel and stores it in the spawnee's domain. Each spawned task
* has it's own copy of the channel.
* has its own copy of the channel.
*/
extern "C" CDECL rust_chan *
upcall_clone_chan(rust_task *task,
maybe_proxy<rust_task> *target,
upcall_clone_chan(rust_task *task, maybe_proxy<rust_task> *target,
rust_chan *chan) {
LOG_UPCALL_ENTRY(task);
task->log(rust_log::UPCALL | rust_log::COMM,
"target: 0x%" PRIxPTR ", chan: 0x%" PRIxPTR,
target, chan);
rust_task *target_task = target->delegate();
size_t unit_sz = chan->buffer.unit_sz;
maybe_proxy<rust_port> *port = chan->port;
if (target->is_proxy()) {
port = target_task->dom->get_port_proxy_synchronized(
chan->port->as_delegate());
rust_task *target_task = NULL;
if (target->is_proxy() == false) {
port = chan->port;
target_task = target->referent();
} else {
rust_handle<rust_port> *handle =
task->dom->kernel->get_port_handle(port->as_referent());
maybe_proxy<rust_port> *proxy = new rust_proxy<rust_port> (handle);
task->log(rust_log::MEM, "new proxy: " PTR, proxy);
port = proxy;
target_task = target->as_proxy()->handle()->referent();
}
return new (target_task->dom) rust_chan(target_task, port);
return new (target_task->dom) rust_chan(target_task, port, unit_sz);
}
extern "C" CDECL void
@ -193,17 +191,15 @@ upcall_sleep(rust_task *task, size_t time_in_us) {
extern "C" CDECL void
upcall_join(rust_task *task, maybe_proxy<rust_task> *target) {
LOG_UPCALL_ENTRY(task);
rust_task *target_task = target->delegate();
task->log(rust_log::UPCALL | rust_log::COMM,
"target: 0x%" PRIxPTR ", task: %s @0x%" PRIxPTR,
target, target_task->name, target_task);
if (target->is_proxy()) {
notify_message::
send(notify_message::JOIN, "join", task, target->as_proxy());
task->block(target_task, "joining remote task");
rust_handle<rust_task> *task_handle = target->as_proxy()->handle();
notify_message::send(notify_message::JOIN, "join",
task->get_handle(), task_handle);
task->block(task_handle, "joining remote task");
task->yield(2);
} else {
rust_task *target_task = target->referent();
// If the other task is already dying, we don't have to wait for it.
if (target_task->dead() == false) {
target_task->tasks_waiting_to_join.push(task);
@ -221,10 +217,6 @@ upcall_join(rust_task *task, maybe_proxy<rust_task> *target) {
extern "C" CDECL void
upcall_send(rust_task *task, rust_chan *chan, void *sptr) {
LOG_UPCALL_ENTRY(task);
task->log(rust_log::UPCALL | rust_log::COMM,
"chan: 0x%" PRIxPTR ", sptr: 0x%" PRIxPTR ", size: %d",
(uintptr_t) chan, (uintptr_t) sptr,
chan->port->delegate()->unit_sz);
chan->send(sptr);
task->log(rust_log::COMM, "=== sent data ===>");
}
@ -269,21 +261,14 @@ upcall_fail(rust_task *task,
extern "C" CDECL void
upcall_kill(rust_task *task, maybe_proxy<rust_task> *target) {
LOG_UPCALL_ENTRY(task);
log_task_state(task, target);
rust_task *target_task = target->delegate();
task->log(rust_log::UPCALL | rust_log::TASK,
"kill task %s @0x%" PRIxPTR ", ref count %d",
target_task->name, target_task,
target_task->ref_count);
if (target->is_proxy()) {
notify_message::
send(notify_message::KILL, "kill", task, target->as_proxy());
send(notify_message::KILL, "kill", task->get_handle(),
target->as_proxy()->handle());
// The proxy ref_count dropped to zero, delete it here.
delete target->as_proxy();
} else {
target_task->kill();
target->referent()->kill();
}
}
@ -554,25 +539,6 @@ upcall_get_type_desc(rust_task *task,
return td;
}
#if defined(__WIN32__)
static DWORD WINAPI rust_thread_start(void *ptr)
#elif defined(__GNUC__)
static void *rust_thread_start(void *ptr)
#else
#error "Platform not supported"
#endif
{
// We were handed the domain we are supposed to run.
rust_dom *dom = (rust_dom *) ptr;
// Start a new rust main loop for this thread.
dom->start_main_loop();
rust_srv *srv = dom->srv;
srv->kernel->deregister_domain(dom);
delete dom;
return 0;
}
extern "C" CDECL rust_task *
upcall_new_task(rust_task *spawner, const char *name) {
LOG_UPCALL_ENTRY(spawner);
@ -604,54 +570,76 @@ upcall_start_task(rust_task *spawner,
return task;
}
/**
* Called whenever a new domain is created.
*/
extern "C" CDECL maybe_proxy<rust_task> *
upcall_new_thread(rust_task *task, const char *name) {
LOG_UPCALL_ENTRY(task);
rust_dom *old_dom = task->dom;
rust_dom *new_dom = new rust_dom(old_dom->srv,
old_dom->root_crate,
name);
old_dom->srv->kernel->register_domain(new_dom);
rust_dom *parent_dom = task->dom;
rust_kernel *kernel = parent_dom->kernel;
rust_handle<rust_dom> *child_dom_handle =
kernel->create_domain(parent_dom->root_crate, name);
rust_handle<rust_task> *child_task_handle =
kernel->get_task_handle(child_dom_handle->referent()->root_task);
task->log(rust_log::UPCALL | rust_log::MEM,
"upcall new_thread(%s) = dom 0x%" PRIxPTR " task 0x%" PRIxPTR,
name, new_dom, new_dom->root_task);
rust_proxy<rust_task> *proxy =
new (old_dom) rust_proxy<rust_task>(old_dom,
new_dom->root_task, true);
task->log(rust_log::UPCALL | rust_log::MEM,
"new proxy = 0x%" PRIxPTR " -> task = 0x%" PRIxPTR,
proxy, proxy->delegate());
return proxy;
"child name: %s, child_dom_handle: " PTR
", child_task_handle: " PTR,
name, child_dom_handle, child_task_handle);
rust_proxy<rust_task> *child_task_proxy =
new rust_proxy<rust_task> (child_task_handle);
return child_task_proxy;
}
#if defined(__WIN32__)
static DWORD WINAPI rust_thread_start(void *ptr)
#elif defined(__GNUC__)
static void *rust_thread_start(void *ptr)
#else
#error "Platform not supported"
#endif
{
// We were handed the domain we are supposed to run.
rust_dom *dom = (rust_dom *) ptr;
// Start a new rust main loop for this thread.
dom->start_main_loop();
// Destroy the domain.
dom->kernel->destroy_domain(dom);
return 0;
}
/**
* Called after a new domain is created. Here we create a new thread and
* and start the domain main loop.
*/
extern "C" CDECL maybe_proxy<rust_task> *
upcall_start_thread(rust_task *spawner,
maybe_proxy<rust_task> *root_task_proxy,
upcall_start_thread(rust_task *task,
rust_proxy<rust_task> *child_task_proxy,
uintptr_t exit_task_glue,
uintptr_t spawnee_fn,
size_t callsz) {
LOG_UPCALL_ENTRY(spawner);
rust_dom *dom = spawner->dom;
rust_task *root_task = root_task_proxy->delegate();
dom->log(rust_log::UPCALL | rust_log::MEM | rust_log::TASK,
"upcall start_thread(exit_task_glue 0x%" PRIxPTR
", spawnee 0x%" PRIxPTR
", callsz %" PRIdPTR ")", exit_task_glue, spawnee_fn, callsz);
root_task->start(exit_task_glue, spawnee_fn, spawner->rust_sp, callsz);
LOG_UPCALL_ENTRY(task);
rust_dom *parenet_dom = task->dom;
rust_handle<rust_task> *child_task_handle = child_task_proxy->handle();
task->log(rust_log::UPCALL | rust_log::MEM | rust_log::TASK,
"exit_task_glue: " PTR ", spawnee_fn " PTR
", callsz %" PRIdPTR ")", exit_task_glue, spawnee_fn, callsz);
rust_task *child_task = child_task_handle->referent();
child_task->start(exit_task_glue, spawnee_fn, task->rust_sp, callsz);
#if defined(__WIN32__)
HANDLE thread;
thread = CreateThread(NULL, 0, rust_thread_start, root_task->dom,
0, NULL);
dom->win32_require("CreateThread", thread != NULL);
thread = CreateThread(NULL, 0, rust_thread_start, child_task->dom, 0,
NULL);
parenet_dom->win32_require("CreateThread", thread != NULL);
#else
pthread_t thread;
pthread_create(&thread, &dom->attr, rust_thread_start,
(void *) root_task->dom);
pthread_create(&thread, &parenet_dom->attr, rust_thread_start,
(void *) child_task->dom);
#endif
return root_task_proxy;
return child_task_proxy;
}
//

View File

@ -98,8 +98,6 @@ class lock_free_queue {
}
public:
int32_t list_index;
lock_free_queue() {
// We can only handle 64bit CAS for counted pointers, so this will
// not work with 64bit pointers.

View File

@ -28,10 +28,10 @@ public:
* object inserted in this list must define a "int32_t list_index" member.
*/
template<typename T> class indexed_list {
memory_region &region;
memory_region *region;
array_list<T*> list;
public:
indexed_list(memory_region &region) : region(region) {}
indexed_list(memory_region *region) : region(region) {}
virtual int32_t append(T *value);
virtual bool pop(T **value);
virtual size_t length() {