diff --git a/async.c b/async.c index 46d9e639d7..77d080d6f5 100644 --- a/async.c +++ b/async.c @@ -280,6 +280,12 @@ static void aio_timerlist_notify(void *opaque) aio_notify(opaque); } +static void aio_rfifolock_cb(void *opaque) +{ + /* Kick owner thread in case they are blocked in aio_poll() */ + aio_notify(opaque); +} + AioContext *aio_context_new(Error **errp) { int ret; @@ -297,7 +303,7 @@ AioContext *aio_context_new(Error **errp) event_notifier_test_and_clear); ctx->thread_pool = NULL; qemu_mutex_init(&ctx->bh_lock); - rfifolock_init(&ctx->lock, NULL, NULL); + rfifolock_init(&ctx->lock, aio_rfifolock_cb, ctx); timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx); return ctx; diff --git a/iothread.c b/iothread.c index 0416fc4268..878a594ef4 100644 --- a/iothread.c +++ b/iothread.c @@ -31,14 +31,21 @@ typedef ObjectClass IOThreadClass; static void *iothread_run(void *opaque) { IOThread *iothread = opaque; + bool blocking; qemu_mutex_lock(&iothread->init_done_lock); iothread->thread_id = qemu_get_thread_id(); qemu_cond_signal(&iothread->init_done_cond); qemu_mutex_unlock(&iothread->init_done_lock); - while (!atomic_read(&iothread->stopping)) { - aio_poll(iothread->ctx, true); + while (!iothread->stopping) { + aio_context_acquire(iothread->ctx); + blocking = true; + while (!iothread->stopping && aio_poll(iothread->ctx, blocking)) { + /* Progress was made, keep going */ + blocking = false; + } + aio_context_release(iothread->ctx); } return NULL; } diff --git a/tests/test-aio.c b/tests/test-aio.c index 4b0cb45d31..a7cb5c9915 100644 --- a/tests/test-aio.c +++ b/tests/test-aio.c @@ -107,7 +107,6 @@ static void test_notify(void) typedef struct { QemuMutex start_lock; - EventNotifier notifier; bool thread_acquired; } AcquireTestData; @@ -119,8 +118,6 @@ static void *test_acquire_thread(void *opaque) qemu_mutex_lock(&data->start_lock); qemu_mutex_unlock(&data->start_lock); - g_usleep(500000); - event_notifier_set(&data->notifier); aio_context_acquire(ctx); aio_context_release(ctx); @@ -129,19 +126,20 @@ static void *test_acquire_thread(void *opaque) return NULL; } -static void dummy_notifier_read(EventNotifier *n) +static void dummy_notifier_read(EventNotifier *unused) { - event_notifier_test_and_clear(n); + g_assert(false); /* should never be invoked */ } static void test_acquire(void) { QemuThread thread; + EventNotifier notifier; AcquireTestData data; /* Dummy event notifier ensures aio_poll() will block */ - event_notifier_init(&data.notifier, false); - aio_set_event_notifier(ctx, &data.notifier, dummy_notifier_read); + event_notifier_init(¬ifier, false); + aio_set_event_notifier(ctx, ¬ifier, dummy_notifier_read); g_assert(!aio_poll(ctx, false)); /* consume aio_notify() */ qemu_mutex_init(&data.start_lock); @@ -155,13 +153,12 @@ static void test_acquire(void) /* Block in aio_poll(), let other thread kick us and acquire context */ aio_context_acquire(ctx); qemu_mutex_unlock(&data.start_lock); /* let the thread run */ - g_assert(aio_poll(ctx, true)); - g_assert(!data.thread_acquired); + g_assert(!aio_poll(ctx, true)); aio_context_release(ctx); qemu_thread_join(&thread); - aio_set_event_notifier(ctx, &data.notifier, NULL); - event_notifier_cleanup(&data.notifier); + aio_set_event_notifier(ctx, ¬ifier, NULL); + event_notifier_cleanup(¬ifier); g_assert(data.thread_acquired); }