Add peek and peek_from to UnixStream and UnixDatagram

This commit is contained in:
rijenkii 2020-08-21 19:50:53 +07:00
parent a7425476e8
commit 64b8fd7920
2 changed files with 192 additions and 20 deletions

View File

@ -594,6 +594,32 @@ impl UnixStream {
pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
self.0.shutdown(how)
}
/// Receives data on the socket from the remote address to which it is
/// connected, without removing that data from the queue. On success,
/// returns the number of bytes peeked.
///
/// Successive calls return the same data. This is accomplished by passing
/// `MSG_PEEK` as a flag to the underlying `recv` system call.
///
/// # Examples
///
/// ```no_run
/// #![feature(unix_socket_peek)]
///
/// use std::os::unix::net::UnixStream;
///
/// fn main() -> std::io::Result<()> {
/// let socket = UnixStream::connect("/tmp/sock")?;
/// let mut buf = [0; 10];
/// let len = socket.peek(&mut buf).expect("peek failed");
/// Ok(())
/// }
/// ```
#[unstable(feature = "unix_socket_peek", issue = "none")]
pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
self.0.peek(buf)
}
}
#[stable(feature = "unix_socket", since = "1.10.0")]
@ -1291,6 +1317,33 @@ impl UnixDatagram {
SocketAddr::new(|addr, len| unsafe { libc::getpeername(*self.0.as_inner(), addr, len) })
}
fn recv_from_flags(
&self,
buf: &mut [u8],
flags: libc::c_int,
) -> io::Result<(usize, SocketAddr)> {
let mut count = 0;
let addr = SocketAddr::new(|addr, len| unsafe {
count = libc::recvfrom(
*self.0.as_inner(),
buf.as_mut_ptr() as *mut _,
buf.len(),
flags,
addr,
len,
);
if count > 0 {
1
} else if count == 0 {
0
} else {
-1
}
})?;
Ok((count as usize, addr))
}
/// Receives data from the socket.
///
/// On success, returns the number of bytes read and the address from
@ -1311,26 +1364,7 @@ impl UnixDatagram {
/// ```
#[stable(feature = "unix_socket", since = "1.10.0")]
pub fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
let mut count = 0;
let addr = SocketAddr::new(|addr, len| unsafe {
count = libc::recvfrom(
*self.0.as_inner(),
buf.as_mut_ptr() as *mut _,
buf.len(),
0,
addr,
len,
);
if count > 0 {
1
} else if count == 0 {
0
} else {
-1
}
})?;
Ok((count as usize, addr))
self.recv_from_flags(buf, 0)
}
/// Receives data from the socket.
@ -1601,6 +1635,64 @@ impl UnixDatagram {
pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
self.0.shutdown(how)
}
/// Receives data on the socket from the remote address to which it is
/// connected, without removing that data from the queue. On success,
/// returns the number of bytes peeked.
///
/// Successive calls return the same data. This is accomplished by passing
/// `MSG_PEEK` as a flag to the underlying `recv` system call.
///
/// # Examples
///
/// ```no_run
/// #![feature(unix_socket_peek)]
///
/// use std::os::unix::net::UnixDatagram;
///
/// fn main() -> std::io::Result<()> {
/// let socket = UnixDatagram::bind("/tmp/sock")?;
/// let mut buf = [0; 10];
/// let len = socket.peek(&mut buf).expect("peek failed");
/// Ok(())
/// }
/// ```
#[unstable(feature = "unix_socket_peek", issue = "none")]
pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
self.0.peek(buf)
}
/// Receives a single datagram message on the socket, without removing it from the
/// queue. On success, returns the number of bytes read and the origin.
///
/// The function must be called with valid byte array `buf` of sufficient size to
/// hold the message bytes. If a message is too long to fit in the supplied buffer,
/// excess bytes may be discarded.
///
/// Successive calls return the same data. This is accomplished by passing
/// `MSG_PEEK` as a flag to the underlying `recvfrom` system call.
///
/// Do not use this function to implement busy waiting, instead use `libc::poll` to
/// synchronize IO events on one or more sockets.
///
/// # Examples
///
/// ```no_run
/// #![feature(unix_socket_peek)]
///
/// use std::os::unix::net::UnixDatagram;
///
/// fn main() -> std::io::Result<()> {
/// let socket = UnixDatagram::bind("/tmp/sock")?;
/// let mut buf = [0; 10];
/// let (len, addr) = socket.peek_from(&mut buf).expect("peek failed");
/// Ok(())
/// }
/// ```
#[unstable(feature = "unix_socket_peek", issue = "none")]
pub fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
self.recv_from_flags(buf, libc::MSG_PEEK)
}
}
#[stable(feature = "unix_socket", since = "1.10.0")]

View File

@ -372,3 +372,83 @@ fn test_unix_datagram_timeout_zero_duration() {
fn abstract_namespace_not_allowed() {
assert!(UnixStream::connect("\0asdf").is_err());
}
#[test]
fn test_unix_stream_peek() {
let (txdone, rxdone) = crate::sync::mpsc::channel();
let dir = tmpdir();
let path = dir.path().join("sock");
let listener = or_panic!(UnixListener::bind(&path));
let thread = thread::spawn(move || {
let mut stream = or_panic!(listener.accept()).0;
or_panic!(stream.write_all(&[1, 3, 3, 7]));
or_panic!(rxdone.recv());
});
let mut stream = or_panic!(UnixStream::connect(&path));
let mut buf = [0; 10];
for _ in 0..2 {
assert_eq!(or_panic!(stream.peek(&mut buf)), 4);
}
assert_eq!(or_panic!(stream.read(&mut buf)), 4);
or_panic!(stream.set_nonblocking(true));
match stream.peek(&mut buf) {
Ok(_) => panic!("expected error"),
Err(ref e) if e.kind() == ErrorKind::WouldBlock => {}
Err(e) => panic!("unexpected error: {}", e),
}
or_panic!(txdone.send(()));
thread.join().unwrap();
}
#[test]
fn test_unix_datagram_peek() {
let dir = tmpdir();
let path1 = dir.path().join("sock");
let sock1 = or_panic!(UnixDatagram::bind(&path1));
let sock2 = or_panic!(UnixDatagram::unbound());
or_panic!(sock2.connect(&path1));
let msg = b"hello world";
or_panic!(sock2.send(msg));
for _ in 0..2 {
let mut buf = [0; 11];
let size = or_panic!(sock1.peek(&mut buf));
assert_eq!(size, 11);
assert_eq!(msg, &buf[..]);
}
let mut buf = [0; 11];
let size = or_panic!(sock1.recv(&mut buf));
assert_eq!(size, 11);
assert_eq!(msg, &buf[..]);
}
#[test]
fn test_unix_datagram_peek_from() {
let dir = tmpdir();
let path1 = dir.path().join("sock");
let sock1 = or_panic!(UnixDatagram::bind(&path1));
let sock2 = or_panic!(UnixDatagram::unbound());
or_panic!(sock2.connect(&path1));
let msg = b"hello world";
or_panic!(sock2.send(msg));
for _ in 0..2 {
let mut buf = [0; 11];
let (size, _) = or_panic!(sock1.peek_from(&mut buf));
assert_eq!(size, 11);
assert_eq!(msg, &buf[..]);
}
let mut buf = [0; 11];
let size = or_panic!(sock1.recv(&mut buf));
assert_eq!(size, 11);
assert_eq!(msg, &buf[..]);
}