job: Move coroutine and related code to Job

This commit moves some core functions for dealing with the job coroutine
from BlockJob to Job. This primarily covers entering the coroutine (both
starting it for the first time and re-entering it after a yield) and
yielding, either explicitly or at pause points.

Signed-off-by: Kevin Wolf <kwolf@redhat.com>
Reviewed-by: John Snow <jsnow@redhat.com>
Author: Kevin Wolf <kwolf@redhat.com>
Date:   2018-04-13 17:31:02 +02:00
parent 1908a5590c
commit da01ff7f38

14 changed files with 305 additions and 299 deletions
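
For block job drivers the change is mostly mechanical: the coroutine entry point moves from BlockJobDriver into the embedded JobDriver, and callers start the job through the embedded Job instead of the BlockJob. A minimal sketch of the new pattern follows; ExampleBlockJob, example_run and example_kick_off are placeholder names used for illustration only, not part of this patch.

#include "qemu/osdep.h"
#include "block/blockjob_int.h"

typedef struct ExampleBlockJob {
    BlockJob common;
    /* driver-specific state would go here */
} ExampleBlockJob;

/* Coroutine entry point, now registered in JobDriver rather than in
 * BlockJobDriver. */
static void coroutine_fn example_run(void *opaque);

static const BlockJobDriver example_job_driver = {
    .job_driver = {
        .instance_size = sizeof(ExampleBlockJob),
        .free = block_job_free,
        .start = example_run,    /* was a BlockJobDriver field before this commit */
    },
};

static void example_kick_off(ExampleBlockJob *s)
{
    /* previously: block_job_start(&s->common); */
    job_start(&s->common.job);
}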


@@ -528,8 +528,8 @@ static const BlockJobDriver backup_job_driver = {
         .instance_size = sizeof(BackupBlockJob),
         .job_type = JOB_TYPE_BACKUP,
         .free = block_job_free,
+        .start = backup_run,
     },
-    .start = backup_run,
     .commit = backup_commit,
     .abort = backup_abort,
     .clean = backup_clean,


@@ -220,8 +220,8 @@ static const BlockJobDriver commit_job_driver = {
         .instance_size = sizeof(CommitBlockJob),
         .job_type = JOB_TYPE_COMMIT,
         .free = block_job_free,
+        .start = commit_run,
     },
-    .start = commit_run,
 };
 
 static int coroutine_fn bdrv_commit_top_preadv(BlockDriverState *bs,
@@ -371,7 +371,7 @@ void commit_start(const char *job_id, BlockDriverState *bs,
     s->on_error = on_error;
 
     trace_commit_start(bs, base, top, s);
-    block_job_start(&s->common);
+    job_start(&s->common.job);
     return;
 
 fail:


@@ -126,7 +126,7 @@ static void mirror_iteration_done(MirrorOp *op, int ret)
     g_free(op);
 
     if (s->waiting_for_io) {
-        qemu_coroutine_enter(s->common.co);
+        qemu_coroutine_enter(s->common.job.co);
     }
 }
@@ -345,7 +345,7 @@ static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s)
         mirror_wait_for_io(s);
     }
 
-    block_job_pause_point(&s->common);
+    job_pause_point(&s->common.job);
 
     /* Find the number of consecutive dirty chunks following the first dirty
      * one, and wait for in flight requests in them. */
@@ -597,7 +597,7 @@ static void mirror_throttle(MirrorBlockJob *s)
         s->last_pause_ns = now;
         block_job_sleep_ns(&s->common, 0);
     } else {
-        block_job_pause_point(&s->common);
+        job_pause_point(&s->common.job);
     }
 }
@@ -786,7 +786,7 @@ static void coroutine_fn mirror_run(void *opaque)
         goto immediate_exit;
     }
 
-    block_job_pause_point(&s->common);
+    job_pause_point(&s->common.job);
 
     cnt = bdrv_get_dirty_count(s->dirty_bitmap);
     /* cnt is the number of dirty bytes remaining and s->bytes_in_flight is
@@ -957,9 +957,9 @@ static void mirror_complete(BlockJob *job, Error **errp)
     block_job_enter(&s->common);
 }
 
-static void mirror_pause(BlockJob *job)
+static void mirror_pause(Job *job)
 {
-    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
+    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
 
     mirror_wait_for_all_io(s);
 }
@@ -991,10 +991,10 @@ static const BlockJobDriver mirror_job_driver = {
         .instance_size = sizeof(MirrorBlockJob),
         .job_type = JOB_TYPE_MIRROR,
         .free = block_job_free,
+        .start = mirror_run,
+        .pause = mirror_pause,
     },
-    .start = mirror_run,
     .complete = mirror_complete,
-    .pause = mirror_pause,
     .attached_aio_context = mirror_attached_aio_context,
     .drain = mirror_drain,
 };
@@ -1004,10 +1004,10 @@ static const BlockJobDriver commit_active_job_driver = {
         .instance_size = sizeof(MirrorBlockJob),
         .job_type = JOB_TYPE_COMMIT,
         .free = block_job_free,
+        .start = mirror_run,
+        .pause = mirror_pause,
     },
-    .start = mirror_run,
     .complete = mirror_complete,
-    .pause = mirror_pause,
     .attached_aio_context = mirror_attached_aio_context,
     .drain = mirror_drain,
 };
@@ -1244,7 +1244,7 @@ static void mirror_start_job(const char *job_id, BlockDriverState *bs,
     }
 
     trace_mirror_start(bs, s, opaque);
-    block_job_start(&s->common);
+    job_start(&s->common.job);
     return;
 
 fail:


@@ -576,7 +576,7 @@ static void replication_start(ReplicationState *rs, ReplicationMode mode,
             aio_context_release(aio_context);
             return;
         }
-        block_job_start(job);
+        job_start(&job->job);
         break;
     default:
         aio_context_release(aio_context);


@@ -213,8 +213,8 @@ static const BlockJobDriver stream_job_driver = {
         .instance_size = sizeof(StreamBlockJob),
         .job_type = JOB_TYPE_STREAM,
         .free = block_job_free,
+        .start = stream_run,
     },
-    .start = stream_run,
 };
 
 void stream_start(const char *job_id, BlockDriverState *bs,
@@ -262,7 +262,7 @@ void stream_start(const char *job_id, BlockDriverState *bs,
     s->on_error = on_error;
 
     trace_stream_start(bs, base, s);
-    block_job_start(&s->common);
+    job_start(&s->common.job);
     return;
 
 fail:


@@ -1910,7 +1910,7 @@ static void drive_backup_commit(BlkActionState *common)
     aio_context_acquire(aio_context);
 
     assert(state->job);
-    block_job_start(state->job);
+    job_start(&state->job->job);
     aio_context_release(aio_context);
 }
@@ -2008,7 +2008,7 @@ static void blockdev_backup_commit(BlkActionState *common)
     aio_context_acquire(aio_context);
 
     assert(state->job);
-    block_job_start(state->job);
+    job_start(&state->job->job);
     aio_context_release(aio_context);
 }
@@ -3425,7 +3425,7 @@ void qmp_drive_backup(DriveBackup *arg, Error **errp)
     BlockJob *job;
     job = do_drive_backup(arg, NULL, errp);
     if (job) {
-        block_job_start(job);
+        job_start(&job->job);
     }
 }
@@ -3513,7 +3513,7 @@ void qmp_blockdev_backup(BlockdevBackup *arg, Error **errp)
     BlockJob *job;
     job = do_blockdev_backup(arg, NULL, errp);
     if (job) {
-        block_job_start(job);
+        job_start(&job->job);
     }
 }


@@ -36,30 +36,9 @@
 #include "qemu/coroutine.h"
 #include "qemu/timer.h"
 
-/* Right now, this mutex is only needed to synchronize accesses to job->busy
- * and job->sleep_timer, such as concurrent calls to block_job_do_yield and
- * block_job_enter. */
-static QemuMutex block_job_mutex;
-
-static void block_job_lock(void)
-{
-    qemu_mutex_lock(&block_job_mutex);
-}
-
-static void block_job_unlock(void)
-{
-    qemu_mutex_unlock(&block_job_mutex);
-}
-
-static void __attribute__((__constructor__)) block_job_init(void)
-{
-    qemu_mutex_init(&block_job_mutex);
-}
-
 static void block_job_event_cancelled(BlockJob *job);
 static void block_job_event_completed(BlockJob *job, const char *msg);
 static int block_job_event_pending(BlockJob *job);
-static void block_job_enter_cond(BlockJob *job, bool(*fn)(BlockJob *job));
 
 /* Transactional group of block jobs */
 struct BlockJobTxn {
@@ -161,33 +140,27 @@ static void block_job_txn_del_job(BlockJob *job)
     }
 }
 
-/* Assumes the block_job_mutex is held */
-static bool block_job_timer_pending(BlockJob *job)
+/* Assumes the job_mutex is held */
+static bool job_timer_not_pending(Job *job)
 {
-    return timer_pending(&job->sleep_timer);
-}
-
-/* Assumes the block_job_mutex is held */
-static bool block_job_timer_not_pending(BlockJob *job)
-{
-    return !block_job_timer_pending(job);
+    return !timer_pending(&job->sleep_timer);
 }
 
 static void block_job_pause(BlockJob *job)
 {
-    job->pause_count++;
+    job->job.pause_count++;
 }
 
 static void block_job_resume(BlockJob *job)
 {
-    assert(job->pause_count > 0);
-    job->pause_count--;
-    if (job->pause_count) {
+    assert(job->job.pause_count > 0);
+    job->job.pause_count--;
+    if (job->job.pause_count) {
         return;
     }
     /* kick only if no timer is pending */
-    block_job_enter_cond(job, block_job_timer_not_pending);
+    job_enter_cond(&job->job, job_timer_not_pending);
 }
 
 static void block_job_attached_aio_context(AioContext *new_context,
@@ -208,7 +181,7 @@ void block_job_free(Job *job)
                                     block_job_detach_aio_context, bjob);
     blk_unref(bjob->blk);
     error_free(bjob->blocker);
-    assert(!timer_pending(&bjob->sleep_timer));
+    assert(!timer_pending(&bjob->job.sleep_timer));
 }
 
 static void block_job_attached_aio_context(AioContext *new_context,
@@ -226,7 +199,7 @@ static void block_job_attached_aio_context(AioContext *new_context,
 
 static void block_job_drain(BlockJob *job)
 {
-    /* If job is !job->busy this kicks it into the next pause point. */
+    /* If job is !job->job.busy this kicks it into the next pause point. */
     block_job_enter(job);
     blk_drain(job->blk);
@@ -244,7 +217,7 @@ static void block_job_detach_aio_context(void *opaque)
     block_job_pause(job);
 
-    while (!job->paused && !job->completed) {
+    while (!job->job.paused && !job->completed) {
         block_job_drain(job);
     }
@@ -312,29 +285,11 @@ bool block_job_is_internal(BlockJob *job)
     return (job->job.id == NULL);
 }
 
-static bool block_job_started(BlockJob *job)
-{
-    return job->co;
-}
-
 const BlockJobDriver *block_job_driver(BlockJob *job)
 {
     return job->driver;
 }
 
-/**
- * All jobs must allow a pause point before entering their job proper. This
- * ensures that jobs can be paused prior to being started, then resumed later.
- */
-static void coroutine_fn block_job_co_entry(void *opaque)
-{
-    BlockJob *job = opaque;
-
-    assert(job && job->driver && job->driver->start);
-    block_job_pause_point(job);
-    job->driver->start(job);
-}
-
 static void block_job_sleep_timer_cb(void *opaque)
 {
     BlockJob *job = opaque;
@@ -342,24 +297,12 @@ static void block_job_sleep_timer_cb(void *opaque)
     block_job_enter(job);
 }
 
-void block_job_start(BlockJob *job)
-{
-    assert(job && !block_job_started(job) && job->paused &&
-           job->driver && job->driver->start);
-    job->co = qemu_coroutine_create(block_job_co_entry, job);
-    job->pause_count--;
-    job->busy = true;
-    job->paused = false;
-    job_state_transition(&job->job, JOB_STATUS_RUNNING);
-    bdrv_coroutine_enter(blk_bs(job->blk), job->co);
-}
-
 static void block_job_decommission(BlockJob *job)
 {
     assert(job);
     job->completed = true;
-    job->busy = false;
-    job->paused = false;
+    job->job.busy = false;
+    job->job.paused = false;
     job->job.deferred_to_main_loop = true;
     block_job_txn_del_job(job);
     job_state_transition(&job->job, JOB_STATUS_NULL);
@@ -374,7 +317,7 @@ static void block_job_do_dismiss(BlockJob *job)
 static void block_job_conclude(BlockJob *job)
 {
     job_state_transition(&job->job, JOB_STATUS_CONCLUDED);
-    if (job->auto_dismiss || !block_job_started(job)) {
+    if (job->auto_dismiss || !job_started(&job->job)) {
         block_job_do_dismiss(job);
     }
 }
@@ -439,7 +382,7 @@ static int block_job_finalize_single(BlockJob *job)
     }
 
     /* Emit events only if we actually started */
-    if (block_job_started(job)) {
+    if (job_started(&job->job)) {
         if (job_is_cancelled(&job->job)) {
             block_job_event_cancelled(job);
         } else {
@@ -464,7 +407,7 @@ static void block_job_cancel_async(BlockJob *job, bool force)
     if (job->user_paused) {
         /* Do not call block_job_enter here, the caller will handle it. */
         job->user_paused = false;
-        job->pause_count--;
+        job->job.pause_count--;
     }
     job->job.cancelled = true;
     /* To prevent 'force == false' overriding a previous 'force == true' */
@@ -615,6 +558,12 @@ static void block_job_completed_txn_success(BlockJob *job)
     }
 }
 
+/* Assumes the job_mutex is held */
+static bool job_timer_pending(Job *job)
+{
+    return timer_pending(&job->sleep_timer);
+}
+
 void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
 {
     int64_t old_speed = job->speed;
@@ -635,7 +584,7 @@ void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
     }
 
     /* kick only if a timer is pending */
-    block_job_enter_cond(job, block_job_timer_pending);
+    job_enter_cond(&job->job, job_timer_pending);
 }
 
 int64_t block_job_ratelimit_get_delay(BlockJob *job, uint64_t n)
@@ -654,7 +603,7 @@ void block_job_complete(BlockJob *job, Error **errp)
     if (job_apply_verb(&job->job, JOB_VERB_COMPLETE, errp)) {
         return;
     }
-    if (job->pause_count || job_is_cancelled(&job->job) ||
+    if (job->job.pause_count || job_is_cancelled(&job->job) ||
         !job->driver->complete)
     {
         error_setg(errp, "The active block job '%s' cannot be completed",
@@ -708,7 +657,7 @@ bool block_job_user_paused(BlockJob *job)
 void block_job_user_resume(BlockJob *job, Error **errp)
 {
     assert(job);
-    if (!job->user_paused || job->pause_count <= 0) {
+    if (!job->user_paused || job->job.pause_count <= 0) {
         error_setg(errp, "Can't resume a job that was not paused");
         return;
     }
@@ -727,7 +676,7 @@ void block_job_cancel(BlockJob *job, bool force)
         return;
     }
     block_job_cancel_async(job, force);
-    if (!block_job_started(job)) {
+    if (!job_started(&job->job)) {
         block_job_completed(job, -ECANCELED);
     } else if (job->job.deferred_to_main_loop) {
         block_job_completed_txn_abort(job);
@@ -797,8 +746,8 @@ BlockJobInfo *block_job_query(BlockJob *job, Error **errp)
     info->type = g_strdup(job_type_str(&job->job));
     info->device = g_strdup(job->job.id);
     info->len = job->len;
-    info->busy = atomic_read(&job->busy);
-    info->paused = job->pause_count > 0;
+    info->busy = atomic_read(&job->job.busy);
+    info->paused = job->job.pause_count > 0;
     info->offset = job->offset;
     info->speed = job->speed;
     info->io_status = job->iostatus;
@@ -915,12 +864,9 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
     job->blk = blk;
     job->cb = cb;
     job->opaque = opaque;
-    job->busy = false;
-    job->paused = true;
-    job->pause_count = 1;
     job->auto_finalize = !(flags & BLOCK_JOB_MANUAL_FINALIZE);
     job->auto_dismiss = !(flags & BLOCK_JOB_MANUAL_DISMISS);
-    aio_timer_init(qemu_get_aio_context(), &job->sleep_timer,
+    aio_timer_init(qemu_get_aio_context(), &job->job.sleep_timer,
                    QEMU_CLOCK_REALTIME, SCALE_NS,
                    block_job_sleep_timer_cb, job);
@@ -980,128 +926,41 @@ void block_job_completed(BlockJob *job, int ret)
     }
 }
 
-static bool block_job_should_pause(BlockJob *job)
-{
-    return job->pause_count > 0;
-}
-
-/* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
- * Reentering the job coroutine with block_job_enter() before the timer has
- * expired is allowed and cancels the timer.
- *
- * If @ns is (uint64_t) -1, no timer is scheduled and block_job_enter() must be
- * called explicitly. */
-static void block_job_do_yield(BlockJob *job, uint64_t ns)
-{
-    block_job_lock();
-    if (ns != -1) {
-        timer_mod(&job->sleep_timer, ns);
-    }
-    job->busy = false;
-    block_job_unlock();
-    qemu_coroutine_yield();
-
-    /* Set by block_job_enter before re-entering the coroutine. */
-    assert(job->busy);
-}
-
-void coroutine_fn block_job_pause_point(BlockJob *job)
-{
-    assert(job && block_job_started(job));
-
-    if (!block_job_should_pause(job)) {
-        return;
-    }
-    if (job_is_cancelled(&job->job)) {
-        return;
-    }
-
-    if (job->driver->pause) {
-        job->driver->pause(job);
-    }
-
-    if (block_job_should_pause(job) && !job_is_cancelled(&job->job)) {
-        JobStatus status = job->job.status;
-        job_state_transition(&job->job, status == JOB_STATUS_READY
-                                        ? JOB_STATUS_STANDBY
-                                        : JOB_STATUS_PAUSED);
-        job->paused = true;
-        block_job_do_yield(job, -1);
-        job->paused = false;
-        job_state_transition(&job->job, status);
-    }
-
-    if (job->driver->resume) {
-        job->driver->resume(job);
-    }
-}
-
-/*
- * Conditionally enter a block_job pending a call to fn() while
- * under the block_job_lock critical section.
- */
-static void block_job_enter_cond(BlockJob *job, bool(*fn)(BlockJob *job))
-{
-    if (!block_job_started(job)) {
-        return;
-    }
-    if (job->job.deferred_to_main_loop) {
-        return;
-    }
-
-    block_job_lock();
-    if (job->busy) {
-        block_job_unlock();
-        return;
-    }
-
-    if (fn && !fn(job)) {
-        block_job_unlock();
-        return;
-    }
-
-    assert(!job->job.deferred_to_main_loop);
-    timer_del(&job->sleep_timer);
-    job->busy = true;
-    block_job_unlock();
-    aio_co_wake(job->co);
-}
-
 void block_job_enter(BlockJob *job)
 {
-    block_job_enter_cond(job, NULL);
+    job_enter_cond(&job->job, NULL);
 }
 
 void block_job_sleep_ns(BlockJob *job, int64_t ns)
 {
-    assert(job->busy);
+    assert(job->job.busy);
 
     /* Check cancellation *before* setting busy = false, too! */
     if (job_is_cancelled(&job->job)) {
         return;
     }
 
-    if (!block_job_should_pause(job)) {
-        block_job_do_yield(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
+    if (!job_should_pause(&job->job)) {
+        job_do_yield(&job->job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
     }
 
-    block_job_pause_point(job);
+    job_pause_point(&job->job);
 }
 
 void block_job_yield(BlockJob *job)
 {
-    assert(job->busy);
+    assert(job->job.busy);
 
     /* Check cancellation *before* setting busy = false, too! */
    if (job_is_cancelled(&job->job)) {
         return;
     }
 
-    if (!block_job_should_pause(job)) {
-        block_job_do_yield(job, -1);
+    if (!job_should_pause(&job->job)) {
+        job_do_yield(&job->job, -1);
     }
 
-    block_job_pause_point(job);
+    job_pause_point(&job->job);
 }
 
 void block_job_iostatus_reset(BlockJob *job)
@@ -1109,7 +968,7 @@ void block_job_iostatus_reset(BlockJob *job)
     if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
         return;
     }
-    assert(job->user_paused && job->pause_count > 0);
+    assert(job->user_paused && job->job.pause_count > 0);
     job->iostatus = BLOCK_DEVICE_IO_STATUS_OK;
 }


@@ -50,43 +50,18 @@ typedef struct BlockJob {
     /** The block device on which the job is operating. */
     BlockBackend *blk;
 
-    /**
-     * The coroutine that executes the job. If not NULL, it is
-     * reentered when busy is false and the job is cancelled.
-     */
-    Coroutine *co;
-
     /**
      * Set to true if the job should abort immediately without waiting
      * for data to be in sync.
      */
     bool force;
 
-    /**
-     * Counter for pause request. If non-zero, the block job is either paused,
-     * or if busy == true will pause itself as soon as possible.
-     */
-    int pause_count;
-
     /**
      * Set to true if the job is paused by user. Can be unpaused with the
      * block-job-resume QMP command.
     */
     bool user_paused;
 
-    /**
-     * Set to false by the job while the coroutine has yielded and may be
-     * re-entered by block_job_enter(). There may still be I/O or event loop
-     * activity pending. Accessed under block_job_mutex (in blockjob.c).
-     */
-    bool busy;
-
-    /**
-     * Set to true by the job while it is in a quiescent state, where
-     * no I/O or event loop activity is pending.
-     */
-    bool paused;
-
     /**
      * Set to true when the job is ready to be completed.
     */
@@ -125,12 +100,6 @@ typedef struct BlockJob {
     /** ret code passed to block_job_completed. */
     int ret;
 
-    /**
-     * Timer that is used by @block_job_sleep_ns. Accessed under
-     * block_job_mutex (in blockjob.c).
-     */
-    QEMUTimer sleep_timer;
-
     /** True if this job should automatically finalize itself */
     bool auto_finalize;
@@ -207,15 +176,6 @@ void block_job_remove_all_bdrv(BlockJob *job);
  */
 void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp);
 
-/**
- * block_job_start:
- * @job: A job that has not yet been started.
- *
- * Begins execution of a block job.
- * Takes ownership of one reference to the job object.
- */
-void block_job_start(BlockJob *job);
-
 /**
  * block_job_cancel:
  * @job: The job to be canceled.


@@ -38,9 +38,6 @@ struct BlockJobDriver {
     /** Generic JobDriver callbacks and settings */
     JobDriver job_driver;
 
-    /** Mandatory: Entrypoint for the Coroutine. */
-    CoroutineEntry *start;
-
     /**
      * Optional callback for job types whose completion must be triggered
      * manually.
@@ -85,20 +82,6 @@ struct BlockJobDriver {
      */
     void (*clean)(BlockJob *job);
 
-    /**
-     * If the callback is not NULL, it will be invoked when the job transitions
-     * into the paused state. Paused jobs must not perform any asynchronous
-     * I/O or event loop activity. This callback is used to quiesce jobs.
-     */
-    void coroutine_fn (*pause)(BlockJob *job);
-
-    /**
-     * If the callback is not NULL, it will be invoked when the job transitions
-     * out of the paused state. Any asynchronous I/O or event loop activity
-     * should be restarted from this callback.
-     */
-    void coroutine_fn (*resume)(BlockJob *job);
-
     /*
      * If the callback is not NULL, it will be invoked before the job is
      * resumed in a new AioContext. This is the place to move any resources
@@ -195,15 +178,6 @@ void block_job_early_fail(BlockJob *job);
  */
 void block_job_completed(BlockJob *job, int ret);
 
-/**
- * block_job_pause_point:
- * @job: The job that is ready to pause.
- *
- * Pause now if block_job_pause() has been called. Block jobs that perform
- * lots of I/O must call this between requests so that the job can be paused.
- */
-void coroutine_fn block_job_pause_point(BlockJob *job);
-
 /**
  * block_job_enter:
  * @job: The job to enter.


@@ -28,6 +28,7 @@
 
 #include "qapi/qapi-types-block-core.h"
 #include "qemu/queue.h"
+#include "qemu/coroutine.h"
 
 typedef struct JobDriver JobDriver;
@@ -50,6 +51,37 @@ typedef struct Job {
     /** AioContext to run the job coroutine in */
     AioContext *aio_context;
 
+    /**
+     * The coroutine that executes the job. If not NULL, it is reentered when
+     * busy is false and the job is cancelled.
+     */
+    Coroutine *co;
+
+    /**
+     * Timer that is used by @block_job_sleep_ns. Accessed under job_mutex (in
+     * job.c).
+     */
+    QEMUTimer sleep_timer;
+
+    /**
+     * Counter for pause request. If non-zero, the block job is either paused,
+     * or if busy == true will pause itself as soon as possible.
+     */
+    int pause_count;
+
+    /**
+     * Set to false by the job while the coroutine has yielded and may be
+     * re-entered by block_job_enter(). There may still be I/O or event loop
+     * activity pending. Accessed under block_job_mutex (in blockjob.c).
+     */
+    bool busy;
+
+    /**
+     * Set to true by the job while it is in a quiescent state, where
+     * no I/O or event loop activity is pending.
+     */
+    bool paused;
+
     /**
      * Set to true if the job should cancel itself. The flag must
      * always be tested just before toggling the busy flag from false
@@ -75,6 +107,23 @@ struct JobDriver {
     /** Enum describing the operation */
     JobType job_type;
 
+    /** Mandatory: Entrypoint for the Coroutine. */
+    CoroutineEntry *start;
+
+    /**
+     * If the callback is not NULL, it will be invoked when the job transitions
+     * into the paused state. Paused jobs must not perform any asynchronous
+     * I/O or event loop activity. This callback is used to quiesce jobs.
+     */
+    void coroutine_fn (*pause)(Job *job);
+
+    /**
+     * If the callback is not NULL, it will be invoked when the job transitions
+     * out of the paused state. Any asynchronous I/O or event loop activity
+     * should be restarted from this callback.
+     */
+    void coroutine_fn (*resume)(Job *job);
+
     /** Called when the job is freed */
     void (*free)(Job *job);
 };
@@ -103,6 +152,30 @@ void job_ref(Job *job);
  */
 void job_unref(Job *job);
 
+/**
+ * Conditionally enter the job coroutine if the job is ready to run, not
+ * already busy and fn() returns true. fn() is called while under the job_lock
+ * critical section.
+ */
+void job_enter_cond(Job *job, bool(*fn)(Job *job));
+
+/**
+ * @job: A job that has not yet been started.
+ *
+ * Begins execution of a job.
+ * Takes ownership of one reference to the job object.
+ */
+void job_start(Job *job);
+
+/**
+ * @job: The job that is ready to pause.
+ *
+ * Pause now if job_pause() has been called. Jobs that perform lots of I/O
+ * must call this between requests so that the job can be paused.
+ */
+void coroutine_fn job_pause_point(Job *job);
+
 /** Returns the JobType of a given Job. */
 JobType job_type(const Job *job);
@@ -153,5 +226,8 @@ void job_defer_to_main_loop(Job *job, JobDeferToMainLoopFn *fn, void *opaque);
 /* TODO To be removed from the public interface */
 void job_state_transition(Job *job, JobStatus s1);
+void coroutine_fn job_do_yield(Job *job, uint64_t ns);
+bool job_should_pause(Job *job);
+bool job_started(Job *job);
 
 #endif
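
Taken together, the new Job-level pieces compose like this: job_start() creates the coroutine and enters job_co_entry(), which takes one pause point before calling driver->start; a long-running entry point then cooperates by reaching pause points between requests, for instance through block_job_sleep_ns(), which now yields via job_do_yield() and returns through job_pause_point(). A sketch of such an entry point, continuing the hypothetical ExampleBlockJob from above (completion handling is omitted; the cast assumes Job sits at the start of BlockJob, which in turn is the first field of the driver's struct):

static void coroutine_fn example_run(void *opaque)
{
    /* job_co_entry() passes the Job pointer; because it is embedded at the
     * start of BlockJob, the driver can still treat it as its own struct. */
    ExampleBlockJob *s = opaque;

    while (!job_is_cancelled(&s->common.job)) {
        /* Yield for 100 ms; job_enter_cond() may cancel the timer and
         * reenter the coroutine earlier, and a pause point is taken on
         * return. */
        block_job_sleep_ns(&s->common, 100 * 1000 * 1000);
    }
}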

job.c

@@ -60,6 +60,26 @@ bool JobVerbTable[JOB_VERB__MAX][JOB_STATUS__MAX] = {
     [JOB_VERB_DISMISS] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
 };
 
+/* Right now, this mutex is only needed to synchronize accesses to job->busy
+ * and job->sleep_timer, such as concurrent calls to job_do_yield and
+ * job_enter. */
+static QemuMutex job_mutex;
+
+static void job_lock(void)
+{
+    qemu_mutex_lock(&job_mutex);
+}
+
+static void job_unlock(void)
+{
+    qemu_mutex_unlock(&job_mutex);
+}
+
+static void __attribute__((__constructor__)) job_init(void)
+{
+    qemu_mutex_init(&job_mutex);
+}
+
 /* TODO Make static once the whole state machine is in job.c */
 void job_state_transition(Job *job, JobStatus s1)
 {
@@ -101,6 +121,16 @@ bool job_is_cancelled(Job *job)
     return job->cancelled;
 }
 
+bool job_started(Job *job)
+{
+    return job->co;
+}
+
+bool job_should_pause(Job *job)
+{
+    return job->pause_count > 0;
+}
+
 Job *job_next(Job *job)
 {
     if (!job) {
@@ -143,6 +173,9 @@ void *job_create(const char *job_id, const JobDriver *driver, AioContext *ctx,
     job->id = g_strdup(job_id);
     job->refcnt = 1;
     job->aio_context = ctx;
+    job->busy = false;
+    job->paused = true;
+    job->pause_count = 1;
 
     job_state_transition(job, JOB_STATUS_CREATED);
@@ -172,6 +205,110 @@ void job_unref(Job *job)
     }
 }
 
+void job_enter_cond(Job *job, bool(*fn)(Job *job))
+{
+    if (!job_started(job)) {
+        return;
+    }
+    if (job->deferred_to_main_loop) {
+        return;
+    }
+
+    job_lock();
+    if (job->busy) {
+        job_unlock();
+        return;
+    }
+
+    if (fn && !fn(job)) {
+        job_unlock();
+        return;
+    }
+
+    assert(!job->deferred_to_main_loop);
+    timer_del(&job->sleep_timer);
+    job->busy = true;
+    job_unlock();
+    aio_co_wake(job->co);
+}
+
+/* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
+ * Reentering the job coroutine with block_job_enter() before the timer has
+ * expired is allowed and cancels the timer.
+ *
+ * If @ns is (uint64_t) -1, no timer is scheduled and block_job_enter() must be
+ * called explicitly. */
+void coroutine_fn job_do_yield(Job *job, uint64_t ns)
+{
+    job_lock();
+    if (ns != -1) {
+        timer_mod(&job->sleep_timer, ns);
+    }
+    job->busy = false;
+    job_unlock();
+    qemu_coroutine_yield();
+
+    /* Set by job_enter_cond() before re-entering the coroutine. */
+    assert(job->busy);
+}
+
+void coroutine_fn job_pause_point(Job *job)
+{
+    assert(job && job_started(job));
+
+    if (!job_should_pause(job)) {
+        return;
+    }
+    if (job_is_cancelled(job)) {
+        return;
+    }
+
+    if (job->driver->pause) {
+        job->driver->pause(job);
+    }
+
+    if (job_should_pause(job) && !job_is_cancelled(job)) {
+        JobStatus status = job->status;
+        job_state_transition(job, status == JOB_STATUS_READY
+                                  ? JOB_STATUS_STANDBY
+                                  : JOB_STATUS_PAUSED);
+        job->paused = true;
+        job_do_yield(job, -1);
+        job->paused = false;
+        job_state_transition(job, status);
+    }
+
+    if (job->driver->resume) {
+        job->driver->resume(job);
+    }
+}
+
+/**
+ * All jobs must allow a pause point before entering their job proper. This
+ * ensures that jobs can be paused prior to being started, then resumed later.
+ */
+static void coroutine_fn job_co_entry(void *opaque)
+{
+    Job *job = opaque;
+
+    assert(job && job->driver && job->driver->start);
+    job_pause_point(job);
+    job->driver->start(job);
+}
+
+void job_start(Job *job)
+{
+    assert(job && !job_started(job) && job->paused &&
+           job->driver && job->driver->start);
+    job->co = qemu_coroutine_create(job_co_entry, job);
+    job->pause_count--;
+    job->busy = true;
+    job->paused = false;
+    job_state_transition(job, JOB_STATUS_RUNNING);
+    aio_co_enter(job->aio_context, job->co);
+}
+
 typedef struct {
     Job *job;
     JobDeferToMainLoopFn *fn;


@@ -524,8 +524,8 @@ BlockJobDriver test_job_driver = {
     .job_driver = {
         .instance_size = sizeof(TestBlockJob),
         .free = block_job_free,
+        .start = test_job_start,
     },
-    .start = test_job_start,
     .complete = test_job_complete,
 };
@@ -549,47 +549,47 @@ static void test_blockjob_common(enum drain_type drain_type)
     job = block_job_create("job0", &test_job_driver, NULL, src, 0, BLK_PERM_ALL,
                            0, 0, NULL, NULL, &error_abort);
     block_job_add_bdrv(job, "target", target, 0, BLK_PERM_ALL, &error_abort);
-    block_job_start(job);
+    job_start(&job->job);
 
-    g_assert_cmpint(job->pause_count, ==, 0);
-    g_assert_false(job->paused);
-    g_assert_false(job->busy); /* We're in block_job_sleep_ns() */
+    g_assert_cmpint(job->job.pause_count, ==, 0);
+    g_assert_false(job->job.paused);
+    g_assert_false(job->job.busy); /* We're in block_job_sleep_ns() */
 
     do_drain_begin(drain_type, src);
 
     if (drain_type == BDRV_DRAIN_ALL) {
         /* bdrv_drain_all() drains both src and target */
-        g_assert_cmpint(job->pause_count, ==, 2);
+        g_assert_cmpint(job->job.pause_count, ==, 2);
     } else {
-        g_assert_cmpint(job->pause_count, ==, 1);
+        g_assert_cmpint(job->job.pause_count, ==, 1);
     }
 
     /* XXX We don't wait until the job is actually paused. Is this okay? */
-    /* g_assert_true(job->paused); */
-    g_assert_false(job->busy); /* The job is paused */
+    /* g_assert_true(job->job.paused); */
+    g_assert_false(job->job.busy); /* The job is paused */
 
     do_drain_end(drain_type, src);
 
-    g_assert_cmpint(job->pause_count, ==, 0);
-    g_assert_false(job->paused);
-    g_assert_false(job->busy); /* We're in block_job_sleep_ns() */
+    g_assert_cmpint(job->job.pause_count, ==, 0);
+    g_assert_false(job->job.paused);
+    g_assert_false(job->job.busy); /* We're in block_job_sleep_ns() */
 
     do_drain_begin(drain_type, target);
 
     if (drain_type == BDRV_DRAIN_ALL) {
         /* bdrv_drain_all() drains both src and target */
-        g_assert_cmpint(job->pause_count, ==, 2);
+        g_assert_cmpint(job->job.pause_count, ==, 2);
     } else {
-        g_assert_cmpint(job->pause_count, ==, 1);
+        g_assert_cmpint(job->job.pause_count, ==, 1);
     }
 
     /* XXX We don't wait until the job is actually paused. Is this okay? */
-    /* g_assert_true(job->paused); */
-    g_assert_false(job->busy); /* The job is paused */
+    /* g_assert_true(job->job.paused); */
+    g_assert_false(job->job.busy); /* The job is paused */
 
     do_drain_end(drain_type, target);
 
-    g_assert_cmpint(job->pause_count, ==, 0);
-    g_assert_false(job->paused);
-    g_assert_false(job->busy); /* We're in block_job_sleep_ns() */
+    g_assert_cmpint(job->job.pause_count, ==, 0);
+    g_assert_false(job->job.paused);
+    g_assert_false(job->job.busy); /* We're in block_job_sleep_ns() */
 
     ret = block_job_complete_sync(job, &error_abort);
     g_assert_cmpint(ret, ==, 0);


@@ -78,8 +78,8 @@ static const BlockJobDriver test_block_job_driver = {
     .job_driver = {
         .instance_size = sizeof(TestBlockJob),
         .free = block_job_free,
+        .start = test_block_job_run,
     },
-    .start = test_block_job_run,
 };
 
 /* Create a block job that completes with a given return code after a given
@@ -125,7 +125,7 @@ static void test_single_job(int expected)
     txn = block_job_txn_new();
     job = test_block_job_start(1, true, expected, &result, txn);
-    block_job_start(job);
+    job_start(&job->job);
 
     if (expected == -ECANCELED) {
         block_job_cancel(job, false);
@@ -165,8 +165,8 @@ static void test_pair_jobs(int expected1, int expected2)
     txn = block_job_txn_new();
     job1 = test_block_job_start(1, true, expected1, &result1, txn);
     job2 = test_block_job_start(2, true, expected2, &result2, txn);
-    block_job_start(job1);
-    block_job_start(job2);
+    job_start(&job1->job);
+    job_start(&job2->job);
 
     /* Release our reference now to trigger as many nice
      * use-after-free bugs as possible.
@@ -227,8 +227,8 @@ static void test_pair_jobs_fail_cancel_race(void)
     txn = block_job_txn_new();
     job1 = test_block_job_start(1, true, -ECANCELED, &result1, txn);
     job2 = test_block_job_start(2, false, 0, &result2, txn);
-    block_job_start(job1);
-    block_job_start(job2);
+    job_start(&job1->job);
+    job_start(&job2->job);
 
     block_job_cancel(job1, false);


@@ -199,8 +199,8 @@ static const BlockJobDriver test_cancel_driver = {
     .job_driver = {
         .instance_size = sizeof(CancelJob),
         .free = block_job_free,
+        .start = cancel_job_start,
     },
-    .start = cancel_job_start,
     .complete = cancel_job_complete,
 };
@@ -254,7 +254,7 @@ static void test_cancel_running(void)
     s = create_common(&job);
 
-    block_job_start(job);
+    job_start(&job->job);
     assert(job->job.status == JOB_STATUS_RUNNING);
 
     cancel_common(s);
@@ -267,7 +267,7 @@ static void test_cancel_paused(void)
     s = create_common(&job);
 
-    block_job_start(job);
+    job_start(&job->job);
     assert(job->job.status == JOB_STATUS_RUNNING);
 
     block_job_user_pause(job, &error_abort);
@@ -284,7 +284,7 @@ static void test_cancel_ready(void)
     s = create_common(&job);
 
-    block_job_start(job);
+    job_start(&job->job);
     assert(job->job.status == JOB_STATUS_RUNNING);
 
     s->should_converge = true;
@@ -301,7 +301,7 @@ static void test_cancel_standby(void)
     s = create_common(&job);
 
-    block_job_start(job);
+    job_start(&job->job);
     assert(job->job.status == JOB_STATUS_RUNNING);
 
     s->should_converge = true;
@@ -322,7 +322,7 @@ static void test_cancel_pending(void)
     s = create_common(&job);
 
-    block_job_start(job);
+    job_start(&job->job);
     assert(job->job.status == JOB_STATUS_RUNNING);
 
     s->should_converge = true;
@@ -346,7 +346,7 @@ static void test_cancel_concluded(void)
     s = create_common(&job);
 
-    block_job_start(job);
+    job_start(&job->job);
     assert(job->job.status == JOB_STATUS_RUNNING);
 
     s->should_converge = true;