std: UvFileStream implements HomingIO + .home_for_io() wrapper usage

This commit is contained in:
Jeff Olson 2013-08-21 10:31:51 -07:00
parent c3a819b01c
commit 10ff5355b3

View File

@ -460,11 +460,10 @@ impl IoFactory for UvIoFactory {
}
fn fs_from_raw_fd(&mut self, fd: c_int, close_on_drop: bool) -> ~RtioFileStream {
~UvFileStream {
loop_: Loop{handle:self.uv_loop().native_handle()},
fd: file::FileDescriptor(fd),
close_on_drop: close_on_drop,
} as ~RtioFileStream
let loop_ = Loop {handle: self.uv_loop().native_handle()};
let fd = file::FileDescriptor(fd);
let home = get_handle_to_current_scheduler!();
~UvFileStream::new(loop_, fd, close_on_drop, home) as ~RtioFileStream
}
fn fs_open<P: PathLike>(&mut self, path: &P, flags: int, mode: int)
@ -480,10 +479,11 @@ impl IoFactory for UvIoFactory {
let path = path_cell.take();
do file::FileDescriptor::open(loop_, path, flags, mode) |req,err| {
if err.is_none() {
let res = Ok(~UvFileStream {
loop_: loop_,
fd: file::FileDescriptor(req.get_result()),
close_on_drop: true} as ~RtioFileStream);
let home = get_handle_to_current_scheduler!();
let fd = file::FileDescriptor(req.get_result());
let fs = ~UvFileStream::new(
loop_, fd, true, home) as ~RtioFileStream;
let res = Ok(fs);
unsafe { (*result_cell_ptr).put_back(res); }
let scheduler = Local::take::<Scheduler>();
scheduler.resume_blocked_task_immediately(task_cell.take());
@ -1061,46 +1061,64 @@ impl RtioTimer for UvTimer {
pub struct UvFileStream {
loop_: Loop,
fd: file::FileDescriptor,
close_on_drop: bool
close_on_drop: bool,
home: SchedHandle
}
impl HomingIO for UvFileStream {
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
}
impl UvFileStream {
fn new(loop_: Loop, fd: file::FileDescriptor, close_on_drop: bool,
home: SchedHandle) -> UvFileStream {
UvFileStream {
loop_: loop_,
fd: fd,
close_on_drop: close_on_drop,
home: home
}
}
fn base_read(&mut self, buf: &mut [u8], offset: i64) -> Result<int, IoError> {
let scheduler = Local::take::<Scheduler>();
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<int, IoError>> = &result_cell;
let buf_ptr: *&mut [u8] = &buf;
do scheduler.deschedule_running_task_and_then |_, task| {
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
let task_cell = Cell::new(task);
do self.fd.read(self.loop_, buf, offset) |req, uverr| {
let res = match uverr {
None => Ok(req.get_result() as int),
Some(err) => Err(uv_error_to_io_error(err))
do self.home_for_io |self_| {
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |_, task| {
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
let task_cell = Cell::new(task);
do self_.fd.read(self.loop_, buf, offset) |req, uverr| {
let res = match uverr {
None => Ok(req.get_result() as int),
Some(err) => Err(uv_error_to_io_error(err))
};
unsafe { (*result_cell_ptr).put_back(res); }
let scheduler = Local::take::<Scheduler>();
scheduler.resume_blocked_task_immediately(task_cell.take());
};
unsafe { (*result_cell_ptr).put_back(res); }
let scheduler = Local::take::<Scheduler>();
scheduler.resume_blocked_task_immediately(task_cell.take());
};
};
result_cell.take()
}
fn base_write(&mut self, buf: &[u8], offset: i64) -> Result<(), IoError> {
let scheduler = Local::take::<Scheduler>();
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
let buf_ptr: *&[u8] = &buf;
do scheduler.deschedule_running_task_and_then |_, task| {
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
let task_cell = Cell::new(task);
do self.fd.write(self.loop_, buf, offset) |_, uverr| {
let res = match uverr {
None => Ok(()),
Some(err) => Err(uv_error_to_io_error(err))
do self.home_for_io |self_| {
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |_, task| {
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
let task_cell = Cell::new(task);
do self_.fd.write(self.loop_, buf, offset) |_, uverr| {
let res = match uverr {
None => Ok(()),
Some(err) => Err(uv_error_to_io_error(err))
};
unsafe { (*result_cell_ptr).put_back(res); }
let scheduler = Local::take::<Scheduler>();
scheduler.resume_blocked_task_immediately(task_cell.take());
};
unsafe { (*result_cell_ptr).put_back(res); }
let scheduler = Local::take::<Scheduler>();
scheduler.resume_blocked_task_immediately(task_cell.take());
};
};
result_cell.take()
@ -1109,15 +1127,18 @@ impl UvFileStream {
impl Drop for UvFileStream {
fn drop(&self) {
let self_ = unsafe { transmute::<&UvFileStream, &mut UvFileStream>(self) };
if self.close_on_drop {
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self.fd.close(self.loop_) |_,_| {
let scheduler = Local::take::<Scheduler>();
scheduler.resume_blocked_task_immediately(task_cell.take());
do self_.home_for_io |self_| {
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self_.fd.close(self.loop_) |_,_| {
let scheduler = Local::take::<Scheduler>();
scheduler.resume_blocked_task_immediately(task_cell.take());
};
};
};
}
}
}
}