diff --git a/event-loop-base.c b/event-loop-base.c new file mode 100644 index 0000000000..d5be4dc6fc --- /dev/null +++ b/event-loop-base.c @@ -0,0 +1,140 @@ +/* + * QEMU event-loop base + * + * Copyright (C) 2022 Red Hat Inc + * + * Authors: + * Stefan Hajnoczi + * Nicolas Saenz Julienne + * + * This work is licensed under the terms of the GNU GPL, version 2 or later. + * See the COPYING file in the top-level directory. + */ + +#include "qemu/osdep.h" +#include "qom/object_interfaces.h" +#include "qapi/error.h" +#include "block/thread-pool.h" +#include "sysemu/event-loop-base.h" + +typedef struct { + const char *name; + ptrdiff_t offset; /* field's byte offset in EventLoopBase struct */ +} EventLoopBaseParamInfo; + +static void event_loop_base_instance_init(Object *obj) +{ + EventLoopBase *base = EVENT_LOOP_BASE(obj); + + base->thread_pool_max = THREAD_POOL_MAX_THREADS_DEFAULT; +} + +static EventLoopBaseParamInfo aio_max_batch_info = { + "aio-max-batch", offsetof(EventLoopBase, aio_max_batch), +}; +static EventLoopBaseParamInfo thread_pool_min_info = { + "thread-pool-min", offsetof(EventLoopBase, thread_pool_min), +}; +static EventLoopBaseParamInfo thread_pool_max_info = { + "thread-pool-max", offsetof(EventLoopBase, thread_pool_max), +}; + +static void event_loop_base_get_param(Object *obj, Visitor *v, + const char *name, void *opaque, Error **errp) +{ + EventLoopBase *event_loop_base = EVENT_LOOP_BASE(obj); + EventLoopBaseParamInfo *info = opaque; + int64_t *field = (void *)event_loop_base + info->offset; + + visit_type_int64(v, name, field, errp); +} + +static void event_loop_base_set_param(Object *obj, Visitor *v, + const char *name, void *opaque, Error **errp) +{ + EventLoopBaseClass *bc = EVENT_LOOP_BASE_GET_CLASS(obj); + EventLoopBase *base = EVENT_LOOP_BASE(obj); + EventLoopBaseParamInfo *info = opaque; + int64_t *field = (void *)base + info->offset; + int64_t value; + + if (!visit_type_int64(v, name, &value, errp)) { + return; + } + + if (value < 0) { + error_setg(errp, "%s value must be in range [0, %" PRId64 "]", + info->name, INT64_MAX); + return; + } + + *field = value; + + if (bc->update_params) { + bc->update_params(base, errp); + } + + return; +} + +static void event_loop_base_complete(UserCreatable *uc, Error **errp) +{ + EventLoopBaseClass *bc = EVENT_LOOP_BASE_GET_CLASS(uc); + EventLoopBase *base = EVENT_LOOP_BASE(uc); + + if (bc->init) { + bc->init(base, errp); + } +} + +static bool event_loop_base_can_be_deleted(UserCreatable *uc) +{ + EventLoopBaseClass *bc = EVENT_LOOP_BASE_GET_CLASS(uc); + EventLoopBase *backend = EVENT_LOOP_BASE(uc); + + if (bc->can_be_deleted) { + return bc->can_be_deleted(backend); + } + + return true; +} + +static void event_loop_base_class_init(ObjectClass *klass, void *class_data) +{ + UserCreatableClass *ucc = USER_CREATABLE_CLASS(klass); + ucc->complete = event_loop_base_complete; + ucc->can_be_deleted = event_loop_base_can_be_deleted; + + object_class_property_add(klass, "aio-max-batch", "int", + event_loop_base_get_param, + event_loop_base_set_param, + NULL, &aio_max_batch_info); + object_class_property_add(klass, "thread-pool-min", "int", + event_loop_base_get_param, + event_loop_base_set_param, + NULL, &thread_pool_min_info); + object_class_property_add(klass, "thread-pool-max", "int", + event_loop_base_get_param, + event_loop_base_set_param, + NULL, &thread_pool_max_info); +} + +static const TypeInfo event_loop_base_info = { + .name = TYPE_EVENT_LOOP_BASE, + .parent = TYPE_OBJECT, + .instance_size = sizeof(EventLoopBase), + .instance_init = event_loop_base_instance_init, + .class_size = sizeof(EventLoopBaseClass), + .class_init = event_loop_base_class_init, + .abstract = true, + .interfaces = (InterfaceInfo[]) { + { TYPE_USER_CREATABLE }, + { } + } +}; + +static void register_types(void) +{ + type_register_static(&event_loop_base_info); +} +type_init(register_types); diff --git a/hw/scsi/virtio-scsi-dataplane.c b/hw/scsi/virtio-scsi-dataplane.c index 29575cbaf6..8bb6e6acfc 100644 --- a/hw/scsi/virtio-scsi-dataplane.c +++ b/hw/scsi/virtio-scsi-dataplane.c @@ -138,7 +138,7 @@ int virtio_scsi_dataplane_start(VirtIODevice *vdev) aio_context_acquire(s->ctx); virtio_queue_aio_attach_host_notifier(vs->ctrl_vq, s->ctx); - virtio_queue_aio_attach_host_notifier(vs->event_vq, s->ctx); + virtio_queue_aio_attach_host_notifier_no_poll(vs->event_vq, s->ctx); for (i = 0; i < vs->conf.num_queues; i++) { virtio_queue_aio_attach_host_notifier(vs->cmd_vqs[i], s->ctx); diff --git a/hw/scsi/virtio-scsi.c b/hw/scsi/virtio-scsi.c index 34a968ecfb..db54d104be 100644 --- a/hw/scsi/virtio-scsi.c +++ b/hw/scsi/virtio-scsi.c @@ -29,6 +29,43 @@ #include "hw/virtio/virtio-access.h" #include "trace.h" +typedef struct VirtIOSCSIReq { + /* + * Note: + * - fields up to resp_iov are initialized by virtio_scsi_init_req; + * - fields starting at vring are zeroed by virtio_scsi_init_req. + */ + VirtQueueElement elem; + + VirtIOSCSI *dev; + VirtQueue *vq; + QEMUSGList qsgl; + QEMUIOVector resp_iov; + + union { + /* Used for two-stage request submission */ + QTAILQ_ENTRY(VirtIOSCSIReq) next; + + /* Used for cancellation of request during TMFs */ + int remaining; + }; + + SCSIRequest *sreq; + size_t resp_size; + enum SCSIXferMode mode; + union { + VirtIOSCSICmdResp cmd; + VirtIOSCSICtrlTMFResp tmf; + VirtIOSCSICtrlANResp an; + VirtIOSCSIEvent event; + } resp; + union { + VirtIOSCSICmdReq cmd; + VirtIOSCSICtrlTMFReq tmf; + VirtIOSCSICtrlANReq an; + } req; +} VirtIOSCSIReq; + static inline int virtio_scsi_get_lun(uint8_t *lun) { return ((lun[2] << 8) | lun[3]) & 0x3FFF; @@ -45,7 +82,7 @@ static inline SCSIDevice *virtio_scsi_device_get(VirtIOSCSI *s, uint8_t *lun) return scsi_device_get(&s->bus, 0, lun[1], virtio_scsi_get_lun(lun)); } -void virtio_scsi_init_req(VirtIOSCSI *s, VirtQueue *vq, VirtIOSCSIReq *req) +static void virtio_scsi_init_req(VirtIOSCSI *s, VirtQueue *vq, VirtIOSCSIReq *req) { VirtIODevice *vdev = VIRTIO_DEVICE(s); const size_t zero_skip = @@ -58,7 +95,7 @@ void virtio_scsi_init_req(VirtIOSCSI *s, VirtQueue *vq, VirtIOSCSIReq *req) memset((uint8_t *)req + zero_skip, 0, sizeof(*req) - zero_skip); } -void virtio_scsi_free_req(VirtIOSCSIReq *req) +static void virtio_scsi_free_req(VirtIOSCSIReq *req) { qemu_iovec_destroy(&req->resp_iov); qemu_sglist_destroy(&req->qsgl); @@ -460,28 +497,41 @@ static void virtio_scsi_handle_ctrl_req(VirtIOSCSI *s, VirtIOSCSIReq *req) } } -bool virtio_scsi_handle_ctrl_vq(VirtIOSCSI *s, VirtQueue *vq) +static void virtio_scsi_handle_ctrl_vq(VirtIOSCSI *s, VirtQueue *vq) { VirtIOSCSIReq *req; - bool progress = false; while ((req = virtio_scsi_pop_req(s, vq))) { - progress = true; virtio_scsi_handle_ctrl_req(s, req); } - return progress; +} + +/* + * If dataplane is configured but not yet started, do so now and return true on + * success. + * + * Dataplane is started by the core virtio code but virtqueue handler functions + * can also be invoked when a guest kicks before DRIVER_OK, so this helper + * function helps us deal with manually starting ioeventfd in that case. + */ +static bool virtio_scsi_defer_to_dataplane(VirtIOSCSI *s) +{ + if (!s->ctx || s->dataplane_started) { + return false; + } + + virtio_device_start_ioeventfd(&s->parent_obj.parent_obj); + return !s->dataplane_fenced; } static void virtio_scsi_handle_ctrl(VirtIODevice *vdev, VirtQueue *vq) { VirtIOSCSI *s = (VirtIOSCSI *)vdev; - if (s->ctx) { - virtio_device_start_ioeventfd(vdev); - if (!s->dataplane_fenced) { - return; - } + if (virtio_scsi_defer_to_dataplane(s)) { + return; } + virtio_scsi_acquire(s); virtio_scsi_handle_ctrl_vq(s, vq); virtio_scsi_release(s); @@ -672,12 +722,11 @@ static void virtio_scsi_handle_cmd_req_submit(VirtIOSCSI *s, VirtIOSCSIReq *req) scsi_req_unref(sreq); } -bool virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq) +static void virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq) { VirtIOSCSIReq *req, *next; int ret = 0; bool suppress_notifications = virtio_queue_get_notification(vq); - bool progress = false; QTAILQ_HEAD(, VirtIOSCSIReq) reqs = QTAILQ_HEAD_INITIALIZER(reqs); @@ -687,7 +736,6 @@ bool virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq) } while ((req = virtio_scsi_pop_req(s, vq))) { - progress = true; ret = virtio_scsi_handle_cmd_req_prepare(s, req); if (!ret) { QTAILQ_INSERT_TAIL(&reqs, req, next); @@ -712,7 +760,6 @@ bool virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq) QTAILQ_FOREACH_SAFE(req, &reqs, next, next) { virtio_scsi_handle_cmd_req_submit(s, req); } - return progress; } static void virtio_scsi_handle_cmd(VirtIODevice *vdev, VirtQueue *vq) @@ -720,12 +767,10 @@ static void virtio_scsi_handle_cmd(VirtIODevice *vdev, VirtQueue *vq) /* use non-QOM casts in the data path */ VirtIOSCSI *s = (VirtIOSCSI *)vdev; - if (s->ctx && !s->dataplane_started) { - virtio_device_start_ioeventfd(vdev); - if (!s->dataplane_fenced) { - return; - } + if (virtio_scsi_defer_to_dataplane(s)) { + return; } + virtio_scsi_acquire(s); virtio_scsi_handle_cmd_vq(s, vq); virtio_scsi_release(s); @@ -793,8 +838,8 @@ static void virtio_scsi_reset(VirtIODevice *vdev) s->events_dropped = false; } -void virtio_scsi_push_event(VirtIOSCSI *s, SCSIDevice *dev, - uint32_t event, uint32_t reason) +static void virtio_scsi_push_event(VirtIOSCSI *s, SCSIDevice *dev, + uint32_t event, uint32_t reason) { VirtIOSCSICommon *vs = VIRTIO_SCSI_COMMON(s); VirtIOSCSIReq *req; @@ -842,25 +887,21 @@ void virtio_scsi_push_event(VirtIOSCSI *s, SCSIDevice *dev, virtio_scsi_complete_req(req); } -bool virtio_scsi_handle_event_vq(VirtIOSCSI *s, VirtQueue *vq) +static void virtio_scsi_handle_event_vq(VirtIOSCSI *s, VirtQueue *vq) { if (s->events_dropped) { virtio_scsi_push_event(s, NULL, VIRTIO_SCSI_T_NO_EVENT, 0); - return true; } - return false; } static void virtio_scsi_handle_event(VirtIODevice *vdev, VirtQueue *vq) { VirtIOSCSI *s = VIRTIO_SCSI(vdev); - if (s->ctx) { - virtio_device_start_ioeventfd(vdev); - if (!s->dataplane_fenced) { - return; - } + if (virtio_scsi_defer_to_dataplane(s)) { + return; } + virtio_scsi_acquire(s); virtio_scsi_handle_event_vq(s, vq); virtio_scsi_release(s); diff --git a/hw/virtio/virtio.c b/hw/virtio/virtio.c index 9d637e043e..67a873f54a 100644 --- a/hw/virtio/virtio.c +++ b/hw/virtio/virtio.c @@ -3534,6 +3534,19 @@ void virtio_queue_aio_attach_host_notifier(VirtQueue *vq, AioContext *ctx) virtio_queue_host_notifier_aio_poll_end); } +/* + * Same as virtio_queue_aio_attach_host_notifier() but without polling. Use + * this for rx virtqueues and similar cases where the virtqueue handler + * function does not pop all elements. When the virtqueue is left non-empty + * polling consumes CPU cycles and should not be used. + */ +void virtio_queue_aio_attach_host_notifier_no_poll(VirtQueue *vq, AioContext *ctx) +{ + aio_set_event_notifier(ctx, &vq->host_notifier, true, + virtio_queue_host_notifier_read, + NULL, NULL); +} + void virtio_queue_aio_detach_host_notifier(VirtQueue *vq, AioContext *ctx) { aio_set_event_notifier(ctx, &vq->host_notifier, true, NULL, NULL, NULL); diff --git a/include/block/aio.h b/include/block/aio.h index 5634173b12..d128558f1d 100644 --- a/include/block/aio.h +++ b/include/block/aio.h @@ -192,6 +192,8 @@ struct AioContext { QSLIST_HEAD(, Coroutine) scheduled_coroutines; QEMUBH *co_schedule_bh; + int thread_pool_min; + int thread_pool_max; /* Thread pool for performing work and receiving completion callbacks. * Has its own locking. */ @@ -769,4 +771,12 @@ void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns, void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch, Error **errp); +/** + * aio_context_set_thread_pool_params: + * @ctx: the aio context + * @min: min number of threads to have readily available in the thread pool + * @min: max number of threads the thread pool can contain + */ +void aio_context_set_thread_pool_params(AioContext *ctx, int64_t min, + int64_t max, Error **errp); #endif diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h index 7dd7d730a0..2020bcc92d 100644 --- a/include/block/thread-pool.h +++ b/include/block/thread-pool.h @@ -20,6 +20,8 @@ #include "block/block.h" +#define THREAD_POOL_MAX_THREADS_DEFAULT 64 + typedef int ThreadPoolFunc(void *opaque); typedef struct ThreadPool ThreadPool; @@ -33,5 +35,6 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool, int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func, void *arg); void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg); +void thread_pool_update_params(ThreadPool *pool, struct AioContext *ctx); #endif diff --git a/include/hw/virtio/virtio-scsi.h b/include/hw/virtio/virtio-scsi.h index 0997313f0a..a36aad9c86 100644 --- a/include/hw/virtio/virtio-scsi.h +++ b/include/hw/virtio/virtio-scsi.h @@ -92,42 +92,6 @@ struct VirtIOSCSI { uint32_t host_features; }; -typedef struct VirtIOSCSIReq { - /* Note: - * - fields up to resp_iov are initialized by virtio_scsi_init_req; - * - fields starting at vring are zeroed by virtio_scsi_init_req. - * */ - VirtQueueElement elem; - - VirtIOSCSI *dev; - VirtQueue *vq; - QEMUSGList qsgl; - QEMUIOVector resp_iov; - - union { - /* Used for two-stage request submission */ - QTAILQ_ENTRY(VirtIOSCSIReq) next; - - /* Used for cancellation of request during TMFs */ - int remaining; - }; - - SCSIRequest *sreq; - size_t resp_size; - enum SCSIXferMode mode; - union { - VirtIOSCSICmdResp cmd; - VirtIOSCSICtrlTMFResp tmf; - VirtIOSCSICtrlANResp an; - VirtIOSCSIEvent event; - } resp; - union { - VirtIOSCSICmdReq cmd; - VirtIOSCSICtrlTMFReq tmf; - VirtIOSCSICtrlANReq an; - } req; -} VirtIOSCSIReq; - static inline void virtio_scsi_acquire(VirtIOSCSI *s) { if (s->ctx) { @@ -149,13 +113,6 @@ void virtio_scsi_common_realize(DeviceState *dev, Error **errp); void virtio_scsi_common_unrealize(DeviceState *dev); -bool virtio_scsi_handle_event_vq(VirtIOSCSI *s, VirtQueue *vq); -bool virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq); -bool virtio_scsi_handle_ctrl_vq(VirtIOSCSI *s, VirtQueue *vq); -void virtio_scsi_init_req(VirtIOSCSI *s, VirtQueue *vq, VirtIOSCSIReq *req); -void virtio_scsi_free_req(VirtIOSCSIReq *req); -void virtio_scsi_push_event(VirtIOSCSI *s, SCSIDevice *dev, - uint32_t event, uint32_t reason); void virtio_scsi_dataplane_setup(VirtIOSCSI *s, Error **errp); int virtio_scsi_dataplane_start(VirtIODevice *s); diff --git a/include/hw/virtio/virtio.h b/include/hw/virtio/virtio.h index b31c4507f5..b62a35fdca 100644 --- a/include/hw/virtio/virtio.h +++ b/include/hw/virtio/virtio.h @@ -317,6 +317,7 @@ EventNotifier *virtio_queue_get_host_notifier(VirtQueue *vq); void virtio_queue_set_host_notifier_enabled(VirtQueue *vq, bool enabled); void virtio_queue_host_notifier_read(EventNotifier *n); void virtio_queue_aio_attach_host_notifier(VirtQueue *vq, AioContext *ctx); +void virtio_queue_aio_attach_host_notifier_no_poll(VirtQueue *vq, AioContext *ctx); void virtio_queue_aio_detach_host_notifier(VirtQueue *vq, AioContext *ctx); VirtQueue *virtio_vector_first_queue(VirtIODevice *vdev, uint16_t vector); VirtQueue *virtio_vector_next_queue(VirtQueue *vq); diff --git a/include/qemu/main-loop.h b/include/qemu/main-loop.h index 89bd9edefb..5518845299 100644 --- a/include/qemu/main-loop.h +++ b/include/qemu/main-loop.h @@ -26,9 +26,19 @@ #define QEMU_MAIN_LOOP_H #include "block/aio.h" +#include "qom/object.h" +#include "sysemu/event-loop-base.h" #define SIG_IPI SIGUSR1 +#define TYPE_MAIN_LOOP "main-loop" +OBJECT_DECLARE_TYPE(MainLoop, MainLoopClass, MAIN_LOOP) + +struct MainLoop { + EventLoopBase parent_obj; +}; +typedef struct MainLoop MainLoop; + /** * qemu_init_main_loop: Set up the process so that it can run the main loop. * diff --git a/include/sysemu/event-loop-base.h b/include/sysemu/event-loop-base.h new file mode 100644 index 0000000000..2748bf6ae1 --- /dev/null +++ b/include/sysemu/event-loop-base.h @@ -0,0 +1,41 @@ +/* + * QEMU event-loop backend + * + * Copyright (C) 2022 Red Hat Inc + * + * Authors: + * Nicolas Saenz Julienne + * + * This work is licensed under the terms of the GNU GPL, version 2 or later. + * See the COPYING file in the top-level directory. + */ +#ifndef QEMU_EVENT_LOOP_BASE_H +#define QEMU_EVENT_LOOP_BASE_H + +#include "qom/object.h" +#include "block/aio.h" +#include "qemu/typedefs.h" + +#define TYPE_EVENT_LOOP_BASE "event-loop-base" +OBJECT_DECLARE_TYPE(EventLoopBase, EventLoopBaseClass, + EVENT_LOOP_BASE) + +struct EventLoopBaseClass { + ObjectClass parent_class; + + void (*init)(EventLoopBase *base, Error **errp); + void (*update_params)(EventLoopBase *base, Error **errp); + bool (*can_be_deleted)(EventLoopBase *base); +}; + +struct EventLoopBase { + Object parent; + + /* AioContext AIO engine parameters */ + int64_t aio_max_batch; + + /* AioContext thread pool parameters */ + int64_t thread_pool_min; + int64_t thread_pool_max; +}; +#endif diff --git a/include/sysemu/iothread.h b/include/sysemu/iothread.h index 7f714bd136..8f8601d6ab 100644 --- a/include/sysemu/iothread.h +++ b/include/sysemu/iothread.h @@ -17,11 +17,12 @@ #include "block/aio.h" #include "qemu/thread.h" #include "qom/object.h" +#include "sysemu/event-loop-base.h" #define TYPE_IOTHREAD "iothread" struct IOThread { - Object parent_obj; + EventLoopBase parent_obj; QemuThread thread; AioContext *ctx; @@ -37,9 +38,6 @@ struct IOThread { int64_t poll_max_ns; int64_t poll_grow; int64_t poll_shrink; - - /* AioContext AIO engine parameters */ - int64_t aio_max_batch; }; typedef struct IOThread IOThread; diff --git a/iothread.c b/iothread.c index 0f98af0f2a..529194a566 100644 --- a/iothread.c +++ b/iothread.c @@ -17,6 +17,7 @@ #include "qemu/module.h" #include "block/aio.h" #include "block/block.h" +#include "sysemu/event-loop-base.h" #include "sysemu/iothread.h" #include "qapi/error.h" #include "qapi/qapi-commands-misc.h" @@ -152,10 +153,15 @@ static void iothread_init_gcontext(IOThread *iothread) iothread->main_loop = g_main_loop_new(iothread->worker_context, TRUE); } -static void iothread_set_aio_context_params(IOThread *iothread, Error **errp) +static void iothread_set_aio_context_params(EventLoopBase *base, Error **errp) { + IOThread *iothread = IOTHREAD(base); ERRP_GUARD(); + if (!iothread->ctx) { + return; + } + aio_context_set_poll_params(iothread->ctx, iothread->poll_max_ns, iothread->poll_grow, @@ -166,14 +172,18 @@ static void iothread_set_aio_context_params(IOThread *iothread, Error **errp) } aio_context_set_aio_params(iothread->ctx, - iothread->aio_max_batch, + iothread->parent_obj.aio_max_batch, errp); + + aio_context_set_thread_pool_params(iothread->ctx, base->thread_pool_min, + base->thread_pool_max, errp); } -static void iothread_complete(UserCreatable *obj, Error **errp) + +static void iothread_init(EventLoopBase *base, Error **errp) { Error *local_error = NULL; - IOThread *iothread = IOTHREAD(obj); + IOThread *iothread = IOTHREAD(base); char *thread_name; iothread->stopping = false; @@ -189,7 +199,7 @@ static void iothread_complete(UserCreatable *obj, Error **errp) */ iothread_init_gcontext(iothread); - iothread_set_aio_context_params(iothread, &local_error); + iothread_set_aio_context_params(base, &local_error); if (local_error) { error_propagate(errp, local_error); aio_context_unref(iothread->ctx); @@ -201,7 +211,7 @@ static void iothread_complete(UserCreatable *obj, Error **errp) * to inherit. */ thread_name = g_strdup_printf("IO %s", - object_get_canonical_path_component(OBJECT(obj))); + object_get_canonical_path_component(OBJECT(base))); qemu_thread_create(&iothread->thread, thread_name, iothread_run, iothread, QEMU_THREAD_JOINABLE); g_free(thread_name); @@ -226,9 +236,6 @@ static IOThreadParamInfo poll_grow_info = { static IOThreadParamInfo poll_shrink_info = { "poll-shrink", offsetof(IOThread, poll_shrink), }; -static IOThreadParamInfo aio_max_batch_info = { - "aio-max-batch", offsetof(IOThread, aio_max_batch), -}; static void iothread_get_param(Object *obj, Visitor *v, const char *name, IOThreadParamInfo *info, Error **errp) @@ -288,35 +295,12 @@ static void iothread_set_poll_param(Object *obj, Visitor *v, } } -static void iothread_get_aio_param(Object *obj, Visitor *v, - const char *name, void *opaque, Error **errp) -{ - IOThreadParamInfo *info = opaque; - - iothread_get_param(obj, v, name, info, errp); -} - -static void iothread_set_aio_param(Object *obj, Visitor *v, - const char *name, void *opaque, Error **errp) -{ - IOThread *iothread = IOTHREAD(obj); - IOThreadParamInfo *info = opaque; - - if (!iothread_set_param(obj, v, name, info, errp)) { - return; - } - - if (iothread->ctx) { - aio_context_set_aio_params(iothread->ctx, - iothread->aio_max_batch, - errp); - } -} - static void iothread_class_init(ObjectClass *klass, void *class_data) { - UserCreatableClass *ucc = USER_CREATABLE_CLASS(klass); - ucc->complete = iothread_complete; + EventLoopBaseClass *bc = EVENT_LOOP_BASE_CLASS(klass); + + bc->init = iothread_init; + bc->update_params = iothread_set_aio_context_params; object_class_property_add(klass, "poll-max-ns", "int", iothread_get_poll_param, @@ -330,23 +314,15 @@ static void iothread_class_init(ObjectClass *klass, void *class_data) iothread_get_poll_param, iothread_set_poll_param, NULL, &poll_shrink_info); - object_class_property_add(klass, "aio-max-batch", "int", - iothread_get_aio_param, - iothread_set_aio_param, - NULL, &aio_max_batch_info); } static const TypeInfo iothread_info = { .name = TYPE_IOTHREAD, - .parent = TYPE_OBJECT, + .parent = TYPE_EVENT_LOOP_BASE, .class_init = iothread_class_init, .instance_size = sizeof(IOThread), .instance_init = iothread_instance_init, .instance_finalize = iothread_instance_finalize, - .interfaces = (InterfaceInfo[]) { - {TYPE_USER_CREATABLE}, - {} - }, }; static void iothread_register_types(void) @@ -383,7 +359,7 @@ static int query_one_iothread(Object *object, void *opaque) info->poll_max_ns = iothread->poll_max_ns; info->poll_grow = iothread->poll_grow; info->poll_shrink = iothread->poll_shrink; - info->aio_max_batch = iothread->aio_max_batch; + info->aio_max_batch = iothread->parent_obj.aio_max_batch; QAPI_LIST_APPEND(*tail, info); return 0; diff --git a/meson.build b/meson.build index fa672e57bc..864e97945f 100644 --- a/meson.build +++ b/meson.build @@ -3025,6 +3025,7 @@ subdir('qom') subdir('authz') subdir('crypto') subdir('ui') +subdir('hw') if enable_modules @@ -3032,6 +3033,18 @@ if enable_modules modulecommon = declare_dependency(link_whole: libmodulecommon, compile_args: '-DBUILD_DSO') endif +qom_ss = qom_ss.apply(config_host, strict: false) +libqom = static_library('qom', qom_ss.sources() + genh, + dependencies: [qom_ss.dependencies()], + name_suffix: 'fa') +qom = declare_dependency(link_whole: libqom) + +event_loop_base = files('event-loop-base.c') +event_loop_base = static_library('event-loop-base', sources: event_loop_base + genh, + build_by_default: true) +event_loop_base = declare_dependency(link_whole: event_loop_base, + dependencies: [qom]) + stub_ss = stub_ss.apply(config_all, strict: false) util_ss.add_all(trace_ss) @@ -3040,7 +3053,8 @@ libqemuutil = static_library('qemuutil', sources: util_ss.sources() + stub_ss.sources() + genh, dependencies: [util_ss.dependencies(), libm, threads, glib, socket, malloc, pixman]) qemuutil = declare_dependency(link_with: libqemuutil, - sources: genh + version_res) + sources: genh + version_res, + dependencies: [event_loop_base]) if have_system or have_user decodetree = generator(find_program('scripts/decodetree.py'), @@ -3118,7 +3132,6 @@ subdir('monitor') subdir('net') subdir('replay') subdir('semihosting') -subdir('hw') subdir('tcg') subdir('fpu') subdir('accel') @@ -3243,13 +3256,6 @@ qemu_syms = custom_target('qemu.syms', output: 'qemu.syms', capture: true, command: [undefsym, nm, '@INPUT@']) -qom_ss = qom_ss.apply(config_host, strict: false) -libqom = static_library('qom', qom_ss.sources() + genh, - dependencies: [qom_ss.dependencies()], - name_suffix: 'fa') - -qom = declare_dependency(link_whole: libqom) - authz_ss = authz_ss.apply(config_host, strict: false) libauthz = static_library('authz', authz_ss.sources() + genh, dependencies: [authz_ss.dependencies()], @@ -3302,7 +3308,7 @@ libblockdev = static_library('blockdev', blockdev_ss.sources() + genh, build_by_default: false) blockdev = declare_dependency(link_whole: [libblockdev], - dependencies: [block]) + dependencies: [block, event_loop_base]) qmp_ss = qmp_ss.apply(config_host, strict: false) libqmp = static_library('qmp', qmp_ss.sources() + genh, diff --git a/qapi/qom.json b/qapi/qom.json index eeb5395ff3..6a653c6636 100644 --- a/qapi/qom.json +++ b/qapi/qom.json @@ -499,6 +499,28 @@ '*repeat': 'bool', '*grab-toggle': 'GrabToggleKeys' } } +## +# @EventLoopBaseProperties: +# +# Common properties for event loops +# +# @aio-max-batch: maximum number of requests in a batch for the AIO engine, +# 0 means that the engine will use its default. +# (default: 0) +# +# @thread-pool-min: minimum number of threads reserved in the thread pool +# (default:0) +# +# @thread-pool-max: maximum number of threads the thread pool can contain +# (default:64) +# +# Since: 7.1 +## +{ 'struct': 'EventLoopBaseProperties', + 'data': { '*aio-max-batch': 'int', + '*thread-pool-min': 'int', + '*thread-pool-max': 'int' } } + ## # @IothreadProperties: # @@ -516,17 +538,26 @@ # algorithm detects it is spending too long polling without # encountering events. 0 selects a default behaviour (default: 0) # -# @aio-max-batch: maximum number of requests in a batch for the AIO engine, -# 0 means that the engine will use its default -# (default:0, since 6.1) +# The @aio-max-batch option is available since 6.1. # # Since: 2.0 ## { 'struct': 'IothreadProperties', + 'base': 'EventLoopBaseProperties', 'data': { '*poll-max-ns': 'int', '*poll-grow': 'int', - '*poll-shrink': 'int', - '*aio-max-batch': 'int' } } + '*poll-shrink': 'int' } } + +## +# @MainLoopProperties: +# +# Properties for the main-loop object. +# +# Since: 7.1 +## +{ 'struct': 'MainLoopProperties', + 'base': 'EventLoopBaseProperties', + 'data': {} } ## # @MemoryBackendProperties: @@ -818,6 +849,7 @@ { 'name': 'input-linux', 'if': 'CONFIG_LINUX' }, 'iothread', + 'main-loop', { 'name': 'memory-backend-epc', 'if': 'CONFIG_LINUX' }, 'memory-backend-file', @@ -883,6 +915,7 @@ 'input-linux': { 'type': 'InputLinuxProperties', 'if': 'CONFIG_LINUX' }, 'iothread': 'IothreadProperties', + 'main-loop': 'MainLoopProperties', 'memory-backend-epc': { 'type': 'MemoryBackendEpcProperties', 'if': 'CONFIG_LINUX' }, 'memory-backend-file': 'MemoryBackendFileProperties', diff --git a/util/aio-posix.c b/util/aio-posix.c index be0182a3c6..731f3826c0 100644 --- a/util/aio-posix.c +++ b/util/aio-posix.c @@ -15,6 +15,7 @@ #include "qemu/osdep.h" #include "block/block.h" +#include "block/thread-pool.h" #include "qemu/main-loop.h" #include "qemu/rcu.h" #include "qemu/rcu_queue.h" diff --git a/util/async.c b/util/async.c index 2ea1172f3e..554ba70cca 100644 --- a/util/async.c +++ b/util/async.c @@ -563,6 +563,9 @@ AioContext *aio_context_new(Error **errp) ctx->aio_max_batch = 0; + ctx->thread_pool_min = 0; + ctx->thread_pool_max = THREAD_POOL_MAX_THREADS_DEFAULT; + return ctx; fail: g_source_destroy(&ctx->source); @@ -696,3 +699,20 @@ void qemu_set_current_aio_context(AioContext *ctx) assert(!get_my_aiocontext()); set_my_aiocontext(ctx); } + +void aio_context_set_thread_pool_params(AioContext *ctx, int64_t min, + int64_t max, Error **errp) +{ + + if (min > max || !max || min > INT_MAX || max > INT_MAX) { + error_setg(errp, "bad thread-pool-min/thread-pool-max values"); + return; + } + + ctx->thread_pool_min = min; + ctx->thread_pool_max = max; + + if (ctx->thread_pool) { + thread_pool_update_params(ctx->thread_pool, ctx); + } +} diff --git a/util/main-loop.c b/util/main-loop.c index 9afac10dff..f00a25451b 100644 --- a/util/main-loop.c +++ b/util/main-loop.c @@ -30,9 +30,11 @@ #include "sysemu/replay.h" #include "qemu/main-loop.h" #include "block/aio.h" +#include "block/thread-pool.h" #include "qemu/error-report.h" #include "qemu/queue.h" #include "qemu/compiler.h" +#include "qom/object.h" #ifndef _WIN32 #include @@ -184,6 +186,69 @@ int qemu_init_main_loop(Error **errp) return 0; } +static void main_loop_update_params(EventLoopBase *base, Error **errp) +{ + ERRP_GUARD(); + + if (!qemu_aio_context) { + error_setg(errp, "qemu aio context not ready"); + return; + } + + aio_context_set_aio_params(qemu_aio_context, base->aio_max_batch, errp); + if (*errp) { + return; + } + + aio_context_set_thread_pool_params(qemu_aio_context, base->thread_pool_min, + base->thread_pool_max, errp); +} + +MainLoop *mloop; + +static void main_loop_init(EventLoopBase *base, Error **errp) +{ + MainLoop *m = MAIN_LOOP(base); + + if (mloop) { + error_setg(errp, "only one main-loop instance allowed"); + return; + } + + main_loop_update_params(base, errp); + + mloop = m; + return; +} + +static bool main_loop_can_be_deleted(EventLoopBase *base) +{ + return false; +} + +static void main_loop_class_init(ObjectClass *oc, void *class_data) +{ + EventLoopBaseClass *bc = EVENT_LOOP_BASE_CLASS(oc); + + bc->init = main_loop_init; + bc->update_params = main_loop_update_params; + bc->can_be_deleted = main_loop_can_be_deleted; +} + +static const TypeInfo main_loop_info = { + .name = TYPE_MAIN_LOOP, + .parent = TYPE_EVENT_LOOP_BASE, + .class_init = main_loop_class_init, + .instance_size = sizeof(MainLoop), +}; + +static void main_loop_register_types(void) +{ + type_register_static(&main_loop_info); +} + +type_init(main_loop_register_types) + static int max_priority; #ifndef _WIN32 diff --git a/util/thread-pool.c b/util/thread-pool.c index d763cea505..196835b4d3 100644 --- a/util/thread-pool.c +++ b/util/thread-pool.c @@ -58,7 +58,6 @@ struct ThreadPool { QemuMutex lock; QemuCond worker_stopped; QemuSemaphore sem; - int max_threads; QEMUBH *new_thread_bh; /* The following variables are only accessed from one AioContext. */ @@ -71,8 +70,27 @@ struct ThreadPool { int new_threads; /* backlog of threads we need to create */ int pending_threads; /* threads created but not running yet */ bool stopping; + int min_threads; + int max_threads; }; +static inline bool back_to_sleep(ThreadPool *pool, int ret) +{ + /* + * The semaphore timed out, we should exit the loop except when: + * - There is work to do, we raced with the signal. + * - The max threads threshold just changed, we raced with the signal. + * - The thread pool forces a minimum number of readily available threads. + */ + if (ret == -1 && (!QTAILQ_EMPTY(&pool->request_list) || + pool->cur_threads > pool->max_threads || + pool->cur_threads <= pool->min_threads)) { + return true; + } + + return false; +} + static void *worker_thread(void *opaque) { ThreadPool *pool = opaque; @@ -91,8 +109,9 @@ static void *worker_thread(void *opaque) ret = qemu_sem_timedwait(&pool->sem, 10000); qemu_mutex_lock(&pool->lock); pool->idle_threads--; - } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list)); - if (ret == -1 || pool->stopping) { + } while (back_to_sleep(pool, ret)); + if (ret == -1 || pool->stopping || + pool->cur_threads > pool->max_threads) { break; } @@ -294,6 +313,33 @@ void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg) thread_pool_submit_aio(pool, func, arg, NULL, NULL); } +void thread_pool_update_params(ThreadPool *pool, AioContext *ctx) +{ + qemu_mutex_lock(&pool->lock); + + pool->min_threads = ctx->thread_pool_min; + pool->max_threads = ctx->thread_pool_max; + + /* + * We either have to: + * - Increase the number available of threads until over the min_threads + * threshold. + * - Decrease the number of available threads until under the max_threads + * threshold. + * - Do nothing. The current number of threads fall in between the min and + * max thresholds. We'll let the pool manage itself. + */ + for (int i = pool->cur_threads; i < pool->min_threads; i++) { + spawn_thread(pool); + } + + for (int i = pool->cur_threads; i > pool->max_threads; i--) { + qemu_sem_post(&pool->sem); + } + + qemu_mutex_unlock(&pool->lock); +} + static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx) { if (!ctx) { @@ -306,11 +352,12 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx) qemu_mutex_init(&pool->lock); qemu_cond_init(&pool->worker_stopped); qemu_sem_init(&pool->sem, 0); - pool->max_threads = 64; pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool); QLIST_INIT(&pool->head); QTAILQ_INIT(&pool->request_list); + + thread_pool_update_params(pool, ctx); } ThreadPool *thread_pool_new(AioContext *ctx)