auto merge of #10083 : alexcrichton/rust/timer-port, r=pcwalton

In addition to being able to sleep the current task, timers should be able to
create ports which get notified after a period of time.

Closes #10014
This commit is contained in:
bors 2013-10-28 02:41:18 -07:00
commit de3d36a763
3 changed files with 170 additions and 6 deletions

View File

@ -8,6 +8,37 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
/*!
Synchronous Timers
This module exposes the functionality to create timers, block the current task,
and create ports which will receive notifications after a period of time.
# Example
```rust
use std::rt::io::Timer;
let mut timer = Timer::new().unwrap();
timer.sleep(10); // block the task for awhile
let timeout = timer.oneshot(10);
// do some work
timeout.recv(); // wait for the timeout to expire
let periodic = timer.periodic(10);
loop {
periodic.recv();
// this loop is only executed once every 10ms
}
```
*/
use comm::{Port, PortOne};
use option::{Option, Some, None};
use result::{Ok, Err};
use rt::io::io_error;
@ -25,9 +56,9 @@ pub fn sleep(msecs: u64) {
}
impl Timer {
/// Creates a new timer which can be used to put the current task to sleep
/// for a number of milliseconds.
/// for a number of milliseconds, or to possibly create channels which will
/// get notified after an amount of time has passed.
pub fn new() -> Option<Timer> {
do with_local_io |io| {
match io.timer_init() {
@ -42,20 +73,116 @@ impl Timer {
}
}
/// Blocks the current task for `msecs` milliseconds.
///
/// Note that this function will cause any other ports for this timer to be
/// invalidated (the other end will be closed).
pub fn sleep(&mut self, msecs: u64) {
self.obj.sleep(msecs);
}
/// Creates a oneshot port which will have a notification sent when `msecs`
/// milliseconds has elapsed. This does *not* block the current task, but
/// instead returns immediately.
///
/// Note that this invalidates any previous port which has been created by
/// this timer, and that the returned port will be invalidated once the
/// timer is destroyed (when it falls out of scope).
pub fn oneshot(&mut self, msecs: u64) -> PortOne<()> {
self.obj.oneshot(msecs)
}
/// Creates a port which will have a continuous stream of notifications
/// being sent every `msecs` milliseconds. This does *not* block the
/// current task, but instead returns immediately. The first notification
/// will not be received immediately, but rather after `msec` milliseconds
/// have passed.
///
/// Note that this invalidates any previous port which has been created by
/// this timer, and that the returned port will be invalidated once the
/// timer is destroyed (when it falls out of scope).
pub fn periodic(&mut self, msecs: u64) -> Port<()> {
self.obj.period(msecs)
}
}
#[cfg(test)]
mod test {
use super::*;
use rt::test::*;
use cell::Cell;
use task;
#[test]
fn test_io_timer_sleep_simple() {
do run_in_mt_newsched_task {
let timer = Timer::new();
do timer.map |mut t| { t.sleep(1) };
let mut timer = Timer::new().unwrap();
timer.sleep(1);
}
}
#[test]
fn test_io_timer_sleep_oneshot() {
do run_in_mt_newsched_task {
let mut timer = Timer::new().unwrap();
timer.oneshot(1).recv();
}
}
#[test]
fn test_io_timer_sleep_oneshot_forget() {
do run_in_mt_newsched_task {
let mut timer = Timer::new().unwrap();
timer.oneshot(100000000000);
}
}
#[test]
fn oneshot_twice() {
do run_in_mt_newsched_task {
let mut timer = Timer::new().unwrap();
let port1 = timer.oneshot(100000000000);
let port = timer.oneshot(1);
port.recv();
let port1 = Cell::new(port1);
let ret = do task::try {
port1.take().recv();
};
assert!(ret.is_err());
}
}
#[test]
fn test_io_timer_oneshot_then_sleep() {
do run_in_mt_newsched_task {
let mut timer = Timer::new().unwrap();
let port = timer.oneshot(100000000000);
timer.sleep(1); // this should invalidate the port
let port = Cell::new(port);
let ret = do task::try {
port.take().recv();
};
assert!(ret.is_err());
}
}
#[test]
fn test_io_timer_sleep_periodic() {
do run_in_mt_newsched_task {
let mut timer = Timer::new().unwrap();
let port = timer.periodic(1);
port.recv();
port.recv();
port.recv();
}
}
#[test]
fn test_io_timer_sleep_periodic_forget() {
do run_in_mt_newsched_task {
let mut timer = Timer::new().unwrap();
timer.periodic(100000000000);
}
}

View File

@ -11,7 +11,7 @@
use libc;
use option::*;
use result::*;
use comm::SharedChan;
use comm::{SharedChan, PortOne, Port};
use libc::c_int;
use c_str::CString;
@ -162,6 +162,8 @@ pub trait RtioUdpSocket : RtioSocket {
pub trait RtioTimer {
fn sleep(&mut self, msecs: u64);
fn oneshot(&mut self, msecs: u64) -> PortOne<()>;
fn period(&mut self, msecs: u64) -> Port<()>;
}
pub trait RtioFileStream {

View File

@ -13,7 +13,7 @@ use cast::transmute;
use cast;
use cell::Cell;
use clone::Clone;
use comm::{SendDeferred, SharedChan};
use comm::{SendDeferred, SharedChan, Port, PortOne, GenericChan};
use libc::{c_int, c_uint, c_void, pid_t};
use ops::Drop;
use option::*;
@ -1474,6 +1474,41 @@ impl RtioTimer for UvTimer {
self_.watcher.stop();
}
}
fn oneshot(&mut self, msecs: u64) -> PortOne<()> {
use comm::oneshot;
let (port, chan) = oneshot();
let chan = Cell::new(chan);
do self.home_for_io |self_| {
let chan = Cell::new(chan.take());
do self_.watcher.start(msecs, 0) |_, status| {
assert!(status.is_none());
assert!(!chan.is_empty());
chan.take().send_deferred(());
}
}
return port;
}
fn period(&mut self, msecs: u64) -> Port<()> {
use comm::stream;
let (port, chan) = stream();
let chan = Cell::new(chan);
do self.home_for_io |self_| {
let chan = Cell::new(chan.take());
do self_.watcher.start(msecs, msecs) |_, status| {
assert!(status.is_none());
do chan.with_ref |chan| {
chan.send_deferred(());
}
}
}
return port;
}
}
pub struct UvFileStream {