std: Deal with fallout of rtio changes

This commit is contained in:
Alex Crichton 2014-06-03 20:09:39 -07:00
parent 5ec36c358f
commit da2293c6f6
21 changed files with 452 additions and 209 deletions

View File

@ -27,10 +27,10 @@ use option::{Some, None, Option};
use owned::Box;
use result::{Ok, Err, Result};
use rt::local::Local;
use rt::mutex::NativeMutex;
use rt::task::{Task, BlockedTask};
use rt::thread::Thread;
use sync::atomics;
use unstable::mutex::NativeMutex;
use mpsc = sync::mpsc_queue;

View File

@ -43,10 +43,10 @@ use owned::Box;
use ptr::RawPtr;
use result::{Result, Ok, Err};
use rt::local::Local;
use rt::mutex::{NativeMutex, LockGuard};
use rt::task::{Task, BlockedTask};
use sync::atomics;
use ty::Unsafe;
use unstable::mutex::{NativeMutex, LockGuard};
use vec::Vec;
pub struct Packet<T> {

View File

@ -52,19 +52,22 @@ fs::unlink(&path);
use c_str::ToCStr;
use clone::Clone;
use container::Container;
use io;
use iter::Iterator;
use kinds::Send;
use super::{Reader, Writer, Seek};
use super::{SeekStyle, Read, Write, Open, IoError, Truncate};
use super::{FileMode, FileAccess, FileStat, IoResult, FilePermission};
use rt::rtio::{RtioFileStream, IoFactory, LocalIo};
use io;
use libc;
use option::{Some, None, Option};
use owned::Box;
use result::{Ok, Err};
use path;
use path::{Path, GenericPath};
use path;
use result::{Ok, Err};
use rt::rtio::{RtioFileStream, IoFactory, LocalIo};
use rt::rtio;
use slice::{OwnedVector, ImmutableVector};
use super::UnstableFileStat;
use super::{FileMode, FileAccess, FileStat, IoResult, FilePermission};
use super::{Reader, Writer, Seek, Append, SeekCur, SeekEnd, SeekSet};
use super::{SeekStyle, Read, Write, ReadWrite, Open, IoError, Truncate};
use vec::Vec;
/// Unconstrained file access type that exposes read and write operations
@ -126,6 +129,16 @@ impl File {
pub fn open_mode(path: &Path,
mode: FileMode,
access: FileAccess) -> IoResult<File> {
let mode = match mode {
Open => rtio::Open,
Append => rtio::Append,
Truncate => rtio::Truncate,
};
let access = match access {
Read => rtio::Read,
Write => rtio::Write,
ReadWrite => rtio::ReadWrite,
};
LocalIo::maybe_raise(|io| {
io.fs_open(&path.to_c_str(), mode, access).map(|fd| {
File {
@ -134,7 +147,7 @@ impl File {
last_nread: -1
}
})
})
}).map_err(IoError::from_rtio_error)
}
/// Attempts to open a file in read-only mode. This function is equivalent to
@ -184,7 +197,7 @@ impl File {
/// device. This will flush any internal buffers necessary to perform this
/// operation.
pub fn fsync(&mut self) -> IoResult<()> {
self.fd.fsync()
self.fd.fsync().map_err(IoError::from_rtio_error)
}
/// This function is similar to `fsync`, except that it may not synchronize
@ -192,7 +205,7 @@ impl File {
/// must synchronize content, but don't need the metadata on disk. The goal
/// of this method is to reduce disk operations.
pub fn datasync(&mut self) -> IoResult<()> {
self.fd.datasync()
self.fd.datasync().map_err(IoError::from_rtio_error)
}
/// Either truncates or extends the underlying file, updating the size of
@ -204,7 +217,7 @@ impl File {
/// will be extended to `size` and have all of the intermediate data filled
/// in with 0s.
pub fn truncate(&mut self, size: i64) -> IoResult<()> {
self.fd.truncate(size)
self.fd.truncate(size).map_err(IoError::from_rtio_error)
}
/// Tests whether this stream has reached EOF.
@ -217,7 +230,10 @@ impl File {
/// Queries information about the underlying file.
pub fn stat(&mut self) -> IoResult<FileStat> {
self.fd.fstat()
match self.fd.fstat() {
Ok(s) => Ok(from_rtio(s)),
Err(e) => Err(IoError::from_rtio_error(e)),
}
}
}
@ -243,7 +259,9 @@ impl File {
/// user lacks permissions to remove the file, or if some other filesystem-level
/// error occurs.
pub fn unlink(path: &Path) -> IoResult<()> {
LocalIo::maybe_raise(|io| io.fs_unlink(&path.to_c_str()))
LocalIo::maybe_raise(|io| {
io.fs_unlink(&path.to_c_str())
}).map_err(IoError::from_rtio_error)
}
/// Given a path, query the file system to get information about a file,
@ -268,9 +286,10 @@ pub fn unlink(path: &Path) -> IoResult<()> {
/// to perform a `stat` call on the given path or if there is no entry in the
/// filesystem at the provided path.
pub fn stat(path: &Path) -> IoResult<FileStat> {
LocalIo::maybe_raise(|io| {
io.fs_stat(&path.to_c_str())
})
match LocalIo::maybe_raise(|io| io.fs_stat(&path.to_c_str())) {
Ok(s) => Ok(from_rtio(s)),
Err(e) => Err(IoError::from_rtio_error(e)),
}
}
/// Perform the same operation as the `stat` function, except that this
@ -282,9 +301,46 @@ pub fn stat(path: &Path) -> IoResult<FileStat> {
///
/// See `stat`
pub fn lstat(path: &Path) -> IoResult<FileStat> {
LocalIo::maybe_raise(|io| {
io.fs_lstat(&path.to_c_str())
})
match LocalIo::maybe_raise(|io| io.fs_lstat(&path.to_c_str())) {
Ok(s) => Ok(from_rtio(s)),
Err(e) => Err(IoError::from_rtio_error(e)),
}
}
fn from_rtio(s: rtio::FileStat) -> FileStat {
let rtio::FileStat {
size, kind, perm, created, modified,
accessed, device, inode, rdev,
nlink, uid, gid, blksize, blocks, flags, gen
} = s;
FileStat {
size: size,
kind: match (kind as libc::c_int) & libc::S_IFMT {
libc::S_IFREG => io::TypeFile,
libc::S_IFDIR => io::TypeDirectory,
libc::S_IFIFO => io::TypeNamedPipe,
libc::S_IFBLK => io::TypeBlockSpecial,
libc::S_IFLNK => io::TypeSymlink,
_ => io::TypeUnknown,
},
perm: FilePermission::from_bits_truncate(perm as u32),
created: created,
modified: modified,
accessed: accessed,
unstable: UnstableFileStat {
device: device,
inode: inode,
rdev: rdev,
nlink: nlink,
uid: uid,
gid: gid,
blksize: blksize,
blocks: blocks,
flags: flags,
gen: gen,
},
}
}
/// Rename a file or directory to a new name.
@ -304,7 +360,9 @@ pub fn lstat(path: &Path) -> IoResult<FileStat> {
/// permissions to view the contents, or if some other intermittent I/O error
/// occurs.
pub fn rename(from: &Path, to: &Path) -> IoResult<()> {
LocalIo::maybe_raise(|io| io.fs_rename(&from.to_c_str(), &to.to_c_str()))
LocalIo::maybe_raise(|io| {
io.fs_rename(&from.to_c_str(), &to.to_c_str())
}).map_err(IoError::from_rtio_error)
}
/// Copies the contents of one file to another. This function will also
@ -382,25 +440,33 @@ pub fn copy(from: &Path, to: &Path) -> IoResult<()> {
/// Some possible error situations are not having the permission to
/// change the attributes of a file or the file not existing.
pub fn chmod(path: &Path, mode: io::FilePermission) -> IoResult<()> {
LocalIo::maybe_raise(|io| io.fs_chmod(&path.to_c_str(), mode))
LocalIo::maybe_raise(|io| {
io.fs_chmod(&path.to_c_str(), mode.bits() as uint)
}).map_err(IoError::from_rtio_error)
}
/// Change the user and group owners of a file at the specified path.
pub fn chown(path: &Path, uid: int, gid: int) -> IoResult<()> {
LocalIo::maybe_raise(|io| io.fs_chown(&path.to_c_str(), uid, gid))
LocalIo::maybe_raise(|io| {
io.fs_chown(&path.to_c_str(), uid, gid)
}).map_err(IoError::from_rtio_error)
}
/// Creates a new hard link on the filesystem. The `dst` path will be a
/// link pointing to the `src` path. Note that systems often require these
/// two paths to both be located on the same filesystem.
pub fn link(src: &Path, dst: &Path) -> IoResult<()> {
LocalIo::maybe_raise(|io| io.fs_link(&src.to_c_str(), &dst.to_c_str()))
LocalIo::maybe_raise(|io| {
io.fs_link(&src.to_c_str(), &dst.to_c_str())
}).map_err(IoError::from_rtio_error)
}
/// Creates a new symbolic link on the filesystem. The `dst` path will be a
/// symlink pointing to the `src` path.
pub fn symlink(src: &Path, dst: &Path) -> IoResult<()> {
LocalIo::maybe_raise(|io| io.fs_symlink(&src.to_c_str(), &dst.to_c_str()))
LocalIo::maybe_raise(|io| {
io.fs_symlink(&src.to_c_str(), &dst.to_c_str())
}).map_err(IoError::from_rtio_error)
}
/// Reads a symlink, returning the file that the symlink points to.
@ -412,7 +478,7 @@ pub fn symlink(src: &Path, dst: &Path) -> IoResult<()> {
pub fn readlink(path: &Path) -> IoResult<Path> {
LocalIo::maybe_raise(|io| {
Ok(Path::new(try!(io.fs_readlink(&path.to_c_str()))))
})
}).map_err(IoError::from_rtio_error)
}
/// Create a new, empty directory at the provided path
@ -433,7 +499,9 @@ pub fn readlink(path: &Path) -> IoResult<Path> {
/// This call will return an error if the user lacks permissions to make a new
/// directory at the provided path, or if the directory already exists.
pub fn mkdir(path: &Path, mode: FilePermission) -> IoResult<()> {
LocalIo::maybe_raise(|io| io.fs_mkdir(&path.to_c_str(), mode))
LocalIo::maybe_raise(|io| {
io.fs_mkdir(&path.to_c_str(), mode.bits() as uint)
}).map_err(IoError::from_rtio_error)
}
/// Remove an existing, empty directory
@ -453,7 +521,9 @@ pub fn mkdir(path: &Path, mode: FilePermission) -> IoResult<()> {
/// This call will return an error if the user lacks permissions to remove the
/// directory at the provided path, or if the directory isn't empty.
pub fn rmdir(path: &Path) -> IoResult<()> {
LocalIo::maybe_raise(|io| io.fs_rmdir(&path.to_c_str()))
LocalIo::maybe_raise(|io| {
io.fs_rmdir(&path.to_c_str())
}).map_err(IoError::from_rtio_error)
}
/// Retrieve a vector containing all entries within a provided directory
@ -492,7 +562,7 @@ pub fn readdir(path: &Path) -> IoResult<Vec<Path>> {
Ok(try!(io.fs_readdir(&path.to_c_str(), 0)).move_iter().map(|a| {
Path::new(a)
}).collect())
})
}).map_err(IoError::from_rtio_error)
}
/// Returns an iterator which will recursively walk the directory structure
@ -616,7 +686,9 @@ pub fn rmdir_recursive(path: &Path) -> IoResult<()> {
/// be in milliseconds.
// FIXME(#10301) these arguments should not be u64
pub fn change_file_times(path: &Path, atime: u64, mtime: u64) -> IoResult<()> {
LocalIo::maybe_raise(|io| io.fs_utime(&path.to_c_str(), atime, mtime))
LocalIo::maybe_raise(|io| {
io.fs_utime(&path.to_c_str(), atime, mtime)
}).map_err(IoError::from_rtio_error)
}
impl Reader for File {
@ -629,28 +701,35 @@ impl Reader for File {
_ => Ok(read as uint)
}
},
Err(e) => Err(e),
Err(e) => Err(IoError::from_rtio_error(e)),
}
}
}
impl Writer for File {
fn write(&mut self, buf: &[u8]) -> IoResult<()> { self.fd.write(buf) }
fn write(&mut self, buf: &[u8]) -> IoResult<()> {
self.fd.write(buf).map_err(IoError::from_rtio_error)
}
}
impl Seek for File {
fn tell(&self) -> IoResult<u64> {
self.fd.tell()
self.fd.tell().map_err(IoError::from_rtio_error)
}
fn seek(&mut self, pos: i64, style: SeekStyle) -> IoResult<()> {
let style = match style {
SeekSet => rtio::SeekSet,
SeekCur => rtio::SeekCur,
SeekEnd => rtio::SeekEnd,
};
match self.fd.seek(pos, style) {
Ok(_) => {
// successful seek resets EOF indicator
self.last_nread = -1;
Ok(())
}
Err(e) => Err(e),
Err(e) => Err(IoError::from_rtio_error(e)),
}
}
}

View File

@ -225,6 +225,7 @@ use option::{Option, Some, None};
use os;
use owned::Box;
use result::{Ok, Err, Result};
use rt::rtio;
use slice::{Vector, MutableVector, ImmutableVector};
use str::{StrSlice, StrAllocating};
use str;
@ -323,6 +324,14 @@ impl IoError {
libc::ERROR_BROKEN_PIPE => (EndOfFile, "the pipe has ended"),
libc::ERROR_OPERATION_ABORTED =>
(TimedOut, "operation timed out"),
libc::WSAEINVAL => (InvalidInput, "invalid argument"),
libc::ERROR_CALL_NOT_IMPLEMENTED =>
(IoUnavailable, "function not implemented"),
libc::ERROR_CALL_NOT_IMPLEMENTED =>
(MismatchedFileTypeForOperation,
"invalid handle provided to function"),
libc::ERROR_NOTHING_TO_TERMINATE =>
(InvalidInput, "no process to kill"),
// libuv maps this error code to EISDIR. we do too. if it is found
// to be incorrect, we can add in some more machinery to only
@ -351,9 +360,17 @@ impl IoError {
libc::EADDRINUSE => (ConnectionRefused, "address in use"),
libc::ENOENT => (FileNotFound, "no such file or directory"),
libc::EISDIR => (InvalidInput, "illegal operation on a directory"),
libc::ENOSYS => (IoUnavailable, "function not implemented"),
libc::EINVAL => (InvalidInput, "invalid argument"),
libc::ENOTTY =>
(MismatchedFileTypeForOperation,
"file descriptor is not a TTY"),
libc::ETIMEDOUT => (TimedOut, "operation timed out"),
libc::ECANCELED => (TimedOut, "operation aborted"),
// These two constants can have the same value on some systems, but
// different values on others, so we can't use a match clause
// These two constants can have the same value on some systems,
// but different values on others, so we can't use a match
// clause
x if x == libc::EAGAIN || x == libc::EWOULDBLOCK =>
(ResourceUnavailable, "resource temporarily unavailable"),
@ -382,6 +399,17 @@ impl IoError {
pub fn last_error() -> IoError {
IoError::from_errno(os::errno() as uint, true)
}
fn from_rtio_error(err: rtio::IoError) -> IoError {
let rtio::IoError { code, extra, detail } = err;
let mut ioerr = IoError::from_errno(code, false);
ioerr.detail = detail;
ioerr.kind = match ioerr.kind {
TimedOut if extra > 0 => ShortWrite(extra),
k => k,
};
return ioerr;
}
}
impl fmt::Show for IoError {

View File

@ -20,10 +20,12 @@ getaddrinfo()
#![allow(missing_doc)]
use iter::Iterator;
use io::IoResult;
use io::{IoResult, IoError};
use io::net::ip::{SocketAddr, IpAddr};
use option::{Option, Some, None};
use result::{Ok, Err};
use rt::rtio::{IoFactory, LocalIo};
use rt::rtio;
use vec::Vec;
/// Hints to the types of sockets that are desired when looking up hosts
@ -89,9 +91,34 @@ pub fn get_host_addresses(host: &str) -> IoResult<Vec<IpAddr>> {
///
/// FIXME: this is not public because the `Hint` structure is not ready for public
/// consumption just yet.
#[allow(unused_variable)]
fn lookup(hostname: Option<&str>, servname: Option<&str>, hint: Option<Hint>)
-> IoResult<Vec<Info>> {
LocalIo::maybe_raise(|io| io.get_host_addresses(hostname, servname, hint))
let hint = hint.map(|Hint { family, socktype, protocol, flags }| {
rtio::AddrinfoHint {
family: family,
socktype: 0, // FIXME: this should use the above variable
protocol: 0, // FIXME: this should use the above variable
flags: flags,
}
});
match LocalIo::maybe_raise(|io| {
io.get_host_addresses(hostname, servname, hint)
}) {
Ok(v) => Ok(v.move_iter().map(|info| {
Info {
address: SocketAddr {
ip: super::from_rtio(info.address.ip),
port: info.address.port,
},
family: info.family,
socktype: None, // FIXME: this should use the above variable
protocol: None, // FIXME: this should use the above variable
flags: info.flags,
}
}).collect()),
Err(e) => Err(IoError::from_rtio_error(e)),
}
}
// Ignored on android since we cannot give tcp/ip

View File

@ -10,6 +10,9 @@
//! Networking I/O
use rt::rtio;
use self::ip::{Ipv4Addr, Ipv6Addr, IpAddr};
pub use self::addrinfo::get_host_addresses;
pub mod addrinfo;
@ -18,3 +21,21 @@ pub mod udp;
pub mod ip;
// FIXME(#12093) - this should not be called unix
pub mod unix;
fn to_rtio(ip: IpAddr) -> rtio::IpAddr {
match ip {
Ipv4Addr(a, b, c, d) => rtio::Ipv4Addr(a, b, c, d),
Ipv6Addr(a, b, c, d, e, f, g, h) => {
rtio::Ipv6Addr(a, b, c, d, e, f, g, h)
}
}
}
fn from_rtio(ip: rtio::IpAddr) -> IpAddr {
match ip {
rtio::Ipv4Addr(a, b, c, d) => Ipv4Addr(a, b, c, d),
rtio::Ipv6Addr(a, b, c, d, e, f, g, h) => {
Ipv6Addr(a, b, c, d, e, f, g, h)
}
}
}

View File

@ -32,6 +32,7 @@ use option::{None, Some, Option};
use owned::Box;
use rt::rtio::{IoFactory, LocalIo, RtioSocket, RtioTcpListener};
use rt::rtio::{RtioTcpAcceptor, RtioTcpStream};
use rt::rtio;
/// A structure which represents a TCP stream between a local socket and a
/// remote socket.
@ -67,22 +68,22 @@ impl TcpStream {
Some(addr) => vec!(addr),
None => try!(get_host_addresses(host))
};
let mut err = IoError{
let mut err = IoError {
kind: ConnectionFailed,
desc: "no addresses found for hostname",
detail: None
};
for address in addresses.iter() {
let socket_addr = SocketAddr{ip: *address, port: port};
for addr in addresses.iter() {
let addr = rtio::SocketAddr{ ip: super::to_rtio(*addr), port: port };
let result = LocalIo::maybe_raise(|io| {
io.tcp_connect(socket_addr, None).map(TcpStream::new)
io.tcp_connect(addr, None).map(TcpStream::new)
});
match result {
Ok(stream) => {
return Ok(stream)
}
Err(connect_err) => {
err = connect_err
err = IoError::from_rtio_error(connect_err)
}
}
}
@ -101,19 +102,31 @@ impl TcpStream {
#[experimental = "the timeout argument may eventually change types"]
pub fn connect_timeout(addr: SocketAddr,
timeout_ms: u64) -> IoResult<TcpStream> {
let SocketAddr { ip, port } = addr;
let addr = rtio::SocketAddr { ip: super::to_rtio(ip), port: port };
LocalIo::maybe_raise(|io| {
io.tcp_connect(addr, Some(timeout_ms)).map(TcpStream::new)
})
}).map_err(IoError::from_rtio_error)
}
/// Returns the socket address of the remote peer of this TCP connection.
pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
self.obj.peer_name()
match self.obj.peer_name() {
Ok(rtio::SocketAddr { ip, port }) => {
Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
}
Err(e) => Err(IoError::from_rtio_error(e)),
}
}
/// Returns the socket address of the local half of this TCP connection.
pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
self.obj.socket_name()
match self.obj.socket_name() {
Ok(rtio::SocketAddr { ip, port }) => {
Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
}
Err(e) => Err(IoError::from_rtio_error(e)),
}
}
/// Sets the nodelay flag on this connection to the boolean specified
@ -123,7 +136,7 @@ impl TcpStream {
self.obj.nodelay()
} else {
self.obj.control_congestion()
}
}.map_err(IoError::from_rtio_error)
}
/// Sets the keepalive timeout to the timeout specified.
@ -136,7 +149,7 @@ impl TcpStream {
match delay_in_seconds {
Some(i) => self.obj.keepalive(i),
None => self.obj.letdie(),
}
}.map_err(IoError::from_rtio_error)
}
/// Closes the reading half of this connection.
@ -168,7 +181,9 @@ impl TcpStream {
///
/// Note that this method affects all cloned handles associated with this
/// stream, not just this one handle.
pub fn close_read(&mut self) -> IoResult<()> { self.obj.close_read() }
pub fn close_read(&mut self) -> IoResult<()> {
self.obj.close_read().map_err(IoError::from_rtio_error)
}
/// Closes the writing half of this connection.
///
@ -177,7 +192,9 @@ impl TcpStream {
///
/// Note that this method affects all cloned handles associated with this
/// stream, not just this one handle.
pub fn close_write(&mut self) -> IoResult<()> { self.obj.close_write() }
pub fn close_write(&mut self) -> IoResult<()> {
self.obj.close_write().map_err(IoError::from_rtio_error)
}
/// Sets a timeout, in milliseconds, for blocking operations on this stream.
///
@ -261,11 +278,15 @@ impl Clone for TcpStream {
}
impl Reader for TcpStream {
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { self.obj.read(buf) }
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
self.obj.read(buf).map_err(IoError::from_rtio_error)
}
}
impl Writer for TcpStream {
fn write(&mut self, buf: &[u8]) -> IoResult<()> { self.obj.write(buf) }
fn write(&mut self, buf: &[u8]) -> IoResult<()> {
self.obj.write(buf).map_err(IoError::from_rtio_error)
}
}
/// A structure representing a socket server. This listener is used to create a
@ -319,10 +340,13 @@ impl TcpListener {
pub fn bind(addr: &str, port: u16) -> IoResult<TcpListener> {
match FromStr::from_str(addr) {
Some(ip) => {
let socket_addr = SocketAddr{ip: ip, port: port};
let addr = rtio::SocketAddr{
ip: super::to_rtio(ip),
port: port,
};
LocalIo::maybe_raise(|io| {
io.tcp_bind(socket_addr).map(|l| TcpListener { obj: l })
})
io.tcp_bind(addr).map(|l| TcpListener { obj: l })
}).map_err(IoError::from_rtio_error)
}
None => {
Err(IoError{
@ -336,13 +360,21 @@ impl TcpListener {
/// Returns the local socket address of this listener.
pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
self.obj.socket_name()
match self.obj.socket_name() {
Ok(rtio::SocketAddr { ip, port }) => {
Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
}
Err(e) => Err(IoError::from_rtio_error(e)),
}
}
}
impl Listener<TcpStream, TcpAcceptor> for TcpListener {
fn listen(self) -> IoResult<TcpAcceptor> {
self.obj.listen().map(|acceptor| TcpAcceptor { obj: acceptor })
match self.obj.listen() {
Ok(acceptor) => Ok(TcpAcceptor { obj: acceptor }),
Err(e) => Err(IoError::from_rtio_error(e)),
}
}
}
@ -403,7 +435,10 @@ impl TcpAcceptor {
impl Acceptor<TcpStream> for TcpAcceptor {
fn accept(&mut self) -> IoResult<TcpStream> {
self.obj.accept().map(TcpStream::new)
match self.obj.accept(){
Ok(s) => Ok(TcpStream::new(s)),
Err(e) => Err(IoError::from_rtio_error(e)),
}
}
}

View File

@ -17,12 +17,13 @@
use clone::Clone;
use io::net::ip::{SocketAddr, IpAddr};
use io::{Reader, Writer, IoResult};
use io::{Reader, Writer, IoResult, IoError};
use kinds::Send;
use owned::Box;
use option::Option;
use result::{Ok, Err};
use rt::rtio::{RtioSocket, RtioUdpSocket, IoFactory, LocalIo};
use rt::rtio;
/// A User Datagram Protocol socket.
///
@ -62,22 +63,32 @@ pub struct UdpSocket {
impl UdpSocket {
/// Creates a UDP socket from the given socket address.
pub fn bind(addr: SocketAddr) -> IoResult<UdpSocket> {
let SocketAddr { ip, port } = addr;
LocalIo::maybe_raise(|io| {
let addr = rtio::SocketAddr { ip: super::to_rtio(ip), port: port };
io.udp_bind(addr).map(|s| UdpSocket { obj: s })
})
}).map_err(IoError::from_rtio_error)
}
/// Receives data from the socket. On success, returns the number of bytes
/// read and the address from whence the data came.
pub fn recvfrom(&mut self, buf: &mut [u8])
-> IoResult<(uint, SocketAddr)> {
self.obj.recvfrom(buf)
match self.obj.recvfrom(buf) {
Ok((amt, rtio::SocketAddr { ip, port })) => {
Ok((amt, SocketAddr { ip: super::from_rtio(ip), port: port }))
}
Err(e) => Err(IoError::from_rtio_error(e)),
}
}
/// Sends data on the socket to the given address. Returns nothing on
/// success.
pub fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> IoResult<()> {
self.obj.sendto(buf, dst)
self.obj.sendto(buf, rtio::SocketAddr {
ip: super::to_rtio(dst.ip),
port: dst.port,
}).map_err(IoError::from_rtio_error)
}
/// Creates a `UdpStream`, which allows use of the `Reader` and `Writer`
@ -95,19 +106,24 @@ impl UdpSocket {
/// Returns the socket address that this socket was created from.
pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
self.obj.socket_name()
match self.obj.socket_name() {
Ok(a) => Ok(SocketAddr { ip: super::from_rtio(a.ip), port: a.port }),
Err(e) => Err(IoError::from_rtio_error(e))
}
}
/// Joins a multicast IP address (becomes a member of it)
#[experimental]
pub fn join_multicast(&mut self, multi: IpAddr) -> IoResult<()> {
self.obj.join_multicast(multi)
let e = self.obj.join_multicast(super::to_rtio(multi));
e.map_err(IoError::from_rtio_error)
}
/// Leaves a multicast IP address (drops membership from it)
#[experimental]
pub fn leave_multicast(&mut self, multi: IpAddr) -> IoResult<()> {
self.obj.leave_multicast(multi)
let e = self.obj.leave_multicast(super::to_rtio(multi));
e.map_err(IoError::from_rtio_error)
}
/// Set the multicast loop flag to the specified value
@ -119,19 +135,19 @@ impl UdpSocket {
self.obj.loop_multicast_locally()
} else {
self.obj.dont_loop_multicast_locally()
}
}.map_err(IoError::from_rtio_error)
}
/// Sets the multicast TTL
#[experimental]
pub fn set_multicast_ttl(&mut self, ttl: int) -> IoResult<()> {
self.obj.multicast_time_to_live(ttl)
self.obj.multicast_time_to_live(ttl).map_err(IoError::from_rtio_error)
}
/// Sets this socket's TTL
#[experimental]
pub fn set_ttl(&mut self, ttl: int) -> IoResult<()> {
self.obj.time_to_live(ttl)
self.obj.time_to_live(ttl).map_err(IoError::from_rtio_error)
}
/// Sets the broadcast flag on or off
@ -141,7 +157,7 @@ impl UdpSocket {
self.obj.hear_broadcasts()
} else {
self.obj.ignore_broadcasts()
}
}.map_err(IoError::from_rtio_error)
}
/// Sets the read/write timeout for this socket.

View File

@ -28,7 +28,7 @@ use prelude::*;
use c_str::ToCStr;
use clone::Clone;
use io::{Listener, Acceptor, Reader, Writer, IoResult};
use io::{Listener, Acceptor, Reader, Writer, IoResult, IoError};
use kinds::Send;
use owned::Box;
use rt::rtio::{IoFactory, LocalIo, RtioUnixListener};
@ -58,7 +58,7 @@ impl UnixStream {
pub fn connect<P: ToCStr>(path: &P) -> IoResult<UnixStream> {
LocalIo::maybe_raise(|io| {
io.unix_connect(&path.to_c_str(), None).map(|p| UnixStream { obj: p })
})
}).map_err(IoError::from_rtio_error)
}
/// Connect to a pipe named by `path`, timing out if the specified number of
@ -72,7 +72,7 @@ impl UnixStream {
LocalIo::maybe_raise(|io| {
let s = io.unix_connect(&path.to_c_str(), Some(timeout_ms));
s.map(|p| UnixStream { obj: p })
})
}).map_err(IoError::from_rtio_error)
}
@ -83,7 +83,9 @@ impl UnixStream {
///
/// Note that this method affects all cloned handles associated with this
/// stream, not just this one handle.
pub fn close_read(&mut self) -> IoResult<()> { self.obj.close_read() }
pub fn close_read(&mut self) -> IoResult<()> {
self.obj.close_read().map_err(IoError::from_rtio_error)
}
/// Closes the writing half of this connection.
///
@ -92,7 +94,9 @@ impl UnixStream {
///
/// Note that this method affects all cloned handles associated with this
/// stream, not just this one handle.
pub fn close_write(&mut self) -> IoResult<()> { self.obj.close_write() }
pub fn close_write(&mut self) -> IoResult<()> {
self.obj.close_write().map_err(IoError::from_rtio_error)
}
/// Sets the read/write timeout for this socket.
///
@ -126,11 +130,15 @@ impl Clone for UnixStream {
}
impl Reader for UnixStream {
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { self.obj.read(buf) }
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
self.obj.read(buf).map_err(IoError::from_rtio_error)
}
}
impl Writer for UnixStream {
fn write(&mut self, buf: &[u8]) -> IoResult<()> { self.obj.write(buf) }
fn write(&mut self, buf: &[u8]) -> IoResult<()> {
self.obj.write(buf).map_err(IoError::from_rtio_error)
}
}
/// A value that can listen for incoming named pipe connection requests.
@ -165,13 +173,15 @@ impl UnixListener {
pub fn bind<P: ToCStr>(path: &P) -> IoResult<UnixListener> {
LocalIo::maybe_raise(|io| {
io.unix_bind(&path.to_c_str()).map(|s| UnixListener { obj: s })
})
}).map_err(IoError::from_rtio_error)
}
}
impl Listener<UnixStream, UnixAcceptor> for UnixListener {
fn listen(self) -> IoResult<UnixAcceptor> {
self.obj.listen().map(|obj| UnixAcceptor { obj: obj })
self.obj.listen().map(|obj| {
UnixAcceptor { obj: obj }
}).map_err(IoError::from_rtio_error)
}
}
@ -202,7 +212,9 @@ impl UnixAcceptor {
impl Acceptor<UnixStream> for UnixAcceptor {
fn accept(&mut self) -> IoResult<UnixStream> {
self.obj.accept().map(|s| UnixStream { obj: s })
self.obj.accept().map(|s| {
UnixStream { obj: s }
}).map_err(IoError::from_rtio_error)
}
}

View File

@ -16,7 +16,7 @@
#![allow(missing_doc)]
use prelude::*;
use io::IoResult;
use io::{IoResult, IoError};
use libc;
use owned::Box;
use rt::rtio::{RtioPipe, LocalIo};
@ -51,7 +51,7 @@ impl PipeStream {
pub fn open(fd: libc::c_int) -> IoResult<PipeStream> {
LocalIo::maybe_raise(|io| {
io.pipe_open(fd).map(|obj| PipeStream { obj: obj })
})
}).map_err(IoError::from_rtio_error)
}
#[doc(hidden)]
@ -67,11 +67,15 @@ impl Clone for PipeStream {
}
impl Reader for PipeStream {
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { self.obj.read(buf) }
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
self.obj.read(buf).map_err(IoError::from_rtio_error)
}
}
impl Writer for PipeStream {
fn write(&mut self, buf: &[u8]) -> IoResult<()> { self.obj.write(buf) }
fn write(&mut self, buf: &[u8]) -> IoResult<()> {
self.obj.write(buf).map_err(IoError::from_rtio_error)
}
}
#[cfg(test)]

View File

@ -16,12 +16,13 @@ use prelude::*;
use str;
use fmt;
use io::IoResult;
use io::{IoResult, IoError};
use io;
use libc;
use mem;
use owned::Box;
use rt::rtio::{RtioProcess, ProcessConfig, IoFactory, LocalIo};
use rt::rtio;
use c_str::CString;
/// Signal a process to exit, without forcibly killing it. Corresponds to
@ -232,16 +233,25 @@ impl Command {
/// Executes the command as a child process, which is returned.
pub fn spawn(&self) -> IoResult<Process> {
fn to_rtio(p: StdioContainer) -> rtio::StdioContainer {
match p {
Ignored => rtio::Ignored,
InheritFd(fd) => rtio::InheritFd(fd),
CreatePipe(a, b) => rtio::CreatePipe(a, b),
}
}
let extra_io: Vec<rtio::StdioContainer> =
self.extra_io.iter().map(|x| to_rtio(*x)).collect();
LocalIo::maybe_raise(|io| {
let cfg = ProcessConfig {
program: &self.program,
args: self.args.as_slice(),
env: self.env.as_ref().map(|env| env.as_slice()),
cwd: self.cwd.as_ref(),
stdin: self.stdin,
stdout: self.stdout,
stderr: self.stderr,
extra_io: self.extra_io.as_slice(),
stdin: to_rtio(self.stdin),
stdout: to_rtio(self.stdout),
stderr: to_rtio(self.stderr),
extra_io: extra_io.as_slice(),
uid: self.uid,
gid: self.gid,
detach: self.detach,
@ -258,7 +268,7 @@ impl Command {
extra_io: io.collect(),
}
})
})
}).map_err(IoError::from_rtio_error)
}
/// Executes the command as a child process, waiting for it to finish and
@ -393,7 +403,9 @@ impl Process {
/// be successfully delivered if the child has exited, but not yet been
/// reaped.
pub fn kill(id: libc::pid_t, signal: int) -> IoResult<()> {
LocalIo::maybe_raise(|io| io.kill(id, signal))
LocalIo::maybe_raise(|io| {
io.kill(id, signal)
}).map_err(IoError::from_rtio_error)
}
/// Returns the process id of this child process
@ -415,7 +427,7 @@ impl Process {
///
/// If the signal delivery fails, the corresponding error is returned.
pub fn signal(&mut self, signal: int) -> IoResult<()> {
self.handle.kill(signal)
self.handle.kill(signal).map_err(IoError::from_rtio_error)
}
/// Sends a signal to this child requesting that it exits. This is
@ -442,7 +454,11 @@ impl Process {
/// `set_timeout` and the timeout expires before the child exits.
pub fn wait(&mut self) -> IoResult<ProcessExit> {
drop(self.stdin.take());
self.handle.wait()
match self.handle.wait() {
Ok(rtio::ExitSignal(s)) => Ok(ExitSignal(s)),
Ok(rtio::ExitStatus(s)) => Ok(ExitStatus(s)),
Err(e) => Err(IoError::from_rtio_error(e)),
}
}
/// Sets a timeout, in milliseconds, for future calls to wait().

View File

@ -28,7 +28,7 @@ use mem::drop;
use option::{Some, None};
use owned::Box;
use result::{Ok, Err};
use rt::rtio::{IoFactory, LocalIo, RtioSignal};
use rt::rtio::{IoFactory, LocalIo, RtioSignal, Callback};
use slice::ImmutableVector;
use vec::Vec;
@ -122,17 +122,28 @@ impl Listener {
/// If this function fails to register a signal handler, then an error will
/// be returned.
pub fn register(&mut self, signum: Signum) -> io::IoResult<()> {
struct SignalCallback {
signum: Signum,
tx: Sender<Signum>,
}
impl Callback for SignalCallback {
fn call(&mut self) { self.tx.send(self.signum) }
}
if self.handles.iter().any(|&(sig, _)| sig == signum) {
return Ok(()); // self is already listening to signum, so succeed
}
match LocalIo::maybe_raise(|io| {
io.signal(signum, self.tx.clone())
io.signal(signum as int, box SignalCallback {
signum: signum,
tx: self.tx.clone(),
})
}) {
Ok(handle) => {
self.handles.push((signum, handle));
Ok(())
}
Err(e) => Err(e)
Err(e) => Err(io::IoError::from_rtio_error(e))
}
}

View File

@ -27,20 +27,19 @@ out.write(bytes!("Hello, world!"));
*/
use failure::local_stderr;
use fmt;
use io::{Reader, Writer, IoResult, IoError, OtherIoError,
standard_error, EndOfFile, LineBufferedWriter, BufferedReader};
use libc;
use kinds::Send;
use mem::replace;
use libc;
use option::{Option, Some, None};
use owned::Box;
use prelude::drop;
use result::{Ok, Err};
use rt;
use rt::local::Local;
use rt::rtio::{DontClose, IoFactory, LocalIo, RtioFileStream, RtioTTY};
use rt::task::Task;
use rt::rtio::{DontClose, IoFactory, LocalIo, RtioFileStream, RtioTTY};
use str::StrSlice;
// And so begins the tale of acquiring a uv handle to a stdio stream on all
@ -82,9 +81,11 @@ fn src<T>(fd: libc::c_int, readable: bool, f: |StdSource| -> T) -> T {
Ok(tty) => f(TTY(tty)),
Err(_) => f(File(io.fs_from_raw_fd(fd, DontClose))),
})
}).unwrap()
}).map_err(IoError::from_rtio_error).unwrap()
}
local_data_key!(local_stdout: Box<Writer:Send>)
/// Creates a new non-blocking handle to the stdin of the current process.
///
/// The returned handled is buffered by default with a `BufferedReader`. If
@ -154,22 +155,6 @@ pub fn stderr_raw() -> StdWriter {
src(libc::STDERR_FILENO, false, |src| StdWriter { inner: src })
}
fn reset_helper(w: Box<Writer:Send>,
f: |&mut Task, Box<Writer:Send>| -> Option<Box<Writer:Send>>)
-> Option<Box<Writer:Send>> {
let mut t = Local::borrow(None::<Task>);
// Be sure to flush any pending output from the writer
match f(&mut *t, w) {
Some(mut w) => {
drop(t);
// FIXME: is failing right here?
w.flush().unwrap();
Some(w)
}
None => None
}
}
/// Resets the task-local stdout handle to the specified writer
///
/// This will replace the current task's stdout handle, returning the old
@ -179,7 +164,10 @@ fn reset_helper(w: Box<Writer:Send>,
/// Note that this does not need to be called for all new tasks; the default
/// output handle is to the process's stdout stream.
pub fn set_stdout(stdout: Box<Writer:Send>) -> Option<Box<Writer:Send>> {
reset_helper(stdout, |t, w| replace(&mut t.stdout, Some(w)))
local_stdout.replace(Some(stdout)).and_then(|mut s| {
let _ = s.flush();
Some(s)
})
}
/// Resets the task-local stderr handle to the specified writer
@ -191,7 +179,10 @@ pub fn set_stdout(stdout: Box<Writer:Send>) -> Option<Box<Writer:Send>> {
/// Note that this does not need to be called for all new tasks; the default
/// output handle is to the process's stderr stream.
pub fn set_stderr(stderr: Box<Writer:Send>) -> Option<Box<Writer:Send>> {
reset_helper(stderr, |t, w| replace(&mut t.stderr, Some(w)))
local_stderr.replace(Some(stderr)).and_then(|mut s| {
let _ = s.flush();
Some(s)
})
}
// Helper to access the local task's stdout handle
@ -204,42 +195,18 @@ pub fn set_stderr(stderr: Box<Writer:Send>) -> Option<Box<Writer:Send>> {
// // io1 aliases io2
// })
// })
fn with_task_stdout(f: |&mut Writer| -> IoResult<()> ) {
let task: Option<Box<Task>> = Local::try_take();
let result = match task {
Some(mut task) => {
// Printing may run arbitrary code, so ensure that the task is in
// TLS to allow all std services. Note that this means a print while
// printing won't use the task's normal stdout handle, but this is
// necessary to ensure safety (no aliasing).
let mut my_stdout = task.stdout.take();
Local::put(task);
if my_stdout.is_none() {
my_stdout = Some(box stdout() as Box<Writer:Send>);
}
let ret = f(*my_stdout.get_mut_ref());
// Note that we need to be careful when putting the stdout handle
// back into the task. If the handle was set to `Some` while
// printing, then we can run aribitrary code when destroying the
// previous handle. This means that the local task needs to be in
// TLS while we do this.
//
// To protect against this, we do a little dance in which we
// temporarily take the task, swap the handles, put the task in TLS,
// and only then drop the previous handle.
let prev = replace(&mut Local::borrow(None::<Task>).stdout, my_stdout);
drop(prev);
ret
}
None => {
let mut io = rt::Stdout;
f(&mut io as &mut Writer)
}
fn with_task_stdout(f: |&mut Writer| -> IoResult<()>) {
let result = if Local::exists(None::<Task>) {
let mut my_stdout = local_stdout.replace(None).unwrap_or_else(|| {
box stdout() as Box<Writer:Send>
});
let result = f(my_stdout);
local_stdout.replace(Some(my_stdout));
result
} else {
let mut io = rt::Stdout;
f(&mut io as &mut Writer)
};
match result {
Ok(()) => {}
Err(e) => fail!("failed printing to stdout: {}", e),
@ -311,7 +278,7 @@ impl Reader for StdReader {
tty.read(buf)
},
File(ref mut file) => file.read(buf).map(|i| i as uint),
};
}.map_err(IoError::from_rtio_error);
match ret {
// When reading a piped stdin, libuv will return 0-length reads when
// stdin reaches EOF. For pretty much all other streams it will
@ -342,7 +309,9 @@ impl StdWriter {
/// connected to a TTY instance, or if querying the TTY instance fails.
pub fn winsize(&mut self) -> IoResult<(int, int)> {
match self.inner {
TTY(ref mut tty) => tty.get_winsize(),
TTY(ref mut tty) => {
tty.get_winsize().map_err(IoError::from_rtio_error)
}
File(..) => {
Err(IoError {
kind: OtherIoError,
@ -362,7 +331,9 @@ impl StdWriter {
/// connected to a TTY instance, or if querying the TTY instance fails.
pub fn set_raw(&mut self, raw: bool) -> IoResult<()> {
match self.inner {
TTY(ref mut tty) => tty.set_raw(raw),
TTY(ref mut tty) => {
tty.set_raw(raw).map_err(IoError::from_rtio_error)
}
File(..) => {
Err(IoError {
kind: OtherIoError,
@ -387,7 +358,7 @@ impl Writer for StdWriter {
match self.inner {
TTY(ref mut tty) => tty.write(buf),
File(ref mut file) => file.write(buf),
}
}.map_err(IoError::from_rtio_error)
}
}
@ -413,12 +384,13 @@ mod tests {
})
iotest!(fn capture_stderr() {
use io::{ChanReader, ChanWriter};
use realstd::comm::channel;
use realstd::io::{Writer, ChanReader, ChanWriter, Reader};
let (tx, rx) = channel();
let (mut r, w) = (ChanReader::new(rx), ChanWriter::new(tx));
spawn(proc() {
set_stderr(box w);
::realstd::io::stdio::set_stderr(box w);
fail!("my special message");
});
let s = r.read_to_str().unwrap();

View File

@ -17,11 +17,11 @@ and create receivers which will receive notifications after a period of time.
*/
use comm::Receiver;
use io::IoResult;
use comm::{Receiver, Sender, channel};
use io::{IoResult, IoError};
use kinds::Send;
use owned::Box;
use rt::rtio::{IoFactory, LocalIo, RtioTimer};
use rt::rtio::{IoFactory, LocalIo, RtioTimer, Callback};
/// A synchronous timer object
///
@ -67,6 +67,8 @@ pub struct Timer {
obj: Box<RtioTimer:Send>,
}
struct TimerCallback { tx: Sender<()> }
/// Sleep the current task for `msecs` milliseconds.
pub fn sleep(msecs: u64) {
let timer = Timer::new();
@ -80,7 +82,9 @@ impl Timer {
/// for a number of milliseconds, or to possibly create channels which will
/// get notified after an amount of time has passed.
pub fn new() -> IoResult<Timer> {
LocalIo::maybe_raise(|io| io.timer_init().map(|t| Timer { obj: t }))
LocalIo::maybe_raise(|io| {
io.timer_init().map(|t| Timer { obj: t })
}).map_err(IoError::from_rtio_error)
}
/// Blocks the current task for `msecs` milliseconds.
@ -99,7 +103,9 @@ impl Timer {
/// by this timer, and that the returned receiver will be invalidated once
/// the timer is destroyed (when it falls out of scope).
pub fn oneshot(&mut self, msecs: u64) -> Receiver<()> {
self.obj.oneshot(msecs)
let (tx, rx) = channel();
self.obj.oneshot(msecs, box TimerCallback { tx: tx });
return rx
}
/// Creates a receiver which will have a continuous stream of notifications
@ -112,7 +118,15 @@ impl Timer {
/// by this timer, and that the returned receiver will be invalidated once
/// the timer is destroyed (when it falls out of scope).
pub fn periodic(&mut self, msecs: u64) -> Receiver<()> {
self.obj.period(msecs)
let (tx, rx) = channel();
self.obj.period(msecs, box TimerCallback { tx: tx });
return rx
}
}
impl Callback for TimerCallback {
fn call(&mut self) {
let _ = self.tx.send_opt(());
}
}

View File

@ -103,8 +103,8 @@
html_favicon_url = "http://www.rust-lang.org/favicon.ico",
html_root_url = "http://doc.rust-lang.org/",
html_playground_url = "http://play.rust-lang.org/")]
#![feature(macro_rules, globs, asm, managed_boxes, thread_local, link_args,
linkage, default_type_params, phase, concat_idents, quad_precision_float)]
#![feature(macro_rules, globs, managed_boxes,
linkage, default_type_params, phase)]
// Don't link to std. We are std.
#![no_std]
@ -123,9 +123,10 @@
extern crate alloc;
extern crate core;
extern crate libc;
extern crate core_rand = "rand";
extern crate core_collections = "collections";
extern crate core_rand = "rand";
extern crate libc;
extern crate rustrt;
// Make std testable by not duplicating lang items. See #2912
#[cfg(test)] extern crate realstd = "std";
@ -168,6 +169,9 @@ pub use core_collections::str;
pub use core_collections::string;
pub use core_collections::vec;
pub use rustrt::c_str;
pub use rustrt::local_data;
// Run tests with libgreen instead of libnative.
//
// FIXME: This egregiously hacks around starting the test runner in a different
@ -231,19 +235,16 @@ pub mod collections;
pub mod task;
pub mod comm;
pub mod local_data;
pub mod sync;
/* Runtime and platform support */
pub mod c_str;
pub mod c_vec;
pub mod os;
pub mod io;
pub mod path;
pub mod fmt;
pub mod cleanup;
// Private APIs
#[unstable]
@ -253,6 +254,7 @@ pub mod unstable;
// but name resolution doesn't work without it being pub.
#[unstable]
pub mod rt;
mod failure;
#[doc(hidden)]
pub fn issue_14344_workaround() { // FIXME #14344 force linkage to happen correctly

View File

@ -189,7 +189,7 @@ Accessing environment variables is not generally threadsafe.
Serialize access through a global lock.
*/
fn with_env_lock<T>(f: || -> T) -> T {
use unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
use rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
static mut lock: StaticNativeMutex = NATIVE_MUTEX_INIT;

View File

@ -13,7 +13,7 @@
//! necessary for running libstd.
// All platforms need to link to rustrt
#[link(name = "rustrt", kind = "static")]
#[link(name = "rust_builtin", kind = "static")]
extern {}
// LLVM implements the `frem` instruction as a call to `fmod`, which lives in

View File

@ -63,7 +63,7 @@ use ptr;
use rt::heap::{allocate, deallocate};
use slice::ImmutableVector;
use sync::atomics::{AtomicInt, AtomicPtr, SeqCst};
use unstable::sync::Exclusive;
use rt::exclusive::Exclusive;
use vec::Vec;
// Once the queue is less than 1/K full, then it will be downsized. Note that
@ -121,7 +121,7 @@ pub enum Stolen<T> {
/// will only use this structure when allocating a new buffer or deallocating a
/// previous one.
pub struct BufferPool<T> {
pool: Exclusive<Vec<Box<Buffer<T>>>>,
pool: Arc<Exclusive<Vec<Box<Buffer<T>>>>>,
}
/// An internal buffer used by the chase-lev deque. This structure is actually
@ -148,7 +148,7 @@ impl<T: Send> BufferPool<T> {
/// Allocates a new buffer pool which in turn can be used to allocate new
/// deques.
pub fn new() -> BufferPool<T> {
BufferPool { pool: Exclusive::new(vec!()) }
BufferPool { pool: Arc::new(Exclusive::new(vec!())) }
}
/// Allocates a new work-stealing deque which will send/receiving memory to
@ -162,25 +162,21 @@ impl<T: Send> BufferPool<T> {
fn alloc(&self, bits: int) -> Box<Buffer<T>> {
unsafe {
self.pool.with(|pool| {
match pool.iter().position(|x| x.size() >= (1 << bits)) {
Some(i) => pool.remove(i).unwrap(),
None => box Buffer::new(bits)
}
})
let mut pool = self.pool.lock();
match pool.iter().position(|x| x.size() >= (1 << bits)) {
Some(i) => pool.remove(i).unwrap(),
None => box Buffer::new(bits)
}
}
}
fn free(&self, buf: Box<Buffer<T>>) {
unsafe {
let mut buf = Some(buf);
self.pool.with(|pool| {
let buf = buf.take_unwrap();
match pool.iter().position(|v| v.size() > buf.size()) {
Some(i) => pool.insert(i, buf),
None => pool.push(buf),
}
})
let mut pool = self.pool.lock();
match pool.iter().position(|v| v.size() > buf.size()) {
Some(i) => pool.insert(i, buf),
None => pool.push(buf),
}
}
}
}

View File

@ -38,12 +38,13 @@
use any::Any;
use comm::{Sender, Receiver, channel};
use io::Writer;
use io::{Writer, stdio};
use kinds::{Send, marker};
use option::{None, Some, Option};
use owned::Box;
use result::{Result, Ok, Err};
use rt::local::Local;
use rt::task;
use rt::task::Task;
use str::{Str, SendStr, IntoMaybeOwned};
@ -53,18 +54,10 @@ use str::{Str, SendStr, IntoMaybeOwned};
#[cfg(test)] use str::StrAllocating;
#[cfg(test)] use string::String;
/// Indicates the manner in which a task exited.
///
/// A task that completes without failing is considered to exit successfully.
///
/// If you wish for this result's delivery to block until all
/// children tasks complete, recommend using a result future.
pub type TaskResult = Result<(), Box<Any:Send>>;
/// Task configuration options
pub struct TaskOpts {
/// Enable lifecycle notifications on the given channel
pub notify_chan: Option<Sender<TaskResult>>,
pub notify_chan: Option<Sender<task::Result>>,
/// A name for the task-to-be, for identification in failure messages
pub name: Option<SendStr>,
/// The size of the stack for the spawned task
@ -114,7 +107,7 @@ impl TaskBuilder {
///
/// # Failure
/// Fails if a future_result was already set for this task.
pub fn future_result(&mut self) -> Receiver<TaskResult> {
pub fn future_result(&mut self) -> Receiver<task::Result> {
// FIXME (#3725): Once linked failure and notification are
// handled in the library, I can imagine implementing this by just
// registering an arbitrary number of task::on_exit handlers and
@ -180,7 +173,28 @@ impl TaskBuilder {
Some(t) => t,
None => fail!("need a local task to spawn a new task"),
};
t.spawn_sibling(self.opts, f);
let TaskOpts { notify_chan, name, stack_size, stdout, stderr } = self.opts;
let opts = task::TaskOpts {
on_exit: notify_chan.map(|c| proc(r) c.send(r)),
name: name,
stack_size: stack_size,
};
if stdout.is_some() || stderr.is_some() {
t.spawn_sibling(opts, proc() {
match stdout {
Some(handle) => { let _ = stdio::set_stdout(handle); }
None => {}
}
match stderr {
Some(handle) => { let _ = stdio::set_stderr(handle); }
None => {}
}
f();
});
} else {
t.spawn_sibling(opts, f);
}
}
/**

View File

@ -224,7 +224,7 @@ pub mod dl {
}
pub fn check_for_errors_in<T>(f: || -> T) -> Result<T, String> {
use unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
use rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
static mut lock: StaticNativeMutex = NATIVE_MUTEX_INIT;
unsafe {
// dlerror isn't thread safe, so we need to lock around this entire

View File

@ -11,7 +11,3 @@
#![doc(hidden)]
pub mod dynamic_lib;
pub mod sync;
pub mod mutex;