diff --git a/audio/wavcapture.c b/audio/wavcapture.c index 17e87ed6f4..c60286e162 100644 --- a/audio/wavcapture.c +++ b/audio/wavcapture.c @@ -1,5 +1,5 @@ #include "qemu/osdep.h" -#include "monitor/monitor.h" +#include "qemu/qemu-print.h" #include "qapi/error.h" #include "qemu/error-report.h" #include "audio.h" @@ -94,9 +94,9 @@ static void wav_capture_info (void *opaque) WAVState *wav = opaque; char *path = wav->path; - monitor_printf (cur_mon, "Capturing audio(%d,%d,%d) to %s: %d bytes\n", - wav->freq, wav->bits, wav->nchannels, - path ? path : "", wav->bytes); + qemu_printf("Capturing audio(%d,%d,%d) to %s: %d bytes\n", + wav->freq, wav->bits, wav->nchannels, + path ? path : "", wav->bytes); } static struct capture_ops wav_capture_ops = { diff --git a/block.c b/block.c index 52b2e2709f..430edf79bb 100644 --- a/block.c +++ b/block.c @@ -6303,6 +6303,56 @@ AioContext *bdrv_get_aio_context(BlockDriverState *bs) return bs ? bs->aio_context : qemu_get_aio_context(); } +AioContext *coroutine_fn bdrv_co_enter(BlockDriverState *bs) +{ + Coroutine *self = qemu_coroutine_self(); + AioContext *old_ctx = qemu_coroutine_get_aio_context(self); + AioContext *new_ctx; + + /* + * Increase bs->in_flight to ensure that this operation is completed before + * moving the node to a different AioContext. Read new_ctx only afterwards. + */ + bdrv_inc_in_flight(bs); + + new_ctx = bdrv_get_aio_context(bs); + aio_co_reschedule_self(new_ctx); + return old_ctx; +} + +void coroutine_fn bdrv_co_leave(BlockDriverState *bs, AioContext *old_ctx) +{ + aio_co_reschedule_self(old_ctx); + bdrv_dec_in_flight(bs); +} + +void coroutine_fn bdrv_co_lock(BlockDriverState *bs) +{ + AioContext *ctx = bdrv_get_aio_context(bs); + + /* In the main thread, bs->aio_context won't change concurrently */ + assert(qemu_get_current_aio_context() == qemu_get_aio_context()); + + /* + * We're in coroutine context, so we already hold the lock of the main + * loop AioContext. Don't lock it twice to avoid deadlocks. + */ + assert(qemu_in_coroutine()); + if (ctx != qemu_get_aio_context()) { + aio_context_acquire(ctx); + } +} + +void coroutine_fn bdrv_co_unlock(BlockDriverState *bs) +{ + AioContext *ctx = bdrv_get_aio_context(bs); + + assert(qemu_in_coroutine()); + if (ctx != qemu_get_aio_context()) { + aio_context_release(ctx); + } +} + void bdrv_coroutine_enter(BlockDriverState *bs, Coroutine *co) { aio_co_enter(bdrv_get_aio_context(bs), co); diff --git a/blockdev.c b/blockdev.c index a6ae475dac..fe6fb5dc1d 100644 --- a/blockdev.c +++ b/blockdev.c @@ -2449,14 +2449,14 @@ BlockDirtyBitmapSha256 *qmp_x_debug_block_dirty_bitmap_sha256(const char *node, return ret; } -void qmp_block_resize(bool has_device, const char *device, - bool has_node_name, const char *node_name, - int64_t size, Error **errp) +void coroutine_fn qmp_block_resize(bool has_device, const char *device, + bool has_node_name, const char *node_name, + int64_t size, Error **errp) { Error *local_err = NULL; BlockBackend *blk = NULL; BlockDriverState *bs; - AioContext *aio_context; + AioContext *old_ctx; bs = bdrv_lookup_bs(has_device ? device : NULL, has_node_name ? node_name : NULL, @@ -2466,9 +2466,6 @@ void qmp_block_resize(bool has_device, const char *device, return; } - aio_context = bdrv_get_aio_context(bs); - aio_context_acquire(aio_context); - if (size < 0) { error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "size", "a >0 size"); goto out; @@ -2485,12 +2482,15 @@ void qmp_block_resize(bool has_device, const char *device, } bdrv_drained_begin(bs); + old_ctx = bdrv_co_enter(bs); blk_truncate(blk, size, false, PREALLOC_MODE_OFF, 0, errp); + bdrv_co_leave(bs, old_ctx); bdrv_drained_end(bs); out: + bdrv_co_lock(bs); blk_unref(blk); - aio_context_release(aio_context); + bdrv_co_unlock(bs); } void qmp_block_stream(bool has_job_id, const char *job_id, const char *device, diff --git a/docs/devel/qapi-code-gen.txt b/docs/devel/qapi-code-gen.txt index 5fc67c99cd..c6438c6aa9 100644 --- a/docs/devel/qapi-code-gen.txt +++ b/docs/devel/qapi-code-gen.txt @@ -472,6 +472,7 @@ Syntax: '*gen': false, '*allow-oob': true, '*allow-preconfig': true, + '*coroutine': true, '*if': COND, '*features': FEATURES } @@ -596,6 +597,34 @@ before the machine is built. It defaults to false. For example: QMP is available before the machine is built only when QEMU was started with --preconfig. +Member 'coroutine' tells the QMP dispatcher whether the command handler +is safe to be run in a coroutine. It defaults to false. If it is true, +the command handler is called from coroutine context and may yield while +waiting for an external event (such as I/O completion) in order to avoid +blocking the guest and other background operations. + +Coroutine safety can be hard to prove, similar to thread safety. Common +pitfalls are: + +- The global mutex isn't held across qemu_coroutine_yield(), so + operations that used to assume that they execute atomically may have + to be more careful to protect against changes in the global state. + +- Nested event loops (AIO_WAIT_WHILE() etc.) are problematic in + coroutine context and can easily lead to deadlocks. They should be + replaced by yielding and reentering the coroutine when the condition + becomes false. + +Since the command handler may assume coroutine context, any callers +other than the QMP dispatcher must also call it in coroutine context. +In particular, HMP commands calling such a QMP command handler must be +marked .coroutine = true in hmp-commands.hx. + +It is an error to specify both 'coroutine': true and 'allow-oob': true +for a command. We don't currently have a use case for both together and +without a use case, it's not entirely clear what the semantics should +be. + The optional 'if' member specifies a conditional. See "Configuring the schema" below for more on this. diff --git a/docs/sphinx/qapidoc.py b/docs/sphinx/qapidoc.py index 6944ffa6aa..e03abcbb95 100644 --- a/docs/sphinx/qapidoc.py +++ b/docs/sphinx/qapidoc.py @@ -330,7 +330,7 @@ class QAPISchemaGenRSTVisitor(QAPISchemaVisitor): def visit_command(self, name, info, ifcond, features, arg_type, ret_type, gen, success_response, boxed, allow_oob, - allow_preconfig): + allow_preconfig, coroutine): doc = self._cur_doc self._add_doc('Command', self._nodes_for_arguments(doc, diff --git a/dump/dump.c b/dump/dump.c index 45da46a952..dec32468d9 100644 --- a/dump/dump.c +++ b/dump/dump.c @@ -1986,7 +1986,7 @@ void qmp_dump_guest_memory(bool paging, const char *file, #if !defined(WIN32) if (strstart(file, "fd:", &p)) { - fd = monitor_get_fd(cur_mon, p, errp); + fd = monitor_get_fd(monitor_cur(), p, errp); if (fd == -1) { return; } diff --git a/hmp-commands.hx b/hmp-commands.hx index e43ce600b8..cd068389de 100644 --- a/hmp-commands.hx +++ b/hmp-commands.hx @@ -76,6 +76,7 @@ ERST .params = "device size", .help = "resize a block image", .cmd = hmp_block_resize, + .coroutine = true, }, SRST diff --git a/hw/core/machine-hmp-cmds.c b/hw/core/machine-hmp-cmds.c index f4092b98cc..6357be9c6b 100644 --- a/hw/core/machine-hmp-cmds.c +++ b/hw/core/machine-hmp-cmds.c @@ -34,7 +34,7 @@ void hmp_info_cpus(Monitor *mon, const QDict *qdict) for (cpu = cpu_list; cpu; cpu = cpu->next) { int active = ' '; - if (cpu->value->cpu_index == monitor_get_cpu_index()) { + if (cpu->value->cpu_index == monitor_get_cpu_index(mon)) { active = '*'; } diff --git a/hw/scsi/vhost-scsi.c b/hw/scsi/vhost-scsi.c index a83ffeefc8..4d70fa036b 100644 --- a/hw/scsi/vhost-scsi.c +++ b/hw/scsi/vhost-scsi.c @@ -177,7 +177,7 @@ static void vhost_scsi_realize(DeviceState *dev, Error **errp) } if (vs->conf.vhostfd) { - vhostfd = monitor_fd_param(cur_mon, vs->conf.vhostfd, errp); + vhostfd = monitor_fd_param(monitor_cur(), vs->conf.vhostfd, errp); if (vhostfd == -1) { error_prepend(errp, "vhost-scsi: unable to parse vhostfd: "); return; diff --git a/hw/virtio/vhost-vsock.c b/hw/virtio/vhost-vsock.c index c8f0699b4f..f9db4beb47 100644 --- a/hw/virtio/vhost-vsock.c +++ b/hw/virtio/vhost-vsock.c @@ -143,7 +143,7 @@ static void vhost_vsock_device_realize(DeviceState *dev, Error **errp) } if (vsock->conf.vhostfd) { - vhostfd = monitor_fd_param(cur_mon, vsock->conf.vhostfd, errp); + vhostfd = monitor_fd_param(monitor_cur(), vsock->conf.vhostfd, errp); if (vhostfd == -1) { error_prepend(errp, "vhost-vsock: unable to parse vhostfd: "); return; diff --git a/include/block/aio.h b/include/block/aio.h index ec8c5af642..5f342267d5 100644 --- a/include/block/aio.h +++ b/include/block/aio.h @@ -17,6 +17,7 @@ #ifdef CONFIG_LINUX_IO_URING #include #endif +#include "qemu/coroutine.h" #include "qemu/queue.h" #include "qemu/event_notifier.h" #include "qemu/thread.h" @@ -654,6 +655,15 @@ static inline bool aio_node_check(AioContext *ctx, bool is_external) */ void aio_co_schedule(AioContext *ctx, struct Coroutine *co); +/** + * aio_co_reschedule_self: + * @new_ctx: the new context + * + * Move the currently running coroutine to new_ctx. If the coroutine is already + * running in new_ctx, do nothing. + */ +void coroutine_fn aio_co_reschedule_self(AioContext *new_ctx); + /** * aio_co_wake: * @co: the coroutine diff --git a/include/block/block.h b/include/block/block.h index ce2ac39299..d16c401cb4 100644 --- a/include/block/block.h +++ b/include/block/block.h @@ -640,6 +640,37 @@ bool bdrv_debug_is_suspended(BlockDriverState *bs, const char *tag); */ AioContext *bdrv_get_aio_context(BlockDriverState *bs); +/** + * Move the current coroutine to the AioContext of @bs and return the old + * AioContext of the coroutine. Increase bs->in_flight so that draining @bs + * will wait for the operation to proceed until the corresponding + * bdrv_co_leave(). + * + * Consequently, you can't call drain inside a bdrv_co_enter/leave() section as + * this will deadlock. + */ +AioContext *coroutine_fn bdrv_co_enter(BlockDriverState *bs); + +/** + * Ends a section started by bdrv_co_enter(). Move the current coroutine back + * to old_ctx and decrease bs->in_flight again. + */ +void coroutine_fn bdrv_co_leave(BlockDriverState *bs, AioContext *old_ctx); + +/** + * Locks the AioContext of @bs if it's not the current AioContext. This avoids + * double locking which could lead to deadlocks: This is a coroutine_fn, so we + * know we already own the lock of the current AioContext. + * + * May only be called in the main thread. + */ +void coroutine_fn bdrv_co_lock(BlockDriverState *bs); + +/** + * Unlocks the AioContext of @bs if it's not the current AioContext. + */ +void coroutine_fn bdrv_co_unlock(BlockDriverState *bs); + /** * Transfer control to @co in the aio context of @bs */ diff --git a/include/monitor/monitor.h b/include/monitor/monitor.h index c0170773d4..348bfad3d5 100644 --- a/include/monitor/monitor.h +++ b/include/monitor/monitor.h @@ -5,7 +5,6 @@ #include "qapi/qapi-types-misc.h" #include "qemu/readline.h" -extern __thread Monitor *cur_mon; typedef struct MonitorHMP MonitorHMP; typedef struct MonitorOptions MonitorOptions; @@ -13,6 +12,8 @@ typedef struct MonitorOptions MonitorOptions; extern QemuOptsList qemu_mon_opts; +Monitor *monitor_cur(void); +Monitor *monitor_set_cur(Coroutine *co, Monitor *mon); bool monitor_cur_is_qmp(void); void monitor_init_globals(void); @@ -33,8 +34,8 @@ int monitor_vprintf(Monitor *mon, const char *fmt, va_list ap) GCC_FMT_ATTR(2, 0); int monitor_printf(Monitor *mon, const char *fmt, ...) GCC_FMT_ATTR(2, 3); void monitor_flush(Monitor *mon); -int monitor_set_cpu(int cpu_index); -int monitor_get_cpu_index(void); +int monitor_set_cpu(Monitor *mon, int cpu_index); +int monitor_get_cpu_index(Monitor *mon); void monitor_read_command(MonitorHMP *mon, int show_prompt); int monitor_read_password(MonitorHMP *mon, ReadLineFunc *readline_func, diff --git a/include/qapi/qmp/dispatch.h b/include/qapi/qmp/dispatch.h index 5a9cf82472..af8d96c570 100644 --- a/include/qapi/qmp/dispatch.h +++ b/include/qapi/qmp/dispatch.h @@ -14,6 +14,7 @@ #ifndef QAPI_QMP_DISPATCH_H #define QAPI_QMP_DISPATCH_H +#include "monitor/monitor.h" #include "qemu/queue.h" typedef void (QmpCommandFunc)(QDict *, QObject **, Error **); @@ -24,11 +25,13 @@ typedef enum QmpCommandOptions QCO_NO_SUCCESS_RESP = (1U << 0), QCO_ALLOW_OOB = (1U << 1), QCO_ALLOW_PRECONFIG = (1U << 2), + QCO_COROUTINE = (1U << 3), } QmpCommandOptions; typedef struct QmpCommand { const char *name; + /* Runs in coroutine context if QCO_COROUTINE is set */ QmpCommandFunc *fn; QmpCommandOptions options; QTAILQ_ENTRY(QmpCommand) node; @@ -49,7 +52,7 @@ const char *qmp_command_name(const QmpCommand *cmd); bool qmp_has_success_response(const QmpCommand *cmd); QDict *qmp_error_response(Error *err); QDict *qmp_dispatch(const QmpCommandList *cmds, QObject *request, - bool allow_oob); + bool allow_oob, Monitor *cur_mon); bool qmp_is_oob(const QDict *dict); typedef void (*qmp_cmd_callback_fn)(const QmpCommand *cmd, void *opaque); diff --git a/migration/fd.c b/migration/fd.c index 0a29ecdebf..6f2f50475f 100644 --- a/migration/fd.c +++ b/migration/fd.c @@ -26,7 +26,7 @@ void fd_start_outgoing_migration(MigrationState *s, const char *fdname, Error **errp) { QIOChannel *ioc; - int fd = monitor_get_fd(cur_mon, fdname, errp); + int fd = monitor_get_fd(monitor_cur(), fdname, errp); if (fd == -1) { return; } @@ -55,7 +55,7 @@ static gboolean fd_accept_incoming_migration(QIOChannel *ioc, void fd_start_incoming_migration(const char *fdname, Error **errp) { QIOChannel *ioc; - int fd = monitor_fd_param(cur_mon, fdname, errp); + int fd = monitor_fd_param(monitor_cur(), fdname, errp); if (fd == -1) { return; } diff --git a/monitor/hmp-cmds.c b/monitor/hmp-cmds.c index dc0de39219..9789f4277f 100644 --- a/monitor/hmp-cmds.c +++ b/monitor/hmp-cmds.c @@ -998,7 +998,7 @@ void hmp_cpu(Monitor *mon, const QDict *qdict) /* XXX: drop the monitor_set_cpu() usage when all HMP commands that use it are converted to the QAPI */ cpu_index = qdict_get_int(qdict, "index"); - if (monitor_set_cpu(cpu_index) < 0) { + if (monitor_set_cpu(mon, cpu_index) < 0) { monitor_printf(mon, "invalid CPU index\n"); } } @@ -1009,7 +1009,7 @@ void hmp_memsave(Monitor *mon, const QDict *qdict) const char *filename = qdict_get_str(qdict, "filename"); uint64_t addr = qdict_get_int(qdict, "val"); Error *err = NULL; - int cpu_index = monitor_get_cpu_index(); + int cpu_index = monitor_get_cpu_index(mon); if (cpu_index < 0) { monitor_printf(mon, "No CPU available\n"); diff --git a/monitor/hmp.c b/monitor/hmp.c index 4ecdefd705..c5cd9d372b 100644 --- a/monitor/hmp.c +++ b/monitor/hmp.c @@ -1056,6 +1056,21 @@ fail: return NULL; } +typedef struct HandleHmpCommandCo { + Monitor *mon; + const HMPCommand *cmd; + QDict *qdict; + bool done; +} HandleHmpCommandCo; + +static void handle_hmp_command_co(void *opaque) +{ + HandleHmpCommandCo *data = opaque; + data->cmd->cmd(data->mon, data->qdict); + monitor_set_cur(qemu_coroutine_self(), NULL); + data->done = true; +} + void handle_hmp_command(MonitorHMP *mon, const char *cmdline) { QDict *qdict; @@ -1079,7 +1094,24 @@ void handle_hmp_command(MonitorHMP *mon, const char *cmdline) return; } - cmd->cmd(&mon->common, qdict); + if (!cmd->coroutine) { + /* old_mon is non-NULL when called from qmp_human_monitor_command() */ + Monitor *old_mon = monitor_set_cur(qemu_coroutine_self(), &mon->common); + cmd->cmd(&mon->common, qdict); + monitor_set_cur(qemu_coroutine_self(), old_mon); + } else { + HandleHmpCommandCo data = { + .mon = &mon->common, + .cmd = cmd, + .qdict = qdict, + .done = false, + }; + Coroutine *co = qemu_coroutine_create(handle_hmp_command_co, &data); + monitor_set_cur(co, &mon->common); + aio_co_enter(qemu_get_aio_context(), co); + AIO_WAIT_WHILE(qemu_get_aio_context(), !data.done); + } + qobject_unref(qdict); } @@ -1300,26 +1332,20 @@ cleanup: static void monitor_read(void *opaque, const uint8_t *buf, int size) { - MonitorHMP *mon; - Monitor *old_mon = cur_mon; + MonitorHMP *mon = container_of(opaque, MonitorHMP, common); int i; - cur_mon = opaque; - mon = container_of(cur_mon, MonitorHMP, common); - if (mon->rs) { for (i = 0; i < size; i++) { readline_handle_byte(mon->rs, buf[i]); } } else { if (size == 0 || buf[size - 1] != 0) { - monitor_printf(cur_mon, "corrupted command\n"); + monitor_printf(&mon->common, "corrupted command\n"); } else { handle_hmp_command(mon, (char *)buf); } } - - cur_mon = old_mon; } static void monitor_event(void *opaque, QEMUChrEvent event) diff --git a/monitor/misc.c b/monitor/misc.c index 6e0da0cb96..4a859fb24a 100644 --- a/monitor/misc.c +++ b/monitor/misc.c @@ -120,18 +120,13 @@ char *qmp_human_monitor_command(const char *command_line, bool has_cpu_index, int64_t cpu_index, Error **errp) { char *output = NULL; - Monitor *old_mon; MonitorHMP hmp = {}; monitor_data_init(&hmp.common, false, true, false); - old_mon = cur_mon; - cur_mon = &hmp.common; - if (has_cpu_index) { - int ret = monitor_set_cpu(cpu_index); + int ret = monitor_set_cpu(&hmp.common, cpu_index); if (ret < 0) { - cur_mon = old_mon; error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "cpu-index", "a CPU number"); goto out; @@ -139,7 +134,6 @@ char *qmp_human_monitor_command(const char *command_line, bool has_cpu_index, } handle_hmp_command(&hmp, command_line); - cur_mon = old_mon; WITH_QEMU_LOCK_GUARD(&hmp.common.mon_lock) { if (qstring_get_length(hmp.common.outbuf) > 0) { @@ -255,7 +249,7 @@ static void monitor_init_qmp_commands(void) } /* Set the current CPU defined by the user. Callers must hold BQL. */ -int monitor_set_cpu(int cpu_index) +int monitor_set_cpu(Monitor *mon, int cpu_index) { CPUState *cpu; @@ -263,29 +257,29 @@ int monitor_set_cpu(int cpu_index) if (cpu == NULL) { return -1; } - g_free(cur_mon->mon_cpu_path); - cur_mon->mon_cpu_path = object_get_canonical_path(OBJECT(cpu)); + g_free(mon->mon_cpu_path); + mon->mon_cpu_path = object_get_canonical_path(OBJECT(cpu)); return 0; } /* Callers must hold BQL. */ -static CPUState *mon_get_cpu_sync(bool synchronize) +static CPUState *mon_get_cpu_sync(Monitor *mon, bool synchronize) { CPUState *cpu = NULL; - if (cur_mon->mon_cpu_path) { - cpu = (CPUState *) object_resolve_path_type(cur_mon->mon_cpu_path, + if (mon->mon_cpu_path) { + cpu = (CPUState *) object_resolve_path_type(mon->mon_cpu_path, TYPE_CPU, NULL); if (!cpu) { - g_free(cur_mon->mon_cpu_path); - cur_mon->mon_cpu_path = NULL; + g_free(mon->mon_cpu_path); + mon->mon_cpu_path = NULL; } } - if (!cur_mon->mon_cpu_path) { + if (!mon->mon_cpu_path) { if (!first_cpu) { return NULL; } - monitor_set_cpu(first_cpu->cpu_index); + monitor_set_cpu(mon, first_cpu->cpu_index); cpu = first_cpu; } assert(cpu != NULL); @@ -297,7 +291,7 @@ static CPUState *mon_get_cpu_sync(bool synchronize) CPUState *mon_get_cpu(void) { - return mon_get_cpu_sync(true); + return mon_get_cpu_sync(monitor_cur(), true); } CPUArchState *mon_get_cpu_env(void) @@ -307,9 +301,9 @@ CPUArchState *mon_get_cpu_env(void) return cs ? cs->env_ptr : NULL; } -int monitor_get_cpu_index(void) +int monitor_get_cpu_index(Monitor *mon) { - CPUState *cs = mon_get_cpu_sync(false); + CPUState *cs = mon_get_cpu_sync(mon, false); return cs ? cs->cpu_index : UNASSIGNED_CPU_INDEX; } @@ -1232,6 +1226,7 @@ static void hmp_acl_remove(Monitor *mon, const QDict *qdict) void qmp_getfd(const char *fdname, Error **errp) { + Monitor *cur_mon = monitor_cur(); mon_fd_t *monfd; int fd, tmp_fd; @@ -1270,6 +1265,7 @@ void qmp_getfd(const char *fdname, Error **errp) void qmp_closefd(const char *fdname, Error **errp) { + Monitor *cur_mon = monitor_cur(); mon_fd_t *monfd; int tmp_fd; @@ -1356,7 +1352,7 @@ AddfdInfo *qmp_add_fd(bool has_fdset_id, int64_t fdset_id, bool has_opaque, const char *opaque, Error **errp) { int fd; - Monitor *mon = cur_mon; + Monitor *mon = monitor_cur(); AddfdInfo *fdinfo; fd = qemu_chr_fe_get_msgfd(&mon->chr); diff --git a/monitor/monitor-internal.h b/monitor/monitor-internal.h index b39e03b744..ad2e64be13 100644 --- a/monitor/monitor-internal.h +++ b/monitor/monitor-internal.h @@ -74,6 +74,7 @@ typedef struct HMPCommand { const char *help; const char *flags; /* p=preconfig */ void (*cmd)(Monitor *mon, const QDict *qdict); + bool coroutine; /* * @sub_table is a list of 2nd level of commands. If it does not exist, * cmd should be used. If it exists, sub_table[?].cmd should be @@ -155,7 +156,9 @@ static inline bool monitor_is_qmp(const Monitor *mon) typedef QTAILQ_HEAD(MonitorList, Monitor) MonitorList; extern IOThread *mon_iothread; -extern QEMUBH *qmp_dispatcher_bh; +extern Coroutine *qmp_dispatcher_co; +extern bool qmp_dispatcher_co_shutdown; +extern bool qmp_dispatcher_co_busy; extern QmpCommandList qmp_commands, qmp_cap_negotiation_commands; extern QemuMutex monitor_lock; extern MonitorList mon_list; @@ -173,7 +176,7 @@ void monitor_fdsets_cleanup(void); void qmp_send_response(MonitorQMP *mon, const QDict *rsp); void monitor_data_destroy_qmp(MonitorQMP *mon); -void monitor_qmp_bh_dispatcher(void *data); +void coroutine_fn monitor_qmp_dispatcher_co(void *data); int get_monitor_def(int64_t *pval, const char *name); void help_cmd(Monitor *mon, const char *name); diff --git a/monitor/monitor.c b/monitor/monitor.c index 0f32892ad4..ceffe1a83b 100644 --- a/monitor/monitor.c +++ b/monitor/monitor.c @@ -55,24 +55,85 @@ typedef struct { /* Shared monitor I/O thread */ IOThread *mon_iothread; -/* Bottom half to dispatch the requests received from I/O thread */ -QEMUBH *qmp_dispatcher_bh; +/* Coroutine to dispatch the requests received from I/O thread */ +Coroutine *qmp_dispatcher_co; -/* Protects mon_list, monitor_qapi_event_state, monitor_destroyed. */ +/* Set to true when the dispatcher coroutine should terminate */ +bool qmp_dispatcher_co_shutdown; + +/* + * qmp_dispatcher_co_busy is used for synchronisation between the + * monitor thread and the main thread to ensure that the dispatcher + * coroutine never gets scheduled a second time when it's already + * scheduled (scheduling the same coroutine twice is forbidden). + * + * It is true if the coroutine is active and processing requests. + * Additional requests may then be pushed onto mon->qmp_requests, + * and @qmp_dispatcher_co_shutdown may be set without further ado. + * @qmp_dispatcher_co_busy must not be woken up in this case. + * + * If false, you also have to set @qmp_dispatcher_co_busy to true and + * wake up @qmp_dispatcher_co after pushing the new requests. + * + * The coroutine will automatically change this variable back to false + * before it yields. Nobody else may set the variable to false. + * + * Access must be atomic for thread safety. + */ +bool qmp_dispatcher_co_busy; + +/* + * Protects mon_list, monitor_qapi_event_state, coroutine_mon, + * monitor_destroyed. + */ QemuMutex monitor_lock; static GHashTable *monitor_qapi_event_state; +static GHashTable *coroutine_mon; /* Maps Coroutine* to Monitor* */ MonitorList mon_list; int mon_refcount; static bool monitor_destroyed; -__thread Monitor *cur_mon; +Monitor *monitor_cur(void) +{ + Monitor *mon; + + qemu_mutex_lock(&monitor_lock); + mon = g_hash_table_lookup(coroutine_mon, qemu_coroutine_self()); + qemu_mutex_unlock(&monitor_lock); + + return mon; +} + +/** + * Sets a new current monitor and returns the old one. + * + * If a non-NULL monitor is set for a coroutine, another call + * resetting it to NULL is required before the coroutine terminates, + * otherwise a stale entry would remain in the hash table. + */ +Monitor *monitor_set_cur(Coroutine *co, Monitor *mon) +{ + Monitor *old_monitor = monitor_cur(); + + qemu_mutex_lock(&monitor_lock); + if (mon) { + g_hash_table_replace(coroutine_mon, co, mon); + } else { + g_hash_table_remove(coroutine_mon, co); + } + qemu_mutex_unlock(&monitor_lock); + + return old_monitor; +} /** * Is the current monitor, if any, a QMP monitor? */ bool monitor_cur_is_qmp(void) { + Monitor *cur_mon = monitor_cur(); + return cur_mon && monitor_is_qmp(cur_mon); } @@ -209,6 +270,8 @@ int monitor_printf(Monitor *mon, const char *fmt, ...) */ int error_vprintf(const char *fmt, va_list ap) { + Monitor *cur_mon = monitor_cur(); + if (cur_mon && !monitor_cur_is_qmp()) { return monitor_vprintf(cur_mon, fmt, ap); } @@ -217,6 +280,8 @@ int error_vprintf(const char *fmt, va_list ap) int error_vprintf_unless_qmp(const char *fmt, va_list ap) { + Monitor *cur_mon = monitor_cur(); + if (!cur_mon) { return vfprintf(stderr, fmt, ap); } @@ -582,9 +647,24 @@ void monitor_cleanup(void) } qemu_mutex_unlock(&monitor_lock); - /* QEMUBHs needs to be deleted before destroying the I/O thread */ - qemu_bh_delete(qmp_dispatcher_bh); - qmp_dispatcher_bh = NULL; + /* + * The dispatcher needs to stop before destroying the I/O thread. + * + * We need to poll both qemu_aio_context and iohandler_ctx to make + * sure that the dispatcher coroutine keeps making progress and + * eventually terminates. qemu_aio_context is automatically + * polled by calling AIO_WAIT_WHILE on it, but we must poll + * iohandler_ctx manually. + */ + qmp_dispatcher_co_shutdown = true; + if (!qatomic_xchg(&qmp_dispatcher_co_busy, true)) { + aio_co_wake(qmp_dispatcher_co); + } + + AIO_WAIT_WHILE(qemu_get_aio_context(), + (aio_poll(iohandler_get_aio_context(), false), + qatomic_mb_read(&qmp_dispatcher_co_busy))); + if (mon_iothread) { iothread_destroy(mon_iothread); mon_iothread = NULL; @@ -601,15 +681,16 @@ void monitor_init_globals_core(void) { monitor_qapi_event_init(); qemu_mutex_init(&monitor_lock); + coroutine_mon = g_hash_table_new(NULL, NULL); /* * The dispatcher BH must run in the main loop thread, since we * have commands assuming that context. It would be nice to get * rid of those assumptions. */ - qmp_dispatcher_bh = aio_bh_new(iohandler_get_aio_context(), - monitor_qmp_bh_dispatcher, - NULL); + qmp_dispatcher_co = qemu_coroutine_create(monitor_qmp_dispatcher_co, NULL); + qatomic_mb_set(&qmp_dispatcher_co_busy, true); + aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co); } int monitor_init(MonitorOptions *opts, bool allow_hmp, Error **errp) diff --git a/monitor/qmp-cmds-control.c b/monitor/qmp-cmds-control.c index 8f04cfa6e6..a456762f6a 100644 --- a/monitor/qmp-cmds-control.c +++ b/monitor/qmp-cmds-control.c @@ -69,6 +69,7 @@ static bool qmp_caps_accept(MonitorQMP *mon, QMPCapabilityList *list, void qmp_qmp_capabilities(bool has_enable, QMPCapabilityList *enable, Error **errp) { + Monitor *cur_mon = monitor_cur(); MonitorQMP *mon; assert(monitor_is_qmp(cur_mon)); @@ -119,6 +120,7 @@ static void query_commands_cb(const QmpCommand *cmd, void *opaque) CommandInfoList *qmp_query_commands(Error **errp) { CommandInfoList *list = NULL; + Monitor *cur_mon = monitor_cur(); MonitorQMP *mon; assert(monitor_is_qmp(cur_mon)); diff --git a/monitor/qmp-cmds.c b/monitor/qmp-cmds.c index 0ab5b78580..1abef70a89 100644 --- a/monitor/qmp-cmds.c +++ b/monitor/qmp-cmds.c @@ -328,7 +328,7 @@ void qmp_add_client(const char *protocol, const char *fdname, Chardev *s; int fd; - fd = monitor_get_fd(cur_mon, fdname, errp); + fd = monitor_get_fd(monitor_cur(), fdname, errp); if (fd < 0) { return; } diff --git a/monitor/qmp.c b/monitor/qmp.c index d433ceae5b..b42f8c6af3 100644 --- a/monitor/qmp.c +++ b/monitor/qmp.c @@ -133,18 +133,17 @@ static void monitor_qmp_respond(MonitorQMP *mon, QDict *rsp) } } +/* + * Runs outside of coroutine context for OOB commands, but in + * coroutine context for everything else. + */ static void monitor_qmp_dispatch(MonitorQMP *mon, QObject *req) { - Monitor *old_mon; QDict *rsp; QDict *error; - old_mon = cur_mon; - cur_mon = &mon->common; - - rsp = qmp_dispatch(mon->commands, req, qmp_oob_enabled(mon)); - - cur_mon = old_mon; + rsp = qmp_dispatch(mon->commands, req, qmp_oob_enabled(mon), + &mon->common); if (mon->commands == &qmp_cap_negotiation_commands) { error = qdict_get_qdict(rsp, "error"); @@ -211,43 +210,99 @@ static QMPRequest *monitor_qmp_requests_pop_any_with_lock(void) return req_obj; } -void monitor_qmp_bh_dispatcher(void *data) +void coroutine_fn monitor_qmp_dispatcher_co(void *data) { - QMPRequest *req_obj = monitor_qmp_requests_pop_any_with_lock(); + QMPRequest *req_obj = NULL; QDict *rsp; bool need_resume; MonitorQMP *mon; - if (!req_obj) { - return; - } + while (true) { + assert(qatomic_mb_read(&qmp_dispatcher_co_busy) == true); - mon = req_obj->mon; - /* qmp_oob_enabled() might change after "qmp_capabilities" */ - need_resume = !qmp_oob_enabled(mon) || - mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX - 1; - qemu_mutex_unlock(&mon->qmp_queue_lock); - if (req_obj->req) { - QDict *qdict = qobject_to(QDict, req_obj->req); - QObject *id = qdict ? qdict_get(qdict, "id") : NULL; - trace_monitor_qmp_cmd_in_band(qobject_get_try_str(id) ?: ""); - monitor_qmp_dispatch(mon, req_obj->req); - } else { - assert(req_obj->err); - rsp = qmp_error_response(req_obj->err); - req_obj->err = NULL; - monitor_qmp_respond(mon, rsp); - qobject_unref(rsp); - } + /* + * Mark the dispatcher as not busy already here so that we + * don't miss any new requests coming in the middle of our + * processing. + */ + qatomic_mb_set(&qmp_dispatcher_co_busy, false); - if (need_resume) { - /* Pairs with the monitor_suspend() in handle_qmp_command() */ - monitor_resume(&mon->common); - } - qmp_request_free(req_obj); + while (!(req_obj = monitor_qmp_requests_pop_any_with_lock())) { + /* + * No more requests to process. Wait to be reentered from + * handle_qmp_command() when it pushes more requests, or + * from monitor_cleanup() when it requests shutdown. + */ + if (!qmp_dispatcher_co_shutdown) { + qemu_coroutine_yield(); - /* Reschedule instead of looping so the main loop stays responsive */ - qemu_bh_schedule(qmp_dispatcher_bh); + /* + * busy must be set to true again by whoever + * rescheduled us to avoid double scheduling + */ + assert(qatomic_xchg(&qmp_dispatcher_co_busy, false) == true); + } + + /* + * qmp_dispatcher_co_shutdown may have changed if we + * yielded and were reentered from monitor_cleanup() + */ + if (qmp_dispatcher_co_shutdown) { + return; + } + } + + if (qatomic_xchg(&qmp_dispatcher_co_busy, true) == true) { + /* + * Someone rescheduled us (probably because a new requests + * came in), but we didn't actually yield. Do that now, + * only to be immediately reentered and removed from the + * list of scheduled coroutines. + */ + qemu_coroutine_yield(); + } + + /* + * Move the coroutine from iohandler_ctx to qemu_aio_context for + * executing the command handler so that it can make progress if it + * involves an AIO_WAIT_WHILE(). + */ + aio_co_schedule(qemu_get_aio_context(), qmp_dispatcher_co); + qemu_coroutine_yield(); + + mon = req_obj->mon; + /* qmp_oob_enabled() might change after "qmp_capabilities" */ + need_resume = !qmp_oob_enabled(mon) || + mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX - 1; + qemu_mutex_unlock(&mon->qmp_queue_lock); + if (req_obj->req) { + QDict *qdict = qobject_to(QDict, req_obj->req); + QObject *id = qdict ? qdict_get(qdict, "id") : NULL; + trace_monitor_qmp_cmd_in_band(qobject_get_try_str(id) ?: ""); + monitor_qmp_dispatch(mon, req_obj->req); + } else { + assert(req_obj->err); + rsp = qmp_error_response(req_obj->err); + req_obj->err = NULL; + monitor_qmp_respond(mon, rsp); + qobject_unref(rsp); + } + + if (need_resume) { + /* Pairs with the monitor_suspend() in handle_qmp_command() */ + monitor_resume(&mon->common); + } + qmp_request_free(req_obj); + + /* + * Yield and reschedule so the main loop stays responsive. + * + * Move back to iohandler_ctx so that nested event loops for + * qemu_aio_context don't start new monitor commands. + */ + aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co); + qemu_coroutine_yield(); + } } static void handle_qmp_command(void *opaque, QObject *req, Error *err) @@ -308,7 +363,9 @@ static void handle_qmp_command(void *opaque, QObject *req, Error *err) qemu_mutex_unlock(&mon->qmp_queue_lock); /* Kick the dispatcher routine */ - qemu_bh_schedule(qmp_dispatcher_bh); + if (!qatomic_xchg(&qmp_dispatcher_co_busy, true)) { + aio_co_wake(qmp_dispatcher_co); + } } static void monitor_qmp_read(void *opaque, const uint8_t *buf, int size) diff --git a/net/socket.c b/net/socket.c index 2d21fddd9c..15b410e8d8 100644 --- a/net/socket.c +++ b/net/socket.c @@ -727,7 +727,7 @@ int net_init_socket(const Netdev *netdev, const char *name, if (sock->has_fd) { int fd, ret; - fd = monitor_fd_param(cur_mon, sock->fd, errp); + fd = monitor_fd_param(monitor_cur(), sock->fd, errp); if (fd == -1) { return -1; } diff --git a/net/tap.c b/net/tap.c index 04ce72dd2f..c46ff66184 100644 --- a/net/tap.c +++ b/net/tap.c @@ -700,7 +700,7 @@ static void net_init_tap_one(const NetdevTapOptions *tap, NetClientState *peer, if (vhostfdname) { int ret; - vhostfd = monitor_fd_param(cur_mon, vhostfdname, &err); + vhostfd = monitor_fd_param(monitor_cur(), vhostfdname, &err); if (vhostfd == -1) { if (tap->has_vhostforce && tap->vhostforce) { error_propagate(errp, err); @@ -808,7 +808,7 @@ int net_init_tap(const Netdev *netdev, const char *name, return -1; } - fd = monitor_fd_param(cur_mon, tap->fd, errp); + fd = monitor_fd_param(monitor_cur(), tap->fd, errp); if (fd == -1) { return -1; } @@ -862,7 +862,7 @@ int net_init_tap(const Netdev *netdev, const char *name, } for (i = 0; i < nfds; i++) { - fd = monitor_fd_param(cur_mon, fds[i], errp); + fd = monitor_fd_param(monitor_cur(), fds[i], errp); if (fd == -1) { ret = -1; goto free_fail; diff --git a/qapi/block-core.json b/qapi/block-core.json index 3758ea9912..ee5ebef7f2 100644 --- a/qapi/block-core.json +++ b/qapi/block-core.json @@ -1310,7 +1310,8 @@ { 'command': 'block_resize', 'data': { '*device': 'str', '*node-name': 'str', - 'size': 'int' } } + 'size': 'int' }, + 'coroutine': true } ## # @NewImageMode: diff --git a/qapi/qmp-dispatch.c b/qapi/qmp-dispatch.c index 79347e0864..9a2d7dd29a 100644 --- a/qapi/qmp-dispatch.c +++ b/qapi/qmp-dispatch.c @@ -12,12 +12,16 @@ */ #include "qemu/osdep.h" + +#include "block/aio.h" #include "qapi/error.h" #include "qapi/qmp/dispatch.h" #include "qapi/qmp/qdict.h" #include "qapi/qmp/qjson.h" #include "sysemu/runstate.h" #include "qapi/qmp/qbool.h" +#include "qemu/coroutine.h" +#include "qemu/main-loop.h" static QDict *qmp_dispatch_check_obj(QDict *dict, bool allow_oob, Error **errp) @@ -88,8 +92,32 @@ bool qmp_is_oob(const QDict *dict) && !qdict_haskey(dict, "execute"); } +typedef struct QmpDispatchBH { + const QmpCommand *cmd; + Monitor *cur_mon; + QDict *args; + QObject **ret; + Error **errp; + Coroutine *co; +} QmpDispatchBH; + +static void do_qmp_dispatch_bh(void *opaque) +{ + QmpDispatchBH *data = opaque; + + assert(monitor_cur() == NULL); + monitor_set_cur(qemu_coroutine_self(), data->cur_mon); + data->cmd->fn(data->args, data->ret, data->errp); + monitor_set_cur(qemu_coroutine_self(), NULL); + aio_co_wake(data->co); +} + +/* + * Runs outside of coroutine context for OOB commands, but in coroutine + * context for everything else. + */ QDict *qmp_dispatch(const QmpCommandList *cmds, QObject *request, - bool allow_oob) + bool allow_oob, Monitor *cur_mon) { Error *err = NULL; bool oob; @@ -152,7 +180,40 @@ QDict *qmp_dispatch(const QmpCommandList *cmds, QObject *request, args = qdict_get_qdict(dict, "arguments"); qobject_ref(args); } - cmd->fn(args, &ret, &err); + + assert(!(oob && qemu_in_coroutine())); + assert(monitor_cur() == NULL); + if (!!(cmd->options & QCO_COROUTINE) == qemu_in_coroutine()) { + monitor_set_cur(qemu_coroutine_self(), cur_mon); + cmd->fn(args, &ret, &err); + monitor_set_cur(qemu_coroutine_self(), NULL); + } else { + /* + * Actual context doesn't match the one the command needs. + * + * Case 1: we are in coroutine context, but command does not + * have QCO_COROUTINE. We need to drop out of coroutine + * context for executing it. + * + * Case 2: we are outside coroutine context, but command has + * QCO_COROUTINE. Can't actually happen, because we get here + * outside coroutine context only when executing a command + * out of band, and OOB commands never have QCO_COROUTINE. + */ + assert(!oob && qemu_in_coroutine() && !(cmd->options & QCO_COROUTINE)); + + QmpDispatchBH data = { + .cur_mon = cur_mon, + .cmd = cmd, + .args = args, + .ret = &ret, + .errp = &err, + .co = qemu_coroutine_self(), + }; + aio_bh_schedule_oneshot(qemu_get_aio_context(), do_qmp_dispatch_bh, + &data); + qemu_coroutine_yield(); + } qobject_unref(args); if (err) { /* or assert(!ret) after reviewing all handlers: */ diff --git a/qapi/qmp-registry.c b/qapi/qmp-registry.c index d0f9a1d3e3..58c65b5052 100644 --- a/qapi/qmp-registry.c +++ b/qapi/qmp-registry.c @@ -20,6 +20,9 @@ void qmp_register_command(QmpCommandList *cmds, const char *name, { QmpCommand *cmd = g_malloc0(sizeof(*cmd)); + /* QCO_COROUTINE and QCO_ALLOW_OOB are incompatible for now */ + assert(!((options & QCO_COROUTINE) && (options & QCO_ALLOW_OOB))); + cmd->name = name; cmd->fn = fn; cmd->enabled = true; diff --git a/qga/main.c b/qga/main.c index 740f5f7303..dea6a3aa64 100644 --- a/qga/main.c +++ b/qga/main.c @@ -579,7 +579,7 @@ static void process_event(void *opaque, QObject *obj, Error *err) } g_debug("processing command"); - rsp = qmp_dispatch(&ga_commands, obj, false); + rsp = qmp_dispatch(&ga_commands, obj, false, NULL); end: ret = send_response(s, rsp); diff --git a/scripts/qapi/commands.py b/scripts/qapi/commands.py index 3cf9e1110b..6e6fc94a14 100644 --- a/scripts/qapi/commands.py +++ b/scripts/qapi/commands.py @@ -176,7 +176,8 @@ out: return ret -def gen_register_command(name, success_response, allow_oob, allow_preconfig): +def gen_register_command(name, success_response, allow_oob, allow_preconfig, + coroutine): options = [] if not success_response: @@ -185,6 +186,8 @@ def gen_register_command(name, success_response, allow_oob, allow_preconfig): options += ['QCO_ALLOW_OOB'] if allow_preconfig: options += ['QCO_ALLOW_PRECONFIG'] + if coroutine: + options += ['QCO_COROUTINE'] if not options: options = ['QCO_NO_OPTIONS'] @@ -267,7 +270,7 @@ void %(c_prefix)sqmp_init_marshal(QmpCommandList *cmds); def visit_command(self, name, info, ifcond, features, arg_type, ret_type, gen, success_response, boxed, - allow_oob, allow_preconfig): + allow_oob, allow_preconfig, coroutine): if not gen: return # FIXME: If T is a user-defined type, the user is responsible @@ -285,7 +288,8 @@ void %(c_prefix)sqmp_init_marshal(QmpCommandList *cmds); self._genh.add(gen_marshal_decl(name)) self._genc.add(gen_marshal(name, arg_type, boxed, ret_type)) self._regy.add(gen_register_command(name, success_response, - allow_oob, allow_preconfig)) + allow_oob, allow_preconfig, + coroutine)) def gen_commands(schema, output_dir, prefix): diff --git a/scripts/qapi/expr.py b/scripts/qapi/expr.py index 2942520399..a15c1fb474 100644 --- a/scripts/qapi/expr.py +++ b/scripts/qapi/expr.py @@ -88,10 +88,17 @@ def check_flags(expr, info): if key in expr and expr[key] is not False: raise QAPISemError( info, "flag '%s' may only use false value" % key) - for key in ['boxed', 'allow-oob', 'allow-preconfig']: + for key in ['boxed', 'allow-oob', 'allow-preconfig', 'coroutine']: if key in expr and expr[key] is not True: raise QAPISemError( info, "flag '%s' may only use true value" % key) + if 'allow-oob' in expr and 'coroutine' in expr: + # This is not necessarily a fundamental incompatibility, but + # we don't have a use case and the desired semantics isn't + # obvious. The simplest solution is to forbid it until we get + # a use case for it. + raise QAPISemError(info, "flags 'allow-oob' and 'coroutine' " + "are incompatible") def check_if(expr, info, source): @@ -342,7 +349,7 @@ def check_exprs(exprs): ['command'], ['data', 'returns', 'boxed', 'if', 'features', 'gen', 'success-response', 'allow-oob', - 'allow-preconfig']) + 'allow-preconfig', 'coroutine']) normalize_members(expr.get('data')) check_command(expr, info) elif meta == 'event': diff --git a/scripts/qapi/introspect.py b/scripts/qapi/introspect.py index 23652be810..5907b09cd5 100644 --- a/scripts/qapi/introspect.py +++ b/scripts/qapi/introspect.py @@ -216,7 +216,7 @@ const QLitObject %(c_name)s = %(c_string)s; def visit_command(self, name, info, ifcond, features, arg_type, ret_type, gen, success_response, boxed, - allow_oob, allow_preconfig): + allow_oob, allow_preconfig, coroutine): arg_type = arg_type or self._schema.the_empty_object_type ret_type = ret_type or self._schema.the_empty_object_type obj = {'arg-type': self._use_type(arg_type), diff --git a/scripts/qapi/schema.py b/scripts/qapi/schema.py index 78309a00f0..d1307ec661 100644 --- a/scripts/qapi/schema.py +++ b/scripts/qapi/schema.py @@ -128,7 +128,7 @@ class QAPISchemaVisitor: def visit_command(self, name, info, ifcond, features, arg_type, ret_type, gen, success_response, boxed, - allow_oob, allow_preconfig): + allow_oob, allow_preconfig, coroutine): pass def visit_event(self, name, info, ifcond, features, arg_type, boxed): @@ -713,7 +713,8 @@ class QAPISchemaCommand(QAPISchemaEntity): def __init__(self, name, info, doc, ifcond, features, arg_type, ret_type, - gen, success_response, boxed, allow_oob, allow_preconfig): + gen, success_response, boxed, allow_oob, allow_preconfig, + coroutine): super().__init__(name, info, doc, ifcond, features) assert not arg_type or isinstance(arg_type, str) assert not ret_type or isinstance(ret_type, str) @@ -726,6 +727,7 @@ class QAPISchemaCommand(QAPISchemaEntity): self.boxed = boxed self.allow_oob = allow_oob self.allow_preconfig = allow_preconfig + self.coroutine = coroutine def check(self, schema): super().check(schema) @@ -768,7 +770,8 @@ class QAPISchemaCommand(QAPISchemaEntity): visitor.visit_command( self.name, self.info, self.ifcond, self.features, self.arg_type, self.ret_type, self.gen, self.success_response, - self.boxed, self.allow_oob, self.allow_preconfig) + self.boxed, self.allow_oob, self.allow_preconfig, + self.coroutine) class QAPISchemaEvent(QAPISchemaEntity): @@ -1074,6 +1077,7 @@ class QAPISchema: boxed = expr.get('boxed', False) allow_oob = expr.get('allow-oob', False) allow_preconfig = expr.get('allow-preconfig', False) + coroutine = expr.get('coroutine', False) ifcond = expr.get('if') features = self._make_features(expr.get('features'), info) if isinstance(data, OrderedDict): @@ -1086,7 +1090,8 @@ class QAPISchema: self._def_entity(QAPISchemaCommand(name, info, doc, ifcond, features, data, rets, gen, success_response, - boxed, allow_oob, allow_preconfig)) + boxed, allow_oob, allow_preconfig, + coroutine)) def _def_event(self, expr, info, doc): name = expr['event'] diff --git a/softmmu/cpus.c b/softmmu/cpus.c index 9e33416b4d..47cceddd80 100644 --- a/softmmu/cpus.c +++ b/softmmu/cpus.c @@ -793,6 +793,6 @@ exit: void qmp_inject_nmi(Error **errp) { - nmi_monitor_handle(monitor_get_cpu_index(), errp); + nmi_monitor_handle(monitor_get_cpu_index(monitor_cur()), errp); } diff --git a/stubs/monitor-core.c b/stubs/monitor-core.c index 6cff1c4e1d..d058a2a00d 100644 --- a/stubs/monitor-core.c +++ b/stubs/monitor-core.c @@ -3,7 +3,15 @@ #include "qemu-common.h" #include "qapi/qapi-emit-events.h" -__thread Monitor *cur_mon; +Monitor *monitor_cur(void) +{ + return NULL; +} + +Monitor *monitor_set_cur(Coroutine *co, Monitor *mon) +{ + return NULL; +} void monitor_init_qmp(Chardev *chr, bool pretty, Error **errp) { diff --git a/tests/qapi-schema/meson.build b/tests/qapi-schema/meson.build index f08c902911..1f222a7a13 100644 --- a/tests/qapi-schema/meson.build +++ b/tests/qapi-schema/meson.build @@ -142,6 +142,7 @@ schemas = [ 'nested-struct-data.json', 'nested-struct-data-invalid-dict.json', 'non-objects.json', + 'oob-coroutine.json', 'oob-test.json', 'allow-preconfig-test.json', 'pragma-doc-required-crap.json', diff --git a/tests/qapi-schema/oob-coroutine.err b/tests/qapi-schema/oob-coroutine.err new file mode 100644 index 0000000000..c01a4992bd --- /dev/null +++ b/tests/qapi-schema/oob-coroutine.err @@ -0,0 +1,2 @@ +oob-coroutine.json: In command 'oob-command-1': +oob-coroutine.json:2: flags 'allow-oob' and 'coroutine' are incompatible diff --git a/tests/qapi-schema/oob-coroutine.json b/tests/qapi-schema/oob-coroutine.json new file mode 100644 index 0000000000..0f67663bcd --- /dev/null +++ b/tests/qapi-schema/oob-coroutine.json @@ -0,0 +1,2 @@ +# Check that incompatible flags allow-oob and coroutine are rejected +{ 'command': 'oob-command-1', 'allow-oob': true, 'coroutine': true } diff --git a/tests/qapi-schema/oob-coroutine.out b/tests/qapi-schema/oob-coroutine.out new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/qapi-schema/qapi-schema-test.json b/tests/qapi-schema/qapi-schema-test.json index 3a9f2cbb33..63f92adf68 100644 --- a/tests/qapi-schema/qapi-schema-test.json +++ b/tests/qapi-schema/qapi-schema-test.json @@ -148,6 +148,7 @@ 'returns': 'UserDefTwo' } { 'command': 'cmd-success-response', 'data': {}, 'success-response': false } +{ 'command': 'coroutine-cmd', 'data': {}, 'coroutine': true } # Returning a non-dictionary requires a name from the whitelist { 'command': 'guest-get-time', 'data': {'a': 'int', '*b': 'int' }, diff --git a/tests/qapi-schema/qapi-schema-test.out b/tests/qapi-schema/qapi-schema-test.out index 891b4101e0..8868ca0dca 100644 --- a/tests/qapi-schema/qapi-schema-test.out +++ b/tests/qapi-schema/qapi-schema-test.out @@ -203,6 +203,8 @@ command user_def_cmd2 q_obj_user_def_cmd2-arg -> UserDefTwo gen=True success_response=True boxed=False oob=False preconfig=False command cmd-success-response None -> None gen=True success_response=False boxed=False oob=False preconfig=False +command coroutine-cmd None -> None + gen=True success_response=True boxed=False oob=False preconfig=False coroutine=True object q_obj_guest-get-time-arg member a: int optional=False member b: int optional=True diff --git a/tests/qapi-schema/test-qapi.py b/tests/qapi-schema/test-qapi.py index f396b471eb..e8db9d09d9 100755 --- a/tests/qapi-schema/test-qapi.py +++ b/tests/qapi-schema/test-qapi.py @@ -68,12 +68,13 @@ class QAPISchemaTestVisitor(QAPISchemaVisitor): def visit_command(self, name, info, ifcond, features, arg_type, ret_type, gen, success_response, boxed, - allow_oob, allow_preconfig): + allow_oob, allow_preconfig, coroutine): print('command %s %s -> %s' % (name, arg_type and arg_type.name, ret_type and ret_type.name)) - print(' gen=%s success_response=%s boxed=%s oob=%s preconfig=%s' - % (gen, success_response, boxed, allow_oob, allow_preconfig)) + print(' gen=%s success_response=%s boxed=%s oob=%s preconfig=%s%s' + % (gen, success_response, boxed, allow_oob, allow_preconfig, + " coroutine=True" if coroutine else "")) self._print_if(ifcond) self._print_features(features) diff --git a/tests/test-qmp-cmds.c b/tests/test-qmp-cmds.c index d12ff47e26..d3413bfef0 100644 --- a/tests/test-qmp-cmds.c +++ b/tests/test-qmp-cmds.c @@ -36,6 +36,10 @@ void qmp_cmd_success_response(Error **errp) { } +void qmp_coroutine_cmd(Error **errp) +{ +} + Empty2 *qmp_user_def_cmd0(Error **errp) { return g_new0(Empty2, 1); @@ -152,7 +156,7 @@ static QObject *do_qmp_dispatch(bool allow_oob, const char *template, ...) req = qdict_from_vjsonf_nofail(template, ap); va_end(ap); - resp = qmp_dispatch(&qmp_commands, QOBJECT(req), allow_oob); + resp = qmp_dispatch(&qmp_commands, QOBJECT(req), allow_oob, NULL); g_assert(resp); ret = qdict_get(resp, "return"); g_assert(ret); @@ -175,7 +179,7 @@ static void do_qmp_dispatch_error(bool allow_oob, ErrorClass cls, req = qdict_from_vjsonf_nofail(template, ap); va_end(ap); - resp = qmp_dispatch(&qmp_commands, QOBJECT(req), allow_oob); + resp = qmp_dispatch(&qmp_commands, QOBJECT(req), allow_oob, NULL); g_assert(resp); error = qdict_get_qdict(resp, "error"); g_assert(error); @@ -231,7 +235,7 @@ static void test_dispatch_cmd_success_response(void) QDict *resp; qdict_put_str(req, "execute", "cmd-success-response"); - resp = qmp_dispatch(&qmp_commands, QOBJECT(req), false); + resp = qmp_dispatch(&qmp_commands, QOBJECT(req), false, NULL); g_assert_null(resp); qobject_unref(req); } diff --git a/tests/test-util-sockets.c b/tests/test-util-sockets.c index 1bbb16d9b1..f6336e0f91 100644 --- a/tests/test-util-sockets.c +++ b/tests/test-util-sockets.c @@ -52,6 +52,7 @@ static void test_fd_is_socket_good(void) static int mon_fd = -1; static const char *mon_fdname; +__thread Monitor *cur_mon; int monitor_get_fd(Monitor *mon, const char *fdname, Error **errp) { @@ -65,15 +66,14 @@ int monitor_get_fd(Monitor *mon, const char *fdname, Error **errp) } /* - * Syms of stubs in libqemuutil.a are discarded at .o file granularity. - * To replace monitor_get_fd() we must ensure everything in - * stubs/monitor.c is defined, to make sure monitor.o is discarded + * Syms of stubs in libqemuutil.a are discarded at .o file + * granularity. To replace monitor_get_fd() and monitor_cur(), we + * must ensure that we also replace any other symbol that is used in + * the binary and would be taken from the same stub object file, * otherwise we get duplicate syms at link time. */ -__thread Monitor *cur_mon; +Monitor *monitor_cur(void) { return cur_mon; } int monitor_vprintf(Monitor *mon, const char *fmt, va_list ap) { abort(); } -void monitor_init_qmp(Chardev *chr, bool pretty, Error **errp) {} -void monitor_init_hmp(Chardev *chr, bool use_readline, Error **errp) {} #ifndef _WIN32 static void test_socket_fd_pass_name_good(void) diff --git a/trace/control.c b/trace/control.c index c63a4de732..b35e512dce 100644 --- a/trace/control.c +++ b/trace/control.c @@ -176,7 +176,7 @@ void trace_enable_events(const char *line_buf) { if (is_help_option(line_buf)) { trace_list_events(); - if (cur_mon == NULL) { + if (monitor_cur() == NULL) { exit(0); } } else { diff --git a/util/aio-posix.c b/util/aio-posix.c index 280f27bb99..30f5354b1e 100644 --- a/util/aio-posix.c +++ b/util/aio-posix.c @@ -15,6 +15,7 @@ #include "qemu/osdep.h" #include "block/block.h" +#include "qemu/main-loop.h" #include "qemu/rcu.h" #include "qemu/rcu_queue.h" #include "qemu/sockets.h" @@ -558,8 +559,13 @@ bool aio_poll(AioContext *ctx, bool blocking) * There cannot be two concurrent aio_poll calls for the same AioContext (or * an aio_poll concurrent with a GSource prepare/check/dispatch callback). * We rely on this below to avoid slow locked accesses to ctx->notify_me. + * + * aio_poll() may only be called in the AioContext's thread. iohandler_ctx + * is special in that it runs in the main thread, but that thread's context + * is qemu_aio_context. */ - assert(in_aio_context_home_thread(ctx)); + assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ? + qemu_get_aio_context() : ctx)); qemu_lockcnt_inc(&ctx->list_lock); diff --git a/util/async.c b/util/async.c index f758354c6a..674dbefb7c 100644 --- a/util/async.c +++ b/util/async.c @@ -569,6 +569,36 @@ void aio_co_schedule(AioContext *ctx, Coroutine *co) aio_context_unref(ctx); } +typedef struct AioCoRescheduleSelf { + Coroutine *co; + AioContext *new_ctx; +} AioCoRescheduleSelf; + +static void aio_co_reschedule_self_bh(void *opaque) +{ + AioCoRescheduleSelf *data = opaque; + aio_co_schedule(data->new_ctx, data->co); +} + +void coroutine_fn aio_co_reschedule_self(AioContext *new_ctx) +{ + AioContext *old_ctx = qemu_get_current_aio_context(); + + if (old_ctx != new_ctx) { + AioCoRescheduleSelf data = { + .co = qemu_coroutine_self(), + .new_ctx = new_ctx, + }; + /* + * We can't directly schedule the coroutine in the target context + * because this would be racy: The other thread could try to enter the + * coroutine before it has yielded in this one. + */ + aio_bh_schedule_oneshot(old_ctx, aio_co_reschedule_self_bh, &data); + qemu_coroutine_yield(); + } +} + void aio_co_wake(struct Coroutine *co) { AioContext *ctx; diff --git a/util/qemu-error.c b/util/qemu-error.c index 3ee41438e9..aa30f03564 100644 --- a/util/qemu-error.c +++ b/util/qemu-error.c @@ -171,7 +171,7 @@ static void print_loc(void) int i; const char *const *argp; - if (!cur_mon && progname) { + if (!monitor_cur() && progname) { fprintf(stderr, "%s:", progname); sep = " "; } @@ -208,7 +208,7 @@ static void vreport(report_type type, const char *fmt, va_list ap) GTimeVal tv; gchar *timestr; - if (error_with_timestamp && !cur_mon) { + if (error_with_timestamp && !monitor_cur()) { g_get_current_time(&tv); timestr = g_time_val_to_iso8601(&tv); error_printf("%s ", timestr); @@ -216,7 +216,7 @@ static void vreport(report_type type, const char *fmt, va_list ap) } /* Only prepend guest name if -msg guest-name and -name guest=... are set */ - if (error_with_guestname && error_guest_name && !cur_mon) { + if (error_with_guestname && error_guest_name && !monitor_cur()) { error_printf("%s ", error_guest_name); } diff --git a/util/qemu-print.c b/util/qemu-print.c index e79d6b8396..69ba612f56 100644 --- a/util/qemu-print.c +++ b/util/qemu-print.c @@ -20,6 +20,7 @@ */ int qemu_vprintf(const char *fmt, va_list ap) { + Monitor *cur_mon = monitor_cur(); if (cur_mon) { return monitor_vprintf(cur_mon, fmt, ap); } @@ -48,7 +49,7 @@ int qemu_printf(const char *fmt, ...) int qemu_vfprintf(FILE *stream, const char *fmt, va_list ap) { if (!stream) { - return monitor_vprintf(cur_mon, fmt, ap); + return monitor_vprintf(monitor_cur(), fmt, ap); } return vfprintf(stream, fmt, ap); } diff --git a/util/qemu-sockets.c b/util/qemu-sockets.c index de4bf7616e..05e5c73f9d 100644 --- a/util/qemu-sockets.c +++ b/util/qemu-sockets.c @@ -1092,6 +1092,7 @@ fail: static int socket_get_fd(const char *fdstr, int num, Error **errp) { + Monitor *cur_mon = monitor_cur(); int fd; if (num != 1) { error_setg_errno(errp, EINVAL, "socket_get_fd: too many connections");