Reduce the amount of unsafe code and mark handle_deadlock as unsafe

This commit is contained in:
John Kåre Alsaker 2018-06-05 23:12:19 +02:00
parent 3e83248441
commit 131ef97c4b
2 changed files with 53 additions and 52 deletions

View File

@ -151,6 +151,10 @@ impl<'tcx> QueryJob<'tcx> {
#[cfg(parallel_queries)] #[cfg(parallel_queries)]
self.latch.set(); self.latch.set();
} }
fn as_ptr(&self) -> *const QueryJob<'tcx> {
self as *const _
}
} }
#[cfg(parallel_queries)] #[cfg(parallel_queries)]
@ -233,13 +237,9 @@ impl<'tcx> QueryLatch<'tcx> {
} }
} }
/// A pointer to an active query job. This is used to give query jobs an identity.
#[cfg(parallel_queries)]
type Ref<'tcx> = *const QueryJob<'tcx>;
/// A resumable waiter of a query. The usize is the index into waiters in the query's latch /// A resumable waiter of a query. The usize is the index into waiters in the query's latch
#[cfg(parallel_queries)] #[cfg(parallel_queries)]
type Waiter<'tcx> = (Ref<'tcx>, usize); type Waiter<'tcx> = (Lrc<QueryJob<'tcx>>, usize);
/// Visits all the non-resumable and resumable waiters of a query. /// Visits all the non-resumable and resumable waiters of a query.
/// Only waiters in a query are visited. /// Only waiters in a query are visited.
@ -251,15 +251,13 @@ type Waiter<'tcx> = (Ref<'tcx>, usize);
/// required information to resume the waiter. /// required information to resume the waiter.
/// If all `visit` calls returns None, this function also returns None. /// If all `visit` calls returns None, this function also returns None.
#[cfg(parallel_queries)] #[cfg(parallel_queries)]
fn visit_waiters<'tcx, F>(query_ref: Ref<'tcx>, mut visit: F) -> Option<Option<Waiter<'tcx>>> fn visit_waiters<'tcx, F>(query: Lrc<QueryJob<'tcx>>, mut visit: F) -> Option<Option<Waiter<'tcx>>>
where where
F: FnMut(Span, Ref<'tcx>) -> Option<Option<Waiter<'tcx>>> F: FnMut(Span, Lrc<QueryJob<'tcx>>) -> Option<Option<Waiter<'tcx>>>
{ {
let query = unsafe { &*query_ref };
// Visit the parent query which is a non-resumable waiter since it's on the same stack // Visit the parent query which is a non-resumable waiter since it's on the same stack
if let Some(ref parent) = query.parent { if let Some(ref parent) = query.parent {
if let Some(cycle) = visit(query.info.span, &**parent as Ref) { if let Some(cycle) = visit(query.info.span, parent.clone()) {
return Some(cycle); return Some(cycle);
} }
} }
@ -267,9 +265,9 @@ where
// Visit the explict waiters which use condvars and are resumable // Visit the explict waiters which use condvars and are resumable
for (i, waiter) in query.latch.info.lock().waiters.iter().enumerate() { for (i, waiter) in query.latch.info.lock().waiters.iter().enumerate() {
if let Some(ref waiter_query) = waiter.query { if let Some(ref waiter_query) = waiter.query {
if visit(waiter.span, &**waiter_query).is_some() { if visit(waiter.span, waiter_query.clone()).is_some() {
// Return a value which indicates that this waiter can be resumed // Return a value which indicates that this waiter can be resumed
return Some(Some((query_ref, i))); return Some(Some((query.clone(), i)));
} }
} }
} }
@ -281,12 +279,13 @@ where
/// If a cycle is detected, this initial value is replaced with the span causing /// If a cycle is detected, this initial value is replaced with the span causing
/// the cycle. /// the cycle.
#[cfg(parallel_queries)] #[cfg(parallel_queries)]
fn cycle_check<'tcx>(query: Ref<'tcx>, fn cycle_check<'tcx>(query: Lrc<QueryJob<'tcx>>,
span: Span, span: Span,
stack: &mut Vec<(Span, Ref<'tcx>)>, stack: &mut Vec<(Span, Lrc<QueryJob<'tcx>>)>,
visited: &mut HashSet<Ref<'tcx>>) -> Option<Option<Waiter<'tcx>>> { visited: &mut HashSet<*const QueryJob<'tcx>>
if visited.contains(&query) { ) -> Option<Option<Waiter<'tcx>>> {
return if let Some(p) = stack.iter().position(|q| q.1 == query) { if visited.contains(&query.as_ptr()) {
return if let Some(p) = stack.iter().position(|q| q.1.as_ptr() == query.as_ptr()) {
// We detected a query cycle, fix up the initial span and return Some // We detected a query cycle, fix up the initial span and return Some
// Remove previous stack entries // Remove previous stack entries
@ -300,8 +299,8 @@ fn cycle_check<'tcx>(query: Ref<'tcx>,
} }
// Mark this query is visited and add it to the stack // Mark this query is visited and add it to the stack
visited.insert(query); visited.insert(query.as_ptr());
stack.push((span, query)); stack.push((span, query.clone()));
// Visit all the waiters // Visit all the waiters
let r = visit_waiters(query, |span, successor| { let r = visit_waiters(query, |span, successor| {
@ -320,18 +319,21 @@ fn cycle_check<'tcx>(query: Ref<'tcx>,
/// from `query` without going through any of the queries in `visited`. /// from `query` without going through any of the queries in `visited`.
/// This is achieved with a depth first search. /// This is achieved with a depth first search.
#[cfg(parallel_queries)] #[cfg(parallel_queries)]
fn connected_to_root<'tcx>(query: Ref<'tcx>, visited: &mut HashSet<Ref<'tcx>>) -> bool { fn connected_to_root<'tcx>(
query: Lrc<QueryJob<'tcx>>,
visited: &mut HashSet<*const QueryJob<'tcx>>
) -> bool {
// We already visited this or we're deliberately ignoring it // We already visited this or we're deliberately ignoring it
if visited.contains(&query) { if visited.contains(&query.as_ptr()) {
return false; return false;
} }
// This query is connected to the root (it has no query parent), return true // This query is connected to the root (it has no query parent), return true
if unsafe { (*query).parent.is_none() } { if query.parent.is_none() {
return true; return true;
} }
visited.insert(query); visited.insert(query.as_ptr());
let mut connected = false; let mut connected = false;
@ -351,7 +353,7 @@ fn connected_to_root<'tcx>(query: Ref<'tcx>, visited: &mut HashSet<Ref<'tcx>>) -
/// the function returns false. /// the function returns false.
#[cfg(parallel_queries)] #[cfg(parallel_queries)]
fn remove_cycle<'tcx>( fn remove_cycle<'tcx>(
jobs: &mut Vec<Ref<'tcx>>, jobs: &mut Vec<Lrc<QueryJob<'tcx>>>,
wakelist: &mut Vec<Lrc<QueryWaiter<'tcx>>>, wakelist: &mut Vec<Lrc<QueryWaiter<'tcx>>>,
tcx: TyCtxt<'_, 'tcx, '_> tcx: TyCtxt<'_, 'tcx, '_>
) -> bool { ) -> bool {
@ -367,7 +369,7 @@ fn remove_cycle<'tcx>(
// Extract the spans and queries into separate arrays // Extract the spans and queries into separate arrays
let mut spans: Vec<_> = stack.iter().map(|e| e.0).collect(); let mut spans: Vec<_> = stack.iter().map(|e| e.0).collect();
let queries = stack.iter().map(|e| e.1); let queries = stack.into_iter().map(|e| e.1);
// Shift the spans so that queries are matched with the span for their waitee // Shift the spans so that queries are matched with the span for their waitee
let last = spans.pop().unwrap(); let last = spans.pop().unwrap();
@ -378,23 +380,25 @@ fn remove_cycle<'tcx>(
// Remove the queries in our cycle from the list of jobs to look at // Remove the queries in our cycle from the list of jobs to look at
for r in &stack { for r in &stack {
jobs.remove_item(&r.1); if let Some(pos) = jobs.iter().position(|j| j.as_ptr() == r.1.as_ptr()) {
jobs.remove(pos);
}
} }
// Find the queries in the cycle which are // Find the queries in the cycle which are
// connected to queries outside the cycle // connected to queries outside the cycle
let entry_points: Vec<Ref<'_>> = stack.iter().filter_map(|query| { let entry_points: Vec<Lrc<QueryJob<'tcx>>> = stack.iter().filter_map(|query| {
// Mark all the other queries in the cycle as already visited // Mark all the other queries in the cycle as already visited
let mut visited = HashSet::from_iter(stack.iter().filter_map(|q| { let mut visited = HashSet::from_iter(stack.iter().filter_map(|q| {
if q.1 != query.1 { if q.1.as_ptr() != query.1.as_ptr() {
Some(q.1) Some(q.1.as_ptr())
} else { } else {
None None
} }
})); }));
if connected_to_root(query.1, &mut visited) { if connected_to_root(query.1.clone(), &mut visited) {
Some(query.1) Some(query.1.clone())
} else { } else {
None None
} }
@ -403,14 +407,14 @@ fn remove_cycle<'tcx>(
// Deterministically pick an entry point // Deterministically pick an entry point
// FIXME: Sort this instead // FIXME: Sort this instead
let mut hcx = tcx.create_stable_hashing_context(); let mut hcx = tcx.create_stable_hashing_context();
let entry_point = *entry_points.iter().min_by_key(|&&q| { let entry_point = entry_points.iter().min_by_key(|q| {
let mut stable_hasher = StableHasher::<u64>::new(); let mut stable_hasher = StableHasher::<u64>::new();
unsafe { (*q).info.query.hash_stable(&mut hcx, &mut stable_hasher); } q.info.query.hash_stable(&mut hcx, &mut stable_hasher);
stable_hasher.finish() stable_hasher.finish()
}).unwrap(); }).unwrap().as_ptr();
// Shift the stack until our entry point is first // Shift the stack until our entry point is first
while stack[0].1 != entry_point { while stack[0].1.as_ptr() != entry_point {
let last = stack.pop().unwrap(); let last = stack.pop().unwrap();
stack.insert(0, last); stack.insert(0, last);
} }
@ -418,24 +422,21 @@ fn remove_cycle<'tcx>(
// Create the cycle error // Create the cycle error
let mut error = CycleError { let mut error = CycleError {
usage: None, usage: None,
cycle: stack.iter().map(|&(s, q)| QueryInfo { cycle: stack.iter().map(|&(s, ref q)| QueryInfo {
span: s, span: s,
query: unsafe { (*q).info.query.clone() }, query: q.info.query.clone(),
} ).collect(), } ).collect(),
}; };
// We unwrap `waiter` here since there must always be one // We unwrap `waiter` here since there must always be one
// edge which is resumeable / waited using a query latch // edge which is resumeable / waited using a query latch
let (waitee_query, waiter_idx) = waiter.unwrap(); let (waitee_query, waiter_idx) = waiter.unwrap();
let waitee_query = unsafe { &*waitee_query };
// Extract the waiter we want to resume // Extract the waiter we want to resume
let waiter = waitee_query.latch.extract_waiter(waiter_idx); let waiter = waitee_query.latch.extract_waiter(waiter_idx);
// Set the cycle error so it will be picked up when resumed // Set the cycle error so it will be picked up when resumed
unsafe { *waiter.cycle.lock() = Some(error);
*waiter.cycle.lock() = Some(error);
}
// Put the waiter on the list of things to resume // Put the waiter on the list of things to resume
wakelist.push(waiter); wakelist.push(waiter);
@ -448,8 +449,9 @@ fn remove_cycle<'tcx>(
/// Creates a new thread and forwards information in thread locals to it. /// Creates a new thread and forwards information in thread locals to it.
/// The new thread runs the deadlock handler. /// The new thread runs the deadlock handler.
/// Must only be called when a deadlock is about to happen.
#[cfg(parallel_queries)] #[cfg(parallel_queries)]
pub fn handle_deadlock() { pub unsafe fn handle_deadlock() {
use syntax; use syntax;
use syntax_pos; use syntax_pos;
@ -458,25 +460,23 @@ pub fn handle_deadlock() {
let gcx_ptr = tls::GCX_PTR.with(|gcx_ptr| { let gcx_ptr = tls::GCX_PTR.with(|gcx_ptr| {
gcx_ptr as *const _ gcx_ptr as *const _
}); });
let gcx_ptr = unsafe { &*gcx_ptr }; let gcx_ptr = &*gcx_ptr;
let syntax_globals = syntax::GLOBALS.with(|syntax_globals| { let syntax_globals = syntax::GLOBALS.with(|syntax_globals| {
syntax_globals as *const _ syntax_globals as *const _
}); });
let syntax_globals = unsafe { &*syntax_globals }; let syntax_globals = &*syntax_globals;
let syntax_pos_globals = syntax_pos::GLOBALS.with(|syntax_pos_globals| { let syntax_pos_globals = syntax_pos::GLOBALS.with(|syntax_pos_globals| {
syntax_pos_globals as *const _ syntax_pos_globals as *const _
}); });
let syntax_pos_globals = unsafe { &*syntax_pos_globals }; let syntax_pos_globals = &*syntax_pos_globals;
thread::spawn(move || { thread::spawn(move || {
tls::GCX_PTR.set(gcx_ptr, || { tls::GCX_PTR.set(gcx_ptr, || {
syntax_pos::GLOBALS.set(syntax_pos_globals, || { syntax_pos::GLOBALS.set(syntax_pos_globals, || {
syntax_pos::GLOBALS.set(syntax_pos_globals, || { syntax_pos::GLOBALS.set(syntax_pos_globals, || {
tls::with_thread_locals(|| { tls::with_thread_locals(|| {
unsafe { tls::with_global(|tcx| deadlock(tcx, &registry))
tls::with_global(|tcx| deadlock(tcx, &registry))
}
}) })
}) })
}) })
@ -497,7 +497,7 @@ fn deadlock(tcx: TyCtxt<'_, '_, '_>, registry: &rayon_core::Registry) {
}); });
let mut wakelist = Vec::new(); let mut wakelist = Vec::new();
let mut jobs: Vec<_> = tcx.maps.collect_active_jobs().iter().map(|j| &**j as Ref).collect(); let mut jobs: Vec<_> = tcx.maps.collect_active_jobs();
let mut found_cycle = false; let mut found_cycle = false;

View File

@ -85,9 +85,10 @@ pub fn spawn_thread_pool<F: FnOnce(config::Options) -> R + sync::Send, R: sync::
let gcx_ptr = &Lock::new(0); let gcx_ptr = &Lock::new(0);
let config = ThreadPoolBuilder::new().num_threads(Session::query_threads_from_opts(&opts)) let config = ThreadPoolBuilder::new()
.deadlock_handler(ty::maps::handle_deadlock) .num_threads(Session::query_threads_from_opts(&opts))
.stack_size(16 * 1024 * 1024); .deadlock_handler(|| unsafe { ty::maps::handle_deadlock() })
.stack_size(16 * 1024 * 1024);
let with_pool = move |pool: &ThreadPool| { let with_pool = move |pool: &ThreadPool| {
pool.install(move || f(opts)) pool.install(move || f(opts))