diff --git a/block/block-copy.c b/block/block-copy.c index 5808cfe657..0becad52da 100644 --- a/block/block-copy.c +++ b/block/block-copy.c @@ -28,10 +28,18 @@ #define BLOCK_COPY_MAX_WORKERS 64 #define BLOCK_COPY_SLICE_TIME 100000000ULL /* ns */ +typedef enum { + COPY_READ_WRITE_CLUSTER, + COPY_READ_WRITE, + COPY_WRITE_ZEROES, + COPY_RANGE_SMALL, + COPY_RANGE_FULL +} BlockCopyMethod; + static coroutine_fn int block_copy_task_entry(AioTask *task); typedef struct BlockCopyCallState { - /* IN parameters. Initialized in block_copy_async() and never changed. */ + /* Fields initialized in block_copy_async() and never changed. */ BlockCopyState *s; int64_t offset; int64_t bytes; @@ -40,34 +48,60 @@ typedef struct BlockCopyCallState { bool ignore_ratelimit; BlockCopyAsyncCallbackFunc cb; void *cb_opaque; - /* Coroutine where async block-copy is running */ Coroutine *co; + /* Fields whose state changes throughout the execution */ + bool finished; /* atomic */ + QemuCoSleep sleep; /* TODO: protect API with a lock */ + bool cancelled; /* atomic */ /* To reference all call states from BlockCopyState */ QLIST_ENTRY(BlockCopyCallState) list; - /* State */ - int ret; - bool finished; - QemuCoSleep sleep; - bool cancelled; - - /* OUT parameters */ + /* + * Fields that report information about return values and erros. + * Protected by lock in BlockCopyState. + */ bool error_is_read; + /* + * @ret is set concurrently by tasks under mutex. Only set once by first + * failed task (and untouched if no task failed). + * After finishing (call_state->finished is true), it is not modified + * anymore and may be safely read without mutex. + */ + int ret; } BlockCopyCallState; typedef struct BlockCopyTask { AioTask task; + /* + * Fields initialized in block_copy_task_create() + * and never changed. + */ BlockCopyState *s; BlockCopyCallState *call_state; int64_t offset; - int64_t bytes; - bool zeroes; - bool copy_range; - QLIST_ENTRY(BlockCopyTask) list; + /* + * @method can also be set again in the while loop of + * block_copy_dirty_clusters(), but it is never accessed concurrently + * because the only other function that reads it is + * block_copy_task_entry() and it is invoked afterwards in the same + * iteration. + */ + BlockCopyMethod method; + + /* + * Fields whose state changes throughout the execution + * Protected by lock in BlockCopyState. + */ CoQueue wait_queue; /* coroutines blocked on this task */ + /* + * Only protect the case of parallel read while updating @bytes + * value in block_copy_task_shrink(). + */ + int64_t bytes; + QLIST_ENTRY(BlockCopyTask) list; } BlockCopyTask; static int64_t task_end(BlockCopyTask *task) @@ -83,17 +117,25 @@ typedef struct BlockCopyState { */ BdrvChild *source; BdrvChild *target; - BdrvDirtyBitmap *copy_bitmap; - int64_t in_flight_bytes; - int64_t cluster_size; - bool use_copy_range; - int64_t copy_size; - uint64_t len; - QLIST_HEAD(, BlockCopyTask) tasks; /* All tasks from all block-copy calls */ - QLIST_HEAD(, BlockCopyCallState) calls; + /* + * Fields initialized in block_copy_state_new() + * and never changed. + */ + int64_t cluster_size; + int64_t max_transfer; + uint64_t len; BdrvRequestFlags write_flags; + /* + * Fields whose state changes throughout the execution + * Protected by lock. + */ + CoMutex lock; + int64_t in_flight_bytes; + BlockCopyMethod method; + QLIST_HEAD(, BlockCopyTask) tasks; /* All tasks from all block-copy calls */ + QLIST_HEAD(, BlockCopyCallState) calls; /* * skip_unallocated: * @@ -108,16 +150,15 @@ typedef struct BlockCopyState { * skip unallocated regions, clear them in the copy_bitmap, and invoke * block_copy_reset_unallocated() every time it does. */ - bool skip_unallocated; - + bool skip_unallocated; /* atomic */ + /* State fields that use a thread-safe API */ + BdrvDirtyBitmap *copy_bitmap; ProgressMeter *progress; - SharedResource *mem; - - uint64_t speed; RateLimit rate_limit; } BlockCopyState; +/* Called with lock held */ static BlockCopyTask *find_conflicting_task(BlockCopyState *s, int64_t offset, int64_t bytes) { @@ -135,6 +176,9 @@ static BlockCopyTask *find_conflicting_task(BlockCopyState *s, /* * If there are no intersecting tasks return false. Otherwise, wait for the * first found intersecting tasks to finish and return true. + * + * Called with lock held. May temporary release the lock. + * Return value of 0 proves that lock was NOT released. */ static bool coroutine_fn block_copy_wait_one(BlockCopyState *s, int64_t offset, int64_t bytes) @@ -145,22 +189,43 @@ static bool coroutine_fn block_copy_wait_one(BlockCopyState *s, int64_t offset, return false; } - qemu_co_queue_wait(&task->wait_queue, NULL); + qemu_co_queue_wait(&task->wait_queue, &s->lock); return true; } +/* Called with lock held */ +static int64_t block_copy_chunk_size(BlockCopyState *s) +{ + switch (s->method) { + case COPY_READ_WRITE_CLUSTER: + return s->cluster_size; + case COPY_READ_WRITE: + case COPY_RANGE_SMALL: + return MIN(MAX(s->cluster_size, BLOCK_COPY_MAX_BUFFER), + s->max_transfer); + case COPY_RANGE_FULL: + return MIN(MAX(s->cluster_size, BLOCK_COPY_MAX_COPY_RANGE), + s->max_transfer); + default: + /* Cannot have COPY_WRITE_ZEROES here. */ + abort(); + } +} + /* * Search for the first dirty area in offset/bytes range and create task at * the beginning of it. */ -static BlockCopyTask *block_copy_task_create(BlockCopyState *s, - BlockCopyCallState *call_state, - int64_t offset, int64_t bytes) +static coroutine_fn BlockCopyTask * +block_copy_task_create(BlockCopyState *s, BlockCopyCallState *call_state, + int64_t offset, int64_t bytes) { BlockCopyTask *task; - int64_t max_chunk = MIN_NON_ZERO(s->copy_size, call_state->max_chunk); + int64_t max_chunk; + QEMU_LOCK_GUARD(&s->lock); + max_chunk = MIN_NON_ZERO(block_copy_chunk_size(s), call_state->max_chunk); if (!bdrv_dirty_bitmap_next_dirty_area(s->copy_bitmap, offset, offset + bytes, max_chunk, &offset, &bytes)) @@ -184,7 +249,7 @@ static BlockCopyTask *block_copy_task_create(BlockCopyState *s, .call_state = call_state, .offset = offset, .bytes = bytes, - .copy_range = s->use_copy_range, + .method = s->method, }; qemu_co_queue_init(&task->wait_queue); QLIST_INSERT_HEAD(&s->tasks, task, list); @@ -202,6 +267,7 @@ static BlockCopyTask *block_copy_task_create(BlockCopyState *s, static void coroutine_fn block_copy_task_shrink(BlockCopyTask *task, int64_t new_bytes) { + QEMU_LOCK_GUARD(&task->s->lock); if (new_bytes == task->bytes) { return; } @@ -218,11 +284,15 @@ static void coroutine_fn block_copy_task_shrink(BlockCopyTask *task, static void coroutine_fn block_copy_task_end(BlockCopyTask *task, int ret) { + QEMU_LOCK_GUARD(&task->s->lock); task->s->in_flight_bytes -= task->bytes; if (ret < 0) { bdrv_set_dirty_bitmap(task->s->copy_bitmap, task->offset, task->bytes); } QLIST_REMOVE(task, list); + progress_set_remaining(task->s->progress, + bdrv_get_dirty_count(task->s->copy_bitmap) + + task->s->in_flight_bytes); qemu_co_queue_restart_all(&task->wait_queue); } @@ -268,37 +338,39 @@ BlockCopyState *block_copy_state_new(BdrvChild *source, BdrvChild *target, .len = bdrv_dirty_bitmap_size(copy_bitmap), .write_flags = write_flags, .mem = shres_create(BLOCK_COPY_MAX_MEM), + .max_transfer = QEMU_ALIGN_DOWN( + block_copy_max_transfer(source, target), + cluster_size), }; - if (block_copy_max_transfer(source, target) < cluster_size) { + if (s->max_transfer < cluster_size) { /* * copy_range does not respect max_transfer. We don't want to bother * with requests smaller than block-copy cluster size, so fallback to * buffered copying (read and write respect max_transfer on their * behalf). */ - s->use_copy_range = false; - s->copy_size = cluster_size; + s->method = COPY_READ_WRITE_CLUSTER; } else if (write_flags & BDRV_REQ_WRITE_COMPRESSED) { /* Compression supports only cluster-size writes and no copy-range. */ - s->use_copy_range = false; - s->copy_size = cluster_size; + s->method = COPY_READ_WRITE_CLUSTER; } else { /* - * We enable copy-range, but keep small copy_size, until first + * If copy range enabled, start with COPY_RANGE_SMALL, until first * successful copy_range (look at block_copy_do_copy). */ - s->use_copy_range = use_copy_range; - s->copy_size = MAX(s->cluster_size, BLOCK_COPY_MAX_BUFFER); + s->method = use_copy_range ? COPY_RANGE_SMALL : COPY_READ_WRITE; } ratelimit_init(&s->rate_limit); + qemu_co_mutex_init(&s->lock); QLIST_INIT(&s->tasks); QLIST_INIT(&s->calls); return s; } +/* Only set before running the job, no need for locking. */ void block_copy_set_progress_meter(BlockCopyState *s, ProgressMeter *pm) { s->progress = pm; @@ -344,17 +416,14 @@ static coroutine_fn int block_copy_task_run(AioTaskPool *pool, * * No sync here: nor bitmap neighter intersecting requests handling, only copy. * - * @copy_range is an in-out argument: if *copy_range is false, copy_range is not - * done. If *copy_range is true, copy_range is attempted. If the copy_range - * attempt fails, the function falls back to the usual read+write and - * *copy_range is set to false. *copy_range and zeroes must not be true - * simultaneously. - * + * @method is an in-out argument, so that copy_range can be either extended to + * a full-size buffer or disabled if the copy_range attempt fails. The output + * value of @method should be used for subsequent tasks. * Returns 0 on success. */ static int coroutine_fn block_copy_do_copy(BlockCopyState *s, int64_t offset, int64_t bytes, - bool zeroes, bool *copy_range, + BlockCopyMethod *method, bool *error_is_read) { int ret; @@ -368,9 +437,9 @@ static int coroutine_fn block_copy_do_copy(BlockCopyState *s, assert(offset + bytes <= s->len || offset + bytes == QEMU_ALIGN_UP(s->len, s->cluster_size)); assert(nbytes < INT_MAX); - assert(!(*copy_range && zeroes)); - if (zeroes) { + switch (*method) { + case COPY_WRITE_ZEROES: ret = bdrv_co_pwrite_zeroes(s->target, offset, nbytes, s->write_flags & ~BDRV_REQ_WRITE_COMPRESSED); if (ret < 0) { @@ -378,99 +447,86 @@ static int coroutine_fn block_copy_do_copy(BlockCopyState *s, *error_is_read = false; } return ret; - } - if (*copy_range) { + case COPY_RANGE_SMALL: + case COPY_RANGE_FULL: ret = bdrv_co_copy_range(s->source, offset, s->target, offset, nbytes, 0, s->write_flags); - if (ret < 0) { - trace_block_copy_copy_range_fail(s, offset, ret); - *copy_range = false; - /* Fallback to read+write with allocated buffer */ - } else { + if (ret >= 0) { + /* Successful copy-range, increase chunk size. */ + *method = COPY_RANGE_FULL; return 0; } + + trace_block_copy_copy_range_fail(s, offset, ret); + *method = COPY_READ_WRITE; + /* Fall through to read+write with allocated buffer */ + + case COPY_READ_WRITE_CLUSTER: + case COPY_READ_WRITE: + /* + * In case of failed copy_range request above, we may proceed with + * buffered request larger than BLOCK_COPY_MAX_BUFFER. + * Still, further requests will be properly limited, so don't care too + * much. Moreover the most likely case (copy_range is unsupported for + * the configuration, so the very first copy_range request fails) + * is handled by setting large copy_size only after first successful + * copy_range. + */ + + bounce_buffer = qemu_blockalign(s->source->bs, nbytes); + + ret = bdrv_co_pread(s->source, offset, nbytes, bounce_buffer, 0); + if (ret < 0) { + trace_block_copy_read_fail(s, offset, ret); + *error_is_read = true; + goto out; + } + + ret = bdrv_co_pwrite(s->target, offset, nbytes, bounce_buffer, + s->write_flags); + if (ret < 0) { + trace_block_copy_write_fail(s, offset, ret); + *error_is_read = false; + goto out; + } + + out: + qemu_vfree(bounce_buffer); + break; + + default: + abort(); } - /* - * In case of failed copy_range request above, we may proceed with buffered - * request larger than BLOCK_COPY_MAX_BUFFER. Still, further requests will - * be properly limited, so don't care too much. Moreover the most likely - * case (copy_range is unsupported for the configuration, so the very first - * copy_range request fails) is handled by setting large copy_size only - * after first successful copy_range. - */ - - bounce_buffer = qemu_blockalign(s->source->bs, nbytes); - - ret = bdrv_co_pread(s->source, offset, nbytes, bounce_buffer, 0); - if (ret < 0) { - trace_block_copy_read_fail(s, offset, ret); - *error_is_read = true; - goto out; - } - - ret = bdrv_co_pwrite(s->target, offset, nbytes, bounce_buffer, - s->write_flags); - if (ret < 0) { - trace_block_copy_write_fail(s, offset, ret); - *error_is_read = false; - goto out; - } - -out: - qemu_vfree(bounce_buffer); - return ret; } -static void block_copy_handle_copy_range_result(BlockCopyState *s, - bool is_success) -{ - if (!s->use_copy_range) { - /* already disabled */ - return; - } - - if (is_success) { - /* - * Successful copy-range. Now increase copy_size. copy_range - * does not respect max_transfer (it's a TODO), so we factor - * that in here. - */ - s->copy_size = - MIN(MAX(s->cluster_size, BLOCK_COPY_MAX_COPY_RANGE), - QEMU_ALIGN_DOWN(block_copy_max_transfer(s->source, - s->target), - s->cluster_size)); - } else { - /* Copy-range failed, disable it. */ - s->use_copy_range = false; - s->copy_size = MAX(s->cluster_size, BLOCK_COPY_MAX_BUFFER); - } -} - static coroutine_fn int block_copy_task_entry(AioTask *task) { BlockCopyTask *t = container_of(task, BlockCopyTask, task); + BlockCopyState *s = t->s; bool error_is_read = false; - bool copy_range = t->copy_range; + BlockCopyMethod method = t->method; int ret; - ret = block_copy_do_copy(t->s, t->offset, t->bytes, t->zeroes, - ©_range, &error_is_read); - if (t->copy_range) { - block_copy_handle_copy_range_result(t->s, copy_range); - } - if (ret < 0) { - if (!t->call_state->ret) { - t->call_state->ret = ret; - t->call_state->error_is_read = error_is_read; + ret = block_copy_do_copy(s, t->offset, t->bytes, &method, &error_is_read); + + WITH_QEMU_LOCK_GUARD(&s->lock) { + if (s->method == t->method) { + s->method = method; + } + + if (ret < 0) { + if (!t->call_state->ret) { + t->call_state->ret = ret; + t->call_state->error_is_read = error_is_read; + } + } else { + progress_work_done(s->progress, t->bytes); } - } else { - progress_work_done(t->s->progress, t->bytes); } - co_put_to_shres(t->s->mem, t->bytes); + co_put_to_shres(s->mem, t->bytes); block_copy_task_end(t, ret); return ret; @@ -483,7 +539,7 @@ static int block_copy_block_status(BlockCopyState *s, int64_t offset, BlockDriverState *base; int ret; - if (s->skip_unallocated) { + if (qatomic_read(&s->skip_unallocated)) { base = bdrv_backing_chain_next(s->source->bs); } else { base = NULL; @@ -570,10 +626,12 @@ int64_t block_copy_reset_unallocated(BlockCopyState *s, bytes = clusters * s->cluster_size; if (!ret) { + qemu_co_mutex_lock(&s->lock); bdrv_reset_dirty_bitmap(s->copy_bitmap, offset, bytes); progress_set_remaining(s->progress, bdrv_get_dirty_count(s->copy_bitmap) + s->in_flight_bytes); + qemu_co_mutex_unlock(&s->lock); } *count = bytes; @@ -609,7 +667,8 @@ block_copy_dirty_clusters(BlockCopyCallState *call_state) assert(QEMU_IS_ALIGNED(offset, s->cluster_size)); assert(QEMU_IS_ALIGNED(bytes, s->cluster_size)); - while (bytes && aio_task_pool_status(aio) == 0 && !call_state->cancelled) { + while (bytes && aio_task_pool_status(aio) == 0 && + !qatomic_read(&call_state->cancelled)) { BlockCopyTask *task; int64_t status_bytes; @@ -631,11 +690,9 @@ block_copy_dirty_clusters(BlockCopyCallState *call_state) if (status_bytes < task->bytes) { block_copy_task_shrink(task, status_bytes); } - if (s->skip_unallocated && !(ret & BDRV_BLOCK_ALLOCATED)) { + if (qatomic_read(&s->skip_unallocated) && + !(ret & BDRV_BLOCK_ALLOCATED)) { block_copy_task_end(task, 0); - progress_set_remaining(s->progress, - bdrv_get_dirty_count(s->copy_bitmap) + - s->in_flight_bytes); trace_block_copy_skip_range(s, task->offset, task->bytes); offset = task_end(task); bytes = end - offset; @@ -643,25 +700,22 @@ block_copy_dirty_clusters(BlockCopyCallState *call_state) continue; } if (ret & BDRV_BLOCK_ZERO) { - task->zeroes = true; - task->copy_range = false; + task->method = COPY_WRITE_ZEROES; } - if (s->speed) { - if (!call_state->ignore_ratelimit) { - uint64_t ns = ratelimit_calculate_delay(&s->rate_limit, 0); - if (ns > 0) { - block_copy_task_end(task, -EAGAIN); - g_free(task); - qemu_co_sleep_ns_wakeable(&call_state->sleep, - QEMU_CLOCK_REALTIME, ns); - continue; - } + if (!call_state->ignore_ratelimit) { + uint64_t ns = ratelimit_calculate_delay(&s->rate_limit, 0); + if (ns > 0) { + block_copy_task_end(task, -EAGAIN); + g_free(task); + qemu_co_sleep_ns_wakeable(&call_state->sleep, + QEMU_CLOCK_REALTIME, ns); + continue; } - - ratelimit_calculate_delay(&s->rate_limit, task->bytes); } + ratelimit_calculate_delay(&s->rate_limit, task->bytes); + trace_block_copy_process(s, task->offset); co_get_from_shres(s->mem, task->bytes); @@ -717,15 +771,40 @@ void block_copy_kick(BlockCopyCallState *call_state) static int coroutine_fn block_copy_common(BlockCopyCallState *call_state) { int ret; + BlockCopyState *s = call_state->s; - QLIST_INSERT_HEAD(&call_state->s->calls, call_state, list); + qemu_co_mutex_lock(&s->lock); + QLIST_INSERT_HEAD(&s->calls, call_state, list); + qemu_co_mutex_unlock(&s->lock); do { ret = block_copy_dirty_clusters(call_state); - if (ret == 0 && !call_state->cancelled) { - ret = block_copy_wait_one(call_state->s, call_state->offset, - call_state->bytes); + if (ret == 0 && !qatomic_read(&call_state->cancelled)) { + WITH_QEMU_LOCK_GUARD(&s->lock) { + /* + * Check that there is no task we still need to + * wait to complete + */ + ret = block_copy_wait_one(s, call_state->offset, + call_state->bytes); + if (ret == 0) { + /* + * No pending tasks, but check again the bitmap in this + * same critical section, since a task might have failed + * between this and the critical section in + * block_copy_dirty_clusters(). + * + * block_copy_wait_one return value 0 also means that it + * didn't release the lock. So, we are still in the same + * critical section, not interrupted by any concurrent + * access to state. + */ + ret = bdrv_dirty_bitmap_next_dirty(s->copy_bitmap, + call_state->offset, + call_state->bytes) >= 0; + } + } } /* @@ -737,15 +816,17 @@ static int coroutine_fn block_copy_common(BlockCopyCallState *call_state) * 2. We have waited for some intersecting block-copy request * It may have failed and produced new dirty bits. */ - } while (ret > 0 && !call_state->cancelled); + } while (ret > 0 && !qatomic_read(&call_state->cancelled)); - call_state->finished = true; + qatomic_store_release(&call_state->finished, true); if (call_state->cb) { call_state->cb(call_state->cb_opaque); } + qemu_co_mutex_lock(&s->lock); QLIST_REMOVE(call_state, list); + qemu_co_mutex_unlock(&s->lock); return ret; } @@ -800,44 +881,50 @@ void block_copy_call_free(BlockCopyCallState *call_state) return; } - assert(call_state->finished); + assert(qatomic_read(&call_state->finished)); g_free(call_state); } bool block_copy_call_finished(BlockCopyCallState *call_state) { - return call_state->finished; + return qatomic_read(&call_state->finished); } bool block_copy_call_succeeded(BlockCopyCallState *call_state) { - return call_state->finished && !call_state->cancelled && - call_state->ret == 0; + return qatomic_load_acquire(&call_state->finished) && + !qatomic_read(&call_state->cancelled) && + call_state->ret == 0; } bool block_copy_call_failed(BlockCopyCallState *call_state) { - return call_state->finished && !call_state->cancelled && - call_state->ret < 0; + return qatomic_load_acquire(&call_state->finished) && + !qatomic_read(&call_state->cancelled) && + call_state->ret < 0; } bool block_copy_call_cancelled(BlockCopyCallState *call_state) { - return call_state->cancelled; + return qatomic_read(&call_state->cancelled); } int block_copy_call_status(BlockCopyCallState *call_state, bool *error_is_read) { - assert(call_state->finished); + assert(qatomic_load_acquire(&call_state->finished)); if (error_is_read) { *error_is_read = call_state->error_is_read; } return call_state->ret; } +/* + * Note that cancelling and finishing are racy. + * User can cancel a block-copy that is already finished. + */ void block_copy_call_cancel(BlockCopyCallState *call_state) { - call_state->cancelled = true; + qatomic_set(&call_state->cancelled, true); block_copy_kick(call_state); } @@ -848,15 +935,12 @@ BdrvDirtyBitmap *block_copy_dirty_bitmap(BlockCopyState *s) void block_copy_set_skip_unallocated(BlockCopyState *s, bool skip) { - s->skip_unallocated = skip; + qatomic_set(&s->skip_unallocated, skip); } void block_copy_set_speed(BlockCopyState *s, uint64_t speed) { - s->speed = speed; - if (speed > 0) { - ratelimit_set_speed(&s->rate_limit, speed, BLOCK_COPY_SLICE_TIME); - } + ratelimit_set_speed(&s->rate_limit, speed, BLOCK_COPY_SLICE_TIME); /* * Note: it's good to kick all call states from here, but it should be done diff --git a/block/meson.build b/block/meson.build index 01861e1545..ef1ba3d973 100644 --- a/block/meson.build +++ b/block/meson.build @@ -13,6 +13,7 @@ block_ss.add(files( 'commit.c', 'copy-on-read.c', 'preallocate.c', + 'progress_meter.c', 'create.c', 'crypto.c', 'dirty-bitmap.c', diff --git a/block/progress_meter.c b/block/progress_meter.c new file mode 100644 index 0000000000..aa2e60248c --- /dev/null +++ b/block/progress_meter.c @@ -0,0 +1,64 @@ +/* + * Helper functionality for some process progress tracking. + * + * Copyright (c) 2011 IBM Corp. + * Copyright (c) 2012, 2018 Red Hat, Inc. + * Copyright (c) 2020 Virtuozzo International GmbH + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +#include "qemu/osdep.h" +#include "qemu/progress_meter.h" + +void progress_init(ProgressMeter *pm) +{ + qemu_mutex_init(&pm->lock); +} + +void progress_destroy(ProgressMeter *pm) +{ + qemu_mutex_destroy(&pm->lock); +} + +void progress_get_snapshot(ProgressMeter *pm, uint64_t *current, + uint64_t *total) +{ + QEMU_LOCK_GUARD(&pm->lock); + + *current = pm->current; + *total = pm->total; +} + +void progress_work_done(ProgressMeter *pm, uint64_t done) +{ + QEMU_LOCK_GUARD(&pm->lock); + pm->current += done; +} + +void progress_set_remaining(ProgressMeter *pm, uint64_t remaining) +{ + QEMU_LOCK_GUARD(&pm->lock); + pm->total = pm->current + remaining; +} + +void progress_increase_remaining(ProgressMeter *pm, uint64_t delta) +{ + QEMU_LOCK_GUARD(&pm->lock); + pm->total += delta; +} diff --git a/blockjob.c b/blockjob.c index dc1d9e0e46..4bad1408cb 100644 --- a/blockjob.c +++ b/blockjob.c @@ -300,28 +300,29 @@ bool block_job_set_speed(BlockJob *job, int64_t speed, Error **errp) int64_t block_job_ratelimit_get_delay(BlockJob *job, uint64_t n) { - if (!job->speed) { - return 0; - } - return ratelimit_calculate_delay(&job->limit, n); } BlockJobInfo *block_job_query(BlockJob *job, Error **errp) { BlockJobInfo *info; + uint64_t progress_current, progress_total; if (block_job_is_internal(job)) { error_setg(errp, "Cannot query QEMU internal jobs"); return NULL; } + + progress_get_snapshot(&job->job.progress, &progress_current, + &progress_total); + info = g_new0(BlockJobInfo, 1); info->type = g_strdup(job_type_str(&job->job)); info->device = g_strdup(job->job.id); info->busy = qatomic_read(&job->job.busy); info->paused = job->job.pause_count > 0; - info->offset = job->job.progress.current; - info->len = job->job.progress.total; + info->offset = progress_current; + info->len = progress_total; info->speed = job->speed; info->io_status = job->iostatus; info->ready = job_is_ready(&job->job), @@ -348,15 +349,19 @@ static void block_job_iostatus_set_err(BlockJob *job, int error) static void block_job_event_cancelled(Notifier *n, void *opaque) { BlockJob *job = opaque; + uint64_t progress_current, progress_total; if (block_job_is_internal(job)) { return; } + progress_get_snapshot(&job->job.progress, &progress_current, + &progress_total); + qapi_event_send_block_job_cancelled(job_type(&job->job), job->job.id, - job->job.progress.total, - job->job.progress.current, + progress_total, + progress_current, job->speed); } @@ -364,6 +369,7 @@ static void block_job_event_completed(Notifier *n, void *opaque) { BlockJob *job = opaque; const char *msg = NULL; + uint64_t progress_current, progress_total; if (block_job_is_internal(job)) { return; @@ -373,10 +379,13 @@ static void block_job_event_completed(Notifier *n, void *opaque) msg = error_get_pretty(job->job.err); } + progress_get_snapshot(&job->job.progress, &progress_current, + &progress_total); + qapi_event_send_block_job_completed(job_type(&job->job), job->job.id, - job->job.progress.total, - job->job.progress.current, + progress_total, + progress_current, job->speed, !!msg, msg); @@ -397,15 +406,19 @@ static void block_job_event_pending(Notifier *n, void *opaque) static void block_job_event_ready(Notifier *n, void *opaque) { BlockJob *job = opaque; + uint64_t progress_current, progress_total; if (block_job_is_internal(job)) { return; } + progress_get_snapshot(&job->job.progress, &progress_current, + &progress_total); + qapi_event_send_block_job_ready(job_type(&job->job), job->job.id, - job->job.progress.total, - job->job.progress.current, + progress_total, + progress_current, job->speed); } @@ -472,12 +485,9 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver, blk_set_disable_request_queuing(blk, true); blk_set_allow_aio_context_change(blk, true); - /* Only set speed when necessary to avoid NotSupported error */ - if (speed != 0) { - if (!block_job_set_speed(job, speed, errp)) { - job_early_fail(&job->job); - return NULL; - } + if (!block_job_set_speed(job, speed, errp)) { + job_early_fail(&job->job); + return NULL; } return job; diff --git a/include/block/block-copy.h b/include/block/block-copy.h index 338f2ea7fd..5c8278895c 100644 --- a/include/block/block-copy.h +++ b/include/block/block-copy.h @@ -18,6 +18,8 @@ #include "block/block.h" #include "qemu/co-shared-resource.h" +/* All APIs are thread-safe */ + typedef void (*BlockCopyAsyncCallbackFunc)(void *opaque); typedef struct BlockCopyState BlockCopyState; typedef struct BlockCopyCallState BlockCopyCallState; diff --git a/include/qemu/co-shared-resource.h b/include/qemu/co-shared-resource.h index 4e4503004c..78ca5850f8 100644 --- a/include/qemu/co-shared-resource.h +++ b/include/qemu/co-shared-resource.h @@ -26,15 +26,13 @@ #ifndef QEMU_CO_SHARED_RESOURCE_H #define QEMU_CO_SHARED_RESOURCE_H - +/* Accesses to co-shared-resource API are thread-safe */ typedef struct SharedResource SharedResource; /* * Create SharedResource structure * * @total: total amount of some resource to be shared between clients - * - * Note: this API is not thread-safe. */ SharedResource *shres_create(uint64_t total); diff --git a/include/qemu/progress_meter.h b/include/qemu/progress_meter.h index 9a23ff071c..dadf822bbf 100644 --- a/include/qemu/progress_meter.h +++ b/include/qemu/progress_meter.h @@ -27,6 +27,8 @@ #ifndef QEMU_PROGRESS_METER_H #define QEMU_PROGRESS_METER_H +#include "qemu/lockable.h" + typedef struct ProgressMeter { /** * Current progress. The unit is arbitrary as long as the ratio between @@ -37,22 +39,24 @@ typedef struct ProgressMeter { /** Estimated current value at the completion of the process */ uint64_t total; + + QemuMutex lock; /* protects concurrent access to above fields */ } ProgressMeter; -static inline void progress_work_done(ProgressMeter *pm, uint64_t done) -{ - pm->current += done; -} +void progress_init(ProgressMeter *pm); +void progress_destroy(ProgressMeter *pm); -static inline void progress_set_remaining(ProgressMeter *pm, uint64_t remaining) -{ - pm->total = pm->current + remaining; -} +/* Get a snapshot of internal current and total values */ +void progress_get_snapshot(ProgressMeter *pm, uint64_t *current, + uint64_t *total); -static inline void progress_increase_remaining(ProgressMeter *pm, - uint64_t delta) -{ - pm->total += delta; -} +/* Increases the amount of work done so far by @done */ +void progress_work_done(ProgressMeter *pm, uint64_t done); + +/* Sets how much work has to be done to complete to @remaining */ +void progress_set_remaining(ProgressMeter *pm, uint64_t remaining); + +/* Increases the total work to do by @delta */ +void progress_increase_remaining(ProgressMeter *pm, uint64_t delta); #endif /* QEMU_PROGRESS_METER_H */ diff --git a/include/qemu/ratelimit.h b/include/qemu/ratelimit.h index 003ea6d5a3..48bf59e857 100644 --- a/include/qemu/ratelimit.h +++ b/include/qemu/ratelimit.h @@ -43,7 +43,11 @@ static inline int64_t ratelimit_calculate_delay(RateLimit *limit, uint64_t n) double delay_slices; QEMU_LOCK_GUARD(&limit->lock); - assert(limit->slice_quota && limit->slice_ns); + if (!limit->slice_quota) { + /* Throttling disabled. */ + return 0; + } + assert(limit->slice_ns); if (limit->slice_end_time < now) { /* Previous, possibly extended, time slice finished; reset the @@ -83,7 +87,11 @@ static inline void ratelimit_set_speed(RateLimit *limit, uint64_t speed, { QEMU_LOCK_GUARD(&limit->lock); limit->slice_ns = slice_ns; - limit->slice_quota = MAX(((double)speed * slice_ns) / 1000000000ULL, 1); + if (speed == 0) { + limit->slice_quota = 0; + } else { + limit->slice_quota = MAX(((double)speed * slice_ns) / 1000000000ULL, 1); + } } #endif diff --git a/job-qmp.c b/job-qmp.c index 34c4da094f..829a28aa70 100644 --- a/job-qmp.c +++ b/job-qmp.c @@ -144,16 +144,20 @@ void qmp_job_dismiss(const char *id, Error **errp) static JobInfo *job_query_single(Job *job, Error **errp) { JobInfo *info; + uint64_t progress_current; + uint64_t progress_total; assert(!job_is_internal(job)); + progress_get_snapshot(&job->progress, &progress_current, + &progress_total); info = g_new(JobInfo, 1); *info = (JobInfo) { .id = g_strdup(job->id), .type = job_type(job), .status = job->status, - .current_progress = job->progress.current, - .total_progress = job->progress.total, + .current_progress = progress_current, + .total_progress = progress_total, .has_error = !!job->err, .error = job->err ? \ g_strdup(error_get_pretty(job->err)) : NULL, diff --git a/job.c b/job.c index 8775c1803b..e7a5d28854 100644 --- a/job.c +++ b/job.c @@ -339,6 +339,8 @@ void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn, job->cb = cb; job->opaque = opaque; + progress_init(&job->progress); + notifier_list_init(&job->on_finalize_cancelled); notifier_list_init(&job->on_finalize_completed); notifier_list_init(&job->on_pending); @@ -382,6 +384,7 @@ void job_unref(Job *job) QLIST_REMOVE(job, job_list); + progress_destroy(&job->progress); error_free(job->err); g_free(job->id); g_free(job); diff --git a/qemu-img.c b/qemu-img.c index a5993682aa..7956a89965 100644 --- a/qemu-img.c +++ b/qemu-img.c @@ -900,6 +900,7 @@ static void common_block_job_cb(void *opaque, int ret) static void run_block_job(BlockJob *job, Error **errp) { + uint64_t progress_current, progress_total; AioContext *aio_context = blk_get_aio_context(job->blk); int ret = 0; @@ -908,9 +909,11 @@ static void run_block_job(BlockJob *job, Error **errp) do { float progress = 0.0f; aio_poll(aio_context, true); - if (job->job.progress.total) { - progress = (float)job->job.progress.current / - job->job.progress.total * 100.f; + + progress_get_snapshot(&job->job.progress, &progress_current, + &progress_total); + if (progress_total) { + progress = (float)progress_current / progress_total * 100.f; } qemu_progress_print(progress, 0); } while (!job_is_ready(&job->job) && !job_is_completed(&job->job)); diff --git a/util/qemu-co-shared-resource.c b/util/qemu-co-shared-resource.c index 1c83cd9d29..a66cc07e75 100644 --- a/util/qemu-co-shared-resource.c +++ b/util/qemu-co-shared-resource.c @@ -28,10 +28,13 @@ #include "qemu/co-shared-resource.h" struct SharedResource { - uint64_t total; - uint64_t available; + uint64_t total; /* Set in shres_create() and not changed anymore */ + /* State fields protected by lock */ + uint64_t available; CoQueue queue; + + QemuMutex lock; }; SharedResource *shres_create(uint64_t total) @@ -40,6 +43,7 @@ SharedResource *shres_create(uint64_t total) s->total = s->available = total; qemu_co_queue_init(&s->queue); + qemu_mutex_init(&s->lock); return s; } @@ -47,10 +51,12 @@ SharedResource *shres_create(uint64_t total) void shres_destroy(SharedResource *s) { assert(s->available == s->total); + qemu_mutex_destroy(&s->lock); g_free(s); } -bool co_try_get_from_shres(SharedResource *s, uint64_t n) +/* Called with lock held. */ +static bool co_try_get_from_shres_locked(SharedResource *s, uint64_t n) { if (s->available >= n) { s->available -= n; @@ -60,16 +66,24 @@ bool co_try_get_from_shres(SharedResource *s, uint64_t n) return false; } +bool co_try_get_from_shres(SharedResource *s, uint64_t n) +{ + QEMU_LOCK_GUARD(&s->lock); + return co_try_get_from_shres_locked(s, n); +} + void coroutine_fn co_get_from_shres(SharedResource *s, uint64_t n) { assert(n <= s->total); - while (!co_try_get_from_shres(s, n)) { - qemu_co_queue_wait(&s->queue, NULL); + QEMU_LOCK_GUARD(&s->lock); + while (!co_try_get_from_shres_locked(s, n)) { + qemu_co_queue_wait(&s->queue, &s->lock); } } void coroutine_fn co_put_to_shres(SharedResource *s, uint64_t n) { + QEMU_LOCK_GUARD(&s->lock); assert(s->total - s->available >= n); s->available += n; qemu_co_queue_restart_all(&s->queue);