minor tweaks - unboxed the coroutine so that it is no longer a ~ pointer inside the task struct, and also added an assert to verify that send is never called inside scheduler context as it is undefined (BROKEN) if that happens

This commit is contained in:
toddaaro 2013-07-29 13:34:08 -07:00
parent 997719c13d
commit a5f55b3ead
8 changed files with 72 additions and 32 deletions

View File

@ -24,6 +24,7 @@ use util::Void;
use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
use cell::Cell;
use clone::Clone;
use rt::{context, SchedulerContext};
/// A combined refcount / BlockedTask-as-uint pointer.
///
@ -90,6 +91,9 @@ impl<T> ChanOne<T> {
}
pub fn try_send(self, val: T) -> bool {
rtassert!(context() != SchedulerContext);
let mut this = self;
let mut recvr_active = true;
let packet = this.packet();

View File

@ -571,7 +571,7 @@ mod test {
#[cfg(test)]
fn socket_name(addr: IpAddr) {
do run_in_newsched_task {
do spawntask_immediately {
do spawntask {
let listener = TcpListener::bind(addr);
assert!(listener.is_some());
@ -590,13 +590,13 @@ mod test {
#[cfg(test)]
fn peer_name(addr: IpAddr) {
do run_in_newsched_task {
do spawntask_immediately {
do spawntask {
let mut listener = TcpListener::bind(addr);
listener.accept();
}
do spawntask_immediately {
do spawntask {
let stream = TcpStream::connect(addr);
assert!(stream.is_some());

View File

@ -267,7 +267,7 @@ mod test {
#[cfg(test)]
fn socket_name(addr: IpAddr) {
do run_in_newsched_task {
do spawntask_immediately {
do spawntask {
let server = UdpSocket::bind(addr);
assert!(server.is_some());

View File

@ -120,9 +120,7 @@ impl Local for IoFactoryObject {
#[cfg(test)]
mod test {
// use unstable::run_in_bare_thread;
use rt::test::*;
// use rt::sched::Scheduler;
use super::*;
use rt::task::Task;
use rt::local_ptr;

View File

@ -70,7 +70,7 @@ use ptr::RawPtr;
use rt::local::Local;
use rt::sched::{Scheduler, Shutdown};
use rt::sleeper_list::SleeperList;
use rt::task::{Task, SchedTask, GreenTask};
use rt::task::{Task, SchedTask, GreenTask, Sched};
use rt::thread::Thread;
use rt::work_queue::WorkQueue;
use rt::uv::uvio::UvEventLoop;
@ -244,6 +244,8 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
let nscheds = util::default_sched_threads();
let main = Cell::new(main);
// The shared list of sleeping schedulers. Schedulers wake each other
// occassionally to do new work.
let sleepers = SleeperList::new();
@ -268,12 +270,19 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
// If we need a main-thread task then create a main thread scheduler
// that will reject any task that isn't pinned to it
let mut main_sched = if use_main_sched {
let main_sched = if use_main_sched {
// Create a friend handle.
let mut friend_sched = scheds.pop();
let friend_handle = friend_sched.make_handle();
scheds.push(friend_sched);
let main_loop = ~UvEventLoop::new();
let mut main_sched = ~Scheduler::new_special(main_loop,
work_queue.clone(),
sleepers.clone(),
false);
false,
Some(friend_handle));
let main_handle = main_sched.make_handle();
handles.push(main_handle);
Some(main_sched)
@ -312,15 +321,16 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
let mut threads = ~[];
let on_exit = Cell::new(on_exit);
if !use_main_sched {
// In the case where we do not use a main_thread scheduler we
// run the main task in one of our threads.
let main_cell = Cell::new(main);
let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool,
main_cell.take());
main_task.death.on_exit = Some(on_exit);
main.take());
main_task.death.on_exit = Some(on_exit.take());
let main_task_cell = Cell::new(main_task);
let sched = scheds.pop();
@ -347,16 +357,18 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
}
// If we do have a main thread scheduler, run it now.
if use_main_sched {
let mut main_sched = main_sched.get();
let home = Sched(main_sched.make_handle());
let mut main_task = ~Task::new_root_homed(&mut scheds[0].stack_pool,
home, main);
main_task.death.on_exit = Some(on_exit);
let main_task_cell = Cell::new(main_task);
sched.bootstrap(main_task);
let mut main_task = ~Task::new_root_homed(&mut scheds[0].stack_pool,
home, main.take());
main_task.death.on_exit = Some(on_exit.take());
main_sched.bootstrap(main_task);
}
// Wait for schedulers
foreach thread in threads.consume_iter() {
thread.join();

View File

@ -10,7 +10,6 @@
use either::{Left, Right};
use option::{Option, Some, None};
use sys;
use cast::{transmute, transmute_mut_region, transmute_mut_unsafe};
use clone::Clone;
use unstable::raw;
@ -334,7 +333,7 @@ impl Scheduler {
return None;
}
Some(TaskFromFriend(task)) => {
this.resume_task_immediately(task);
this.schedule_task_sched_context(task);
return None;
}
Some(Wake) => {
@ -432,7 +431,6 @@ impl Scheduler {
}
AnySched => {
task.give_home(AnySched);
// this.enqueue_task(task);
this.send_to_friend(task);
return Some(this);
}
@ -491,6 +489,36 @@ impl Scheduler {
}
}
// BAD BAD BAD BAD BAD
// Do something instead of just copy-pasting this.
pub fn schedule_task_sched_context(~self, task: ~Task) -> Option<~Scheduler> {
// is the task home?
let is_home = task.is_home_no_tls(&self);
// does the task have a home?
let homed = task.homed();
let mut this = self;
if is_home || (!homed && this.run_anything) {
// here we know we are home, execute now OR we know we
// aren't homed, and that this sched doesn't care
rtdebug!("task: %u is on ok sched, executing", to_uint(task));
this.resume_task_immediately(task);
return None;
} else if !homed && !this.run_anything {
// the task isn't homed, but it can't be run here
this.enqueue_task(task);
return Some(this);
} else {
// task isn't home, so don't run it here, send it home
Scheduler::send_task_home(task);
return Some(this);
}
}
// The primary function for changing contexts. In the current
// design the scheduler is just a slightly modified GreenTask, so
// all context swaps are from Task to Task. The only difference

View File

@ -45,9 +45,9 @@ pub struct Task {
taskgroup: Option<Taskgroup>,
death: Death,
destroyed: bool,
coroutine: Option<~Coroutine>,
// FIXME(#6874/#7599) use StringRef to save on allocations
name: Option<~str>,
coroutine: Option<Coroutine>,
sched: Option<~Scheduler>,
task_type: TaskType
}
@ -128,7 +128,7 @@ impl Task {
taskgroup: None,
death: Death::new(),
destroyed: false,
coroutine: Some(~Coroutine::empty()),
coroutine: Some(Coroutine::empty()),
sched: None,
task_type: SchedTask
}
@ -157,8 +157,8 @@ impl Task {
taskgroup: None,
death: Death::new(),
destroyed: false,
coroutine: Some(~Coroutine::new(stack_pool, start)),
name: None,
coroutine: Some(Coroutine::new(stack_pool, start)),
sched: None,
task_type: GreenTask(Some(~home))
}
@ -178,8 +178,8 @@ impl Task {
// FIXME(#7544) make watching optional
death: self.death.new_child(),
destroyed: false,
coroutine: Some(~Coroutine::new(stack_pool, start)),
name: None,
coroutine: Some(Coroutine::new(stack_pool, start)),
sched: None,
task_type: GreenTask(Some(~home))
}
@ -375,9 +375,9 @@ impl Coroutine {
}
/// Destroy coroutine and try to reuse stack segment.
pub fn recycle(~self, stack_pool: &mut StackPool) {
pub fn recycle(self, stack_pool: &mut StackPool) {
match self {
~Coroutine { current_stack_segment, _ } => {
Coroutine { current_stack_segment, _ } => {
stack_pool.give_segment(current_stack_segment);
}
}

View File

@ -790,10 +790,8 @@ impl Drop for UvTimer {
impl RtioTimer for UvTimer {
fn sleep(&self, msecs: u64) {
let scheduler = Local::take::<Scheduler>();
assert!(scheduler.in_task_context());
do scheduler.deschedule_running_task_and_then |sched, task| {
do scheduler.deschedule_running_task_and_then |_sched, task| {
rtdebug!("sleep: entered scheduler context");
assert!(!sched.in_task_context());
let task_cell = Cell::new(task);
let mut watcher = **self;
do watcher.start(msecs, 0) |_, status| {