Block patches:

- (Block) job exit refactoring, part 1
   (removing job_defer_to_main_loop())
 - test-bdrv-drain leak fix
 -----BEGIN PGP SIGNATURE-----
 
 iQEcBAABAgAGBQJbiVEJAAoJEPQH2wBh1c9AHb4H/0P6yjozmu8J6cQhfXmFsJQk
 72Y6Lu9w7kNL43dEBkAU3vzDPUkzHRwpO5pLPKqQh0ojCz45KfTMozh/iMoJtuKP
 Hev4ZlRlFpcr0NHLQnysxsgV7FYbDEVS9xdQ6KlgFXyDBLgZVGykjq67kwDtXfnp
 eQof9Nf0T+m3bNJey6C43l4YqPzPIUCfoSgCqkoB1W6QGtfglGx8I4evjjgxv7GT
 s8IzBg7WSi7h8+mouZcXOs8/w7nJNeSSbMb921NXCWXCzIVHLpw5SImDiQfxEvcy
 pnBtVttty6pAmQvOC6GphqHPNIeRYLTIxkEzBxZUAePonsEa9zw33pjBGdWtSPU=
 =60m9
 -----END PGP SIGNATURE-----

Merge remote-tracking branch 'remotes/xanclic/tags/pull-block-2018-08-31-v2' into staging

Block patches:
- (Block) job exit refactoring, part 1
  (removing job_defer_to_main_loop())
- test-bdrv-drain leak fix

# gpg: Signature made Fri 31 Aug 2018 15:30:33 BST
# gpg:                using RSA key F407DB0061D5CF40
# gpg: Good signature from "Max Reitz <mreitz@redhat.com>"
# Primary key fingerprint: 91BE B60A 30DB 3E88 57D1  1829 F407 DB00 61D5 CF40

* remotes/xanclic/tags/pull-block-2018-08-31-v2:
  jobs: remove job_defer_to_main_loop
  jobs: remove ret argument to job_completed; privatize it
  block/backup: make function variables consistently named
  jobs: utilize job_exit shim
  block/mirror: utilize job_exit shim
  block/commit: utilize job_exit shim
  jobs: add exit shim
  jobs: canonize Error object
  jobs: change start callback to run callback
  tests: fix bdrv-drain leak

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
This commit is contained in:
Peter Maydell 2018-09-24 14:35:58 +01:00
commit d6f71af654
12 changed files with 161 additions and 242 deletions

View File

@ -380,18 +380,6 @@ static BlockErrorAction backup_error_action(BackupBlockJob *job,
} }
} }
typedef struct {
int ret;
} BackupCompleteData;
static void backup_complete(Job *job, void *opaque)
{
BackupCompleteData *data = opaque;
job_completed(job, data->ret, NULL);
g_free(data);
}
static bool coroutine_fn yield_and_check(BackupBlockJob *job) static bool coroutine_fn yield_and_check(BackupBlockJob *job)
{ {
uint64_t delay_ns; uint64_t delay_ns;
@ -480,60 +468,59 @@ static void backup_incremental_init_copy_bitmap(BackupBlockJob *job)
bdrv_dirty_iter_free(dbi); bdrv_dirty_iter_free(dbi);
} }
static void coroutine_fn backup_run(void *opaque) static int coroutine_fn backup_run(Job *job, Error **errp)
{ {
BackupBlockJob *job = opaque; BackupBlockJob *s = container_of(job, BackupBlockJob, common.job);
BackupCompleteData *data; BlockDriverState *bs = blk_bs(s->common.blk);
BlockDriverState *bs = blk_bs(job->common.blk);
int64_t offset, nb_clusters; int64_t offset, nb_clusters;
int ret = 0; int ret = 0;
QLIST_INIT(&job->inflight_reqs); QLIST_INIT(&s->inflight_reqs);
qemu_co_rwlock_init(&job->flush_rwlock); qemu_co_rwlock_init(&s->flush_rwlock);
nb_clusters = DIV_ROUND_UP(job->len, job->cluster_size); nb_clusters = DIV_ROUND_UP(s->len, s->cluster_size);
job_progress_set_remaining(&job->common.job, job->len); job_progress_set_remaining(job, s->len);
job->copy_bitmap = hbitmap_alloc(nb_clusters, 0); s->copy_bitmap = hbitmap_alloc(nb_clusters, 0);
if (job->sync_mode == MIRROR_SYNC_MODE_INCREMENTAL) { if (s->sync_mode == MIRROR_SYNC_MODE_INCREMENTAL) {
backup_incremental_init_copy_bitmap(job); backup_incremental_init_copy_bitmap(s);
} else { } else {
hbitmap_set(job->copy_bitmap, 0, nb_clusters); hbitmap_set(s->copy_bitmap, 0, nb_clusters);
} }
job->before_write.notify = backup_before_write_notify; s->before_write.notify = backup_before_write_notify;
bdrv_add_before_write_notifier(bs, &job->before_write); bdrv_add_before_write_notifier(bs, &s->before_write);
if (job->sync_mode == MIRROR_SYNC_MODE_NONE) { if (s->sync_mode == MIRROR_SYNC_MODE_NONE) {
/* All bits are set in copy_bitmap to allow any cluster to be copied. /* All bits are set in copy_bitmap to allow any cluster to be copied.
* This does not actually require them to be copied. */ * This does not actually require them to be copied. */
while (!job_is_cancelled(&job->common.job)) { while (!job_is_cancelled(job)) {
/* Yield until the job is cancelled. We just let our before_write /* Yield until the job is cancelled. We just let our before_write
* notify callback service CoW requests. */ * notify callback service CoW requests. */
job_yield(&job->common.job); job_yield(job);
} }
} else if (job->sync_mode == MIRROR_SYNC_MODE_INCREMENTAL) { } else if (s->sync_mode == MIRROR_SYNC_MODE_INCREMENTAL) {
ret = backup_run_incremental(job); ret = backup_run_incremental(s);
} else { } else {
/* Both FULL and TOP SYNC_MODE's require copying.. */ /* Both FULL and TOP SYNC_MODE's require copying.. */
for (offset = 0; offset < job->len; for (offset = 0; offset < s->len;
offset += job->cluster_size) { offset += s->cluster_size) {
bool error_is_read; bool error_is_read;
int alloced = 0; int alloced = 0;
if (yield_and_check(job)) { if (yield_and_check(s)) {
break; break;
} }
if (job->sync_mode == MIRROR_SYNC_MODE_TOP) { if (s->sync_mode == MIRROR_SYNC_MODE_TOP) {
int i; int i;
int64_t n; int64_t n;
/* Check to see if these blocks are already in the /* Check to see if these blocks are already in the
* backing file. */ * backing file. */
for (i = 0; i < job->cluster_size;) { for (i = 0; i < s->cluster_size;) {
/* bdrv_is_allocated() only returns true/false based /* bdrv_is_allocated() only returns true/false based
* on the first set of sectors it comes across that * on the first set of sectors it comes across that
* are are all in the same state. * are are all in the same state.
@ -542,7 +529,7 @@ static void coroutine_fn backup_run(void *opaque)
* needed but at some point that is always the case. */ * needed but at some point that is always the case. */
alloced = alloced =
bdrv_is_allocated(bs, offset + i, bdrv_is_allocated(bs, offset + i,
job->cluster_size - i, &n); s->cluster_size - i, &n);
i += n; i += n;
if (alloced || n == 0) { if (alloced || n == 0) {
@ -560,33 +547,31 @@ static void coroutine_fn backup_run(void *opaque)
if (alloced < 0) { if (alloced < 0) {
ret = alloced; ret = alloced;
} else { } else {
ret = backup_do_cow(job, offset, job->cluster_size, ret = backup_do_cow(s, offset, s->cluster_size,
&error_is_read, false); &error_is_read, false);
} }
if (ret < 0) { if (ret < 0) {
/* Depending on error action, fail now or retry cluster */ /* Depending on error action, fail now or retry cluster */
BlockErrorAction action = BlockErrorAction action =
backup_error_action(job, error_is_read, -ret); backup_error_action(s, error_is_read, -ret);
if (action == BLOCK_ERROR_ACTION_REPORT) { if (action == BLOCK_ERROR_ACTION_REPORT) {
break; break;
} else { } else {
offset -= job->cluster_size; offset -= s->cluster_size;
continue; continue;
} }
} }
} }
} }
notifier_with_return_remove(&job->before_write); notifier_with_return_remove(&s->before_write);
/* wait until pending backup_do_cow() calls have completed */ /* wait until pending backup_do_cow() calls have completed */
qemu_co_rwlock_wrlock(&job->flush_rwlock); qemu_co_rwlock_wrlock(&s->flush_rwlock);
qemu_co_rwlock_unlock(&job->flush_rwlock); qemu_co_rwlock_unlock(&s->flush_rwlock);
hbitmap_free(job->copy_bitmap); hbitmap_free(s->copy_bitmap);
data = g_malloc(sizeof(*data)); return ret;
data->ret = ret;
job_defer_to_main_loop(&job->common.job, backup_complete, data);
} }
static const BlockJobDriver backup_job_driver = { static const BlockJobDriver backup_job_driver = {
@ -596,7 +581,7 @@ static const BlockJobDriver backup_job_driver = {
.free = block_job_free, .free = block_job_free,
.user_resume = block_job_user_resume, .user_resume = block_job_user_resume,
.drain = block_job_drain, .drain = block_job_drain,
.start = backup_run, .run = backup_run,
.commit = backup_commit, .commit = backup_commit,
.abort = backup_abort, .abort = backup_abort,
.clean = backup_clean, .clean = backup_clean,

View File

@ -68,19 +68,13 @@ static int coroutine_fn commit_populate(BlockBackend *bs, BlockBackend *base,
return 0; return 0;
} }
typedef struct { static void commit_exit(Job *job)
int ret;
} CommitCompleteData;
static void commit_complete(Job *job, void *opaque)
{ {
CommitBlockJob *s = container_of(job, CommitBlockJob, common.job); CommitBlockJob *s = container_of(job, CommitBlockJob, common.job);
BlockJob *bjob = &s->common; BlockJob *bjob = &s->common;
CommitCompleteData *data = opaque;
BlockDriverState *top = blk_bs(s->top); BlockDriverState *top = blk_bs(s->top);
BlockDriverState *base = blk_bs(s->base); BlockDriverState *base = blk_bs(s->base);
BlockDriverState *commit_top_bs = s->commit_top_bs; BlockDriverState *commit_top_bs = s->commit_top_bs;
int ret = data->ret;
bool remove_commit_top_bs = false; bool remove_commit_top_bs = false;
/* Make sure commit_top_bs and top stay around until bdrv_replace_node() */ /* Make sure commit_top_bs and top stay around until bdrv_replace_node() */
@ -91,10 +85,10 @@ static void commit_complete(Job *job, void *opaque)
* the normal backing chain can be restored. */ * the normal backing chain can be restored. */
blk_unref(s->base); blk_unref(s->base);
if (!job_is_cancelled(job) && ret == 0) { if (!job_is_cancelled(job) && job->ret == 0) {
/* success */ /* success */
ret = bdrv_drop_intermediate(s->commit_top_bs, base, job->ret = bdrv_drop_intermediate(s->commit_top_bs, base,
s->backing_file_str); s->backing_file_str);
} else { } else {
/* XXX Can (or should) we somehow keep 'consistent read' blocked even /* XXX Can (or should) we somehow keep 'consistent read' blocked even
* after the failed/cancelled commit job is gone? If we already wrote * after the failed/cancelled commit job is gone? If we already wrote
@ -117,9 +111,6 @@ static void commit_complete(Job *job, void *opaque)
* bdrv_set_backing_hd() to fail. */ * bdrv_set_backing_hd() to fail. */
block_job_remove_all_bdrv(bjob); block_job_remove_all_bdrv(bjob);
job_completed(job, ret, NULL);
g_free(data);
/* If bdrv_drop_intermediate() didn't already do that, remove the commit /* If bdrv_drop_intermediate() didn't already do that, remove the commit
* filter driver from the backing chain. Do this as the final step so that * filter driver from the backing chain. Do this as the final step so that
* the 'consistent read' permission can be granted. */ * the 'consistent read' permission can be granted. */
@ -134,10 +125,9 @@ static void commit_complete(Job *job, void *opaque)
bdrv_unref(top); bdrv_unref(top);
} }
static void coroutine_fn commit_run(void *opaque) static int coroutine_fn commit_run(Job *job, Error **errp)
{ {
CommitBlockJob *s = opaque; CommitBlockJob *s = container_of(job, CommitBlockJob, common.job);
CommitCompleteData *data;
int64_t offset; int64_t offset;
uint64_t delay_ns = 0; uint64_t delay_ns = 0;
int ret = 0; int ret = 0;
@ -210,9 +200,7 @@ static void coroutine_fn commit_run(void *opaque)
out: out:
qemu_vfree(buf); qemu_vfree(buf);
data = g_malloc(sizeof(*data)); return ret;
data->ret = ret;
job_defer_to_main_loop(&s->common.job, commit_complete, data);
} }
static const BlockJobDriver commit_job_driver = { static const BlockJobDriver commit_job_driver = {
@ -222,7 +210,8 @@ static const BlockJobDriver commit_job_driver = {
.free = block_job_free, .free = block_job_free,
.user_resume = block_job_user_resume, .user_resume = block_job_user_resume,
.drain = block_job_drain, .drain = block_job_drain,
.start = commit_run, .run = commit_run,
.exit = commit_exit,
}, },
}; };

View File

@ -34,33 +34,26 @@ typedef struct BlockdevCreateJob {
Job common; Job common;
BlockDriver *drv; BlockDriver *drv;
BlockdevCreateOptions *opts; BlockdevCreateOptions *opts;
int ret;
Error *err;
} BlockdevCreateJob; } BlockdevCreateJob;
static void blockdev_create_complete(Job *job, void *opaque) static int coroutine_fn blockdev_create_run(Job *job, Error **errp)
{ {
BlockdevCreateJob *s = container_of(job, BlockdevCreateJob, common); BlockdevCreateJob *s = container_of(job, BlockdevCreateJob, common);
int ret;
job_completed(job, s->ret, s->err);
}
static void coroutine_fn blockdev_create_run(void *opaque)
{
BlockdevCreateJob *s = opaque;
job_progress_set_remaining(&s->common, 1); job_progress_set_remaining(&s->common, 1);
s->ret = s->drv->bdrv_co_create(s->opts, &s->err); ret = s->drv->bdrv_co_create(s->opts, errp);
job_progress_update(&s->common, 1); job_progress_update(&s->common, 1);
qapi_free_BlockdevCreateOptions(s->opts); qapi_free_BlockdevCreateOptions(s->opts);
job_defer_to_main_loop(&s->common, blockdev_create_complete, NULL);
return ret;
} }
static const JobDriver blockdev_create_job_driver = { static const JobDriver blockdev_create_job_driver = {
.instance_size = sizeof(BlockdevCreateJob), .instance_size = sizeof(BlockdevCreateJob),
.job_type = JOB_TYPE_CREATE, .job_type = JOB_TYPE_CREATE,
.start = blockdev_create_run, .run = blockdev_create_run,
}; };
void qmp_blockdev_create(const char *job_id, BlockdevCreateOptions *options, void qmp_blockdev_create(const char *job_id, BlockdevCreateOptions *options,

View File

@ -607,26 +607,22 @@ static void mirror_wait_for_all_io(MirrorBlockJob *s)
} }
} }
typedef struct { static void mirror_exit(Job *job)
int ret;
} MirrorExitData;
static void mirror_exit(Job *job, void *opaque)
{ {
MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job); MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
BlockJob *bjob = &s->common; BlockJob *bjob = &s->common;
MirrorExitData *data = opaque;
MirrorBDSOpaque *bs_opaque = s->mirror_top_bs->opaque; MirrorBDSOpaque *bs_opaque = s->mirror_top_bs->opaque;
AioContext *replace_aio_context = NULL; AioContext *replace_aio_context = NULL;
BlockDriverState *src = s->mirror_top_bs->backing->bs; BlockDriverState *src = s->mirror_top_bs->backing->bs;
BlockDriverState *target_bs = blk_bs(s->target); BlockDriverState *target_bs = blk_bs(s->target);
BlockDriverState *mirror_top_bs = s->mirror_top_bs; BlockDriverState *mirror_top_bs = s->mirror_top_bs;
Error *local_err = NULL; Error *local_err = NULL;
int ret = job->ret;
bdrv_release_dirty_bitmap(src, s->dirty_bitmap); bdrv_release_dirty_bitmap(src, s->dirty_bitmap);
/* Make sure that the source BDS doesn't go away before we called /* Make sure that the source BDS doesn't go away during bdrv_replace_node,
* job_completed(). */ * before we can call bdrv_drained_end */
bdrv_ref(src); bdrv_ref(src);
bdrv_ref(mirror_top_bs); bdrv_ref(mirror_top_bs);
bdrv_ref(target_bs); bdrv_ref(target_bs);
@ -652,7 +648,7 @@ static void mirror_exit(Job *job, void *opaque)
bdrv_set_backing_hd(target_bs, backing, &local_err); bdrv_set_backing_hd(target_bs, backing, &local_err);
if (local_err) { if (local_err) {
error_report_err(local_err); error_report_err(local_err);
data->ret = -EPERM; ret = -EPERM;
} }
} }
} }
@ -662,7 +658,7 @@ static void mirror_exit(Job *job, void *opaque)
aio_context_acquire(replace_aio_context); aio_context_acquire(replace_aio_context);
} }
if (s->should_complete && data->ret == 0) { if (s->should_complete && ret == 0) {
BlockDriverState *to_replace = src; BlockDriverState *to_replace = src;
if (s->to_replace) { if (s->to_replace) {
to_replace = s->to_replace; to_replace = s->to_replace;
@ -679,7 +675,7 @@ static void mirror_exit(Job *job, void *opaque)
bdrv_drained_end(target_bs); bdrv_drained_end(target_bs);
if (local_err) { if (local_err) {
error_report_err(local_err); error_report_err(local_err);
data->ret = -EPERM; ret = -EPERM;
} }
} }
if (s->to_replace) { if (s->to_replace) {
@ -710,12 +706,12 @@ static void mirror_exit(Job *job, void *opaque)
blk_insert_bs(bjob->blk, mirror_top_bs, &error_abort); blk_insert_bs(bjob->blk, mirror_top_bs, &error_abort);
bs_opaque->job = NULL; bs_opaque->job = NULL;
job_completed(job, data->ret, NULL);
g_free(data);
bdrv_drained_end(src); bdrv_drained_end(src);
bdrv_unref(mirror_top_bs); bdrv_unref(mirror_top_bs);
bdrv_unref(src); bdrv_unref(src);
job->ret = ret;
} }
static void mirror_throttle(MirrorBlockJob *s) static void mirror_throttle(MirrorBlockJob *s)
@ -812,10 +808,9 @@ static int mirror_flush(MirrorBlockJob *s)
return ret; return ret;
} }
static void coroutine_fn mirror_run(void *opaque) static int coroutine_fn mirror_run(Job *job, Error **errp)
{ {
MirrorBlockJob *s = opaque; MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
MirrorExitData *data;
BlockDriverState *bs = s->mirror_top_bs->backing->bs; BlockDriverState *bs = s->mirror_top_bs->backing->bs;
BlockDriverState *target_bs = blk_bs(s->target); BlockDriverState *target_bs = blk_bs(s->target);
bool need_drain = true; bool need_drain = true;
@ -1035,13 +1030,11 @@ immediate_exit:
g_free(s->in_flight_bitmap); g_free(s->in_flight_bitmap);
bdrv_dirty_iter_free(s->dbi); bdrv_dirty_iter_free(s->dbi);
data = g_malloc(sizeof(*data));
data->ret = ret;
if (need_drain) { if (need_drain) {
bdrv_drained_begin(bs); bdrv_drained_begin(bs);
} }
job_defer_to_main_loop(&s->common.job, mirror_exit, data);
return ret;
} }
static void mirror_complete(Job *job, Error **errp) static void mirror_complete(Job *job, Error **errp)
@ -1138,7 +1131,8 @@ static const BlockJobDriver mirror_job_driver = {
.free = block_job_free, .free = block_job_free,
.user_resume = block_job_user_resume, .user_resume = block_job_user_resume,
.drain = block_job_drain, .drain = block_job_drain,
.start = mirror_run, .run = mirror_run,
.exit = mirror_exit,
.pause = mirror_pause, .pause = mirror_pause,
.complete = mirror_complete, .complete = mirror_complete,
}, },
@ -1154,7 +1148,8 @@ static const BlockJobDriver commit_active_job_driver = {
.free = block_job_free, .free = block_job_free,
.user_resume = block_job_user_resume, .user_resume = block_job_user_resume,
.drain = block_job_drain, .drain = block_job_drain,
.start = mirror_run, .run = mirror_run,
.exit = mirror_exit,
.pause = mirror_pause, .pause = mirror_pause,
.complete = mirror_complete, .complete = mirror_complete,
}, },

View File

@ -54,20 +54,16 @@ static int coroutine_fn stream_populate(BlockBackend *blk,
return blk_co_preadv(blk, offset, qiov.size, &qiov, BDRV_REQ_COPY_ON_READ); return blk_co_preadv(blk, offset, qiov.size, &qiov, BDRV_REQ_COPY_ON_READ);
} }
typedef struct { static void stream_exit(Job *job)
int ret;
} StreamCompleteData;
static void stream_complete(Job *job, void *opaque)
{ {
StreamBlockJob *s = container_of(job, StreamBlockJob, common.job); StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
BlockJob *bjob = &s->common; BlockJob *bjob = &s->common;
StreamCompleteData *data = opaque;
BlockDriverState *bs = blk_bs(bjob->blk); BlockDriverState *bs = blk_bs(bjob->blk);
BlockDriverState *base = s->base; BlockDriverState *base = s->base;
Error *local_err = NULL; Error *local_err = NULL;
int ret = job->ret;
if (!job_is_cancelled(job) && bs->backing && data->ret == 0) { if (!job_is_cancelled(job) && bs->backing && ret == 0) {
const char *base_id = NULL, *base_fmt = NULL; const char *base_id = NULL, *base_fmt = NULL;
if (base) { if (base) {
base_id = s->backing_file_str; base_id = s->backing_file_str;
@ -75,11 +71,11 @@ static void stream_complete(Job *job, void *opaque)
base_fmt = base->drv->format_name; base_fmt = base->drv->format_name;
} }
} }
data->ret = bdrv_change_backing_file(bs, base_id, base_fmt); ret = bdrv_change_backing_file(bs, base_id, base_fmt);
bdrv_set_backing_hd(bs, base, &local_err); bdrv_set_backing_hd(bs, base, &local_err);
if (local_err) { if (local_err) {
error_report_err(local_err); error_report_err(local_err);
data->ret = -EPERM; ret = -EPERM;
goto out; goto out;
} }
} }
@ -93,14 +89,12 @@ out:
} }
g_free(s->backing_file_str); g_free(s->backing_file_str);
job_completed(job, data->ret, NULL); job->ret = ret;
g_free(data);
} }
static void coroutine_fn stream_run(void *opaque) static int coroutine_fn stream_run(Job *job, Error **errp)
{ {
StreamBlockJob *s = opaque; StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
StreamCompleteData *data;
BlockBackend *blk = s->common.blk; BlockBackend *blk = s->common.blk;
BlockDriverState *bs = blk_bs(blk); BlockDriverState *bs = blk_bs(blk);
BlockDriverState *base = s->base; BlockDriverState *base = s->base;
@ -203,9 +197,7 @@ static void coroutine_fn stream_run(void *opaque)
out: out:
/* Modify backing chain and close BDSes in main loop */ /* Modify backing chain and close BDSes in main loop */
data = g_malloc(sizeof(*data)); return ret;
data->ret = ret;
job_defer_to_main_loop(&s->common.job, stream_complete, data);
} }
static const BlockJobDriver stream_job_driver = { static const BlockJobDriver stream_job_driver = {
@ -213,7 +205,8 @@ static const BlockJobDriver stream_job_driver = {
.instance_size = sizeof(StreamBlockJob), .instance_size = sizeof(StreamBlockJob),
.job_type = JOB_TYPE_STREAM, .job_type = JOB_TYPE_STREAM,
.free = block_job_free, .free = block_job_free,
.start = stream_run, .run = stream_run,
.exit = stream_exit,
.user_resume = block_job_user_resume, .user_resume = block_job_user_resume,
.drain = block_job_drain, .drain = block_job_drain,
}, },

View File

@ -124,12 +124,20 @@ typedef struct Job {
/** Estimated progress_current value at the completion of the job */ /** Estimated progress_current value at the completion of the job */
int64_t progress_total; int64_t progress_total;
/** Error string for a failed job (NULL if, and only if, job->ret == 0) */ /**
char *error; * Return code from @run and/or @prepare callback(s).
* Not final until the job has reached the CONCLUDED status.
/** ret code passed to job_completed. */ * 0 on success, -errno on failure.
*/
int ret; int ret;
/**
* Error object for a failed job.
* If job->ret is nonzero and an error object was not set, it will be set
* to strerror(-job->ret) during job_completed.
*/
Error *err;
/** The completion function that will be called when the job completes. */ /** The completion function that will be called when the job completes. */
BlockCompletionFunc *cb; BlockCompletionFunc *cb;
@ -168,8 +176,17 @@ struct JobDriver {
/** Enum describing the operation */ /** Enum describing the operation */
JobType job_type; JobType job_type;
/** Mandatory: Entrypoint for the Coroutine. */ /**
CoroutineEntry *start; * Mandatory: Entrypoint for the Coroutine.
*
* This callback will be invoked when moving from CREATED to RUNNING.
*
* If this callback returns nonzero, the job transaction it is part of is
* aborted. If it returns zero, the job moves into the WAITING state. If it
* is the last job to complete in its transaction, all jobs in the
* transaction move from WAITING to PENDING.
*/
int coroutine_fn (*run)(Job *job, Error **errp);
/** /**
* If the callback is not NULL, it will be invoked when the job transitions * If the callback is not NULL, it will be invoked when the job transitions
@ -204,6 +221,17 @@ struct JobDriver {
*/ */
void (*drain)(Job *job); void (*drain)(Job *job);
/**
* If the callback is not NULL, exit will be invoked from the main thread
* when the job's coroutine has finished, but before transactional
* convergence; before @prepare or @abort.
*
* FIXME TODO: This callback is only temporary to transition remaining jobs
* to prepare/commit/abort/clean callbacks and will be removed before 3.1.
* is released.
*/
void (*exit)(Job *job);
/** /**
* If the callback is not NULL, prepare will be invoked when all the jobs * If the callback is not NULL, prepare will be invoked when all the jobs
* belonging to the same transaction complete; or upon this job's completion * belonging to the same transaction complete; or upon this job's completion
@ -481,19 +509,6 @@ void job_early_fail(Job *job);
/** Moves the @job from RUNNING to READY */ /** Moves the @job from RUNNING to READY */
void job_transition_to_ready(Job *job); void job_transition_to_ready(Job *job);
/**
* @job: The job being completed.
* @ret: The status code.
* @error: The error message for a failing job (only with @ret < 0). If @ret is
* negative, but NULL is given for @error, strerror() is used.
*
* Marks @job as completed. If @ret is non-zero, the job transaction it is part
* of is aborted. If @ret is zero, the job moves into the WAITING state. If it
* is the last job to complete in its transaction, all jobs in the transaction
* move from WAITING to PENDING.
*/
void job_completed(Job *job, int ret, Error *error);
/** Asynchronously complete the specified @job. */ /** Asynchronously complete the specified @job. */
void job_complete(Job *job, Error **errp); void job_complete(Job *job, Error **errp);
@ -553,23 +568,6 @@ void job_finalize(Job *job, Error **errp);
*/ */
void job_dismiss(Job **job, Error **errp); void job_dismiss(Job **job, Error **errp);
typedef void JobDeferToMainLoopFn(Job *job, void *opaque);
/**
* @job: The job
* @fn: The function to run in the main loop
* @opaque: The opaque value that is passed to @fn
*
* This function must be called by the main job coroutine just before it
* returns. @fn is executed in the main loop with the job AioContext acquired.
*
* Block jobs must call bdrv_unref(), bdrv_close(), and anything that uses
* bdrv_drain_all() in the main loop.
*
* The @job AioContext is held while @fn executes.
*/
void job_defer_to_main_loop(Job *job, JobDeferToMainLoopFn *fn, void *opaque);
/** /**
* Synchronously finishes the given @job. If @finish is given, it is called to * Synchronously finishes the given @job. If @finish is given, it is called to
* trigger completion or cancellation of the job. * trigger completion or cancellation of the job.

View File

@ -146,8 +146,9 @@ static JobInfo *job_query_single(Job *job, Error **errp)
.status = job->status, .status = job->status,
.current_progress = job->progress_current, .current_progress = job->progress_current,
.total_progress = job->progress_total, .total_progress = job->progress_total,
.has_error = !!job->error, .has_error = !!job->err,
.error = g_strdup(job->error), .error = job->err ? \
g_strdup(error_get_pretty(job->err)) : NULL,
}; };
return info; return info;

73
job.c
View File

@ -369,7 +369,7 @@ void job_unref(Job *job)
QLIST_REMOVE(job, job_list); QLIST_REMOVE(job, job_list);
g_free(job->error); error_free(job->err);
g_free(job->id); g_free(job->id);
g_free(job); g_free(job);
} }
@ -535,6 +535,20 @@ void job_drain(Job *job)
} }
} }
static void job_completed(Job *job);
static void job_exit(void *opaque)
{
Job *job = (Job *)opaque;
AioContext *aio_context = job->aio_context;
if (job->driver->exit) {
aio_context_acquire(aio_context);
job->driver->exit(job);
aio_context_release(aio_context);
}
job_completed(job);
}
/** /**
* All jobs must allow a pause point before entering their job proper. This * All jobs must allow a pause point before entering their job proper. This
@ -544,16 +558,18 @@ static void coroutine_fn job_co_entry(void *opaque)
{ {
Job *job = opaque; Job *job = opaque;
assert(job && job->driver && job->driver->start); assert(job && job->driver && job->driver->run);
job_pause_point(job); job_pause_point(job);
job->driver->start(job); job->ret = job->driver->run(job, &job->err);
job->deferred_to_main_loop = true;
aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit, job);
} }
void job_start(Job *job) void job_start(Job *job)
{ {
assert(job && !job_started(job) && job->paused && assert(job && !job_started(job) && job->paused &&
job->driver && job->driver->start); job->driver && job->driver->run);
job->co = qemu_coroutine_create(job_co_entry, job); job->co = qemu_coroutine_create(job_co_entry, job);
job->pause_count--; job->pause_count--;
job->busy = true; job->busy = true;
@ -666,8 +682,8 @@ static void job_update_rc(Job *job)
job->ret = -ECANCELED; job->ret = -ECANCELED;
} }
if (job->ret) { if (job->ret) {
if (!job->error) { if (!job->err) {
job->error = g_strdup(strerror(-job->ret)); error_setg(&job->err, "%s", strerror(-job->ret));
} }
job_state_transition(job, JOB_STATUS_ABORTING); job_state_transition(job, JOB_STATUS_ABORTING);
} }
@ -865,19 +881,12 @@ static void job_completed_txn_success(Job *job)
} }
} }
void job_completed(Job *job, int ret, Error *error) static void job_completed(Job *job)
{ {
assert(job && job->txn && !job_is_completed(job)); assert(job && job->txn && !job_is_completed(job));
job->ret = ret;
if (error) {
assert(job->ret < 0);
job->error = g_strdup(error_get_pretty(error));
error_free(error);
}
job_update_rc(job); job_update_rc(job);
trace_job_completed(job, ret, job->ret); trace_job_completed(job, job->ret);
if (job->ret) { if (job->ret) {
job_completed_txn_abort(job); job_completed_txn_abort(job);
} else { } else {
@ -893,7 +902,7 @@ void job_cancel(Job *job, bool force)
} }
job_cancel_async(job, force); job_cancel_async(job, force);
if (!job_started(job)) { if (!job_started(job)) {
job_completed(job, -ECANCELED, NULL); job_completed(job);
} else if (job->deferred_to_main_loop) { } else if (job->deferred_to_main_loop) {
job_completed_txn_abort(job); job_completed_txn_abort(job);
} else { } else {
@ -956,38 +965,6 @@ void job_complete(Job *job, Error **errp)
job->driver->complete(job, errp); job->driver->complete(job, errp);
} }
typedef struct {
Job *job;
JobDeferToMainLoopFn *fn;
void *opaque;
} JobDeferToMainLoopData;
static void job_defer_to_main_loop_bh(void *opaque)
{
JobDeferToMainLoopData *data = opaque;
Job *job = data->job;
AioContext *aio_context = job->aio_context;
aio_context_acquire(aio_context);
data->fn(data->job, data->opaque);
aio_context_release(aio_context);
g_free(data);
}
void job_defer_to_main_loop(Job *job, JobDeferToMainLoopFn *fn, void *opaque)
{
JobDeferToMainLoopData *data = g_malloc(sizeof(*data));
data->job = job;
data->fn = fn;
data->opaque = opaque;
job->deferred_to_main_loop = true;
aio_bh_schedule_oneshot(qemu_get_aio_context(),
job_defer_to_main_loop_bh, data);
}
int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp) int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp)
{ {
Error *local_err = NULL; Error *local_err = NULL;

View File

@ -752,14 +752,9 @@ typedef struct TestBlockJob {
bool should_complete; bool should_complete;
} TestBlockJob; } TestBlockJob;
static void test_job_completed(Job *job, void *opaque) static int coroutine_fn test_job_run(Job *job, Error **errp)
{ {
job_completed(job, 0, NULL); TestBlockJob *s = container_of(job, TestBlockJob, common.job);
}
static void coroutine_fn test_job_start(void *opaque)
{
TestBlockJob *s = opaque;
job_transition_to_ready(&s->common.job); job_transition_to_ready(&s->common.job);
while (!s->should_complete) { while (!s->should_complete) {
@ -770,7 +765,7 @@ static void coroutine_fn test_job_start(void *opaque)
job_pause_point(&s->common.job); job_pause_point(&s->common.job);
} }
job_defer_to_main_loop(&s->common.job, test_job_completed, NULL); return 0;
} }
static void test_job_complete(Job *job, Error **errp) static void test_job_complete(Job *job, Error **errp)
@ -785,7 +780,7 @@ BlockJobDriver test_job_driver = {
.free = block_job_free, .free = block_job_free,
.user_resume = block_job_user_resume, .user_resume = block_job_user_resume,
.drain = block_job_drain, .drain = block_job_drain,
.start = test_job_start, .run = test_job_run,
.complete = test_job_complete, .complete = test_job_complete,
}, },
}; };
@ -948,6 +943,7 @@ static void coroutine_fn test_co_delete_by_drain(void *opaque)
} }
dbdd->done = true; dbdd->done = true;
g_free(buffer);
} }
/** /**

View File

@ -24,39 +24,31 @@ typedef struct {
int *result; int *result;
} TestBlockJob; } TestBlockJob;
static void test_block_job_complete(Job *job, void *opaque) static void test_block_job_exit(Job *job)
{ {
BlockJob *bjob = container_of(job, BlockJob, job); BlockJob *bjob = container_of(job, BlockJob, job);
BlockDriverState *bs = blk_bs(bjob->blk); BlockDriverState *bs = blk_bs(bjob->blk);
int rc = (intptr_t)opaque;
if (job_is_cancelled(job)) {
rc = -ECANCELED;
}
job_completed(job, rc, NULL);
bdrv_unref(bs); bdrv_unref(bs);
} }
static void coroutine_fn test_block_job_run(void *opaque) static int coroutine_fn test_block_job_run(Job *job, Error **errp)
{ {
TestBlockJob *s = opaque; TestBlockJob *s = container_of(job, TestBlockJob, common.job);
BlockJob *job = &s->common;
while (s->iterations--) { while (s->iterations--) {
if (s->use_timer) { if (s->use_timer) {
job_sleep_ns(&job->job, 0); job_sleep_ns(job, 0);
} else { } else {
job_yield(&job->job); job_yield(job);
} }
if (job_is_cancelled(&job->job)) { if (job_is_cancelled(job)) {
break; break;
} }
} }
job_defer_to_main_loop(&job->job, test_block_job_complete, return s->rc;
(void *)(intptr_t)s->rc);
} }
typedef struct { typedef struct {
@ -80,7 +72,8 @@ static const BlockJobDriver test_block_job_driver = {
.free = block_job_free, .free = block_job_free,
.user_resume = block_job_user_resume, .user_resume = block_job_user_resume,
.drain = block_job_drain, .drain = block_job_drain,
.start = test_block_job_run, .run = test_block_job_run,
.exit = test_block_job_exit,
}, },
}; };

View File

@ -163,11 +163,10 @@ typedef struct CancelJob {
bool completed; bool completed;
} CancelJob; } CancelJob;
static void cancel_job_completed(Job *job, void *opaque) static void cancel_job_exit(Job *job)
{ {
CancelJob *s = opaque; CancelJob *s = container_of(job, CancelJob, common.job);
s->completed = true; s->completed = true;
job_completed(job, 0, NULL);
} }
static void cancel_job_complete(Job *job, Error **errp) static void cancel_job_complete(Job *job, Error **errp)
@ -176,13 +175,13 @@ static void cancel_job_complete(Job *job, Error **errp)
s->should_complete = true; s->should_complete = true;
} }
static void coroutine_fn cancel_job_start(void *opaque) static int coroutine_fn cancel_job_run(Job *job, Error **errp)
{ {
CancelJob *s = opaque; CancelJob *s = container_of(job, CancelJob, common.job);
while (!s->should_complete) { while (!s->should_complete) {
if (job_is_cancelled(&s->common.job)) { if (job_is_cancelled(&s->common.job)) {
goto defer; return 0;
} }
if (!job_is_ready(&s->common.job) && s->should_converge) { if (!job_is_ready(&s->common.job) && s->should_converge) {
@ -192,8 +191,7 @@ static void coroutine_fn cancel_job_start(void *opaque)
job_sleep_ns(&s->common.job, 100000); job_sleep_ns(&s->common.job, 100000);
} }
defer: return 0;
job_defer_to_main_loop(&s->common.job, cancel_job_completed, s);
} }
static const BlockJobDriver test_cancel_driver = { static const BlockJobDriver test_cancel_driver = {
@ -202,7 +200,8 @@ static const BlockJobDriver test_cancel_driver = {
.free = block_job_free, .free = block_job_free,
.user_resume = block_job_user_resume, .user_resume = block_job_user_resume,
.drain = block_job_drain, .drain = block_job_drain,
.start = cancel_job_start, .run = cancel_job_run,
.exit = cancel_job_exit,
.complete = cancel_job_complete, .complete = cancel_job_complete,
}, },
}; };

View File

@ -107,7 +107,7 @@ gdbstub_err_checksum_incorrect(uint8_t expected, uint8_t got) "got command packe
# job.c # job.c
job_state_transition(void *job, int ret, const char *legal, const char *s0, const char *s1) "job %p (ret: %d) attempting %s transition (%s-->%s)" job_state_transition(void *job, int ret, const char *legal, const char *s0, const char *s1) "job %p (ret: %d) attempting %s transition (%s-->%s)"
job_apply_verb(void *job, const char *state, const char *verb, const char *legal) "job %p in state %s; applying verb %s (%s)" job_apply_verb(void *job, const char *state, const char *verb, const char *legal) "job %p in state %s; applying verb %s (%s)"
job_completed(void *job, int ret, int jret) "job %p ret %d corrected ret %d" job_completed(void *job, int ret) "job %p ret %d"
# job-qmp.c # job-qmp.c
qmp_job_cancel(void *job) "job %p" qmp_job_cancel(void *job) "job %p"