block: add blk_io_plug_call() API

Introduce a new API for thread-local blk_io_plug() that does not
traverse the block graph. The goal is to make blk_io_plug() multi-queue
friendly.

Instead of having block drivers track whether or not we're in a plugged
section, provide an API that allows them to defer a function call until
we're unplugged: blk_io_plug_call(fn, opaque). If blk_io_plug_call() is
called multiple times with the same fn/opaque pair, then fn() is only
called once at the end of the function - resulting in batching.

This patch introduces the API and changes blk_io_plug()/blk_io_unplug().
blk_io_plug()/blk_io_unplug() no longer require a BlockBackend argument
because the plug state is now thread-local.

Later patches convert block drivers to blk_io_plug_call() and then we
can finally remove .bdrv_co_io_plug() once all block drivers have been
converted.

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
Reviewed-by: Eric Blake <eblake@redhat.com>
Reviewed-by: Stefano Garzarella <sgarzare@redhat.com>
Acked-by: Kevin Wolf <kwolf@redhat.com>
Message-id: 20230530180959.1108766-2-stefanha@redhat.com
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
This commit is contained in:
Stefan Hajnoczi 2023-05-30 14:09:54 -04:00
parent c6a5fc2ac7
commit 41abca8c39
8 changed files with 173 additions and 41 deletions

View File

@ -2650,6 +2650,7 @@ F: util/aio-*.c
F: util/aio-*.h F: util/aio-*.h
F: util/fdmon-*.c F: util/fdmon-*.c
F: block/io.c F: block/io.c
F: block/plug.c
F: migration/block* F: migration/block*
F: include/block/aio.h F: include/block/aio.h
F: include/block/aio-wait.h F: include/block/aio-wait.h

View File

@ -2582,28 +2582,6 @@ void blk_add_insert_bs_notifier(BlockBackend *blk, Notifier *notify)
notifier_list_add(&blk->insert_bs_notifiers, notify); notifier_list_add(&blk->insert_bs_notifiers, notify);
} }
void coroutine_fn blk_co_io_plug(BlockBackend *blk)
{
BlockDriverState *bs = blk_bs(blk);
IO_CODE();
GRAPH_RDLOCK_GUARD();
if (bs) {
bdrv_co_io_plug(bs);
}
}
void coroutine_fn blk_co_io_unplug(BlockBackend *blk)
{
BlockDriverState *bs = blk_bs(blk);
IO_CODE();
GRAPH_RDLOCK_GUARD();
if (bs) {
bdrv_co_io_unplug(bs);
}
}
BlockAcctStats *blk_get_stats(BlockBackend *blk) BlockAcctStats *blk_get_stats(BlockBackend *blk)
{ {
IO_CODE(); IO_CODE();

View File

@ -23,6 +23,7 @@ block_ss.add(files(
'mirror.c', 'mirror.c',
'nbd.c', 'nbd.c',
'null.c', 'null.c',
'plug.c',
'qapi.c', 'qapi.c',
'qcow2-bitmap.c', 'qcow2-bitmap.c',
'qcow2-cache.c', 'qcow2-cache.c',

159
block/plug.c Normal file
View File

@ -0,0 +1,159 @@
/* SPDX-License-Identifier: GPL-2.0-or-later */
/*
* Block I/O plugging
*
* Copyright Red Hat.
*
* This API defers a function call within a blk_io_plug()/blk_io_unplug()
* section, allowing multiple calls to batch up. This is a performance
* optimization that is used in the block layer to submit several I/O requests
* at once instead of individually:
*
* blk_io_plug(); <-- start of plugged region
* ...
* blk_io_plug_call(my_func, my_obj); <-- deferred my_func(my_obj) call
* blk_io_plug_call(my_func, my_obj); <-- another
* blk_io_plug_call(my_func, my_obj); <-- another
* ...
* blk_io_unplug(); <-- end of plugged region, my_func(my_obj) is called once
*
* This code is actually generic and not tied to the block layer. If another
* subsystem needs this functionality, it could be renamed.
*/
#include "qemu/osdep.h"
#include "qemu/coroutine-tls.h"
#include "qemu/notify.h"
#include "qemu/thread.h"
#include "sysemu/block-backend.h"
/* A function call that has been deferred until unplug() */
typedef struct {
void (*fn)(void *);
void *opaque;
} UnplugFn;
/* Per-thread state */
typedef struct {
unsigned count; /* how many times has plug() been called? */
GArray *unplug_fns; /* functions to call at unplug time */
} Plug;
/* Use get_ptr_plug() to fetch this thread-local value */
QEMU_DEFINE_STATIC_CO_TLS(Plug, plug);
/* Called at thread cleanup time */
static void blk_io_plug_atexit(Notifier *n, void *value)
{
Plug *plug = get_ptr_plug();
g_array_free(plug->unplug_fns, TRUE);
}
/* This won't involve coroutines, so use __thread */
static __thread Notifier blk_io_plug_atexit_notifier;
/**
* blk_io_plug_call:
* @fn: a function pointer to be invoked
* @opaque: a user-defined argument to @fn()
*
* Call @fn(@opaque) immediately if not within a blk_io_plug()/blk_io_unplug()
* section.
*
* Otherwise defer the call until the end of the outermost
* blk_io_plug()/blk_io_unplug() section in this thread. If the same
* @fn/@opaque pair has already been deferred, it will only be called once upon
* blk_io_unplug() so that accumulated calls are batched into a single call.
*
* The caller must ensure that @opaque is not freed before @fn() is invoked.
*/
void blk_io_plug_call(void (*fn)(void *), void *opaque)
{
Plug *plug = get_ptr_plug();
/* Call immediately if we're not plugged */
if (plug->count == 0) {
fn(opaque);
return;
}
GArray *array = plug->unplug_fns;
if (!array) {
array = g_array_new(FALSE, FALSE, sizeof(UnplugFn));
plug->unplug_fns = array;
blk_io_plug_atexit_notifier.notify = blk_io_plug_atexit;
qemu_thread_atexit_add(&blk_io_plug_atexit_notifier);
}
UnplugFn *fns = (UnplugFn *)array->data;
UnplugFn new_fn = {
.fn = fn,
.opaque = opaque,
};
/*
* There won't be many, so do a linear search. If this becomes a bottleneck
* then a binary search (glib 2.62+) or different data structure could be
* used.
*/
for (guint i = 0; i < array->len; i++) {
if (memcmp(&fns[i], &new_fn, sizeof(new_fn)) == 0) {
return; /* already exists */
}
}
g_array_append_val(array, new_fn);
}
/**
* blk_io_plug: Defer blk_io_plug_call() functions until blk_io_unplug()
*
* blk_io_plug/unplug are thread-local operations. This means that multiple
* threads can simultaneously call plug/unplug, but the caller must ensure that
* each unplug() is called in the same thread of the matching plug().
*
* Nesting is supported. blk_io_plug_call() functions are only called at the
* outermost blk_io_unplug().
*/
void blk_io_plug(void)
{
Plug *plug = get_ptr_plug();
assert(plug->count < UINT32_MAX);
plug->count++;
}
/**
* blk_io_unplug: Run any pending blk_io_plug_call() functions
*
* There must have been a matching blk_io_plug() call in the same thread prior
* to this blk_io_unplug() call.
*/
void blk_io_unplug(void)
{
Plug *plug = get_ptr_plug();
assert(plug->count > 0);
if (--plug->count > 0) {
return;
}
GArray *array = plug->unplug_fns;
if (!array) {
return;
}
UnplugFn *fns = (UnplugFn *)array->data;
for (guint i = 0; i < array->len; i++) {
fns[i].fn(fns[i].opaque);
}
/*
* This resets the array without freeing memory so that appending is cheap
* in the future.
*/
g_array_set_size(array, 0);
}

View File

@ -537,7 +537,7 @@ static bool xen_block_handle_requests(XenBlockDataPlane *dataplane)
* is below us. * is below us.
*/ */
if (inflight_atstart > IO_PLUG_THRESHOLD) { if (inflight_atstart > IO_PLUG_THRESHOLD) {
blk_io_plug(dataplane->blk); blk_io_plug();
} }
while (rc != rp) { while (rc != rp) {
/* pull request from ring */ /* pull request from ring */
@ -577,12 +577,12 @@ static bool xen_block_handle_requests(XenBlockDataPlane *dataplane)
if (inflight_atstart > IO_PLUG_THRESHOLD && if (inflight_atstart > IO_PLUG_THRESHOLD &&
batched >= inflight_atstart) { batched >= inflight_atstart) {
blk_io_unplug(dataplane->blk); blk_io_unplug();
} }
xen_block_do_aio(request); xen_block_do_aio(request);
if (inflight_atstart > IO_PLUG_THRESHOLD) { if (inflight_atstart > IO_PLUG_THRESHOLD) {
if (batched >= inflight_atstart) { if (batched >= inflight_atstart) {
blk_io_plug(dataplane->blk); blk_io_plug();
batched = 0; batched = 0;
} else { } else {
batched++; batched++;
@ -590,7 +590,7 @@ static bool xen_block_handle_requests(XenBlockDataPlane *dataplane)
} }
} }
if (inflight_atstart > IO_PLUG_THRESHOLD) { if (inflight_atstart > IO_PLUG_THRESHOLD) {
blk_io_unplug(dataplane->blk); blk_io_unplug();
} }
return done_something; return done_something;

View File

@ -1134,7 +1134,7 @@ void virtio_blk_handle_vq(VirtIOBlock *s, VirtQueue *vq)
bool suppress_notifications = virtio_queue_get_notification(vq); bool suppress_notifications = virtio_queue_get_notification(vq);
aio_context_acquire(blk_get_aio_context(s->blk)); aio_context_acquire(blk_get_aio_context(s->blk));
blk_io_plug(s->blk); blk_io_plug();
do { do {
if (suppress_notifications) { if (suppress_notifications) {
@ -1158,7 +1158,7 @@ void virtio_blk_handle_vq(VirtIOBlock *s, VirtQueue *vq)
virtio_blk_submit_multireq(s, &mrb); virtio_blk_submit_multireq(s, &mrb);
} }
blk_io_unplug(s->blk); blk_io_unplug();
aio_context_release(blk_get_aio_context(s->blk)); aio_context_release(blk_get_aio_context(s->blk));
} }

View File

@ -799,7 +799,7 @@ static int virtio_scsi_handle_cmd_req_prepare(VirtIOSCSI *s, VirtIOSCSIReq *req)
return -ENOBUFS; return -ENOBUFS;
} }
scsi_req_ref(req->sreq); scsi_req_ref(req->sreq);
blk_io_plug(d->conf.blk); blk_io_plug();
object_unref(OBJECT(d)); object_unref(OBJECT(d));
return 0; return 0;
} }
@ -810,7 +810,7 @@ static void virtio_scsi_handle_cmd_req_submit(VirtIOSCSI *s, VirtIOSCSIReq *req)
if (scsi_req_enqueue(sreq)) { if (scsi_req_enqueue(sreq)) {
scsi_req_continue(sreq); scsi_req_continue(sreq);
} }
blk_io_unplug(sreq->dev->conf.blk); blk_io_unplug();
scsi_req_unref(sreq); scsi_req_unref(sreq);
} }
@ -836,7 +836,7 @@ static void virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq)
while (!QTAILQ_EMPTY(&reqs)) { while (!QTAILQ_EMPTY(&reqs)) {
req = QTAILQ_FIRST(&reqs); req = QTAILQ_FIRST(&reqs);
QTAILQ_REMOVE(&reqs, req, next); QTAILQ_REMOVE(&reqs, req, next);
blk_io_unplug(req->sreq->dev->conf.blk); blk_io_unplug();
scsi_req_unref(req->sreq); scsi_req_unref(req->sreq);
virtqueue_detach_element(req->vq, &req->elem, 0); virtqueue_detach_element(req->vq, &req->elem, 0);
virtio_scsi_free_req(req); virtio_scsi_free_req(req);

View File

@ -100,16 +100,9 @@ void blk_iostatus_set_err(BlockBackend *blk, int error);
int blk_get_max_iov(BlockBackend *blk); int blk_get_max_iov(BlockBackend *blk);
int blk_get_max_hw_iov(BlockBackend *blk); int blk_get_max_hw_iov(BlockBackend *blk);
/* void blk_io_plug(void);
* blk_io_plug/unplug are thread-local operations. This means that multiple void blk_io_unplug(void);
* IOThreads can simultaneously call plug/unplug, but the caller must ensure void blk_io_plug_call(void (*fn)(void *), void *opaque);
* that each unplug() is called in the same IOThread of the matching plug().
*/
void coroutine_fn blk_co_io_plug(BlockBackend *blk);
void co_wrapper blk_io_plug(BlockBackend *blk);
void coroutine_fn blk_co_io_unplug(BlockBackend *blk);
void co_wrapper blk_io_unplug(BlockBackend *blk);
AioContext *blk_get_aio_context(BlockBackend *blk); AioContext *blk_get_aio_context(BlockBackend *blk);
BlockAcctStats *blk_get_stats(BlockBackend *blk); BlockAcctStats *blk_get_stats(BlockBackend *blk);