Minor cleanups to pipes and serialization

This commit is contained in:
Brian Anderson 2012-11-25 14:09:53 -08:00
parent 86f7eb3446
commit 1b481017ac
4 changed files with 43 additions and 27 deletions

View File

@ -527,7 +527,7 @@ pub pure fn peek<T: Send, Tb: Send>(p: &RecvPacketBuffered<T, Tb>) -> bool {
}
}
impl<T: Send, Tb: Send> RecvPacketBuffered<T, Tb> {
impl<T: Send, Tb: Send> RecvPacketBuffered<T, Tb>: Peekable<T> {
pure fn peek() -> bool {
peek(&self)
}
@ -928,32 +928,32 @@ proto! streamp (
)
/// A trait for things that can send multiple messages.
pub trait Channel<T: Send> {
// It'd be nice to call this send, but it'd conflict with the
// built in send kind.
pub trait GenericChan<T> {
/// Sends a message.
fn send(x: T);
}
/// Things that can send multiple messages and can detect when the receiver
/// is closed
pub trait GenericSmartChan<T> {
/// Sends a message, or report if the receiver has closed the connection.
fn try_send(x: T) -> bool;
}
/// A trait for things that can receive multiple messages.
pub trait Recv<T: Send> {
pub trait GenericPort<T> {
/// Receives a message, or fails if the connection closes.
fn recv() -> T;
/** Receives a message if one is available, or returns `none` if
the connection is closed.
/** Receives a message, or returns `none` if
the connection is closed or closes.
*/
fn try_recv() -> Option<T>;
}
/** Returns true if a message is available or the connection is
closed.
*/
/// Ports that can `peek`
pub trait Peekable<T> {
/// Returns true if a message is available
pure fn peek() -> bool;
}
@ -984,13 +984,16 @@ pub fn stream<T:Send>() -> (Chan<T>, Port<T>) {
(Chan_({ mut endp: Some(move c) }), Port_({ mut endp: Some(move s) }))
}
impl<T: Send> Chan<T>: Channel<T> {
impl<T: Send> Chan<T>: GenericChan<T> {
fn send(x: T) {
let mut endp = None;
endp <-> self.endp;
self.endp = Some(
streamp::client::data(unwrap(move endp), move x))
}
}
impl<T: Send> Chan<T>: GenericSmartChan<T> {
fn try_send(x: T) -> bool {
let mut endp = None;
@ -1005,7 +1008,7 @@ impl<T: Send> Chan<T>: Channel<T> {
}
}
impl<T: Send> Port<T>: Recv<T> {
impl<T: Send> Port<T>: GenericPort<T> {
fn recv() -> T {
let mut endp = None;
endp <-> self.endp;
@ -1025,7 +1028,9 @@ impl<T: Send> Port<T>: Recv<T> {
None => None
}
}
}
impl<T: Send> Port<T>: Peekable<T> {
pure fn peek() -> bool unsafe {
let mut endp = None;
endp <-> self.endp;
@ -1071,7 +1076,7 @@ impl<T: Send> PortSet<T> {
}
}
impl<T: Send> PortSet<T> : Recv<T> {
impl<T: Send> PortSet<T> : GenericPort<T> {
fn try_recv() -> Option<T> {
let mut result = None;
@ -1099,6 +1104,9 @@ impl<T: Send> PortSet<T> : Recv<T> {
option::unwrap_expect(self.try_recv(), "port_set: endpoints closed")
}
}
impl<T: Send> PortSet<T> : Peekable<T> {
pure fn peek() -> bool {
// It'd be nice to use self.port.each, but that version isn't
// pure.
@ -1112,7 +1120,7 @@ impl<T: Send> PortSet<T> : Recv<T> {
/// A channel that can be shared between many senders.
pub type SharedChan<T: Send> = private::Exclusive<Chan<T>>;
impl<T: Send> SharedChan<T>: Channel<T> {
impl<T: Send> SharedChan<T>: GenericChan<T> {
fn send(x: T) {
let mut xx = Some(move x);
do self.with_imm |chan| {
@ -1121,7 +1129,9 @@ impl<T: Send> SharedChan<T>: Channel<T> {
chan.send(option::unwrap(move x))
}
}
}
impl<T: Send> SharedChan<T>: GenericSmartChan<T> {
fn try_send(x: T) -> bool {
let mut xx = Some(move x);
do self.with_imm |chan| {
@ -1145,7 +1155,9 @@ pub trait Select2<T: Send, U: Send> {
fn select() -> Either<T, U>;
}
impl<T: Send, U: Send, Left: Selectable Recv<T>, Right: Selectable Recv<U>>
impl<T: Send, U: Send,
Left: Selectable GenericPort<T>,
Right: Selectable GenericPort<U>>
(Left, Right): Select2<T, U> {
fn select() -> Either<T, U> {

View File

@ -17,25 +17,28 @@ Higher level communication abstractions.
// NB: transitionary, de-mode-ing.
#[forbid(deprecated_mode)];
use pipes::{Channel, Recv, Chan, Port, Selectable};
use pipes::{GenericChan, GenericSmartChan, GenericPort,
Chan, Port, Selectable, Peekable};
/// An extension of `pipes::stream` that allows both sending and receiving.
pub struct DuplexStream<T: Send, U: Send> {
priv chan: Chan<T>,
priv port: Port <U>,
priv port: Port<U>,
}
impl<T: Send, U: Send> DuplexStream<T, U> : Channel<T> {
impl<T: Send, U: Send> DuplexStream<T, U> : GenericChan<T> {
fn send(x: T) {
self.chan.send(move x)
}
}
impl<T: Send, U: Send> DuplexStream<T, U> : GenericSmartChan<T> {
fn try_send(x: T) -> bool {
self.chan.try_send(move x)
}
}
impl<T: Send, U: Send> DuplexStream<T, U> : Recv<U> {
impl<T: Send, U: Send> DuplexStream<T, U> : GenericPort<U> {
fn recv() -> U {
self.port.recv()
}
@ -43,7 +46,9 @@ impl<T: Send, U: Send> DuplexStream<T, U> : Recv<U> {
fn try_recv() -> Option<U> {
self.port.try_recv()
}
}
impl<T: Send, U: Send> DuplexStream<T, U> : Peekable<U> {
pure fn peek() -> bool {
self.port.peek()
}

View File

@ -157,7 +157,6 @@ pub mod reader {
pub fn doc_data(d: Doc) -> ~[u8] { vec::slice::<u8>(*d.data, d.start,
d.end) }
pub fn with_doc_data<T>(d: Doc, f: fn(x: &[u8]) -> T) -> T {
f(vec::view(*d.data, d.start, d.end))
}
@ -190,7 +189,7 @@ pub mod reader {
pub fn doc_as_i64(d: Doc) -> i64 { doc_as_u64(d) as i64 }
struct Deserializer {
pub struct Deserializer {
priv mut parent: Doc,
priv mut pos: uint,
}
@ -388,7 +387,7 @@ pub mod reader {
pub mod writer {
// ebml writing
struct Serializer {
pub struct Serializer {
writer: io::Writer,
priv mut size_positions: ~[uint],
}

View File

@ -35,7 +35,7 @@ extern mod rustrt {
* underlying libuv data structures when it goes out of scope. This is the
* data structure that is used for read/write operations over a TCP stream.
*/
struct TcpSocket {
pub struct TcpSocket {
socket_data: @TcpSocketData,
}
@ -59,7 +59,7 @@ pub fn TcpSocket(socket_data: @TcpSocketData) -> TcpSocket {
* It is created with a call to `net::tcp::socket_buf()` and has impls that
* satisfy both the `io::reader` and `io::writer` traits.
*/
struct TcpSocketBuf {
pub struct TcpSocketBuf {
data: @TcpBufferedSocketData,
mut end_of_stream: bool,
}