Fall out of the std::sync rewrite

This commit is contained in:
Alex Crichton 2014-11-24 11:16:40 -08:00
parent 71d4e77db8
commit c3adbd34c4
19 changed files with 99 additions and 188 deletions

View File

@ -38,9 +38,8 @@ exceptions = [
"rt/isaac/randport.cpp", # public domain
"rt/isaac/rand.h", # public domain
"rt/isaac/standard.h", # public domain
"libstd/sync/mpsc_queue.rs", # BSD
"libstd/sync/spsc_queue.rs", # BSD
"libstd/sync/mpmc_bounded_queue.rs", # BSD
"libstd/comm/mpsc_queue.rs", # BSD
"libstd/comm/spsc_queue.rs", # BSD
"test/bench/shootout-binarytrees.rs", # BSD
"test/bench/shootout-chameneos-redux.rs", # BSD
"test/bench/shootout-fannkuch-redux.rs", # BSD

View File

@ -354,6 +354,8 @@ mod select;
mod shared;
mod stream;
mod sync;
mod mpsc_queue;
mod spsc_queue;
/// The receiving-half of Rust's channel type. This half can only be owned by
/// one task
@ -628,24 +630,26 @@ impl<T: Send> Sender<T> {
#[unstable]
impl<T: Send> Clone for Sender<T> {
fn clone(&self) -> Sender<T> {
let (packet, sleeper) = match *unsafe { self.inner() } {
let (packet, sleeper, guard) = match *unsafe { self.inner() } {
Oneshot(ref p) => {
let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
unsafe {
(*a.get()).postinit_lock();
let guard = (*a.get()).postinit_lock();
match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) {
oneshot::UpSuccess | oneshot::UpDisconnected => (a, None),
oneshot::UpWoke(task) => (a, Some(task))
oneshot::UpSuccess |
oneshot::UpDisconnected => (a, None, guard),
oneshot::UpWoke(task) => (a, Some(task), guard)
}
}
}
Stream(ref p) => {
let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
unsafe {
(*a.get()).postinit_lock();
let guard = (*a.get()).postinit_lock();
match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) {
stream::UpSuccess | stream::UpDisconnected => (a, None),
stream::UpWoke(task) => (a, Some(task)),
stream::UpSuccess |
stream::UpDisconnected => (a, None, guard),
stream::UpWoke(task) => (a, Some(task), guard),
}
}
}
@ -657,7 +661,7 @@ impl<T: Send> Clone for Sender<T> {
};
unsafe {
(*packet.get()).inherit_blocker(sleeper);
(*packet.get()).inherit_blocker(sleeper, guard);
let tmp = Sender::new(Shared(packet.clone()));
mem::swap(self.inner_mut(), tmp.inner_mut());

View File

@ -26,12 +26,11 @@ use alloc::boxed::Box;
use core::cmp;
use core::int;
use rustrt::local::Local;
use rustrt::mutex::NativeMutex;
use rustrt::task::{Task, BlockedTask};
use rustrt::thread::Thread;
use sync::atomic;
use sync::mpsc_queue as mpsc;
use sync::{atomic, Mutex, MutexGuard};
use comm::mpsc_queue as mpsc;
const DISCONNECTED: int = int::MIN;
const FUDGE: int = 1024;
@ -56,7 +55,7 @@ pub struct Packet<T> {
// this lock protects various portions of this implementation during
// select()
select_lock: NativeMutex,
select_lock: Mutex<()>,
}
pub enum Failure {
@ -76,7 +75,7 @@ impl<T: Send> Packet<T> {
channels: atomic::AtomicInt::new(2),
port_dropped: atomic::AtomicBool::new(false),
sender_drain: atomic::AtomicInt::new(0),
select_lock: unsafe { NativeMutex::new() },
select_lock: Mutex::new(()),
};
return p;
}
@ -86,8 +85,8 @@ impl<T: Send> Packet<T> {
// In other case mutex data will be duplicated while cloning
// and that could cause problems on platforms where it is
// represented by opaque data structure
pub fn postinit_lock(&mut self) {
unsafe { self.select_lock.lock_noguard() }
pub fn postinit_lock(&self) -> MutexGuard<()> {
self.select_lock.lock()
}
// This function is used at the creation of a shared packet to inherit a
@ -95,7 +94,9 @@ impl<T: Send> Packet<T> {
// tasks in select().
//
// This can only be called at channel-creation time
pub fn inherit_blocker(&mut self, task: Option<BlockedTask>) {
pub fn inherit_blocker(&mut self,
task: Option<BlockedTask>,
guard: MutexGuard<()>) {
match task {
Some(task) => {
assert_eq!(self.cnt.load(atomic::SeqCst), 0);
@ -135,7 +136,7 @@ impl<T: Send> Packet<T> {
// interfere with this method. After we unlock this lock, we're
// signifying that we're done modifying self.cnt and self.to_wake and
// the port is ready for the world to continue using it.
unsafe { self.select_lock.unlock_noguard() }
drop(guard);
}
pub fn send(&mut self, t: T) -> Result<(), T> {
@ -441,7 +442,7 @@ impl<T: Send> Packet<T> {
// done with. Without this bounce, we can race with inherit_blocker
// about looking at and dealing with to_wake. Once we have acquired the
// lock, we are guaranteed that inherit_blocker is done.
unsafe {
{
let _guard = self.select_lock.lock();
}

View File

@ -32,7 +32,7 @@ use rustrt::task::{Task, BlockedTask};
use rustrt::thread::Thread;
use sync::atomic;
use sync::spsc_queue as spsc;
use comm::spsc_queue as spsc;
use comm::Receiver;
const DISCONNECTED: int = int::MIN;

View File

@ -225,8 +225,8 @@ pub mod dl {
}
pub fn check_for_errors_in<T>(f: || -> T) -> Result<T, String> {
use rustrt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
use sync::{StaticMutex, MUTEX_INIT};
static LOCK: StaticMutex = MUTEX_INIT;
unsafe {
// dlerror isn't thread safe, so we need to lock around this entire
// sequence

View File

@ -106,7 +106,7 @@
#![allow(unknown_features)]
#![feature(macro_rules, globs, linkage)]
#![feature(default_type_params, phase, lang_items, unsafe_destructor)]
#![feature(import_shadowing, slicing_syntax)]
#![feature(import_shadowing, slicing_syntax, tuple_indexing)]
// Don't link to std. We are std.
#![no_std]

View File

@ -209,14 +209,12 @@ Accessing environment variables is not generally threadsafe.
Serialize access through a global lock.
*/
fn with_env_lock<T>(f: || -> T) -> T {
use rustrt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
use sync::{StaticMutex, MUTEX_INIT};
static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
static LOCK: StaticMutex = MUTEX_INIT;
unsafe {
let _guard = LOCK.lock();
f()
}
let _guard = LOCK.lock();
f()
}
/// Returns a vector of (variable, value) pairs, for all the environment

View File

@ -238,7 +238,7 @@ mod imp {
use mem;
use option::{Some, None, Option};
use result::{Ok, Err};
use rustrt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
use sync::{StaticMutex, MUTEX_INIT};
/// As always - iOS on arm uses SjLj exceptions and
/// _Unwind_Backtrace is even not available there. Still,
@ -264,8 +264,8 @@ mod imp {
// while it doesn't requires lock for work as everything is
// local, it still displays much nicer backtraces when a
// couple of tasks panic simultaneously
static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
let _g = unsafe { LOCK.lock() };
static LOCK: StaticMutex = MUTEX_INIT;
let _g = LOCK.lock();
try!(writeln!(w, "stack backtrace:"));
// 100 lines should be enough
@ -297,8 +297,8 @@ mod imp {
// is semi-reasonable in terms of printing anyway, and we know that all
// I/O done here is blocking I/O, not green I/O, so we don't have to
// worry about this being a native vs green mutex.
static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
let _g = unsafe { LOCK.lock() };
static LOCK: StaticMutex = MUTEX_INIT;
let _g = LOCK.lock();
try!(writeln!(w, "stack backtrace:"));
@ -667,7 +667,7 @@ mod imp {
use option::{Some, None};
use path::Path;
use result::{Ok, Err};
use rustrt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
use sync::{StaticMutex, MUTEX_INIT};
use slice::SlicePrelude;
use str::StrPrelude;
use dynamic_lib::DynamicLibrary;
@ -928,8 +928,8 @@ mod imp {
pub fn write(w: &mut Writer) -> IoResult<()> {
// According to windows documentation, all dbghelp functions are
// single-threaded.
static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
let _g = unsafe { LOCK.lock() };
static LOCK: StaticMutex = MUTEX_INIT;
let _g = LOCK.lock();
// Open up dbghelp.dll, we don't link to it explicitly because it can't
// always be found. Additionally, it's nice having fewer dependencies.

View File

@ -143,8 +143,14 @@ impl Condvar {
///
/// Like `wait`, the lock specified will be re-acquired when this function
/// returns, regardless of whether the timeout elapsed or not.
pub fn wait_timeout<T: AsMutexGuard>(&self, mutex_guard: &T,
dur: Duration) -> bool {
// Note that this method is *not* public, and this is quite intentional
// because we're not quite sure about the semantics of relative vs absolute
// durations or how the timing guarantees play into what the system APIs
// provide. There are also additional concerns about the unix-specific
// implementation which may need to be addressed.
#[allow(dead_code)]
fn wait_timeout<T: AsMutexGuard>(&self, mutex_guard: &T,
dur: Duration) -> bool {
unsafe {
let me: &'static Condvar = &*(self as *const _);
me.inner.wait_timeout(mutex_guard, dur)
@ -195,8 +201,9 @@ impl StaticCondvar {
/// specified duration.
///
/// See `Condvar::wait_timeout`.
pub fn wait_timeout<T: AsMutexGuard>(&'static self, mutex_guard: &T,
dur: Duration) -> bool {
#[allow(dead_code)] // may want to stabilize this later, see wait_timeout above
fn wait_timeout<T: AsMutexGuard>(&'static self, mutex_guard: &T,
dur: Duration) -> bool {
unsafe {
let lock = mutex_guard.as_mutex_guard();
let sys = mutex::guard_lock(lock);

View File

@ -45,7 +45,7 @@ use sys_common::mutex as sys;
/// let data = Arc::new(Mutex::new(0));
///
/// let (tx, rx) = channel();
/// for _ in range(0, 10) {
/// for _ in range(0u, 10) {
/// let (data, tx) = (data.clone(), tx.clone());
/// spawn(proc() {
/// // The shared static can only be accessed once the lock is held.

View File

@ -20,13 +20,14 @@
//! can be created in the future and there must be no active timers at that
//! time.
use prelude::*;
use cell::UnsafeCell;
use mem;
use rustrt::bookkeeping;
use rustrt::mutex::StaticNativeMutex;
use rustrt;
use cell::UnsafeCell;
use sync::{StaticMutex, StaticCondvar};
use sys::helper_signal;
use prelude::*;
use task;
@ -39,7 +40,8 @@ use task;
/// is for static initialization.
pub struct Helper<M> {
/// Internal lock which protects the remaining fields
pub lock: StaticNativeMutex,
pub lock: StaticMutex,
pub cond: StaticCondvar,
// You'll notice that the remaining fields are UnsafeCell<T>, and this is
// because all helper thread operations are done through &self, but we need
@ -53,6 +55,9 @@ pub struct Helper<M> {
/// Flag if this helper thread has booted and been initialized yet.
pub initialized: UnsafeCell<bool>,
/// Flag if this helper thread has shut down
pub shutdown: UnsafeCell<bool>,
}
impl<M: Send> Helper<M> {
@ -80,7 +85,9 @@ impl<M: Send> Helper<M> {
task::spawn(proc() {
bookkeeping::decrement();
helper(receive, rx, t);
self.lock.lock().signal()
let _g = self.lock.lock();
*self.shutdown.get() = true;
self.cond.notify_one()
});
rustrt::at_exit(proc() { self.shutdown() });
@ -119,7 +126,9 @@ impl<M: Send> Helper<M> {
helper_signal::signal(*self.signal.get() as helper_signal::signal);
// Wait for the child to exit
guard.wait();
while !*self.shutdown.get() {
self.cond.wait(&guard);
}
drop(guard);
// Clean up after ourselves

View File

@ -16,13 +16,13 @@ use libc::{mod, c_char, c_int};
use mem;
use num::Int;
use ptr::{mod, null, null_mut};
use rustrt::mutex;
use io::net::ip::{SocketAddr, IpAddr, Ipv4Addr, Ipv6Addr};
use io::net::addrinfo;
use io::{IoResult, IoError};
use sys::{mod, retry, c, sock_t, last_error, last_net_error, last_gai_error, close_sock,
wrlen, msglen_t, os, wouldblock, set_nonblocking, timer, ms_to_timeval,
decode_error_detailed};
use sync::{Mutex, MutexGuard};
use sys_common::{mod, keep_going, short_write, timeout};
use prelude::*;
use cmp;
@ -557,12 +557,12 @@ struct Inner {
// Unused on Linux, where this lock is not necessary.
#[allow(dead_code)]
lock: mutex::NativeMutex
lock: Mutex<()>,
}
impl Inner {
fn new(fd: sock_t) -> Inner {
Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
Inner { fd: fd, lock: Mutex::new(()) }
}
}
@ -572,7 +572,7 @@ impl Drop for Inner {
pub struct Guard<'a> {
pub fd: sock_t,
pub guard: mutex::LockGuard<'a>,
pub guard: MutexGuard<'a, ()>,
}
#[unsafe_destructor]
@ -666,7 +666,7 @@ impl TcpStream {
fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
let ret = Guard {
fd: self.fd(),
guard: unsafe { self.inner.lock.lock() },
guard: self.inner.lock.lock(),
};
assert!(set_nonblocking(self.fd(), true).is_ok());
ret
@ -805,7 +805,7 @@ impl UdpSocket {
fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
let ret = Guard {
fd: self.fd(),
guard: unsafe { self.inner.lock.lock() },
guard: self.inner.lock.lock(),
};
assert!(set_nonblocking(self.fd(), true).is_ok());
ret

View File

@ -25,10 +25,12 @@ use sys_common::mkerr_libc;
macro_rules! helper_init( (static $name:ident: Helper<$m:ty>) => (
static $name: Helper<$m> = Helper {
lock: ::rustrt::mutex::NATIVE_MUTEX_INIT,
lock: ::sync::MUTEX_INIT,
cond: ::sync::CONDVAR_INIT,
chan: ::cell::UnsafeCell { value: 0 as *mut Sender<$m> },
signal: ::cell::UnsafeCell { value: 0 },
initialized: ::cell::UnsafeCell { value: false },
shutdown: ::cell::UnsafeCell { value: false },
};
) )

View File

@ -12,8 +12,7 @@ use alloc::arc::Arc;
use libc;
use c_str::CString;
use mem;
use rustrt::mutex;
use sync::atomic;
use sync::{atomic, Mutex};
use io::{mod, IoResult, IoError};
use prelude::*;
@ -60,12 +59,12 @@ struct Inner {
// Unused on Linux, where this lock is not necessary.
#[allow(dead_code)]
lock: mutex::NativeMutex
lock: Mutex<()>,
}
impl Inner {
fn new(fd: fd_t) -> Inner {
Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
Inner { fd: fd, lock: Mutex::new(()) }
}
}

View File

@ -26,10 +26,12 @@ use sync::{Once, ONCE_INIT};
macro_rules! helper_init( (static $name:ident: Helper<$m:ty>) => (
static $name: Helper<$m> = Helper {
lock: ::rustrt::mutex::NATIVE_MUTEX_INIT,
lock: ::sync::MUTEX_INIT,
cond: ::sync::CONDVAR_INIT,
chan: ::cell::UnsafeCell { value: 0 as *mut Sender<$m> },
signal: ::cell::UnsafeCell { value: 0 },
initialized: ::cell::UnsafeCell { value: false },
shutdown: ::cell::UnsafeCell { value: false },
};
) )

View File

@ -8,6 +8,8 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use prelude::*;
use sync::atomic;
use alloc::{mod, heap};
@ -21,8 +23,8 @@ pub struct Mutex { inner: atomic::AtomicUint }
pub const MUTEX_INIT: Mutex = Mutex { inner: atomic::INIT_ATOMIC_UINT };
#[inline]
pub unsafe fn raw(m: &super::Mutex) -> ffi::LPCRITICAL_SECTION {
m.0.get()
pub unsafe fn raw(m: &Mutex) -> ffi::LPCRITICAL_SECTION {
m.get()
}
impl Mutex {

View File

@ -89,8 +89,7 @@ use libc;
use c_str::CString;
use mem;
use ptr;
use sync::atomic;
use rustrt::mutex;
use sync::{atomic, Mutex};
use io::{mod, IoError, IoResult};
use prelude::*;
@ -126,7 +125,7 @@ impl Drop for Event {
struct Inner {
handle: libc::HANDLE,
lock: mutex::NativeMutex,
lock: Mutex<()>,
read_closed: atomic::AtomicBool,
write_closed: atomic::AtomicBool,
}
@ -135,7 +134,7 @@ impl Inner {
fn new(handle: libc::HANDLE) -> Inner {
Inner {
handle: handle,
lock: unsafe { mutex::NativeMutex::new() },
lock: Mutex::new(()),
read_closed: atomic::AtomicBool::new(false),
write_closed: atomic::AtomicBool::new(false),
}

View File

@ -19,28 +19,30 @@
// ignore-lexer-test FIXME #15679
use std::os;
use std::sync::{Arc, Future, Mutex};
use std::sync::{Arc, Future, Mutex, Condvar};
use std::time::Duration;
use std::uint;
// A poor man's pipe.
type pipe = Arc<Mutex<Vec<uint>>>;
type pipe = Arc<(Mutex<Vec<uint>>, Condvar)>;
fn send(p: &pipe, msg: uint) {
let mut arr = p.lock();
let &(ref lock, ref cond) = &**p;
let mut arr = lock.lock();
arr.push(msg);
arr.cond.signal();
cond.notify_one();
}
fn recv(p: &pipe) -> uint {
let mut arr = p.lock();
let &(ref lock, ref cond) = &**p;
let mut arr = lock.lock();
while arr.is_empty() {
arr.cond.wait();
cond.wait(&arr);
}
arr.pop().unwrap()
}
fn init() -> (pipe,pipe) {
let m = Arc::new(Mutex::new(Vec::new()));
let m = Arc::new((Mutex::new(Vec::new()), Condvar::new()));
((&m).clone(), m)
}

View File

@ -1,113 +0,0 @@
// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
// This test creates a bunch of tasks that simultaneously send to each
// other in a ring. The messages should all be basically
// independent.
// This is like msgsend-ring-pipes but adapted to use Arcs.
// This also serves as a pipes test, because Arcs are implemented with pipes.
// no-pretty-expanded FIXME #15189
// ignore-lexer-test FIXME #15679
use std::os;
use std::sync::{RWLock, Arc, Future};
use std::time::Duration;
use std::uint;
// A poor man's pipe.
type pipe = Arc<RWLock<Vec<uint>>>;
fn send(p: &pipe, msg: uint) {
let mut arr = p.write();
arr.push(msg);
arr.cond.signal();
}
fn recv(p: &pipe) -> uint {
let mut arr = p.write();
while arr.is_empty() {
arr.cond.wait();
}
arr.pop().unwrap()
}
fn init() -> (pipe,pipe) {
let x = Arc::new(RWLock::new(Vec::new()));
((&x).clone(), x)
}
fn thread_ring(i: uint, count: uint, num_chan: pipe, num_port: pipe) {
let mut num_chan = Some(num_chan);
let mut num_port = Some(num_port);
// Send/Receive lots of messages.
for j in range(0u, count) {
//println!("task %?, iter %?", i, j);
let num_chan2 = num_chan.take().unwrap();
let num_port2 = num_port.take().unwrap();
send(&num_chan2, i * j);
num_chan = Some(num_chan2);
let _n = recv(&num_port2);
//log(error, _n);
num_port = Some(num_port2);
};
}
fn main() {
let args = os::args();
let args = if os::getenv("RUST_BENCH").is_some() {
vec!("".to_string(), "100".to_string(), "10000".to_string())
} else if args.len() <= 1u {
vec!("".to_string(), "10".to_string(), "100".to_string())
} else {
args.clone().into_iter().collect()
};
let num_tasks = from_str::<uint>(args[1].as_slice()).unwrap();
let msg_per_task = from_str::<uint>(args[2].as_slice()).unwrap();
let (mut num_chan, num_port) = init();
let mut p = Some((num_chan, num_port));
let dur = Duration::span(|| {
let (mut num_chan, num_port) = p.take().unwrap();
// create the ring
let mut futures = Vec::new();
for i in range(1u, num_tasks) {
//println!("spawning %?", i);
let (new_chan, num_port) = init();
let num_chan_2 = num_chan.clone();
let new_future = Future::spawn(proc() {
thread_ring(i, msg_per_task, num_chan_2, num_port)
});
futures.push(new_future);
num_chan = new_chan;
};
// do our iteration
thread_ring(0, msg_per_task, num_chan, num_port);
// synchronize
for f in futures.iter_mut() {
let _ = f.get();
}
});
// all done, report stats.
let num_msgs = num_tasks * msg_per_task;
let rate = (num_msgs as f64) / (dur.num_milliseconds() as f64);
println!("Sent {} messages in {} ms", num_msgs, dur.num_milliseconds());
println!(" {} messages / second", rate / 1000.0);
println!(" {} μs / message", 1000000. / rate / 1000.0);
}