diff --git a/HACKING b/HACKING index 20a910168d..4125c97d8d 100644 --- a/HACKING +++ b/HACKING @@ -1,10 +1,28 @@ 1. Preprocessor +1.1. Variadic macros + For variadic macros, stick with this C99-like syntax: #define DPRINTF(fmt, ...) \ do { printf("IRQ: " fmt, ## __VA_ARGS__); } while (0) +1.2. Include directives + +Order include directives as follows: + +#include "qemu/osdep.h" /* Always first... */ +#include <...> /* then system headers... */ +#include "..." /* and finally QEMU headers. */ + +The "qemu/osdep.h" header contains preprocessor macros that affect the behavior +of core system headers like . It must be the first include so that +core system headers included by external libraries get the preprocessor macros +that QEMU depends on. + +Do not include "qemu/osdep.h" from header files since the .c file will have +already included it. + 2. C types It should be common sense to use the right type, but we have collected diff --git a/aio-posix.c b/aio-posix.c index e13b9ab2b0..15855715d4 100644 --- a/aio-posix.c +++ b/aio-posix.c @@ -18,6 +18,8 @@ #include "block/block.h" #include "qemu/queue.h" #include "qemu/sockets.h" +#include "qemu/cutils.h" +#include "trace.h" #ifdef CONFIG_EPOLL_CREATE1 #include #endif @@ -27,6 +29,9 @@ struct AioHandler GPollFD pfd; IOHandler *io_read; IOHandler *io_write; + AioPollFn *io_poll; + IOHandler *io_poll_begin; + IOHandler *io_poll_end; int deleted; void *opaque; bool is_external; @@ -200,6 +205,7 @@ void aio_set_fd_handler(AioContext *ctx, bool is_external, IOHandler *io_read, IOHandler *io_write, + AioPollFn *io_poll, void *opaque) { AioHandler *node; @@ -209,7 +215,7 @@ void aio_set_fd_handler(AioContext *ctx, node = find_aio_handler(ctx, fd); /* Are we deleting the fd handler? */ - if (!io_read && !io_write) { + if (!io_read && !io_write && !io_poll) { if (node == NULL) { return; } @@ -228,6 +234,10 @@ void aio_set_fd_handler(AioContext *ctx, QLIST_REMOVE(node, node); deleted = true; } + + if (!node->io_poll) { + ctx->poll_disable_cnt--; + } } else { if (node == NULL) { /* Alloc and insert if it's not already there */ @@ -237,10 +247,16 @@ void aio_set_fd_handler(AioContext *ctx, g_source_add_poll(&ctx->source, &node->pfd); is_new = true; + + ctx->poll_disable_cnt += !io_poll; + } else { + ctx->poll_disable_cnt += !io_poll - !node->io_poll; } + /* Update handler with latest information */ node->io_read = io_read; node->io_write = io_write; + node->io_poll = io_poll; node->opaque = opaque; node->is_external = is_external; @@ -250,22 +266,83 @@ void aio_set_fd_handler(AioContext *ctx, aio_epoll_update(ctx, node, is_new); aio_notify(ctx); + if (deleted) { g_free(node); } } +void aio_set_fd_poll(AioContext *ctx, int fd, + IOHandler *io_poll_begin, + IOHandler *io_poll_end) +{ + AioHandler *node = find_aio_handler(ctx, fd); + + if (!node) { + return; + } + + node->io_poll_begin = io_poll_begin; + node->io_poll_end = io_poll_end; +} + void aio_set_event_notifier(AioContext *ctx, EventNotifier *notifier, bool is_external, - EventNotifierHandler *io_read) + EventNotifierHandler *io_read, + AioPollFn *io_poll) { - aio_set_fd_handler(ctx, event_notifier_get_fd(notifier), - is_external, (IOHandler *)io_read, NULL, notifier); + aio_set_fd_handler(ctx, event_notifier_get_fd(notifier), is_external, + (IOHandler *)io_read, NULL, io_poll, notifier); } +void aio_set_event_notifier_poll(AioContext *ctx, + EventNotifier *notifier, + EventNotifierHandler *io_poll_begin, + EventNotifierHandler *io_poll_end) +{ + aio_set_fd_poll(ctx, event_notifier_get_fd(notifier), + (IOHandler *)io_poll_begin, + (IOHandler *)io_poll_end); +} + +static void poll_set_started(AioContext *ctx, bool started) +{ + AioHandler *node; + + if (started == ctx->poll_started) { + return; + } + + ctx->poll_started = started; + + ctx->walking_handlers++; + QLIST_FOREACH(node, &ctx->aio_handlers, node) { + IOHandler *fn; + + if (node->deleted) { + continue; + } + + if (started) { + fn = node->io_poll_begin; + } else { + fn = node->io_poll_end; + } + + if (fn) { + fn(node->opaque); + } + } + ctx->walking_handlers--; +} + + bool aio_prepare(AioContext *ctx) { + /* Poll mode cannot be used with glib's event loop, disable it. */ + poll_set_started(ctx, false); + return false; } @@ -290,9 +367,13 @@ bool aio_pending(AioContext *ctx) return false; } -bool aio_dispatch(AioContext *ctx) +/* + * Note that dispatch_fds == false has the side-effect of post-poning the + * freeing of deleted handlers. + */ +bool aio_dispatch(AioContext *ctx, bool dispatch_fds) { - AioHandler *node; + AioHandler *node = NULL; bool progress = false; /* @@ -308,7 +389,9 @@ bool aio_dispatch(AioContext *ctx) * We have to walk very carefully in case aio_set_fd_handler is * called while we're walking. */ - node = QLIST_FIRST(&ctx->aio_handlers); + if (dispatch_fds) { + node = QLIST_FIRST(&ctx->aio_handlers); + } while (node) { AioHandler *tmp; int revents; @@ -400,12 +483,100 @@ static void add_pollfd(AioHandler *node) npfd++; } +static bool run_poll_handlers_once(AioContext *ctx) +{ + bool progress = false; + AioHandler *node; + + QLIST_FOREACH(node, &ctx->aio_handlers, node) { + if (!node->deleted && node->io_poll && + node->io_poll(node->opaque)) { + progress = true; + } + + /* Caller handles freeing deleted nodes. Don't do it here. */ + } + + return progress; +} + +/* run_poll_handlers: + * @ctx: the AioContext + * @max_ns: maximum time to poll for, in nanoseconds + * + * Polls for a given time. + * + * Note that ctx->notify_me must be non-zero so this function can detect + * aio_notify(). + * + * Note that the caller must have incremented ctx->walking_handlers. + * + * Returns: true if progress was made, false otherwise + */ +static bool run_poll_handlers(AioContext *ctx, int64_t max_ns) +{ + bool progress; + int64_t end_time; + + assert(ctx->notify_me); + assert(ctx->walking_handlers > 0); + assert(ctx->poll_disable_cnt == 0); + + trace_run_poll_handlers_begin(ctx, max_ns); + + end_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + max_ns; + + do { + progress = run_poll_handlers_once(ctx); + } while (!progress && qemu_clock_get_ns(QEMU_CLOCK_REALTIME) < end_time); + + trace_run_poll_handlers_end(ctx, progress); + + return progress; +} + +/* try_poll_mode: + * @ctx: the AioContext + * @blocking: busy polling is only attempted when blocking is true + * + * ctx->notify_me must be non-zero so this function can detect aio_notify(). + * + * Note that the caller must have incremented ctx->walking_handlers. + * + * Returns: true if progress was made, false otherwise + */ +static bool try_poll_mode(AioContext *ctx, bool blocking) +{ + if (blocking && ctx->poll_max_ns && ctx->poll_disable_cnt == 0) { + /* See qemu_soonest_timeout() uint64_t hack */ + int64_t max_ns = MIN((uint64_t)aio_compute_timeout(ctx), + (uint64_t)ctx->poll_ns); + + if (max_ns) { + poll_set_started(ctx, true); + + if (run_poll_handlers(ctx, max_ns)) { + return true; + } + } + } + + poll_set_started(ctx, false); + + /* Even if we don't run busy polling, try polling once in case it can make + * progress and the caller will be able to avoid ppoll(2)/epoll_wait(2). + */ + return run_poll_handlers_once(ctx); +} + bool aio_poll(AioContext *ctx, bool blocking) { AioHandler *node; - int i, ret; + int i; + int ret = 0; bool progress; int64_t timeout; + int64_t start = 0; aio_context_acquire(ctx); progress = false; @@ -423,41 +594,91 @@ bool aio_poll(AioContext *ctx, bool blocking) ctx->walking_handlers++; - assert(npfd == 0); + if (ctx->poll_max_ns) { + start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME); + } - /* fill pollfds */ + if (try_poll_mode(ctx, blocking)) { + progress = true; + } else { + assert(npfd == 0); - if (!aio_epoll_enabled(ctx)) { - QLIST_FOREACH(node, &ctx->aio_handlers, node) { - if (!node->deleted && node->pfd.events - && aio_node_check(ctx, node->is_external)) { - add_pollfd(node); + /* fill pollfds */ + + if (!aio_epoll_enabled(ctx)) { + QLIST_FOREACH(node, &ctx->aio_handlers, node) { + if (!node->deleted && node->pfd.events + && aio_node_check(ctx, node->is_external)) { + add_pollfd(node); + } } } + + timeout = blocking ? aio_compute_timeout(ctx) : 0; + + /* wait until next event */ + if (timeout) { + aio_context_release(ctx); + } + if (aio_epoll_check_poll(ctx, pollfds, npfd, timeout)) { + AioHandler epoll_handler; + + epoll_handler.pfd.fd = ctx->epollfd; + epoll_handler.pfd.events = G_IO_IN | G_IO_OUT | G_IO_HUP | G_IO_ERR; + npfd = 0; + add_pollfd(&epoll_handler); + ret = aio_epoll(ctx, pollfds, npfd, timeout); + } else { + ret = qemu_poll_ns(pollfds, npfd, timeout); + } + if (timeout) { + aio_context_acquire(ctx); + } } - timeout = blocking ? aio_compute_timeout(ctx) : 0; - - /* wait until next event */ - if (timeout) { - aio_context_release(ctx); - } - if (aio_epoll_check_poll(ctx, pollfds, npfd, timeout)) { - AioHandler epoll_handler; - - epoll_handler.pfd.fd = ctx->epollfd; - epoll_handler.pfd.events = G_IO_IN | G_IO_OUT | G_IO_HUP | G_IO_ERR; - npfd = 0; - add_pollfd(&epoll_handler); - ret = aio_epoll(ctx, pollfds, npfd, timeout); - } else { - ret = qemu_poll_ns(pollfds, npfd, timeout); - } if (blocking) { atomic_sub(&ctx->notify_me, 2); } - if (timeout) { - aio_context_acquire(ctx); + + /* Adjust polling time */ + if (ctx->poll_max_ns) { + int64_t block_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start; + + if (block_ns <= ctx->poll_ns) { + /* This is the sweet spot, no adjustment needed */ + } else if (block_ns > ctx->poll_max_ns) { + /* We'd have to poll for too long, poll less */ + int64_t old = ctx->poll_ns; + + if (ctx->poll_shrink) { + ctx->poll_ns /= ctx->poll_shrink; + } else { + ctx->poll_ns = 0; + } + + trace_poll_shrink(ctx, old, ctx->poll_ns); + } else if (ctx->poll_ns < ctx->poll_max_ns && + block_ns < ctx->poll_max_ns) { + /* There is room to grow, poll longer */ + int64_t old = ctx->poll_ns; + int64_t grow = ctx->poll_grow; + + if (grow == 0) { + grow = 2; + } + + if (ctx->poll_ns) { + ctx->poll_ns *= grow; + } else { + ctx->poll_ns = 4000; /* start polling at 4 microseconds */ + } + + if (ctx->poll_ns > ctx->poll_max_ns) { + ctx->poll_ns = ctx->poll_max_ns; + } + + trace_poll_grow(ctx, old, ctx->poll_ns); + } } aio_notify_accept(ctx); @@ -473,7 +694,7 @@ bool aio_poll(AioContext *ctx, bool blocking) ctx->walking_handlers--; /* Run dispatch even if there were no readable fds to run timers */ - if (aio_dispatch(ctx)) { + if (aio_dispatch(ctx, ret > 0)) { progress = true; } @@ -484,6 +705,13 @@ bool aio_poll(AioContext *ctx, bool blocking) void aio_context_setup(AioContext *ctx) { + /* TODO remove this in final patch submission */ + if (getenv("QEMU_AIO_POLL_MAX_NS")) { + fprintf(stderr, "The QEMU_AIO_POLL_MAX_NS environment variable has " + "been replaced with -object iothread,poll-max-ns=NUM\n"); + exit(1); + } + #ifdef CONFIG_EPOLL_CREATE1 assert(!ctx->epollfd); ctx->epollfd = epoll_create1(EPOLL_CLOEXEC); @@ -495,3 +723,17 @@ void aio_context_setup(AioContext *ctx) } #endif } + +void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns, + int64_t grow, int64_t shrink, Error **errp) +{ + /* No thread synchronization here, it doesn't matter if an incorrect value + * is used once. + */ + ctx->poll_max_ns = max_ns; + ctx->poll_ns = 0; + ctx->poll_grow = grow; + ctx->poll_shrink = shrink; + + aio_notify(ctx); +} diff --git a/aio-win32.c b/aio-win32.c index c8c249e260..d19dc429d8 100644 --- a/aio-win32.c +++ b/aio-win32.c @@ -20,6 +20,7 @@ #include "block/block.h" #include "qemu/queue.h" #include "qemu/sockets.h" +#include "qapi/error.h" struct AioHandler { EventNotifier *e; @@ -38,6 +39,7 @@ void aio_set_fd_handler(AioContext *ctx, bool is_external, IOHandler *io_read, IOHandler *io_write, + AioPollFn *io_poll, void *opaque) { /* fd is a SOCKET in our case */ @@ -100,10 +102,18 @@ void aio_set_fd_handler(AioContext *ctx, aio_notify(ctx); } +void aio_set_fd_poll(AioContext *ctx, int fd, + IOHandler *io_poll_begin, + IOHandler *io_poll_end) +{ + /* Not implemented */ +} + void aio_set_event_notifier(AioContext *ctx, EventNotifier *e, bool is_external, - EventNotifierHandler *io_notify) + EventNotifierHandler *io_notify, + AioPollFn *io_poll) { AioHandler *node; @@ -150,6 +160,14 @@ void aio_set_event_notifier(AioContext *ctx, aio_notify(ctx); } +void aio_set_event_notifier_poll(AioContext *ctx, + EventNotifier *notifier, + EventNotifierHandler *io_poll_begin, + EventNotifierHandler *io_poll_end) +{ + /* Not implemented */ +} + bool aio_prepare(AioContext *ctx) { static struct timeval tv0; @@ -271,12 +289,14 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event) return progress; } -bool aio_dispatch(AioContext *ctx) +bool aio_dispatch(AioContext *ctx, bool dispatch_fds) { bool progress; progress = aio_bh_poll(ctx); - progress |= aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE); + if (dispatch_fds) { + progress |= aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE); + } progress |= timerlistgroup_run_timers(&ctx->tlg); return progress; } @@ -374,3 +394,9 @@ bool aio_poll(AioContext *ctx, bool blocking) void aio_context_setup(AioContext *ctx) { } + +void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns, + int64_t grow, int64_t shrink, Error **errp) +{ + error_setg(errp, "AioContext polling is not implemented on Windows"); +} diff --git a/async.c b/async.c index b2de360c23..2960171834 100644 --- a/async.c +++ b/async.c @@ -251,7 +251,7 @@ aio_ctx_dispatch(GSource *source, AioContext *ctx = (AioContext *) source; assert(callback == NULL); - aio_dispatch(ctx); + aio_dispatch(ctx, true); return true; } @@ -282,7 +282,7 @@ aio_ctx_finalize(GSource *source) } qemu_mutex_unlock(&ctx->bh_lock); - aio_set_event_notifier(ctx, &ctx->notifier, false, NULL); + aio_set_event_notifier(ctx, &ctx->notifier, false, NULL, NULL); event_notifier_cleanup(&ctx->notifier); qemu_rec_mutex_destroy(&ctx->lock); qemu_mutex_destroy(&ctx->bh_lock); @@ -349,6 +349,15 @@ static void event_notifier_dummy_cb(EventNotifier *e) { } +/* Returns true if aio_notify() was called (e.g. a BH was scheduled) */ +static bool event_notifier_poll(void *opaque) +{ + EventNotifier *e = opaque; + AioContext *ctx = container_of(e, AioContext, notifier); + + return atomic_read(&ctx->notified); +} + AioContext *aio_context_new(Error **errp) { int ret; @@ -366,7 +375,8 @@ AioContext *aio_context_new(Error **errp) aio_set_event_notifier(ctx, &ctx->notifier, false, (EventNotifierHandler *) - event_notifier_dummy_cb); + event_notifier_dummy_cb, + event_notifier_poll); #ifdef CONFIG_LINUX_AIO ctx->linux_aio = NULL; #endif @@ -375,6 +385,11 @@ AioContext *aio_context_new(Error **errp) qemu_rec_mutex_init(&ctx->lock); timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx); + ctx->poll_ns = 0; + ctx->poll_max_ns = 0; + ctx->poll_grow = 0; + ctx->poll_shrink = 0; + return ctx; fail: g_source_destroy(&ctx->source); diff --git a/block/curl.c b/block/curl.c index 0404c1b5fa..792fef8269 100644 --- a/block/curl.c +++ b/block/curl.c @@ -192,19 +192,19 @@ static int curl_sock_cb(CURL *curl, curl_socket_t fd, int action, switch (action) { case CURL_POLL_IN: aio_set_fd_handler(s->aio_context, fd, false, - curl_multi_read, NULL, state); + curl_multi_read, NULL, NULL, state); break; case CURL_POLL_OUT: aio_set_fd_handler(s->aio_context, fd, false, - NULL, curl_multi_do, state); + NULL, curl_multi_do, NULL, state); break; case CURL_POLL_INOUT: aio_set_fd_handler(s->aio_context, fd, false, - curl_multi_read, curl_multi_do, state); + curl_multi_read, curl_multi_do, NULL, state); break; case CURL_POLL_REMOVE: aio_set_fd_handler(s->aio_context, fd, false, - NULL, NULL, NULL); + NULL, NULL, NULL, NULL); break; } diff --git a/block/iscsi.c b/block/iscsi.c index 0960929d57..6aeeb9ec4f 100644 --- a/block/iscsi.c +++ b/block/iscsi.c @@ -362,6 +362,7 @@ iscsi_set_events(IscsiLun *iscsilun) false, (ev & POLLIN) ? iscsi_process_read : NULL, (ev & POLLOUT) ? iscsi_process_write : NULL, + NULL, iscsilun); iscsilun->events = ev; } @@ -1526,7 +1527,7 @@ static void iscsi_detach_aio_context(BlockDriverState *bs) IscsiLun *iscsilun = bs->opaque; aio_set_fd_handler(iscsilun->aio_context, iscsi_get_fd(iscsilun->iscsi), - false, NULL, NULL, NULL); + false, NULL, NULL, NULL, NULL); iscsilun->events = 0; if (iscsilun->nop_timer) { diff --git a/block/linux-aio.c b/block/linux-aio.c index 1685ec29a3..03ab741d37 100644 --- a/block/linux-aio.c +++ b/block/linux-aio.c @@ -255,6 +255,20 @@ static void qemu_laio_completion_cb(EventNotifier *e) } } +static bool qemu_laio_poll_cb(void *opaque) +{ + EventNotifier *e = opaque; + LinuxAioState *s = container_of(e, LinuxAioState, e); + struct io_event *events; + + if (!io_getevents_peek(s->ctx, &events)) { + return false; + } + + qemu_laio_process_completions_and_submit(s); + return true; +} + static void laio_cancel(BlockAIOCB *blockacb) { struct qemu_laiocb *laiocb = (struct qemu_laiocb *)blockacb; @@ -439,7 +453,7 @@ BlockAIOCB *laio_submit(BlockDriverState *bs, LinuxAioState *s, int fd, void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context) { - aio_set_event_notifier(old_context, &s->e, false, NULL); + aio_set_event_notifier(old_context, &s->e, false, NULL, NULL); qemu_bh_delete(s->completion_bh); } @@ -448,7 +462,8 @@ void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context) s->aio_context = new_context; s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s); aio_set_event_notifier(new_context, &s->e, false, - qemu_laio_completion_cb); + qemu_laio_completion_cb, + qemu_laio_poll_cb); } LinuxAioState *laio_init(void) diff --git a/block/nbd-client.c b/block/nbd-client.c index 3779c6c999..06f1532805 100644 --- a/block/nbd-client.c +++ b/block/nbd-client.c @@ -145,7 +145,7 @@ static int nbd_co_send_request(BlockDriverState *bs, aio_context = bdrv_get_aio_context(bs); aio_set_fd_handler(aio_context, s->sioc->fd, false, - nbd_reply_ready, nbd_restart_write, bs); + nbd_reply_ready, nbd_restart_write, NULL, bs); if (qiov) { qio_channel_set_cork(s->ioc, true); rc = nbd_send_request(s->ioc, request); @@ -161,7 +161,7 @@ static int nbd_co_send_request(BlockDriverState *bs, rc = nbd_send_request(s->ioc, request); } aio_set_fd_handler(aio_context, s->sioc->fd, false, - nbd_reply_ready, NULL, bs); + nbd_reply_ready, NULL, NULL, bs); s->send_coroutine = NULL; qemu_co_mutex_unlock(&s->send_mutex); return rc; @@ -366,14 +366,14 @@ void nbd_client_detach_aio_context(BlockDriverState *bs) { aio_set_fd_handler(bdrv_get_aio_context(bs), nbd_get_client_session(bs)->sioc->fd, - false, NULL, NULL, NULL); + false, NULL, NULL, NULL, NULL); } void nbd_client_attach_aio_context(BlockDriverState *bs, AioContext *new_context) { aio_set_fd_handler(new_context, nbd_get_client_session(bs)->sioc->fd, - false, nbd_reply_ready, NULL, bs); + false, nbd_reply_ready, NULL, NULL, bs); } void nbd_client_close(BlockDriverState *bs) diff --git a/block/nfs.c b/block/nfs.c index a490660027..a564340d15 100644 --- a/block/nfs.c +++ b/block/nfs.c @@ -197,7 +197,8 @@ static void nfs_set_events(NFSClient *client) aio_set_fd_handler(client->aio_context, nfs_get_fd(client->context), false, (ev & POLLIN) ? nfs_process_read : NULL, - (ev & POLLOUT) ? nfs_process_write : NULL, client); + (ev & POLLOUT) ? nfs_process_write : NULL, + NULL, client); } client->events = ev; @@ -395,7 +396,7 @@ static void nfs_detach_aio_context(BlockDriverState *bs) NFSClient *client = bs->opaque; aio_set_fd_handler(client->aio_context, nfs_get_fd(client->context), - false, NULL, NULL, NULL); + false, NULL, NULL, NULL, NULL); client->events = 0; } @@ -415,7 +416,7 @@ static void nfs_client_close(NFSClient *client) nfs_close(client->context, client->fh); } aio_set_fd_handler(client->aio_context, nfs_get_fd(client->context), - false, NULL, NULL, NULL); + false, NULL, NULL, NULL, NULL); nfs_destroy_context(client->context); } memset(client, 0, sizeof(NFSClient)); diff --git a/block/sheepdog.c b/block/sheepdog.c index 4c9af89180..5637e0cd37 100644 --- a/block/sheepdog.c +++ b/block/sheepdog.c @@ -664,7 +664,7 @@ static coroutine_fn void do_co_req(void *opaque) co = qemu_coroutine_self(); aio_set_fd_handler(srco->aio_context, sockfd, false, - NULL, restart_co_req, co); + NULL, restart_co_req, NULL, co); ret = send_co_req(sockfd, hdr, data, wlen); if (ret < 0) { @@ -672,7 +672,7 @@ static coroutine_fn void do_co_req(void *opaque) } aio_set_fd_handler(srco->aio_context, sockfd, false, - restart_co_req, NULL, co); + restart_co_req, NULL, NULL, co); ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr)); if (ret != sizeof(*hdr)) { @@ -698,7 +698,7 @@ out: /* there is at most one request for this sockfd, so it is safe to * set each handler to NULL. */ aio_set_fd_handler(srco->aio_context, sockfd, false, - NULL, NULL, NULL); + NULL, NULL, NULL, NULL); srco->ret = ret; srco->finished = true; @@ -760,7 +760,7 @@ static coroutine_fn void reconnect_to_sdog(void *opaque) AIOReq *aio_req, *next; aio_set_fd_handler(s->aio_context, s->fd, false, NULL, - NULL, NULL); + NULL, NULL, NULL); close(s->fd); s->fd = -1; @@ -964,7 +964,7 @@ static int get_sheep_fd(BDRVSheepdogState *s, Error **errp) } aio_set_fd_handler(s->aio_context, fd, false, - co_read_response, NULL, s); + co_read_response, NULL, NULL, s); return fd; } @@ -1226,7 +1226,7 @@ static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req, qemu_co_mutex_lock(&s->lock); s->co_send = qemu_coroutine_self(); aio_set_fd_handler(s->aio_context, s->fd, false, - co_read_response, co_write_request, s); + co_read_response, co_write_request, NULL, s); socket_set_cork(s->fd, 1); /* send a header */ @@ -1245,7 +1245,7 @@ static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req, out: socket_set_cork(s->fd, 0); aio_set_fd_handler(s->aio_context, s->fd, false, - co_read_response, NULL, s); + co_read_response, NULL, NULL, s); s->co_send = NULL; qemu_co_mutex_unlock(&s->lock); } @@ -1396,7 +1396,7 @@ static void sd_detach_aio_context(BlockDriverState *bs) BDRVSheepdogState *s = bs->opaque; aio_set_fd_handler(s->aio_context, s->fd, false, NULL, - NULL, NULL); + NULL, NULL, NULL); } static void sd_attach_aio_context(BlockDriverState *bs, @@ -1406,7 +1406,7 @@ static void sd_attach_aio_context(BlockDriverState *bs, s->aio_context = new_context; aio_set_fd_handler(new_context, s->fd, false, - co_read_response, NULL, s); + co_read_response, NULL, NULL, s); } /* TODO Convert to fine grained options */ @@ -1520,7 +1520,7 @@ static int sd_open(BlockDriverState *bs, QDict *options, int flags, return 0; out: aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd, - false, NULL, NULL, NULL); + false, NULL, NULL, NULL, NULL); if (s->fd >= 0) { closesocket(s->fd); } @@ -1559,7 +1559,7 @@ static void sd_reopen_commit(BDRVReopenState *state) if (s->fd) { aio_set_fd_handler(s->aio_context, s->fd, false, - NULL, NULL, NULL); + NULL, NULL, NULL, NULL); closesocket(s->fd); } @@ -1583,7 +1583,7 @@ static void sd_reopen_abort(BDRVReopenState *state) if (re_s->fd) { aio_set_fd_handler(s->aio_context, re_s->fd, false, - NULL, NULL, NULL); + NULL, NULL, NULL, NULL); closesocket(re_s->fd); } @@ -1972,7 +1972,7 @@ static void sd_close(BlockDriverState *bs) } aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd, - false, NULL, NULL, NULL); + false, NULL, NULL, NULL, NULL); closesocket(s->fd); g_free(s->host_spec); } diff --git a/block/ssh.c b/block/ssh.c index 15ed2818c5..e0edf20f78 100644 --- a/block/ssh.c +++ b/block/ssh.c @@ -911,7 +911,7 @@ static coroutine_fn void set_fd_handler(BDRVSSHState *s, BlockDriverState *bs) rd_handler, wr_handler); aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock, - false, rd_handler, wr_handler, co); + false, rd_handler, wr_handler, NULL, co); } static coroutine_fn void clear_fd_handler(BDRVSSHState *s, @@ -919,7 +919,7 @@ static coroutine_fn void clear_fd_handler(BDRVSSHState *s, { DPRINTF("s->sock=%d", s->sock); aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock, - false, NULL, NULL, NULL); + false, NULL, NULL, NULL, NULL); } /* A non-blocking call returned EAGAIN, so yield, ensuring the diff --git a/block/win32-aio.c b/block/win32-aio.c index 95e3ab1541..8cdf73b00d 100644 --- a/block/win32-aio.c +++ b/block/win32-aio.c @@ -175,7 +175,7 @@ int win32_aio_attach(QEMUWin32AIOState *aio, HANDLE hfile) void win32_aio_detach_aio_context(QEMUWin32AIOState *aio, AioContext *old_context) { - aio_set_event_notifier(old_context, &aio->e, false, NULL); + aio_set_event_notifier(old_context, &aio->e, false, NULL, NULL); aio->is_aio_context_attached = false; } @@ -184,7 +184,7 @@ void win32_aio_attach_aio_context(QEMUWin32AIOState *aio, { aio->is_aio_context_attached = true; aio_set_event_notifier(new_context, &aio->e, false, - win32_aio_completion_cb); + win32_aio_completion_cb, NULL); } QEMUWin32AIOState *win32_aio_init(void) diff --git a/hw/block/virtio-blk.c b/hw/block/virtio-blk.c index 0c5fd27593..50bb0cbb93 100644 --- a/hw/block/virtio-blk.c +++ b/hw/block/virtio-blk.c @@ -588,13 +588,19 @@ void virtio_blk_handle_vq(VirtIOBlock *s, VirtQueue *vq) blk_io_plug(s->blk); - while ((req = virtio_blk_get_request(s, vq))) { - if (virtio_blk_handle_request(req, &mrb)) { - virtqueue_detach_element(req->vq, &req->elem, 0); - virtio_blk_free_request(req); - break; + do { + virtio_queue_set_notification(vq, 0); + + while ((req = virtio_blk_get_request(s, vq))) { + if (virtio_blk_handle_request(req, &mrb)) { + virtqueue_detach_element(req->vq, &req->elem, 0); + virtio_blk_free_request(req); + break; + } } - } + + virtio_queue_set_notification(vq, 1); + } while (!virtio_queue_empty(vq)); if (mrb.num_reqs) { virtio_blk_submit_multireq(s->blk, &mrb); diff --git a/hw/scsi/virtio-scsi.c b/hw/scsi/virtio-scsi.c index 34bba35d83..204e14f237 100644 --- a/hw/scsi/virtio-scsi.c +++ b/hw/scsi/virtio-scsi.c @@ -592,26 +592,32 @@ static void virtio_scsi_handle_cmd_req_submit(VirtIOSCSI *s, VirtIOSCSIReq *req) void virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq) { VirtIOSCSIReq *req, *next; - int ret; + int ret = 0; QTAILQ_HEAD(, VirtIOSCSIReq) reqs = QTAILQ_HEAD_INITIALIZER(reqs); - while ((req = virtio_scsi_pop_req(s, vq))) { - ret = virtio_scsi_handle_cmd_req_prepare(s, req); - if (!ret) { - QTAILQ_INSERT_TAIL(&reqs, req, next); - } else if (ret == -EINVAL) { - /* The device is broken and shouldn't process any request */ - while (!QTAILQ_EMPTY(&reqs)) { - req = QTAILQ_FIRST(&reqs); - QTAILQ_REMOVE(&reqs, req, next); - blk_io_unplug(req->sreq->dev->conf.blk); - scsi_req_unref(req->sreq); - virtqueue_detach_element(req->vq, &req->elem, 0); - virtio_scsi_free_req(req); + do { + virtio_queue_set_notification(vq, 0); + + while ((req = virtio_scsi_pop_req(s, vq))) { + ret = virtio_scsi_handle_cmd_req_prepare(s, req); + if (!ret) { + QTAILQ_INSERT_TAIL(&reqs, req, next); + } else if (ret == -EINVAL) { + /* The device is broken and shouldn't process any request */ + while (!QTAILQ_EMPTY(&reqs)) { + req = QTAILQ_FIRST(&reqs); + QTAILQ_REMOVE(&reqs, req, next); + blk_io_unplug(req->sreq->dev->conf.blk); + scsi_req_unref(req->sreq); + virtqueue_detach_element(req->vq, &req->elem, 0); + virtio_scsi_free_req(req); + } } } - } + + virtio_queue_set_notification(vq, 1); + } while (ret != -EINVAL && !virtio_queue_empty(vq)); QTAILQ_FOREACH_SAFE(req, &reqs, next, next) { virtio_scsi_handle_cmd_req_submit(s, req); diff --git a/hw/virtio/virtio.c b/hw/virtio/virtio.c index 1af2de2714..d40711a31d 100644 --- a/hw/virtio/virtio.c +++ b/hw/virtio/virtio.c @@ -87,8 +87,8 @@ struct VirtQueue /* Last used index value we have signalled on */ bool signalled_used_valid; - /* Notification enabled? */ - bool notification; + /* Nested host->guest notification disabled counter */ + unsigned int notification_disabled; uint16_t queue_index; @@ -201,7 +201,7 @@ static inline void vring_used_flags_unset_bit(VirtQueue *vq, int mask) static inline void vring_set_avail_event(VirtQueue *vq, uint16_t val) { hwaddr pa; - if (!vq->notification) { + if (vq->notification_disabled) { return; } pa = vq->vring.used + offsetof(VRingUsed, ring[vq->vring.num]); @@ -210,7 +210,13 @@ static inline void vring_set_avail_event(VirtQueue *vq, uint16_t val) void virtio_queue_set_notification(VirtQueue *vq, int enable) { - vq->notification = enable; + if (enable) { + assert(vq->notification_disabled > 0); + vq->notification_disabled--; + } else { + vq->notification_disabled++; + } + if (virtio_vdev_has_feature(vq->vdev, VIRTIO_RING_F_EVENT_IDX)) { vring_set_avail_event(vq, vring_avail_idx(vq)); } else if (enable) { @@ -959,7 +965,7 @@ void virtio_reset(void *opaque) virtio_queue_set_vector(vdev, i, VIRTIO_NO_VECTOR); vdev->vq[i].signalled_used = 0; vdev->vq[i].signalled_used_valid = false; - vdev->vq[i].notification = true; + vdev->vq[i].notification_disabled = 0; vdev->vq[i].vring.num = vdev->vq[i].vring.num_default; vdev->vq[i].inuse = 0; } @@ -1770,7 +1776,7 @@ int virtio_load(VirtIODevice *vdev, QEMUFile *f, int version_id) vdev->vq[i].vring.desc = qemu_get_be64(f); qemu_get_be16s(f, &vdev->vq[i].last_avail_idx); vdev->vq[i].signalled_used_valid = false; - vdev->vq[i].notification = true; + vdev->vq[i].notification_disabled = 0; if (vdev->vq[i].vring.desc) { /* XXX virtio-1 devices */ @@ -2047,15 +2053,47 @@ static void virtio_queue_host_notifier_aio_read(EventNotifier *n) } } +static void virtio_queue_host_notifier_aio_poll_begin(EventNotifier *n) +{ + VirtQueue *vq = container_of(n, VirtQueue, host_notifier); + + virtio_queue_set_notification(vq, 0); +} + +static bool virtio_queue_host_notifier_aio_poll(void *opaque) +{ + EventNotifier *n = opaque; + VirtQueue *vq = container_of(n, VirtQueue, host_notifier); + + if (virtio_queue_empty(vq)) { + return false; + } + + virtio_queue_notify_aio_vq(vq); + return true; +} + +static void virtio_queue_host_notifier_aio_poll_end(EventNotifier *n) +{ + VirtQueue *vq = container_of(n, VirtQueue, host_notifier); + + /* Caller polls once more after this to catch requests that race with us */ + virtio_queue_set_notification(vq, 1); +} + void virtio_queue_aio_set_host_notifier_handler(VirtQueue *vq, AioContext *ctx, VirtIOHandleOutput handle_output) { if (handle_output) { vq->handle_aio_output = handle_output; aio_set_event_notifier(ctx, &vq->host_notifier, true, - virtio_queue_host_notifier_aio_read); + virtio_queue_host_notifier_aio_read, + virtio_queue_host_notifier_aio_poll); + aio_set_event_notifier_poll(ctx, &vq->host_notifier, + virtio_queue_host_notifier_aio_poll_begin, + virtio_queue_host_notifier_aio_poll_end); } else { - aio_set_event_notifier(ctx, &vq->host_notifier, true, NULL); + aio_set_event_notifier(ctx, &vq->host_notifier, true, NULL, NULL); /* Test and clear notifier before after disabling event, * in case poll callback didn't have time to run. */ virtio_queue_host_notifier_aio_read(&vq->host_notifier); diff --git a/include/block/aio.h b/include/block/aio.h index ca551e346f..4dca54d9c7 100644 --- a/include/block/aio.h +++ b/include/block/aio.h @@ -44,6 +44,7 @@ void qemu_aio_ref(void *p); typedef struct AioHandler AioHandler; typedef void QEMUBHFunc(void *opaque); +typedef bool AioPollFn(void *opaque); typedef void IOHandler(void *opaque); struct ThreadPool; @@ -130,6 +131,18 @@ struct AioContext { int external_disable_cnt; + /* Number of AioHandlers without .io_poll() */ + int poll_disable_cnt; + + /* Polling mode parameters */ + int64_t poll_ns; /* current polling time in nanoseconds */ + int64_t poll_max_ns; /* maximum polling time in nanoseconds */ + int64_t poll_grow; /* polling time growth factor */ + int64_t poll_shrink; /* polling time shrink factor */ + + /* Are we in polling mode or monitoring file descriptors? */ + bool poll_started; + /* epoll(7) state used when built with CONFIG_EPOLL */ int epollfd; bool epoll_enabled; @@ -295,8 +308,12 @@ bool aio_pending(AioContext *ctx); /* Dispatch any pending callbacks from the GSource attached to the AioContext. * * This is used internally in the implementation of the GSource. + * + * @dispatch_fds: true to process fds, false to skip them + * (can be used as an optimization by callers that know there + * are no fds ready) */ -bool aio_dispatch(AioContext *ctx); +bool aio_dispatch(AioContext *ctx, bool dispatch_fds); /* Progress in completing AIO work to occur. This can issue new pending * aio as a result of executing I/O completion or bh callbacks. @@ -325,8 +342,17 @@ void aio_set_fd_handler(AioContext *ctx, bool is_external, IOHandler *io_read, IOHandler *io_write, + AioPollFn *io_poll, void *opaque); +/* Set polling begin/end callbacks for a file descriptor that has already been + * registered with aio_set_fd_handler. Do nothing if the file descriptor is + * not registered. + */ +void aio_set_fd_poll(AioContext *ctx, int fd, + IOHandler *io_poll_begin, + IOHandler *io_poll_end); + /* Register an event notifier and associated callbacks. Behaves very similarly * to event_notifier_set_handler. Unlike event_notifier_set_handler, these callbacks * will be invoked when using aio_poll(). @@ -337,7 +363,17 @@ void aio_set_fd_handler(AioContext *ctx, void aio_set_event_notifier(AioContext *ctx, EventNotifier *notifier, bool is_external, - EventNotifierHandler *io_read); + EventNotifierHandler *io_read, + AioPollFn *io_poll); + +/* Set polling begin/end callbacks for an event notifier that has already been + * registered with aio_set_event_notifier. Do nothing if the event notifier is + * not registered. + */ +void aio_set_event_notifier_poll(AioContext *ctx, + EventNotifier *notifier, + EventNotifierHandler *io_poll_begin, + EventNotifierHandler *io_poll_end); /* Return a GSource that lets the main loop poll the file descriptors attached * to this AioContext. @@ -474,4 +510,17 @@ static inline bool aio_context_in_iothread(AioContext *ctx) */ void aio_context_setup(AioContext *ctx); +/** + * aio_context_set_poll_params: + * @ctx: the aio context + * @max_ns: how long to busy poll for, in nanoseconds + * @grow: polling time growth factor + * @shrink: polling time shrink factor + * + * Poll mode can be disabled by setting poll_max_ns to 0. + */ +void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns, + int64_t grow, int64_t shrink, + Error **errp); + #endif diff --git a/include/sysemu/iothread.h b/include/sysemu/iothread.h index 68ac2de83a..e6da1a4087 100644 --- a/include/sysemu/iothread.h +++ b/include/sysemu/iothread.h @@ -28,6 +28,11 @@ typedef struct { QemuCond init_done_cond; /* is thread initialization done? */ bool stopping; int thread_id; + + /* AioContext poll parameters */ + int64_t poll_max_ns; + int64_t poll_grow; + int64_t poll_shrink; } IOThread; #define IOTHREAD(obj) \ diff --git a/iohandler.c b/iohandler.c index f2fc8a9bd6..eb625d93dd 100644 --- a/iohandler.c +++ b/iohandler.c @@ -63,7 +63,7 @@ void qemu_set_fd_handler(int fd, { iohandler_init(); aio_set_fd_handler(iohandler_ctx, fd, false, - fd_read, fd_write, opaque); + fd_read, fd_write, NULL, opaque); } /* reaping of zombies. right now we're not passing the status to diff --git a/iothread.c b/iothread.c index bd70344811..7bedde87e9 100644 --- a/iothread.c +++ b/iothread.c @@ -98,6 +98,18 @@ static void iothread_complete(UserCreatable *obj, Error **errp) return; } + aio_context_set_poll_params(iothread->ctx, + iothread->poll_max_ns, + iothread->poll_grow, + iothread->poll_shrink, + &local_error); + if (local_error) { + error_propagate(errp, local_error); + aio_context_unref(iothread->ctx); + iothread->ctx = NULL; + return; + } + qemu_mutex_init(&iothread->init_done_lock); qemu_cond_init(&iothread->init_done_cond); @@ -120,10 +132,82 @@ static void iothread_complete(UserCreatable *obj, Error **errp) qemu_mutex_unlock(&iothread->init_done_lock); } +typedef struct { + const char *name; + ptrdiff_t offset; /* field's byte offset in IOThread struct */ +} PollParamInfo; + +static PollParamInfo poll_max_ns_info = { + "poll-max-ns", offsetof(IOThread, poll_max_ns), +}; +static PollParamInfo poll_grow_info = { + "poll-grow", offsetof(IOThread, poll_grow), +}; +static PollParamInfo poll_shrink_info = { + "poll-shrink", offsetof(IOThread, poll_shrink), +}; + +static void iothread_get_poll_param(Object *obj, Visitor *v, + const char *name, void *opaque, Error **errp) +{ + IOThread *iothread = IOTHREAD(obj); + PollParamInfo *info = opaque; + int64_t *field = (void *)iothread + info->offset; + + visit_type_int64(v, name, field, errp); +} + +static void iothread_set_poll_param(Object *obj, Visitor *v, + const char *name, void *opaque, Error **errp) +{ + IOThread *iothread = IOTHREAD(obj); + PollParamInfo *info = opaque; + int64_t *field = (void *)iothread + info->offset; + Error *local_err = NULL; + int64_t value; + + visit_type_int64(v, name, &value, &local_err); + if (local_err) { + goto out; + } + + if (value < 0) { + error_setg(&local_err, "%s value must be in range [0, %"PRId64"]", + info->name, INT64_MAX); + goto out; + } + + *field = value; + + if (iothread->ctx) { + aio_context_set_poll_params(iothread->ctx, + iothread->poll_max_ns, + iothread->poll_grow, + iothread->poll_shrink, + &local_err); + } + +out: + error_propagate(errp, local_err); +} + static void iothread_class_init(ObjectClass *klass, void *class_data) { UserCreatableClass *ucc = USER_CREATABLE_CLASS(klass); ucc->complete = iothread_complete; + + object_class_property_add(klass, "poll-max-ns", "int", + iothread_get_poll_param, + iothread_set_poll_param, + NULL, &poll_max_ns_info, &error_abort); + object_class_property_add(klass, "poll-grow", "int", + iothread_get_poll_param, + iothread_set_poll_param, + NULL, &poll_grow_info, &error_abort); + object_class_property_add(klass, "poll-shrink", "int", + iothread_get_poll_param, + iothread_set_poll_param, + NULL, &poll_shrink_info, &error_abort); } static const TypeInfo iothread_info = { diff --git a/nbd/server.c b/nbd/server.c index 5b76261666..efe5cb82c9 100644 --- a/nbd/server.c +++ b/nbd/server.c @@ -1366,19 +1366,18 @@ static void nbd_restart_write(void *opaque) static void nbd_set_handlers(NBDClient *client) { if (client->exp && client->exp->ctx) { - aio_set_fd_handler(client->exp->ctx, client->sioc->fd, - true, + aio_set_fd_handler(client->exp->ctx, client->sioc->fd, true, client->can_read ? nbd_read : NULL, client->send_coroutine ? nbd_restart_write : NULL, - client); + NULL, client); } } static void nbd_unset_handlers(NBDClient *client) { if (client->exp && client->exp->ctx) { - aio_set_fd_handler(client->exp->ctx, client->sioc->fd, - true, NULL, NULL, NULL); + aio_set_fd_handler(client->exp->ctx, client->sioc->fd, true, NULL, + NULL, NULL, NULL); } } diff --git a/stubs/set-fd-handler.c b/stubs/set-fd-handler.c index 06a5da48f1..acbe65c1da 100644 --- a/stubs/set-fd-handler.c +++ b/stubs/set-fd-handler.c @@ -15,6 +15,7 @@ void aio_set_fd_handler(AioContext *ctx, bool is_external, IOHandler *io_read, IOHandler *io_write, + AioPollFn *io_poll, void *opaque) { abort(); diff --git a/tests/test-aio.c b/tests/test-aio.c index 5be99f8287..2754f154ce 100644 --- a/tests/test-aio.c +++ b/tests/test-aio.c @@ -128,7 +128,7 @@ static void *test_acquire_thread(void *opaque) static void set_event_notifier(AioContext *ctx, EventNotifier *notifier, EventNotifierHandler *handler) { - aio_set_event_notifier(ctx, notifier, false, handler); + aio_set_event_notifier(ctx, notifier, false, handler, NULL); } static void dummy_notifier_read(EventNotifier *n) @@ -388,7 +388,7 @@ static void test_aio_external_client(void) for (i = 1; i < 3; i++) { EventNotifierTestData data = { .n = 0, .active = 10, .auto_set = true }; event_notifier_init(&data.e, false); - aio_set_event_notifier(ctx, &data.e, true, event_ready_cb); + aio_set_event_notifier(ctx, &data.e, true, event_ready_cb, NULL); event_notifier_set(&data.e); for (j = 0; j < i; j++) { aio_disable_external(ctx); diff --git a/trace-events b/trace-events index f74e1d3d22..1181486454 100644 --- a/trace-events +++ b/trace-events @@ -25,6 +25,12 @@ # # The should be a sprintf()-compatible format string. +# aio-posix.c +run_poll_handlers_begin(void *ctx, int64_t max_ns) "ctx %p max_ns %"PRId64 +run_poll_handlers_end(void *ctx, bool progress) "ctx %p progress %d" +poll_shrink(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64 +poll_grow(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64 + # thread-pool.c thread_pool_submit(void *pool, void *req, void *opaque) "pool %p req %p opaque %p" thread_pool_complete(void *pool, void *req, void *opaque, int ret) "pool %p req %p opaque %p ret %d" diff --git a/util/event_notifier-posix.c b/util/event_notifier-posix.c index c1f0d79b34..f2aacfc8b3 100644 --- a/util/event_notifier-posix.c +++ b/util/event_notifier-posix.c @@ -95,7 +95,7 @@ int event_notifier_set_handler(EventNotifier *e, EventNotifierHandler *handler) { aio_set_fd_handler(iohandler_get_aio_context(), e->rfd, is_external, - (IOHandler *)handler, NULL, e); + (IOHandler *)handler, NULL, NULL, e); return 0; }