monitor: separate QMP parser and dispatcher
Originally QMP goes through these steps: JSON Parser --> QMP Dispatcher --> Respond /|\ (2) (3) | (1) | \|/ (4) +--------- main thread --------+ This patch does this: JSON Parser QMP Dispatcher --> Respond /|\ | /|\ (4) | | | (2) | (3) | (5) (1) | +-----> | \|/ +--------- main thread <-------+ So the parsing job and the dispatching job is isolated now. It gives us a chance in follow up patches to totally move the parser outside. The isolation is done using one QEMUBH. Only one dispatcher QEMUBH is used for all the monitors. Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com> Signed-off-by: Peter Xu <peterx@redhat.com> Message-Id: <20180309090006.10018-15-peterx@redhat.com> Reviewed-by: Eric Blake <eblake@redhat.com> [eblake: grammar tweaks, rebase to qobject_to()] Signed-off-by: Eric Blake <eblake@redhat.com>
This commit is contained in:
parent
e3e977d45b
commit
71da4667db
201
monitor.c
201
monitor.c
@ -172,6 +172,13 @@ typedef struct {
|
||||
*/
|
||||
QmpCommandList *commands;
|
||||
bool qmp_caps[QMP_CAPABILITY__MAX];
|
||||
/*
|
||||
* Protects qmp request/response queue. Please take monitor_lock
|
||||
* first when used together.
|
||||
*/
|
||||
QemuMutex qmp_queue_lock;
|
||||
/* Input queue that holds all the parsed QMP requests */
|
||||
GQueue *qmp_requests;
|
||||
} MonitorQMP;
|
||||
|
||||
/*
|
||||
@ -218,6 +225,8 @@ struct Monitor {
|
||||
/* Let's add monitor global variables to this struct. */
|
||||
static struct {
|
||||
IOThread *mon_iothread;
|
||||
/* Bottom half to dispatch the requests received from IO thread */
|
||||
QEMUBH *qmp_dispatcher_bh;
|
||||
} mon_global;
|
||||
|
||||
/* QMP checker flags */
|
||||
@ -600,11 +609,13 @@ static void monitor_data_init(Monitor *mon, bool skip_flush,
|
||||
{
|
||||
memset(mon, 0, sizeof(Monitor));
|
||||
qemu_mutex_init(&mon->out_lock);
|
||||
qemu_mutex_init(&mon->qmp.qmp_queue_lock);
|
||||
mon->outbuf = qstring_new();
|
||||
/* Use *mon_cmds by default. */
|
||||
mon->cmd_table = mon_cmds;
|
||||
mon->skip_flush = skip_flush;
|
||||
mon->use_io_thr = use_io_thr;
|
||||
mon->qmp.qmp_requests = g_queue_new();
|
||||
}
|
||||
|
||||
static void monitor_data_destroy(Monitor *mon)
|
||||
@ -617,6 +628,8 @@ static void monitor_data_destroy(Monitor *mon)
|
||||
readline_free(mon->rs);
|
||||
QDECREF(mon->outbuf);
|
||||
qemu_mutex_destroy(&mon->out_lock);
|
||||
qemu_mutex_destroy(&mon->qmp.qmp_queue_lock);
|
||||
g_queue_free(mon->qmp.qmp_requests);
|
||||
}
|
||||
|
||||
char *qmp_human_monitor_command(const char *command_line, bool has_cpu_index,
|
||||
@ -1059,6 +1072,16 @@ static void monitor_init_qmp_commands(void)
|
||||
qmp_marshal_qmp_capabilities, QCO_NO_OPTIONS);
|
||||
}
|
||||
|
||||
static bool qmp_cap_enabled(Monitor *mon, QMPCapability cap)
|
||||
{
|
||||
return mon->qmp.qmp_caps[cap];
|
||||
}
|
||||
|
||||
static bool qmp_oob_enabled(Monitor *mon)
|
||||
{
|
||||
return qmp_cap_enabled(mon, QMP_CAPABILITY_OOB);
|
||||
}
|
||||
|
||||
static void qmp_caps_check(Monitor *mon, QMPCapabilityList *list,
|
||||
Error **errp)
|
||||
{
|
||||
@ -3869,30 +3892,39 @@ static void monitor_qmp_respond(Monitor *mon, QObject *rsp,
|
||||
qobject_decref(rsp);
|
||||
}
|
||||
|
||||
static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens)
|
||||
struct QMPRequest {
|
||||
/* Owner of the request */
|
||||
Monitor *mon;
|
||||
/* "id" field of the request */
|
||||
QObject *id;
|
||||
/* Request object to be handled */
|
||||
QObject *req;
|
||||
/*
|
||||
* Whether we need to resume the monitor afterward. This flag is
|
||||
* used to emulate the old QMP server behavior that the current
|
||||
* command must be completed before execution of the next one.
|
||||
*/
|
||||
bool need_resume;
|
||||
};
|
||||
typedef struct QMPRequest QMPRequest;
|
||||
|
||||
/*
|
||||
* Dispatch one single QMP request. The function will free the req_obj
|
||||
* and objects inside it before return.
|
||||
*/
|
||||
static void monitor_qmp_dispatch_one(QMPRequest *req_obj)
|
||||
{
|
||||
QObject *req, *rsp = NULL, *id = NULL;
|
||||
Monitor *mon, *old_mon;
|
||||
QObject *req, *rsp = NULL, *id;
|
||||
QDict *qdict = NULL;
|
||||
MonitorQMP *mon_qmp = container_of(parser, MonitorQMP, parser);
|
||||
Monitor *old_mon, *mon = container_of(mon_qmp, Monitor, qmp);
|
||||
bool need_resume;
|
||||
|
||||
Error *err = NULL;
|
||||
req = req_obj->req;
|
||||
mon = req_obj->mon;
|
||||
id = req_obj->id;
|
||||
need_resume = req_obj->need_resume;
|
||||
|
||||
req = json_parser_parse_err(tokens, NULL, &err);
|
||||
if (!req && !err) {
|
||||
/* json_parser_parse_err() sucks: can fail without setting @err */
|
||||
error_setg(&err, QERR_JSON_PARSING);
|
||||
}
|
||||
if (err) {
|
||||
goto err_out;
|
||||
}
|
||||
|
||||
qdict = qobject_to(QDict, req);
|
||||
if (qdict) {
|
||||
id = qdict_get(qdict, "id");
|
||||
qobject_incref(id);
|
||||
qdict_del(qdict, "id");
|
||||
} /* else will fail qmp_dispatch() */
|
||||
g_free(req_obj);
|
||||
|
||||
if (trace_event_get_state_backends(TRACE_HANDLE_QMP_COMMAND)) {
|
||||
QString *req_json = qobject_to_json(req);
|
||||
@ -3903,7 +3935,7 @@ static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens)
|
||||
old_mon = cur_mon;
|
||||
cur_mon = mon;
|
||||
|
||||
rsp = qmp_dispatch(cur_mon->qmp.commands, req);
|
||||
rsp = qmp_dispatch(mon->qmp.commands, req);
|
||||
|
||||
cur_mon = old_mon;
|
||||
|
||||
@ -3919,12 +3951,122 @@ static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens)
|
||||
}
|
||||
}
|
||||
|
||||
err_out:
|
||||
monitor_qmp_respond(mon, rsp, err, id);
|
||||
/* Respond if necessary */
|
||||
monitor_qmp_respond(mon, rsp, NULL, id);
|
||||
|
||||
/* This pairs with the monitor_suspend() in handle_qmp_command(). */
|
||||
if (need_resume) {
|
||||
monitor_resume(mon);
|
||||
}
|
||||
|
||||
qobject_decref(req);
|
||||
}
|
||||
|
||||
/*
|
||||
* Pop one QMP request from monitor queues, return NULL if not found.
|
||||
* We are using round-robin fashion to pop the request, to avoid
|
||||
* processing commands only on a very busy monitor. To achieve that,
|
||||
* when we process one request on a specific monitor, we put that
|
||||
* monitor to the end of mon_list queue.
|
||||
*/
|
||||
static QMPRequest *monitor_qmp_requests_pop_one(void)
|
||||
{
|
||||
QMPRequest *req_obj = NULL;
|
||||
Monitor *mon;
|
||||
|
||||
qemu_mutex_lock(&monitor_lock);
|
||||
|
||||
QTAILQ_FOREACH(mon, &mon_list, entry) {
|
||||
qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
|
||||
req_obj = g_queue_pop_head(mon->qmp.qmp_requests);
|
||||
qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
|
||||
if (req_obj) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (req_obj) {
|
||||
/*
|
||||
* We found one request on the monitor. Degrade this monitor's
|
||||
* priority to lowest by re-inserting it to end of queue.
|
||||
*/
|
||||
QTAILQ_REMOVE(&mon_list, mon, entry);
|
||||
QTAILQ_INSERT_TAIL(&mon_list, mon, entry);
|
||||
}
|
||||
|
||||
qemu_mutex_unlock(&monitor_lock);
|
||||
|
||||
return req_obj;
|
||||
}
|
||||
|
||||
static void monitor_qmp_bh_dispatcher(void *data)
|
||||
{
|
||||
QMPRequest *req_obj = monitor_qmp_requests_pop_one();
|
||||
|
||||
if (req_obj) {
|
||||
monitor_qmp_dispatch_one(req_obj);
|
||||
/* Reschedule instead of looping so the main loop stays responsive */
|
||||
qemu_bh_schedule(mon_global.qmp_dispatcher_bh);
|
||||
}
|
||||
}
|
||||
|
||||
static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens)
|
||||
{
|
||||
QObject *req, *id = NULL;
|
||||
QDict *qdict = NULL;
|
||||
MonitorQMP *mon_qmp = container_of(parser, MonitorQMP, parser);
|
||||
Monitor *mon = container_of(mon_qmp, Monitor, qmp);
|
||||
Error *err = NULL;
|
||||
QMPRequest *req_obj;
|
||||
|
||||
req = json_parser_parse_err(tokens, NULL, &err);
|
||||
if (!req && !err) {
|
||||
/* json_parser_parse_err() sucks: can fail without setting @err */
|
||||
error_setg(&err, QERR_JSON_PARSING);
|
||||
}
|
||||
if (err) {
|
||||
monitor_qmp_respond(mon, NULL, err, NULL);
|
||||
qobject_decref(req);
|
||||
return;
|
||||
}
|
||||
|
||||
qdict = qobject_to(QDict, req);
|
||||
if (qdict) {
|
||||
id = qdict_get(qdict, "id");
|
||||
qobject_incref(id);
|
||||
qdict_del(qdict, "id");
|
||||
} /* else will fail qmp_dispatch() */
|
||||
|
||||
req_obj = g_new0(QMPRequest, 1);
|
||||
req_obj->mon = mon;
|
||||
req_obj->id = id;
|
||||
req_obj->req = req;
|
||||
req_obj->need_resume = false;
|
||||
|
||||
/*
|
||||
* If OOB is not enabled on the current monitor, we'll emulate the
|
||||
* old behavior that we won't process the current monitor any more
|
||||
* until it has responded. This helps make sure that as long as
|
||||
* OOB is not enabled, the server will never drop any command.
|
||||
*/
|
||||
if (!qmp_oob_enabled(mon)) {
|
||||
monitor_suspend(mon);
|
||||
req_obj->need_resume = true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Put the request to the end of queue so that requests will be
|
||||
* handled in time order. Ownership for req_obj, req, id,
|
||||
* etc. will be delivered to the handler side.
|
||||
*/
|
||||
qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
|
||||
g_queue_push_tail(mon->qmp.qmp_requests, req_obj);
|
||||
qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
|
||||
|
||||
/* Kick the dispatcher routine */
|
||||
qemu_bh_schedule(mon_global.qmp_dispatcher_bh);
|
||||
}
|
||||
|
||||
static void monitor_qmp_read(void *opaque, const uint8_t *buf, int size)
|
||||
{
|
||||
Monitor *mon = opaque;
|
||||
@ -4137,6 +4279,15 @@ static void monitor_iothread_init(void)
|
||||
{
|
||||
mon_global.mon_iothread = iothread_create("mon_iothread",
|
||||
&error_abort);
|
||||
|
||||
/*
|
||||
* This MUST be on main loop thread since we have commands that
|
||||
* have assumption to be run on main loop thread. It would be
|
||||
* nice that one day we can remove this assumption in the future.
|
||||
*/
|
||||
mon_global.qmp_dispatcher_bh = aio_bh_new(qemu_get_aio_context(),
|
||||
monitor_qmp_bh_dispatcher,
|
||||
NULL);
|
||||
}
|
||||
|
||||
void monitor_init_globals(void)
|
||||
@ -4288,6 +4439,10 @@ void monitor_cleanup(void)
|
||||
}
|
||||
qemu_mutex_unlock(&monitor_lock);
|
||||
|
||||
/* QEMUBHs needs to be deleted before destroying the IOThread. */
|
||||
qemu_bh_delete(mon_global.qmp_dispatcher_bh);
|
||||
mon_global.qmp_dispatcher_bh = NULL;
|
||||
|
||||
iothread_destroy(mon_global.mon_iothread);
|
||||
mon_global.mon_iothread = NULL;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user