Merge remote-tracking branch 'upstream/io' into io

Conflicts:
	src/libstd/rt/test.rs
	src/rt/rustrt.def.in
This commit is contained in:
Eric Reed 2013-07-02 16:55:56 -07:00
commit 6a1a7819c9
14 changed files with 405 additions and 565 deletions

View File

@ -20,7 +20,8 @@ use cast;
use util;
use ops::Drop;
use kinds::Owned;
use rt::sched::{Scheduler, Coroutine};
use rt::sched::{Scheduler};
use rt::task::Task;
use rt::local::Local;
use unstable::atomics::{AtomicUint, AtomicOption, SeqCst};
use unstable::sync::UnsafeAtomicRcBox;
@ -136,7 +137,7 @@ impl<T> ChanOne<T> {
}
task_as_state => {
// Port is blocked. Wake it up.
let recvr: ~Coroutine = cast::transmute(task_as_state);
let recvr: ~Task = cast::transmute(task_as_state);
let mut sched = Local::take::<Scheduler>();
rtdebug!("rendezvous send");
sched.metrics.rendezvous_sends += 1;
@ -192,7 +193,7 @@ impl<T> PortOne<T> {
// NB: We have to drop back into the scheduler event loop here
// instead of switching immediately back or we could end up
// triggering infinite recursion on the scheduler's stack.
let task: ~Coroutine = cast::transmute(task_as_state);
let task: ~Task = cast::transmute(task_as_state);
sched.enqueue_task(task);
}
_ => util::unreachable()
@ -257,7 +258,7 @@ impl<T> Drop for ChanOneHack<T> {
task_as_state => {
// The port is blocked waiting for a message we will never send. Wake it.
assert!((*this.packet()).payload.is_none());
let recvr: ~Coroutine = cast::transmute(task_as_state);
let recvr: ~Task = cast::transmute(task_as_state);
let sched = Local::take::<Scheduler>();
sched.schedule_task(recvr);
}
@ -554,6 +555,8 @@ mod test {
{ let _c = chan; }
port.recv();
};
// What is our res?
rtdebug!("res is: %?", res.is_err());
assert!(res.is_err());
}
}
@ -905,4 +908,5 @@ mod test {
}
}
}
}

View File

@ -643,3 +643,4 @@ mod test {
}
}
}

View File

@ -13,6 +13,7 @@ use rt::sched::Scheduler;
use rt::task::Task;
use rt::local_ptr;
use rt::rtio::{EventLoop, IoFactoryObject};
//use borrow::to_uint;
pub trait Local {
fn put(value: ~Self);
@ -32,6 +33,7 @@ impl Local for Scheduler {
let res_ptr: *mut Option<T> = &mut res;
unsafe {
do local_ptr::borrow |sched| {
// rtdebug!("successfully unsafe borrowed sched pointer");
let result = f(sched);
*res_ptr = Some(result);
}
@ -51,9 +53,12 @@ impl Local for Task {
fn exists() -> bool { rtabort!("unimpl") }
fn borrow<T>(f: &fn(&mut Task) -> T) -> T {
do Local::borrow::<Scheduler, T> |sched| {
// rtdebug!("sched about to grab current_task");
match sched.current_task {
Some(~ref mut task) => {
f(&mut *task.task)
// rtdebug!("current task pointer: %x", to_uint(task));
// rtdebug!("current task heap pointer: %x", to_uint(&task.heap));
f(task)
}
None => {
rtabort!("no scheduler")
@ -64,7 +69,7 @@ impl Local for Task {
unsafe fn unsafe_borrow() -> *mut Task {
match (*Local::unsafe_borrow::<Scheduler>()).current_task {
Some(~ref mut task) => {
let s: *mut Task = &mut *task.task;
let s: *mut Task = &mut *task;
return s;
}
None => {

View File

@ -67,7 +67,7 @@ use iter::Times;
use iterator::IteratorUtil;
use option::Some;
use ptr::RawPtr;
use rt::sched::{Scheduler, Coroutine, Shutdown};
use rt::sched::{Scheduler, Shutdown};
use rt::sleeper_list::SleeperList;
use rt::task::Task;
use rt::thread::Thread;
@ -268,10 +268,9 @@ pub fn run(main: ~fn()) -> int {
// Create and enqueue the main task.
let main_cell = Cell::new(main);
let mut new_task = ~Task::new_root();
new_task.on_exit = Some(on_exit);
let main_task = ~Coroutine::with_task(&mut scheds[0].stack_pool,
new_task, main_cell.take());
let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool,
main_cell.take());
main_task.on_exit = Some(on_exit);
scheds[0].enqueue_task(main_task);
// Run each scheduler in a thread.
@ -348,7 +347,7 @@ pub fn context() -> RuntimeContext {
#[test]
fn test_context() {
use unstable::run_in_bare_thread;
use self::sched::{Scheduler, Coroutine};
use self::sched::{Scheduler};
use rt::local::Local;
use rt::test::new_test_uv_sched;
@ -356,7 +355,7 @@ fn test_context() {
do run_in_bare_thread {
assert_eq!(context(), GlobalContext);
let mut sched = ~new_test_uv_sched();
let task = ~do Coroutine::new_root(&mut sched.stack_pool) {
let task = ~do Task::new_root(&mut sched.stack_pool) {
assert_eq!(context(), TaskContext);
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then() |sched, task| {

View File

@ -11,24 +11,19 @@
use option::*;
use sys;
use cast::transmute;
use cell::Cell;
use clone::Clone;
use super::sleeper_list::SleeperList;
use super::work_queue::WorkQueue;
use super::stack::{StackPool, StackSegment};
use super::stack::{StackPool};
use super::rtio::{EventLoop, EventLoopObject, RemoteCallbackObject};
use super::context::Context;
use super::task::Task;
use super::task::{Task, AnySched, Sched};
use super::message_queue::MessageQueue;
use rt::local_ptr;
use rt::local::Local;
use rt::rtio::RemoteCallback;
use rt::metrics::SchedMetrics;
//use to_str::ToStr;
/// To allow for using pointers as scheduler ids
use borrow::{to_uint};
/// The Scheduler is responsible for coordinating execution of Coroutines
@ -41,7 +36,7 @@ use borrow::{to_uint};
pub struct Scheduler {
/// A queue of available work. Under a work-stealing policy there
/// is one per Scheduler.
priv work_queue: WorkQueue<~Coroutine>,
priv work_queue: WorkQueue<~Task>,
/// The queue of incoming messages from other schedulers.
/// These are enqueued by SchedHandles after which a remote callback
/// is triggered to handle the message.
@ -66,7 +61,7 @@ pub struct Scheduler {
/// Always valid when a task is executing, otherwise not
priv saved_context: Context,
/// The currently executing task
current_task: Option<~Coroutine>,
current_task: Option<~Task>,
/// An action performed after a context switch on behalf of the
/// code running before the context switch
priv cleanup_job: Option<CleanupJob>,
@ -81,33 +76,15 @@ pub struct SchedHandle {
sched_id: uint
}
pub struct Coroutine {
/// The segment of stack on which the task is currently running or,
/// if the task is blocked, on which the task will resume execution
priv current_stack_segment: StackSegment,
/// These are always valid when the task is not running, unless
/// the task is dead
priv saved_context: Context,
/// The heap, GC, unwinding, local storage, logging
task: ~Task,
}
// A scheduler home is either a handle to the home scheduler, or an
// explicit "AnySched".
pub enum SchedHome {
AnySched,
Sched(SchedHandle)
}
pub enum SchedMessage {
Wake,
Shutdown,
PinnedTask(~Coroutine)
PinnedTask(~Task)
}
enum CleanupJob {
DoNothing,
GiveTask(~Coroutine, UnsafeTaskReceiver)
GiveTask(~Task, UnsafeTaskReceiver)
}
impl Scheduler {
@ -116,7 +93,7 @@ impl Scheduler {
pub fn sched_id(&self) -> uint { to_uint(self) }
pub fn new(event_loop: ~EventLoopObject,
work_queue: WorkQueue<~Coroutine>,
work_queue: WorkQueue<~Task>,
sleeper_list: SleeperList)
-> Scheduler {
@ -125,7 +102,7 @@ impl Scheduler {
}
pub fn new_special(event_loop: ~EventLoopObject,
work_queue: WorkQueue<~Coroutine>,
work_queue: WorkQueue<~Task>,
sleeper_list: SleeperList,
run_anything: bool)
-> Scheduler {
@ -177,7 +154,7 @@ impl Scheduler {
rtdebug!("run taking sched");
let sched = Local::take::<Scheduler>();
// XXX: Reenable this once we're using a per-task queue. With a shared
// XXX: Reenable this once we're using a per-scheduler queue. With a shared
// queue this is not true
//assert!(sched.work_queue.is_empty());
rtdebug!("scheduler metrics: %s\n", {
@ -213,10 +190,10 @@ impl Scheduler {
if sched.resume_task_from_queue() {
// We performed a scheduling action. There may be other work
// to do yet, so let's try again later.
let mut sched = Local::take::<Scheduler>();
sched.metrics.tasks_resumed_from_queue += 1;
sched.event_loop.callback(Scheduler::run_sched_once);
Local::put(sched);
do Local::borrow::<Scheduler, ()> |sched| {
sched.metrics.tasks_resumed_from_queue += 1;
sched.event_loop.callback(Scheduler::run_sched_once);
}
return;
}
@ -224,18 +201,18 @@ impl Scheduler {
// Generate a SchedHandle and push it to the sleeper list so
// somebody can wake us up later.
rtdebug!("no work to do");
let mut sched = Local::take::<Scheduler>();
sched.metrics.wasted_turns += 1;
if !sched.sleepy && !sched.no_sleep {
rtdebug!("sleeping");
sched.metrics.sleepy_times += 1;
sched.sleepy = true;
let handle = sched.make_handle();
sched.sleeper_list.push(handle);
} else {
rtdebug!("not sleeping");
do Local::borrow::<Scheduler, ()> |sched| {
sched.metrics.wasted_turns += 1;
if !sched.sleepy && !sched.no_sleep {
rtdebug!("sleeping");
sched.metrics.sleepy_times += 1;
sched.sleepy = true;
let handle = sched.make_handle();
sched.sleeper_list.push(handle);
} else {
rtdebug!("not sleeping");
}
}
Local::put(sched);
}
pub fn make_handle(&mut self) -> SchedHandle {
@ -253,7 +230,7 @@ impl Scheduler {
/// Pushes the task onto the work stealing queue and tells the
/// event loop to run it later. Always use this instead of pushing
/// to the work queue directly.
pub fn enqueue_task(&mut self, task: ~Coroutine) {
pub fn enqueue_task(&mut self, task: ~Task) {
// We don't want to queue tasks that belong on other threads,
// so we send them home at enqueue time.
@ -307,7 +284,7 @@ impl Scheduler {
rtdebug!("recv BiasedTask message in sched: %u",
this.sched_id());
let mut task = task;
task.task.home = Some(Sched(this.make_handle()));
task.home = Some(Sched(this.make_handle()));
this.resume_task_immediately(task);
return true;
}
@ -349,9 +326,9 @@ impl Scheduler {
}
/// Given an input Coroutine sends it back to its home scheduler.
fn send_task_home(task: ~Coroutine) {
fn send_task_home(task: ~Task) {
let mut task = task;
let mut home = task.task.home.swap_unwrap();
let mut home = task.home.swap_unwrap();
match home {
Sched(ref mut home_handle) => {
home_handle.send(PinnedTask(task));
@ -377,7 +354,7 @@ impl Scheduler {
match this.work_queue.pop() {
Some(task) => {
let action_id = {
let home = &task.task.home;
let home = &task.home;
match home {
&Some(Sched(ref home_handle))
if home_handle.sched_id != this.sched_id() => {
@ -440,14 +417,15 @@ impl Scheduler {
rtdebug!("ending running task");
do self.deschedule_running_task_and_then |sched, dead_task| {
let dead_task = Cell::new(dead_task);
dead_task.take().recycle(&mut sched.stack_pool);
let mut dead_task = dead_task;
let coroutine = dead_task.coroutine.swap_unwrap();
coroutine.recycle(&mut sched.stack_pool);
}
rtabort!("control reached end of task");
}
pub fn schedule_task(~self, task: ~Coroutine) {
pub fn schedule_task(~self, task: ~Task) {
assert!(self.in_task_context());
// is the task home?
@ -462,8 +440,7 @@ impl Scheduler {
// here we know we are home, execute now OR we know we
// aren't homed, and that this sched doesn't care
do this.switch_running_tasks_and_then(task) |sched, last_task| {
let last_task = Cell::new(last_task);
sched.enqueue_task(last_task.take());
sched.enqueue_task(last_task);
}
} else if !homed && !this.run_anything {
// the task isn't homed, but it can't be run here
@ -478,7 +455,7 @@ impl Scheduler {
// Core scheduling ops
pub fn resume_task_immediately(~self, task: ~Coroutine) {
pub fn resume_task_immediately(~self, task: ~Task) {
let mut this = self;
assert!(!this.in_task_context());
@ -521,7 +498,7 @@ impl Scheduler {
/// This passes a Scheduler pointer to the fn after the context switch
/// in order to prevent that fn from performing further scheduling operations.
/// Doing further scheduling could easily result in infinite recursion.
pub fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, ~Coroutine)) {
pub fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, ~Task)) {
let mut this = self;
assert!(this.in_task_context());
@ -530,8 +507,8 @@ impl Scheduler {
unsafe {
let blocked_task = this.current_task.swap_unwrap();
let f_fake_region = transmute::<&fn(&mut Scheduler, ~Coroutine),
&fn(&mut Scheduler, ~Coroutine)>(f);
let f_fake_region = transmute::<&fn(&mut Scheduler, ~Task),
&fn(&mut Scheduler, ~Task)>(f);
let f_opaque = ClosureConverter::from_fn(f_fake_region);
this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
}
@ -553,8 +530,8 @@ impl Scheduler {
/// Switch directly to another task, without going through the scheduler.
/// You would want to think hard about doing this, e.g. if there are
/// pending I/O events it would be a bad idea.
pub fn switch_running_tasks_and_then(~self, next_task: ~Coroutine,
f: &fn(&mut Scheduler, ~Coroutine)) {
pub fn switch_running_tasks_and_then(~self, next_task: ~Task,
f: &fn(&mut Scheduler, ~Task)) {
let mut this = self;
assert!(this.in_task_context());
@ -563,8 +540,8 @@ impl Scheduler {
let old_running_task = this.current_task.swap_unwrap();
let f_fake_region = unsafe {
transmute::<&fn(&mut Scheduler, ~Coroutine),
&fn(&mut Scheduler, ~Coroutine)>(f)
transmute::<&fn(&mut Scheduler, ~Task),
&fn(&mut Scheduler, ~Task)>(f)
};
let f_opaque = ClosureConverter::from_fn(f_fake_region);
this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque));
@ -631,12 +608,22 @@ impl Scheduler {
// because borrowck thinks the three patterns are conflicting
// borrows
unsafe {
let last_task = transmute::<Option<&Coroutine>, Option<&mut Coroutine>>(last_task);
let last_task = transmute::<Option<&Task>, Option<&mut Task>>(last_task);
let last_task_context = match last_task {
Some(t) => Some(&mut t.saved_context), None => None
Some(t) => {
Some(&mut t.coroutine.get_mut_ref().saved_context)
}
None => {
None
}
};
let next_task_context = match self.current_task {
Some(ref mut t) => Some(&mut t.saved_context), None => None
Some(ref mut t) => {
Some(&mut t.coroutine.get_mut_ref().saved_context)
}
None => {
None
}
};
// XXX: These transmutes can be removed after snapshot
return (transmute(&mut self.saved_context),
@ -661,186 +648,34 @@ impl SchedHandle {
}
}
impl Coroutine {
/// This function checks that a coroutine is running "home".
pub fn is_home(&self) -> bool {
rtdebug!("checking if coroutine is home");
do Local::borrow::<Scheduler,bool> |sched| {
match self.task.home {
Some(AnySched) => { false }
Some(Sched(SchedHandle { sched_id: ref id, _ })) => {
*id == sched.sched_id()
}
None => { rtabort!("error: homeless task!"); }
}
}
}
/// Without access to self, but with access to the "expected home
/// id", see if we are home.
fn is_home_using_id(id: uint) -> bool {
rtdebug!("checking if coroutine is home using id");
do Local::borrow::<Scheduler,bool> |sched| {
if sched.sched_id() == id {
true
} else {
false
}
}
}
/// Check if this coroutine has a home
fn homed(&self) -> bool {
rtdebug!("checking if this coroutine has a home");
match self.task.home {
Some(AnySched) => { false }
Some(Sched(_)) => { true }
None => { rtabort!("error: homeless task!");
}
}
}
/// A version of is_home that does not need to use TLS, it instead
/// takes local scheduler as a parameter.
fn is_home_no_tls(&self, sched: &~Scheduler) -> bool {
rtdebug!("checking if coroutine is home without tls");
match self.task.home {
Some(AnySched) => { true }
Some(Sched(SchedHandle { sched_id: ref id, _})) => {
*id == sched.sched_id()
}
None => { rtabort!("error: homeless task!"); }
}
}
/// Check TLS for the scheduler to see if we are on a special
/// scheduler.
pub fn on_special() -> bool {
rtdebug!("checking if coroutine is executing on special sched");
do Local::borrow::<Scheduler,bool>() |sched| {
!sched.run_anything
}
}
// Created new variants of "new" that takes a home scheduler
// parameter. The original with_task now calls with_task_homed
// using the AnySched paramter.
pub fn new_homed(stack_pool: &mut StackPool, home: SchedHome, start: ~fn()) -> Coroutine {
Coroutine::with_task_homed(stack_pool, ~Task::new_root(), start, home)
}
pub fn new_root(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine {
Coroutine::with_task(stack_pool, ~Task::new_root(), start)
}
pub fn with_task_homed(stack_pool: &mut StackPool,
task: ~Task,
start: ~fn(),
home: SchedHome) -> Coroutine {
static MIN_STACK_SIZE: uint = 1000000; // XXX: Too much stack
let start = Coroutine::build_start_wrapper(start);
let mut stack = stack_pool.take_segment(MIN_STACK_SIZE);
// NB: Context holds a pointer to that ~fn
let initial_context = Context::new(start, &mut stack);
let mut crt = Coroutine {
current_stack_segment: stack,
saved_context: initial_context,
task: task,
};
crt.task.home = Some(home);
return crt;
}
pub fn with_task(stack_pool: &mut StackPool,
task: ~Task,
start: ~fn()) -> Coroutine {
Coroutine::with_task_homed(stack_pool,
task,
start,
AnySched)
}
fn build_start_wrapper(start: ~fn()) -> ~fn() {
// XXX: The old code didn't have this extra allocation
let start_cell = Cell::new(start);
let wrapper: ~fn() = || {
// This is the first code to execute after the initial
// context switch to the task. The previous context may
// have asked us to do some cleanup.
unsafe {
let sched = Local::unsafe_borrow::<Scheduler>();
(*sched).run_cleanup_job();
let sched = Local::unsafe_borrow::<Scheduler>();
let task = (*sched).current_task.get_mut_ref();
// FIXME #6141: shouldn't neet to put `start()` in
// another closure
let start_cell = Cell::new(start_cell.take());
do task.task.run {
// N.B. Removing `start` from the start wrapper
// closure by emptying a cell is critical for
// correctness. The ~Task pointer, and in turn the
// closure used to initialize the first call
// frame, is destroyed in scheduler context, not
// task context. So any captured closures must
// not contain user-definable dtors that expect to
// be in task context. By moving `start` out of
// the closure, all the user code goes out of
// scope while the task is still running.
let start = start_cell.take();
start();
};
}
let sched = Local::take::<Scheduler>();
sched.terminate_current_task();
};
return wrapper;
}
/// Destroy the task and try to reuse its components
pub fn recycle(~self, stack_pool: &mut StackPool) {
match self {
~Coroutine {current_stack_segment, _} => {
stack_pool.give_segment(current_stack_segment);
}
}
}
}
// XXX: Some hacks to put a &fn in Scheduler without borrowck
// complaining
type UnsafeTaskReceiver = sys::Closure;
trait ClosureConverter {
fn from_fn(&fn(&mut Scheduler, ~Coroutine)) -> Self;
fn to_fn(self) -> &fn(&mut Scheduler, ~Coroutine);
fn from_fn(&fn(&mut Scheduler, ~Task)) -> Self;
fn to_fn(self) -> &fn(&mut Scheduler, ~Task);
}
impl ClosureConverter for UnsafeTaskReceiver {
fn from_fn(f: &fn(&mut Scheduler, ~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } }
fn to_fn(self) -> &fn(&mut Scheduler, ~Coroutine) { unsafe { transmute(self) } }
fn from_fn(f: &fn(&mut Scheduler, ~Task)) -> UnsafeTaskReceiver { unsafe { transmute(f) } }
fn to_fn(self) -> &fn(&mut Scheduler, ~Task) { unsafe { transmute(self) } }
}
#[cfg(test)]
mod test {
use int;
use cell::Cell;
use iterator::IteratorUtil;
use unstable::run_in_bare_thread;
use task::spawn;
use rt::local::Local;
use rt::test::*;
use super::*;
use rt::thread::Thread;
use ptr::to_uint;
use vec::MutableVector;
use borrow::to_uint;
use rt::task::{Task,Sched};
// Confirm that a sched_id actually is the uint form of the
// pointer to the scheduler struct.
#[test]
fn simple_sched_id_test() {
do run_in_bare_thread {
@ -851,7 +686,6 @@ mod test {
// Compare two scheduler ids that are different, this should never
// fail but may catch a mistake someday.
#[test]
fn compare_sched_id_test() {
do run_in_bare_thread {
@ -863,7 +697,6 @@ mod test {
// A simple test to check if a homed task run on a single
// scheduler ends up executing while home.
#[test]
fn test_home_sched() {
do run_in_bare_thread {
@ -874,8 +707,8 @@ mod test {
let sched_handle = sched.make_handle();
let sched_id = sched.sched_id();
let task = ~do Coroutine::new_homed(&mut sched.stack_pool,
Sched(sched_handle)) {
let task = ~do Task::new_root_homed(&mut sched.stack_pool,
Sched(sched_handle)) {
unsafe { *task_ran_ptr = true };
let sched = Local::take::<Scheduler>();
assert!(sched.sched_id() == sched_id);
@ -888,7 +721,6 @@ mod test {
}
// A test for each state of schedule_task
#[test]
fn test_schedule_home_states() {
@ -898,7 +730,6 @@ mod test {
use rt::work_queue::WorkQueue;
do run_in_bare_thread {
// let nthreads = 2;
let sleepers = SleeperList::new();
let work_queue = WorkQueue::new();
@ -924,33 +755,33 @@ mod test {
let t1_handle = special_sched.make_handle();
let t4_handle = special_sched.make_handle();
let t1f = ~do Coroutine::new_homed(&mut special_sched.stack_pool,
Sched(t1_handle)) {
let is_home = Coroutine::is_home_using_id(special_id);
let t1f = ~do Task::new_root_homed(&mut special_sched.stack_pool,
Sched(t1_handle)) || {
let is_home = Task::is_home_using_id(special_id);
rtdebug!("t1 should be home: %b", is_home);
assert!(is_home);
};
let t1f = Cell::new(t1f);
let t2f = ~do Coroutine::new_root(&mut normal_sched.stack_pool) {
let on_special = Coroutine::on_special();
let t2f = ~do Task::new_root(&mut normal_sched.stack_pool) {
let on_special = Task::on_special();
rtdebug!("t2 should not be on special: %b", on_special);
assert!(!on_special);
};
let t2f = Cell::new(t2f);
let t3f = ~do Coroutine::new_root(&mut normal_sched.stack_pool) {
let t3f = ~do Task::new_root(&mut normal_sched.stack_pool) {
// not on special
let on_special = Coroutine::on_special();
let on_special = Task::on_special();
rtdebug!("t3 should not be on special: %b", on_special);
assert!(!on_special);
};
let t3f = Cell::new(t3f);
let t4f = ~do Coroutine::new_homed(&mut special_sched.stack_pool,
Sched(t4_handle)) {
let t4f = ~do Task::new_root_homed(&mut special_sched.stack_pool,
Sched(t4_handle)) {
// is home
let home = Coroutine::is_home_using_id(special_id);
let home = Task::is_home_using_id(special_id);
rtdebug!("t4 should be home: %b", home);
assert!(home);
};
@ -988,7 +819,7 @@ mod test {
let t4 = Cell::new(t4);
// build a main task that runs our four tests
let main_task = ~do Coroutine::new_root(&mut normal_sched.stack_pool) {
let main_task = ~do Task::new_root(&mut normal_sched.stack_pool) {
// the two tasks that require a normal start location
t2.take()();
t4.take()();
@ -997,7 +828,7 @@ mod test {
};
// task to run the two "special start" tests
let special_task = ~do Coroutine::new_homed(
let special_task = ~do Task::new_root_homed(
&mut special_sched.stack_pool,
Sched(special_handle2.take())) {
t1.take()();
@ -1027,91 +858,7 @@ mod test {
}
}
// The following test is a bit of a mess, but it trys to do
// something tricky so I'm not sure how to get around this in the
// short term.
// A number of schedulers are created, and then a task is created
// and assigned a home scheduler. It is then "started" on a
// different scheduler. The scheduler it is started on should
// observe that the task is not home, and send it home.
// This test is light in that it does very little.
#[test]
fn test_transfer_task_home() {
use rt::uv::uvio::UvEventLoop;
use rt::sched::Shutdown;
use rt::sleeper_list::SleeperList;
use rt::work_queue::WorkQueue;
use uint;
use container::Container;
use vec::OwnedVector;
do run_in_bare_thread {
static N: uint = 8;
let sleepers = SleeperList::new();
let work_queue = WorkQueue::new();
let mut handles = ~[];
let mut scheds = ~[];
for uint::range(0, N) |_| {
let loop_ = ~UvEventLoop::new();
let mut sched = ~Scheduler::new(loop_,
work_queue.clone(),
sleepers.clone());
let handle = sched.make_handle();
rtdebug!("sched id: %u", handle.sched_id);
handles.push(handle);
scheds.push(sched);
};
let handles = Cell::new(handles);
let home_handle = scheds[6].make_handle();
let home_id = home_handle.sched_id;
let home = Sched(home_handle);
let main_task = ~do Coroutine::new_homed(&mut scheds[1].stack_pool, home) {
// Here we check if the task is running on its home.
let sched = Local::take::<Scheduler>();
rtdebug!("run location scheduler id: %u, home: %u",
sched.sched_id(),
home_id);
assert!(sched.sched_id() == home_id);
Local::put::<Scheduler>(sched);
let mut handles = handles.take();
for handles.mut_iter().advance |handle| {
handle.send(Shutdown);
}
};
scheds[0].enqueue_task(main_task);
let mut threads = ~[];
while !scheds.is_empty() {
let sched = scheds.pop();
let sched_cell = Cell::new(sched);
let thread = do Thread::start {
let sched = sched_cell.take();
sched.run();
};
threads.push(thread);
}
let _threads = threads;
}
}
// Do it a lot
#[test]
fn test_stress_schedule_task_states() {
let n = stress_factor() * 120;
@ -1120,21 +867,6 @@ mod test {
}
}
// The goal is that this is the high-stress test for making sure
// homing is working. It allocates RUST_RT_STRESS tasks that
// do nothing but assert that they are home at execution
// time. These tasks are queued to random schedulers, so sometimes
// they are home and sometimes not. It also runs RUST_RT_STRESS
// times.
#[test]
fn test_stress_homed_tasks() {
let n = stress_factor();
for int::range(0,n as int) |_| {
run_in_mt_newsched_task_random_homed();
}
}
#[test]
fn test_simple_scheduling() {
do run_in_bare_thread {
@ -1142,7 +874,7 @@ mod test {
let task_ran_ptr: *mut bool = &mut task_ran;
let mut sched = ~new_test_uv_sched();
let task = ~do Coroutine::new_root(&mut sched.stack_pool) {
let task = ~do Task::new_root(&mut sched.stack_pool) {
unsafe { *task_ran_ptr = true; }
};
sched.enqueue_task(task);
@ -1160,7 +892,7 @@ mod test {
let mut sched = ~new_test_uv_sched();
for int::range(0, total) |_| {
let task = ~do Coroutine::new_root(&mut sched.stack_pool) {
let task = ~do Task::new_root(&mut sched.stack_pool) {
unsafe { *task_count_ptr = *task_count_ptr + 1; }
};
sched.enqueue_task(task);
@ -1177,10 +909,10 @@ mod test {
let count_ptr: *mut int = &mut count;
let mut sched = ~new_test_uv_sched();
let task1 = ~do Coroutine::new_root(&mut sched.stack_pool) {
let task1 = ~do Task::new_root(&mut sched.stack_pool) {
unsafe { *count_ptr = *count_ptr + 1; }
let mut sched = Local::take::<Scheduler>();
let task2 = ~do Coroutine::new_root(&mut sched.stack_pool) {
let task2 = ~do Task::new_root(&mut sched.stack_pool) {
unsafe { *count_ptr = *count_ptr + 1; }
};
// Context switch directly to the new task
@ -1205,7 +937,7 @@ mod test {
let mut sched = ~new_test_uv_sched();
let start_task = ~do Coroutine::new_root(&mut sched.stack_pool) {
let start_task = ~do Task::new_root(&mut sched.stack_pool) {
run_task(count_ptr);
};
sched.enqueue_task(start_task);
@ -1215,7 +947,7 @@ mod test {
fn run_task(count_ptr: *mut int) {
do Local::borrow::<Scheduler, ()> |sched| {
let task = ~do Coroutine::new_root(&mut sched.stack_pool) {
let task = ~do Task::new_root(&mut sched.stack_pool) {
unsafe {
*count_ptr = *count_ptr + 1;
if *count_ptr != MAX {
@ -1233,7 +965,7 @@ mod test {
fn test_block_task() {
do run_in_bare_thread {
let mut sched = ~new_test_uv_sched();
let task = ~do Coroutine::new_root(&mut sched.stack_pool) {
let task = ~do Task::new_root(&mut sched.stack_pool) {
let sched = Local::take::<Scheduler>();
assert!(sched.in_task_context());
do sched.deschedule_running_task_and_then() |sched, task| {
@ -1280,13 +1012,13 @@ mod test {
let mut sched1 = ~new_test_uv_sched();
let handle1 = sched1.make_handle();
let handle1_cell = Cell::new(handle1);
let task1 = ~do Coroutine::new_root(&mut sched1.stack_pool) {
let task1 = ~do Task::new_root(&mut sched1.stack_pool) {
chan_cell.take().send(());
};
sched1.enqueue_task(task1);
let mut sched2 = ~new_test_uv_sched();
let task2 = ~do Coroutine::new_root(&mut sched2.stack_pool) {
let task2 = ~do Task::new_root(&mut sched2.stack_pool) {
port_cell.take().recv();
// Release the other scheduler's handle so it can exit
handle1_cell.take();
@ -1383,7 +1115,6 @@ mod test {
}
}
}
}
#[test]
@ -1408,5 +1139,4 @@ mod test {
}
}
}
}

View File

@ -23,8 +23,11 @@ use option::{Option, Some, None};
use rt::local::Local;
use rt::logging::StdErrLogger;
use super::local_heap::LocalHeap;
use rt::sched::{SchedHome, AnySched};
use rt::sched::{Scheduler, SchedHandle};
use rt::join_latch::JoinLatch;
use rt::stack::{StackSegment, StackPool};
use rt::context::Context;
use cell::Cell;
pub struct Task {
heap: LocalHeap,
@ -35,7 +38,22 @@ pub struct Task {
home: Option<SchedHome>,
join_latch: Option<~JoinLatch>,
on_exit: Option<~fn(bool)>,
destroyed: bool
destroyed: bool,
coroutine: Option<~Coroutine>
}
pub struct Coroutine {
/// The segment of stack on which the task is currently running or
/// if the task is blocked, on which the task will resume
/// execution.
priv current_stack_segment: StackSegment,
/// Always valid if the task is alive and not running.
saved_context: Context
}
pub enum SchedHome {
AnySched,
Sched(SchedHandle)
}
pub struct GarbageCollector;
@ -46,31 +64,50 @@ pub struct Unwinder {
}
impl Task {
pub fn new_root() -> Task {
pub fn new_root(stack_pool: &mut StackPool,
start: ~fn()) -> Task {
Task::new_root_homed(stack_pool, AnySched, start)
}
pub fn new_child(&mut self,
stack_pool: &mut StackPool,
start: ~fn()) -> Task {
self.new_child_homed(stack_pool, AnySched, start)
}
pub fn new_root_homed(stack_pool: &mut StackPool,
home: SchedHome,
start: ~fn()) -> Task {
Task {
heap: LocalHeap::new(),
gc: GarbageCollector,
storage: LocalStorage(ptr::null(), None),
logger: StdErrLogger,
unwinder: Unwinder { unwinding: false },
home: Some(AnySched),
home: Some(home),
join_latch: Some(JoinLatch::new_root()),
on_exit: None,
destroyed: false
destroyed: false,
coroutine: Some(~Coroutine::new(stack_pool, start))
}
}
pub fn new_child(&mut self) -> Task {
pub fn new_child_homed(&mut self,
stack_pool: &mut StackPool,
home: SchedHome,
start: ~fn()) -> Task {
Task {
heap: LocalHeap::new(),
gc: GarbageCollector,
storage: LocalStorage(ptr::null(), None),
logger: StdErrLogger,
home: Some(AnySched),
home: Some(home),
unwinder: Unwinder { unwinding: false },
join_latch: Some(self.join_latch.get_mut_ref().new_child()),
on_exit: None,
destroyed: false
destroyed: false,
coroutine: Some(~Coroutine::new(stack_pool, start))
}
}
@ -108,11 +145,11 @@ impl Task {
/// called unsafely, without removing Task from
/// thread-local-storage.
fn destroy(&mut self) {
// This is just an assertion that `destroy` was called unsafely
// and this instance of Task is still accessible.
do Local::borrow::<Task, ()> |task| {
assert!(borrow::ref_eq(task, self));
}
match self.storage {
LocalStorage(ptr, Some(ref dtor)) => {
(*dtor)(ptr)
@ -125,12 +162,129 @@ impl Task {
self.destroyed = true;
}
/// Check if *task* is currently home.
pub fn is_home(&self) -> bool {
do Local::borrow::<Scheduler,bool> |sched| {
match self.home {
Some(AnySched) => { false }
Some(Sched(SchedHandle { sched_id: ref id, _ })) => {
*id == sched.sched_id()
}
None => { rtabort!("task home of None") }
}
}
}
pub fn is_home_no_tls(&self, sched: &~Scheduler) -> bool {
match self.home {
Some(AnySched) => { false }
Some(Sched(SchedHandle { sched_id: ref id, _ })) => {
*id == sched.sched_id()
}
None => {rtabort!("task home of None") }
}
}
pub fn is_home_using_id(sched_id: uint) -> bool {
do Local::borrow::<Task,bool> |task| {
match task.home {
Some(Sched(SchedHandle { sched_id: ref id, _ })) => {
*id == sched_id
}
Some(AnySched) => { false }
None => { rtabort!("task home of None") }
}
}
}
/// Check if this *task* has a home.
pub fn homed(&self) -> bool {
match self.home {
Some(AnySched) => { false }
Some(Sched(_)) => { true }
None => {
rtabort!("task home of None")
}
}
}
/// On a special scheduler?
pub fn on_special() -> bool {
do Local::borrow::<Scheduler,bool> |sched| {
!sched.run_anything
}
}
}
impl Drop for Task {
fn finalize(&self) { assert!(self.destroyed) }
}
// Coroutines represent nothing more than a context and a stack
// segment.
impl Coroutine {
pub fn new(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine {
static MIN_STACK_SIZE: uint = 100000; // XXX: Too much stack
let start = Coroutine::build_start_wrapper(start);
let mut stack = stack_pool.take_segment(MIN_STACK_SIZE);
let initial_context = Context::new(start, &mut stack);
Coroutine {
current_stack_segment: stack,
saved_context: initial_context
}
}
fn build_start_wrapper(start: ~fn()) -> ~fn() {
let start_cell = Cell::new(start);
let wrapper: ~fn() = || {
// First code after swap to this new context. Run our
// cleanup job.
unsafe {
let sched = Local::unsafe_borrow::<Scheduler>();
(*sched).run_cleanup_job();
let sched = Local::unsafe_borrow::<Scheduler>();
let task = (*sched).current_task.get_mut_ref();
do task.run {
// N.B. Removing `start` from the start wrapper
// closure by emptying a cell is critical for
// correctness. The ~Task pointer, and in turn the
// closure used to initialize the first call
// frame, is destroyed in the scheduler context,
// not task context. So any captured closures must
// not contain user-definable dtors that expect to
// be in task context. By moving `start` out of
// the closure, all the user code goes our of
// scope while the task is still running.
let start = start_cell.take();
start();
};
}
let sched = Local::take::<Scheduler>();
sched.terminate_current_task();
};
return wrapper;
}
/// Destroy coroutine and try to reuse stack segment.
pub fn recycle(~self, stack_pool: &mut StackPool) {
match self {
~Coroutine { current_stack_segment, _ } => {
stack_pool.give_segment(current_stack_segment);
}
}
}
}
// Just a sanity check to make sure we are catching a Rust-thrown exception
static UNWIND_TOKEN: uintptr_t = 839147;
@ -209,8 +363,10 @@ mod test {
fn unwind() {
do run_in_newsched_task() {
let result = spawntask_try(||());
rtdebug!("trying first assert");
assert!(result.is_ok());
let result = spawntask_try(|| fail!());
rtdebug!("trying second assert");
assert!(result.is_err());
}
}

View File

@ -15,23 +15,24 @@ use clone::Clone;
use container::Container;
use iterator::IteratorUtil;
use vec::{OwnedVector, MutableVector};
use result::{Result, Ok, Err};
use unstable::run_in_bare_thread;
use super::io::net::ip::{IpAddr, Ipv4, Ipv6};
use rt::comm::oneshot;
use rt::task::Task;
use rt::thread::Thread;
use rt::sched::Scheduler;
use rt::local::Local;
use rt::sched::{Scheduler, Coroutine};
use rt::sleeper_list::SleeperList;
use unstable::run_in_bare_thread;
use rt::thread::Thread;
use rt::task::Task;
use rt::uv::uvio::UvEventLoop;
use rt::work_queue::WorkQueue;
use rt::sleeper_list::SleeperList;
use rt::task::{Sched};
use rt::comm::oneshot;
use result::{Result, Ok, Err};
pub fn new_test_uv_sched() -> Scheduler {
use rt::uv::uvio::UvEventLoop;
use rt::work_queue::WorkQueue;
use rt::sleeper_list::SleeperList;
let mut sched = Scheduler::new(~UvEventLoop::new(), WorkQueue::new(), SleeperList::new());
let mut sched = Scheduler::new(~UvEventLoop::new(),
WorkQueue::new(),
SleeperList::new());
// Don't wait for the Shutdown message
sched.no_sleep = true;
return sched;
@ -41,19 +42,15 @@ pub fn new_test_uv_sched() -> Scheduler {
/// then waits for the scheduler to exit. Failure of the task
/// will abort the process.
pub fn run_in_newsched_task(f: ~fn()) {
use super::sched::*;
use unstable::run_in_bare_thread;
let f = Cell::new(f);
do run_in_bare_thread {
let mut sched = ~new_test_uv_sched();
let mut new_task = ~Task::new_root();
let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status);
new_task.on_exit = Some(on_exit);
let task = ~Coroutine::with_task(&mut sched.stack_pool,
new_task,
f.take());
let mut task = ~Task::new_root(&mut sched.stack_pool,
f.take());
rtdebug!("newsched_task: %x", to_uint(task));
task.on_exit = Some(on_exit);
sched.enqueue_task(task);
sched.run();
}
@ -65,7 +62,6 @@ pub fn run_in_newsched_task(f: ~fn()) {
pub fn run_in_mt_newsched_task(f: ~fn()) {
use os;
use from_str::FromStr;
use rt::uv::uvio::UvEventLoop;
use rt::sched::Shutdown;
use rt::util;
@ -90,7 +86,9 @@ pub fn run_in_mt_newsched_task(f: ~fn()) {
for uint::range(0, nthreads) |_| {
let loop_ = ~UvEventLoop::new();
let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone());
let mut sched = ~Scheduler::new(loop_,
work_queue.clone(),
sleepers.clone());
let handle = sched.make_handle();
handles.push(handle);
@ -99,9 +97,7 @@ pub fn run_in_mt_newsched_task(f: ~fn()) {
let f_cell = Cell::new(f_cell.take());
let handles = Cell::new(handles);
let mut new_task = ~Task::new_root();
let on_exit: ~fn(bool) = |exit_status| {
let mut handles = handles.take();
// Tell schedulers to exit
for handles.mut_iter().advance |handle| {
@ -110,9 +106,9 @@ pub fn run_in_mt_newsched_task(f: ~fn()) {
rtassert!(exit_status);
};
new_task.on_exit = Some(on_exit);
let main_task = ~Coroutine::with_task(&mut scheds[0].stack_pool,
new_task, f_cell.take());
let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool,
f_cell.take());
main_task.on_exit = Some(on_exit);
scheds[0].enqueue_task(main_task);
let mut threads = ~[];
@ -134,144 +130,44 @@ pub fn run_in_mt_newsched_task(f: ~fn()) {
}
// THIS IS AWFUL. Copy-pasted the above initialization function but
// with a number of hacks to make it spawn tasks on a variety of
// schedulers with a variety of homes using the new spawn.
pub fn run_in_mt_newsched_task_random_homed() {
use libc;
use os;
use from_str::FromStr;
use rt::uv::uvio::UvEventLoop;
use rt::sched::Shutdown;
do run_in_bare_thread {
let nthreads = match os::getenv("RUST_TEST_THREADS") {
Some(nstr) => FromStr::from_str(nstr).get(),
None => unsafe {
// Using more threads than cores in test code to force
// the OS to preempt them frequently. Assuming that
// this help stress test concurrent types.
rust_get_num_cpus() * 2
}
};
let sleepers = SleeperList::new();
let work_queue = WorkQueue::new();
let mut handles = ~[];
let mut scheds = ~[];
// create a few special schedulers, those with even indicies
// will be pinned-only
for uint::range(0, nthreads) |i| {
let special = (i % 2) == 0;
let loop_ = ~UvEventLoop::new();
let mut sched = ~Scheduler::new_special(
loop_, work_queue.clone(), sleepers.clone(), special);
let handle = sched.make_handle();
handles.push(handle);
scheds.push(sched);
}
// Schedule a pile o tasks
let n = 5*stress_factor();
for uint::range(0,n) |_i| {
rtdebug!("creating task: %u", _i);
let hf: ~fn() = || { assert!(true) };
spawntask_homed(&mut scheds, hf);
}
// Now we want another pile o tasks that do not ever run on a
// special scheduler, because they are normal tasks. Because
// we can we put these in the "main" task.
let n = 5*stress_factor();
let f: ~fn() = || {
for uint::range(0,n) |_| {
let f: ~fn() = || {
// Borrow the scheduler we run on and check if it is
// privileged.
do Local::borrow::<Scheduler,()> |sched| {
assert!(sched.run_anything);
};
};
spawntask_random(f);
};
};
let f_cell = Cell::new(f);
let handles = Cell::new(handles);
rtdebug!("creating main task");
let main_task = ~do Coroutine::new_root(&mut scheds[0].stack_pool) {
f_cell.take()();
let mut handles = handles.take();
// Tell schedulers to exit
for handles.mut_iter().advance |handle| {
handle.send(Shutdown);
}
};
rtdebug!("queuing main task")
scheds[0].enqueue_task(main_task);
let mut threads = ~[];
while !scheds.is_empty() {
let sched = scheds.pop();
let sched_cell = Cell::new(sched);
let thread = do Thread::start {
let sched = sched_cell.take();
rtdebug!("running sched: %u", sched.sched_id());
sched.run();
};
threads.push(thread);
}
rtdebug!("waiting on scheduler threads");
// Wait for schedulers
let _threads = threads;
}
extern {
fn rust_get_num_cpus() -> libc::uintptr_t;
}
}
/// Test tasks will abort on failure instead of unwinding
pub fn spawntask(f: ~fn()) {
use super::sched::*;
let f = Cell::new(f);
rtdebug!("spawntask taking the scheduler from TLS")
let task = do Local::borrow::<Task, ~Task>() |running_task| {
~running_task.new_child()
let task = unsafe {
let sched = Local::unsafe_borrow::<Scheduler>();
rtdebug!("spawntask taking the scheduler from TLS");
do Local::borrow::<Task, ~Task>() |running_task| {
~running_task.new_child(&mut (*sched).stack_pool, f.take())
}
};
let mut sched = Local::take::<Scheduler>();
let task = ~Coroutine::with_task(&mut sched.stack_pool,
task, f);
rtdebug!("new task pointer: %x", to_uint(task));
let sched = Local::take::<Scheduler>();
rtdebug!("spawntask scheduling the new task");
sched.schedule_task(task);
}
/// Create a new task and run it right now. Aborts on failure
pub fn spawntask_immediately(f: ~fn()) {
use super::sched::*;
let task = do Local::borrow::<Task, ~Task>() |running_task| {
~running_task.new_child()
let f = Cell::new(f);
let task = unsafe {
let sched = Local::unsafe_borrow::<Scheduler>();
do Local::borrow::<Task, ~Task>() |running_task| {
~running_task.new_child(&mut (*sched).stack_pool,
f.take())
}
};
let mut sched = Local::take::<Scheduler>();
let task = ~Coroutine::with_task(&mut sched.stack_pool,
task, f);
let sched = Local::take::<Scheduler>();
do sched.switch_running_tasks_and_then(task) |sched, task| {
sched.enqueue_task(task);
}
@ -280,15 +176,16 @@ pub fn spawntask_immediately(f: ~fn()) {
/// Create a new task and run it right now. Aborts on failure
pub fn spawntask_later(f: ~fn()) {
use super::sched::*;
let f = Cell::new(f);
let task = do Local::borrow::<Task, ~Task>() |running_task| {
~running_task.new_child()
let task = unsafe {
let sched = Local::unsafe_borrow::<Scheduler>();
do Local::borrow::<Task, ~Task>() |running_task| {
~running_task.new_child(&mut (*sched).stack_pool, f.take())
}
};
let mut sched = Local::take::<Scheduler>();
let task = ~Coroutine::with_task(&mut sched.stack_pool,
task, f);
sched.enqueue_task(task);
Local::put(sched);
}
@ -298,13 +195,18 @@ pub fn spawntask_random(f: ~fn()) {
use super::sched::*;
use rand::{Rand, rng};
let task = do Local::borrow::<Task, ~Task>() |running_task| {
~running_task.new_child()
let f = Cell::new(f);
let task = unsafe {
let sched = Local::unsafe_borrow::<Scheduler>();
do Local::borrow::<Task, ~Task>() |running_task| {
~running_task.new_child(&mut (*sched).stack_pool,
f.take())
}
};
let mut sched = Local::take::<Scheduler>();
let task = ~Coroutine::with_task(&mut sched.stack_pool,
task, f);
let mut rng = rng();
let run_now: bool = Rand::rand(&mut rng);
@ -343,33 +245,49 @@ pub fn spawntask_homed(scheds: &mut ~[~Scheduler], f: ~fn()) {
f()
};
~Coroutine::with_task_homed(&mut sched.stack_pool,
~Task::new_root(),
af,
Sched(handle))
~Task::new_root_homed(&mut sched.stack_pool,
Sched(handle),
af)
};
let dest_sched = &mut scheds[rng.gen_int_range(0,scheds.len() as int)];
// enqueue it for future execution
dest_sched.enqueue_task(task);
}
/// Spawn a task and wait for it to finish, returning whether it completed successfully or failed
/// Spawn a task and wait for it to finish, returning whether it
/// completed successfully or failed
pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
use cell::Cell;
use super::sched::*;
let f = Cell::new(f);
let (port, chan) = oneshot();
let chan = Cell::new(chan);
let mut new_task = ~Task::new_root();
let on_exit: ~fn(bool) = |exit_status| chan.take().send(exit_status);
let mut new_task = unsafe {
let sched = Local::unsafe_borrow::<Scheduler>();
do Local::borrow::<Task, ~Task> |_running_task| {
// I don't understand why using a child task here fails. I
// think the fail status is propogating back up the task
// tree and triggering a fail for the parent, which we
// aren't correctly expecting.
// ~running_task.new_child(&mut (*sched).stack_pool,
~Task::new_root(&mut (*sched).stack_pool,
f.take())
}
};
new_task.on_exit = Some(on_exit);
let mut sched = Local::take::<Scheduler>();
let new_task = ~Coroutine::with_task(&mut sched.stack_pool,
new_task, f);
let sched = Local::take::<Scheduler>();
do sched.switch_running_tasks_and_then(new_task) |sched, old_task| {
sched.enqueue_task(old_task);
}
rtdebug!("enqueued the new task, now waiting on exit_status");
let exit_status = port.recv();
if exit_status { Ok(()) } else { Err(()) }
}
@ -378,23 +296,27 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
pub fn spawntask_thread(f: ~fn()) -> Thread {
use rt::sched::*;
let task = do Local::borrow::<Task, ~Task>() |running_task| {
~running_task.new_child()
let f = Cell::new(f);
let task = unsafe {
let sched = Local::unsafe_borrow::<Scheduler>();
do Local::borrow::<Task, ~Task>() |running_task| {
~running_task.new_child(&mut (*sched).stack_pool,
f.take())
}
};
let task = Cell::new(task);
let f = Cell::new(f);
let thread = do Thread::start {
let mut sched = ~new_test_uv_sched();
let task = ~Coroutine::with_task(&mut sched.stack_pool,
task.take(),
f.take());
sched.enqueue_task(task);
sched.enqueue_task(task.take());
sched.run();
};
return thread;
}
/// Get a port number, starting at 9600, for use in tests
pub fn next_test_port() -> u16 {
unsafe {
@ -415,7 +337,8 @@ pub fn next_test_ip6() -> IpAddr {
Ipv6(0, 0, 0, 0, 0, 0, 0, 1, next_test_port())
}
/// Get a constant that represents the number of times to repeat stress tests. Default 1.
/// Get a constant that represents the number of times to repeat
/// stress tests. Default 1.
pub fn stress_factor() -> uint {
use os::getenv;

View File

@ -16,14 +16,15 @@
use option::*;
use clone::Clone;
use super::rc::RC;
use rt::sched::{Scheduler, Coroutine};
use rt::sched::Scheduler;
use rt::{context, TaskContext, SchedulerContext};
use rt::local::Local;
use rt::task::Task;
use vec::OwnedVector;
use container::Container;
struct TubeState<T> {
blocked_task: Option<~Coroutine>,
blocked_task: Option<~Task>,
buf: ~[T]
}

View File

@ -29,7 +29,10 @@ use unstable::sync::{Exclusive, exclusive};
#[cfg(test)] use container::Container;
#[cfg(test)] use uint;
#[cfg(test)] use unstable::run_in_bare_thread;
#[cfg(test)] use rt::test::*;
#[cfg(test)] use rt::test::{spawntask_immediately,
next_test_ip4,
run_in_newsched_task};
pub struct UvEventLoop {
uvio: UvIoFactory

View File

@ -203,8 +203,8 @@ pub fn begin_unwind_(msg: *c_char, file: *c_char, line: size_t) -> ! {
let msg = str::raw::from_c_str(msg);
let file = str::raw::from_c_str(file);
let outmsg = fmt!("task failed: '%s' at line %i of file %s",
msg, line as int, file);
let outmsg = fmt!("task failed at '%s', %s:%i",
msg, file, line as int);
// XXX: Logging doesn't work correctly in non-task context because it
// invokes the local heap

View File

@ -1182,3 +1182,4 @@ fn test_simple_newsched_spawn() {
spawn(||())
}
}

View File

@ -581,13 +581,20 @@ pub fn spawn_raw(opts: TaskOpts, f: ~fn()) {
fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) {
use rt::sched::*;
let mut task = if opts.linked {
do Local::borrow::<Task, ~Task>() |running_task| {
~running_task.new_child()
let f = Cell::new(f);
let mut task = unsafe {
let sched = Local::unsafe_borrow::<Scheduler>();
rtdebug!("unsafe borrowed sched");
if opts.linked {
do Local::borrow::<Task, ~Task>() |running_task| {
~running_task.new_child(&mut (*sched).stack_pool, f.take())
}
} else {
// An unlinked task is a new root in the task tree
~Task::new_root(&mut (*sched).stack_pool, f.take())
}
} else {
// An unlinked task is a new root in the task tree
~Task::new_root()
};
if opts.notify_chan.is_some() {
@ -601,9 +608,13 @@ fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) {
task.on_exit = Some(on_exit);
}
rtdebug!("spawn about to take scheduler");
let mut sched = Local::take::<Scheduler>();
let task = ~Coroutine::with_task(&mut sched.stack_pool,
task, f);
rtdebug!("took sched in spawn");
// let task = ~Coroutine::with_task(&mut sched.stack_pool,
// task, f);
// let task = ~Task::new_root(&mut sched.stack_pool, f);
sched.schedule_task(task);
}

View File

@ -23,6 +23,7 @@ use option::{Option, Some, None};
use io;
use rt::global_heap;
use rt::borrowck;
use borrow::to_uint;
#[allow(non_camel_case_types)]
pub type rust_task = c_void;
@ -90,6 +91,9 @@ pub unsafe fn local_malloc(td: *c_char, size: uintptr_t) -> *c_char {
_ => {
let mut alloc = ::ptr::null();
do Local::borrow::<Task,()> |task| {
rtdebug!("task pointer: %x, heap pointer: %x",
to_uint(task),
to_uint(&task.heap));
alloc = task.heap.alloc(td as *c_void, size as uint) as *c_char;
}
return alloc;

View File

@ -269,3 +269,5 @@ rust_running_on_valgrind
rust_get_num_cpus
rust_get_global_args_ptr
rust_current_boxed_region
rust_take_global_args_lock
rust_drop_global_args_lock