impl Buffer for ChanReader
This commit is contained in:
parent
e4761c85b5
commit
7081007678
@ -13,10 +13,10 @@ use cmp;
|
||||
use collections::Collection;
|
||||
use comm::{Sender, Receiver};
|
||||
use io;
|
||||
use option::{None, Option, Some};
|
||||
use option::{None, Some};
|
||||
use result::{Ok, Err};
|
||||
use slice::{bytes, CloneableVector};
|
||||
use super::{Reader, Writer, IoResult};
|
||||
use super::{Buffer, Reader, Writer, IoResult};
|
||||
use vec::Vec;
|
||||
|
||||
/// Allows reading from a rx.
|
||||
@ -37,7 +37,7 @@ use vec::Vec;
|
||||
/// }
|
||||
/// ```
|
||||
pub struct ChanReader {
|
||||
buf: Option<Vec<u8>>, // A buffer of bytes received but not consumed.
|
||||
buf: Vec<u8>, // A buffer of bytes received but not consumed.
|
||||
pos: uint, // How many of the buffered bytes have already be consumed.
|
||||
rx: Receiver<Vec<u8>>, // The Receiver to pull data from.
|
||||
closed: bool, // Whether the channel this Receiver connects to has been closed.
|
||||
@ -47,7 +47,7 @@ impl ChanReader {
|
||||
/// Wraps a `Port` in a `ChanReader` structure
|
||||
pub fn new(rx: Receiver<Vec<u8>>) -> ChanReader {
|
||||
ChanReader {
|
||||
buf: None,
|
||||
buf: Vec::new(),
|
||||
pos: 0,
|
||||
rx: rx,
|
||||
closed: false,
|
||||
@ -55,27 +55,51 @@ impl ChanReader {
|
||||
}
|
||||
}
|
||||
|
||||
impl Buffer for ChanReader {
|
||||
fn fill_buf<'a>(&'a mut self) -> IoResult<&'a [u8]> {
|
||||
if self.pos >= self.buf.len() {
|
||||
self.pos = 0;
|
||||
match self.rx.recv_opt() {
|
||||
Ok(bytes) => {
|
||||
self.buf = bytes;
|
||||
},
|
||||
Err(()) => {
|
||||
self.closed = true;
|
||||
self.buf = Vec::new();
|
||||
}
|
||||
}
|
||||
}
|
||||
if self.closed {
|
||||
Err(io::standard_error(io::EndOfFile))
|
||||
} else {
|
||||
Ok(self.buf.slice_from(self.pos))
|
||||
}
|
||||
}
|
||||
|
||||
fn consume(&mut self, amt: uint) {
|
||||
self.pos += amt;
|
||||
assert!(self.pos <= self.buf.len());
|
||||
}
|
||||
}
|
||||
|
||||
impl Reader for ChanReader {
|
||||
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
|
||||
let mut num_read = 0;
|
||||
loop {
|
||||
match self.buf {
|
||||
Some(ref prev) => {
|
||||
let count = match self.fill_buf().ok() {
|
||||
Some(src) => {
|
||||
let dst = buf[mut num_read..];
|
||||
let src = prev[self.pos..];
|
||||
let count = cmp::min(dst.len(), src.len());
|
||||
let count = cmp::min(src.len(), dst.len());
|
||||
bytes::copy_memory(dst, src[..count]);
|
||||
num_read += count;
|
||||
self.pos += count;
|
||||
count
|
||||
},
|
||||
None => (),
|
||||
None => 0,
|
||||
};
|
||||
self.consume(count);
|
||||
num_read += count;
|
||||
if num_read == buf.len() || self.closed {
|
||||
break;
|
||||
}
|
||||
self.pos = 0;
|
||||
self.buf = self.rx.recv_opt().ok();
|
||||
self.closed = self.buf.is_none();
|
||||
}
|
||||
if self.closed && num_read == 0 {
|
||||
Err(io::standard_error(io::EndOfFile))
|
||||
@ -149,7 +173,6 @@ mod test {
|
||||
let mut reader = ChanReader::new(rx);
|
||||
let mut buf = [0u8, ..3];
|
||||
|
||||
|
||||
assert_eq!(Ok(0), reader.read([]));
|
||||
|
||||
assert_eq!(Ok(3), reader.read(buf));
|
||||
@ -178,6 +201,28 @@ mod test {
|
||||
assert_eq!(a, buf.as_slice());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_rx_buffer() {
|
||||
let (tx, rx) = channel();
|
||||
task::spawn(proc() {
|
||||
tx.send(b"he".to_vec());
|
||||
tx.send(b"llo wo".to_vec());
|
||||
tx.send(b"".to_vec());
|
||||
tx.send(b"rld\nhow ".to_vec());
|
||||
tx.send(b"are you?".to_vec());
|
||||
tx.send(b"".to_vec());
|
||||
});
|
||||
|
||||
let mut reader = ChanReader::new(rx);
|
||||
|
||||
assert_eq!(Ok("hello world\n".to_string()), reader.read_line());
|
||||
assert_eq!(Ok("how are you?".to_string()), reader.read_line());
|
||||
match reader.read_line() {
|
||||
Ok(..) => fail!(),
|
||||
Err(e) => assert_eq!(e.kind, io::EndOfFile),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_chan_writer() {
|
||||
let (tx, rx) = channel();
|
||||
|
Loading…
Reference in New Issue
Block a user