Register new snapshots

This commit is contained in:
Alex Crichton 2013-10-28 16:56:24 -07:00
parent cd6e9f4f83
commit 2290131543
8 changed files with 98 additions and 119 deletions

View File

@ -51,10 +51,9 @@ impl<A:Clone> Future<A> {
impl<A> Future<A> {
/// Gets the value from this future, forcing evaluation.
pub fn unwrap(self) -> A {
let mut this = self;
this.get_ref();
let state = replace(&mut this.state, Evaluating);
pub fn unwrap(mut self) -> A {
self.get_ref();
let state = replace(&mut self.state, Evaluating);
match state {
Forced(v) => v,
_ => fail!( "Logic error." ),

View File

@ -113,7 +113,7 @@ impl<T> ChanOne<T> {
// 'do_resched' configures whether the scheduler immediately switches to
// the receiving task, or leaves the sending task still running.
fn try_send_inner(self, val: T, do_resched: bool) -> bool {
fn try_send_inner(mut self, val: T, do_resched: bool) -> bool {
if do_resched {
rtassert!(!rt::in_sched_context());
}
@ -129,9 +129,8 @@ impl<T> ChanOne<T> {
sched.maybe_yield();
}
let mut this = self;
let mut recvr_active = true;
let packet = this.packet();
let packet = self.packet();
unsafe {
@ -150,7 +149,7 @@ impl<T> ChanOne<T> {
// done with the packet. NB: In case of do_resched, this *must*
// happen before waking up a blocked task (or be unkillable),
// because we might get a kill signal during the reschedule.
this.suppress_finalize = true;
self.suppress_finalize = true;
match oldstate {
STATE_BOTH => {
@ -158,7 +157,7 @@ impl<T> ChanOne<T> {
}
STATE_ONE => {
// Port has closed. Need to clean up.
let _packet: ~Packet<T> = cast::transmute(this.void_packet);
let _packet: ~Packet<T> = cast::transmute(self.void_packet);
recvr_active = false;
}
task_as_state => {
@ -202,22 +201,20 @@ impl<T> PortOne<T> {
}
/// As `recv`, but returns `None` if the send end is closed rather than failing.
pub fn try_recv(self) -> Option<T> {
let mut this = self;
pub fn try_recv(mut self) -> Option<T> {
// Optimistic check. If data was sent already, we don't even need to block.
// No release barrier needed here; we're not handing off our task pointer yet.
if !this.optimistic_check() {
if !self.optimistic_check() {
// No data available yet.
// Switch to the scheduler to put the ~Task into the Packet state.
let sched: ~Scheduler = Local::take();
do sched.deschedule_running_task_and_then |sched, task| {
this.block_on(sched, task);
self.block_on(sched, task);
}
}
// Task resumes.
this.recv_ready()
self.recv_ready()
}
}
@ -325,9 +322,8 @@ impl<T> SelectInner for PortOne<T> {
impl<T> Select for PortOne<T> { }
impl<T> SelectPortInner<T> for PortOne<T> {
fn recv_ready(self) -> Option<T> {
let mut this = self;
let packet = this.packet();
fn recv_ready(mut self) -> Option<T> {
let packet = self.packet();
// No further memory barrier is needed here to access the
// payload. Some scenarios:
@ -348,9 +344,9 @@ impl<T> SelectPortInner<T> for PortOne<T> {
let payload = (*packet).payload.take();
// The sender has closed up shop. Drop the packet.
let _packet: ~Packet<T> = cast::transmute(this.void_packet);
let _packet: ~Packet<T> = cast::transmute(self.void_packet);
// Suppress the synchronizing actions in the finalizer. We're done with the packet.
this.suppress_finalize = true;
self.suppress_finalize = true;
return payload;
}
}
@ -378,18 +374,17 @@ impl<T> Drop for ChanOne<T> {
if self.suppress_finalize { return }
unsafe {
let this = cast::transmute_mut(self);
let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst);
let oldstate = (*self.packet()).state.swap(STATE_ONE, SeqCst);
match oldstate {
STATE_BOTH => {
// Port still active. It will destroy the Packet.
},
STATE_ONE => {
let _packet: ~Packet<T> = cast::transmute(this.void_packet);
let _packet: ~Packet<T> = cast::transmute(self.void_packet);
},
task_as_state => {
// The port is blocked waiting for a message we will never send. Wake it.
rtassert!((*this.packet()).payload.is_none());
rtassert!((*self.packet()).payload.is_none());
let recvr = BlockedTask::cast_from_uint(task_as_state);
do recvr.wake().map |woken_task| {
Scheduler::run_task(woken_task);
@ -406,14 +401,13 @@ impl<T> Drop for PortOne<T> {
if self.suppress_finalize { return }
unsafe {
let this = cast::transmute_mut(self);
let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst);
let oldstate = (*self.packet()).state.swap(STATE_ONE, SeqCst);
match oldstate {
STATE_BOTH => {
// Chan still active. It will destroy the packet.
},
STATE_ONE => {
let _packet: ~Packet<T> = cast::transmute(this.void_packet);
let _packet: ~Packet<T> = cast::transmute(self.void_packet);
}
task_as_state => {
// This case occurs during unwinding, when the blocked

View File

@ -166,12 +166,10 @@ impl Scheduler {
// Take a main task to run, and a scheduler to run it in. Create a
// scheduler task and bootstrap into it.
pub fn bootstrap(~self, task: ~Task) {
let mut this = self;
pub fn bootstrap(mut ~self, task: ~Task) {
// Build an Idle callback.
this.idle_callback = Some(this.event_loop.pausible_idle_callback());
self.idle_callback = Some(self.event_loop.pausible_idle_callback());
// Initialize the TLS key.
local_ptr::init_tls_key();
@ -186,12 +184,12 @@ impl Scheduler {
// Before starting our first task, make sure the idle callback
// is active. As we do not start in the sleep state this is
// important.
this.idle_callback.get_mut_ref().start(Scheduler::run_sched_once);
self.idle_callback.get_mut_ref().start(Scheduler::run_sched_once);
// Now, as far as all the scheduler state is concerned, we are
// inside the "scheduler" context. So we can act like the
// scheduler and resume the provided task.
this.resume_task_immediately(task);
self.resume_task_immediately(task);
// Now we are back in the scheduler context, having
// successfully run the input task. Start by running the
@ -226,20 +224,18 @@ impl Scheduler {
// This does not return a scheduler, as the scheduler is placed
// inside the task.
pub fn run(~self) {
let mut self_sched = self;
pub fn run(mut ~self) {
// This is unsafe because we need to place the scheduler, with
// the event_loop inside, inside our task. But we still need a
// mutable reference to the event_loop to give it the "run"
// command.
unsafe {
let event_loop: *mut ~EventLoop = &mut self_sched.event_loop;
let event_loop: *mut ~EventLoop = &mut self.event_loop;
// Our scheduler must be in the task before the event loop
// is started.
let self_sched = Cell::new(self_sched);
let self_sched = Cell::new(self);
do Local::borrow |stask: &mut Task| {
stask.sched = Some(self_sched.take());
};
@ -315,48 +311,46 @@ impl Scheduler {
// returns the still-available scheduler. At this point all
// message-handling will count as a turn of work, and as a result
// return None.
fn interpret_message_queue(~self, effort: EffortLevel) -> Option<~Scheduler> {
let mut this = self;
fn interpret_message_queue(mut ~self, effort: EffortLevel) -> Option<~Scheduler> {
let msg = if effort == DontTryTooHard {
// Do a cheap check that may miss messages
this.message_queue.casual_pop()
self.message_queue.casual_pop()
} else {
this.message_queue.pop()
self.message_queue.pop()
};
match msg {
Some(PinnedTask(task)) => {
let mut task = task;
task.give_home(Sched(this.make_handle()));
this.resume_task_immediately(task);
task.give_home(Sched(self.make_handle()));
self.resume_task_immediately(task);
return None;
}
Some(TaskFromFriend(task)) => {
rtdebug!("got a task from a friend. lovely!");
this.process_task(task, Scheduler::resume_task_immediately_cl);
self.process_task(task, Scheduler::resume_task_immediately_cl);
return None;
}
Some(RunOnce(task)) => {
// bypass the process_task logic to force running this task once
// on this home scheduler. This is often used for I/O (homing).
Scheduler::resume_task_immediately_cl(this, task);
Scheduler::resume_task_immediately_cl(self, task);
return None;
}
Some(Wake) => {
this.sleepy = false;
Local::put(this);
self.sleepy = false;
Local::put(self);
return None;
}
Some(Shutdown) => {
rtdebug!("shutting down");
if this.sleepy {
if self.sleepy {
// There may be an outstanding handle on the
// sleeper list. Pop them all to make sure that's
// not the case.
loop {
match this.sleeper_list.pop() {
match self.sleeper_list.pop() {
Some(handle) => {
let mut handle = handle;
handle.send(Wake);
@ -367,30 +361,28 @@ impl Scheduler {
}
// No more sleeping. After there are no outstanding
// event loop references we will shut down.
this.no_sleep = true;
this.sleepy = false;
Local::put(this);
self.no_sleep = true;
self.sleepy = false;
Local::put(self);
return None;
}
None => {
return Some(this);
return Some(self);
}
}
}
fn do_work(~self) -> Option<~Scheduler> {
let mut this = self;
fn do_work(mut ~self) -> Option<~Scheduler> {
rtdebug!("scheduler calling do work");
match this.find_work() {
match self.find_work() {
Some(task) => {
rtdebug!("found some work! processing the task");
this.process_task(task, Scheduler::resume_task_immediately_cl);
self.process_task(task, Scheduler::resume_task_immediately_cl);
return None;
}
None => {
rtdebug!("no work was found, returning the scheduler struct");
return Some(this);
return Some(self);
}
}
}
@ -462,37 +454,34 @@ impl Scheduler {
// * Task Routing Functions - Make sure tasks send up in the right
// place.
fn process_task(~self, task: ~Task,
fn process_task(mut ~self, mut task: ~Task,
schedule_fn: SchedulingFn) {
let mut this = self;
let mut task = task;
rtdebug!("processing a task");
let home = task.take_unwrap_home();
match home {
Sched(home_handle) => {
if home_handle.sched_id != this.sched_id() {
if home_handle.sched_id != self.sched_id() {
rtdebug!("sending task home");
task.give_home(Sched(home_handle));
Scheduler::send_task_home(task);
Local::put(this);
Local::put(self);
} else {
rtdebug!("running task here");
task.give_home(Sched(home_handle));
schedule_fn(this, task);
schedule_fn(self, task);
}
}
AnySched if this.run_anything => {
AnySched if self.run_anything => {
rtdebug!("running anysched task here");
task.give_home(AnySched);
schedule_fn(this, task);
schedule_fn(self, task);
}
AnySched => {
rtdebug!("sending task to friend");
task.give_home(AnySched);
this.send_to_friend(task);
Local::put(this);
self.send_to_friend(task);
Local::put(self);
}
}
}
@ -531,16 +520,14 @@ impl Scheduler {
/// to the work queue directly.
pub fn enqueue_task(&mut self, task: ~Task) {
let this = self;
// We push the task onto our local queue clone.
this.work_queue.push(task);
this.idle_callback.get_mut_ref().resume();
self.work_queue.push(task);
self.idle_callback.get_mut_ref().resume();
// We've made work available. Notify a
// sleeping scheduler.
match this.sleeper_list.casual_pop() {
match self.sleeper_list.casual_pop() {
Some(handle) => {
let mut handle = handle;
handle.send(Wake)
@ -567,11 +554,9 @@ impl Scheduler {
// cleanup function f, which takes the scheduler and the
// old task as inputs.
pub fn change_task_context(~self,
pub fn change_task_context(mut ~self,
next_task: ~Task,
f: &fn(&mut Scheduler, ~Task)) {
let mut this = self;
// The current task is grabbed from TLS, not taken as an input.
// Doing an unsafe_take to avoid writing back a null pointer -
// We're going to call `put` later to do that.
@ -590,11 +575,11 @@ impl Scheduler {
// The current task is placed inside an enum with the cleanup
// function. This enum is then placed inside the scheduler.
this.cleanup_job = Some(CleanupJob::new(current_task, f_opaque));
self.cleanup_job = Some(CleanupJob::new(current_task, f_opaque));
// The scheduler is then placed inside the next task.
let mut next_task = next_task;
next_task.sched = Some(this);
next_task.sched = Some(self);
// However we still need an internal mutable pointer to the
// original task. The strategy here was "arrange memory, then
@ -692,13 +677,13 @@ impl Scheduler {
/// This passes a Scheduler pointer to the fn after the context switch
/// in order to prevent that fn from performing further scheduling operations.
/// Doing further scheduling could easily result in infinite recursion.
pub fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, BlockedTask)) {
pub fn deschedule_running_task_and_then(mut ~self,
f: &fn(&mut Scheduler, BlockedTask)) {
// Trickier - we need to get the scheduler task out of self
// and use it as the destination.
let mut this = self;
let stask = this.sched_task.take_unwrap();
let stask = self.sched_task.take_unwrap();
// Otherwise this is the same as below.
this.switch_running_tasks_and_then(stask, f);
self.switch_running_tasks_and_then(stask, f);
}
pub fn switch_running_tasks_and_then(~self, next_task: ~Task,
@ -725,12 +710,11 @@ impl Scheduler {
/// Called by a running task to end execution, after which it will
/// be recycled by the scheduler for reuse in a new task.
pub fn terminate_current_task(~self) {
pub fn terminate_current_task(mut ~self) {
// Similar to deschedule running task and then, but cannot go through
// the task-blocking path. The task is already dying.
let mut this = self;
let stask = this.sched_task.take_unwrap();
do this.change_task_context(stask) |sched, mut dead_task| {
let stask = self.sched_task.take_unwrap();
do self.change_task_context(stask) |sched, mut dead_task| {
let coroutine = dead_task.coroutine.take_unwrap();
coroutine.recycle(&mut sched.stack_pool);
}
@ -752,25 +736,23 @@ impl Scheduler {
/// to introduce some amount of randomness to the scheduler. Currently the
/// randomness is a result of performing a round of work stealing (which
/// may end up stealing from the current scheduler).
pub fn yield_now(~self) {
let mut this = self;
this.yield_check_count = reset_yield_check(&mut this.rng);
pub fn yield_now(mut ~self) {
self.yield_check_count = reset_yield_check(&mut self.rng);
// Tell the scheduler to start stealing on the next iteration
this.steal_for_yield = true;
do this.deschedule_running_task_and_then |sched, task| {
self.steal_for_yield = true;
do self.deschedule_running_task_and_then |sched, task| {
sched.enqueue_blocked_task(task);
}
}
pub fn maybe_yield(~self) {
pub fn maybe_yield(mut ~self) {
// The number of times to do the yield check before yielding, chosen arbitrarily.
let mut this = self;
rtassert!(this.yield_check_count > 0);
this.yield_check_count -= 1;
if this.yield_check_count == 0 {
this.yield_now();
rtassert!(self.yield_check_count > 0);
self.yield_check_count -= 1;
if self.yield_check_count == 0 {
self.yield_now();
} else {
Local::put(this);
Local::put(self);
}
}

View File

@ -55,13 +55,12 @@ impl Thread {
}
}
pub fn join(self) {
pub fn join(mut self) {
#[fixed_stack_segment]; #[inline(never)];
assert!(!self.joined);
let mut this = self;
unsafe { rust_raw_thread_join(this.raw_thread); }
this.joined = true;
unsafe { rust_raw_thread_join(self.raw_thread); }
self.joined = true;
}
}

View File

@ -225,15 +225,14 @@ impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W {
}
}
fn close(self, cb: NullCallback) {
let mut this = self;
fn close(mut self, cb: NullCallback) {
{
let data = this.get_watcher_data();
let data = self.get_watcher_data();
assert!(data.close_cb.is_none());
data.close_cb = Some(cb);
}
unsafe { uvll::close(this.native_handle(), close_cb); }
unsafe { uvll::close(self.native_handle(), close_cb); }
extern fn close_cb(handle: *uvll::uv_handle_t) {
let mut h: Handle = NativeHandle::from_native_handle(handle);

View File

@ -120,10 +120,9 @@ trait HomingIO {
a // return the result of the IO
}
fn home_for_io_consume<A>(self, io: &fn(Self) -> A) -> A {
let mut this = self;
let home = this.go_to_IO_home();
let a = io(this); // do IO
fn home_for_io_consume<A>(mut self, io: &fn(Self) -> A) -> A {
let home = self.go_to_IO_home();
let a = io(self); // do IO
HomingIO::restore_original_home(None::<Self>, home);
a // return the result of the IO
}

View File

@ -200,11 +200,10 @@ impl<T: Send> UnsafeArc<T> {
/// As unwrap above, but without blocking. Returns 'UnsafeArcSelf(self)' if this is
/// not the last reference; 'UnsafeArcT(unwrapped_data)' if so.
pub fn try_unwrap(self) -> UnsafeArcUnwrap<T> {
pub fn try_unwrap(mut self) -> UnsafeArcUnwrap<T> {
unsafe {
let mut this = self; // FIXME(#4330) mutable self
// The ~ dtor needs to run if this code succeeds.
let mut data: ~ArcData<T> = cast::transmute(this.data);
let mut data: ~ArcData<T> = cast::transmute(self.data);
// This can of course race with anybody else who has a handle, but in
// such a case, the returned count will always be at least 2. If we
// see 1, no race was possible. All that matters is 1 or not-1.
@ -216,12 +215,12 @@ impl<T: Send> UnsafeArc<T> {
// (Note: using is_empty(), not take(), to not free the unwrapper.)
if count == 1 && data.unwrapper.is_empty(Acquire) {
// Tell this handle's destructor not to run (we are now it).
this.data = ptr::mut_null();
self.data = ptr::mut_null();
// FIXME(#3224) as above
UnsafeArcT(data.data.take_unwrap())
} else {
cast::forget(data);
UnsafeArcSelf(this)
UnsafeArcSelf(self)
}
}
}

View File

@ -1,3 +1,11 @@
S 2013-10-28 2ab4a6f
freebsd-x86_64 08af04bcf739930bdb7d0ad244b2c8094cd5096a
linux-i386 c233de1ed09872d5c7a3e1ce9ab9eb6e16631201
linux-x86_64 3c16b8e0cf3a1af654cc084a6ccb80f2de8fe84f
macos-i386 0fadb8c5afa4e4656ff37d0949aaa5a382f197a6
macos-x86_64 c8b7a8f30f95bf71542451b8e1a8b9108de1f39a
winnt-i386 c1c9dd8b2ef0c004e3327740732c6e06c3826fde
S 2013-10-22 ae0905a
freebsd-x86_64 6e9d81c160963308c2bf7886fae6726274b365ed
linux-i386 1c449504aa04c0345ad09f5bcb5a57b9656d11c3