qemu-e2k/net/af-xdp.c
Ilya Maximets cb039ef3d9 net: add initial support for AF_XDP network backend
AF_XDP is a network socket family that allows communication directly
with the network device driver in the kernel, bypassing most or all
of the kernel networking stack.  In the essence, the technology is
pretty similar to netmap.  But, unlike netmap, AF_XDP is Linux-native
and works with any network interfaces without driver modifications.
Unlike vhost-based backends (kernel, user, vdpa), AF_XDP doesn't
require access to character devices or unix sockets.  Only access to
the network interface itself is necessary.

This patch implements a network backend that communicates with the
kernel by creating an AF_XDP socket.  A chunk of userspace memory
is shared between QEMU and the host kernel.  4 ring buffers (Tx, Rx,
Fill and Completion) are placed in that memory along with a pool of
memory buffers for the packet data.  Data transmission is done by
allocating one of the buffers, copying packet data into it and
placing the pointer into Tx ring.  After transmission, device will
return the buffer via Completion ring.  On Rx, device will take
a buffer form a pre-populated Fill ring, write the packet data into
it and place the buffer into Rx ring.

AF_XDP network backend takes on the communication with the host
kernel and the network interface and forwards packets to/from the
peer device in QEMU.

Usage example:

  -device virtio-net-pci,netdev=guest1,mac=00:16:35:AF:AA:5C
  -netdev af-xdp,ifname=ens6f1np1,id=guest1,mode=native,queues=1

XDP program bridges the socket with a network interface.  It can be
attached to the interface in 2 different modes:

1. skb - this mode should work for any interface and doesn't require
         driver support.  With a caveat of lower performance.

2. native - this does require support from the driver and allows to
            bypass skb allocation in the kernel and potentially use
            zero-copy while getting packets in/out userspace.

By default, QEMU will try to use native mode and fall back to skb.
Mode can be forced via 'mode' option.  To force 'copy' even in native
mode, use 'force-copy=on' option.  This might be useful if there is
some issue with the driver.

Option 'queues=N' allows to specify how many device queues should
be open.  Note that all the queues that are not open are still
functional and can receive traffic, but it will not be delivered to
QEMU.  So, the number of device queues should generally match the
QEMU configuration, unless the device is shared with something
else and the traffic re-direction to appropriate queues is correctly
configured on a device level (e.g. with ethtool -N).
'start-queue=M' option can be used to specify from which queue id
QEMU should start configuring 'N' queues.  It might also be necessary
to use this option with certain NICs, e.g. MLX5 NICs.  See the docs
for examples.

In a general case QEMU will need CAP_NET_ADMIN and CAP_SYS_ADMIN
or CAP_BPF capabilities in order to load default XSK/XDP programs to
the network interface and configure BPF maps.  It is possible, however,
to run with no capabilities.  For that to work, an external process
with enough capabilities will need to pre-load default XSK program,
create AF_XDP sockets and pass their file descriptors to QEMU process
on startup via 'sock-fds' option.  Network backend will need to be
configured with 'inhibit=on' to avoid loading of the program.
QEMU will need 32 MB of locked memory (RLIMIT_MEMLOCK) per queue
or CAP_IPC_LOCK.

There are few performance challenges with the current network backends.

First is that they do not support IO threads.  This means that data
path is handled by the main thread in QEMU and may slow down other
work or may be slowed down by some other work.  This also means that
taking advantage of multi-queue is generally not possible today.

Another thing is that data path is going through the device emulation
code, which is not really optimized for performance.  The fastest
"frontend" device is virtio-net.  But it's not optimized for heavy
traffic either, because it expects such use-cases to be handled via
some implementation of vhost (user, kernel, vdpa).  In practice, we
have virtio notifications and rcu lock/unlock on a per-packet basis
and not very efficient accesses to the guest memory.  Communication
channels between backend and frontend devices do not allow passing
more than one packet at a time as well.

Some of these challenges can be avoided in the future by adding better
batching into device emulation or by implementing vhost-af-xdp variant.

There are also a few kernel limitations.  AF_XDP sockets do not
support any kinds of checksum or segmentation offloading.  Buffers
are limited to a page size (4K), i.e. MTU is limited.  Multi-buffer
support implementation for AF_XDP is in progress, but not ready yet.
Also, transmission in all non-zero-copy modes is synchronous, i.e.
done in a syscall.  That doesn't allow high packet rates on virtual
interfaces.

However, keeping in mind all of these challenges, current implementation
of the AF_XDP backend shows a decent performance while running on top
of a physical NIC with zero-copy support.

Test setup:

2 VMs running on 2 physical hosts connected via ConnectX6-Dx card.
Network backend is configured to open the NIC directly in native mode.
The driver supports zero-copy.  NIC is configured to use 1 queue.

Inside a VM - iperf3 for basic TCP performance testing and dpdk-testpmd
for PPS testing.

iperf3 result:
 TCP stream      : 19.1 Gbps

dpdk-testpmd (single queue, single CPU core, 64 B packets) results:
 Tx only         : 3.4 Mpps
 Rx only         : 2.0 Mpps
 L2 FWD Loopback : 1.5 Mpps

In skb mode the same setup shows much lower performance, similar to
the setup where pair of physical NICs is replaced with veth pair:

iperf3 result:
  TCP stream      : 9 Gbps

dpdk-testpmd (single queue, single CPU core, 64 B packets) results:
  Tx only         : 1.2 Mpps
  Rx only         : 1.0 Mpps
  L2 FWD Loopback : 0.7 Mpps

Results in skb mode or over the veth are close to results of a tap
backend with vhost=on and disabled segmentation offloading bridged
with a NIC.

Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
Reviewed-by: Daniel P. Berrangé <berrange@redhat.com> (docker/lcitool)
Signed-off-by: Jason Wang <jasowang@redhat.com>
2023-09-18 14:36:13 +08:00

527 lines
14 KiB
C

/*
* AF_XDP network backend.
*
* Copyright (c) 2023 Red Hat, Inc.
*
* Authors:
* Ilya Maximets <i.maximets@ovn.org>
*
* This work is licensed under the terms of the GNU GPL, version 2 or later.
* See the COPYING file in the top-level directory.
*/
#include "qemu/osdep.h"
#include <bpf/bpf.h>
#include <inttypes.h>
#include <linux/if_link.h>
#include <linux/if_xdp.h>
#include <net/if.h>
#include <xdp/xsk.h>
#include "clients.h"
#include "monitor/monitor.h"
#include "net/net.h"
#include "qapi/error.h"
#include "qemu/cutils.h"
#include "qemu/error-report.h"
#include "qemu/iov.h"
#include "qemu/main-loop.h"
#include "qemu/memalign.h"
typedef struct AFXDPState {
NetClientState nc;
struct xsk_socket *xsk;
struct xsk_ring_cons rx;
struct xsk_ring_prod tx;
struct xsk_ring_cons cq;
struct xsk_ring_prod fq;
char ifname[IFNAMSIZ];
int ifindex;
bool read_poll;
bool write_poll;
uint32_t outstanding_tx;
uint64_t *pool;
uint32_t n_pool;
char *buffer;
struct xsk_umem *umem;
uint32_t n_queues;
uint32_t xdp_flags;
bool inhibit;
} AFXDPState;
#define AF_XDP_BATCH_SIZE 64
static void af_xdp_send(void *opaque);
static void af_xdp_writable(void *opaque);
/* Set the event-loop handlers for the af-xdp backend. */
static void af_xdp_update_fd_handler(AFXDPState *s)
{
qemu_set_fd_handler(xsk_socket__fd(s->xsk),
s->read_poll ? af_xdp_send : NULL,
s->write_poll ? af_xdp_writable : NULL,
s);
}
/* Update the read handler. */
static void af_xdp_read_poll(AFXDPState *s, bool enable)
{
if (s->read_poll != enable) {
s->read_poll = enable;
af_xdp_update_fd_handler(s);
}
}
/* Update the write handler. */
static void af_xdp_write_poll(AFXDPState *s, bool enable)
{
if (s->write_poll != enable) {
s->write_poll = enable;
af_xdp_update_fd_handler(s);
}
}
static void af_xdp_poll(NetClientState *nc, bool enable)
{
AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
if (s->read_poll != enable || s->write_poll != enable) {
s->write_poll = enable;
s->read_poll = enable;
af_xdp_update_fd_handler(s);
}
}
static void af_xdp_complete_tx(AFXDPState *s)
{
uint32_t idx = 0;
uint32_t done, i;
uint64_t *addr;
done = xsk_ring_cons__peek(&s->cq, XSK_RING_CONS__DEFAULT_NUM_DESCS, &idx);
for (i = 0; i < done; i++) {
addr = (void *) xsk_ring_cons__comp_addr(&s->cq, idx++);
s->pool[s->n_pool++] = *addr;
s->outstanding_tx--;
}
if (done) {
xsk_ring_cons__release(&s->cq, done);
}
}
/*
* The fd_write() callback, invoked if the fd is marked as writable
* after a poll.
*/
static void af_xdp_writable(void *opaque)
{
AFXDPState *s = opaque;
/* Try to recover buffers that are already sent. */
af_xdp_complete_tx(s);
/*
* Unregister the handler, unless we still have packets to transmit
* and kernel needs a wake up.
*/
if (!s->outstanding_tx || !xsk_ring_prod__needs_wakeup(&s->tx)) {
af_xdp_write_poll(s, false);
}
/* Flush any buffered packets. */
qemu_flush_queued_packets(&s->nc);
}
static ssize_t af_xdp_receive(NetClientState *nc,
const uint8_t *buf, size_t size)
{
AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
struct xdp_desc *desc;
uint32_t idx;
void *data;
/* Try to recover buffers that are already sent. */
af_xdp_complete_tx(s);
if (size > XSK_UMEM__DEFAULT_FRAME_SIZE) {
/* We can't transmit packet this size... */
return size;
}
if (!s->n_pool || !xsk_ring_prod__reserve(&s->tx, 1, &idx)) {
/*
* Out of buffers or space in tx ring. Poll until we can write.
* This will also kick the Tx, if it was waiting on CQ.
*/
af_xdp_write_poll(s, true);
return 0;
}
desc = xsk_ring_prod__tx_desc(&s->tx, idx);
desc->addr = s->pool[--s->n_pool];
desc->len = size;
data = xsk_umem__get_data(s->buffer, desc->addr);
memcpy(data, buf, size);
xsk_ring_prod__submit(&s->tx, 1);
s->outstanding_tx++;
if (xsk_ring_prod__needs_wakeup(&s->tx)) {
af_xdp_write_poll(s, true);
}
return size;
}
/*
* Complete a previous send (backend --> guest) and enable the
* fd_read callback.
*/
static void af_xdp_send_completed(NetClientState *nc, ssize_t len)
{
AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
af_xdp_read_poll(s, true);
}
static void af_xdp_fq_refill(AFXDPState *s, uint32_t n)
{
uint32_t i, idx = 0;
/* Leave one packet for Tx, just in case. */
if (s->n_pool < n + 1) {
n = s->n_pool;
}
if (!n || !xsk_ring_prod__reserve(&s->fq, n, &idx)) {
return;
}
for (i = 0; i < n; i++) {
*xsk_ring_prod__fill_addr(&s->fq, idx++) = s->pool[--s->n_pool];
}
xsk_ring_prod__submit(&s->fq, n);
if (xsk_ring_prod__needs_wakeup(&s->fq)) {
/* Receive was blocked by not having enough buffers. Wake it up. */
af_xdp_read_poll(s, true);
}
}
static void af_xdp_send(void *opaque)
{
uint32_t i, n_rx, idx = 0;
AFXDPState *s = opaque;
n_rx = xsk_ring_cons__peek(&s->rx, AF_XDP_BATCH_SIZE, &idx);
if (!n_rx) {
return;
}
for (i = 0; i < n_rx; i++) {
const struct xdp_desc *desc;
struct iovec iov;
desc = xsk_ring_cons__rx_desc(&s->rx, idx++);
iov.iov_base = xsk_umem__get_data(s->buffer, desc->addr);
iov.iov_len = desc->len;
s->pool[s->n_pool++] = desc->addr;
if (!qemu_sendv_packet_async(&s->nc, &iov, 1,
af_xdp_send_completed)) {
/*
* The peer does not receive anymore. Packet is queued, stop
* reading from the backend until af_xdp_send_completed().
*/
af_xdp_read_poll(s, false);
/* Return unused descriptors to not break the ring cache. */
xsk_ring_cons__cancel(&s->rx, n_rx - i - 1);
n_rx = i + 1;
break;
}
}
/* Release actually sent descriptors and try to re-fill. */
xsk_ring_cons__release(&s->rx, n_rx);
af_xdp_fq_refill(s, AF_XDP_BATCH_SIZE);
}
/* Flush and close. */
static void af_xdp_cleanup(NetClientState *nc)
{
AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
qemu_purge_queued_packets(nc);
af_xdp_poll(nc, false);
xsk_socket__delete(s->xsk);
s->xsk = NULL;
g_free(s->pool);
s->pool = NULL;
xsk_umem__delete(s->umem);
s->umem = NULL;
qemu_vfree(s->buffer);
s->buffer = NULL;
/* Remove the program if it's the last open queue. */
if (!s->inhibit && nc->queue_index == s->n_queues - 1 && s->xdp_flags
&& bpf_xdp_detach(s->ifindex, s->xdp_flags, NULL) != 0) {
fprintf(stderr,
"af-xdp: unable to remove XDP program from '%s', ifindex: %d\n",
s->ifname, s->ifindex);
}
}
static int af_xdp_umem_create(AFXDPState *s, int sock_fd, Error **errp)
{
struct xsk_umem_config config = {
.fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
.comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
.frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
.frame_headroom = 0,
};
uint64_t n_descs;
uint64_t size;
int64_t i;
int ret;
/* Number of descriptors if all 4 queues (rx, tx, cq, fq) are full. */
n_descs = (XSK_RING_PROD__DEFAULT_NUM_DESCS
+ XSK_RING_CONS__DEFAULT_NUM_DESCS) * 2;
size = n_descs * XSK_UMEM__DEFAULT_FRAME_SIZE;
s->buffer = qemu_memalign(qemu_real_host_page_size(), size);
memset(s->buffer, 0, size);
if (sock_fd < 0) {
ret = xsk_umem__create(&s->umem, s->buffer, size,
&s->fq, &s->cq, &config);
} else {
ret = xsk_umem__create_with_fd(&s->umem, sock_fd, s->buffer, size,
&s->fq, &s->cq, &config);
}
if (ret) {
qemu_vfree(s->buffer);
error_setg_errno(errp, errno,
"failed to create umem for %s queue_index: %d",
s->ifname, s->nc.queue_index);
return -1;
}
s->pool = g_new(uint64_t, n_descs);
/* Fill the pool in the opposite order, because it's a LIFO queue. */
for (i = n_descs; i >= 0; i--) {
s->pool[i] = i * XSK_UMEM__DEFAULT_FRAME_SIZE;
}
s->n_pool = n_descs;
af_xdp_fq_refill(s, XSK_RING_PROD__DEFAULT_NUM_DESCS);
return 0;
}
static int af_xdp_socket_create(AFXDPState *s,
const NetdevAFXDPOptions *opts, Error **errp)
{
struct xsk_socket_config cfg = {
.rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
.tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
.libxdp_flags = 0,
.bind_flags = XDP_USE_NEED_WAKEUP,
.xdp_flags = XDP_FLAGS_UPDATE_IF_NOEXIST,
};
int queue_id, error = 0;
s->inhibit = opts->has_inhibit && opts->inhibit;
if (s->inhibit) {
cfg.libxdp_flags |= XSK_LIBXDP_FLAGS__INHIBIT_PROG_LOAD;
}
if (opts->has_force_copy && opts->force_copy) {
cfg.bind_flags |= XDP_COPY;
}
queue_id = s->nc.queue_index;
if (opts->has_start_queue && opts->start_queue > 0) {
queue_id += opts->start_queue;
}
if (opts->has_mode) {
/* Specific mode requested. */
cfg.xdp_flags |= (opts->mode == AFXDP_MODE_NATIVE)
? XDP_FLAGS_DRV_MODE : XDP_FLAGS_SKB_MODE;
if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
s->umem, &s->rx, &s->tx, &cfg)) {
error = errno;
}
} else {
/* No mode requested, try native first. */
cfg.xdp_flags |= XDP_FLAGS_DRV_MODE;
if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
s->umem, &s->rx, &s->tx, &cfg)) {
/* Can't use native mode, try skb. */
cfg.xdp_flags &= ~XDP_FLAGS_DRV_MODE;
cfg.xdp_flags |= XDP_FLAGS_SKB_MODE;
if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
s->umem, &s->rx, &s->tx, &cfg)) {
error = errno;
}
}
}
if (error) {
error_setg_errno(errp, error,
"failed to create AF_XDP socket for %s queue_id: %d",
s->ifname, queue_id);
return -1;
}
s->xdp_flags = cfg.xdp_flags;
return 0;
}
/* NetClientInfo methods. */
static NetClientInfo net_af_xdp_info = {
.type = NET_CLIENT_DRIVER_AF_XDP,
.size = sizeof(AFXDPState),
.receive = af_xdp_receive,
.poll = af_xdp_poll,
.cleanup = af_xdp_cleanup,
};
static int *parse_socket_fds(const char *sock_fds_str,
int64_t n_expected, Error **errp)
{
gchar **substrings = g_strsplit(sock_fds_str, ":", -1);
int64_t i, n_sock_fds = g_strv_length(substrings);
int *sock_fds = NULL;
if (n_sock_fds != n_expected) {
error_setg(errp, "expected %"PRIi64" socket fds, got %"PRIi64,
n_expected, n_sock_fds);
goto exit;
}
sock_fds = g_new(int, n_sock_fds);
for (i = 0; i < n_sock_fds; i++) {
sock_fds[i] = monitor_fd_param(monitor_cur(), substrings[i], errp);
if (sock_fds[i] < 0) {
g_free(sock_fds);
sock_fds = NULL;
goto exit;
}
}
exit:
g_strfreev(substrings);
return sock_fds;
}
/*
* The exported init function.
*
* ... -netdev af-xdp,ifname="..."
*/
int net_init_af_xdp(const Netdev *netdev,
const char *name, NetClientState *peer, Error **errp)
{
const NetdevAFXDPOptions *opts = &netdev->u.af_xdp;
NetClientState *nc, *nc0 = NULL;
unsigned int ifindex;
uint32_t prog_id = 0;
int *sock_fds = NULL;
int64_t i, queues;
Error *err = NULL;
AFXDPState *s;
ifindex = if_nametoindex(opts->ifname);
if (!ifindex) {
error_setg_errno(errp, errno, "failed to get ifindex for '%s'",
opts->ifname);
return -1;
}
queues = opts->has_queues ? opts->queues : 1;
if (queues < 1) {
error_setg(errp, "invalid number of queues (%" PRIi64 ") for '%s'",
queues, opts->ifname);
return -1;
}
if ((opts->has_inhibit && opts->inhibit) != !!opts->sock_fds) {
error_setg(errp, "'inhibit=on' requires 'sock-fds' and vice versa");
return -1;
}
if (opts->sock_fds) {
sock_fds = parse_socket_fds(opts->sock_fds, queues, errp);
if (!sock_fds) {
return -1;
}
}
for (i = 0; i < queues; i++) {
nc = qemu_new_net_client(&net_af_xdp_info, peer, "af-xdp", name);
qemu_set_info_str(nc, "af-xdp%"PRIi64" to %s", i, opts->ifname);
nc->queue_index = i;
if (!nc0) {
nc0 = nc;
}
s = DO_UPCAST(AFXDPState, nc, nc);
pstrcpy(s->ifname, sizeof(s->ifname), opts->ifname);
s->ifindex = ifindex;
s->n_queues = queues;
if (af_xdp_umem_create(s, sock_fds ? sock_fds[i] : -1, errp)
|| af_xdp_socket_create(s, opts, errp)) {
/* Make sure the XDP program will be removed. */
s->n_queues = i;
error_propagate(errp, err);
goto err;
}
}
if (nc0) {
s = DO_UPCAST(AFXDPState, nc, nc0);
if (bpf_xdp_query_id(s->ifindex, s->xdp_flags, &prog_id) || !prog_id) {
error_setg_errno(errp, errno,
"no XDP program loaded on '%s', ifindex: %d",
s->ifname, s->ifindex);
goto err;
}
}
af_xdp_read_poll(s, true); /* Initially only poll for reads. */
return 0;
err:
g_free(sock_fds);
if (nc0) {
qemu_del_net_client(nc0);
}
return -1;
}