diff --git a/src/libcore/comm.rs b/src/libcore/comm.rs index 69fd09e618c..1a86ccf3671 100644 --- a/src/libcore/comm.rs +++ b/src/libcore/comm.rs @@ -37,8 +37,7 @@ enum rust_port {} #[abi = "cdecl"] native mod rustrt { - fn rust_port_id_send(t: *sys::type_desc, - target_port: port_id, + fn rust_port_id_send(target_port: port_id, data: T) -> libc::uintptr_t; fn new_port(unit_sz: libc::size_t) -> *rust_port; @@ -114,7 +113,7 @@ whereupon the caller loses access to it. "] fn send(ch: chan, -data: T) { let chan_t(p) = ch; - let res = rustrt::rust_port_id_send(sys::get_type_desc::(), p, data); + let res = rustrt::rust_port_id_send(p, data); if res != 0u unsafe { // Data sent successfully unsafe::forget(data); diff --git a/src/libcore/priv.rs b/src/libcore/priv.rs index ec432a69ef2..b8e0848dc87 100644 --- a/src/libcore/priv.rs +++ b/src/libcore/priv.rs @@ -4,10 +4,14 @@ export chan_from_global_ptr; import compare_and_swap = rustrt::rust_compare_and_swap_ptr; +type rust_port_id = uint; + native mod rustrt { fn rust_compare_and_swap_ptr(address: *libc::uintptr_t, oldval: libc::uintptr_t, newval: libc::uintptr_t) -> bool; + fn rust_task_weaken(ch: rust_port_id); + fn rust_task_unweaken(ch: rust_port_id); } type global_ptr = *libc::uintptr_t; @@ -143,3 +147,83 @@ fn test_from_global_chan2() unsafe { assert winners == 1u; } } + +#[doc = " +Convert the current task to a 'weak' task temporarily + +As a weak task it will not be counted towards the runtime's set +of live tasks. When there are no more outstanding live (non-weak) tasks +the runtime will send an exit message on the provided channel. + +This function is super-unsafe. Do not use. + +# Safety notes + +* Weak tasks must either die on their own or exit upon receipt of + the exit message. Failure to do so will cause the runtime to never + exit +* Tasks must not call `weaken_task` multiple times. This will + break the kernel's accounting of live tasks. +* Weak tasks must not be supervised. A supervised task keeps + a reference to its parent, so the parent will not die. +"] +unsafe fn weaken_task(f: fn(comm::port<()>)) unsafe { + let po = comm::port(); + let ch = comm::chan(po); + rustrt::rust_task_weaken(unsafe::reinterpret_cast(ch)); + let _unweaken = unweaken(ch); + f(po); + + resource unweaken(ch: comm::chan<()>) unsafe { + rustrt::rust_task_unweaken(unsafe::reinterpret_cast(ch)); + } +} + +#[test] +fn test_weaken_task_then_unweaken() unsafe { + task::try {|| + weaken_task {|_po| + } + }; +} + +#[test] +fn test_weaken_task_wait() unsafe { + let builder = task::builder(); + task::unsupervise(builder); + task::run(builder) {|| + weaken_task {|po| + comm::recv(po); + } + } +} + +#[test] +fn test_weaken_task_stress() unsafe { + // Create a bunch of weak tasks + iter::repeat(100u) {|| + task::spawn {|| + weaken_task {|_po| + } + } + let builder = task::builder(); + task::unsupervise(builder); + task::run(builder) {|| + weaken_task {|po| + // Wait for it to tell us to die + comm::recv(po); + } + } + } +} + +#[test] +#[ignore(cfg(target_os = "win32"))] +fn test_weaken_task_fail() unsafe { + let res = task::try {|| + weaken_task {|_po| + fail; + } + }; + assert result::is_failure(res); +} \ No newline at end of file diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index 910331fa7c0..f2714c4d7c3 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -669,22 +669,9 @@ get_port_id(rust_port *port) { } extern "C" CDECL uintptr_t -rust_port_id_send(type_desc *t, rust_port_id target_port_id, void *sptr) { - bool sent = false; +rust_port_id_send(rust_port_id target_port_id, void *sptr) { rust_task *task = rust_get_current_task(); - - LOG(task, comm, "rust_port_id*_send port: 0x%" PRIxPTR, - (uintptr_t) target_port_id); - - rust_port *port = task->kernel->get_port_by_id(target_port_id); - if(port) { - port->send(sptr); - port->deref(); - sent = true; - } else { - LOG(task, comm, "didn't get the port"); - } - return (uintptr_t)sent; + return (uintptr_t)task->kernel->send_to_port(target_port_id, sptr); } // This is called by an intrinsic on the Rust stack and must run @@ -782,6 +769,18 @@ rust_compare_and_swap_ptr(intptr_t *address, return sync::compare_and_swap(address, oldval, newval); } +extern "C" CDECL void +rust_task_weaken(rust_port_id chan) { + rust_task *task = rust_get_current_task(); + task->kernel->weaken_task(chan); +} + +extern "C" CDECL void +rust_task_unweaken(rust_port_id chan) { + rust_task *task = rust_get_current_task(); + task->kernel->unweaken_task(chan); +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index ff6b0e1056c..21422646f0d 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -1,11 +1,11 @@ - #include "rust_kernel.h" #include "rust_port.h" #include "rust_util.h" #include "rust_scheduler.h" #include "rust_sched_launcher.h" +#include #define KLOG_(...) \ KLOG(this, kern, __VA_ARGS__) @@ -21,6 +21,7 @@ rust_kernel::rust_kernel(rust_env *env) : max_sched_id(0), sched_reaper(this), osmain_driver(NULL), + non_weak_tasks(0), env(env) { // Create the single threaded scheduler that will run on the platform's @@ -286,6 +287,84 @@ rust_kernel::set_exit_status(int code) { } } +void +rust_kernel::register_task() { + KLOG_("Registering task"); + uintptr_t new_non_weak_tasks = sync::increment(non_weak_tasks); + KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks); +} + +void +rust_kernel::unregister_task() { + KLOG_("Unregistering task"); + uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks); + KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks); + if (new_non_weak_tasks == 0) { + end_weak_tasks(); + } +} + +void +rust_kernel::weaken_task(rust_port_id chan) { + { + scoped_lock with(weak_task_lock); + KLOG_("Weakening task with channel %" PRIdPTR, chan); + weak_task_chans.push_back(chan); + } + uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks); + KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks); + if (new_non_weak_tasks == 0) { + end_weak_tasks(); + } +} + +void +rust_kernel::unweaken_task(rust_port_id chan) { + uintptr_t new_non_weak_tasks = sync::increment(non_weak_tasks); + KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks); + { + scoped_lock with(weak_task_lock); + KLOG_("Unweakening task with channel %" PRIdPTR, chan); + std::vector::iterator iter = + std::find(weak_task_chans.begin(), weak_task_chans.end(), chan); + if (iter != weak_task_chans.end()) { + weak_task_chans.erase(iter); + } + } +} + +void +rust_kernel::end_weak_tasks() { + std::vector chancopies; + { + //scoped_lock with(weak_task_lock); + chancopies = weak_task_chans; + weak_task_chans.clear(); + } + while (!chancopies.empty()) { + rust_port_id chan = chancopies.back(); + chancopies.pop_back(); + KLOG_("Notifying weak task " PRIdPTR, chan); + uintptr_t token = 0; + send_to_port(chan, &token); + } +} + +bool +rust_kernel::send_to_port(rust_port_id chan, void *sptr) { + KLOG_("rust_port_id*_send port: 0x%" PRIxPTR, (uintptr_t) chan); + + rust_port *port = get_port_by_id(chan); + if(port) { + port->send(sptr); + port->deref(); + return true; + } else { + KLOG_("didn't get the port"); + return false; + } +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h index 6a7a7070c20..1ccd423928d 100644 --- a/src/rt/rust_kernel.h +++ b/src/rt/rust_kernel.h @@ -63,7 +63,15 @@ class rust_kernel { // on the main thread rust_sched_driver *osmain_driver; + // An atomically updated count of the live, 'non-weak' tasks + uintptr_t non_weak_tasks; + // Protects weak_task_chans + lock_and_signal weak_task_lock; + // A list of weak tasks that need to be told when to exit + std::vector weak_task_chans; + rust_scheduler* get_scheduler_by_id_nolock(rust_sched_id id); + void end_weak_tasks(); public: struct rust_env *env; @@ -102,6 +110,13 @@ public: void set_exit_status(int code); rust_sched_id osmain_sched_id() { return osmain_scheduler; } + + void register_task(); + void unregister_task(); + void weaken_task(rust_port_id chan); + void unweaken_task(rust_port_id chan); + + bool send_to_port(rust_port_id chan, void *sptr); }; template struct kernel_owned { diff --git a/src/rt/rust_scheduler.cpp b/src/rt/rust_scheduler.cpp index 1b5978b5cfe..1f0b16a1d58 100644 --- a/src/rt/rust_scheduler.cpp +++ b/src/rt/rust_scheduler.cpp @@ -92,6 +92,7 @@ rust_scheduler::create_task(rust_task *spawner, const char *name) { if (cur_thread >= num_threads) cur_thread = 0; } + kernel->register_task(); rust_sched_launcher *thread = threads[thread_no]; return thread->get_loop()->create_task(spawner, name); } @@ -106,6 +107,7 @@ rust_scheduler::release_task() { need_exit = true; } } + kernel->unregister_task(); if (need_exit) { exit(); } diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index ec4f5bedb24..171718b64f2 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -54,6 +54,8 @@ rust_task_yield rust_task_is_unwinding rust_get_task rust_task_config_notify +rust_task_weaken +rust_task_unweaken sched_threads shape_log_str start_task