diff --git a/include/io/channel.h b/include/io/channel.h index 446a566e5e..229bf36910 100644 --- a/include/io/channel.h +++ b/include/io/channel.h @@ -757,6 +757,16 @@ void qio_channel_detach_aio_context(QIOChannel *ioc); void coroutine_fn qio_channel_yield(QIOChannel *ioc, GIOCondition condition); +/** + * qio_channel_wake_read: + * @ioc: the channel object + * + * If qio_channel_yield() is currently waiting for the channel to become + * readable, interrupt it and reenter immediately. This function is safe to call + * from any thread. + */ +void qio_channel_wake_read(QIOChannel *ioc); + /** * qio_channel_wait: * @ioc: the channel object diff --git a/io/channel.c b/io/channel.c index 375a130a39..72f0066af5 100644 --- a/io/channel.c +++ b/io/channel.c @@ -19,6 +19,7 @@ */ #include "qemu/osdep.h" +#include "block/aio-wait.h" #include "io/channel.h" #include "qapi/error.h" #include "qemu/main-loop.h" @@ -514,7 +515,11 @@ int qio_channel_flush(QIOChannel *ioc, static void qio_channel_restart_read(void *opaque) { QIOChannel *ioc = opaque; - Coroutine *co = ioc->read_coroutine; + Coroutine *co = qatomic_xchg(&ioc->read_coroutine, NULL); + + if (!co) { + return; + } /* Assert that aio_co_wake() reenters the coroutine directly */ assert(qemu_get_current_aio_context() == @@ -525,7 +530,11 @@ static void qio_channel_restart_read(void *opaque) static void qio_channel_restart_write(void *opaque) { QIOChannel *ioc = opaque; - Coroutine *co = ioc->write_coroutine; + Coroutine *co = qatomic_xchg(&ioc->write_coroutine, NULL); + + if (!co) { + return; + } /* Assert that aio_co_wake() reenters the coroutine directly */ assert(qemu_get_current_aio_context() == @@ -568,7 +577,11 @@ void qio_channel_detach_aio_context(QIOChannel *ioc) void coroutine_fn qio_channel_yield(QIOChannel *ioc, GIOCondition condition) { + AioContext *ioc_ctx = ioc->ctx ?: qemu_get_aio_context(); + assert(qemu_in_coroutine()); + assert(in_aio_context_home_thread(ioc_ctx)); + if (condition == G_IO_IN) { assert(!ioc->read_coroutine); ioc->read_coroutine = qemu_coroutine_self(); @@ -580,18 +593,26 @@ void coroutine_fn qio_channel_yield(QIOChannel *ioc, } qio_channel_set_aio_fd_handlers(ioc); qemu_coroutine_yield(); + assert(in_aio_context_home_thread(ioc_ctx)); /* Allow interrupting the operation by reentering the coroutine other than * through the aio_fd_handlers. */ - if (condition == G_IO_IN && ioc->read_coroutine) { - ioc->read_coroutine = NULL; + if (condition == G_IO_IN) { + assert(ioc->read_coroutine == NULL); qio_channel_set_aio_fd_handlers(ioc); - } else if (condition == G_IO_OUT && ioc->write_coroutine) { - ioc->write_coroutine = NULL; + } else if (condition == G_IO_OUT) { + assert(ioc->write_coroutine == NULL); qio_channel_set_aio_fd_handlers(ioc); } } +void qio_channel_wake_read(QIOChannel *ioc) +{ + Coroutine *co = qatomic_xchg(&ioc->read_coroutine, NULL); + if (co) { + aio_co_wake(co); + } +} static gboolean qio_channel_wait_complete(QIOChannel *ioc, GIOCondition condition, diff --git a/nbd/server.c b/nbd/server.c index e239c2890f..2664d43bff 100644 --- a/nbd/server.c +++ b/nbd/server.c @@ -1599,8 +1599,7 @@ static bool nbd_drained_poll(void *opaque) * enter it here so we don't depend on the client to wake it up. */ if (client->recv_coroutine != NULL && client->read_yielding) { - qemu_aio_coroutine_enter(exp->common.ctx, - client->recv_coroutine); + qio_channel_wake_read(client->ioc); } return true;