blockjob: centralize QMP event emissions

There's no reason to leave this to blockdev; we can do it in blockjobs
directly and get rid of an extra callback for most users.

All non-internal events, even those created outside of QMP, will
consistently emit events.

Signed-off-by: John Snow <jsnow@redhat.com>
Reviewed-by: Kevin Wolf <kwolf@redhat.com>
Reviewed-by: Jeff Cody <jcody@redhat.com>
Message-id: 1477584421-1399-5-git-send-email-jsnow@redhat.com
Signed-off-by: Jeff Cody <jcody@redhat.com>
This commit is contained in:
John Snow 2016-10-27 12:06:58 -04:00 committed by Jeff Cody
parent 47970dfb0a
commit 8254b6d953
8 changed files with 42 additions and 83 deletions

View File

@ -209,8 +209,8 @@ static const BlockJobDriver commit_job_driver = {
void commit_start(const char *job_id, BlockDriverState *bs, void commit_start(const char *job_id, BlockDriverState *bs,
BlockDriverState *base, BlockDriverState *top, int64_t speed, BlockDriverState *base, BlockDriverState *top, int64_t speed,
BlockdevOnError on_error, BlockCompletionFunc *cb, BlockdevOnError on_error, const char *backing_file_str,
void *opaque, const char *backing_file_str, Error **errp) Error **errp)
{ {
CommitBlockJob *s; CommitBlockJob *s;
BlockReopenQueue *reopen_queue = NULL; BlockReopenQueue *reopen_queue = NULL;
@ -234,7 +234,7 @@ void commit_start(const char *job_id, BlockDriverState *bs,
} }
s = block_job_create(job_id, &commit_job_driver, bs, speed, s = block_job_create(job_id, &commit_job_driver, bs, speed,
BLOCK_JOB_DEFAULT, cb, opaque, errp); BLOCK_JOB_DEFAULT, NULL, NULL, errp);
if (!s) { if (!s) {
return; return;
} }
@ -290,7 +290,7 @@ void commit_start(const char *job_id, BlockDriverState *bs,
s->on_error = on_error; s->on_error = on_error;
s->common.co = qemu_coroutine_create(commit_run, s); s->common.co = qemu_coroutine_create(commit_run, s);
trace_commit_start(bs, base, top, s, s->common.co, opaque); trace_commit_start(bs, base, top, s, s->common.co);
qemu_coroutine_enter(s->common.co); qemu_coroutine_enter(s->common.co);
} }

View File

@ -1018,9 +1018,7 @@ void mirror_start(const char *job_id, BlockDriverState *bs,
MirrorSyncMode mode, BlockMirrorBackingMode backing_mode, MirrorSyncMode mode, BlockMirrorBackingMode backing_mode,
BlockdevOnError on_source_error, BlockdevOnError on_source_error,
BlockdevOnError on_target_error, BlockdevOnError on_target_error,
bool unmap, bool unmap, Error **errp)
BlockCompletionFunc *cb,
void *opaque, Error **errp)
{ {
bool is_none_mode; bool is_none_mode;
BlockDriverState *base; BlockDriverState *base;
@ -1033,7 +1031,7 @@ void mirror_start(const char *job_id, BlockDriverState *bs,
base = mode == MIRROR_SYNC_MODE_TOP ? backing_bs(bs) : NULL; base = mode == MIRROR_SYNC_MODE_TOP ? backing_bs(bs) : NULL;
mirror_start_job(job_id, bs, BLOCK_JOB_DEFAULT, target, replaces, mirror_start_job(job_id, bs, BLOCK_JOB_DEFAULT, target, replaces,
speed, granularity, buf_size, backing_mode, speed, granularity, buf_size, backing_mode,
on_source_error, on_target_error, unmap, cb, opaque, errp, on_source_error, on_target_error, unmap, NULL, NULL, errp,
&mirror_job_driver, is_none_mode, base, false); &mirror_job_driver, is_none_mode, base, false);
} }

View File

@ -222,15 +222,14 @@ static const BlockJobDriver stream_job_driver = {
void stream_start(const char *job_id, BlockDriverState *bs, void stream_start(const char *job_id, BlockDriverState *bs,
BlockDriverState *base, const char *backing_file_str, BlockDriverState *base, const char *backing_file_str,
int64_t speed, BlockdevOnError on_error, int64_t speed, BlockdevOnError on_error, Error **errp)
BlockCompletionFunc *cb, void *opaque, Error **errp)
{ {
StreamBlockJob *s; StreamBlockJob *s;
BlockDriverState *iter; BlockDriverState *iter;
int orig_bs_flags; int orig_bs_flags;
s = block_job_create(job_id, &stream_job_driver, bs, speed, s = block_job_create(job_id, &stream_job_driver, bs, speed,
BLOCK_JOB_DEFAULT, cb, opaque, errp); BLOCK_JOB_DEFAULT, NULL, NULL, errp);
if (!s) { if (!s) {
return; return;
} }
@ -256,6 +255,6 @@ void stream_start(const char *job_id, BlockDriverState *bs,
s->on_error = on_error; s->on_error = on_error;
s->common.co = qemu_coroutine_create(stream_run, s); s->common.co = qemu_coroutine_create(stream_run, s);
trace_stream_start(bs, base, s, s->common.co, opaque); trace_stream_start(bs, base, s, s->common.co);
qemu_coroutine_enter(s->common.co); qemu_coroutine_enter(s->common.co);
} }

View File

@ -19,11 +19,11 @@ bdrv_co_do_copy_on_readv(void *bs, int64_t offset, unsigned int bytes, int64_t c
# block/stream.c # block/stream.c
stream_one_iteration(void *s, int64_t sector_num, int nb_sectors, int is_allocated) "s %p sector_num %"PRId64" nb_sectors %d is_allocated %d" stream_one_iteration(void *s, int64_t sector_num, int nb_sectors, int is_allocated) "s %p sector_num %"PRId64" nb_sectors %d is_allocated %d"
stream_start(void *bs, void *base, void *s, void *co, void *opaque) "bs %p base %p s %p co %p opaque %p" stream_start(void *bs, void *base, void *s, void *co) "bs %p base %p s %p co %p"
# block/commit.c # block/commit.c
commit_one_iteration(void *s, int64_t sector_num, int nb_sectors, int is_allocated) "s %p sector_num %"PRId64" nb_sectors %d is_allocated %d" commit_one_iteration(void *s, int64_t sector_num, int nb_sectors, int is_allocated) "s %p sector_num %"PRId64" nb_sectors %d is_allocated %d"
commit_start(void *bs, void *base, void *top, void *s, void *co, void *opaque) "bs %p base %p top %p s %p co %p opaque %p" commit_start(void *bs, void *base, void *top, void *s, void *co) "bs %p base %p top %p s %p co %p"
# block/mirror.c # block/mirror.c
mirror_start(void *bs, void *s, void *co, void *opaque) "bs %p s %p co %p opaque %p" mirror_start(void *bs, void *s, void *co, void *opaque) "bs %p s %p co %p opaque %p"
@ -51,7 +51,6 @@ qmp_block_job_cancel(void *job) "job %p"
qmp_block_job_pause(void *job) "job %p" qmp_block_job_pause(void *job) "job %p"
qmp_block_job_resume(void *job) "job %p" qmp_block_job_resume(void *job) "job %p"
qmp_block_job_complete(void *job) "job %p" qmp_block_job_complete(void *job) "job %p"
block_job_cb(void *bs, void *job, int ret) "bs %p job %p ret %d"
qmp_block_stream(void *bs, void *job) "bs %p job %p" qmp_block_stream(void *bs, void *job) "bs %p job %p"
# block/raw-win32.c # block/raw-win32.c

View File

@ -2905,31 +2905,6 @@ out:
aio_context_release(aio_context); aio_context_release(aio_context);
} }
static void block_job_cb(void *opaque, int ret)
{
/* Note that this function may be executed from another AioContext besides
* the QEMU main loop. If you need to access anything that assumes the
* QEMU global mutex, use a BH or introduce a mutex.
*/
BlockDriverState *bs = opaque;
const char *msg = NULL;
trace_block_job_cb(bs, bs->job, ret);
assert(bs->job);
if (ret < 0) {
msg = strerror(-ret);
}
if (block_job_is_cancelled(bs->job)) {
block_job_event_cancelled(bs->job);
} else {
block_job_event_completed(bs->job, msg);
}
}
void qmp_block_stream(bool has_job_id, const char *job_id, const char *device, void qmp_block_stream(bool has_job_id, const char *job_id, const char *device,
bool has_base, const char *base, bool has_base, const char *base,
bool has_base_node, const char *base_node, bool has_base_node, const char *base_node,
@ -3005,7 +2980,7 @@ void qmp_block_stream(bool has_job_id, const char *job_id, const char *device,
base_name = has_backing_file ? backing_file : base_name; base_name = has_backing_file ? backing_file : base_name;
stream_start(has_job_id ? job_id : NULL, bs, base_bs, base_name, stream_start(has_job_id ? job_id : NULL, bs, base_bs, base_name,
has_speed ? speed : 0, on_error, block_job_cb, bs, &local_err); has_speed ? speed : 0, on_error, &local_err);
if (local_err) { if (local_err) {
error_propagate(errp, local_err); error_propagate(errp, local_err);
goto out; goto out;
@ -3111,16 +3086,16 @@ void qmp_block_commit(bool has_job_id, const char *job_id, const char *device,
goto out; goto out;
} }
commit_active_start(has_job_id ? job_id : NULL, bs, base_bs, commit_active_start(has_job_id ? job_id : NULL, bs, base_bs,
BLOCK_JOB_DEFAULT, speed, on_error, block_job_cb, BLOCK_JOB_DEFAULT, speed, on_error, NULL, NULL,
bs, &local_err, false); &local_err, false);
} else { } else {
BlockDriverState *overlay_bs = bdrv_find_overlay(bs, top_bs); BlockDriverState *overlay_bs = bdrv_find_overlay(bs, top_bs);
if (bdrv_op_is_blocked(overlay_bs, BLOCK_OP_TYPE_COMMIT_TARGET, errp)) { if (bdrv_op_is_blocked(overlay_bs, BLOCK_OP_TYPE_COMMIT_TARGET, errp)) {
goto out; goto out;
} }
commit_start(has_job_id ? job_id : NULL, bs, base_bs, top_bs, speed, commit_start(has_job_id ? job_id : NULL, bs, base_bs, top_bs, speed,
on_error, block_job_cb, bs, on_error, has_backing_file ? backing_file : NULL,
has_backing_file ? backing_file : NULL, &local_err); &local_err);
} }
if (local_err != NULL) { if (local_err != NULL) {
error_propagate(errp, local_err); error_propagate(errp, local_err);
@ -3241,7 +3216,7 @@ static void do_drive_backup(DriveBackup *backup, BlockJobTxn *txn, Error **errp)
backup_start(backup->job_id, bs, target_bs, backup->speed, backup->sync, backup_start(backup->job_id, bs, target_bs, backup->speed, backup->sync,
bmap, backup->compress, backup->on_source_error, bmap, backup->compress, backup->on_source_error,
backup->on_target_error, BLOCK_JOB_DEFAULT, backup->on_target_error, BLOCK_JOB_DEFAULT,
block_job_cb, bs, txn, &local_err); NULL, NULL, txn, &local_err);
bdrv_unref(target_bs); bdrv_unref(target_bs);
if (local_err != NULL) { if (local_err != NULL) {
error_propagate(errp, local_err); error_propagate(errp, local_err);
@ -3312,7 +3287,7 @@ void do_blockdev_backup(BlockdevBackup *backup, BlockJobTxn *txn, Error **errp)
backup_start(backup->job_id, bs, target_bs, backup->speed, backup->sync, backup_start(backup->job_id, bs, target_bs, backup->speed, backup->sync,
NULL, backup->compress, backup->on_source_error, NULL, backup->compress, backup->on_source_error,
backup->on_target_error, BLOCK_JOB_DEFAULT, backup->on_target_error, BLOCK_JOB_DEFAULT,
block_job_cb, bs, txn, &local_err); NULL, NULL, txn, &local_err);
if (local_err != NULL) { if (local_err != NULL) {
error_propagate(errp, local_err); error_propagate(errp, local_err);
} }
@ -3391,8 +3366,7 @@ static void blockdev_mirror_common(const char *job_id, BlockDriverState *bs,
mirror_start(job_id, bs, target, mirror_start(job_id, bs, target,
has_replaces ? replaces : NULL, has_replaces ? replaces : NULL,
speed, granularity, buf_size, sync, backing_mode, speed, granularity, buf_size, sync, backing_mode,
on_source_error, on_target_error, unmap, on_source_error, on_target_error, unmap, errp);
block_job_cb, bs, errp);
} }
void qmp_drive_mirror(DriveMirror *arg, Error **errp) void qmp_drive_mirror(DriveMirror *arg, Error **errp)

View File

@ -38,6 +38,9 @@
#include "qemu/timer.h" #include "qemu/timer.h"
#include "qapi-event.h" #include "qapi-event.h"
static void block_job_event_cancelled(BlockJob *job);
static void block_job_event_completed(BlockJob *job, const char *msg);
/* Transactional group of block jobs */ /* Transactional group of block jobs */
struct BlockJobTxn { struct BlockJobTxn {
@ -127,7 +130,6 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
BlockBackend *blk; BlockBackend *blk;
BlockJob *job; BlockJob *job;
assert(cb);
if (bs->job) { if (bs->job) {
error_setg(errp, QERR_DEVICE_IN_USE, bdrv_get_device_name(bs)); error_setg(errp, QERR_DEVICE_IN_USE, bdrv_get_device_name(bs));
return NULL; return NULL;
@ -239,7 +241,20 @@ static void block_job_completed_single(BlockJob *job)
job->driver->abort(job); job->driver->abort(job);
} }
} }
job->cb(job->opaque, job->ret);
if (job->cb) {
job->cb(job->opaque, job->ret);
}
if (block_job_is_cancelled(job)) {
block_job_event_cancelled(job);
} else {
const char *msg = NULL;
if (job->ret < 0) {
msg = strerror(-job->ret);
}
block_job_event_completed(job, msg);
}
if (job->txn) { if (job->txn) {
block_job_txn_unref(job->txn); block_job_txn_unref(job->txn);
} }
@ -553,7 +568,7 @@ static void block_job_iostatus_set_err(BlockJob *job, int error)
} }
} }
void block_job_event_cancelled(BlockJob *job) static void block_job_event_cancelled(BlockJob *job)
{ {
if (block_job_is_internal(job)) { if (block_job_is_internal(job)) {
return; return;
@ -567,7 +582,7 @@ void block_job_event_cancelled(BlockJob *job)
&error_abort); &error_abort);
} }
void block_job_event_completed(BlockJob *job, const char *msg) static void block_job_event_completed(BlockJob *job, const char *msg)
{ {
if (block_job_is_internal(job)) { if (block_job_is_internal(job)) {
return; return;

View File

@ -665,8 +665,6 @@ int is_windows_drive(const char *filename);
* the new backing file if the job completes. Ignored if @base is %NULL. * the new backing file if the job completes. Ignored if @base is %NULL.
* @speed: The maximum speed, in bytes per second, or 0 for unlimited. * @speed: The maximum speed, in bytes per second, or 0 for unlimited.
* @on_error: The action to take upon error. * @on_error: The action to take upon error.
* @cb: Completion function for the job.
* @opaque: Opaque pointer value passed to @cb.
* @errp: Error object. * @errp: Error object.
* *
* Start a streaming operation on @bs. Clusters that are unallocated * Start a streaming operation on @bs. Clusters that are unallocated
@ -678,8 +676,7 @@ int is_windows_drive(const char *filename);
*/ */
void stream_start(const char *job_id, BlockDriverState *bs, void stream_start(const char *job_id, BlockDriverState *bs,
BlockDriverState *base, const char *backing_file_str, BlockDriverState *base, const char *backing_file_str,
int64_t speed, BlockdevOnError on_error, int64_t speed, BlockdevOnError on_error, Error **errp);
BlockCompletionFunc *cb, void *opaque, Error **errp);
/** /**
* commit_start: * commit_start:
@ -690,16 +687,14 @@ void stream_start(const char *job_id, BlockDriverState *bs,
* @base: Block device that will be written into, and become the new top. * @base: Block device that will be written into, and become the new top.
* @speed: The maximum speed, in bytes per second, or 0 for unlimited. * @speed: The maximum speed, in bytes per second, or 0 for unlimited.
* @on_error: The action to take upon error. * @on_error: The action to take upon error.
* @cb: Completion function for the job.
* @opaque: Opaque pointer value passed to @cb.
* @backing_file_str: String to use as the backing file in @top's overlay * @backing_file_str: String to use as the backing file in @top's overlay
* @errp: Error object. * @errp: Error object.
* *
*/ */
void commit_start(const char *job_id, BlockDriverState *bs, void commit_start(const char *job_id, BlockDriverState *bs,
BlockDriverState *base, BlockDriverState *top, int64_t speed, BlockDriverState *base, BlockDriverState *top, int64_t speed,
BlockdevOnError on_error, BlockCompletionFunc *cb, BlockdevOnError on_error, const char *backing_file_str,
void *opaque, const char *backing_file_str, Error **errp); Error **errp);
/** /**
* commit_active_start: * commit_active_start:
* @job_id: The id of the newly-created job, or %NULL to use the * @job_id: The id of the newly-created job, or %NULL to use the
@ -737,8 +732,6 @@ void commit_active_start(const char *job_id, BlockDriverState *bs,
* @on_source_error: The action to take upon error reading from the source. * @on_source_error: The action to take upon error reading from the source.
* @on_target_error: The action to take upon error writing to the target. * @on_target_error: The action to take upon error writing to the target.
* @unmap: Whether to unmap target where source sectors only contain zeroes. * @unmap: Whether to unmap target where source sectors only contain zeroes.
* @cb: Completion function for the job.
* @opaque: Opaque pointer value passed to @cb.
* @errp: Error object. * @errp: Error object.
* *
* Start a mirroring operation on @bs. Clusters that are allocated * Start a mirroring operation on @bs. Clusters that are allocated
@ -752,9 +745,7 @@ void mirror_start(const char *job_id, BlockDriverState *bs,
MirrorSyncMode mode, BlockMirrorBackingMode backing_mode, MirrorSyncMode mode, BlockMirrorBackingMode backing_mode,
BlockdevOnError on_source_error, BlockdevOnError on_source_error,
BlockdevOnError on_target_error, BlockdevOnError on_target_error,
bool unmap, bool unmap, Error **errp);
BlockCompletionFunc *cb,
void *opaque, Error **errp);
/* /*
* backup_start: * backup_start:

View File

@ -394,23 +394,6 @@ void block_job_resume(BlockJob *job);
*/ */
void block_job_enter(BlockJob *job); void block_job_enter(BlockJob *job);
/**
* block_job_event_cancelled:
* @job: The job whose information is requested.
*
* Send a BLOCK_JOB_CANCELLED event for the specified job.
*/
void block_job_event_cancelled(BlockJob *job);
/**
* block_job_ready:
* @job: The job which is now ready to complete.
* @msg: Error message. Only present on failure.
*
* Send a BLOCK_JOB_COMPLETED event for the specified job.
*/
void block_job_event_completed(BlockJob *job, const char *msg);
/** /**
* block_job_ready: * block_job_ready:
* @job: The job which is now ready to complete. * @job: The job which is now ready to complete.