-----BEGIN PGP SIGNATURE-----
Version: GnuPG v2

iQEcBAABCAAGBQJYE2ULAAoJEMo1YkxqkXHGvvAH/iDPIAiwBXbndL3KhQTneSHn
ctd4I3VK1/VVTIBRJIetqETiWiAm/WoRhI9kBc/NrQxBFx3ko+fpSYFS2t6lJYnV
EX0vjTKjFhr05tOTQDH/SQtHdU5x/x2M8SsxqrCcTyLm5VDfdPeBlMBfSNMj/L2K
bwinANVEwr6LOM0h8weQ0SvOCa5MLII2p5ufGwKQmhUY5tgZvFlyPa+quDVisKoE
7CpLwWHmUQSNxUXSaru90osUJyk90wCcYxPpJN3YO1MHvpH4kG8DpZ8bnFqLAoNw
zkRdqIrlfntD+mKDqRU1y0GXxu9I4VK1UDcQyRFoSdMi2oHR+L018sQEjCYTAXo=
=n+CF
-----END PGP SIGNATURE-----

Merge remote-tracking branch 'remotes/famz/tags/for-upstream' into staging

# gpg: Signature made Fri 28 Oct 2016 15:47:39 BST
# gpg: using RSA key 0xCA35624C6A9171C6
# gpg: Good signature from "Fam Zheng <famz@redhat.com>"
# gpg: WARNING: This key is not certified with a trusted signature!
# gpg: There is no indication that the signature belongs to the owner.
# Primary key fingerprint: 5003 7CB7 9706 0F76 F021 AD56 CA35 624C 6A91 71C6

* remotes/famz/tags/for-upstream:
  aio: convert from RFifoLock to QemuRecMutex
  qemu-thread: introduce QemuRecMutex
  iothread: release AioContext around aio_poll
  block: only call aio_poll on the current thread's AioContext
  qemu-img: call aio_context_acquire/release around block job
  qemu-io: acquire AioContext
  block: prepare bdrv_reopen_multiple to release AioContext
  replication: pass BlockDriverState to reopen_backing_file
  iothread: detach all block devices before stopping them
  aio: introduce qemu_get_current_aio_context
  sheepdog: use BDRV_POLL_WHILE
  nfs: use BDRV_POLL_WHILE
  nfs: move nfs_set_events out of the while loops
  block: introduce BDRV_POLL_WHILE
  qed: Implement .bdrv_drain
  block: change drain to look only at one child at a time
  block: add BDS field to count in-flight requests
  mirror: use bdrv_drained_begin/bdrv_drained_end
  blockjob: introduce .drain callback for jobs
  replication: interrupt failover if the main device is closed

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>

commit 5273a45e75
async.c | 29

@@ -61,6 +61,7 @@ void aio_bh_schedule_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
smp_wmb();
ctx->first_bh = bh;
qemu_mutex_unlock(&ctx->bh_lock);
aio_notify(ctx);
}

QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)

@@ -106,8 +107,8 @@ int aio_bh_poll(AioContext *ctx)
* aio_notify again if necessary.
*/
if (atomic_xchg(&bh->scheduled, 0)) {
/* Idle BHs and the notify BH don't count as progress */
if (!bh->idle && bh != ctx->notify_dummy_bh) {
/* Idle BHs don't count as progress */
if (!bh->idle) {
ret = 1;
}
bh->idle = 0;

@@ -259,7 +260,6 @@ aio_ctx_finalize(GSource *source)
{
AioContext *ctx = (AioContext *) source;

qemu_bh_delete(ctx->notify_dummy_bh);
thread_pool_free(ctx->thread_pool);

#ifdef CONFIG_LINUX_AIO

@@ -284,7 +284,7 @@ aio_ctx_finalize(GSource *source)

aio_set_event_notifier(ctx, &ctx->notifier, false, NULL);
event_notifier_cleanup(&ctx->notifier);
rfifolock_destroy(&ctx->lock);
qemu_rec_mutex_destroy(&ctx->lock);
qemu_mutex_destroy(&ctx->bh_lock);
timerlistgroup_deinit(&ctx->tlg);
}

@@ -345,19 +345,6 @@ static void aio_timerlist_notify(void *opaque)
aio_notify(opaque);
}

static void aio_rfifolock_cb(void *opaque)
{
AioContext *ctx = opaque;

/* Kick owner thread in case they are blocked in aio_poll() */
qemu_bh_schedule(ctx->notify_dummy_bh);
}

static void notify_dummy_bh(void *opaque)
{
/* Do nothing, we were invoked just to force the event loop to iterate */
}

static void event_notifier_dummy_cb(EventNotifier *e)
{
}

@@ -385,11 +372,9 @@ AioContext *aio_context_new(Error **errp)
#endif
ctx->thread_pool = NULL;
qemu_mutex_init(&ctx->bh_lock);
rfifolock_init(&ctx->lock, aio_rfifolock_cb, ctx);
qemu_rec_mutex_init(&ctx->lock);
timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);

ctx->notify_dummy_bh = aio_bh_new(ctx, notify_dummy_bh, NULL);

return ctx;
fail:
g_source_destroy(&ctx->source);

@@ -408,10 +393,10 @@ void aio_context_unref(AioContext *ctx)

void aio_context_acquire(AioContext *ctx)
{
rfifolock_lock(&ctx->lock);
qemu_rec_mutex_lock(&ctx->lock);
}

void aio_context_release(AioContext *ctx)
{
rfifolock_unlock(&ctx->lock);
qemu_rec_mutex_unlock(&ctx->lock);
}
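With aio_rfifolock_cb() and notify_dummy_bh gone, aio_context_acquire()/aio_context_release() are plain wrappers around a recursive mutex, so a thread that already holds an AioContext may re-acquire it without deadlocking and without the old contention callback. A minimal sketch of the calling pattern under that assumption (do_something_locked() is a hypothetical helper, not a function in the tree):

static void do_something_locked(BlockDriverState *bs)
{
    AioContext *ctx = bdrv_get_aio_context(bs);

    aio_context_acquire(ctx);      /* qemu_rec_mutex_lock(&ctx->lock) */

    /* Nested acquisition from the same thread is legal because the
     * lock is recursive; the matching release only drops one level.
     */
    aio_context_acquire(ctx);
    aio_context_release(ctx);

    /* ... touch bs safely here ... */

    aio_context_release(ctx);      /* qemu_rec_mutex_unlock(&ctx->lock) */
}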
block.c | 6

@@ -2082,7 +2082,7 @@ BlockReopenQueue *bdrv_reopen_queue(BlockReopenQueue *bs_queue,
* to all devices.
*
*/
int bdrv_reopen_multiple(BlockReopenQueue *bs_queue, Error **errp)
int bdrv_reopen_multiple(AioContext *ctx, BlockReopenQueue *bs_queue, Error **errp)
{
int ret = -1;
BlockReopenQueueEntry *bs_entry, *next;

@@ -2090,7 +2090,9 @@ int bdrv_reopen_multiple(BlockReopenQueue *bs_queue, Error **errp)

assert(bs_queue != NULL);

aio_context_release(ctx);
bdrv_drain_all();
aio_context_acquire(ctx);

QSIMPLEQ_FOREACH(bs_entry, bs_queue, entry) {
if (bdrv_reopen_prepare(&bs_entry->state, bs_queue, &local_err)) {

@@ -2131,7 +2133,7 @@ int bdrv_reopen(BlockDriverState *bs, int bdrv_flags, Error **errp)
Error *local_err = NULL;
BlockReopenQueue *queue = bdrv_reopen_queue(NULL, bs, NULL, bdrv_flags);

ret = bdrv_reopen_multiple(queue, &local_err);
ret = bdrv_reopen_multiple(bdrv_get_aio_context(bs), queue, &local_err);
if (local_err != NULL) {
error_propagate(errp, local_err);
}
block/backup.c

@@ -300,6 +300,21 @@ void backup_cow_request_end(CowRequest *req)
cow_request_end(req);
}

static void backup_drain(BlockJob *job)
{
BackupBlockJob *s = container_of(job, BackupBlockJob, common);

/* Need to keep a reference in case blk_drain triggers execution
* of backup_complete...
*/
if (s->target) {
BlockBackend *target = s->target;
blk_ref(target);
blk_drain(target);
blk_unref(target);
}
}

static const BlockJobDriver backup_job_driver = {
.instance_size = sizeof(BackupBlockJob),
.job_type = BLOCK_JOB_TYPE_BACKUP,

@@ -307,6 +322,7 @@ static const BlockJobDriver backup_job_driver = {
.commit = backup_commit,
.abort = backup_abort,
.attached_aio_context = backup_attached_aio_context,
.drain = backup_drain,
};

static BlockErrorAction backup_error_action(BackupBlockJob *job,

@@ -331,6 +347,7 @@ static void backup_complete(BlockJob *job, void *opaque)
BackupCompleteData *data = opaque;

blk_unref(s->target);
s->target = NULL;

block_job_completed(job, data->ret);
g_free(data);
block/block-backend.c

@@ -799,20 +799,25 @@ int coroutine_fn blk_co_preadv(BlockBackend *blk, int64_t offset,
BdrvRequestFlags flags)
{
int ret;
BlockDriverState *bs = blk_bs(blk);

trace_blk_co_preadv(blk, blk_bs(blk), offset, bytes, flags);
trace_blk_co_preadv(blk, bs, offset, bytes, flags);

ret = blk_check_byte_request(blk, offset, bytes);
if (ret < 0) {
return ret;
}

bdrv_inc_in_flight(bs);

/* throttling disk I/O */
if (blk->public.throttle_state) {
throttle_group_co_io_limits_intercept(blk, bytes, false);
}

return bdrv_co_preadv(blk->root, offset, bytes, qiov, flags);
ret = bdrv_co_preadv(blk->root, offset, bytes, qiov, flags);
bdrv_dec_in_flight(bs);
return ret;
}

int coroutine_fn blk_co_pwritev(BlockBackend *blk, int64_t offset,

@@ -820,14 +825,17 @@ int coroutine_fn blk_co_pwritev(BlockBackend *blk, int64_t offset,
BdrvRequestFlags flags)
{
int ret;
BlockDriverState *bs = blk_bs(blk);

trace_blk_co_pwritev(blk, blk_bs(blk), offset, bytes, flags);
trace_blk_co_pwritev(blk, bs, offset, bytes, flags);

ret = blk_check_byte_request(blk, offset, bytes);
if (ret < 0) {
return ret;
}

bdrv_inc_in_flight(bs);

/* throttling disk I/O */
if (blk->public.throttle_state) {
throttle_group_co_io_limits_intercept(blk, bytes, true);

@@ -837,7 +845,9 @@ int coroutine_fn blk_co_pwritev(BlockBackend *blk, int64_t offset,
flags |= BDRV_REQ_FUA;
}

return bdrv_co_pwritev(blk->root, offset, bytes, qiov, flags);
ret = bdrv_co_pwritev(blk->root, offset, bytes, qiov, flags);
bdrv_dec_in_flight(bs);
return ret;
}

typedef struct BlkRwCo {

@@ -868,7 +878,6 @@ static int blk_prw(BlockBackend *blk, int64_t offset, uint8_t *buf,
int64_t bytes, CoroutineEntry co_entry,
BdrvRequestFlags flags)
{
AioContext *aio_context;
QEMUIOVector qiov;
struct iovec iov;
Coroutine *co;

@@ -890,11 +899,7 @@ static int blk_prw(BlockBackend *blk, int64_t offset, uint8_t *buf,

co = qemu_coroutine_create(co_entry, &rwco);
qemu_coroutine_enter(co);

aio_context = blk_get_aio_context(blk);
while (rwco.ret == NOT_DONE) {
aio_poll(aio_context, true);
}
BDRV_POLL_WHILE(blk_bs(blk), rwco.ret == NOT_DONE);

return rwco.ret;
}

@@ -930,6 +935,8 @@ int blk_make_zero(BlockBackend *blk, BdrvRequestFlags flags)
static void error_callback_bh(void *opaque)
{
struct BlockBackendAIOCB *acb = opaque;

bdrv_dec_in_flight(acb->common.bs);
acb->common.cb(acb->common.opaque, acb->ret);
qemu_aio_unref(acb);
}

@@ -940,6 +947,7 @@ BlockAIOCB *blk_abort_aio_request(BlockBackend *blk,
{
struct BlockBackendAIOCB *acb;

bdrv_inc_in_flight(blk_bs(blk));
acb = blk_aio_get(&block_backend_aiocb_info, blk, cb, opaque);
acb->blk = blk;
acb->ret = ret;

@@ -962,6 +970,7 @@ static const AIOCBInfo blk_aio_em_aiocb_info = {
static void blk_aio_complete(BlkAioEmAIOCB *acb)
{
if (acb->has_returned) {
bdrv_dec_in_flight(acb->common.bs);
acb->common.cb(acb->common.opaque, acb->rwco.ret);
qemu_aio_unref(acb);
}

@@ -983,6 +992,7 @@ static BlockAIOCB *blk_aio_prwv(BlockBackend *blk, int64_t offset, int bytes,
BlkAioEmAIOCB *acb;
Coroutine *co;

bdrv_inc_in_flight(blk_bs(blk));
acb = blk_aio_get(&blk_aio_em_aiocb_info, blk, cb, opaque);
acb->rwco = (BlkRwCo) {
.blk = blk,
block/commit.c

@@ -251,7 +251,7 @@ void commit_start(const char *job_id, BlockDriverState *bs,
orig_overlay_flags | BDRV_O_RDWR);
}
if (reopen_queue) {
bdrv_reopen_multiple(reopen_queue, &local_err);
bdrv_reopen_multiple(bdrv_get_aio_context(bs), reopen_queue, &local_err);
if (local_err != NULL) {
error_propagate(errp, local_err);
block_job_unref(&s->common);
block/io.c | 137

@@ -143,7 +143,7 @@ bool bdrv_requests_pending(BlockDriverState *bs)
{
BdrvChild *child;

if (!QLIST_EMPTY(&bs->tracked_requests)) {
if (atomic_read(&bs->in_flight)) {
return true;
}

@@ -156,16 +156,22 @@ bool bdrv_requests_pending(BlockDriverState *bs)
return false;
}

static void bdrv_drain_recurse(BlockDriverState *bs)
static bool bdrv_drain_recurse(BlockDriverState *bs)
{
BdrvChild *child;
bool waited;

waited = BDRV_POLL_WHILE(bs, atomic_read(&bs->in_flight) > 0);

if (bs->drv && bs->drv->bdrv_drain) {
bs->drv->bdrv_drain(bs);
}

QLIST_FOREACH(child, &bs->children, next) {
bdrv_drain_recurse(child->bs);
waited |= bdrv_drain_recurse(child->bs);
}

return waited;
}

typedef struct {

@@ -174,23 +180,14 @@ typedef struct {
bool done;
} BdrvCoDrainData;

static void bdrv_drain_poll(BlockDriverState *bs)
{
bool busy = true;

while (busy) {
/* Keep iterating */
busy = bdrv_requests_pending(bs);
busy |= aio_poll(bdrv_get_aio_context(bs), busy);
}
}

static void bdrv_co_drain_bh_cb(void *opaque)
{
BdrvCoDrainData *data = opaque;
Coroutine *co = data->co;
BlockDriverState *bs = data->bs;

bdrv_drain_poll(data->bs);
bdrv_dec_in_flight(bs);
bdrv_drained_begin(bs);
data->done = true;
qemu_coroutine_enter(co);
}

@@ -209,6 +206,7 @@ static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs)
.bs = bs,
.done = false,
};
bdrv_inc_in_flight(bs);
aio_bh_schedule_oneshot(bdrv_get_aio_context(bs),
bdrv_co_drain_bh_cb, &data);

@@ -220,6 +218,11 @@ static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs)

void bdrv_drained_begin(BlockDriverState *bs)
{
if (qemu_in_coroutine()) {
bdrv_co_yield_to_drain(bs);
return;
}

if (!bs->quiesce_counter++) {
aio_disable_external(bdrv_get_aio_context(bs));
bdrv_parent_drained_begin(bs);

@@ -227,11 +230,6 @@ void bdrv_drained_begin(BlockDriverState *bs)

bdrv_io_unplugged_begin(bs);
bdrv_drain_recurse(bs);
if (qemu_in_coroutine()) {
bdrv_co_yield_to_drain(bs);
} else {
bdrv_drain_poll(bs);
}
bdrv_io_unplugged_end(bs);
}

@@ -279,7 +277,7 @@ void bdrv_drain(BlockDriverState *bs)
void bdrv_drain_all(void)
{
/* Always run first iteration so any pending completion BHs run */
bool busy = true;
bool waited = true;
BlockDriverState *bs;
BdrvNextIterator it;
BlockJob *job = NULL;

@@ -299,7 +297,6 @@ void bdrv_drain_all(void)
aio_context_acquire(aio_context);
bdrv_parent_drained_begin(bs);
bdrv_io_unplugged_begin(bs);
bdrv_drain_recurse(bs);
aio_context_release(aio_context);

if (!g_slist_find(aio_ctxs, aio_context)) {

@@ -313,8 +310,8 @@ void bdrv_drain_all(void)
* request completion. Therefore we must keep looping until there was no
* more activity rather than simply draining each device independently.
*/
while (busy) {
busy = false;
while (waited) {
waited = false;

for (ctx = aio_ctxs; ctx != NULL; ctx = ctx->next) {
AioContext *aio_context = ctx->data;

@@ -322,13 +319,9 @@ void bdrv_drain_all(void)
aio_context_acquire(aio_context);
for (bs = bdrv_first(&it); bs; bs = bdrv_next(&it)) {
if (aio_context == bdrv_get_aio_context(bs)) {
if (bdrv_requests_pending(bs)) {
busy = true;
aio_poll(aio_context, busy);
}
waited |= bdrv_drain_recurse(bs);
}
}
busy |= aio_poll(aio_context, false);
aio_context_release(aio_context);
}
}

@@ -476,6 +469,28 @@ static bool tracked_request_overlaps(BdrvTrackedRequest *req,
return true;
}

void bdrv_inc_in_flight(BlockDriverState *bs)
{
atomic_inc(&bs->in_flight);
}

static void dummy_bh_cb(void *opaque)
{
}

void bdrv_wakeup(BlockDriverState *bs)
{
if (bs->wakeup) {
aio_bh_schedule_oneshot(qemu_get_aio_context(), dummy_bh_cb, NULL);
}
}

void bdrv_dec_in_flight(BlockDriverState *bs)
{
atomic_dec(&bs->in_flight);
bdrv_wakeup(bs);
}

static bool coroutine_fn wait_serialising_requests(BdrvTrackedRequest *self)
{
BlockDriverState *bs = self->bs;

@@ -583,13 +598,9 @@ static int bdrv_prwv_co(BdrvChild *child, int64_t offset,
/* Fast-path if already in coroutine context */
bdrv_rw_co_entry(&rwco);
} else {
AioContext *aio_context = bdrv_get_aio_context(child->bs);

co = qemu_coroutine_create(bdrv_rw_co_entry, &rwco);
qemu_coroutine_enter(co);
while (rwco.ret == NOT_DONE) {
aio_poll(aio_context, true);
}
BDRV_POLL_WHILE(child->bs, rwco.ret == NOT_DONE);
}
return rwco.ret;
}

@@ -1097,6 +1108,8 @@ int coroutine_fn bdrv_co_preadv(BdrvChild *child,
return ret;
}

bdrv_inc_in_flight(bs);

/* Don't do copy-on-read if we read data before write operation */
if (bs->copy_on_read && !(flags & BDRV_REQ_NO_SERIALISING)) {
flags |= BDRV_REQ_COPY_ON_READ;

@@ -1132,6 +1145,7 @@ int coroutine_fn bdrv_co_preadv(BdrvChild *child,
use_local_qiov ? &local_qiov : qiov,
flags);
tracked_request_end(&req);
bdrv_dec_in_flight(bs);

if (use_local_qiov) {
qemu_iovec_destroy(&local_qiov);

@@ -1480,6 +1494,7 @@ int coroutine_fn bdrv_co_pwritev(BdrvChild *child,
return ret;
}

bdrv_inc_in_flight(bs);
/*
* Align write if necessary by performing a read-modify-write cycle.
* Pad qiov with the read parts and be sure to have a tracked request not

@@ -1581,6 +1596,7 @@ fail:
qemu_vfree(tail_buf);
out:
tracked_request_end(&req);
bdrv_dec_in_flight(bs);
return ret;
}

@@ -1705,17 +1721,19 @@ static int64_t coroutine_fn bdrv_co_get_block_status(BlockDriverState *bs,
}

*file = NULL;
bdrv_inc_in_flight(bs);
ret = bs->drv->bdrv_co_get_block_status(bs, sector_num, nb_sectors, pnum,
file);
if (ret < 0) {
*pnum = 0;
return ret;
goto out;
}

if (ret & BDRV_BLOCK_RAW) {
assert(ret & BDRV_BLOCK_OFFSET_VALID);
return bdrv_get_block_status(bs->file->bs, ret >> BDRV_SECTOR_BITS,
*pnum, pnum, file);
ret = bdrv_get_block_status(bs->file->bs, ret >> BDRV_SECTOR_BITS,
*pnum, pnum, file);
goto out;
}

if (ret & (BDRV_BLOCK_DATA | BDRV_BLOCK_ZERO)) {

@@ -1757,6 +1775,8 @@ static int64_t coroutine_fn bdrv_co_get_block_status(BlockDriverState *bs,
}
}

out:
bdrv_dec_in_flight(bs);
return ret;
}

@@ -1822,14 +1842,10 @@ int64_t bdrv_get_block_status_above(BlockDriverState *bs,
/* Fast-path if already in coroutine context */
bdrv_get_block_status_above_co_entry(&data);
} else {
AioContext *aio_context = bdrv_get_aio_context(bs);

co = qemu_coroutine_create(bdrv_get_block_status_above_co_entry,
&data);
qemu_coroutine_enter(co);
while (!data.done) {
aio_poll(aio_context, true);
}
BDRV_POLL_WHILE(bs, !data.done);
}
return data.ret;
}

@@ -2102,6 +2118,7 @@ static const AIOCBInfo bdrv_em_co_aiocb_info = {
static void bdrv_co_complete(BlockAIOCBCoroutine *acb)
{
if (!acb->need_bh) {
bdrv_dec_in_flight(acb->common.bs);
acb->common.cb(acb->common.opaque, acb->req.error);
qemu_aio_unref(acb);
}

@@ -2152,6 +2169,9 @@ static BlockAIOCB *bdrv_co_aio_prw_vector(BdrvChild *child,
Coroutine *co;
BlockAIOCBCoroutine *acb;

/* Matched by bdrv_co_complete's bdrv_dec_in_flight. */
bdrv_inc_in_flight(child->bs);

acb = qemu_aio_get(&bdrv_em_co_aiocb_info, child->bs, cb, opaque);
acb->child = child;
acb->need_bh = true;

@@ -2185,6 +2205,9 @@ BlockAIOCB *bdrv_aio_flush(BlockDriverState *bs,
Coroutine *co;
BlockAIOCBCoroutine *acb;

/* Matched by bdrv_co_complete's bdrv_dec_in_flight. */
bdrv_inc_in_flight(bs);

acb = qemu_aio_get(&bdrv_em_co_aiocb_info, bs, cb, opaque);
acb->need_bh = true;
acb->req.error = -EINPROGRESS;

@@ -2244,23 +2267,22 @@ static void coroutine_fn bdrv_flush_co_entry(void *opaque)
int coroutine_fn bdrv_co_flush(BlockDriverState *bs)
{
int ret;
BdrvTrackedRequest req;

if (!bs || !bdrv_is_inserted(bs) || bdrv_is_read_only(bs) ||
bdrv_is_sg(bs)) {
return 0;
}

tracked_request_begin(&req, bs, 0, 0, BDRV_TRACKED_FLUSH);
bdrv_inc_in_flight(bs);

int current_gen = bs->write_gen;

/* Wait until any previous flushes are completed */
while (bs->active_flush_req != NULL) {
while (bs->active_flush_req) {
qemu_co_queue_wait(&bs->flush_queue);
}

bs->active_flush_req = &req;
bs->active_flush_req = true;

/* Write back all layers by calling one driver function */
if (bs->drv->bdrv_co_flush) {

@@ -2330,11 +2352,11 @@ flush_parent:
out:
/* Notify any pending flushes that we have completed */
bs->flushed_gen = current_gen;
bs->active_flush_req = NULL;
bs->active_flush_req = false;
/* Return value is ignored - it's ok if wait queue is empty */
qemu_co_queue_next(&bs->flush_queue);

tracked_request_end(&req);
bdrv_dec_in_flight(bs);
return ret;
}

@@ -2350,13 +2372,9 @@ int bdrv_flush(BlockDriverState *bs)
/* Fast-path if already in coroutine context */
bdrv_flush_co_entry(&flush_co);
} else {
AioContext *aio_context = bdrv_get_aio_context(bs);

co = qemu_coroutine_create(bdrv_flush_co_entry, &flush_co);
qemu_coroutine_enter(co);
while (flush_co.ret == NOT_DONE) {
aio_poll(aio_context, true);
}
BDRV_POLL_WHILE(bs, flush_co.ret == NOT_DONE);
}

return flush_co.ret;

@@ -2417,6 +2435,7 @@ int coroutine_fn bdrv_co_pdiscard(BlockDriverState *bs, int64_t offset,
return 0;
}

bdrv_inc_in_flight(bs);
tracked_request_begin(&req, bs, offset, count, BDRV_TRACKED_DISCARD);

ret = notifier_with_return_list_notify(&bs->before_write_notifiers, &req);

@@ -2463,6 +2482,7 @@ out:
bdrv_set_dirty(bs, req.offset >> BDRV_SECTOR_BITS,
req.bytes >> BDRV_SECTOR_BITS);
tracked_request_end(&req);
bdrv_dec_in_flight(bs);
return ret;
}

@@ -2480,13 +2500,9 @@ int bdrv_pdiscard(BlockDriverState *bs, int64_t offset, int count)
/* Fast-path if already in coroutine context */
bdrv_pdiscard_co_entry(&rwco);
} else {
AioContext *aio_context = bdrv_get_aio_context(bs);

co = qemu_coroutine_create(bdrv_pdiscard_co_entry, &rwco);
qemu_coroutine_enter(co);
while (rwco.ret == NOT_DONE) {
aio_poll(aio_context, true);
}
BDRV_POLL_WHILE(bs, rwco.ret == NOT_DONE);
}

return rwco.ret;

@@ -2495,13 +2511,12 @@ int bdrv_pdiscard(BlockDriverState *bs, int64_t offset, int count)
int bdrv_co_ioctl(BlockDriverState *bs, int req, void *buf)
{
BlockDriver *drv = bs->drv;
BdrvTrackedRequest tracked_req;
CoroutineIOCompletion co = {
.coroutine = qemu_coroutine_self(),
};
BlockAIOCB *acb;

tracked_request_begin(&tracked_req, bs, 0, 0, BDRV_TRACKED_IOCTL);
bdrv_inc_in_flight(bs);
if (!drv || (!drv->bdrv_aio_ioctl && !drv->bdrv_co_ioctl)) {
co.ret = -ENOTSUP;
goto out;

@@ -2518,7 +2533,7 @@ int bdrv_co_ioctl(BlockDriverState *bs, int req, void *buf)
qemu_coroutine_yield();
}
out:
tracked_request_end(&tracked_req);
bdrv_dec_in_flight(bs);
return co.ret;
}
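The block/io.c changes above all follow one pattern: requests are bracketed by bdrv_inc_in_flight()/bdrv_dec_in_flight(), bdrv_dec_in_flight() wakes up a waiter through bdrv_wakeup(), and synchronous callers replace their open-coded aio_poll() loops with BDRV_POLL_WHILE(). A condensed sketch of a synchronous wrapper built on that pattern (SyncCo, sync_co_entry and sync_request are illustrative stand-ins, not functions from the tree):

#define NOT_DONE 0x7fffffff  /* same sentinel value block/io.c uses */

typedef struct {
    BlockDriverState *bs;
    int ret;
} SyncCo;

static void coroutine_fn sync_co_entry(void *opaque)
{
    SyncCo *s = opaque;

    bdrv_inc_in_flight(s->bs);   /* request is now visible to drain code */
    s->ret = 0;                  /* ... perform the actual I/O here ... */
    bdrv_dec_in_flight(s->bs);   /* also calls bdrv_wakeup(s->bs) */
}

static int sync_request(BlockDriverState *bs)
{
    SyncCo s = { .bs = bs, .ret = NOT_DONE };
    Coroutine *co = qemu_coroutine_create(sync_co_entry, &s);

    qemu_coroutine_enter(co);
    /* Polls bs's own AioContext, or sleeps in the main loop until
     * bdrv_dec_in_flight() schedules the dummy wake-up BH.
     */
    BDRV_POLL_WHILE(bs, s.ret == NOT_DONE);
    return s.ret;
}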
block/mirror.c

@@ -469,7 +469,11 @@ static void mirror_free_init(MirrorBlockJob *s)
}
}

static void mirror_drain(MirrorBlockJob *s)
/* This is also used for the .pause callback. There is no matching
* mirror_resume() because mirror_run() will begin iterating again
* when the job is resumed.
*/
static void mirror_wait_for_all_io(MirrorBlockJob *s)
{
while (s->in_flight > 0) {
mirror_wait_for_io(s);

@@ -528,6 +532,7 @@ static void mirror_exit(BlockJob *job, void *opaque)
g_free(s->replaces);
bdrv_op_unblock_all(target_bs, s->common.blocker);
blk_unref(s->target);
s->target = NULL;
block_job_completed(&s->common, data->ret);
g_free(data);
bdrv_drained_end(src);

@@ -582,7 +587,7 @@ static int coroutine_fn mirror_dirty_init(MirrorBlockJob *s)
sector_num += nb_sectors;
}

mirror_drain(s);
mirror_wait_for_all_io(s);
}

/* First part, loop on the sectors and initialize the dirty bitmap. */

@@ -617,6 +622,7 @@ static void coroutine_fn mirror_run(void *opaque)
MirrorExitData *data;
BlockDriverState *bs = blk_bs(s->common.blk);
BlockDriverState *target_bs = blk_bs(s->target);
bool need_drain = true;
int64_t length;
BlockDriverInfo bdi;
char backing_filename[2]; /* we only need 2 characters because we are only

@@ -752,11 +758,26 @@ static void coroutine_fn mirror_run(void *opaque)
* source has dirty data to copy!
*
* Note that I/O can be submitted by the guest while
* mirror_populate runs.
* mirror_populate runs, so pause it now. Before deciding
* whether to switch to target check one last time if I/O has
* come in the meanwhile, and if not flush the data to disk.
*/
trace_mirror_before_drain(s, cnt);
bdrv_co_drain(bs);

bdrv_drained_begin(bs);
cnt = bdrv_get_dirty_count(s->dirty_bitmap);
if (cnt > 0) {
bdrv_drained_end(bs);
continue;
}

/* The two disks are in sync. Exit and report successful
* completion.
*/
assert(QLIST_EMPTY(&bs->tracked_requests));
s->common.cancelled = false;
need_drain = false;
break;
}

ret = 0;

@@ -769,13 +790,6 @@ static void coroutine_fn mirror_run(void *opaque)
} else if (!should_complete) {
delay_ns = (s->in_flight == 0 && cnt == 0 ? SLICE_TIME : 0);
block_job_sleep_ns(&s->common, QEMU_CLOCK_REALTIME, delay_ns);
} else if (cnt == 0) {
/* The two disks are in sync. Exit and report successful
* completion.
*/
assert(QLIST_EMPTY(&bs->tracked_requests));
s->common.cancelled = false;
break;
}
s->last_pause_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
}

@@ -787,7 +801,8 @@ immediate_exit:
* the target is a copy of the source.
*/
assert(ret < 0 || (!s->synced && block_job_is_cancelled(&s->common)));
mirror_drain(s);
assert(need_drain);
mirror_wait_for_all_io(s);
}

assert(s->in_flight == 0);

@@ -799,9 +814,10 @@ immediate_exit:

data = g_malloc(sizeof(*data));
data->ret = ret;
/* Before we switch to target in mirror_exit, make sure data doesn't
* change. */
bdrv_drained_begin(bs);

if (need_drain) {
bdrv_drained_begin(bs);
}
block_job_defer_to_main_loop(&s->common, mirror_exit, data);
}

@@ -872,14 +888,11 @@ static void mirror_complete(BlockJob *job, Error **errp)
block_job_enter(&s->common);
}

/* There is no matching mirror_resume() because mirror_run() will begin
* iterating again when the job is resumed.
*/
static void coroutine_fn mirror_pause(BlockJob *job)
static void mirror_pause(BlockJob *job)
{
MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);

mirror_drain(s);
mirror_wait_for_all_io(s);
}

static void mirror_attached_aio_context(BlockJob *job, AioContext *new_context)

@@ -889,6 +902,21 @@ static void mirror_attached_aio_context(BlockJob *job, AioContext *new_context)
blk_set_aio_context(s->target, new_context);
}

static void mirror_drain(BlockJob *job)
{
MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);

/* Need to keep a reference in case blk_drain triggers execution
* of mirror_complete...
*/
if (s->target) {
BlockBackend *target = s->target;
blk_ref(target);
blk_drain(target);
blk_unref(target);
}
}

static const BlockJobDriver mirror_job_driver = {
.instance_size = sizeof(MirrorBlockJob),
.job_type = BLOCK_JOB_TYPE_MIRROR,

@@ -896,6 +924,7 @@ static const BlockJobDriver mirror_job_driver = {
.complete = mirror_complete,
.pause = mirror_pause,
.attached_aio_context = mirror_attached_aio_context,
.drain = mirror_drain,
};

static const BlockJobDriver commit_active_job_driver = {

@@ -905,6 +934,7 @@ static const BlockJobDriver commit_active_job_driver = {
.complete = mirror_complete,
.pause = mirror_pause,
.attached_aio_context = mirror_attached_aio_context,
.drain = mirror_drain,
};

static void mirror_start_job(const char *job_id, BlockDriverState *bs,
block/nfs.c | 55

@@ -52,6 +52,7 @@ typedef struct NFSClient {
} NFSClient;

typedef struct NFSRPC {
BlockDriverState *bs;
int ret;
int complete;
QEMUIOVector *iov;

@@ -90,11 +91,12 @@ static void nfs_process_write(void *arg)
nfs_set_events(client);
}

static void nfs_co_init_task(NFSClient *client, NFSRPC *task)
static void nfs_co_init_task(BlockDriverState *bs, NFSRPC *task)
{
*task = (NFSRPC) {
.co = qemu_coroutine_self(),
.client = client,
.bs = bs,
.client = bs->opaque,
};
}

@@ -111,6 +113,7 @@ nfs_co_generic_cb(int ret, struct nfs_context *nfs, void *data,
{
NFSRPC *task = private_data;
task->ret = ret;
assert(!task->st);
if (task->ret > 0 && task->iov) {
if (task->ret <= task->iov->size) {
qemu_iovec_from_buf(task->iov, 0, data, task->ret);

@@ -118,18 +121,11 @@ nfs_co_generic_cb(int ret, struct nfs_context *nfs, void *data,
task->ret = -EIO;
}
}
if (task->ret == 0 && task->st) {
memcpy(task->st, data, sizeof(struct stat));
}
if (task->ret < 0) {
error_report("NFS Error: %s", nfs_get_error(nfs));
}
if (task->co) {
aio_bh_schedule_oneshot(task->client->aio_context,
nfs_co_generic_bh_cb, task);
} else {
task->complete = 1;
}
aio_bh_schedule_oneshot(task->client->aio_context,
nfs_co_generic_bh_cb, task);
}

static int coroutine_fn nfs_co_readv(BlockDriverState *bs,

@@ -139,7 +135,7 @@ static int coroutine_fn nfs_co_readv(BlockDriverState *bs,
NFSClient *client = bs->opaque;
NFSRPC task;

nfs_co_init_task(client, &task);
nfs_co_init_task(bs, &task);
task.iov = iov;

if (nfs_pread_async(client->context, client->fh,

@@ -149,8 +145,8 @@ static int coroutine_fn nfs_co_readv(BlockDriverState *bs,
return -ENOMEM;
}

nfs_set_events(client);
while (!task.complete) {
nfs_set_events(client);
qemu_coroutine_yield();
}

@@ -174,7 +170,7 @@ static int coroutine_fn nfs_co_writev(BlockDriverState *bs,
NFSRPC task;
char *buf = NULL;

nfs_co_init_task(client, &task);
nfs_co_init_task(bs, &task);

buf = g_try_malloc(nb_sectors * BDRV_SECTOR_SIZE);
if (nb_sectors && buf == NULL) {

@@ -191,8 +187,8 @@ static int coroutine_fn nfs_co_writev(BlockDriverState *bs,
return -ENOMEM;
}

nfs_set_events(client);
while (!task.complete) {
nfs_set_events(client);
qemu_coroutine_yield();
}

@@ -210,15 +206,15 @@ static int coroutine_fn nfs_co_flush(BlockDriverState *bs)
NFSClient *client = bs->opaque;
NFSRPC task;

nfs_co_init_task(client, &task);
nfs_co_init_task(bs, &task);

if (nfs_fsync_async(client->context, client->fh, nfs_co_generic_cb,
&task) != 0) {
return -ENOMEM;
}

nfs_set_events(client);
while (!task.complete) {
nfs_set_events(client);
qemu_coroutine_yield();
}

@@ -496,6 +492,22 @@ static int nfs_has_zero_init(BlockDriverState *bs)
return client->has_zero_init;
}

static void
nfs_get_allocated_file_size_cb(int ret, struct nfs_context *nfs, void *data,
void *private_data)
{
NFSRPC *task = private_data;
task->ret = ret;
if (task->ret == 0) {
memcpy(task->st, data, sizeof(struct stat));
}
if (task->ret < 0) {
error_report("NFS Error: %s", nfs_get_error(nfs));
}
task->complete = 1;
bdrv_wakeup(task->bs);
}

static int64_t nfs_get_allocated_file_size(BlockDriverState *bs)
{
NFSClient *client = bs->opaque;

@@ -507,16 +519,15 @@ static int64_t nfs_get_allocated_file_size(BlockDriverState *bs)
return client->st_blocks * 512;
}

task.bs = bs;
task.st = &st;
if (nfs_fstat_async(client->context, client->fh, nfs_co_generic_cb,
if (nfs_fstat_async(client->context, client->fh, nfs_get_allocated_file_size_cb,
&task) != 0) {
return -ENOMEM;
}

while (!task.complete) {
nfs_set_events(client);
aio_poll(client->aio_context, true);
}
nfs_set_events(client);
BDRV_POLL_WHILE(bs, !task.complete);

return (task.ret < 0 ? task.ret : st.st_blocks * 512);
}
block/qed-table.c

@@ -174,9 +174,7 @@ int qed_read_l1_table_sync(BDRVQEDState *s)

qed_read_table(s, s->header.l1_table_offset,
s->l1_table, qed_sync_cb, &ret);
while (ret == -EINPROGRESS) {
aio_poll(bdrv_get_aio_context(s->bs), true);
}
BDRV_POLL_WHILE(s->bs, ret == -EINPROGRESS);

return ret;
}

@@ -195,9 +193,7 @@ int qed_write_l1_table_sync(BDRVQEDState *s, unsigned int index,
int ret = -EINPROGRESS;

qed_write_l1_table(s, index, n, qed_sync_cb, &ret);
while (ret == -EINPROGRESS) {
aio_poll(bdrv_get_aio_context(s->bs), true);
}
BDRV_POLL_WHILE(s->bs, ret == -EINPROGRESS);

return ret;
}

@@ -268,9 +264,7 @@ int qed_read_l2_table_sync(BDRVQEDState *s, QEDRequest *request, uint64_t offset
int ret = -EINPROGRESS;

qed_read_l2_table(s, request, offset, qed_sync_cb, &ret);
while (ret == -EINPROGRESS) {
aio_poll(bdrv_get_aio_context(s->bs), true);
}
BDRV_POLL_WHILE(s->bs, ret == -EINPROGRESS);

return ret;
}

@@ -290,9 +284,7 @@ int qed_write_l2_table_sync(BDRVQEDState *s, QEDRequest *request,
int ret = -EINPROGRESS;

qed_write_l2_table(s, request, index, n, flush, qed_sync_cb, &ret);
while (ret == -EINPROGRESS) {
aio_poll(bdrv_get_aio_context(s->bs), true);
}
BDRV_POLL_WHILE(s->bs, ret == -EINPROGRESS);

return ret;
}
block/qed.c | 16

@@ -336,7 +336,7 @@ static void qed_need_check_timer_cb(void *opaque)
qed_plug_allocating_write_reqs(s);

/* Ensure writes are on disk before clearing flag */
bdrv_aio_flush(s->bs, qed_clear_need_check, s);
bdrv_aio_flush(s->bs->file->bs, qed_clear_need_check, s);
}

static void qed_start_need_check_timer(BDRVQEDState *s)

@@ -378,6 +378,19 @@ static void bdrv_qed_attach_aio_context(BlockDriverState *bs,
}
}

static void bdrv_qed_drain(BlockDriverState *bs)
{
BDRVQEDState *s = bs->opaque;

/* Fire the timer immediately in order to start doing I/O as soon as the
* header is flushed.
*/
if (s->need_check_timer && timer_pending(s->need_check_timer)) {
qed_cancel_need_check_timer(s);
qed_need_check_timer_cb(s);
}
}

static int bdrv_qed_open(BlockDriverState *bs, QDict *options, int flags,
Error **errp)
{

@@ -1668,6 +1681,7 @@ static BlockDriver bdrv_qed = {
.bdrv_check = bdrv_qed_check,
.bdrv_detach_aio_context = bdrv_qed_detach_aio_context,
.bdrv_attach_aio_context = bdrv_qed_attach_aio_context,
.bdrv_drain = bdrv_qed_drain,
};

static void bdrv_qed_init(void)
block/replication.c

@@ -138,6 +138,9 @@ static void replication_close(BlockDriverState *bs)
if (s->replication_state == BLOCK_REPLICATION_RUNNING) {
replication_stop(s->rs, false, NULL);
}
if (s->replication_state == BLOCK_REPLICATION_FAILOVER) {
block_job_cancel_sync(s->active_disk->bs->job);
}

if (s->mode == REPLICATION_MODE_SECONDARY) {
g_free(s->top_id);

@@ -319,9 +322,10 @@ static void secondary_do_checkpoint(BDRVReplicationState *s, Error **errp)
}
}

static void reopen_backing_file(BDRVReplicationState *s, bool writable,
static void reopen_backing_file(BlockDriverState *bs, bool writable,
Error **errp)
{
BDRVReplicationState *s = bs->opaque;
BlockReopenQueue *reopen_queue = NULL;
int orig_hidden_flags, orig_secondary_flags;
int new_hidden_flags, new_secondary_flags;

@@ -356,13 +360,15 @@ static void reopen_backing_file(BDRVReplicationState *s, bool writable,
}

if (reopen_queue) {
bdrv_reopen_multiple(reopen_queue, &local_err);
bdrv_reopen_multiple(bdrv_get_aio_context(bs),
reopen_queue, &local_err);
error_propagate(errp, local_err);
}
}

static void backup_job_cleanup(BDRVReplicationState *s)
static void backup_job_cleanup(BlockDriverState *bs)
{
BDRVReplicationState *s = bs->opaque;
BlockDriverState *top_bs;

top_bs = bdrv_lookup_bs(s->top_id, s->top_id, NULL);

@@ -371,19 +377,20 @@ static void backup_job_cleanup(BDRVReplicationState *s)
}
bdrv_op_unblock_all(top_bs, s->blocker);
error_free(s->blocker);
reopen_backing_file(s, false, NULL);
reopen_backing_file(bs, false, NULL);
}

static void backup_job_completed(void *opaque, int ret)
{
BDRVReplicationState *s = opaque;
BlockDriverState *bs = opaque;
BDRVReplicationState *s = bs->opaque;

if (s->replication_state != BLOCK_REPLICATION_FAILOVER) {
/* The backup job is cancelled unexpectedly */
s->error = -EIO;
}

backup_job_cleanup(s);
backup_job_cleanup(bs);
}

static bool check_top_bs(BlockDriverState *top_bs, BlockDriverState *bs)

@@ -479,7 +486,7 @@ static void replication_start(ReplicationState *rs, ReplicationMode mode,
}

/* reopen the backing file in r/w mode */
reopen_backing_file(s, true, &local_err);
reopen_backing_file(bs, true, &local_err);
if (local_err) {
error_propagate(errp, local_err);
aio_context_release(aio_context);

@@ -494,7 +501,7 @@ static void replication_start(ReplicationState *rs, ReplicationMode mode,
if (!top_bs || !bdrv_is_root_node(top_bs) ||
!check_top_bs(top_bs, bs)) {
error_setg(errp, "No top_bs or it is invalid");
reopen_backing_file(s, false, NULL);
reopen_backing_file(bs, false, NULL);
aio_context_release(aio_context);
return;
}

@@ -504,10 +511,10 @@ static void replication_start(ReplicationState *rs, ReplicationMode mode,
backup_start("replication-backup", s->secondary_disk->bs,
s->hidden_disk->bs, 0, MIRROR_SYNC_MODE_NONE, NULL, false,
BLOCKDEV_ON_ERROR_REPORT, BLOCKDEV_ON_ERROR_REPORT,
backup_job_completed, s, NULL, &local_err);
backup_job_completed, bs, NULL, &local_err);
if (local_err) {
error_propagate(errp, local_err);
backup_job_cleanup(s);
backup_job_cleanup(bs);
aio_context_release(aio_context);
return;
}
block/sheepdog.c

@@ -641,6 +641,7 @@ static void restart_co_req(void *opaque)

typedef struct SheepdogReqCo {
int sockfd;
BlockDriverState *bs;
AioContext *aio_context;
SheepdogReq *hdr;
void *data;

@@ -701,6 +702,9 @@ out:

srco->ret = ret;
srco->finished = true;
if (srco->bs) {
bdrv_wakeup(srco->bs);
}
}

/*

@@ -708,13 +712,14 @@ out:
*
* Return 0 on success, -errno in case of error.
*/
static int do_req(int sockfd, AioContext *aio_context, SheepdogReq *hdr,
static int do_req(int sockfd, BlockDriverState *bs, SheepdogReq *hdr,
void *data, unsigned int *wlen, unsigned int *rlen)
{
Coroutine *co;
SheepdogReqCo srco = {
.sockfd = sockfd,
.aio_context = aio_context,
.aio_context = bs ? bdrv_get_aio_context(bs) : qemu_get_aio_context(),
.bs = bs,
.hdr = hdr,
.data = data,
.wlen = wlen,

@@ -727,9 +732,14 @@ static int do_req(int sockfd, AioContext *aio_context, SheepdogReq *hdr,
do_co_req(&srco);
} else {
co = qemu_coroutine_create(do_co_req, &srco);
qemu_coroutine_enter(co);
while (!srco.finished) {
aio_poll(aio_context, true);
if (bs) {
qemu_coroutine_enter(co);
BDRV_POLL_WHILE(bs, !srco.finished);
} else {
qemu_coroutine_enter(co);
while (!srco.finished) {
aio_poll(qemu_get_aio_context(), true);
}
}
}

@@ -1125,7 +1135,7 @@ static int find_vdi_name(BDRVSheepdogState *s, const char *filename,
hdr.snapid = snapid;
hdr.flags = SD_FLAG_CMD_WRITE;

ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
ret = do_req(fd, s->bs, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
if (ret) {
error_setg_errno(errp, -ret, "cannot get vdi info");
goto out;

@@ -1240,7 +1250,7 @@ out:
qemu_co_mutex_unlock(&s->lock);
}

static int read_write_object(int fd, AioContext *aio_context, char *buf,
static int read_write_object(int fd, BlockDriverState *bs, char *buf,
uint64_t oid, uint8_t copies,
unsigned int datalen, uint64_t offset,
bool write, bool create, uint32_t cache_flags)

@@ -1274,7 +1284,7 @@ static int read_write_object(int fd, AioContext *aio_context, char *buf,
hdr.offset = offset;
hdr.copies = copies;

ret = do_req(fd, aio_context, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
ret = do_req(fd, bs, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
if (ret) {
error_report("failed to send a request to the sheep");
return ret;

@@ -1289,22 +1299,22 @@ static int read_write_object(int fd, AioContext *aio_context, char *buf,
}
}

static int read_object(int fd, AioContext *aio_context, char *buf,
static int read_object(int fd, BlockDriverState *bs, char *buf,
uint64_t oid, uint8_t copies,
unsigned int datalen, uint64_t offset,
uint32_t cache_flags)
{
return read_write_object(fd, aio_context, buf, oid, copies,
return read_write_object(fd, bs, buf, oid, copies,
datalen, offset, false,
false, cache_flags);
}

static int write_object(int fd, AioContext *aio_context, char *buf,
static int write_object(int fd, BlockDriverState *bs, char *buf,
uint64_t oid, uint8_t copies,
unsigned int datalen, uint64_t offset, bool create,
uint32_t cache_flags)
{
return read_write_object(fd, aio_context, buf, oid, copies,
return read_write_object(fd, bs, buf, oid, copies,
datalen, offset, true,
create, cache_flags);
}

@@ -1331,7 +1341,7 @@ static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag)
goto out;
}

ret = read_object(fd, s->aio_context, (char *)inode, vid_to_vdi_oid(vid),
ret = read_object(fd, s->bs, (char *)inode, vid_to_vdi_oid(vid),
s->inode.nr_copies, SD_INODE_HEADER_SIZE, 0,
s->cache_flags);
if (ret < 0) {

@@ -1489,7 +1499,7 @@ static int sd_open(BlockDriverState *bs, QDict *options, int flags,
}

buf = g_malloc(SD_INODE_SIZE);
ret = read_object(fd, s->aio_context, buf, vid_to_vdi_oid(vid),
ret = read_object(fd, s->bs, buf, vid_to_vdi_oid(vid),
0, SD_INODE_SIZE, 0, s->cache_flags);

closesocket(fd);

@@ -1618,7 +1628,7 @@ static int do_sd_create(BDRVSheepdogState *s, uint32_t *vdi_id, int snapshot,
hdr.copies = s->inode.nr_copies;
hdr.block_size_shift = s->inode.block_size_shift;

ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
ret = do_req(fd, NULL, (SheepdogReq *)&hdr, buf, &wlen, &rlen);

closesocket(fd);

@@ -1886,7 +1896,7 @@ static int sd_create(const char *filename, QemuOpts *opts,
hdr.opcode = SD_OP_GET_CLUSTER_DEFAULT;
hdr.proto_ver = SD_PROTO_VER;

ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
ret = do_req(fd, NULL, (SheepdogReq *)&hdr,
NULL, &wlen, &rlen);
closesocket(fd);
if (ret) {

@@ -1951,7 +1961,7 @@ static void sd_close(BlockDriverState *bs)
hdr.data_length = wlen;
hdr.flags = SD_FLAG_CMD_WRITE;

ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
ret = do_req(fd, s->bs, (SheepdogReq *)&hdr,
s->name, &wlen, &rlen);

closesocket(fd);

@@ -2000,7 +2010,7 @@ static int sd_truncate(BlockDriverState *bs, int64_t offset)
/* we don't need to update entire object */
datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
s->inode.vdi_size = offset;
ret = write_object(fd, s->aio_context, (char *)&s->inode,
ret = write_object(fd, s->bs, (char *)&s->inode,
vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
datalen, 0, false, s->cache_flags);
close(fd);

@@ -2070,7 +2080,7 @@ static bool sd_delete(BDRVSheepdogState *s)
return false;
}

ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
ret = do_req(fd, s->bs, (SheepdogReq *)&hdr,
s->name, &wlen, &rlen);
closesocket(fd);
if (ret) {

@@ -2126,7 +2136,7 @@ static int sd_create_branch(BDRVSheepdogState *s)
goto out;
}

ret = read_object(fd, s->aio_context, buf, vid_to_vdi_oid(vid),
ret = read_object(fd, s->bs, buf, vid_to_vdi_oid(vid),
s->inode.nr_copies, SD_INODE_SIZE, 0, s->cache_flags);

closesocket(fd);

@@ -2411,7 +2421,7 @@ static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
goto cleanup;
}

ret = write_object(fd, s->aio_context, (char *)&s->inode,
ret = write_object(fd, s->bs, (char *)&s->inode,
vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
datalen, 0, false, s->cache_flags);
if (ret < 0) {

@@ -2426,7 +2436,7 @@ static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
goto cleanup;
}

ret = read_object(fd, s->aio_context, (char *)inode,
ret = read_object(fd, s->bs, (char *)inode,
vid_to_vdi_oid(new_vid), s->inode.nr_copies, datalen, 0,
s->cache_flags);

@@ -2528,7 +2538,7 @@ static bool remove_objects(BDRVSheepdogState *s)
i++;
}

ret = write_object(fd, s->aio_context,
ret = write_object(fd, s->bs,
(char *)&inode->data_vdi_id[start_idx],
vid_to_vdi_oid(s->inode.vdi_id), inode->nr_copies,
(i - start_idx) * sizeof(uint32_t),

@@ -2600,7 +2610,7 @@ static int sd_snapshot_delete(BlockDriverState *bs,
return -1;
}

ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
ret = do_req(fd, s->bs, (SheepdogReq *)&hdr,
buf, &wlen, &rlen);
closesocket(fd);
if (ret) {

@@ -2652,8 +2662,7 @@ static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
req.opcode = SD_OP_READ_VDIS;
req.data_length = max;

ret = do_req(fd, s->aio_context, &req,
vdi_inuse, &wlen, &rlen);
ret = do_req(fd, s->bs, &req, vdi_inuse, &wlen, &rlen);

closesocket(fd);
if (ret) {

@@ -2679,7 +2688,7 @@ static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
}

/* we don't need to read entire object */
ret = read_object(fd, s->aio_context, (char *)&inode,
ret = read_object(fd, s->bs, (char *)&inode,
vid_to_vdi_oid(vid),
0, SD_INODE_SIZE - sizeof(inode.data_vdi_id), 0,
s->cache_flags);

@@ -2745,11 +2754,11 @@ static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data,

create = (offset == 0);
if (load) {
ret = read_object(fd, s->aio_context, (char *)data, vmstate_oid,
ret = read_object(fd, s->bs, (char *)data, vmstate_oid,
s->inode.nr_copies, data_len, offset,
s->cache_flags);
} else {
ret = write_object(fd, s->aio_context, (char *)data, vmstate_oid,
ret = write_object(fd, s->bs, (char *)data, vmstate_oid,
s->inode.nr_copies, data_len, offset, create,
s->cache_flags);
}
blockjob.c | 37

@@ -74,17 +74,6 @@ BlockJob *block_job_get(const char *id)
return NULL;
}

/* Normally the job runs in its BlockBackend's AioContext. The exception is
* block_job_defer_to_main_loop() where it runs in the QEMU main loop. Code
* that supports both cases uses this helper function.
*/
static AioContext *block_job_get_aio_context(BlockJob *job)
{
return job->deferred_to_main_loop ?
qemu_get_aio_context() :
blk_get_aio_context(job->blk);
}

static void block_job_attached_aio_context(AioContext *new_context,
void *opaque)
{

@@ -97,6 +86,17 @@ static void block_job_attached_aio_context(AioContext *new_context,
block_job_resume(job);
}

static void block_job_drain(BlockJob *job)
{
/* If job is !job->busy this kicks it into the next pause point. */
block_job_enter(job);

blk_drain(job->blk);
if (job->driver->drain) {
job->driver->drain(job);
}
}

static void block_job_detach_aio_context(void *opaque)
{
BlockJob *job = opaque;

@@ -106,12 +106,8 @@ static void block_job_detach_aio_context(void *opaque)

block_job_pause(job);

if (!job->paused) {
/* If job is !job->busy this kicks it into the next pause point. */
block_job_enter(job);
}
while (!job->paused && !job->completed) {
aio_poll(block_job_get_aio_context(job), true);
block_job_drain(job);
}

block_job_unref(job);

@@ -413,14 +409,21 @@ static int block_job_finish_sync(BlockJob *job,
assert(blk_bs(job->blk)->job == job);

block_job_ref(job);

finish(job, &local_err);
if (local_err) {
error_propagate(errp, local_err);
block_job_unref(job);
return -EBUSY;
}
/* block_job_drain calls block_job_enter, and it should be enough to
* induce progress until the job completes or moves to the main thread.
*/
while (!job->deferred_to_main_loop && !job->completed) {
block_job_drain(job);
}
while (!job->completed) {
aio_poll(block_job_get_aio_context(job), true);
aio_poll(qemu_get_aio_context(), true);
}
ret = (job->cancelled && job->ret == 0) ? -ECANCELED : job->ret;
block_job_unref(job);
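Put together, block_job_drain() above only kicks the job coroutine and drains job->blk; anything else the job keeps in flight is the responsibility of the new BlockJobDriver .drain callback, as backup_drain() and mirror_drain() show. A hypothetical driver with an extra target BlockBackend would hook in the same way (MyJob is illustrative, not a structure in the tree):

static void my_job_drain(BlockJob *job)
{
    MyJob *s = container_of(job, MyJob, common);   /* hypothetical job type */

    /* Keep a reference: draining the target may complete the job and
     * drop its own reference to it.
     */
    if (s->target) {
        BlockBackend *target = s->target;
        blk_ref(target);
        blk_drain(target);
        blk_unref(target);
    }
}

static const BlockJobDriver my_job_driver = {
    .instance_size = sizeof(MyJob),
    .drain         = my_job_drain,
};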
docs/multiple-iothreads.txt

@@ -105,13 +105,10 @@ a BH in the target AioContext beforehand and then call qemu_bh_schedule(). No
acquire/release or locking is needed for the qemu_bh_schedule() call. But be
sure to acquire the AioContext for aio_bh_new() if necessary.

The relationship between AioContext and the block layer
-------------------------------------------------------
The AioContext originates from the QEMU block layer because it provides a
scoped way of running event loop iterations until all work is done. This
feature is used to complete all in-flight block I/O requests (see
bdrv_drain_all()). Nowadays AioContext is a generic event loop that can be
used by any QEMU subsystem.
AioContext and the block layer
------------------------------
The AioContext originates from the QEMU block layer, even though nowadays
AioContext is a generic event loop that can be used by any QEMU subsystem.

The block layer has support for AioContext integrated. Each BlockDriverState
is associated with an AioContext using bdrv_set_aio_context() and

@@ -122,13 +119,22 @@ Block layer code must therefore expect to run in an IOThread and avoid using
old APIs that implicitly use the main loop. See the "How to program for
IOThreads" above for information on how to do that.

If main loop code such as a QMP function wishes to access a BlockDriverState it
must first call aio_context_acquire(bdrv_get_aio_context(bs)) to ensure the
IOThread does not run in parallel.
If main loop code such as a QMP function wishes to access a BlockDriverState
it must first call aio_context_acquire(bdrv_get_aio_context(bs)) to ensure
that callbacks in the IOThread do not run in parallel.

Long-running jobs (usually in the form of coroutines) are best scheduled in the
BlockDriverState's AioContext to avoid the need to acquire/release around each
bdrv_*() call. Be aware that there is currently no mechanism to get notified
when bdrv_set_aio_context() moves this BlockDriverState to a different
AioContext (see bdrv_detach_aio_context()/bdrv_attach_aio_context()), so you
may need to add this if you want to support long-running jobs.
Code running in the monitor typically needs to ensure that past
requests from the guest are completed. When a block device is running
in an IOThread, the IOThread can also process requests from the guest
(via ioeventfd). To achieve both objects, wrap the code between
bdrv_drained_begin() and bdrv_drained_end(), thus creating a "drained
section". The functions must be called between aio_context_acquire()
and aio_context_release(). You can freely release and re-acquire the
AioContext within a drained section.

Long-running jobs (usually in the form of coroutines) are best scheduled in
the BlockDriverState's AioContext to avoid the need to acquire/release around
each bdrv_*() call. The functions bdrv_add/remove_aio_context_notifier,
or alternatively blk_add/remove_aio_context_notifier if you use BlockBackends,
can be used to get a notification whenever bdrv_set_aio_context() moves a
BlockDriverState to a different AioContext.
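The documentation above states the rules but does not show them in one place; a short sketch of a main-loop (for example QMP) function following the acquire/drained-section/release pattern it describes, with a made-up qmp_example_inspect() name:

static void qmp_example_inspect(BlockDriverState *bs, Error **errp)
{
    AioContext *ctx = bdrv_get_aio_context(bs);

    aio_context_acquire(ctx);
    bdrv_drained_begin(bs);     /* quiesce guest I/O handled by the IOThread */

    /* ... inspect or modify bs while no requests are in flight ... */

    bdrv_drained_end(bs);
    aio_context_release(ctx);
}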
hw/scsi/virtio-scsi-dataplane.c

@@ -189,13 +189,11 @@ void virtio_scsi_dataplane_stop(VirtIOSCSI *s)
assert(s->ctx == iothread_get_aio_context(vs->conf.iothread));

aio_context_acquire(s->ctx);

virtio_scsi_clear_aio(s);
aio_context_release(s->ctx);

blk_drain_all(); /* ensure there are no in-flight requests */

aio_context_release(s->ctx);

for (i = 0; i < vs->conf.num_queues + 2; i++) {
virtio_bus_set_host_notifier(VIRTIO_BUS(qbus), i, false);
}
@ -18,7 +18,6 @@
#include "qemu/queue.h"
#include "qemu/event_notifier.h"
#include "qemu/thread.h"
#include "qemu/rfifolock.h"
#include "qemu/timer.h"

typedef struct BlockAIOCB BlockAIOCB;
@ -54,7 +53,7 @@ struct AioContext {
GSource source;

/* Protects all fields from multi-threaded access */
RFifoLock lock;
QemuRecMutex lock;

/* The list of registered AIO handlers */
QLIST_HEAD(, AioHandler) aio_handlers;
@ -116,9 +115,6 @@ struct AioContext {
bool notified;
EventNotifier notifier;

/* Scheduling this BH forces the event loop it iterate */
QEMUBH *notify_dummy_bh;

/* Thread pool for performing work and receiving completion callbacks */
struct ThreadPool *thread_pool;

@ -452,6 +448,24 @@ static inline bool aio_node_check(AioContext *ctx, bool is_external)
return !is_external || !atomic_read(&ctx->external_disable_cnt);
}

/**
* Return the AioContext whose event loop runs in the current thread.
*
* If called from an IOThread this will be the IOThread's AioContext. If
* called from another thread it will be the main loop AioContext.
*/
AioContext *qemu_get_current_aio_context(void);

/**
* @ctx: the aio context
*
* Return whether we are running in the I/O thread that manages @ctx.
*/
static inline bool aio_context_in_iothread(AioContext *ctx)
{
return ctx == qemu_get_current_aio_context();
}

/**
* aio_context_setup:
* @ctx: the aio context
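
For illustration only, a sketch of how a caller might use the new qemu_get_current_aio_context()/aio_context_in_iothread() helpers to decide whether it may poll a context directly. The helper name is hypothetical and this code is not part of the patch:

/* Sketch: only the thread that owns ctx's event loop should poll it;
 * other threads merely kick it with aio_notify().
 */
static void example_poll_or_kick(AioContext *ctx)
{
    if (aio_context_in_iothread(ctx)) {
        aio_poll(ctx, true);      /* we run this loop, poll it ourselves */
    } else {
        aio_notify(ctx);          /* wake the owning thread instead */
    }
}
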
@ -218,7 +218,7 @@ BlockDriverState *bdrv_open(const char *filename, const char *reference,
BlockReopenQueue *bdrv_reopen_queue(BlockReopenQueue *bs_queue,
BlockDriverState *bs,
QDict *options, int flags);
int bdrv_reopen_multiple(BlockReopenQueue *bs_queue, Error **errp);
int bdrv_reopen_multiple(AioContext *ctx, BlockReopenQueue *bs_queue, Error **errp);
int bdrv_reopen(BlockDriverState *bs, int bdrv_flags, Error **errp);
int bdrv_reopen_prepare(BDRVReopenState *reopen_state,
BlockReopenQueue *queue, Error **errp);
@ -334,6 +334,35 @@ void bdrv_drain(BlockDriverState *bs);
void coroutine_fn bdrv_co_drain(BlockDriverState *bs);
void bdrv_drain_all(void);

#define BDRV_POLL_WHILE(bs, cond) ({ \
bool waited_ = false; \
BlockDriverState *bs_ = (bs); \
AioContext *ctx_ = bdrv_get_aio_context(bs_); \
if (aio_context_in_iothread(ctx_)) { \
while ((cond)) { \
aio_poll(ctx_, true); \
waited_ = true; \
} \
} else { \
assert(qemu_get_current_aio_context() == \
qemu_get_aio_context()); \
/* Ask bdrv_dec_in_flight to wake up the main \
* QEMU AioContext. Extra I/O threads never take \
* other I/O threads' AioContexts (see for example \
* block_job_defer_to_main_loop for how to do it). \
*/ \
assert(!bs_->wakeup); \
bs_->wakeup = true; \
while ((cond)) { \
aio_context_release(ctx_); \
aio_poll(qemu_get_aio_context(), true); \
aio_context_acquire(ctx_); \
waited_ = true; \
} \
bs_->wakeup = false; \
} \
waited_; })

int bdrv_pdiscard(BlockDriverState *bs, int64_t offset, int count);
int bdrv_co_pdiscard(BlockDriverState *bs, int64_t offset, int count);
int bdrv_has_zero_init_1(BlockDriverState *bs);
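
A sketch of the typical BDRV_POLL_WHILE pattern, loosely modeled on synchronous wrappers such as bdrv_flush(): start a coroutine, then poll until its completion flag flips. The ExampleCo structure and function names are illustrative only:

typedef struct {
    BlockDriverState *bs;
    bool done;
} ExampleCo;

static void coroutine_fn example_co_entry(void *opaque)
{
    ExampleCo *e = opaque;
    /* ... perform the actual work on e->bs ... */
    e->done = true;
}

static void example_sync_wrapper(BlockDriverState *bs)
{
    ExampleCo e = { .bs = bs, .done = false };
    Coroutine *co = qemu_coroutine_create(example_co_entry, &e);

    qemu_coroutine_enter(co);
    BDRV_POLL_WHILE(bs, !e.done);   /* polls whichever event loop is correct */
}
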
@ -62,8 +62,6 @@
enum BdrvTrackedRequestType {
BDRV_TRACKED_READ,
BDRV_TRACKED_WRITE,
BDRV_TRACKED_FLUSH,
BDRV_TRACKED_IOCTL,
BDRV_TRACKED_DISCARD,
};

@ -445,7 +443,7 @@ struct BlockDriverState {
note this is a reference count */

CoQueue flush_queue; /* Serializing flush queue */
BdrvTrackedRequest *active_flush_req; /* Flush request in flight */
bool active_flush_req; /* Flush request in flight? */
unsigned int write_gen; /* Current data generation */
unsigned int flushed_gen; /* Flushed write generation */

@ -473,9 +471,12 @@ struct BlockDriverState {
/* Callback before write request is processed */
NotifierWithReturnList before_write_notifiers;

/* number of in-flight serialising requests */
/* number of in-flight requests; overall and serialising */
unsigned int in_flight;
unsigned int serialising_in_flight;

bool wakeup;

/* Offset after the highest byte written to */
uint64_t wr_highest_offset;

@ -634,6 +635,21 @@ void bdrv_remove_aio_context_notifier(BlockDriverState *bs,
void (*aio_context_detached)(void *),
void *opaque);

/**
* bdrv_wakeup:
* @bs: The BlockDriverState for which an I/O operation has been completed.
*
* Wake up the main thread if it is waiting on BDRV_POLL_WHILE. During
* synchronous I/O on a BlockDriverState that is attached to another
* I/O thread, the main thread lets the I/O thread's event loop run,
* waiting for the I/O operation to complete. A bdrv_wakeup will wake
* up the main thread if necessary.
*
* Manual calls to bdrv_wakeup are rarely necessary, because
* bdrv_dec_in_flight already calls it.
*/
void bdrv_wakeup(BlockDriverState *bs);

#ifdef _WIN32
int is_windows_drive(const char *filename);
#endif
@ -787,6 +803,9 @@ bool bdrv_requests_pending(BlockDriverState *bs);
void bdrv_clear_dirty_bitmap(BdrvDirtyBitmap *bitmap, HBitmap **out);
void bdrv_undo_clear_dirty_bitmap(BdrvDirtyBitmap *bitmap, HBitmap *in);

void bdrv_inc_in_flight(BlockDriverState *bs);
void bdrv_dec_in_flight(BlockDriverState *bs);

void blockdev_close_all_bdrv_states(void);

#endif /* BLOCK_INT_H */
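
To show how the new in-flight counter is meant to be used, a sketch of a driver-internal asynchronous request participating in the accounting so that BDRV_POLL_WHILE and drain can observe it; the function names are hypothetical:

static void example_request_start(BlockDriverState *bs)
{
    bdrv_inc_in_flight(bs);
    /* ... submit the asynchronous operation ... */
}

static void example_request_complete(BlockDriverState *bs)
{
    /* ... consume the result ... */
    bdrv_dec_in_flight(bs);   /* also wakes a waiting BDRV_POLL_WHILE */
}
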
@ -92,6 +92,13 @@ typedef struct BlockJobDriver {
* besides job->blk to the new AioContext.
*/
void (*attached_aio_context)(BlockJob *job, AioContext *new_context);

/*
* If the callback is not NULL, it will be invoked when the job has to be
* synchronously cancelled or completed; it should drain BlockDriverStates
* as required to ensure progress.
*/
void (*drain)(BlockJob *job);
} BlockJobDriver;

/**
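
A sketch of what a job's .drain callback might look like, loosely modeled on the mirror job: drain the BlockDriverState the job depends on so its in-flight requests complete and the job can make progress. The ExampleJob type and its fields are illustrative:

typedef struct ExampleJob {
    BlockJob common;
    BlockDriverState *source;
} ExampleJob;

static void example_job_drain(BlockJob *job)
{
    ExampleJob *s = container_of(job, ExampleJob, common);
    bdrv_drain(s->source);
}

static const BlockJobDriver example_job_driver = {
    .instance_size = sizeof(ExampleJob),
    .drain         = example_job_drain,
};
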
@ -1,54 +0,0 @@
/*
* Recursive FIFO lock
*
* Copyright Red Hat, Inc. 2013
*
* Authors:
* Stefan Hajnoczi <stefanha@redhat.com>
*
* This work is licensed under the terms of the GNU GPL, version 2 or later.
* See the COPYING file in the top-level directory.
*
*/

#ifndef QEMU_RFIFOLOCK_H
#define QEMU_RFIFOLOCK_H

#include "qemu/thread.h"

/* Recursive FIFO lock
*
* This lock provides more features than a plain mutex:
*
* 1. Fairness - enforces FIFO order.
* 2. Nesting - can be taken recursively.
* 3. Contention callback - optional, called when thread must wait.
*
* The recursive FIFO lock is heavyweight so prefer other synchronization
* primitives if you do not need its features.
*/
typedef struct {
QemuMutex lock; /* protects all fields */

/* FIFO order */
unsigned int head; /* active ticket number */
unsigned int tail; /* waiting ticket number */
QemuCond cond; /* used to wait for our ticket number */

/* Nesting */
QemuThread owner_thread; /* thread that currently has ownership */
unsigned int nesting; /* amount of nesting levels */

/* Contention callback */
void (*cb)(void *); /* called when thread must wait, with ->lock
* held so it may not recursively lock/unlock
*/
void *cb_opaque;
} RFifoLock;

void rfifolock_init(RFifoLock *r, void (*cb)(void *), void *opaque);
void rfifolock_destroy(RFifoLock *r);
void rfifolock_lock(RFifoLock *r);
void rfifolock_unlock(RFifoLock *r);

#endif /* QEMU_RFIFOLOCK_H */
@ -4,6 +4,12 @@
#include <pthread.h>
#include <semaphore.h>

typedef QemuMutex QemuRecMutex;
#define qemu_rec_mutex_destroy qemu_mutex_destroy
#define qemu_rec_mutex_lock qemu_mutex_lock
#define qemu_rec_mutex_try_lock qemu_mutex_try_lock
#define qemu_rec_mutex_unlock qemu_mutex_unlock

struct QemuMutex {
pthread_mutex_t lock;
};
@ -8,6 +8,16 @@ struct QemuMutex {
LONG owner;
};

typedef struct QemuRecMutex QemuRecMutex;
struct QemuRecMutex {
CRITICAL_SECTION lock;
};

void qemu_rec_mutex_destroy(QemuRecMutex *mutex);
void qemu_rec_mutex_lock(QemuRecMutex *mutex);
int qemu_rec_mutex_trylock(QemuRecMutex *mutex);
void qemu_rec_mutex_unlock(QemuRecMutex *mutex);

struct QemuCond {
LONG waiters, target;
HANDLE sema;
@ -25,6 +25,9 @@ void qemu_mutex_lock(QemuMutex *mutex);
int qemu_mutex_trylock(QemuMutex *mutex);
void qemu_mutex_unlock(QemuMutex *mutex);

/* Prototypes for other functions are in thread-posix.h/thread-win32.h. */
void qemu_rec_mutex_init(QemuRecMutex *mutex);

void qemu_cond_init(QemuCond *cond);
void qemu_cond_destroy(QemuCond *cond);
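
For reference, a minimal sketch of the new recursive mutex API in use; unlike a plain QemuMutex, the thread that already holds the lock may take it again:

static QemuRecMutex example_lock;

static void example_nested_locking(void)
{
    qemu_rec_mutex_init(&example_lock);

    qemu_rec_mutex_lock(&example_lock);
    qemu_rec_mutex_lock(&example_lock);     /* recursive acquisition is OK */
    qemu_rec_mutex_unlock(&example_lock);
    qemu_rec_mutex_unlock(&example_lock);

    qemu_rec_mutex_destroy(&example_lock);
}
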
33
iothread.c
@ -16,10 +16,12 @@
#include "qom/object_interfaces.h"
#include "qemu/module.h"
#include "block/aio.h"
#include "block/block.h"
#include "sysemu/iothread.h"
#include "qmp-commands.h"
#include "qemu/error-report.h"
#include "qemu/rcu.h"
#include "qemu/main-loop.h"

typedef ObjectClass IOThreadClass;

@ -28,26 +30,27 @@ typedef ObjectClass IOThreadClass;
#define IOTHREAD_CLASS(klass) \
OBJECT_CLASS_CHECK(IOThreadClass, klass, TYPE_IOTHREAD)

static __thread IOThread *my_iothread;

AioContext *qemu_get_current_aio_context(void)
{
return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
}

static void *iothread_run(void *opaque)
{
IOThread *iothread = opaque;
bool blocking;

rcu_register_thread();

my_iothread = iothread;
qemu_mutex_lock(&iothread->init_done_lock);
iothread->thread_id = qemu_get_thread_id();
qemu_cond_signal(&iothread->init_done_cond);
qemu_mutex_unlock(&iothread->init_done_lock);

while (!iothread->stopping) {
aio_context_acquire(iothread->ctx);
blocking = true;
while (!iothread->stopping && aio_poll(iothread->ctx, blocking)) {
/* Progress was made, keep going */
blocking = false;
}
aio_context_release(iothread->ctx);
while (!atomic_read(&iothread->stopping)) {
aio_poll(iothread->ctx, true);
}

rcu_unregister_thread();
@ -190,6 +193,18 @@ IOThreadInfoList *qmp_query_iothreads(Error **errp)
void iothread_stop_all(void)
{
Object *container = object_get_objects_root();
BlockDriverState *bs;
BdrvNextIterator it;

for (bs = bdrv_first(&it); bs; bs = bdrv_next(&it)) {
AioContext *ctx = bdrv_get_aio_context(bs);
if (ctx == qemu_get_aio_context()) {
continue;
}
aio_context_acquire(ctx);
bdrv_set_aio_context(bs, qemu_get_aio_context());
aio_context_release(ctx);
}

object_child_foreach(container, iothread_stop, NULL);
}
@ -795,6 +795,7 @@ static void run_block_job(BlockJob *job, Error **errp)
{
AioContext *aio_context = blk_get_aio_context(job->blk);

aio_context_acquire(aio_context);
do {
aio_poll(aio_context, true);
qemu_progress_print(job->len ?
@ -802,6 +803,7 @@ static void run_block_job(BlockJob *job, Error **errp)
} while (!job->ready);

block_job_complete_sync(job, errp);
aio_context_release(aio_context);

/* A block job may finish instantaneously without publishing any progress,
* so just signal completion here */
@ -819,6 +821,7 @@ static int img_commit(int argc, char **argv)
Error *local_err = NULL;
CommonBlockJobCBInfo cbi;
bool image_opts = false;
AioContext *aio_context;

fmt = NULL;
cache = BDRV_DEFAULT_CACHE;
@ -928,8 +931,11 @@ static int img_commit(int argc, char **argv)
.bs = bs,
};

aio_context = bdrv_get_aio_context(bs);
aio_context_acquire(aio_context);
commit_active_start("commit", bs, base_bs, 0, BLOCKDEV_ON_ERROR_REPORT,
common_block_job_cb, &cbi, &local_err, false);
aio_context_release(aio_context);
if (local_err) {
goto done;
}
@ -1956,7 +1956,7 @@ static int reopen_f(BlockBackend *blk, int argc, char **argv)
qemu_opts_reset(&reopen_opts);

brq = bdrv_reopen_queue(NULL, bs, opts, flags);
bdrv_reopen_multiple(brq, &local_err);
bdrv_reopen_multiple(bdrv_get_aio_context(bs), brq, &local_err);
if (local_err) {
error_report_err(local_err);
} else {
@ -2216,6 +2216,7 @@ static const cmdinfo_t help_cmd = {

bool qemuio_command(BlockBackend *blk, const char *cmd)
{
AioContext *ctx;
char *input;
const cmdinfo_t *ct;
char **v;
@ -2227,7 +2228,10 @@ bool qemuio_command(BlockBackend *blk, const char *cmd)
if (c) {
ct = find_command(v[0]);
if (ct) {
ctx = blk ? blk_get_aio_context(blk) : qemu_get_aio_context();
aio_context_acquire(ctx);
done = command(blk, ct, c, v);
aio_context_release(ctx);
} else {
fprintf(stderr, "command \"%s\" not found\n", v[0]);
}
@ -17,6 +17,7 @@ stub-obj-y += gdbstub.o
stub-obj-y += get-fd.o
stub-obj-y += get-next-serial.o
stub-obj-y += get-vm-name.o
stub-obj-y += iothread.o
stub-obj-y += iothread-lock.o
stub-obj-y += is-daemonized.o
stub-obj-y += machine-init-done.o
8
stubs/iothread.c
Normal file
@ -0,0 +1,8 @@
#include "qemu/osdep.h"
#include "block/aio.h"
#include "qemu/main-loop.h"

AioContext *qemu_get_current_aio_context(void)
{
return qemu_get_aio_context();
}
1
tests/.gitignore
vendored
@ -67,7 +67,6 @@ test-qmp-marshal.c
test-qobject-output-visitor
test-rcu-list
test-replication
test-rfifolock
test-string-input-visitor
test-string-output-visitor
test-thread-pool
@ -45,7 +45,6 @@ check-unit-y += tests/test-visitor-serialization$(EXESUF)
check-unit-y += tests/test-iov$(EXESUF)
gcov-files-test-iov-y = util/iov.c
check-unit-y += tests/test-aio$(EXESUF)
check-unit-$(CONFIG_POSIX) += tests/test-rfifolock$(EXESUF)
check-unit-y += tests/test-throttle$(EXESUF)
gcov-files-test-aio-$(CONFIG_WIN32) = aio-win32.c
gcov-files-test-aio-$(CONFIG_POSIX) = aio-posix.c
@ -491,7 +490,6 @@ tests/check-qom-proplist$(EXESUF): tests/check-qom-proplist.o $(test-qom-obj-y)
tests/test-char$(EXESUF): tests/test-char.o qemu-char.o qemu-timer.o $(test-util-obj-y) $(qtest-obj-y) $(test-io-obj-y)
tests/test-coroutine$(EXESUF): tests/test-coroutine.o $(test-block-obj-y)
tests/test-aio$(EXESUF): tests/test-aio.o $(test-block-obj-y)
tests/test-rfifolock$(EXESUF): tests/test-rfifolock.o $(test-util-obj-y)
tests/test-throttle$(EXESUF): tests/test-throttle.o $(test-block-obj-y)
tests/test-blockjob$(EXESUF): tests/test-blockjob.o $(test-block-obj-y) $(test-util-obj-y)
tests/test-blockjob-txn$(EXESUF): tests/test-blockjob-txn.o $(test-block-obj-y) $(test-util-obj-y)
@ -100,6 +100,7 @@ static void event_ready_cb(EventNotifier *e)

typedef struct {
QemuMutex start_lock;
EventNotifier notifier;
bool thread_acquired;
} AcquireTestData;

@ -111,6 +112,11 @@ static void *test_acquire_thread(void *opaque)
qemu_mutex_lock(&data->start_lock);
qemu_mutex_unlock(&data->start_lock);

/* event_notifier_set might be called either before or after
* the main thread's call to poll(). The test case's outcome
* should be the same in either case.
*/
event_notifier_set(&data->notifier);
aio_context_acquire(ctx);
aio_context_release(ctx);

@ -125,20 +131,19 @@ static void set_event_notifier(AioContext *ctx, EventNotifier *notifier,
aio_set_event_notifier(ctx, notifier, false, handler);
}

static void dummy_notifier_read(EventNotifier *unused)
static void dummy_notifier_read(EventNotifier *n)
{
g_assert(false); /* should never be invoked */
event_notifier_test_and_clear(n);
}

static void test_acquire(void)
{
QemuThread thread;
EventNotifier notifier;
AcquireTestData data;

/* Dummy event notifier ensures aio_poll() will block */
event_notifier_init(&notifier, false);
set_event_notifier(ctx, &notifier, dummy_notifier_read);
event_notifier_init(&data.notifier, false);
set_event_notifier(ctx, &data.notifier, dummy_notifier_read);
g_assert(!aio_poll(ctx, false)); /* consume aio_notify() */

qemu_mutex_init(&data.start_lock);
@ -152,12 +157,13 @@ static void test_acquire(void)
/* Block in aio_poll(), let other thread kick us and acquire context */
aio_context_acquire(ctx);
qemu_mutex_unlock(&data.start_lock); /* let the thread run */
g_assert(!aio_poll(ctx, true));
g_assert(aio_poll(ctx, true));
g_assert(!data.thread_acquired);
aio_context_release(ctx);

qemu_thread_join(&thread);
set_event_notifier(ctx, &notifier, NULL);
event_notifier_cleanup(&notifier);
set_event_notifier(ctx, &data.notifier, NULL);
event_notifier_cleanup(&data.notifier);

g_assert(data.thread_acquired);
}
@ -1,91 +0,0 @@
/*
* RFifoLock tests
*
* Copyright Red Hat, Inc. 2013
*
* Authors:
* Stefan Hajnoczi <stefanha@redhat.com>
*
* This work is licensed under the terms of the GNU LGPL, version 2 or later.
* See the COPYING.LIB file in the top-level directory.
*/

#include "qemu/osdep.h"
#include "qemu-common.h"
#include "qemu/rfifolock.h"

static void test_nesting(void)
{
RFifoLock lock;

/* Trivial test, ensure the lock is recursive */
rfifolock_init(&lock, NULL, NULL);
rfifolock_lock(&lock);
rfifolock_lock(&lock);
rfifolock_lock(&lock);
rfifolock_unlock(&lock);
rfifolock_unlock(&lock);
rfifolock_unlock(&lock);
rfifolock_destroy(&lock);
}

typedef struct {
RFifoLock lock;
int fd[2];
} CallbackTestData;

static void rfifolock_cb(void *opaque)
{
CallbackTestData *data = opaque;
int ret;
char c = 0;

ret = write(data->fd[1], &c, sizeof(c));
g_assert(ret == 1);
}

static void *callback_thread(void *opaque)
{
CallbackTestData *data = opaque;

/* The other thread holds the lock so the contention callback will be
* invoked...
*/
rfifolock_lock(&data->lock);
rfifolock_unlock(&data->lock);
return NULL;
}

static void test_callback(void)
{
CallbackTestData data;
QemuThread thread;
int ret;
char c;

rfifolock_init(&data.lock, rfifolock_cb, &data);
ret = qemu_pipe(data.fd);
g_assert(ret == 0);

/* Hold lock but allow the callback to kick us by writing to the pipe */
rfifolock_lock(&data.lock);
qemu_thread_create(&thread, "callback_thread",
callback_thread, &data, QEMU_THREAD_JOINABLE);
ret = read(data.fd[0], &c, sizeof(c));
g_assert(ret == 1);
rfifolock_unlock(&data.lock);
/* If we got here then the callback was invoked, as expected */

qemu_thread_join(&thread);
close(data.fd[0]);
close(data.fd[1]);
rfifolock_destroy(&data.lock);
}

int main(int argc, char **argv)
{
g_test_init(&argc, &argv, NULL);
g_test_add_func("/nesting", test_nesting);
g_test_add_func("/callback", test_callback);
return g_test_run();
}
@ -25,7 +25,6 @@ util-obj-y += uuid.o
util-obj-y += throttle.o
util-obj-y += getauxval.o
util-obj-y += readline.o
util-obj-y += rfifolock.o
util-obj-y += rcu.o
util-obj-y += qemu-coroutine.o qemu-coroutine-lock.o qemu-coroutine-io.o
util-obj-y += qemu-coroutine-sleep.o
@ -80,6 +80,20 @@ void qemu_mutex_unlock(QemuMutex *mutex)
error_exit(err, __func__);
}

void qemu_rec_mutex_init(QemuRecMutex *mutex)
{
int err;
pthread_mutexattr_t attr;

pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
err = pthread_mutex_init(&mutex->lock, &attr);
pthread_mutexattr_destroy(&attr);
if (err) {
error_exit(err, __func__);
}
}

void qemu_cond_init(QemuCond *cond)
{
int err;
@ -79,6 +79,31 @@ void qemu_mutex_unlock(QemuMutex *mutex)
LeaveCriticalSection(&mutex->lock);
}

void qemu_rec_mutex_init(QemuRecMutex *mutex)
{
InitializeCriticalSection(&mutex->lock);
}

void qemu_rec_mutex_destroy(QemuRecMutex *mutex)
{
DeleteCriticalSection(&mutex->lock);
}

void qemu_rec_mutex_lock(QemuRecMutex *mutex)
{
EnterCriticalSection(&mutex->lock);
}

int qemu_rec_mutex_trylock(QemuRecMutex *mutex)
{
return !TryEnterCriticalSection(&mutex->lock);
}

void qemu_rec_mutex_unlock(QemuRecMutex *mutex)
{
LeaveCriticalSection(&mutex->lock);
}

void qemu_cond_init(QemuCond *cond)
{
memset(cond, 0, sizeof(*cond));
@ -1,78 +0,0 @@
/*
* Recursive FIFO lock
*
* Copyright Red Hat, Inc. 2013
*
* Authors:
* Stefan Hajnoczi <stefanha@redhat.com>
*
* This work is licensed under the terms of the GNU LGPL, version 2 or later.
* See the COPYING.LIB file in the top-level directory.
*
*/

#include "qemu/osdep.h"
#include "qemu/rfifolock.h"

void rfifolock_init(RFifoLock *r, void (*cb)(void *), void *opaque)
{
qemu_mutex_init(&r->lock);
r->head = 0;
r->tail = 0;
qemu_cond_init(&r->cond);
r->nesting = 0;
r->cb = cb;
r->cb_opaque = opaque;
}

void rfifolock_destroy(RFifoLock *r)
{
qemu_cond_destroy(&r->cond);
qemu_mutex_destroy(&r->lock);
}

/*
* Theory of operation:
*
* In order to ensure FIFO ordering, implement a ticketlock. Threads acquiring
* the lock enqueue themselves by incrementing the tail index. When the lock
* is unlocked, the head is incremented and waiting threads are notified.
*
* Recursive locking does not take a ticket since the head is only incremented
* when the outermost recursive caller unlocks.
*/
void rfifolock_lock(RFifoLock *r)
{
qemu_mutex_lock(&r->lock);

/* Take a ticket */
unsigned int ticket = r->tail++;

if (r->nesting > 0 && qemu_thread_is_self(&r->owner_thread)) {
r->tail--; /* put ticket back, we're nesting */
} else {
while (ticket != r->head) {
/* Invoke optional contention callback */
if (r->cb) {
r->cb(r->cb_opaque);
}
qemu_cond_wait(&r->cond, &r->lock);
}
qemu_thread_get_self(&r->owner_thread);
}

r->nesting++;
qemu_mutex_unlock(&r->lock);
}

void rfifolock_unlock(RFifoLock *r)
{
qemu_mutex_lock(&r->lock);
assert(r->nesting > 0);
assert(qemu_thread_is_self(&r->owner_thread));
if (--r->nesting == 0) {
r->head++;
qemu_cond_broadcast(&r->cond);
}
qemu_mutex_unlock(&r->lock);
}