9ec7a59b5a
Commit4c41c69e
changed the way the coroutine pool is sized because for virtio-blk devices with a large queue size and heavy I/O, it was just too small and caused coroutines to be deleted and reallocated soon afterwards. The change made the size dynamic based on the number of queues and the queue size of virtio-blk devices. There are two important numbers here: Slightly simplified, when a coroutine terminates, it is generally stored in the global release pool up to a certain pool size, and if the pool is full, it is freed. Conversely, when allocating a new coroutine, the coroutines in the release pool are reused if the pool already has reached a certain minimum size (the batch size), otherwise we allocate new coroutines. The problem after commit4c41c69e
is that it not only increases the maximum pool size (which is the intended effect), but also the batch size for reusing coroutines (which is a bug). It means that in cases with many devices and/or a large queue size (which defaults to the number of vcpus for virtio-blk-pci), many thousand coroutines could be sitting in the release pool without being reused. This is not only a waste of memory and allocations, but it actually makes the QEMU process likely to hit the vm.max_map_count limit on Linux because each coroutine requires two mappings (its stack and the guard page for the stack), causing it to abort() in qemu_alloc_stack() because when the limit is hit, mprotect() starts to fail with ENOMEM. In order to fix the problem, change the batch size back to 64 to avoid uselessly accumulating coroutines in the release pool, but keep the dynamic maximum pool size so that coroutines aren't freed too early in heavy I/O scenarios. Note that this fix doesn't strictly make it impossible to hit the limit, but this would only happen if most of the coroutines are actually in use at the same time, not just sitting in a pool. This is the same behaviour as we already had before commit4c41c69e
. Fully preventing this would require allowing qemu_coroutine_create() to return an error, but it doesn't seem to be a scenario that people hit in practice. Cc: qemu-stable@nongnu.org Resolves: https://bugzilla.redhat.com/show_bug.cgi?id=2079938 Fixes:4c41c69e05
Signed-off-by: Kevin Wolf <kwolf@redhat.com> Message-Id: <20220510151020.105528-3-kwolf@redhat.com> Tested-by: Hiroki Narukawa <hnarukaw@yahoo-corp.jp> Signed-off-by: Kevin Wolf <kwolf@redhat.com>
230 lines
6.5 KiB
C
230 lines
6.5 KiB
C
/*
|
|
* QEMU coroutines
|
|
*
|
|
* Copyright IBM, Corp. 2011
|
|
*
|
|
* Authors:
|
|
* Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
|
|
* Kevin Wolf <kwolf@redhat.com>
|
|
*
|
|
* This work is licensed under the terms of the GNU LGPL, version 2 or later.
|
|
* See the COPYING.LIB file in the top-level directory.
|
|
*
|
|
*/
|
|
|
|
#include "qemu/osdep.h"
|
|
#include "trace.h"
|
|
#include "qemu/thread.h"
|
|
#include "qemu/atomic.h"
|
|
#include "qemu/coroutine.h"
|
|
#include "qemu/coroutine_int.h"
|
|
#include "qemu/coroutine-tls.h"
|
|
#include "block/aio.h"
|
|
|
|
/**
|
|
* The minimal batch size is always 64, coroutines from the release_pool are
|
|
* reused as soon as there are 64 coroutines in it. The maximum pool size starts
|
|
* with 64 and is increased on demand so that coroutines are not deleted even if
|
|
* they are not immediately reused.
|
|
*/
|
|
enum {
|
|
POOL_MIN_BATCH_SIZE = 64,
|
|
POOL_INITIAL_MAX_SIZE = 64,
|
|
};
|
|
|
|
/** Free list to speed up creation */
|
|
static QSLIST_HEAD(, Coroutine) release_pool = QSLIST_HEAD_INITIALIZER(pool);
|
|
static unsigned int pool_max_size = POOL_INITIAL_MAX_SIZE;
|
|
static unsigned int release_pool_size;
|
|
|
|
typedef QSLIST_HEAD(, Coroutine) CoroutineQSList;
|
|
QEMU_DEFINE_STATIC_CO_TLS(CoroutineQSList, alloc_pool);
|
|
QEMU_DEFINE_STATIC_CO_TLS(unsigned int, alloc_pool_size);
|
|
QEMU_DEFINE_STATIC_CO_TLS(Notifier, coroutine_pool_cleanup_notifier);
|
|
|
|
static void coroutine_pool_cleanup(Notifier *n, void *value)
|
|
{
|
|
Coroutine *co;
|
|
Coroutine *tmp;
|
|
CoroutineQSList *alloc_pool = get_ptr_alloc_pool();
|
|
|
|
QSLIST_FOREACH_SAFE(co, alloc_pool, pool_next, tmp) {
|
|
QSLIST_REMOVE_HEAD(alloc_pool, pool_next);
|
|
qemu_coroutine_delete(co);
|
|
}
|
|
}
|
|
|
|
Coroutine *qemu_coroutine_create(CoroutineEntry *entry, void *opaque)
|
|
{
|
|
Coroutine *co = NULL;
|
|
|
|
if (CONFIG_COROUTINE_POOL) {
|
|
CoroutineQSList *alloc_pool = get_ptr_alloc_pool();
|
|
|
|
co = QSLIST_FIRST(alloc_pool);
|
|
if (!co) {
|
|
if (release_pool_size > POOL_MIN_BATCH_SIZE) {
|
|
/* Slow path; a good place to register the destructor, too. */
|
|
Notifier *notifier = get_ptr_coroutine_pool_cleanup_notifier();
|
|
if (!notifier->notify) {
|
|
notifier->notify = coroutine_pool_cleanup;
|
|
qemu_thread_atexit_add(notifier);
|
|
}
|
|
|
|
/* This is not exact; there could be a little skew between
|
|
* release_pool_size and the actual size of release_pool. But
|
|
* it is just a heuristic, it does not need to be perfect.
|
|
*/
|
|
set_alloc_pool_size(qatomic_xchg(&release_pool_size, 0));
|
|
QSLIST_MOVE_ATOMIC(alloc_pool, &release_pool);
|
|
co = QSLIST_FIRST(alloc_pool);
|
|
}
|
|
}
|
|
if (co) {
|
|
QSLIST_REMOVE_HEAD(alloc_pool, pool_next);
|
|
set_alloc_pool_size(get_alloc_pool_size() - 1);
|
|
}
|
|
}
|
|
|
|
if (!co) {
|
|
co = qemu_coroutine_new();
|
|
}
|
|
|
|
co->entry = entry;
|
|
co->entry_arg = opaque;
|
|
QSIMPLEQ_INIT(&co->co_queue_wakeup);
|
|
return co;
|
|
}
|
|
|
|
static void coroutine_delete(Coroutine *co)
|
|
{
|
|
co->caller = NULL;
|
|
|
|
if (CONFIG_COROUTINE_POOL) {
|
|
if (release_pool_size < qatomic_read(&pool_max_size) * 2) {
|
|
QSLIST_INSERT_HEAD_ATOMIC(&release_pool, co, pool_next);
|
|
qatomic_inc(&release_pool_size);
|
|
return;
|
|
}
|
|
if (get_alloc_pool_size() < qatomic_read(&pool_max_size)) {
|
|
QSLIST_INSERT_HEAD(get_ptr_alloc_pool(), co, pool_next);
|
|
set_alloc_pool_size(get_alloc_pool_size() + 1);
|
|
return;
|
|
}
|
|
}
|
|
|
|
qemu_coroutine_delete(co);
|
|
}
|
|
|
|
void qemu_aio_coroutine_enter(AioContext *ctx, Coroutine *co)
|
|
{
|
|
QSIMPLEQ_HEAD(, Coroutine) pending = QSIMPLEQ_HEAD_INITIALIZER(pending);
|
|
Coroutine *from = qemu_coroutine_self();
|
|
|
|
QSIMPLEQ_INSERT_TAIL(&pending, co, co_queue_next);
|
|
|
|
/* Run co and any queued coroutines */
|
|
while (!QSIMPLEQ_EMPTY(&pending)) {
|
|
Coroutine *to = QSIMPLEQ_FIRST(&pending);
|
|
CoroutineAction ret;
|
|
|
|
/* Cannot rely on the read barrier for to in aio_co_wake(), as there are
|
|
* callers outside of aio_co_wake() */
|
|
const char *scheduled = qatomic_mb_read(&to->scheduled);
|
|
|
|
QSIMPLEQ_REMOVE_HEAD(&pending, co_queue_next);
|
|
|
|
trace_qemu_aio_coroutine_enter(ctx, from, to, to->entry_arg);
|
|
|
|
/* if the Coroutine has already been scheduled, entering it again will
|
|
* cause us to enter it twice, potentially even after the coroutine has
|
|
* been deleted */
|
|
if (scheduled) {
|
|
fprintf(stderr,
|
|
"%s: Co-routine was already scheduled in '%s'\n",
|
|
__func__, scheduled);
|
|
abort();
|
|
}
|
|
|
|
if (to->caller) {
|
|
fprintf(stderr, "Co-routine re-entered recursively\n");
|
|
abort();
|
|
}
|
|
|
|
to->caller = from;
|
|
to->ctx = ctx;
|
|
|
|
/* Store to->ctx before anything that stores to. Matches
|
|
* barrier in aio_co_wake and qemu_co_mutex_wake.
|
|
*/
|
|
smp_wmb();
|
|
|
|
ret = qemu_coroutine_switch(from, to, COROUTINE_ENTER);
|
|
|
|
/* Queued coroutines are run depth-first; previously pending coroutines
|
|
* run after those queued more recently.
|
|
*/
|
|
QSIMPLEQ_PREPEND(&pending, &to->co_queue_wakeup);
|
|
|
|
switch (ret) {
|
|
case COROUTINE_YIELD:
|
|
break;
|
|
case COROUTINE_TERMINATE:
|
|
assert(!to->locks_held);
|
|
trace_qemu_coroutine_terminate(to);
|
|
coroutine_delete(to);
|
|
break;
|
|
default:
|
|
abort();
|
|
}
|
|
}
|
|
}
|
|
|
|
void qemu_coroutine_enter(Coroutine *co)
|
|
{
|
|
qemu_aio_coroutine_enter(qemu_get_current_aio_context(), co);
|
|
}
|
|
|
|
void qemu_coroutine_enter_if_inactive(Coroutine *co)
|
|
{
|
|
if (!qemu_coroutine_entered(co)) {
|
|
qemu_coroutine_enter(co);
|
|
}
|
|
}
|
|
|
|
void coroutine_fn qemu_coroutine_yield(void)
|
|
{
|
|
Coroutine *self = qemu_coroutine_self();
|
|
Coroutine *to = self->caller;
|
|
|
|
trace_qemu_coroutine_yield(self, to);
|
|
|
|
if (!to) {
|
|
fprintf(stderr, "Co-routine is yielding to no one\n");
|
|
abort();
|
|
}
|
|
|
|
self->caller = NULL;
|
|
qemu_coroutine_switch(self, to, COROUTINE_YIELD);
|
|
}
|
|
|
|
bool qemu_coroutine_entered(Coroutine *co)
|
|
{
|
|
return co->caller;
|
|
}
|
|
|
|
AioContext *coroutine_fn qemu_coroutine_get_aio_context(Coroutine *co)
|
|
{
|
|
return co->ctx;
|
|
}
|
|
|
|
void qemu_coroutine_inc_pool_size(unsigned int additional_pool_size)
|
|
{
|
|
qatomic_add(&pool_max_size, additional_pool_size);
|
|
}
|
|
|
|
void qemu_coroutine_dec_pool_size(unsigned int removing_pool_size)
|
|
{
|
|
qatomic_sub(&pool_max_size, removing_pool_size);
|
|
}
|