aio: add aio_context_acquire() and aio_context_release()
It can be useful to run an AioContext from a thread which normally does not "own" the AioContext. For example, request draining can be implemented by acquiring the AioContext and looping aio_poll() until all requests have been completed. The following pattern should work: /* Event loop thread */ while (running) { aio_context_acquire(ctx); aio_poll(ctx, true); aio_context_release(ctx); } /* Another thread */ aio_context_acquire(ctx); bdrv_read(bs, 0x1000, buf, 1); aio_context_release(ctx); This patch implements aio_context_acquire() and aio_context_release(). Note that existing aio_poll() callers do not need to worry about acquiring and releasing - it is only needed when multiple threads will call aio_poll() on the same AioContext. Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
This commit is contained in:
parent
2da61b671e
commit
98563fc3ec
18
async.c
18
async.c
@ -214,6 +214,7 @@ aio_ctx_finalize(GSource *source)
|
||||
thread_pool_free(ctx->thread_pool);
|
||||
aio_set_event_notifier(ctx, &ctx->notifier, NULL);
|
||||
event_notifier_cleanup(&ctx->notifier);
|
||||
rfifolock_destroy(&ctx->lock);
|
||||
qemu_mutex_destroy(&ctx->bh_lock);
|
||||
g_array_free(ctx->pollfds, TRUE);
|
||||
timerlistgroup_deinit(&ctx->tlg);
|
||||
@ -250,6 +251,12 @@ static void aio_timerlist_notify(void *opaque)
|
||||
aio_notify(opaque);
|
||||
}
|
||||
|
||||
static void aio_rfifolock_cb(void *opaque)
|
||||
{
|
||||
/* Kick owner thread in case they are blocked in aio_poll() */
|
||||
aio_notify(opaque);
|
||||
}
|
||||
|
||||
AioContext *aio_context_new(void)
|
||||
{
|
||||
AioContext *ctx;
|
||||
@ -257,6 +264,7 @@ AioContext *aio_context_new(void)
|
||||
ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
|
||||
ctx->thread_pool = NULL;
|
||||
qemu_mutex_init(&ctx->bh_lock);
|
||||
rfifolock_init(&ctx->lock, aio_rfifolock_cb, ctx);
|
||||
event_notifier_init(&ctx->notifier, false);
|
||||
aio_set_event_notifier(ctx, &ctx->notifier,
|
||||
(EventNotifierHandler *)
|
||||
@ -275,3 +283,13 @@ void aio_context_unref(AioContext *ctx)
|
||||
{
|
||||
g_source_unref(&ctx->source);
|
||||
}
|
||||
|
||||
void aio_context_acquire(AioContext *ctx)
|
||||
{
|
||||
rfifolock_lock(&ctx->lock);
|
||||
}
|
||||
|
||||
void aio_context_release(AioContext *ctx)
|
||||
{
|
||||
rfifolock_unlock(&ctx->lock);
|
||||
}
|
||||
|
@ -19,6 +19,7 @@
|
||||
#include "qemu/queue.h"
|
||||
#include "qemu/event_notifier.h"
|
||||
#include "qemu/thread.h"
|
||||
#include "qemu/rfifolock.h"
|
||||
#include "qemu/timer.h"
|
||||
|
||||
typedef struct BlockDriverAIOCB BlockDriverAIOCB;
|
||||
@ -47,6 +48,9 @@ typedef void IOHandler(void *opaque);
|
||||
struct AioContext {
|
||||
GSource source;
|
||||
|
||||
/* Protects all fields from multi-threaded access */
|
||||
RFifoLock lock;
|
||||
|
||||
/* The list of registered AIO handlers */
|
||||
QLIST_HEAD(, AioHandler) aio_handlers;
|
||||
|
||||
@ -104,6 +108,20 @@ void aio_context_ref(AioContext *ctx);
|
||||
*/
|
||||
void aio_context_unref(AioContext *ctx);
|
||||
|
||||
/* Take ownership of the AioContext. If the AioContext will be shared between
|
||||
* threads, a thread must have ownership when calling aio_poll().
|
||||
*
|
||||
* Note that multiple threads calling aio_poll() means timers, BHs, and
|
||||
* callbacks may be invoked from a different thread than they were registered
|
||||
* from. Therefore, code must use AioContext acquire/release or use
|
||||
* fine-grained synchronization to protect shared state if other threads will
|
||||
* be accessing it simultaneously.
|
||||
*/
|
||||
void aio_context_acquire(AioContext *ctx);
|
||||
|
||||
/* Relinquish ownership of the AioContext. */
|
||||
void aio_context_release(AioContext *ctx);
|
||||
|
||||
/**
|
||||
* aio_bh_new: Allocate a new bottom half structure.
|
||||
*
|
||||
|
@ -112,6 +112,64 @@ static void test_notify(void)
|
||||
g_assert(!aio_poll(ctx, false));
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
QemuMutex start_lock;
|
||||
bool thread_acquired;
|
||||
} AcquireTestData;
|
||||
|
||||
static void *test_acquire_thread(void *opaque)
|
||||
{
|
||||
AcquireTestData *data = opaque;
|
||||
|
||||
/* Wait for other thread to let us start */
|
||||
qemu_mutex_lock(&data->start_lock);
|
||||
qemu_mutex_unlock(&data->start_lock);
|
||||
|
||||
aio_context_acquire(ctx);
|
||||
aio_context_release(ctx);
|
||||
|
||||
data->thread_acquired = true; /* success, we got here */
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void dummy_notifier_read(EventNotifier *unused)
|
||||
{
|
||||
g_assert(false); /* should never be invoked */
|
||||
}
|
||||
|
||||
static void test_acquire(void)
|
||||
{
|
||||
QemuThread thread;
|
||||
EventNotifier notifier;
|
||||
AcquireTestData data;
|
||||
|
||||
/* Dummy event notifier ensures aio_poll() will block */
|
||||
event_notifier_init(¬ifier, false);
|
||||
aio_set_event_notifier(ctx, ¬ifier, dummy_notifier_read);
|
||||
g_assert(!aio_poll(ctx, false)); /* consume aio_notify() */
|
||||
|
||||
qemu_mutex_init(&data.start_lock);
|
||||
qemu_mutex_lock(&data.start_lock);
|
||||
data.thread_acquired = false;
|
||||
|
||||
qemu_thread_create(&thread, "test_acquire_thread",
|
||||
test_acquire_thread,
|
||||
&data, QEMU_THREAD_JOINABLE);
|
||||
|
||||
/* Block in aio_poll(), let other thread kick us and acquire context */
|
||||
aio_context_acquire(ctx);
|
||||
qemu_mutex_unlock(&data.start_lock); /* let the thread run */
|
||||
g_assert(!aio_poll(ctx, true));
|
||||
aio_context_release(ctx);
|
||||
|
||||
qemu_thread_join(&thread);
|
||||
aio_set_event_notifier(ctx, ¬ifier, NULL);
|
||||
event_notifier_cleanup(¬ifier);
|
||||
|
||||
g_assert(data.thread_acquired);
|
||||
}
|
||||
|
||||
static void test_bh_schedule(void)
|
||||
{
|
||||
BHTestData data = { .n = 0 };
|
||||
@ -775,6 +833,7 @@ int main(int argc, char **argv)
|
||||
|
||||
g_test_init(&argc, &argv, NULL);
|
||||
g_test_add_func("/aio/notify", test_notify);
|
||||
g_test_add_func("/aio/acquire", test_acquire);
|
||||
g_test_add_func("/aio/bh/schedule", test_bh_schedule);
|
||||
g_test_add_func("/aio/bh/schedule10", test_bh_schedule10);
|
||||
g_test_add_func("/aio/bh/cancel", test_bh_cancel);
|
||||
|
Loading…
Reference in New Issue
Block a user